In [2]:
df = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load("data/retail-data/all/*.csv")\
    .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")

In [4]:
df.count()

541909

### Aggregation Functions

In [6]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [7]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [8]:
from pyspark.sql.functions import *

In [9]:
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [10]:
df.select(first("StockCode"), last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



In [11]:
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [12]:
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [13]:
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [14]:
df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
    .selectExpr(
        "total_purchases/total_transactions",
        "avg_purchases",
        "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [15]:
df.select(var_pop("Quantity"), var_samp("Quantity"), 
         stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+------------------+------------------+--------------------+---------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+------------------+------------------+--------------------+---------------------+
|47559.303646609056|47559.391409298754|  218.08095663447796|   218.08115785023418|
+------------------+------------------+--------------------+---------------------+



In [16]:
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+



In [18]:
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
         covar_pop("InvoiceNo", "Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085635685E-4|             1052.7280543902734|            1052.7260778741693|
+-------------------------+-------------------------------+------------------------------+



In [19]:
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



### Grouping 

In [22]:
df.groupBy("InvoiceNo", "CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   536596|      null|    6|
|   537252|      null|    1|
|   538041|      null|    1|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
+---------+----------+-----+
only showing top 20 rows



In [27]:
df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



In [28]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
    .show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

### Window Functions 

In [29]:
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [34]:
from pyspark.sql.window import Window
windowSpec = Window\
    .partitionBy("CustomerId", "date")\
    .orderBy(desc("Quantity"))\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [35]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [37]:
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [41]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
    .select(
        col("CustomerId"),
        col("date"),
        col("Quantity"),
        purchaseRank.alias("quantityRank"),
        purchaseDenseRank.alias("quantityDenseRank"),
        maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

### Grouping Sets

In [42]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [44]:
spark.sql("""
    SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
    GROUP BY customerId, stockCode 
    ORDER BY CustomerId DESC, stockCode DESC
""").show()

+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|     18287|    85173|           48|
|     18287|   85040A|           48|
|     18287|   85039B|          120|
|     18287|   85039A|           96|
|     18287|    84920|            4|
|     18287|    84584|            6|
|     18287|   84507C|            6|
|     18287|   72351B|           24|
|     18287|   72351A|           24|
|     18287|   72349B|           60|
|     18287|    47422|           24|
|     18287|    47421|           48|
|     18287|    35967|           36|
|     18287|    23445|           20|
|     18287|    23378|           24|
|     18287|    23376|           48|
|     18287|    23310|           36|
|     18287|    23274|           12|
|     18287|    23272|           12|
|     18287|    23269|           36|
+----------+---------+-------------+
only showing top 20 rows



In [45]:
spark.sql("""
    SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
    GROUP BY customerId, stockCode GROUPING SETS ((customerId, stockCode), ())
    ORDER BY CustomerId DESC, stockCode DESC
""").show()

+----------+---------+-------------+
|customerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
|     18287|    85173|           48|
|     18287|   85040A|           48|
|     18287|   85039B|          120|
|     18287|   85039A|           96|
|     18287|    84920|            4|
|     18287|    84584|            6|
|     18287|   84507C|            6|
|     18287|   72351B|           24|
|     18287|   72351A|           24|
|     18287|   72349B|           60|
|     18287|    47422|           24|
|     18287|    47421|           48|
|     18287|    35967|           36|
|     18287|    23445|           20|
|     18287|    23378|           24|
|     18287|    23376|           48|
|     18287|    23310|           36|
|     18287|    23274|           12|
|     18287|    23272|           12|
|     18287|    23269|           36|
+----------+---------+-------------+
only showing top 20 rows



In [46]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
    .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
    .orderBy("Date")
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|          EIRE|           243|
|2010-12-01|     Australia|           107|
|2010-12-01|       Germany|           117|
|2010-12-01|        France|           449|
|2010-12-01|   Netherlands|            97|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|          null|         26814|
|2010-12-01|        Norway|          1852|
|2010-12-02|          null|         21023|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|      Portugal|            65|
|2010-12-03|   Switzerland|           110|
|2010-12-03|        Poland|           140|
|2010-12-03|       Belgium|           528|
|2010-12-03|          null|         14830|
|2010-12-03|         Italy|           164|
|2010-12-03|         Spain|           400|
+----------

In [48]:
rolledUpDF.where("Country Is NULL").show()

+----------+-------+--------------+
|      Date|Country|total_quantity|
+----------+-------+--------------+
|      null|   null|       5176450|
|2010-12-01|   null|         26814|
|2010-12-02|   null|         21023|
|2010-12-03|   null|         14830|
|2010-12-05|   null|         16395|
|2010-12-06|   null|         21419|
|2010-12-07|   null|         24995|
|2010-12-08|   null|         22741|
|2010-12-09|   null|         18431|
|2010-12-10|   null|         20297|
|2010-12-12|   null|         10565|
|2010-12-13|   null|         17623|
|2010-12-14|   null|         20098|
|2010-12-15|   null|         18229|
|2010-12-16|   null|         29632|
|2010-12-17|   null|         16069|
|2010-12-19|   null|          3795|
|2010-12-20|   null|         14965|
|2010-12-21|   null|         15467|
|2010-12-22|   null|          3192|
+----------+-------+--------------+
only showing top 20 rows



In [49]:
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
    .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|                null|      5176450|
|null|               Japan|        25218|
|null|         Unspecified|         3300|
|null|           Australia|        83653|
|null|            Portugal|        16180|
|null|             Germany|       117448|
|null|             Finland|        10666|
|null|                 RSA|          352|
|null|           Hong Kong|         4769|
|null|              Cyprus|         6317|
|null|             Denmark|         8188|
|null|               Spain|        26824|
|null|United Arab Emirates|          982|
|null|  European Community|          497|
|null|     Channel Islands|         9479|
|null|              Norway|        19247|
|null|                 USA|         1034|
|null|             Lebanon|          386|
|null|           Singapore|         5234|
|null|      Czech Republic|          592|
+----+--------------------+-------

In [52]:
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))\
    .show()

+----------+---------+-------------+-------------+
|customerId|stockCode|grouping_id()|sum(Quantity)|
+----------+---------+-------------+-------------+
|     17809|    22961|            0|           72|
|     16098|    22726|            0|           40|
|     17548|    22553|            0|          -24|
|     14307|    22736|            0|           10|
|     17908|    21811|            0|            1|
|     14729|    21427|            0|            2|
|     12433|    22315|            0|           24|
|     14594|    21143|            0|            3|
|     16928|    22774|            0|          138|
|     17855|    22037|            0|           12|
|     18041|    22196|            0|           30|
|     18041|    21328|            0|            2|
|     16244|    22352|            0|            6|
|     14449|    22147|            0|            8|
|     16781|   82494L|            0|            6|
|     17581|    22961|            0|           24|
|     17838|   79066K|         

In [53]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [59]:
pivoted.where("date > '2011-12-05'").select("date", "`USA_sum(CAST(Quantity AS BIGINT))`").show()

+----------+---------------------------------+
|      date|USA_sum(CAST(Quantity AS BIGINT))|
+----------+---------------------------------+
|2011-12-06|                             null|
|2011-12-09|                             null|
|2011-12-08|                             -196|
|2011-12-07|                             null|
+----------+---------------------------------+



### User-Defined Aggregation Functions 