In [120]:
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window, Row

VBox()

In [2]:
# Source bucket
s3_bucket = "s3://polakowo-yelp2/"

VBox()

In [3]:
# Source data paths: Yelp
yelp_dir = s3_bucket + "yelp_dataset/"

business_path = yelp_dir + "business.json"
review_path = yelp_dir + "review.json"
user_path = yelp_dir + "user.json"
checkin_path = yelp_dir + "checkin.json"
tip_path = yelp_dir + "tip.json"
photo_path = yelp_dir + "photo.json"

VBox()

In [4]:
# Source data paths: Demographics
demo_dir = s3_bucket + "demo_dataset/"

demo_path = demo_dir + "us-cities-demographics.json"

VBox()

In [5]:
# Source data paths: Weather
weather_dir = s3_bucket + "weather_dataset/"

city_attr_path = weather_dir + "city_attributes.csv"
weather_temp_path = weather_dir + "temperature.csv"
weather_desc_path = weather_dir + "weather_description.csv"

VBox()

In [6]:
# Target data paths
staging_dir = s3_bucket + "staging_data/"

business_attributes_path = staging_dir + "business_attributes"
cities_path = staging_dir + "cities"
addresses_path = staging_dir + "addresses"
categories_path = staging_dir + "categories"
business_categories_path = staging_dir + "business_categories"
hours_path = staging_dir + "hours"
businesses_path = staging_dir + "businesses"
reviews_path = staging_dir + "reviews"
users_path = staging_dir + "users"
elite_years_path = staging_dir + "elite_years"
friends_path = staging_dir + "friends"
checkins_path = staging_dir + "checkins"
tips_path = staging_dir + "tips"
photos_path = staging_dir + "photos"
city_weather_path = staging_dir + "city_weather"

VBox()

In [7]:
def describe(df):
    print("nulls:", df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).collect())
    print("count:", df.count())
    print("schema:")
    df.printSchema()
    print("example:", df.limit(1).collect())

VBox()

In [8]:
def write_csv(df, path):
    # For tables with standardized text fields
    # CSV can be parsed and processed incrementally
    df.write\
        .format('csv')\
        .option("header", "true")\
        .option("delimiter", "\t")\
        .mode('overwrite')\
        .save(path)

VBox()

In [9]:
def write_json(df, path, n_partitions):
    # JSON files have to be parsed as a whole by Amazon Redshift
    # Redshift accepts JSON files of max. 4MB size (-> smart partitioning)
    # What you gain from using JSON is stricter string handling
    df.repartition(n_partitions).write\
        .format('json')\
        .option("nullValue", None)\
        .mode('overwrite')\
        .save(path)

VBox()

In [10]:
def write_parquet(df, path):
    # The main format for our output files, as it combines pros of CSV and JSON formats
    df.write.parquet(path, mode="overwrite")

VBox()

# business tables

In [11]:
business_df = spark.read.json(business_path)

VBox()

In [12]:
describe(business_df)

VBox()

nulls: [Row(address=0, attributes=28836, business_id=0, categories=482, city=0, hours=44830, is_open=0, latitude=0, longitude=0, name=0, postal_code=0, review_count=0, stars=0, state=0)]
count: 192609
schema:
root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Cork

## business_attributes

In [13]:
business_attributes_df = business_df.select("business_id", "attributes.*")

VBox()

In [14]:
def parse_boolean(x):
    if x is None or x == 'None':
        return None
    if x == 'True':
        return True
    if x == 'False':
        return False

parse_boolean_udf = F.udf(parse_boolean, T.BooleanType())

bool_attrs = [
    "AcceptsInsurance",
    "BYOB",
    "BikeParking", 
    "BusinessAcceptsBitcoin", 
    "BusinessAcceptsCreditCards",
    "ByAppointmentOnly", 
    "Caters", 
    "CoatCheck", 
    "Corkage", 
    "DogsAllowed",
    "DriveThru", 
    "GoodForDancing", 
    "GoodForKids",
    "HappyHour", 
    "HasTV",
    "Open24Hours", 
    "OutdoorSeating", 
    "RestaurantsCounterService", 
    "RestaurantsDelivery", 
    "RestaurantsGoodForGroups", 
    "RestaurantsReservations", 
    "RestaurantsTableService", 
    "RestaurantsTakeOut",
    "WheelchairAccessible"
]

for attr in bool_attrs:
    business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_udf(attr))

VBox()

In [15]:
def parse_string(x):
    if x is None or x == '':
        return None
    # Some strings are of format u"u'string'"
    return x.replace("u'", "").replace("'", "").lower()
    
parse_string_udf = F.udf(parse_string, T.StringType())

str_attrs = [
    "AgesAllowed", 
    "Alcohol",
    "BYOBCorkage",
    "NoiseLevel",
    "RestaurantsAttire",
    "Smoking",
    "WiFi",
]

