##Choosing Data Source

In [1]:
data_source = "google_reviews" #@param ["google_reviews", "yelp_reviews"]

### Installing Pyspark and importing google drive and libraries

In [None]:
!pip install pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .config("dfs.client.read.shortcircuit.skip.checksum", "true")\
        .getOrCreate()
spark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/




```
# This is formatted as code
```

**Importing google drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

!ls /content/drive/MyDrive/ColabNotebooks/data

Mounted at /content/drive
google_reviews	health_grade  yelp_reviews


**Importing Libraries**

In [None]:
from pyspark.sql.functions import col, desc,sum,count
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import datetime

**Displaying the current data in the input parquet folder**




In [None]:
input_parquet_path = f"/content/drive/MyDrive/ColabNotebooks/data/{data_source}/input_parquet"

current_data_df = spark.read.parquet(input_parquet_path)
current_data_df.show()


+--------------------+-------------+----------------+-------+------+-------------+--------+---------+-----------+
|           review_id|restaurant_id| restaurant_name| county|rating|reviewer_name|comments|operation|review_date|
+--------------------+-------------+----------------+-------+------+-------------+--------+---------+-----------+
|9dcd47c2-d002-11e...|        64892| Spanish Delight| Bergen|     2|         ****|   *****|      add| 2022-06-04|
|9dcd488a-d002-11e...|        56849|   Namaste India| Bergen|     3|         ****|   *****|      add| 2022-06-04|
|9dcd4948-d002-11e...|        67432|    Little Italy| Bergen|     4|         ****|   *****|      add| 2022-06-04|
|9dcd4a06-d002-11e...|        52314|Carribean Dreams|Passaic|     3|         ****|   *****|      add| 2022-06-04|
|9dcd4aba-d002-11e...|        58976|  Malibu Kitchen|Passaic|     2|         ****|   *****|      add| 2022-06-04|
|9dcd4c0e-d002-11e...|        64892| Spanish Delight| Bergen|     3|         ****|   ***

In [None]:
current_data_df.count()

2836

**Reading Raw Data**

In [None]:
input_csv_path = f"/content/drive/MyDrive/ColabNotebooks/data/{data_source}/raw_data/reviews.csv"

review_df = spark.read.csv(input_csv_path,header=True,inferSchema=True)

#Creating a raw data history with timestamp
review_history = f'/content/drive/MyDrive/ColabNotebooks/data/{data_source}/raw_data_history/reviews-' + str(datetime.datetime.now())+'.csv'
review_df.write.csv(review_history,header=True)

review_df = review_df.withColumn("review_date", F.to_date(F.col("review_date").cast("string"), 'yyyyMMdd'))



In [None]:
review_df.printSchema()


root
 |-- review_id: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- restaurant_id: integer (nullable = true)
 |-- restaurant_name: string (nullable = true)
 |-- county: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- reviewer_name: string (nullable = true)
 |-- comments: string (nullable = true)
 |-- operation: string (nullable = true)



**Dates_List created**

In [None]:
dates_list = review_df.select('review_date').distinct().rdd.map(lambda x : x[0]).collect()
print(dates_list)

