In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession \
    .builder \
    .appName("Basic ETL") \
    .getOrCreate()

In [3]:
df = spark.read.json("yelp_academic_dataset_business.json")

In [4]:
# Columns of interest: business id, name, address, hours, city, state, number of reviews, stars

In [5]:
interested = df.select(["business_id", "name", "full_address", "hours", "city", "state", "review_count", "stars"])

In [6]:
from pyspark.sql import functions as F

In [7]:
cities = interested.select("city", "review_count", "stars").sort("review_count", ascending=False)

In [8]:
cities.show(5)


+---------+------------+-----+
|     city|review_count|stars|
+---------+------------+-----+
|Las Vegas|        4578|  4.0|
|Las Vegas|        3984|  4.5|
|Las Vegas|        3828|  3.5|
|Las Vegas|        3046|  4.0|
|Las Vegas|        3007|  3.0|
+---------+------------+-----+
only showing top 5 rows



In [9]:
cities = cities.withColumn('pre_norm', cities['review_count'] * cities['stars'])

In [10]:
cities = cities.groupby('city').sum()

In [11]:
cities.show(5)

+--------------+-----------------+----------+-------------+
|          city|sum(review_count)|sum(stars)|sum(pre_norm)|
+--------------+-----------------+----------+-------------+
|         Ratho|               13|       3.5|         45.5|
|         Tempe|            69678|    8247.5|     260148.0|
|    Harrisburg|              254|      74.0|        966.5|
|Île des Soeurs|               21|       4.0|         84.0|
|  Fountain Hls|                7|       3.5|         24.5|
+--------------+-----------------+----------+-------------+
only showing top 5 rows



In [12]:
#normalize data
cities = cities.withColumn('agg_rating', cities['sum(pre_norm)'] / cities['sum(review_count)'])

In [13]:
cities.show(10)

+---------------+-----------------+----------+-------------+------------------+
|           city|sum(review_count)|sum(stars)|sum(pre_norm)|        agg_rating|
+---------------+-----------------+----------+-------------+------------------+
|          Ratho|               13|       3.5|         45.5|               3.5|
|          Tempe|            69678|    8247.5|     260148.0|3.7335744424352018|
|     Harrisburg|              254|      74.0|        966.5|3.8051181102362204|
| Île des Soeurs|               21|       4.0|         84.0|               4.0|
|   Fountain Hls|                7|       3.5|         24.5|               3.5|
|North Las Vegas|            10808|    2116.5|      38489.5|3.5612046632124352|
|  Lawrenceville|               97|       7.5|        368.0|3.7938144329896906|
|          Mesa |               27|       3.0|         81.0|               3.0|
|    C Las Vegas|               17|       3.0|         51.0|               3.0|
|    Wilkinsburg|               23|     

In [14]:
# depending on where this data was going / how it needs to be pulled, 
# this is where I'd write to either a .csv file / some sort of DB
#
#
#

In [15]:
# analyst wants : map of dog-friendly places (because who doesn't...?)
#
#    
#

In [16]:
dogs_allowed = df.select(["business_id", "name", "latitude", "longitude", 'attributes.Dogs Allowed']).filter(df['attributes.Dogs Allowed'] == True)


In [17]:
coords = dogs_allowed.select('latitude', 'longitude').toPandas()

In [18]:
from ipyleaflet import Map, Marker

In [19]:
longitudes = coords['longitude'].values.tolist()
latitudes = coords['latitude'].values.tolist()

In [20]:
m = Map(center=(latitudes[0], longitudes[0]), zoom=10)

In [21]:
markers = []

In [23]:
for lat, lon in zip(latitudes, longitudes):
    point = (lat, lon)
    markers.append(point)

In [24]:
for x in markers:
    m.add_layer(Marker(location=x))

In [25]:
display(m)

Map(basemap={'url': 'https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', 'max_zoom': 19, 'attribution': 'Map …