# Aggregating DataFrames in PySpark HW Solutions

First let's start up our PySpark instance

In [1]:
import findspark
findspark.init()

import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
# May take awhile locally
spark = SparkSession.builder.appName("aggregate").getOrCreate()

cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print("You are working with", cores, "core(s)")
spark

You are working with 1 core(s)


## Read in the dataFrame for this Notebook

In [25]:
airbnb = spark.read.csv('nyc_air_bnb.csv',inferSchema=True,header=True)

## About this dataset

This dataset describes the listing activity and metrics for Air BNB bookers in NYC, NY for 2019. Each line in the dataset is a booking. 

**Source:** https://www.kaggle.com/dgomonov/new-york-city-airbnb-open-data/data

Let's go ahead and view the first few records of the dataset so we know what we are working with.

In [26]:
airbnb.limit(5).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.64,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.1,1,0


In [27]:
print(airbnb.printSchema())

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Notice here that some of the columns that are obviously numeric have been incorrectly identified as "strings". Let's edit that. Otherwise we cannot aggregate any of the numeric columns.

In [28]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

df = airbnb.withColumn("price", airbnb["price"].cast(IntegerType())) \
        .withColumn("minimum_nights", airbnb["minimum_nights"].cast(IntegerType())) \
        .withColumn("number_of_reviews", airbnb["number_of_reviews"].cast(IntegerType())) \
        .withColumn("reviews_per_month", airbnb["reviews_per_month"].cast(IntegerType())) \
        .withColumn("calculated_host_listings_count", airbnb["calculated_host_listings_count"].cast(IntegerType()))
#QA
print(df.printSchema())
df.limit(5).toPandas()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: integer (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)

None


Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.0,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.0,2,355
2,3647,THE VILLAGE OF HARLEM....NEW YORK !,4632,Elisabeth,Manhattan,Harlem,40.80902,-73.9419,Private room,150,3,0,,,1,365
3,3831,Cozy Entire Floor of Brownstone,4869,LisaRoxanne,Brooklyn,Clinton Hill,40.68514,-73.95976,Entire home/apt,89,1,270,2019-07-05,4.0,1,194
4,5022,Entire Apt: Spacious Studio/Loft by central park,7192,Laura,Manhattan,East Harlem,40.79851,-73.94399,Entire home/apt,80,10,9,2018-11-19,0.0,1,0


### Alright now we are ready to dig in!


### 1. How many rows are in this dataset?

In [29]:
df.count()

49079

### 2. How many total reviews does each host have?

In [30]:
from pyspark.ml.feature import SQLTransformer

query = SQLTransformer(
    
    statement="SELECT host_id, SUM(number_of_reviews) AS Total FROM __THIS__ GROUP BY host_id ORDER BY Total DESC"
                        
                    ) 

query.transform(df).show()


+---------+-----+
|  host_id|Total|
+---------+-----+
| 37312959| 2273|
|   344035| 2205|
| 26432133| 2017|
| 35524316| 1971|
| 40176101| 1818|
|  4734398| 1798|
| 16677326| 1355|
|  6885157| 1346|
|219517861| 1281|
| 23591164| 1269|
| 59529529| 1229|
| 47621202| 1205|
| 22959695| 1157|
| 58391491| 1154|
| 21641206| 1062|
|   137814| 1059|
|156948703| 1052|
|156684502| 1046|
|  3441272| 1013|
|  7831209|  970|
+---------+-----+
only showing top 20 rows



### 3. Show the min and max of all the numeric variables in the dataset

In [31]:
limit_summary = df.select("price","minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365").describe()
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,count,48887.0,48891.0,48738.0,38845.0,38858.0,48891.0,48737.0
1,mean,152.22298361527604,7.12861262809106,23.25827896097501,2.629232137931034,1.0178341654228216,7.65500807919658,112.59808769518024
2,stddev,238.5414668883948,20.82853436534699,44.55794478531731,8.964786212322723,1.637776883614613,34.822554018675234,131.60972881440708
3,min,-74.0,0.0,0.0,-73.94134,0.0,0.0,0.0
4,max,10000.0,1250.0,629.0,9.66,58.0,365.0,365.0


