# Chapter 7. Aggregations

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [2]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("../sdg/data/retail-data/all/*.csv")\
    .coalesce(5)

In [3]:
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [4]:
df.createOrReplaceTempView("dfTable")

In [5]:
df.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.

In [6]:
df.count()

541909

## Aggregation Functions

In [7]:
from pyspark.sql.functions import count

df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [8]:
from pyspark.sql.functions import countDistinct

df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [9]:
from pyspark.sql.functions import approx_count_distinct

df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [10]:
from pyspark.sql.functions import first, last

df.select(first("StockCode"), last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



In [11]:
from pyspark.sql.functions import min, max

df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [12]:
from pyspark.sql.functions import sum

df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [13]:
from pyspark.sql.functions import sumDistinct

df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [14]:
from pyspark.sql.functions import avg, mean

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    mean("Quantity").alias("mean_purchases")
).selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases"
).show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



*Wikipedia:*
> In statistics, mean, median, and mode are all known as measures of central tendency, and in colloquial usage any of these might be called an average value.

The term *mean* also refers to *expected value*

In [15]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

df.select(var_pop("Quantity"), stddev_pop("Quantity"), var_samp("Quantity"), stddev_samp("Quantity")).show()

+------------------+--------------------+------------------+---------------------+
| var_pop(Quantity)|stddev_pop(Quantity)|var_samp(Quantity)|stddev_samp(Quantity)|
+------------------+--------------------+------------------+---------------------+
|47559.303646609056|  218.08095663447796|47559.391409298754|   218.08115785023418|
+------------------+--------------------+------------------+---------------------+



In [16]:
from pyspark.sql.functions import skewness, kurtosis

df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+



In [17]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

df.select(
    corr("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity"),
    covar_samp("InvoiceNo", "Quantity")
).show()

+-------------------------+------------------------------+-------------------------------+
|corr(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|
+-------------------------+------------------------------+-------------------------------+
|     4.912186085635685E-4|            1052.7260778741693|             1052.7280543902734|
+-------------------------+------------------------------+-------------------------------+



In [18]:
from pyspark.sql.functions import collect_set, collect_list

df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



## Grouping

In [19]:
df.groupBy("InvoiceNo", "CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   536596|      null|    6|
|   537252|      null|    1|
|   538041|      null|    1|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
+---------+----------+-----+
only showing top 20 rows



In [20]:
from pyspark.sql.functions import expr

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")
).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



In [21]:
aggregations = {"Quantity": "avg", "UnitPrice": "avg"}

df.groupBy("InvoiceNo").agg(aggregations).show()

+---------+------------------+------------------+
|InvoiceNo|    avg(UnitPrice)|     avg(Quantity)|
+---------+------------------+------------------+
|   536596| 5.723333333333334|               1.5|
|   536938|3.2885714285714287|33.142857142857146|
|   537252|              0.85|              31.0|
|   537691| 3.282000000000001|              8.15|
|   538041|               0.0|              30.0|
|   538184| 2.461538461538462|12.076923076923077|
|   538517| 2.855849056603773|3.0377358490566038|
|   538879| 1.184736842105263|21.157894736842106|
|   539275|3.0833333333333335|              26.0|
|   539630| 3.391666666666667|20.333333333333332|
|   540499|           5.53125|              3.75|
|   540540| 3.809090909090909|2.1363636363636362|
|  C540850|              1.25|              -1.0|
|   540976|2.8972916666666673|10.520833333333334|
|   541432|             3.325|             12.25|
|   541518|2.2398019801980196| 23.10891089108911|
|   541783| 4.036571428571427|11.314285714285715|


## Window Functions

Spark supports three kinds of window functions:
- ranking functions
- analytic functions
- aggregate functions

In [22]:
from pyspark.sql.functions import col, to_date

dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))

In [23]:
dfWithDate.createOrReplaceTempView("dfWithDate")

In [24]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec = Window.partitionBy("CustomerId", "date")\
    .orderBy(desc("Quantity"))\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [25]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [26]:
from pyspark.sql.functions import dense_rank, rank

purchaseDensRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [27]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
    .select(
        col("CustomerId"),
        col("date"),
        col("Quantity"),
        purchaseRank.alias("quantityRank"),
        purchaseDensRank.alias("quantityDenseRank"),
        maxPurchaseQuantity.alias("maxPurchaseQuantity")
    ).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

## Grouping Sets

In [28]:
dfNoNull = dfWithDate.na.drop()

In [29]:
dfNoNull.createOrReplaceTempView("dfNotNull")

In [30]:
dfNoNull.show()

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|      date|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|U

In [31]:
spark.sql("""
SELECT CustomerId, StockCode, sum(Quantity) FROM dfNotNull
GROUP BY CustomerId, StockCode
ORDER BY CustomerId DESC, StockCode DESC
""").show()

