In [3]:
import os
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# NOTE(review): findspark.init() has already run above, so this assignment
# cannot influence how findspark locates Spark; it only affects code that reads
# SPARK_HOME from here on. Confirm whether it was meant to precede init().
os.environ["SPARK_HOME"] = "/usr/local/spark"

# Directory holding the Yelp JSON dumps. Set the YELP_DATA_DIR environment
# variable to point elsewhere; when unset, the original location is used.
YELP_DATA_DIR = os.environ.get("YELP_DATA_DIR", "/Users/johne/Downloads/yelp_dataset")

# Reuse an existing session if one is already active, otherwise start one.
spark = SparkSession \
    .builder \
    .appName("Yelp samples") \
    .getOrCreate()

# Load the reviews and print the inferred schema.
reviewDF = spark.read.json(os.path.join(YELP_DATA_DIR, "review.json"))
reviewDF.printSchema()

# Load the businesses and print the inferred schema.
businessDF = spark.read.json(os.path.join(YELP_DATA_DIR, "business.json"))
businessDF.printSchema()

# Ambience, name and star rating of businesses with a star rating > 4.5.
# Rows whose Ambience is null, empty or the literal string 'None' are dropped
# (a null never satisfies the != comparisons).
# NOTE(review): the avg("stars") column produced by the groupBy is not part of
# the final select, so the aggregation only acts as a per-business_id key list
# for the join back to businessDF. Confirm whether the average was meant to be
# shown; if not, the groupBy/join pair can probably be dropped.
ambienceBusinessDF = businessDF.select("business_id", "stars") \
    .filter("stars > 4.5") \
    .groupBy("business_id") \
    .avg("stars") \
    .join(businessDF, "business_id") \
    .filter("attributes.Ambience != '' and attributes.Ambience != 'None'") \
    .sort("stars") \
    .select("attributes.Ambience", "name", "stars")

# print(...) with a single argument is valid on both Python 2 and Python 3,
# whereas the bare `print x` statement is a SyntaxError on Python 3.
print(type(businessDF))
print(type(ambienceBusinessDF))

ambienceBusinessDF.show()



root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

+--------------------+--------------------+-----+
|            Ambience|                name|stars|
+--------------------+--------------------+-----+
|{'romantic': Fals...|Vivalia Bistro Ex...|  5.0|
|{'touristy': Fals...|Stir Krazy Mongol...|  5.0|
|{'touristy': Fals...|Krispy Krunchy Ch...|  5.0|
|{'touristy': Fals...|Islas Filipino BB...|  5.0|
|{'romantic': Fals...|Fresh Med Mediter...|  5.0|
|{'romantic': Fals...|       Papa Murphy's|  5.0|
|{'romantic': Fals...|Not Your Typical ...|  5.0|
|{'touristy': Fals...|         Don's Diner|  5.0|
|{'touristy': True...|            Hot Shot|  5.0|
|{'touristy': Fals...|   Raspados Imperial|  5.0|
|{'touristy': Fals...|    Thai 999 Express|  5.0|
|{'touristy': Fals...| The Brilliant Bagel|  5.0|
|{'romantic': Fals...|         Cafe Fresco|  5.0|
|{'touristy': Fals...|   YZ's Karaoke Cafe|  5.0|
|{'romantic': Fals...|       King of Clubs|  5.0|
|{'touristy': Fals...|    Wallace Espresso|  5.0|
|{'touristy': Fals...|    Zookz Sandwiches|  5.0|


In [None]:
# Reviews of highly rated businesses.
# Keeps the distinct (business_id, stars, name) rows with stars > 4.5, joins
# them to the reviews on business_id, and retains the business name together
# with each review's text and its useful / funny / cool counts.
goodBusinessReviewsDF = (
    businessDF
    .select("business_id", "stars", "name")
    .filter("stars > 4.5")
    .distinct()
    .join(reviewDF, "business_id")
    .select("name", "text", "useful", "funny", "cool")
)

# Preview the first 10 joined rows.
goodBusinessReviewsDF.show(10)