In [13]:
import logging
import pyspark.pandas as ps

import pyspark.sql.functions as Function
from pyspark.sql.functions import when

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import split, explode, weekofyear, year, avg, col, count
  



In [14]:
spark = SparkSession.builder \
            .appName('yelp_processor') \
            .getOrCreate()

22/11/08 14:44:14 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


#### First let's have a look into the `yelp_academic_dataset_business.json`

In [15]:
business_data = f'./yelp_dataset/raw_data/yelp_academic_dataset_business.json'

In [16]:
df = spark.read.json(business_data)

                                                                                

In [17]:
df.describe()

                                                                                

DataFrame[summary: string, address: string, business_id: string, categories: string, city: string, is_open: string, latitude: string, longitude: string, name: string, postal_code: string, review_count: string, stars: string, state: string]

In [18]:
pddf = df.pandas_api()

In [37]:
pddf['state'].count_values()

AttributeError: 'Series' object has no attribute 'count_values'

#### Let's cherry pick some data from the state of AZ

In [34]:
filtered_businesses = pddf[pddf['state'] == 'IL']

In [35]:
len(filtered_businesses.index)

2145

In [17]:
filtered_businesses = filtered_businesses[filtered_businesses['is_open'] == 1]

In [18]:
len(filtered_businesses.index)

8108

In [19]:
filtered_businesses.drop('any')

Unnamed: 0,address,attributes,business_id,categories,city,hours,is_open,latitude,longitude,name,postal_code,review_count,stars,state


In [20]:
len(filtered_businesses.index)

8108

#### Now that I have cherry picked my businesses, I will try to get the reviews that correspond to the cherrypicked business ids

In [21]:
review_path = './yelp_dataset/raw_data/yelp_academic_dataset_review.json'
review_df = spark.read.json(review_path)

                                                                                

In [22]:
review_df.describe()

                                                                                

DataFrame[summary: string, business_id: string, cool: string, date: string, funny: string, review_id: string, stars: string, text: string, useful: string, user_id: string]

In [23]:
type(review_df)

pyspark.sql.dataframe.DataFrame

Intially I was exploring the data as a pandas dataframe, but soon enought I realised that treating them with sql queries was more flexible. And this is where the adventure begins!

In [24]:
review_df.createOrReplaceTempView("reviews")

In [25]:
rsqlDF = spark.sql('select * from reviews')

In [26]:
rsqlDF.count()

                                                                                

6990280

In [27]:
df3 = rsqlDF.join(sqlDF,'business_id','leftsemi')

In [28]:
df3.count()

# write to json

                                                                                

986088

In [29]:
df3.describe()

                                                                                

DataFrame[summary: string, business_id: string, cool: string, date: string, funny: string, review_id: string, stars: string, text: string, useful: string, user_id: string]

In [30]:
df3.na.drop('any').show(5, truncate=False)

+----------------------+----+-------------------+-----+----------------------+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [31]:
df3.count()


                                                                                

986088

Repeat the same steps for the next table (checkin.json)

In [32]:
checkin_path = './yelp_dataset/raw_data/yelp_academic_dataset_checkin.json'
checkin_df = spark.read.json(checkin_path)
checkin_df.describe()

                                                                                

DataFrame[summary: string, business_id: string, date: string]

In [33]:
checkin_df.createOrReplaceTempView("checkin")

c_sqlDF = spark.sql('select * from checkin')
c_sqlDF.count()

131930

In [34]:
df4 = c_sqlDF.join(sqlDF,'business_id','leftsemi')

In [35]:
df4.count()


18473

In [36]:
df4.describe()

DataFrame[summary: string, business_id: string, date: string]

### Aggregation playground 

From this point ownward things get messy

In [39]:
df5 = df4.select("*", explode(split('date', ',')).alias("exploded")).groupBy("business_id").agg(count("exploded").alias("no_checkins")).sort("no_checkins", ascending=False)

In [40]:
df5.show(10)



+--------------------+-----------+
|         business_id|no_checkins|
+--------------------+-----------+
|c_4c5rJECZSfNgFj7...|      37518|
|QHWYlmVbLC3K6eglW...|       6820|
|SwBhaxfQPbyhsi0QH...|       5394|
|-K0LoSCfh8i5U_y53...|       4955|
|3YqUe2FTCQr0pPVK8...|       4476|
|AFYI0sfZ6WdVELjjE...|       4279|
|fCDMLD21ypv1XZ_Ey...|       4202|
|sihT-_DtwOdnDDDJb...|       4116|
|LdECsE8lJS7v5GTFT...|       3936|
|k-tHn9uxgWdq1ROHq...|       3862|
+--------------------+-----------+
only showing top 10 rows



                                                                                

