# Aggregating DataFrames in PySpark

 - GroupBy
 - Pivot
 - Aggregate methods
 - Combos of each

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('agg2').getOrCreate()
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
print('cores : {}'.format(cores))
spark

cores : 1


In [3]:
import pandas as pd
airbnb = pd.read_csv('s3://************/nyc_air_bnb.csv')
for col in airbnb.columns:
    airbnb[col] = airbnb[col].astype('str')
airbnb = spark.createDataFrame(airbnb)

In [4]:
airbnb.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: string (nullable = true)
 |-- minimum_nights: string (nullable = true)
 |-- number_of_reviews: string (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: string (nullable = true)
 |-- calculated_host_listings_count: string (nullable = true)
 |-- availability_365: string (nullable = true)



In [5]:
airbnb.limit(2).toPandas()

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539,Clean & quiet apt home by the park,2787,John,Brooklyn,Kensington,40.64749,-73.97237,Private room,149,1,9,2018-10-19,0.21,6,365
1,2595,Skylit Midtown Castle,2845,Jennifer,Manhattan,Midtown,40.75362,-73.98377,Entire home/apt,225,1,45,2019-05-21,0.38,2,355


In [6]:
numeric_col = ['latitude', #double,
            'longitude', #double
            'price' , #int
            'minimum_nights', #int
            'number_of_reviews', #int
            'reviews_per_month', #double
            'calculated_host_listings_count', #int
            'availability_365'] # int]

In [7]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, IntegerType

df = airbnb.withColumn('latitude', col('latitude').cast(DoubleType())) \
    .withColumn('longitude', col('longitude').cast(DoubleType())) \
    .withColumn('price', col('price').cast(IntegerType())) \
    .withColumn('minimum_nights', col('minimum_nights').cast(IntegerType())) \
    .withColumn('number_of_reviews', col('number_of_reviews').cast(IntegerType())) \
    .withColumn('reviews_per_month', col('reviews_per_month').cast(DoubleType())) \
    .withColumn('calculated_host_listings_count', col('calculated_host_listings_count').cast(IntegerType()))\
    .withColumn('availability_365', col('availability_365').cast(IntegerType()))
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: string (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: string (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)



# GroupBy and Aggregate Functions

In [8]:
df.groupBy("neighbourhood_group").count().show(7)

+-------------------+-----+
|neighbourhood_group|count|
+-------------------+-----+
|             Queens| 5666|
|           Brooklyn|20104|
|      Staten Island|  373|
|          Manhattan|21661|
|              Bronx| 1091|
+-------------------+-----+



In [10]:
# Then you can add the following aggregate functions: mean, count, min, max, sum
# Like this for example
df.groupBy("neighbourhood_group").mean("price").show(5)

+-------------------+------------------+
|neighbourhood_group|        avg(price)|
+-------------------+------------------+
|             Queens| 99.51764913519237|
|           Brooklyn|124.38320732192598|
|      Staten Island|114.81233243967829|
|          Manhattan| 196.8758136743456|
|              Bronx|  87.4967919340055|
+-------------------+------------------+



In [11]:
# This is another way of doing the above but I don't recommend it
# because you can only do one var at a time
df.groupBy("neighbourhood").agg({'price' : 'mean'}).show(5)

+-------------+----------+
|neighbourhood|avg(price)|
+-------------+----------+
|       Corona| 59.171875|
| Richmondtown|      78.0|
| Prince's Bay|     409.5|
|  Westerleigh|      71.5|
|   Mill Basin|    179.75|
+-------------+----------+
only showing top 5 rows



In [15]:
# This method is way more versatile
# Allows you to call on more than one aggregate function at a time
from pyspark.sql.functions import min, max

df.groupBy("neighbourhood").agg(min(df.price).alias("Min_Price"), \
                                max(df.price).alias("Max_Price")).show(5)

+-------------+---------+---------+
|neighbourhood|Min_Price|Max_Price|
+-------------+---------+---------+
|       Corona|       23|      359|
| Richmondtown|       78|       78|
| Prince's Bay|       85|     1250|
|  Westerleigh|       40|      103|
|   Mill Basin|       85|      299|
+-------------+---------+---------+
only showing top 5 rows



In [17]:
summary = df.summary("count", "min", "25%", "75%", "max")
summary.toPandas()

Unnamed: 0,summary,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,count,48895.0,48895,48895.0,48895,48895,48895,48895.0,48895.0,48895,48895,48895,48895,48895,48895.0,48895,48895
1,min,10000070.0,1 Bed Apt in Utopic Williamsburg,1000014.0,'Cil,Bronx,Allerton,40.49979,-74.24442,Entire home/apt,0,1,0,2011-03-28,0.01,1,0
2,25%,9470677.0,2.0,7817764.0,,,,40.69008,-73.98308,,69,1,1,,0.28,1,0
3,75%,29152320.0,,107434423.0,,,,40.7631,-73.93629,,175,5,24,,4.49,2,227
4,max,9999939.0,"ﾏﾝﾊｯﾀﾝ､駅から徒歩4分でどこに行くのにも便利な場所!女性の方希望,ｷﾚｲなお部屋｡",9997988.0,현선,Staten Island,Woodside,40.91306,-73.71299,Shared room,10000,1250,629,,,327,365


In [18]:
limit_summary = df.select("price", "minimum_nights", "number_of_reviews").summary("count", "min", "max")
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews
0,count,48895,48895,48895
1,min,0,1,0
2,max,10000,1250,629


### Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).

In [21]:
from pyspark.sql.functions import min, max

df.agg(min(df.price).alias("Min_Price"), \
       max(df.price).alias("Max_Price")).show()

+---------+---------+
|Min_Price|Max_Price|
+---------+---------+
|        0|    10000|
+---------+---------+



In [25]:
from pyspark.sql.functions import countDistinct, avg, stddev

df.select(countDistinct("neighbourhood_group").alias("CountD"), \
          avg("price"), \
          stddev("price")).show()

+------+-----------------+------------------+
|CountD|       avg(price)|stddev_samp(price)|
+------+-----------------+------------------+
|     5|152.7206871868289|240.15416974718752|
+------+-----------------+------------------+



### Pivot Function

Provides a two way table and must be used in conjunction with groupBy.

In [26]:
# Pivot Function
# pivot(pivot_col, values=None)
df.groupBy("room_type").pivot("neighbourhood_group", ["Queens", "Brooklyn"]).count().show(10)

+---------------+------+--------+
|      room_type|Queens|Brooklyn|
+---------------+------+--------+
|    Shared room|   198|     413|
|Entire home/apt|  2096|    9559|
|   Private room|  3372|   10132|
+---------------+------+--------+



In [30]:
# You can also filter your results if you need to
# We some invalid data in the above output
# So we could select only the "Share room" types if we wanted to
df.filter("room_type='Shared room'").groupBy("room_type")\
    .pivot("neighbourhood_group", ["Queens", "Brooklyn"])\
    .count()\
    .show(100)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|Shared room|   198|     413|
+-----------+------+--------+



### Comine all three!

It is also possible to combine all three method into one call: GroupBy, Pivot and Agg like this:

In [32]:
from pyspark.sql.functions import min, max

df.groupBy("neighbourhood")\
    .pivot("neighbourhood_group", ["Queens", "Brooklyn"])\
    .agg(min(df.price).alias("Min_Price"), \
         max(df.price).alias("Max_Price"))\
    .show()

+------------------+----------------+----------------+------------------+------------------+
|     neighbourhood|Queens_Min_Price|Queens_Max_Price|Brooklyn_Min_Price|Brooklyn_Max_Price|
+------------------+----------------+----------------+------------------+------------------+
|            Corona|              23|             359|              null|              null|
|      Prince's Bay|            null|            null|              null|              null|
|      Richmondtown|            null|            null|              null|              null|
|        Mill Basin|            null|            null|                85|               299|
|       Westerleigh|            null|            null|              null|              null|
|      Civic Center|            null|            null|              null|              null|
|        Douglaston|              40|             178|              null|              null|
|        Mount Hope|            null|            null|              nu