In [1]:
import pyspark
import pandas as pd

In [2]:
!conda install -y -c conda-forge matplotlib
import matplotlib.pyplot as plt

Collecting package metadata: done
Solving environment: done


  current version: 4.6.8
  latest version: 4.6.14

Please update conda by running

    $ conda update -n base -c defaults conda



## Package Plan ##

  environment location: /home/varun/anaconda3/envs/tensorflow1

  added / updated specs:
    - matplotlib


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    cycler-0.10.0              |             py_1           8 KB  conda-forge
    kiwisolver-1.1.0           |   py36hc9558a2_0          86 KB  conda-forge
    matplotlib-3.0.3           |           py36_1           6 KB  conda-forge
    matplotlib-base-3.0.3      |   py36h5f35d83_1         6.7 MB  conda-forge
    pyparsing-2.4.0            |             py_0          55 KB  conda-forge
    tk-8.6.9                   |    h84994c4_1001         3.2 MB  conda-forge
    ------------------------------------------------------------
  

In [3]:
import glob
import os

In [4]:
from pyspark.sql.functions import col
from pyspark.sql.functions import sum as spark_sum
from pyspark.sql.functions import regexp_extract
from pyspark.sql.functions import udf

In [48]:
import json

# Paths

In [8]:
# Directory holding the Yelp dataset JSON files.
# Set the YELP_DATASET_DIR environment variable to point at another copy of
# the data; otherwise the original location is used.
# os.path.join(..., "") guarantees a trailing separator, because later cells
# build file paths by plain string concatenation (path + <file name>).
path = os.path.join(
    os.environ.get("YELP_DATASET_DIR", r"/home/varun/Downloads/yelp_dataset/"),
    "",
)

# File names of the individual dataset files inside `path`.
review_file = r"review.json"
user_file = r"user.json"
business_file = r"business.json"
tip_file = r"tip.json"
checkin_file = r"checkin.json"

# Spark Session

In [11]:
# Create the Spark session named 'yelp_eda', or reuse one that already
# exists, then echo it as the cell output.
spark = (
    pyspark.sql.SparkSession.builder
    .appName('yelp_eda')
    .getOrCreate()
)
spark

# Review DF

In [12]:
# Load the review JSON file into a Spark DataFrame.
review_df = (
    spark.read
    .option("inferSchema", True)
    .json(path + review_file)
)

In [23]:
# Print the column names and types of the review DataFrame.
review_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- cool: long (nullable = true)
 |-- date: string (nullable = true)
 |-- funny: long (nullable = true)
 |-- review_id: string (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)
 |-- user_id: string (nullable = true)



In [46]:
# Count reviews per business and keep the 100 businesses with the most reviews.
# business_id is used as a secondary sort key so that businesses with equal
# review counts are ordered deterministically; without it, the rows kept by
# limit(100) could differ between the separate actions that re-evaluate this
# DataFrame.
review_top_100_business_df = (
    review_df
    .groupBy('business_id')
    .count()
    .orderBy(col('count').desc(), col('business_id').asc())
    .limit(100)
)

In [47]:
# Preview the first 10 rows; truncate=False keeps the full business_id strings visible.
review_top_100_business_df.show(10,truncate=False)

+----------------------+-----+
|business_id           |count|
+----------------------+-----+
|4JNXUYY8wbaaDmk3BPzlWw|8570 |
|RESDUcs7fIiihp38-d6_6g|8568 |
|K7lWdNUhCbcnEvI0NhGewg|6887 |
|f4x1YBxkLrZg652xt2KR5g|5847 |
|cYwJA2A6I12KNkm2rtXd5g|5575 |
|DkYS3arLOhA8si5uUEmHOw|5206 |
|2weQS-RnoOBhb1KsHKyoSQ|4534 |
|5LNZ67Yw9RD6nf4_UhXOjw|4522 |
|iCQpiavjjPzJ5_3gPD5Ebg|4351 |
|SMPbvZLSMMb7KU76YNYMGg|4350 |
+----------------------+-----+
only showing top 10 rows



In [36]:
# Print the schema of the aggregated (business_id, count) DataFrame.
review_top_100_business_df.printSchema()

root
 |-- business_id: string (nullable = true)
 |-- count: long (nullable = false)



In [51]:
# Collect the business_id column of the top-100 DataFrame into a plain Python list.
top_100_business_lst = [
    record["business_id"]
    for record in review_top_100_business_df.select('business_id').collect()
]

In [52]:
# Echo the collected business IDs as the cell output.
top_100_business_lst