In [41]:
c_sqlDF.join(sqlDF,'business_id','leftsemi')

DataFrame[business_id: string, date: string]

In [42]:
test_df = sqlDF.join(df5, sqlDF.business_id == df4.business_id)

In [43]:
test_df.show(5)

+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+--------------------+-----------+
|             address|          attributes|         business_id|          categories|            city|               hours|is_open|     latitude|     longitude|                name|postal_code|review_count|stars|state|         business_id|no_checkins|
+--------------------+--------------------+--------------------+--------------------+----------------+--------------------+-------+-------------+--------------+--------------------+-----------+------------+-----+-----+--------------------+-----------+
| 8150 Bryan Dairy Rd|{null, null, null...|vxrGNnuEef7YCfB7m...|Coffee & Tea, Don...|   Pinellas Park|{5:0-22:0, 5:0-22...|      1|   27.8718285|   -82.7502853|             Dunkin'|      33777|           8|  2.0|   FL|vxrGNnuEef7YCfB7m...|     



In [45]:
df3 = df3.withColumn('week', weekofyear('date'))
df3 = df3.withColumn('year', year('date'))

In [47]:
df3.show(5)

+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+----+----+
|         business_id|cool|               date|funny|           review_id|stars|                text|useful|             user_id|week|year|
+--------------------+----+-------------------+-----+--------------------+-----+--------------------+------+--------------------+----+----+
|uMvVYRgGNXf5boolA...|   0|2015-06-21 14:48:06|    0|rGQRf8UafX7OTlMNN...|  5.0|My experience wit...|     2|1WHRWwQmZOZDAhp2Q...|  25|2015|
|BVndHaLihEYbr76Z0...|   0|2014-10-11 16:22:06|    0|OAhBYw8IQ6wlfw1ow...|  5.0|Great place for b...|     0|1C2lxzUo1Hyye4RFI...|  41|2014|
|CLEWowfkj-wKYJlQD...|   1|2016-03-07 00:02:18|    0|u2vzZaOqJ2feRshaa...|  5.0|I go to blow bar ...|     2|NDZvyYHTUWWu-kqgQ...|  10|2016|
|pR8u8hXf1vvzoAGOo...|   0|2016-08-25 17:17:46|    0|EZarjNNbO_2yH1Xbi...|  5.0|First time here a...|     1|R_W9WlKiA56VzVbRz...|  34|2016|
|AgbRp5NLsP1-J1fdg..

In [48]:
grouped_data = df3.select("business_id", "stars", "week", "year")\
                .groupBy("business_id", "week", "year")\
                .agg(avg("stars").alias("avg_stars"))\
                .sort("avg_stars", ascending=False)


In [49]:
grouped_data.show(10)



+--------------------+----+----+---------+
|         business_id|week|year|avg_stars|
+--------------------+----+----+---------+
|Xbu-HcPKCyLqyZCIL...|  45|2012|      5.0|
|z55MjytBCcIza_LFT...|  34|2013|      5.0|
|hGiiXUgQrl1oqY1Be...|  26|2015|      5.0|
|3OGzmGqWwsyGLkhnx...|   6|2017|      5.0|
|SwBhaxfQPbyhsi0QH...|   5|2012|      5.0|
|QcuTCSfnI6WqKpuDL...|  40|2013|      5.0|
|JQz0_R70G3bjQ5dRq...|  18|2017|      5.0|
|ixPTo6Hum7nNZ7A4V...|  46|2014|      5.0|
|kKPJLiHIr9Gd9sYs3...|  45|2015|      5.0|
|quv6eMqyJ1NIkjPgI...|   4|2016|      5.0|
+--------------------+----+----+---------+
only showing top 10 rows





In [50]:
grouped_data_avg = grouped_data.select("*")\
    .groupBy("business_id", "year")\
    .agg(avg("avg_stars").alias("avg_stars_y"))\
    .sort("avg_stars_y", ascending=False)

In [51]:
grouped_data_avg.show()

                                                                                

+--------------------+----+-----------+
|         business_id|year|avg_stars_y|
+--------------------+----+-----------+
|CGZM3xRSaiYWPmZTt...|2019|        5.0|
|FJue-hS7-ZYLv_tnI...|2021|        5.0|
|mhPaO2zdKoI7lKRAi...|2021|        5.0|
|pyM9OTnfDoM2naIYv...|2019|        5.0|
|ABeyWKAGm3ISQ0Tq-...|2020|        5.0|
|-ch3ET0QStiKPWo1V...|2015|        5.0|
|gd22TmYLmbrgW6XEk...|2014|        5.0|
|ymi8Cb93TUSN6wwlv...|2008|        5.0|
|68vO3osEXU00U5uLw...|2021|        5.0|
|hGn7G9rWHW9E2diFV...|2016|        5.0|
|h1x65PeAa_rEfffzE...|2015|        5.0|
|pyrM6aaTOgB7iDLr5...|2019|        5.0|
|yuCHAXdpgbQdvDbzT...|2020|        5.0|
|MSVowoBcMisrot9FP...|2021|        5.0|
|e1rNGKIwNi528_J7Q...|2021|        5.0|
|DXoWKZEVA_PW1o_cd...|2015|        5.0|
|G0qAfGpIHU0gDJl-t...|2018|        5.0|
|04zP1Y6kdBNBXUJ3m...|2018|        5.0|
|xMhGSZMB85vnk3OVP...|2017|        5.0|
|GtVgio1t55VYUm8JW...|2020|        5.0|
+--------------------+----+-----------+
only showing top 20 rows



### Aggregations start to take shape 

At this point I has an idea on how my python code would look like to I started to play around with my aggregations both in pycharm and jupyter notebook.

In [1]:
import os
import argparse
import logging

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, \
    explode, weekofyear, year, avg, when, count

LOG_TAG = 'YelpDatasetAggregator'


class YelpDatasetAggregator:

    def __init__(self):
        self._app_name = 'yelp_aggregator'
        self._logger = logging.getLogger(LOG_TAG)
        logging.basicConfig(level=logging.INFO)

        self._spark_session = False
        try:
            self._spark_session = SparkSession.builder \
                .appName(self._app_name) \
                .getOrCreate()
            logging.info(f'Spark session for {self._app_name} '
                         f'was successfully created')
        except:
            logging.info(f'Spark session for {self._app_name} '
                         f'was unsuccessful')

    def read_json_data(self,
                       input_data,
                       table_name='temp'):
        """
        Reads data from json

        Args:
            input_data (string): Full path to json file with input data
            table_name (string): Name of the temporary table

        Returns:
            (pyspark.sql.dataframe.DataFrame): Resulted cleaned up table
        """
        df = ''

        if os.path.exists(input_data):

            self._logger.info(f'Reading {input_data}')
            df = self._spark_session.read.json(input_data)
            df.createOrReplaceTempView(table_name)

        else:
            self._logger.info(f'T input data path does not exist {input_data}')

        return df

    

In [2]:
input_data = './yelp_dataset/clean_data'
yelp_aggregator = YelpDatasetAggregator()

businesses_df = yelp_aggregator.read_json_data(
        os.path.join(input_data, 'yelp_academic_dataset_business.json')
    )

reviews_df = yelp_aggregator.read_json_data(
        os.path.join(input_data, 'yelp_academic_dataset_review.json')
    )

checkin_df = yelp_aggregator.read_json_data(
        os.path.join(input_data, 'yelp_academic_dataset_checkin.json')
    )

22/11/08 13:33:19 WARN Utils: Your hostname, Nikolettas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.3 instead (on interface en0)
22/11/08 13:33:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/08 13:33:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


INFO:root:Spark session for yelp_aggregator was successfully created
INFO:YelpDatasetAggregator:Reading ./yelp_dataset/clean_data/yelp_academic_dataset_business.json
INFO:YelpDatasetAggregator:Reading ./yelp_dataset/clean_data/yelp_academic_dataset_review.json


22/11/08 13:33:23 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.




In [3]:
# Calculate weekly average stars per business
reviews_df = reviews_df.withColumn('week', weekofyear('date'))
reviews_df = reviews_df.withColumn('year', year('date'))

grouped_weekly = reviews_df\
    .select("*")\
    .groupBy("business_id", "week", "year")\
    .agg(avg("stars").alias("avg_stars_weekly"))\
    .sort("avg_stars_weekly", ascending=True)

grouped_weekly.show(10)

grouped_weekly_yearly = grouped_weekly\
    .select("*")\
    .groupBy("business_id", "year")\
    .agg(avg("avg_stars_weekly").alias("avg_stars_weekly_yearly"))\
    .sort("avg_stars_weekly_yearly", ascending=True)

grouped_weekly_yearly.show(10)

                                                                                

+--------------------+----+----+----------------+
|         business_id|week|year|avg_stars_weekly|
+--------------------+----+----+----------------+
|TzaHqnoOUlSDPF2wL...|  34|2018|             1.0|
|eNM4YpOYxGqiQFn_p...|  24|2016|             1.0|
|rxGXX-5oVduCREDI6...|  23|2021|             1.0|
|btCV4udJp4Hzv7GYv...|  31|2021|             1.0|
|VFNqQHHzvQ71k11Wu...|  42|2021|             1.0|
|kBhu25HU_hfsmhUAA...|  33|2016|             1.0|
|AUOk3xhNTyXShq3Mk...|  52|2021|             1.0|
|f-7gzwLtoqis8Stcr...|  17|2021|             1.0|
|w9qOUs1Nkyyiy0gIy...|   9|2019|             1.0|
|qI2NdIdo3YK5ezlUB...|  16|2020|             1.0|
+--------------------+----+----+----------------+
only showing top 10 rows



[Stage 11:>                                                         (0 + 8) / 8]

+--------------------+----+-----------------------+
|         business_id|year|avg_stars_weekly_yearly|
+--------------------+----+-----------------------+
|ksma5FPvWkaZvcwHz...|2017|                    1.0|
|pXcCvfcVWLRTFiicH...|2022|                    1.0|
|GEdrmeb5ubO2Gq5Mg...|2016|                    1.0|
|sG13h54FVCanFhwgw...|2021|                    1.0|
|f_xxwfH2g_WkOdtDq...|2019|                    1.0|
|RgiKENuItZe1LaeEz...|2020|                    1.0|
|R8tmRzfmYX9Dx5HaL...|2015|                    1.0|
|dogcOCiqIr3Q09VW_...|2020|                    1.0|
|bbDaL2a1jYrfNK6p9...|2018|                    1.0|
|IWRg2QAqkMfyb3M07...|2013|                    1.0|
+--------------------+----+-----------------------+
only showing top 10 rows



                                                                                

In [4]:
businesses_reviews_weekly = businesses_df.\
    join(grouped_weekly, 'business_id')
businesses_reviews_weekly_checkins = businesses_reviews_weekly.\
    join(checkin_df, 'business_id')

In [5]:
businesses_reviews_weekly_checkins

DataFrame[business_id: string, address: string, attributes: struct<AcceptsInsurance:string,AgesAllowed:string,Alcohol:string,Ambience:string,BYOB:string,BYOBCorkage:string,BestNights:string,BikeParking:string,BusinessAcceptsBitcoin:string,BusinessAcceptsCreditCards:string,BusinessParking:string,ByAppointmentOnly:string,Caters:string,CoatCheck:string,Corkage:string,DietaryRestrictions:string,DogsAllowed:string,DriveThru:string,GoodForDancing:string,GoodForKids:string,GoodForMeal:string,HairSpecializesIn:string,HappyHour:string,HasTV:string,Music:string,NoiseLevel:string,Open24Hours:string,OutdoorSeating:string,RestaurantsAttire:string,RestaurantsCounterService:string,RestaurantsDelivery:string,RestaurantsGoodForGroups:string,RestaurantsPriceRange2:string,RestaurantsReservations:string,RestaurantsTableService:string,RestaurantsTakeOut:string,Smoking:string,WheelchairAccessible:string,WiFi:string>, categories: string, city: string, hours: struct<Friday:string,Monday:string,Saturday:string

In [6]:
businesses_reviews_weekly_checkins

DataFrame[business_id: string, address: string, attributes: struct<AcceptsInsurance:string,AgesAllowed:string,Alcohol:string,Ambience:string,BYOB:string,BYOBCorkage:string,BestNights:string,BikeParking:string,BusinessAcceptsBitcoin:string,BusinessAcceptsCreditCards:string,BusinessParking:string,ByAppointmentOnly:string,Caters:string,CoatCheck:string,Corkage:string,DietaryRestrictions:string,DogsAllowed:string,DriveThru:string,GoodForDancing:string,GoodForKids:string,GoodForMeal:string,HairSpecializesIn:string,HappyHour:string,HasTV:string,Music:string,NoiseLevel:string,Open24Hours:string,OutdoorSeating:string,RestaurantsAttire:string,RestaurantsCounterService:string,RestaurantsDelivery:string,RestaurantsGoodForGroups:string,RestaurantsPriceRange2:string,RestaurantsReservations:string,RestaurantsTableService:string,RestaurantsTakeOut:string,Smoking:string,WheelchairAccessible:string,WiFi:string>, categories: string, city: string, hours: struct<Friday:string,Monday:string,Saturday:string

In [7]:
businesses_reviews_weekly_checkins\
        .select("*", explode(split('date', ',')).alias("exploded"))\
        .groupBy("name", "categories", "avg_stars_weekly", "stars") \
        .agg(count("exploded").alias("no_checkins"))\
        .sort("no_checkins", ascending=False)\
        .show(10)



+--------------------+--------------------+----------------+-----+-----------+
|                name|          categories|avg_stars_weekly|stars|no_checkins|
+--------------------+--------------------+----------------+-----+-----------+
|Tucson Internatio...|Professional Serv...|             4.0|  4.0|    1049256|
|Tucson Internatio...|Professional Serv...|             5.0|  4.0|     928188|
|HUB Restaurant & ...|Ice Cream & Froze...|             4.0|  4.0|     480109|
|HUB Restaurant & ...|Ice Cream & Froze...|             5.0|  4.0|     385882|
|            Cup Cafe|Breakfast & Brunc...|             5.0|  4.0|     371966|
|Tucson Internatio...|Professional Serv...|             3.0|  4.0|     343026|
|      Barrio Brewing|Nightlife, Sports...|             4.0|  4.0|     338940|
|   Bobo's Restaurant|Burgers, Sandwich...|             5.0|  4.5|     337120|
|     El Guero Canelo|Mexican, Restaurants|             5.0|  4.0|     309583|
|         Miss Saigon|Vegetarian, Vietn...|         

                                                                                

In [8]:
# Splitting date in checkins to count the entries per week and year
checkin_df = checkin_df.withColumn('exploded_date', explode(split('date', ',')))
checkin_df = checkin_df.withColumn('week', weekofyear('exploded_date'))
checkin_df = checkin_df.withColumn('year', year('exploded_date'))

checkin_df = checkin_df.drop('exploded_date')

# Combining businesses, weekly stars, overall stars and checkins
businesses_reviews_weekly = businesses_df.\
    join(grouped_weekly, 'business_id')
businesses_reviews_weekly_checkins = businesses_reviews_weekly.\
    join(checkin_df, # 'business_id')
         (businesses_reviews_weekly.business_id == checkin_df.business_id) & \
         (businesses_reviews_weekly.week == checkin_df.week) & \
         (businesses_reviews_weekly.year == checkin_df.year))

# businesses_reviews_weekly_checkins\
#     .select("*", explode(split('date', ',')).alias("exploded"))\
#     .groupBy("name", "categories", "week", "year", "avg_stars_weekly", "stars") \
#     .agg(count("exploded").alias("no_checkins"))\
#     .sort("no_checkins", ascending=False)\
#     .show(10)

In [62]:
businesses_reviews_weekly_checkins.show(5)

[Stage 128:>                                                        (0 + 8) / 8]

22/11/08 13:32:11 ERROR Executor: Exception in task 6.0 in stage 128.0 (TID 841)
java.lang.OutOfMemoryError: Java heap space
	at org.apache.spark.sql.catalyst.expressions.UnsafeRow.copy(UnsafeRow.java:492)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.generate_doConsume_1$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.generate_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$4155/0x0000000801734040.apply(Unknown Source)
	at org.apache.

22/11/08 13:32:12 ERROR Executor: Exception in task 0.0 in stage 128.0 (TID 835)
java.lang.OutOfMemoryError: Java heap space
22/11/08 13:32:12 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 0.0 in stage 128.0 (TID 835),5,main]
java.lang.OutOfMemoryError: Java heap space
22/11/08 13:32:12 WARN TaskSetManager: Lost task 2.0 in stage 128.0 (TID 837) (192.168.0.3 executor driver): UnknownReason
22/11/08 13:32:12 ERROR TaskSetManager: Task 2 in stage 128.0 failed 1 times; aborting job
22/11/08 13:32:12 WARN TaskSetManager: Lost task 3.0 in stage 128.0 (TID 838) (192.168.0.3 executor driver): java.lang.OutOfMemoryError: Java heap space

22/11/08 13:32:12 ERROR Utils: Uncaught exception in thread task-result-getter-3
java.lang.OutOfMemoryError: Java heap space


Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: Java heap space
Exception in thread "Thread-2" java.lang.OutOfMemoryError: Java heap space) / 8]
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o776.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 128.0 failed 1 times, most recent failure: Lost task 2.0 in stage 128.0 (TID 837) (192.168.0.3 executor driver): UnknownReason
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.schedu

22/11/08 13:32:15 WARN SparkContext: Ignoring Exception while stopping SparkContext from shutdown hook
java.lang.OutOfMemoryError: Java heap space
22/11/08 13:32:15 WARN TaskSetManager: Lost task 7.0 in stage 128.0 (TID 842) (192.168.0.3 executor driver): TaskKilled (Stage cancelled)
22/11/08 13:32:15 ERROR Executor: Exception in task 1.0 in stage 128.0 (TID 836)
java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar replaced objects
22/11/08 13:32:15 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch worker for task 1.0 in stage 128.0 (TID 836),5,main]
java.lang.OutOfMemoryError: Java heap space: failed reallocation of scalar replaced objects
22/11/08 13:32:15 ERROR Executor: Exception in task 4.0 in stage 128.0 (TID 839)
java.lang.OutOfMemoryError: Java heap space
22/11/08 13:32:15 ERROR SparkUncaughtExceptionHandler: [Container in shutdown] Uncaught exception in thread Thread[Executor task launch 

ERROR:root:Exception while sending command.                         (0 + 4) / 8]
Traceback (most recent call last):
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: <unprintable Py4JJavaError object>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3433, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/var/folders/nn/jyjrmyb96217hmy9vnyx78nr0000gn/T/ipykernel_178/3818012214.py", line 1, in <module>
    businesses_reviews_weekly_checkins.show(5)
  File "/Users/nxirakia/miniforge3/env