for attr in str_attrs:
    business_attributes_df = business_attributes_df.withColumn(attr, parse_string_udf(attr))

VBox()

In [16]:
def parse_integer(x):
    if x is None or x == 'None':
        return None
    return int(x)
    
parse_integer_udf = F.udf(parse_integer, T.IntegerType())

int_attrs = [
    "RestaurantsPriceRange2",
]

for attr in int_attrs:
    business_attributes_df = business_attributes_df.withColumn(attr, parse_integer_udf(attr))

VBox()

In [17]:
import ast

def parse_boolean_dict(x):
    if x is None or x == 'None' or x == '':
        return None
    return ast.literal_eval(x)

parse_boolean_dict_udf = F.udf(parse_boolean_dict, T.MapType(T.StringType(), T.BooleanType()))

bool_dict_attrs = [
    "Ambience",
    "BestNights",
    "BusinessParking",
    "DietaryRestrictions",
    "GoodForMeal",
    "HairSpecializesIn",
    "Music"
]

for attr in bool_dict_attrs:
    business_attributes_df = business_attributes_df.withColumn(attr, parse_boolean_dict_udf(attr))
    # Get all keys of the MapType
    # [Row(key=u'romantic'), Row(key=u'casual'), ...
    key_rows = business_attributes_df.select(F.explode(attr)).select("key").distinct().collect()
    # Convert each key into column (with proper name)
    exprs = ["{}['{}'] as {}".format(attr, row.key, attr+"_"+row.key.replace('-', '_')) for row in key_rows]
    business_attributes_df = business_attributes_df.selectExpr("*", *exprs).drop(attr)

VBox()

In [18]:
describe(business_attributes_df)

VBox()