+----------+---------+-------------+
|CustomerId|StockCode|sum(Quantity)|
+----------+---------+-------------+
|     18287|    85173|           48|
|     18287|   85040A|           48|
|     18287|   85039B|          120|
|     18287|   85039A|           96|
|     18287|    84920|            4|
|     18287|    84584|            6|
|     18287|   84507C|            6|
|     18287|   72351B|           24|
|     18287|   72351A|           24|
|     18287|   72349B|           60|
|     18287|    47422|           24|
|     18287|    47421|           48|
|     18287|    35967|           36|
|     18287|    23445|           20|
|     18287|    23378|           24|
|     18287|    23376|           48|
|     18287|    23310|           36|
|     18287|    23274|           12|
|     18287|    23272|           12|
|     18287|    23269|           36|
+----------+---------+-------------+
only showing top 20 rows



> Grouping sets depend on null values for aggregation levels. If you do not filter-out null values, you will get incorrect results. This applies to cubes, rollups, and grouping sets.

In [32]:
spark.sql("""
SELECT customerId, StockCode, sum(Quantity) FROM dfNotNull
GROUP BY CustomerId, StockCode GROUPING SETS ((CustomerId, StockCode), (CustomerId), ())
ORDER BY CustomerId DESC NULLS FIRST, StockCode DESC NULLS FIRST
""").show()

+----------+---------+-------------+
|CustomerId|StockCode|sum(Quantity)|
+----------+---------+-------------+
|      null|     null|      4906888|
|     18287|     null|         1586|
|     18287|    85173|           48|
|     18287|   85040A|           48|
|     18287|   85039B|          120|
|     18287|   85039A|           96|
|     18287|    84920|            4|
|     18287|    84584|            6|
|     18287|   84507C|            6|
|     18287|   72351B|           24|
|     18287|   72351A|           24|
|     18287|   72349B|           60|
|     18287|    47422|           24|
|     18287|    47421|           48|
|     18287|    35967|           36|
|     18287|    23445|           20|
|     18287|    23378|           24|
|     18287|    23376|           48|
|     18287|    23310|           36|
|     18287|    23274|           12|
+----------+---------+-------------+
only showing top 20 rows



In [33]:
rolledUpFD = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
    .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
    .orderBy("Date", "Country")

rolledUpFD.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       4906888|
|2010-12-01|          null|         24032|
|2010-12-01|     Australia|           107|
|2010-12-01|          EIRE|           243|
|2010-12-01|        France|           449|
|2010-12-01|       Germany|           117|
|2010-12-01|   Netherlands|            97|
|2010-12-01|        Norway|          1852|
|2010-12-01|United Kingdom|         21167|
|2010-12-02|          null|         20855|
|2010-12-02|          EIRE|             4|
|2010-12-02|       Germany|           146|
|2010-12-02|United Kingdom|         20705|
|2010-12-03|          null|         11548|
|2010-12-03|       Belgium|           528|
|2010-12-03|          EIRE|          2375|
|2010-12-03|        France|           239|
|2010-12-03|       Germany|           170|
|2010-12-03|         Italy|           164|
|2010-12-03|        Poland|           140|
+----------

In [34]:
dfNoNull.cube("Date", "Country").agg(sum("Quantity").alias("total_quantity"))\
    .select("Date", "Country", "total_quantity")\
    .orderBy("Date", "Country")\
    .show()

+----+------------------+--------------+
|Date|           Country|total_quantity|
+----+------------------+--------------+
|null|              null|       4906888|
|null|         Australia|         83653|
|null|           Austria|          4827|
|null|           Bahrain|           260|
|null|           Belgium|         23152|
|null|            Brazil|           356|
|null|            Canada|          2763|
|null|   Channel Islands|          9479|
|null|            Cyprus|          6317|
|null|    Czech Republic|           592|
|null|           Denmark|          8188|
|null|              EIRE|        136329|
|null|European Community|           497|
|null|           Finland|         10666|
|null|            France|        109848|
|null|           Germany|        117448|
|null|            Greece|          1556|
|null|           Iceland|          2458|
|null|            Israel|          3990|
|null|             Italy|          7999|
+----+------------------+--------------+
only showing top

In [35]:
from pyspark.sql.functions import grouping_id

dfCubeWithGroupingId = dfNoNull.cube("CustomerId", "StockCode")\
    .agg(grouping_id(), sum("Quantity").alias("total_quantity"))\
    .orderBy(grouping_id().desc())

In [36]:
dfCubeWithGroupingId.show()

+----------+---------+-------------+--------------+
|CustomerId|StockCode|grouping_id()|total_quantity|
+----------+---------+-------------+--------------+
|      null|     null|            3|       4906888|
|      null|    22427|            2|          1954|
|      null|    22468|            2|           464|
|      null|    22473|            2|           437|
|      null|    85064|            2|           203|
|      null|   90123A|            2|            14|
|      null|    22442|            2|           281|
|      null|    22154|            2|          3525|
|      null|    21446|            2|           400|
|      null|    20697|            2|            19|
|      null|    22946|            2|           353|
|      null|    16015|            2|           662|
|      null|   90003D|            2|            14|
|      null|   85194S|            2|          1134|
|      null|   85130C|            2|           479|
|      null|   90183C|            2|             1|
|      null|

In [37]:
dfCubeWithGroupingId.where(grouping_id() == 3).show()

