# NTU SC1015 - Yelp Dataset Preparation
[Yelp Dataset](https://www.yelp.com/dataset) is a subset of Yelp's internal data for academic research:

![Yelp Dataset ERD](/files/shared_uploads/programnom@gmail.com/yelp_dataset_erd.png)

## Objective
The dataset consists of the following files:
```
80K     Dataset_User_Agreement.pdf
114M    yelp_academic_dataset_business.json
274M    yelp_academic_dataset_checkin.json
5.0G    yelp_academic_dataset_review.json
173M    yelp_academic_dataset_tip.json
3.2G    yelp_academic_dataset_user.json
```

Since the data files are too large to be processed manually, we will extract a **chronological** subset of the Yelp Dataset with Spark for further analysis.
Additionally, some data transformations (e.g. **denormalisation**) will be performed to simplify later analysis.
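
To make the term concrete, here is a minimal pure-Python sketch of denormalisation (no Spark involved). The miniature tables, their field values and the `denormalise` helper are hypothetical and exist only for illustration: each review record gets the attributes of its business and its user copied in, so later analysis can work on one flat table.

```python
# Hypothetical miniature tables; all field values are made up for illustration.
businesses = {"b1": {"business_name": "Cafe A", "business_city": "Reno"}}
users = {"u1": {"user_name": "Sam"}}
reviews = [{"review_id": "r1", "business_id": "b1", "user_id": "u1", "stars": 5}]


def denormalise(review, businesses, users):
    """Return one flat record with business and user attributes copied in."""
    flat = dict(review)
    flat.update(businesses[review["business_id"]])
    flat.update(users[review["user_id"]])
    return flat


flat_reviews = [denormalise(r, businesses, users) for r in reviews]
print(flat_reviews[0])
```

The trade-off is redundancy: business and user attributes are repeated on every review row, which is acceptable for a small analysis subset.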

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# reuse the notebook's active session, or create one if none exists
spark = SparkSession.builder.getOrCreate()

## Load Dataset
Load the Yelp dataset with Spark:

In [0]:
prefix = "yelp_academic_dataset_"
s3_bucket = "ntu-sc1015-yelp"
tables = [
    "business",
    "checkin",
    "review",
    "tip",
    "user",
]

dataset = {}
for table in tables:
    dataset[table] = spark.read.json(f"s3://{s3_bucket}/{prefix}{table}.json")

Prefix column names with the table name to prevent name collisions when joining:

In [0]:
for table in ["business", "user"]:
    for col in dataset[table].columns:
        if col != f"{table}_id":
            dataset[table] = dataset[table].withColumnRenamed(col, f"{table}_{col}")
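
The renaming rule can be checked without Spark. The sketch below uses a hypothetical `prefixed_columns` helper and an assumed column list (`name`, `stars`) to show which names the loop above would produce: every column gets the table prefix except the join key, which must keep its name so the later joins still match.

```python
def prefixed_columns(table, columns):
    """Map each column to its prefixed name, leaving the join key untouched."""
    key = f"{table}_id"
    return {col: col if col == key else f"{table}_{col}" for col in columns}


# Assumed example columns; the real tables have more.
renames = prefixed_columns("business", ["business_id", "name", "stars"])
print(renames)
```

Without the prefix, a column such as `stars` would be ambiguous after joining if more than one table carries a column of that name.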

Presort the `business` and `user` tables by their join keys, with the aim of speeding up the later sort-merge joins:

In [0]:
business_df = dataset["business"].sort("business_id").cache()
user_df = dataset["user"].sort("user_id").cache()

## Reviews
Sample the last `n_sample` reviews and denormalise the review data.

In [0]:
n_sample = 100000

(
    dataset["review"].withColumn("date", F.to_date("date"))
    # sample the n_sample last reviews in chronological order to ensure that reviews are contiguous over time. 
    .sort("date", ascending=False)
    .limit(n_sample)
    # denormalise by joining user and business data to reviews
    # join business first as it is the smaller of the two tables
    .join(business_df, "business_id")
    .join(user_df, "user_id")
    # write as one parquet file back to s3
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(f"s3a://{s3_bucket}/yelp_reviews.parquet")
)
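
The "latest `n_sample` rows" selection can be illustrated in plain Python. This is a sketch under assumptions, not the Spark implementation: the sample records and the `latest_n` helper are hypothetical, and `heapq.nlargest` stands in for the sort-then-limit step.

```python
from datetime import date
import heapq

# Hypothetical review records; only the date matters for the selection.
reviews = [
    {"review_id": "r1", "date": date(2021, 1, 3)},
    {"review_id": "r2", "date": date(2021, 1, 1)},
    {"review_id": "r3", "date": date(2021, 1, 5)},
    {"review_id": "r4", "date": date(2021, 1, 4)},
]


def latest_n(rows, n):
    """Return the n rows with the most recent dates, newest first."""
    return heapq.nlargest(n, rows, key=lambda row: row["date"])


sample = latest_n(reviews, 2)
sample_ids = [row["review_id"] for row in sample]
print(sample_ids)
```

Two caveats are worth keeping in mind for the Spark version. Because the dates are truncated to day granularity, which of the rows sharing the cut-off date end up in the sample is likely arbitrary. And since the joins are inner joins, any review without a matching business or user would be dropped, so the output may hold fewer than `n_sample` rows.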

## Tips
Sample the last `n_sample` tips and denormalise the tip data.

In [0]:
(
    dataset["tip"].withColumn("date", F.to_date("date"))
    # sample the n_sample last tips in chronological order to ensure that tips are contiguous over time. 
    .sort("date", ascending=False)
    .limit(n_sample)
    # denormalise by joining user and business data to tips
    # join business first as it is the smaller of the two tables
    .join(business_df, "business_id")
    .join(user_df, "user_id")
    # write as one parquet file back to s3
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(f"s3a://{s3_bucket}/yelp_tips.parquet")
)

## Check Ins
Check-ins are represented as a list of comma-separated timestamps in the dataset.
- Expand the comma-separated list into individual check-in rows.
- Denormalise check-ins.
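
The expansion step can be sketched in plain Python. The `expand_checkins` helper and the sample record are hypothetical, and the sketch assumes the timestamps are separated by a comma followed by optional whitespace: one input record per business becomes one output row per check-in, with the business key repeated on each row.

```python
def expand_checkins(record):
    """Yield one row per check-in timestamp found in the record."""
    for stamp in record["date"].split(","):
        stamp = stamp.strip()  # drop the space that follows each comma
        if stamp:  # skip empty fragments, e.g. from a trailing comma
            yield {"business_id": record["business_id"], "date": stamp}


# Hypothetical record in the assumed input shape.
record = {"business_id": "b1", "date": "2016-04-26 19:49:16, 2016-08-30 18:36:57"}
rows = list(expand_checkins(record))
print(rows)
```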

In [0]:
(
    dataset["checkin"].join(business_df, "business_id")
    # expand the comma-delimited dates into one row per check-in,
    # keeping the joined business columns
    .withColumn("date", F.explode(F.split("date", r",\s*")))
    # write as one parquet file back to s3
    .coalesce(1)
    .write
    .mode("overwrite")
    .parquet(f"s3a://{s3_bucket}/yelp_checkins.parquet")
)