### 4. Which host had the highest number of reviews?

Only display the top result.

Bonus: format the column names

In [32]:
from pyspark.sql import functions

df.groupBy("host_id").agg(sum("number_of_reviews").alias("Reviews")).orderBy(sum("number_of_reviews").desc()).show(1) 

+--------+-------+
| host_id|Reviews|
+--------+-------+
|37312959|   2273|
+--------+-------+
only showing top 1 row



### 5. On average, how many nights did most hosts specify for a minimum?

In [33]:
query = SQLTransformer(
    
    statement="SELECT AVG(minimum_nights) AS AVG_MIN_NIGHTS FROM __THIS__ "
                    ) 

query.transform(df).show()


df.agg({'minimum_nights':'avg'}).withColumnRenamed("avg(minimum_nights)", "Avg Min Nights").show()

+------------------+
|    AVG_MIN_NIGHTS|
+------------------+
|7.1286126280910596|
+------------------+

+------------------+
|    Avg Min Nights|
+------------------+
|7.1286126280910596|
+------------------+



In [34]:
df.agg(mean(df.minimum_nights)).show()

+-------------------+
|avg(minimum_nights)|
+-------------------+
| 7.1286126280910596|
+-------------------+



### 6. What is the most expensive neighborhood to stay in on average?

Note: only show the one result

In [37]:
query = SQLTransformer(
    
    statement="SELECT neighbourhood, AVG(price) AS AVG_NEIGH FROM __THIS__ GROUP BY neighbourhood ORDER BY AVG_NEIGH DESC"
                    ) 

query.transform(df).show()

+------------------+------------------+
|     neighbourhood|         AVG_NEIGH|
+------------------+------------------+
|    Fort Wadsworth|             800.0|
|           Woodrow|             700.0|
|          Sea Gate| 548.3333333333334|
|           Tribeca|  490.638418079096|
|         Riverdale|442.09090909090907|
|      Prince's Bay|             409.5|
| Battery Park City| 367.5571428571429|
|     Randall Manor|352.94444444444446|
| Flatiron District|           341.925|
|              NoHo|297.85526315789474|
|              SoHo| 287.2773109243698|
|           Midtown| 282.7839065541856|
|          Neponsit| 274.6666666666667|
|      West Village| 267.6958224543081|
| Greenwich Village| 263.3205128205128|
|           Chelsea| 249.7785778577858|
|       Willowbrook|             249.0|
|  Theater District|248.01388888888889|
|            Nolita|230.13833992094862|
|Financial District|225.49059139784947|
+------------------+------------------+
only showing top 20 rows



### 7. Display a two by two table that shows the average prices by room type (private and shared only) and neighborhood group (Manhattan and Brooklyn only)

In [11]:
df.filter("room_type IN('Private room','Shared room')").groupBy("room_type").pivot("neighbourhood_group", ["Manhattan", "Brooklyn"]).avg('price').show(100)

+------------+------------------+-----------------+
|   room_type|         Manhattan|         Brooklyn|
+------------+------------------+-----------------+
| Shared room| 89.06903765690376|50.52784503631961|
|Private room|116.05400302114803|76.47234042553191|
+------------+------------------+-----------------+



In [48]:
query = SQLTransformer(
    
    statement="SELECT room_type, neighbourhood_group, AVG(price) FROM __THIS__ WHERE room_type IN('Private room', 'Shared room') AND neighbourhood_group IN ('Manhattan','Brooklyn') GROUP BY room_type, neighbourhood_group"
    
)

query.transform(df).show()

+------------+-------------------+------------------+
|   room_type|neighbourhood_group|        avg(price)|
+------------+-------------------+------------------+
|Private room|          Manhattan|116.05400302114803|
| Shared room|          Manhattan| 89.06903765690376|
| Shared room|           Brooklyn| 50.52784503631961|
|Private room|           Brooklyn| 76.47234042553191|
+------------+-------------------+------------------+



### Alright that's all folks!

### Great job!