+----------+---------+-------------+--------------+
|CustomerId|StockCode|grouping_id()|total_quantity|
+----------+---------+-------------+--------------+
|      null|     null|            3|       4906888|
+----------+---------+-------------+--------------+



In [38]:
dfCubeWithGroupingId.where(grouping_id() == 2).show()

+----------+---------+-------------+--------------+
|CustomerId|StockCode|grouping_id()|total_quantity|
+----------+---------+-------------+--------------+
|      null|    22427|            2|          1954|
|      null|    22867|            2|          5948|
|      null|    22350|            2|          1355|
|      null|    21413|            2|            36|
|      null|    22683|            2|           563|
|      null|    21736|            2|            77|
|      null|   84927A|            2|            45|
|      null|   90123B|            2|             8|
|      null|    84748|            2|            28|
|      null|    21371|            2|           275|
|      null|   84750A|            2|            -3|
|      null|    23545|            2|          2475|
|      null|    23505|            2|          1065|
|      null|    23531|            2|           963|
|      null|    20685|            2|          3835|
|      null|   72800B|            2|           140|
|      null|

In [39]:
dfCubeWithGroupingId.where(grouping_id() == 1).show()

+----------+---------+-------------+--------------+
|CustomerId|StockCode|grouping_id()|total_quantity|
+----------+---------+-------------+--------------+
|     16752|     null|            1|            54|
|     14083|     null|            1|           732|
|     15716|     null|            1|           744|
|     14524|     null|            1|           676|
|     13124|     null|            1|          2147|
|     17001|     null|            1|          2164|
|     17392|     null|            1|           320|
|     13693|     null|            1|            -6|
|     15709|     null|            1|           121|
|     15669|     null|            1|           339|
|     16226|     null|            1|           192|
|     15152|     null|            1|          2694|
|     16796|     null|            1|           186|
|     15466|     null|            1|           246|
|     18278|     null|            1|            66|
|     14121|     null|            1|          2078|
|     16793|

In [40]:
dfCubeWithGroupingId.where(grouping_id() == 0).show()

+----------+---------+-------------+--------------+
|CustomerId|StockCode|grouping_id()|total_quantity|
+----------+---------+-------------+--------------+
|     16029|    21731|            0|           984|
|     12431|    22726|            0|            28|
|     15862|    22435|            0|             2|
|     14307|    21705|            0|            12|
|     17908|    21326|            0|            12|
|     14849|    21586|            0|            12|
|     17377|    22630|            0|            16|
|     12472|    22333|            0|            14|
|     15694|    20749|            0|            72|
|     13715|    21172|            0|            12|
|     17017|    22555|            0|            72|
|     13117|    84945|            0|            24|
|     16916|    22505|            0|             1|
|     16916|    22558|            0|             2|
|     15061|    22075|            0|           312|
|     15574|    22087|            0|             2|
|     17757|

In [41]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [42]:
pivoted.columns

['date',
 'Australia_sum(CAST(Quantity AS BIGINT))',
 'Australia_sum(UnitPrice)',
 'Australia_sum(CAST(CustomerID AS BIGINT))',
 'Austria_sum(CAST(Quantity AS BIGINT))',
 'Austria_sum(UnitPrice)',
 'Austria_sum(CAST(CustomerID AS BIGINT))',
 'Bahrain_sum(CAST(Quantity AS BIGINT))',
 'Bahrain_sum(UnitPrice)',
 'Bahrain_sum(CAST(CustomerID AS BIGINT))',
 'Belgium_sum(CAST(Quantity AS BIGINT))',
 'Belgium_sum(UnitPrice)',
 'Belgium_sum(CAST(CustomerID AS BIGINT))',
 'Brazil_sum(CAST(Quantity AS BIGINT))',
 'Brazil_sum(UnitPrice)',
 'Brazil_sum(CAST(CustomerID AS BIGINT))',
 'Canada_sum(CAST(Quantity AS BIGINT))',
 'Canada_sum(UnitPrice)',
 'Canada_sum(CAST(CustomerID AS BIGINT))',
 'Channel Islands_sum(CAST(Quantity AS BIGINT))',
 'Channel Islands_sum(UnitPrice)',
 'Channel Islands_sum(CAST(CustomerID AS BIGINT))',
 'Cyprus_sum(CAST(Quantity AS BIGINT))',
 'Cyprus_sum(UnitPrice)',
 'Cyprus_sum(CAST(CustomerID AS BIGINT))',
 'Czech Republic_sum(CAST(Quantity AS BIGINT))',
 'Czech Republic_

In [43]:
pivoted.where("date > '2011-12-05'").select("date", "`USA_sum(CAST(Quantity AS BIGINT))`").show()

+----------+---------------------------------+
|      date|USA_sum(CAST(Quantity AS BIGINT))|
+----------+---------------------------------+
|2011-12-06|                             null|
|2011-12-09|                             null|
|2011-12-08|                             -196|
|2011-12-07|                             null|
+----------+---------------------------------+



## User-Defined Aggregation Functions

Only available in Scala or Java