In [None]:
from pyspark.sql import SparkSession

# One SparkSession for the whole notebook (getOrCreate reuses a session that is
# already running in this kernel). spark.jars.packages pulls the MongoDB Spark
# connector (Scala 2.12 build, 3.0.1) by Maven coordinate; eagerEval makes a
# DataFrame that ends a cell render as a table.
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

In [None]:
# Load the yelp.business collection via the MongoDB connector (3.x source name).
# No schema is supplied, so the connector infers one.
# NOTE(review): port 7017 is not MongoDB's default (27017) — presumably a
# remapped container port; confirm against the docker setup.
business = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://host.docker.internal:7017/yelp.business")
    .load()
)

In [None]:
# Show the schema inferred for the business collection (no explicit schema was given to the reader).
business.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- 

In [None]:
# Load the yelp.review collection via the MongoDB connector; schema is inferred.
review = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://host.docker.internal:7017/yelp.review")
    .load()
)

In [None]:
# Show the schema inferred for the review collection.
review.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- cool: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: integer (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
# Load the yelp.tip collection via the MongoDB connector; schema is inferred.
tip = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://host.docker.internal:7017/yelp.tip")
    .load()
)

In [None]:
# Show the schema inferred for the tip collection.
tip.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- compliment_count: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: string (nullable = true)



In [None]:
# Load the yelp.photo collection via the MongoDB connector; schema is inferred.
photo = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://host.docker.internal:7017/yelp.photo")
    .load()
)

In [None]:
# Show the schema inferred for the photo collection.
photo.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- business_id: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- label: string (nullable = true)
 |-- photo_id: string (nullable = true)



In [None]:
from pyspark.sql.functions import collect_list

# One row per business that has at least one photo: business_id plus the list
# of its photo ids. collect_list does not guarantee element order.
business_photos = (
    photo
    .groupBy('business_id')
    .agg(collect_list('photo_id').alias('photo_ids'))
)

In [None]:
# Attach each business's photo ids. The left join keeps businesses without
# photos (their `photo_ids` is null).
# Dropping `photo_ids` first makes this cell safe to re-run: DataFrame.drop is
# a no-op when the column is absent, whereas re-joining onto an already-joined
# `business` would add a second `photo_ids` column and make the later
# select('photo_ids') ambiguous.
business = business.drop('photo_ids').join(business_photos, on=['business_id'], how='left')

In [None]:
from pyspark.sql.functions import split, col, array_contains

# Keep businesses whose comma-separated `categories` string contains the exact
# element "Restaurants", then project the columns stored downstream.
# A null `categories` splits to a null array, so array_contains yields null and
# the row is filtered out.
restaurant = (
    business
    .withColumn('category_list', split(col('categories'), ', '))
    .filter(array_contains(col('category_list'), 'Restaurants'))
    .select(
        'business_id', 'name', 'address', 'city', 'state', 'postal_code',
        'latitude', 'longitude', 'stars', 'category_list', 'photo_ids',
    )
)

In [None]:
from pyspark.sql.functions import avg

# Average review rating per business, computed from the review collection.
# This is separate from the business's own `stars` column.
business_mean_star = (
    review
    .select('business_id', 'stars')
    .groupBy('business_id')
    .agg(avg('stars').alias('mean_star'))
)

In [None]:
# Add the review-derived `mean_star` to each restaurant. The inner join drops
# restaurants that have no reviews.
# Dropping `mean_star` first keeps the cell idempotent: drop is a no-op when the
# column is absent, and without it a re-run would add a duplicate `mean_star`
# column to `restaurant` (which is then written to MongoDB).
restaurant = restaurant.drop('mean_star').join(business_mean_star, on=['business_id'], how='inner')

In [None]:
# Persist the enriched restaurant table to yelp.restaurant.
# An explicit mode("overwrite") makes the cell re-runnable. With Spark's default
# error-if-exists mode, the Mongo connector refuses to write once the collection
# exists, so Restart & Run All fails on the second run.
# Overwrite drops and replaces the collection; that is intended here because
# yelp.restaurant is fully derived from the cells above.
restaurant.write\
    .format("com.mongodb.spark.sql.DefaultSource")\
    .mode("overwrite")\
    .option("uri", "mongodb://host.docker.internal:7017/yelp.restaurant")\
    .save()