Py4JError: py4j.reflection.TypeUtil.isInstanceOf does not exist in the JVM

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/Users/nxirakia/miniforge3/envs/Yelp_Data_Engineering/lib/python3.10/site-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Some nice errors that show that my querying skills are not optimal 

In [9]:
checkin_df.show(10)

+--------------------+--------------------+----+----+
|         business_id|                date|week|year|
+--------------------+--------------------+----+----+
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  43|2011|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  46|2011|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  46|2011|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  11|2012|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  11|2012|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  25|2012|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  44|2012|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  45|2012|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  18|2013|
|c4P7jdIvKN-9Ol75p...|2011-10-25 00:22:...|  26|2013|
+--------------------+--------------------+----+----+
only showing top 10 rows



In [10]:
checkin_df_weekly = checkin_df\
    .select("*").groupBy("business_id", "week", "year")\
    .agg(count("week").alias("weekly_checkins"))\
    .sort("weekly_checkins", ascending=False)

checkin_df_weekly.show(10)

# checkin_df_yearly = checkin_df\
#     .select("*").groupBy("business_id", "year")\
#     .agg(count("year").alias("no_yearly_checkins"))\
#     .sort("no_yearly_checkins", ascending=False)
# checkin_df_yearly.show(10)

+--------------------+----+----+---------------+
|         business_id|week|year|weekly_checkins|
+--------------------+----+----+---------------+
|-DxeodWsu1zuv_4ND...|   7|2016|             54|
|t_fm_mMoVyArU9bzR...|  41|2013|             51|
|p76UGLhyXLxUZij1M...|   2|2022|             49|
|DNMdZVVfhus2H-dH0...|  41|2014|             49|
|ZJXc_MYDaoNr2n4uR...|  34|2011|             46|
|Aa0EldaVchavAAWL1...|  21|2014|             45|
|Aa0EldaVchavAAWL1...|  45|2013|             44|
|Aa0EldaVchavAAWL1...|  39|2013|             44|
|Aa0EldaVchavAAWL1...|  42|2012|             43|
|Aa0EldaVchavAAWL1...|  24|2012|             41|
+--------------------+----+----+---------------+
only showing top 10 rows