[datetime.date(2022, 11, 29), datetime.date(2022, 12, 25), datetime.date(2023, 2, 25), datetime.date(2022, 11, 21), datetime.date(2023, 2, 8), datetime.date(2022, 12, 9), datetime.date(2023, 3, 12), datetime.date(2022, 12, 2), datetime.date(2023, 3, 24), datetime.date(2022, 12, 20), datetime.date(2023, 1, 1), datetime.date(2023, 5, 3), datetime.date(2022, 11, 20), datetime.date(2022, 12, 11), datetime.date(2022, 12, 12), datetime.date(2023, 3, 4), datetime.date(2023, 4, 29), datetime.date(2022, 12, 28), datetime.date(2022, 12, 19), datetime.date(2022, 12, 29), datetime.date(2023, 2, 16), datetime.date(2023, 1, 2), datetime.date(2023, 5, 9), datetime.date(2022, 11, 1), datetime.date(2023, 4, 12), datetime.date(2023, 4, 21), datetime.date(2022, 12, 8), datetime.date(2023, 1, 28), datetime.date(2023, 4, 1), datetime.date(2022, 12, 6), datetime.date(2023, 1, 11), datetime.date(2022, 11, 23), datetime.date(2022, 11, 10), datetime.date(2022, 12, 26), datetime.date(2022, 12, 17), datetime.dat

**Read the existing data only for dates in the dates_list**





In [None]:
review_existing = spark.read.parquet(input_parquet_path).filter(col("review_date").isin(dates_list))


**Rearranging columns of review_df (raw_data) to match with the review_existing (input_parquet)**

In [None]:
review_df = review_df.select('review_id',  'restaurant_id', 'restaurant_name', 'county', 'rating', 'reviewer_name', 'comments', 'operation', 'review_date')
review_df.show(20)

+--------------------+-------------+----------------+-------+------+-------------+--------+---------+-----------+
|           review_id|restaurant_id| restaurant_name| county|rating|reviewer_name|comments|operation|review_date|
+--------------------+-------------+----------------+-------+------+-------------+--------+---------+-----------+
|4febf7ae-d004-11e...|        67432|    Little Italy| Bergen|     4|         ****|   *****|      add| 2022-10-27|
|4fec209e-d004-11e...|        56849|   Namaste India| Bergen|     2|         ****|   *****|      add| 2022-10-27|
|4fec227e-d004-11e...|        56849|   Namaste India| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|4fec236e-d004-11e...|        52314|Carribean Dreams|Passaic|     3|         ****|   *****|      add| 2022-10-27|
|4fec2440-d004-11e...|        64892| Spanish Delight| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|4fec2512-d004-11e...|        54781|      Greek Love| Bergen|     5|         ****|   ***

In [None]:
review_existing.show()

+--------------------+-------------+---------------+-------+------+-------------+--------+---------+-----------+
|           review_id|restaurant_id|restaurant_name| county|rating|reviewer_name|comments|operation|review_date|
+--------------------+-------------+---------------+-------+------+-------------+--------+---------+-----------+
|9dd285e8-d002-11e...|        67432|   Little Italy| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|9dd28728-d002-11e...|        56849|  Namaste India| Bergen|     2|         ****|   *****|      add| 2022-10-27|
|9dd287f0-d002-11e...|        58976| Malibu Kitchen|Passaic|     2|         ****|   *****|      add| 2022-10-27|
|9dd288ae-d002-11e...|        67432|   Little Italy| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|9dd28958-d002-11e...|        67432|   Little Italy| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|9dd28a20-d002-11e...|        54781|     Greek Love| Bergen|     4|         ****|   *****|      

**Merging raw data with the data from input parquet for the dates in the dates_list**

In [None]:
review_df_merge = review_existing.union(review_df)
review_df_merge.show()

+--------------------+-------------+----------------+-------+------+-------------+--------+---------+-----------+
|           review_id|restaurant_id| restaurant_name| county|rating|reviewer_name|comments|operation|review_date|
+--------------------+-------------+----------------+-------+------+-------------+--------+---------+-----------+
|9dd285e8-d002-11e...|        67432|    Little Italy| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|9dd28728-d002-11e...|        56849|   Namaste India| Bergen|     2|         ****|   *****|      add| 2022-10-27|
|9dd287f0-d002-11e...|        58976|  Malibu Kitchen|Passaic|     2|         ****|   *****|      add| 2022-10-27|
|9dd288ae-d002-11e...|        67432|    Little Italy| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|9dd28958-d002-11e...|        67432|    Little Italy| Bergen|     3|         ****|   *****|      add| 2022-10-27|
|9dd28a20-d002-11e...|        54781|      Greek Love| Bergen|     4|         ****|   ***

In [None]:
review_df_merge = review_df_merge.distinct()

In [None]:
review_df_merge.count()

1824

**Writing merged dataframe to the input parquet file in dynamic overwrite mode**

In [None]:
review_df_merge.write.partitionBy('review_date').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(input_parquet_path)


# **Calculating the output values**

**Rating_sum Data calculation**

In [None]:
from pyspark.sql.types import StringType

review_df_merge.createOrReplaceTempView("reviews")
sum_rating_df = spark.sql("select review_date, restaurant_id, restaurant_name, county, sum(rating) rating_sum, count(rating) rating_count " \
                "from reviews group by restaurant_id, restaurant_name, county, review_date order by review_date")
sum_rating_df = sum_rating_df.withColumn("restaurant_id",col("restaurant_id").cast(StringType()))





In [None]:
spark.sql('refresh table reviews')

DataFrame[]

In [None]:
sum_rating_df.show()
sum_rating_df.count()
#health_df2.show()

+-----------+-------------+----------------+-------+----------+------------+
|review_date|restaurant_id| restaurant_name| county|rating_sum|rating_count|
+-----------+-------------+----------------+-------+----------+------------+
| 2022-10-27|        54781|      Greek Love| Bergen|        14|           3|
| 2022-10-27|        58976|  Malibu Kitchen|Passaic|         2|           1|
| 2022-10-27|        64892| Spanish Delight| Bergen|         7|           2|
| 2022-10-27|        52314|Carribean Dreams|Passaic|        11|           3|
| 2022-10-27|        67432|    Little Italy| Bergen|        13|           4|
| 2022-10-27|        56849|   Namaste India| Bergen|         7|           3|
| 2022-10-28|        67432|    Little Italy| Bergen|         6|           2|
| 2022-10-28|        64892| Spanish Delight| Bergen|         5|           2|
| 2022-10-28|        58976|  Malibu Kitchen|Passaic|         8|           3|
| 2022-10-28|        56849|   Namaste India| Bergen|         2|           1|

882

**Adding dates for the missing dates in review_df2 and filling their values with 0**

In [None]:
add_dates = sum_rating_df. \
    groupBy('restaurant_id','restaurant_name', 'restaurant_name', 'county'). \
    agg(F.min('review_date').alias('min_dt'),
        F.max('review_date').alias('max_dt')
        ). \
    withColumn('dt_arr', F.expr('sequence(min_dt, max_dt, interval 1 day)')). \
    withColumn('exploded_date', F.explode('dt_arr')). \
    select('restaurant_id', 'restaurant_name', 'county', F.col('exploded_date').alias('review_date'))

add_dates.show()

+-------------+---------------+-------+-----------+
|restaurant_id|restaurant_name| county|review_date|
+-------------+---------------+-------+-----------+
|        58976| Malibu Kitchen|Passaic| 2022-10-27|
|        58976| Malibu Kitchen|Passaic| 2022-10-28|
|        58976| Malibu Kitchen|Passaic| 2022-10-29|
|        58976| Malibu Kitchen|Passaic| 2022-10-30|
|        58976| Malibu Kitchen|Passaic| 2022-10-31|
|        58976| Malibu Kitchen|Passaic| 2022-11-01|
|        58976| Malibu Kitchen|Passaic| 2022-11-02|
|        58976| Malibu Kitchen|Passaic| 2022-11-03|
|        58976| Malibu Kitchen|Passaic| 2022-11-04|
|        58976| Malibu Kitchen|Passaic| 2022-11-05|
|        58976| Malibu Kitchen|Passaic| 2022-11-06|
|        58976| Malibu Kitchen|Passaic| 2022-11-07|
|        58976| Malibu Kitchen|Passaic| 2022-11-08|
|        58976| Malibu Kitchen|Passaic| 2022-11-09|
|        58976| Malibu Kitchen|Passaic| 2022-11-10|
|        58976| Malibu Kitchen|Passaic| 2022-11-11|
|        589

In [None]:
add_dates.count()

1198

In [None]:
sum_rating_df_all_dates = add_dates. \
    join(sum_rating_df, ['restaurant_id', 'review_date', 'restaurant_name', 'county'], 'left'). \
    fillna(0, subset=['rating_sum','rating_count'])



In [None]:
sum_rating_df_all_dates.count()

1198

In [None]:
sum_rating_df_all_dates.tail(5)

[Row(restaurant_id='54781', review_date=datetime.date(2023, 5, 9), restaurant_name='Greek Love', county='Bergen', rating_sum=5, rating_count=1),
 Row(restaurant_id='54781', review_date=datetime.date(2023, 5, 10), restaurant_name='Greek Love', county='Bergen', rating_sum=12, rating_count=3),
 Row(restaurant_id='54781', review_date=datetime.date(2023, 5, 11), restaurant_name='Greek Love', county='Bergen', rating_sum=0, rating_count=0),
 Row(restaurant_id='54781', review_date=datetime.date(2023, 5, 12), restaurant_name='Greek Love', county='Bergen', rating_sum=0, rating_count=0),
 Row(restaurant_id='54781', review_date=datetime.date(2023, 5, 13), restaurant_name='Greek Love', county='Bergen', rating_sum=13, rating_count=3)]

**Writing Output file of rating_sum**

In [None]:
rating_sum_path = f"/content/drive/MyDrive/ColabNotebooks/data/{data_source}/output_parquet/rating_sum"

sum_rating_df_all_dates.write.partitionBy('review_date').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(rating_sum_path)


**Getting dates 6 days before and after dates in our date_list**

In [None]:
import datetime
required_dates = []
for date in dates_list :
  for x in range(0, 6):
    required_dates.append(date + datetime.timedelta(days=x))
    required_dates.append(date - datetime.timedelta(days=x))

required_dates = list(set(required_dates))


**Reading rating_sum parquet for the dates in required_dates**

In [None]:
final_rating_df = spark.read.parquet(rating_sum_path).filter(col("review_date").isin(required_dates))

In [None]:
final_rating_df.show()

+-------------+----------------+-------+----------+------------+-----------+
|restaurant_id| restaurant_name| county|rating_sum|rating_count|review_date|
+-------------+----------------+-------+----------+------------+-----------+
|        58976|  Malibu Kitchen|Passaic|         6|           3| 2023-01-03|
|        67432|    Little Italy| Bergen|        14|           4| 2023-01-03|
|        64892| Spanish Delight| Bergen|         3|           1| 2023-01-03|
|        52314|Carribean Dreams|Passaic|         3|           1| 2023-01-03|
|        56849|   Namaste India| Bergen|         4|           2| 2023-01-03|
|        54781|      Greek Love| Bergen|         0|           0| 2023-01-03|
|        58976|  Malibu Kitchen|Passaic|         4|           2| 2023-02-10|
|        67432|    Little Italy| Bergen|        15|           4| 2023-02-10|
|        64892| Spanish Delight| Bergen|         6|           2| 2023-02-10|
|        52314|Carribean Dreams|Passaic|         4|           1| 2023-02-10|

In [None]:
final_rating_df.count()

1227

**Creating rolling average window function and calculating rolling average**

In [None]:
w = Window().partitionBy(['restaurant_id']).orderBy('review_date').rowsBetween(-6,0)
final_rating_df = final_rating_df.withColumn('rating_sum_rolling', F.sum("rating_sum").over(w)) \
       .withColumn('rating_count_rolling', F.sum("rating_count").over(w))
final_rating_df = final_rating_df.withColumn('final_rating', final_rating_df['rating_sum_rolling']/final_rating_df['rating_count_rolling'])


In [None]:
final_rating_df.summary()

DataFrame[summary: string, restaurant_id: string, restaurant_name: string, county: string, rating_sum: string, rating_count: string, rating_sum_rolling: string, rating_count_rolling: string, final_rating: string]

In [None]:
final_rating_df.count()

1227

In [None]:
final_rating_df.show()

+-------------+----------------+-------+----------+------------+-----------+------------------+--------------------+------------------+
|restaurant_id| restaurant_name| county|rating_sum|rating_count|review_date|rating_sum_rolling|rating_count_rolling|      final_rating|
+-------------+----------------+-------+----------+------------+-----------+------------------+--------------------+------------------+
|        52314|Carribean Dreams|Passaic|         6|           2| 2022-10-22|                 6|                   2|               3.0|
|        52314|Carribean Dreams|Passaic|         3|           1| 2022-10-23|                 9|                   3|               3.0|
|        52314|Carribean Dreams|Passaic|         0|           0| 2022-10-24|                 9|                   3|               3.0|
|        52314|Carribean Dreams|Passaic|         7|           2| 2022-10-25|                16|                   5|               3.2|
|        52314|Carribean Dreams|Passaic|        

In [None]:
final_rating_dates =[]
for date in dates_list :
  for x in range(0, 6):
    final_rating_dates.append(date + datetime.timedelta(days=x))

final_rating_dates = list(set(final_rating_dates))
print(final_rating_dates)


[datetime.date(2023, 4, 1), datetime.date(2022, 11, 16), datetime.date(2023, 2, 26), datetime.date(2023, 3, 16), datetime.date(2023, 3, 7), datetime.date(2022, 11, 14), datetime.date(2023, 4, 9), datetime.date(2023, 1, 4), datetime.date(2022, 11, 12), datetime.date(2022, 11, 24), datetime.date(2023, 2, 8), datetime.date(2023, 5, 18), datetime.date(2023, 3, 22), datetime.date(2022, 12, 7), datetime.date(2023, 4, 16), datetime.date(2023, 1, 29), datetime.date(2022, 12, 13), datetime.date(2022, 10, 30), datetime.date(2022, 11, 6), datetime.date(2023, 1, 10), datetime.date(2023, 1, 22), datetime.date(2023, 2, 5), datetime.date(2023, 1, 30), datetime.date(2023, 5, 15), datetime.date(2022, 12, 12), datetime.date(2023, 1, 9), datetime.date(2023, 5, 16), datetime.date(2023, 5, 11), datetime.date(2023, 4, 19), datetime.date(2023, 5, 17), datetime.date(2022, 12, 3), datetime.date(2022, 12, 15), datetime.date(2023, 2, 4), datetime.date(2023, 5, 7), datetime.date(2022, 12, 19), datetime.date(2023,

In [None]:
final_rating_df =  final_rating_df.filter(col("review_date").isin(final_rating_dates))
final_rating_df.show()

+-------------+----------------+-------+----------+------------+-----------+------------------+--------------------+------------------+
|restaurant_id| restaurant_name| county|rating_sum|rating_count|review_date|rating_sum_rolling|rating_count_rolling|      final_rating|
+-------------+----------------+-------+----------+------------+-----------+------------------+--------------------+------------------+
|        52314|Carribean Dreams|Passaic|        11|           3| 2022-10-27|                27|                   8|             3.375|
|        52314|Carribean Dreams|Passaic|         0|           0| 2022-10-28|                27|                   8|             3.375|
|        52314|Carribean Dreams|Passaic|         3|           1| 2022-10-29|                30|                   9|3.3333333333333335|
|        52314|Carribean Dreams|Passaic|         0|           0| 2022-10-30|                24|                   7|3.4285714285714284|
|        52314|Carribean Dreams|Passaic|        

In [None]:
final_rating_df.count()

1198

**Calling existing final_rating data**

In [None]:
final_rating_path = f"/content/drive/MyDrive/ColabNotebooks/data/{data_source}/output_parquet/final_rating"

existing_final_rating_df = spark.read.parquet(final_rating_path)


In [None]:
existing_final_rating_df.count()

1795

In [None]:
existing_final_rating_df.show()

+---------------+------+----------+------------+-----------+------------------+-------------+
|restaurant_name|county|rating_sum|rating_count|review_date|      final_rating|restaurant_id|
+---------------+------+----------+------------+-----------+------------------+-------------+
|   Little Italy|Bergen|        20|           6| 2022-01-01|3.3333333333333335|        67432|
|   Little Italy|Bergen|         8|           2| 2022-01-02|               3.5|        67432|
|   Little Italy|Bergen|         0|           0| 2022-01-03|               3.5|        67432|
|   Little Italy|Bergen|         7|           2| 2022-01-04|               3.5|        67432|
|   Little Italy|Bergen|        14|           4| 2022-01-05|               3.5|        67432|
|   Little Italy|Bergen|        11|           3| 2022-01-06|3.5294117647058822|        67432|
|   Little Italy|Bergen|        11|           3| 2022-01-07|              3.55|        67432|
|   Little Italy|Bergen|         7|           2| 2022-01-08|

In [None]:
final_rating_df = final_rating_df.select( 'restaurant_name', 'county', 'rating_sum', 'rating_count', 'review_date', 'final_rating', 'restaurant_id' )
final_rating_df.show(20)

+----------------+-------+----------+------------+-----------+------------------+-------------+
| restaurant_name| county|rating_sum|rating_count|review_date|      final_rating|restaurant_id|
+----------------+-------+----------+------------+-----------+------------------+-------------+
|Carribean Dreams|Passaic|        11|           3| 2022-10-27|             3.375|        52314|
|Carribean Dreams|Passaic|         0|           0| 2022-10-28|             3.375|        52314|
|Carribean Dreams|Passaic|         3|           1| 2022-10-29|3.3333333333333335|        52314|
|Carribean Dreams|Passaic|         0|           0| 2022-10-30|3.4285714285714284|        52314|
|Carribean Dreams|Passaic|         3|           1| 2022-10-31|3.4285714285714284|        52314|
|Carribean Dreams|Passaic|         0|           0| 2022-11-01|3.4285714285714284|        52314|
|Carribean Dreams|Passaic|         0|           0| 2022-11-02|               3.4|        52314|
|Carribean Dreams|Passaic|         7|   

**Selecting the data from existing final_rating data for dates not in the final_rating_dates**

In [None]:

existing_final_rating_DF = existing_final_rating_df.filter(~existing_final_rating_df.review_date.isin(final_rating_dates))


In [None]:
existing_final_rating_DF.count()

1791

In [None]:
final_rating_df = existing_final_rating_DF.union(final_rating_df)

In [None]:
final_rating_df.count()

2989

**Writing final final_rating data**

In [None]:
final_rating_path = f"/content/drive/MyDrive/ColabNotebooks/data/{data_source}/output_parquet/final_rating"

final_rating_df.write.partitionBy('restaurant_id').mode('overwrite').format('parquet').option(
    "partitionOverwriteMode", "dynamic").save(final_rating_path)


In [None]:
new_df = spark.read.parquet(final_rating_path)
new_df.count()

2989