nulls: [Row(business_id=0, AcceptsInsurance=185373, AgesAllowed=192486, Alcohol=144146, BYOB=192581, BYOBCorkage=191186, BikeParking=107398, BusinessAcceptsBitcoin=179526, BusinessAcceptsCreditCards=79590, ByAppointmentOnly=145880, Caters=152114, CoatCheck=189109, Corkage=191947, DogsAllowed=185213, DriveThru=189420, GoodForDancing=187872, GoodForKids=126362, HappyHour=187409, HasTV=144565, NoiseLevel=148730, Open24Hours=192596, OutdoorSeating=137854, RestaurantsAttire=143970, RestaurantsCounterService=192598, RestaurantsDelivery=140155, RestaurantsGoodForGroups=137933, RestaurantsPriceRange2=84527, RestaurantsReservations=140371, RestaurantsTableService=175465, RestaurantsTakeOut=130601, Smoking=189111, WheelchairAccessible=172696, WiFi=142545, Ambience_romantic=145072, Ambience_casual=145072, Ambience_trendy=145072, Ambience_intimate=145072, Ambience_hipster=145695, Ambience_upscale=145260, Ambience_divey=152636, Ambience_touristy=145072, Ambience_classy=145072, BestNights_sunday=189

In [19]:
write_parquet(business_attributes_df, business_attributes_path)

VBox()

In [20]:
# Check maximum length of each column (for Redshift)
print(business_attributes_df.select("business_id").agg(F.max(F.length("business_id"))).first())
print(business_attributes_df.select("AgesAllowed").agg(F.max(F.length("AgesAllowed"))).first())
print(business_attributes_df.select("Alcohol").agg(F.max(F.length("Alcohol"))).first())
print(business_attributes_df.select("BYOBCorkage").agg(F.max(F.length("BYOBCorkage"))).first())
print(business_attributes_df.select("NoiseLevel").agg(F.max(F.length("NoiseLevel"))).first())
print(business_attributes_df.select("RestaurantsAttire").agg(F.max(F.length("RestaurantsAttire"))).first())
print(business_attributes_df.select("Smoking").agg(F.max(F.length("Smoking"))).first())
print(business_attributes_df.select("WiFi").agg(F.max(F.length("WiFi"))).first())

VBox()

Row(max(length(business_id))=22)
Row(max(length(AgesAllowed))=7)
Row(max(length(Alcohol))=13)
Row(max(length(BYOBCorkage))=11)
Row(max(length(NoiseLevel))=9)
Row(max(length(RestaurantsAttire))=6)
Row(max(length(Smoking))=7)
Row(max(length(WiFi))=4)

## demographics

In [21]:
demo_df = spark.read.json(demo_path)

VBox()

In [22]:
describe(demo_df)

VBox()

nulls: [Row(datasetid=0, fields=0, record_timestamp=0, recordid=0)]
count: 2891
schema:
root
 |-- datasetid: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- average_household_size: double (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- count: long (nullable = true)
 |    |-- female_population: long (nullable = true)
 |    |-- foreign_born: long (nullable = true)
 |    |-- male_population: long (nullable = true)
 |    |-- median_age: double (nullable = true)
 |    |-- number_of_veterans: long (nullable = true)
 |    |-- race: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- state_code: string (nullable = true)
 |    |-- total_population: long (nullable = true)
 |-- record_timestamp: string (nullable = true)
 |-- recordid: string (nullable = true)

example: [Row(datasetid='us-cities-demographics', fields=Row(average_household_size=2.73, city='Newark', count=76402, female_population=143873, foreign_born=86253, male_

In [23]:
def prepare_race(x):
    return x.replace(" ", "_").replace("-", "_").lower()
    
prepare_race_udf = F.udf(prepare_race, T.StringType())

demo_df = demo_df.select("fields.*")\
    .withColumn("race", prepare_race_udf("race"))
demo_df = demo_df.groupby(*set(demo_df.schema.names).difference(set(["race", "count"])))\
    .pivot('race')\
    .max('count')

VBox()

In [24]:
describe(demo_df)

VBox()

nulls: [Row(state_code=0, city=0, number_of_veterans=7, male_population=1, state=0, median_age=0, total_population=0, foreign_born=7, female_population=1, average_household_size=8, american_indian_and_alaska_native=57, asian=13, black_or_african_american=12, hispanic_or_latino=0, white=7)]
count: 596
schema:
root
 |-- state_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- total_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = true)
 |-- white: long (nullable = true)

example: 

## cities

In [37]:
cities_df = business_df.selectExpr("city", "state as state_code")\
    .distinct()\
    .join(demo_df, ["city", "state_code"], how="left")\
    .withColumn("city_id", F.monotonically_increasing_id())

VBox()

In [38]:
describe(cities_df)

VBox()

nulls: [Row(city=0, state_code=0, number_of_veterans=1210, male_population=1210, state=1210, median_age=1210, total_population=1210, foreign_born=1210, female_population=1210, average_household_size=1210, american_indian_and_alaska_native=1211, asian=1210, black_or_african_american=1210, hispanic_or_latino=1210, white=1210, city_id=0)]
count: 1258
schema:
root
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- total_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = tru

In [39]:
write_parquet(cities_df, cities_path)

VBox()

In [40]:
print(cities_df.select("city").agg(F.max(F.length("city"))).first())
print(cities_df.select("state_code").agg(F.max(F.length("state_code"))).first())
print(cities_df.select("state").agg(F.max(F.length("state"))).first())

VBox()

Row(max(length(city))=50)
Row(max(length(state_code))=3)
Row(max(length(state))=14)

## addresses

In [42]:
addresses_df = business_df.selectExpr("address", "latitude", "longitude", "postal_code", "city", "state as state_code")\
    .join(cities_df.select("city", "state_code", "city_id"), ["city", "state_code"], how='left')\
    .drop("city", "state_code")\
    .distinct()\
    .withColumn("address_id", F.monotonically_increasing_id())

VBox()

In [43]:
describe(addresses_df)

VBox()

nulls: [Row(address=0, latitude=0, longitude=0, postal_code=0, city_id=0, address_id=0)]
count: 178763
schema:
root
 |-- address: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- city_id: long (nullable = true)
 |-- address_id: long (nullable = false)

example: [Row(address='3495 Lawrence Ave E', latitude=43.757291, longitude=-79.2293784, postal_code='M1H 1B2', city_id=8589934592, address_id=0)]

In [44]:
write_parquet(addresses_df, addresses_path)

VBox()

In [45]:
print(addresses_df.select("address").agg(F.max(F.length("address"))).first())
print(addresses_df.select("postal_code").agg(F.max(F.length("postal_code"))).first())

VBox()

Row(max(length(address))=118)
Row(max(length(postal_code))=8)

## categories

In [46]:
import re
def parse_categories(categories):
    if categories is None:
        return []
    parsed = []
    # Some strings contain commas, so they have to be extracted beforehand
    require_attention = set(["Wills, Trusts, & Probates"])
    for s in require_attention:
        if categories.find(s) > -1:
            parsed.append(s)
            categories = categories.replace(s, "")
    return list(filter(None, parsed + re.split(r",\s*", categories)))
    
parse_categories_udf = F.udf(parse_categories, T.ArrayType(T.StringType()))
business_categories_df = business_df.select("business_id", "categories")\
    .withColumn("categories", parse_categories_udf("categories"))

VBox()

In [47]:
categories_df = business_categories_df.select(F.explode("categories").alias("category"))\
    .dropDuplicates()\
    .sort("category")\
    .withColumn("category_id", F.monotonically_increasing_id())

VBox()

In [48]:
describe(categories_df)

VBox()

nulls: [Row(category=0, category_id=0)]
count: 1298
schema:
root
 |-- category: string (nullable = true)
 |-- category_id: long (nullable = false)

example: [Row(category='3D Printing', category_id=0)]

In [49]:
write_parquet(categories_df, categories_path)

VBox()

In [50]:
print(categories_df.select("category").agg(F.max(F.length("category"))).first())

VBox()

Row(max(length(category))=35)

## business_categories

In [51]:
import re
def zip_categories(business_id, categories):
    return list(zip([business_id] * len(categories), categories))
    
zip_categories_udf = F.udf(zip_categories, T.ArrayType(T.ArrayType(T.StringType())))

business_categories_df = business_categories_df.select(F.explode(zip_categories_udf("business_id", "categories")).alias("cols"))\
    .selectExpr("cols[0] as business_id", "cols[1] as category")\
    .dropDuplicates()
business_categories_df = business_categories_df.join(categories_df, business_categories_df["category"] == categories_df["category"], how="left")\
    .drop("category")

VBox()

In [52]:
describe(business_categories_df)

VBox()

nulls: [Row(business_id=0, category_id=0)]
count: 788110
schema:
root
 |-- business_id: string (nullable = true)
 |-- category_id: long (nullable = true)

example: [Row(business_id='LB6ZyCfUzeX9OLdunHhnOQ', category_id=747324309510)]

In [53]:
write_parquet(business_categories_df, business_categories_path)

VBox()

## hours

In [54]:
hours_df = business_df.select("business_id", "hours.*")

VBox()

In [55]:
def parse_hours(x):
    if x is None:
        return None
    # Transform 8:30 to 830
    convert_to_int = lambda x: int(x.split(':')[0]) * 100 + int(x.split(':')[1])
    return {
        "from": convert_to_int(x.split('-')[0]),
        "to": convert_to_int(x.split('-')[1])
    }
    
parse_hours_udf = F.udf(parse_hours, T.StructType([
    T.StructField('from', T.IntegerType(), nullable=True),
    T.StructField('to', T.IntegerType(), nullable=True)
]))

hour_attrs = [
    "Monday",
    "Tuesday",
    "Wednesday",
    "Thursday",
    "Friday",
    "Saturday",
    "Sunday",
]

for attr in hour_attrs:
    hours_df = hours_df.withColumn(attr, parse_hours_udf(attr))\
        .selectExpr("*", attr+".from as "+attr+"_from", attr+".to as "+attr+"_to")\
        .drop(attr)

VBox()

In [56]:
describe(hours_df)

VBox()

nulls: [Row(business_id=0, Monday_from=56842, Monday_to=56842, Tuesday_from=49181, Tuesday_to=49181, Wednesday_from=47452, Wednesday_to=47452, Thursday_from=46706, Thursday_to=46706, Friday_from=47435, Friday_to=47435, Saturday_from=66748, Saturday_to=66748, Sunday_from=101273, Sunday_to=101273)]
count: 192609
schema:
root
 |-- business_id: string (nullable = true)
 |-- Monday_from: integer (nullable = true)
 |-- Monday_to: integer (nullable = true)
 |-- Tuesday_from: integer (nullable = true)
 |-- Tuesday_to: integer (nullable = true)
 |-- Wednesday_from: integer (nullable = true)
 |-- Wednesday_to: integer (nullable = true)
 |-- Thursday_from: integer (nullable = true)
 |-- Thursday_to: integer (nullable = true)
 |-- Friday_from: integer (nullable = true)
 |-- Friday_to: integer (nullable = true)
 |-- Saturday_from: integer (nullable = true)
 |-- Saturday_to: integer (nullable = true)
 |-- Sunday_from: integer (nullable = true)
 |-- Sunday_to: integer (nullable = true)

example: [Row

In [57]:
write_parquet(hours_df, hours_path)

VBox()

## businesses

In [58]:
# Remove quotes which cause errors in Amazon Redshift COPY statement
businesses_df = business_df.join(addresses_df, (business_df["address"] == addresses_df["address"]) 
                 & (business_df["latitude"] == addresses_df["latitude"]) 
                 & (business_df["longitude"] == addresses_df["longitude"])
                 & (business_df["postal_code"] == addresses_df["postal_code"]), how="left")\
    .selectExpr("business_id", "address_id", "cast(is_open as boolean)", "name", "review_count", "stars")

VBox()

In [59]:
describe(businesses_df)

VBox()

nulls: [Row(business_id=0, address_id=0, is_open=0, name=0, review_count=0, stars=0)]
count: 196728
schema:
root
 |-- business_id: string (nullable = true)
 |-- address_id: long (nullable = true)
 |-- is_open: boolean (nullable = true)
 |-- name: string (nullable = true)
 |-- review_count: long (nullable = true)
 |-- stars: double (nullable = true)

example: [Row(business_id='nn8RDkUz0cWLcEhuga4f7Q', address_id=1271310319798, is_open=True, name='Handy AZ Man', review_count=19, stars=4.5)]

In [60]:
write_parquet(businesses_df, businesses_path)

VBox()

# review tables

In [61]:
review_df = spark.read.json(review_path)

VBox()

In [62]:
describe(review_df)

VBox()

nulls: [Row(business_id=0, cool=0, date=0, funny=0, review_id=0, stars=0, text=0, useful=0, user_id=0)]
count: 6685900
schema:
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

example: [Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, date='2013-05-07 04:34:36', funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')]

## reviews

In [152]:
# date field looks more like timestamp
# Long text is hard to insert into CSV so omit
reviews_df = review_df.withColumnRenamed("date", "ts")\
    .withColumn("ts", F.to_timestamp("ts"))

VBox()

In [153]:
describe(reviews_df)

VBox()

nulls: [Row(business_id=0, cool=0, ts=0, funny=0, review_id=0, stars=0, text=0, useful=0, user_id=0)]
count: 6685900
schema:
root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)

example: [Row(business_id='ujmEBvifdJM6h6RLv4wQIg', cool=0, ts=datetime.datetime(2013, 5, 7, 4, 34, 36), funny=1, review_id='Q1sbwvVQXV2734tPgoKj4Q', stars=1.0, text='Total bill for this horrible service? Over $8Gs. These crooks actually had the nerve to charge us $69 for 3 pills. I checked online the pills can be had for 19 cents EACH! Avoid Hospital ERs at all costs.', useful=6, user_id='hG7b0MtEbXx5QzbzE6C_VA')]

In [154]:
write_parquet(reviews_df, reviews_path)

VBox()

In [66]:
print(reviews_df.select("text").agg(F.max(F.length("text"))).first())
# and multiply by 4 to get the amount of bytes a column in Redshift should allocate

VBox()

Row(max(length(text))=5000)

# user tables

In [67]:
user_df = spark.read.json(user_path)

VBox()

In [68]:
describe(user_df)

VBox()

nulls: [Row(average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, elite=0, fans=0, friends=0, funny=0, name=0, review_count=0, useful=0, user_id=0, yelping_since=0)]
count: 1637138
schema:
root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- elite: string (nullable = true)

## users

In [155]:
users_df = user_df.drop("elite", "friends")\
    .withColumn("yelping_since", F.to_timestamp("yelping_since"))

VBox()

In [156]:
describe(users_df)

VBox()

nulls: [Row(average_stars=0, compliment_cool=0, compliment_cute=0, compliment_funny=0, compliment_hot=0, compliment_list=0, compliment_more=0, compliment_note=0, compliment_photos=0, compliment_plain=0, compliment_profile=0, compliment_writer=0, cool=0, fans=0, funny=0, name=0, review_count=0, useful=0, user_id=0, yelping_since=0)]
count: 1637138
schema:
root
 |-- average_stars: double (nullable = true)
 |-- compliment_cool: long (nullable = true)
 |-- compliment_cute: long (nullable = true)
 |-- compliment_funny: long (nullable = true)
 |-- compliment_hot: long (nullable = true)
 |-- compliment_list: long (nullable = true)
 |-- compliment_more: long (nullable = true)
 |-- compliment_note: long (nullable = true)
 |-- compliment_photos: long (nullable = true)
 |-- compliment_plain: long (nullable = true)
 |-- compliment_profile: long (nullable = true)
 |-- compliment_writer: long (nullable = true)
 |-- cool: long (nullable = true)
 |-- fans: long (nullable = true)
 |-- funny: long (null

In [157]:
write_parquet(users_df, users_path)

VBox()

## elite_years

In [72]:
elite_years_df = user_df.select("user_id", "elite")\
    .withColumn("year", F.explode(F.split(F.col("elite"), ",")))\
    .where("year != '' and year is not null")\
    .select(F.col("user_id"), F.col("year").cast("integer"))

VBox()

In [73]:
describe(elite_years_df)

VBox()

nulls: [Row(user_id=0, year=0)]
count: 224499
schema:
root
 |-- user_id: string (nullable = true)
 |-- year: integer (nullable = true)

example: [Row(user_id='l6BmjZMeQD3rDxWUbiAiow', year=2015)]

In [74]:
write_parquet(elite_years_df, elite_years_path)

VBox()

## friends

In [75]:
friends_df = user_df.select("user_id", "friends")\
    .withColumn("friend_id", F.explode(F.split(F.col("friends"), ", ")))\
    .where("friend_id != '' and friend_id is not null")\
    .select(F.col("user_id"), F.col("friend_id"))\
    .distinct()

VBox()

In [76]:
describe(friends_df)

VBox()

nulls: [Row(user_id=0, friend_id=0)]
count: 75531114
schema:
root
 |-- user_id: string (nullable = true)
 |-- friend_id: string (nullable = true)

example: [Row(user_id='l6BmjZMeQD3rDxWUbiAiow', friend_id='wMpFA46lihK8oFns_5p65A')]

In [77]:
write_parquet(friends_df, friends_path)

VBox()

# checkin tables

In [78]:
checkin_df = spark.read.json(checkin_path)

VBox()

In [79]:
describe(checkin_df)

VBox()

nulls: [Row(business_id=0, date=0)]
count: 161950
schema:
root
 |-- business_id: string (nullable = true)
 |-- date: string (nullable = true)

example: [Row(business_id='--1UhMGODdWsrMastO9DZw', date='2016-04-26 19:49:16, 2016-08-30 18:36:57, 2016-10-15 02:45:18, 2016-11-18 01:54:50, 2017-04-20 18:39:06, 2017-05-03 17:58:02')]

## checkins

In [165]:
checkins_df = checkin_df.selectExpr("business_id", "date as ts")\
    .withColumn("ts", F.explode(F.split(F.col("ts"), ", ")))\
    .where("ts != '' and ts is not null")\
    .withColumn("ts", F.to_timestamp("ts"))

VBox()

In [166]:
describe(checkins_df)

VBox()

nulls: [Row(business_id=0, ts=0)]
count: 19089148
schema:
root
 |-- business_id: string (nullable = true)
 |-- ts: timestamp (nullable = true)

example: [Row(business_id='--1UhMGODdWsrMastO9DZw', ts=datetime.datetime(2016, 4, 26, 19, 49, 16))]

In [167]:
write_parquet(checkins_df, checkins_path)

VBox()

# tip tables

In [83]:
tip_df = spark.read.json(tip_path)

VBox()

In [84]:
describe(tip_df)

VBox()

nulls: [Row(business_id=0, compliment_count=0, date=0, text=0, user_id=0)]
count: 1223094
schema:
root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)

example: [Row(business_id='VaKXUpmWTTWDKbpJ3aQdMw', compliment_count=0, date='2014-03-27 03:51:24', text='Great for watching games, ufc, and whatever else tickles yer fancy', user_id='UPw5DWs_b-e2JRBS-t37Ag')]

## tips

In [162]:
tips_df = tip_df.withColumnRenamed("date", "ts")\
    .withColumn("ts", F.to_timestamp("ts"))\
    .withColumn("tip_id", F.monotonically_increasing_id())

VBox()

In [163]:
describe(tips_df)

VBox()

nulls: [Row(business_id=0, compliment_count=0, ts=0, text=0, user_id=0, tip_id=0)]
count: 1223094
schema:
root
 |-- business_id: string (nullable = true)
 |-- compliment_count: long (nullable = true)
 |-- ts: timestamp (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)
 |-- tip_id: long (nullable = false)

example: [Row(business_id='VaKXUpmWTTWDKbpJ3aQdMw', compliment_count=0, ts=datetime.datetime(2014, 3, 27, 3, 51, 24), text='Great for watching games, ufc, and whatever else tickles yer fancy', user_id='UPw5DWs_b-e2JRBS-t37Ag', tip_id=0)]

In [164]:
write_parquet(tips_df, tips_path)

VBox()

In [88]:
print(tips_df.select("text").agg(F.max(F.length("text"))).first())

VBox()

Row(max(length(text))=500)

# photo tables

In [89]:
photo_df = spark.read.json(photo_path)

VBox()

In [90]:
describe(photo_df)

VBox()

nulls: [Row(business_id=0, caption=0, label=0, photo_id=0)]
count: 200000
schema:
root
 |-- business_id: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- label: string (nullable = true)
 |-- photo_id: string (nullable = true)

example: [Row(business_id='rcaPajgKOJC2vo_l3xa42A', caption='', label='inside', photo_id='MllA1nNpcp1kDteVg6OGUw')]

## photos

In [91]:
write_parquet(photo_df, photos_path)

VBox()

In [92]:
print(photo_df.select("caption").agg(F.max(F.length("caption"))).first())
print(photo_df.select("label").agg(F.max(F.length("label"))).first())

VBox()

Row(max(length(caption))=140)
Row(max(length(label))=7)

# weather tables

## city_attributes

In [93]:
city_attr_df = spark.read\
    .format('csv')\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(city_attr_path)

VBox()

In [94]:
describe(city_attr_df)

VBox()

nulls: [Row(City=0, Country=0, Latitude=0, Longitude=0)]
count: 36
schema:
root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)

example: [Row(City='Vancouver', Country='Canada', Latitude='49.24966', Longitude='-123.119339')]

In [95]:
# We only want the list of US cities
cities = city_attr_df.where("Country = 'United States'")\
    .select("City")\
    .distinct()\
    .rdd.flatMap(lambda x: x)\
    .collect()

VBox()

In [96]:
# The list of cities provided by the weather dataset
cities

VBox()

['Phoenix', 'Dallas', 'San Antonio', 'Philadelphia', 'Los Angeles', 'Indianapolis', 'San Francisco', 'San Diego', 'Nashville', 'Detroit', 'Portland', 'Pittsburgh', 'Chicago', 'Atlanta', 'Las Vegas', 'Seattle', 'Kansas City', 'Saint Louis', 'Minneapolis', 'Houston', 'Jacksonville', 'Albuquerque', 'Miami', 'New York', 'Charlotte', 'Denver', 'Boston']

In [97]:
# Weather dataset doesn't provide us with the respective state codes though
# How do we know whether "Phoenix" is in AZ or TX?
# The most appropriate solution is finding the biggest city
# Let's find out which of those cities are referenced in Yelp dataset and relevant to us

cities_df = spark.read.parquet(cities_path)

VBox()

In [150]:
cities_df.printSchema()

VBox()

root
 |-- city: string (nullable = true)
 |-- state_code: string (nullable = true)
 |-- number_of_veterans: long (nullable = true)
 |-- male_population: long (nullable = true)
 |-- state: string (nullable = true)
 |-- median_age: double (nullable = true)
 |-- total_population: long (nullable = true)
 |-- foreign_born: long (nullable = true)
 |-- female_population: long (nullable = true)
 |-- average_household_size: double (nullable = true)
 |-- american_indian_and_alaska_native: long (nullable = true)
 |-- asian: long (nullable = true)
 |-- black_or_african_american: long (nullable = true)
 |-- hispanic_or_latino: long (nullable = true)
 |-- white: long (nullable = true)
 |-- city_id: long (nullable = true)

In [119]:
cities_df.filter(F.col("city").isin(cities))\
    .select("city")\
    .distinct()\
    .collect()
# Tables "cities" includes 11 cities out of 36 provided by the weather dataset

VBox()

[Row(city='Phoenix'), Row(city='Dallas'), Row(city='Los Angeles'), Row(city='San Diego'), Row(city='Pittsburgh'), Row(city='Las Vegas'), Row(city='Seattle'), Row(city='New York'), Row(city='Charlotte'), Row(city='Denver'), Row(city='Boston')]

In [142]:
# Now find their states (using Google or any other API)
weather_cities_df = [
    Row(city='Phoenix', state_code='AZ'), 
    Row(city='Dallas', state_code='TX'), 
    Row(city='Los Angeles', state_code='CA'), 
    Row(city='San Diego', state_code='CA'), 
    Row(city='Pittsburgh', state_code='PA'), 
    Row(city='Las Vegas', state_code='NV'), 
    Row(city='Seattle', state_code='WA'), 
    Row(city='New York', state_code='NY'), 
    Row(city='Charlotte', state_code='NC'), 
    Row(city='Denver', state_code='CO'), 
    Row(city='Boston', state_code='MA')
]
weather_cities_schema = T.StructType([
    T.StructField("city", T.StringType()),
    T.StructField("state_code", T.StringType())
])
weather_cities_df = spark.createDataFrame(weather_cities_df, schema=weather_cities_schema) 

VBox()

In [143]:
# Join with the cities dataset to find matches
weather_cities_df = cities_df.join(weather_cities_df, ["city", "state_code"])\
    .select("city", "city_id")\
    .distinct()

VBox()

## temperature

In [127]:
weather_temp_df = spark.read\
    .format('csv')\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(weather_temp_path)

VBox()

In [128]:
describe(weather_temp_df)

VBox()

nulls: [Row(datetime=0, Vancouver=795, Portland=1, San Francisco=793, Seattle=3, Los Angeles=3, San Diego=1, Las Vegas=1, Phoenix=3, Albuquerque=1, Denver=1, San Antonio=1, Dallas=4, Houston=3, Kansas City=1, Minneapolis=13, Saint Louis=1, Chicago=3, Nashville=2, Indianapolis=7, Atlanta=6, Detroit=1, Jacksonville=1, Charlotte=3, Miami=805, Pittsburgh=3, Toronto=1, Philadelphia=3, New York=793, Montreal=3, Boston=3, Beersheba=798, Tel Aviv District=793, Eilat=792, Haifa=798, Nahariyya=797, Jerusalem=793)]
count: 45253
schema:
root
 |-- datetime: string (nullable = true)
 |-- Vancouver: string (nullable = true)
 |-- Portland: string (nullable = true)
 |-- San Francisco: string (nullable = true)
 |-- Seattle: string (nullable = true)
 |-- Los Angeles: string (nullable = true)
 |-- San Diego: string (nullable = true)
 |-- Las Vegas: string (nullable = true)
 |-- Phoenix: string (nullable = true)
 |-- Albuquerque: string (nullable = true)
 |-- Denver: string (nullable = true)
 |-- San Anton

In [129]:
weather_temp_df = weather_temp_df.select("datetime", *cities)\
    .withColumn("date", F.substring("datetime", 0, 10))\
    .drop("datetime")

VBox()

In [130]:
temp_df = None
for city in cities:
    # Get average temperature in Fahrenheit for each day and city
    df = weather_temp_df.selectExpr("date", f"`{city}` as temperature", f"'{city}' as city")\
        .groupBy("date", "city")\
        .agg(F.mean("temperature").alias("avg_temperature"))
    if temp_df is None:
        temp_df = df
    else:
        temp_df = temp_df.union(df)
weather_temp_df = temp_df

VBox()

In [131]:
# Speed up further joins
weather_temp_df = weather_temp_df.repartition(1).cache()
weather_temp_df.count()

VBox()

50949

In [132]:
describe(weather_temp_df)

VBox()

nulls: [Row(date=0, city=0, avg_temperature=99)]
count: 50949
schema:
root
 |-- date: string (nullable = true)
 |-- city: string (nullable = false)
 |-- avg_temperature: double (nullable = true)

example: [Row(date='2013-09-10', city='Phoenix', avg_temperature=297.9496805555417)]

## weather_description

In [133]:
weather_desc_df = spark.read\
    .format('csv')\
    .option("header", "true")\
    .option("delimiter", ",")\
    .load(weather_desc_path)

VBox()

In [134]:
describe(weather_desc_df)

VBox()

nulls: [Row(datetime=0, Vancouver=793, Portland=1, San Francisco=793, Seattle=1, Los Angeles=1, San Diego=1, Las Vegas=1, Phoenix=1, Albuquerque=1, Denver=1, San Antonio=1, Dallas=1, Houston=1, Kansas City=1, Minneapolis=1, Saint Louis=1, Chicago=1, Nashville=1, Indianapolis=1, Atlanta=1, Detroit=1, Jacksonville=1, Charlotte=1, Miami=793, Pittsburgh=1, Toronto=1, Philadelphia=1, New York=793, Montreal=1, Boston=1, Beersheba=793, Tel Aviv District=793, Eilat=792, Haifa=793, Nahariyya=793, Jerusalem=793)]
count: 45253
schema:
root
 |-- datetime: string (nullable = true)
 |-- Vancouver: string (nullable = true)
 |-- Portland: string (nullable = true)
 |-- San Francisco: string (nullable = true)
 |-- Seattle: string (nullable = true)
 |-- Los Angeles: string (nullable = true)
 |-- San Diego: string (nullable = true)
 |-- Las Vegas: string (nullable = true)
 |-- Phoenix: string (nullable = true)
 |-- Albuquerque: string (nullable = true)
 |-- Denver: string (nullable = true)
 |-- San Antoni

In [135]:
weather_desc_df = weather_desc_df.select("datetime", *cities)\
    .withColumn("date", F.substring("datetime", 0, 10))\
    .drop("datetime")

VBox()

In [136]:
temp_df = None
for city in cities:
    # Get the most frequent description for each day and city
    window = Window.partitionBy("date", "city").orderBy(F.desc("count"))
    df = weather_desc_df.selectExpr("date", f"`{city}` as weather_description", f"'{city}' as city")\
        .groupBy("date", "city", "weather_description")\
        .count()\
        .withColumn("order", F.row_number().over(window))\
        .where(F.col("order") == 1)\
        .drop("count", "order")
    if temp_df is None:
        temp_df = df
    else:
        temp_df = temp_df.union(df)
weather_desc_df = temp_df

VBox()

In [137]:
# Speed up further joins
weather_desc_df = weather_desc_df.repartition(1).cache()
weather_desc_df.count()

VBox()

50949

In [138]:
describe(weather_desc_df)

VBox()

nulls: [Row(date=0, city=0, weather_description=102)]
count: 50949
schema:
root
 |-- date: string (nullable = true)
 |-- city: string (nullable = false)
 |-- weather_description: string (nullable = true)

example: [Row(date='2013-09-10', city='Phoenix', weather_description='light rain')]

## city_weather

In [169]:
# What was the weather in the city when the particular review was posted?
# Join weather description with temperature, and keep only city ids which are present in Yelp
city_weather_df = weather_temp_df.join(weather_desc_df, ["city", "date"])\
    .join(weather_cities_df, "city")\
    .drop("city")\
    .distinct()\
    .withColumn("date", F.to_date("date"))

VBox()

In [170]:
describe(city_weather_df)

VBox()

nulls: [Row(date=0, avg_temperature=33, weather_description=34, city_id=0)]
count: 15096
schema:
root
 |-- date: date (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- weather_description: string (nullable = true)
 |-- city_id: long (nullable = true)

example: [Row(date=datetime.date(2013, 11, 23), avg_temperature=285.16625, weather_description='sky is clear', city_id=146028888064)]

In [171]:
write_parquet(city_weather_df, city_weather_path)

VBox()

In [149]:
print(city_weather_df.select("weather_description").agg(F.max(F.length("weather_description"))).first())

VBox()

Row(max(length(weather_description))=23)