['4JNXUYY8wbaaDmk3BPzlWw',
 'RESDUcs7fIiihp38-d6_6g',
 'K7lWdNUhCbcnEvI0NhGewg',
 'f4x1YBxkLrZg652xt2KR5g',
 'cYwJA2A6I12KNkm2rtXd5g',
 'DkYS3arLOhA8si5uUEmHOw',
 '2weQS-RnoOBhb1KsHKyoSQ',
 '5LNZ67Yw9RD6nf4_UhXOjw',
 'iCQpiavjjPzJ5_3gPD5Ebg',
 'SMPbvZLSMMb7KU76YNYMGg',
 'ujHiaprwCQ5ewziu0Vi9rw',
 'AV6weBrZFFBfRGCbcRGO4g',
 'KskYqH1Bi7Z_61pH6Om8pg',
 'El4FC8jcawUVgw_0EIcbaQ',
 'rcaPajgKOJC2vo_l3xa42A',
 'eoHdUeQDNgQ6WYEnP2aiRw',
 'FaHADZARwnY4yvlvpnsfGA',
 'faPVqws-x-5k2CQKDNtHxw',
 'OETh78qcgDltvHULowwhJg',
 'Wxxvi3LZbHNIDwJ-ZimtnA',
 'u_vPjx925UPEG9DFOAAvFQ',
 'hihud--QRriCYZw1zZvW4g',
 '7sPNbCx7vGAaH7SbNPZ6oA',
 'g8OnV26ywJlZpezdBnOWUQ',
 '3kdSl5mo9dWC4clrQjEDGg',
 'XZbuPXdyA0ZtTu3AzqtQhg',
 'XXW_OFaYQkkGOGniujZFHg',
 'YJ8ljUhLsz6CtT_2ORNFmg',
 'na4Th5DrNauOv-c43QQFvA',
 'HhVmDybpU7L50Kb5A0jXTg',
 'RwMLuOkImBIqqYj4SSKSPg',
 'P7pxQFqr7yBKMMI2J51udw',
 'MpmFFw0GE_2iRFPdsRpJbA',
 'eAc9Vd6loOgRQolMXQt6FA',
 'yfxDa8RFOvJPQh0rNtakHA',
 'BLIJ-p5wYuAhw6Pp6mh6mw',
 'o7AiTlyWUrBSzdz6oMHj5w',
 

In [54]:
# Keep only the reviews that belong to the first business in top_100_business_lst.
review_top_business_df = (
    review_df
    .where(review_df["business_id"] == top_100_business_lst[0])
    .select('*')
)

In [55]:
# Number of reviews for the selected business.
review_top_business_df.count()

8570

# Join review with business df

In [24]:
# Load the business JSON file into a Spark DataFrame.
business_df = (
    spark.read
    .option("inferSchema", True)
    .json(path + business_file)
)

In [26]:
# Number of rows in the business DataFrame.
business_df.count()

192609

In [27]:
# Print the column names and types of the business DataFrame, including the nested attributes struct.
business_df.printSchema()

root
 |-- address: string (nullable = true)
 |-- attributes: struct (nullable = true)
 |    |-- AcceptsInsurance: string (nullable = true)
 |    |-- AgesAllowed: string (nullable = true)
 |    |-- Alcohol: string (nullable = true)
 |    |-- Ambience: string (nullable = true)
 |    |-- BYOB: string (nullable = true)
 |    |-- BYOBCorkage: string (nullable = true)
 |    |-- BestNights: string (nullable = true)
 |    |-- BikeParking: string (nullable = true)
 |    |-- BusinessAcceptsBitcoin: string (nullable = true)
 |    |-- BusinessAcceptsCreditCards: string (nullable = true)
 |    |-- BusinessParking: string (nullable = true)
 |    |-- ByAppointmentOnly: string (nullable = true)
 |    |-- Caters: string (nullable = true)
 |    |-- CoatCheck: string (nullable = true)
 |    |-- Corkage: string (nullable = true)
 |    |-- DietaryRestrictions: string (nullable = true)
 |    |-- DogsAllowed: string (nullable = true)
 |    |-- DriveThru: string (nullable = true)
 |    |-- GoodForDancing: str

In [57]:
# Look up the business record for the first ID in top_100_business_lst.
top_business_df = (
    business_df
    .where(business_df["business_id"] == top_100_business_lst[0])
    .select('*')
)

In [59]:
# Convert the filtered business rows to a pandas DataFrame so the notebook renders them as a table.
top_business_df.toPandas()

Unnamed: 0,address,attributes,business_id,categories,city,hours,is_open,latitude,longitude,name,postal_code,review_count,stars,state
0,3655 Las Vegas Blvd S,"(None, None, 'full_bar', {'romantic': True, 'i...",4JNXUYY8wbaaDmk3BPzlWw,"Food, French, Breakfast & Brunch, Restaurants,...",Las Vegas,"(7:0-0:0, 7:0-23:0, 7:0-0:0, 7:0-23:0, 7:0-23:...",1,36.112859,-115.172434,Mon Ami Gabi,89109,8348,4.0,NV
