## Aggregation

In [2]:
from pyspark import SparkContext, SQLContext
sc = SparkContext()
spark = SQLContext(sc)

In [3]:
df = spark.read.csv(path= "../data/retail-data/by-day/*.csv",header=True,inferSchema=None)

In [7]:
df.createOrReplaceTempView("dfTable")

In [8]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [10]:
spark.sql("select * from dfTable limit 5").show(3, False)

+---------+---------+-------------------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                    |Quantity|InvoiceDate        |UnitPrice|CustomerID|Country       |
+---------+---------+-------------------------------+--------+-------------------+---------+----------+--------------+
|580538   |23084    |RABBIT NIGHT LIGHT             |48      |2011-12-05 08:38:00|1.79     |14075.0   |United Kingdom|
|580538   |23077    |DOUGHNUT LIP GLOSS             |20      |2011-12-05 08:38:00|1.25     |14075.0   |United Kingdom|
|580538   |22906    |12 MESSAGE CARDS WITH ENVELOPES|24      |2011-12-05 08:38:00|1.65     |14075.0   |United Kingdom|
+---------+---------+-------------------------------+--------+-------------------+---------+----------+--------------+
only showing top 3 rows



In [11]:
df.count()

541909

In [14]:
from pyspark.sql.functions import *
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [16]:
spark.sql("Select count(*) from dfTable").show()

+--------+
|count(1)|
+--------+
|  541909|
+--------+



In [17]:
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [19]:
spark.sql("select count( distinct StockCode ) from dfTable").show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [21]:
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [23]:
spark.sql("select approx_count_distinct(StockCode,0.1) from dfTable").show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



#### First and Last

In [24]:
df.select(first("StockCode"), last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                  23084|                 22168|
+-----------------------+----------------------+



In [26]:
spark.sql("select first(StockCode) , last(StockCode) from dfTable").show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                  23084|                 22168|
+-----------------------+----------------------+



#### Min and Max

In [27]:
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|           -1|          992|
+-------------+-------------+



In [28]:
spark.sql("select min(Quantity), max(Quantity) from dfTable").show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|           -1|          992|
+-------------+-------------+



#### sum

In [36]:
df.select (sum("Quantity"), sumDistinct("Quantity"), avg("Quantity")).show()

+-------------+----------------------+----------------+
|sum(Quantity)|sum(DISTINCT Quantity)|   avg(Quantity)|
+-------------+----------------------+----------------+
|    5176450.0|               29310.0|9.55224954743324|
+-------------+----------------------+----------------+



In [37]:
spark.sql("select sum(Quantity),avg(Quantity) from dfTable").show()

+-----------------------------+-----------------------------+
|sum(CAST(Quantity AS DOUBLE))|avg(CAST(Quantity AS DOUBLE))|
+-----------------------------+-----------------------------+
|                    5176450.0|             9.55224954743324|
+-----------------------------+-----------------------------+



In [49]:
df.select(count("Quantity").alias("total_transaction"),\
         sum("Quantity").alias("total_quantity"),\
         avg("Quantity").alias("average_purchase"),\
         mean("Quantity").alias("mean_quantity"))\
.selectExpr("total_quantity/total_transaction","average_purchase","mean_quantity").show()

+------------------------------------+----------------+----------------+
|(total_quantity / total_transaction)|average_purchase|   mean_quantity|
+------------------------------------+----------------+----------------+
|                    9.55224954743324|9.55224954743324|9.55224954743324|
+------------------------------------+----------------+----------------+



#### Variance, Standard Deviation, Skewness and Kurtosis

In [53]:
df.select(var_pop("Quantity"), var_samp("Quantity"), stddev_pop("Quantity"), stddev_samp("Quantity"),skewness("Quantity"),kurtosis("Quantity")).show()

+------------------+------------------+--------------------+---------------------+-------------------+------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)| skewness(Quantity)|kurtosis(Quantity)|
+------------------+------------------+--------------------+---------------------+-------------------+------------------+
|47559.303646608765|47559.391409298456|   218.0809566344773|    218.0811578502335|-0.2640755761052901|119768.05495539501|
+------------------+------------------+--------------------+---------------------+-------------------+------------------+



In [59]:
spark.sql("select var_pop(Quantity) as var_pop, var_samp(Quantity) as var_samp, stddev_pop(Quantity) as stddev_pop,\
stddev_samp(Quantity) as stddev_samp,skewness(Quantity) as skewness,kurtosis(Quantity) as kurtosis from dfTable").show()

+------------------+------------------+-----------------+-----------------+-------------------+------------------+
|           var_pop|          var_samp|       stddev_pop|      stddev_samp|           skewness|          kurtosis|
+------------------+------------------+-----------------+-----------------+-------------------+------------------+
|47559.303646608765|47559.391409298456|218.0809566344773|218.0811578502335|-0.2640755761052901|119768.05495539501|
+------------------+------------------+-----------------+-----------------+-------------------+------------------+



In [61]:
df.select(corr("Quantity","InvoiceNo"), covar_pop("Quantity","InvoiceNo"),covar_samp("Quantity","InvoiceNo")).show()

+-------------------------+------------------------------+-------------------------------+
|corr(Quantity, InvoiceNo)|covar_pop(Quantity, InvoiceNo)|covar_samp(Quantity, InvoiceNo)|
+-------------------------+------------------------------+-------------------------------+
|     4.912186085616445E-4|            1052.7260778701327|             1052.7280543862369|
+-------------------------+------------------------------+-------------------------------+



In [62]:
spark.sql("select corr(Quantity,InvoiceNo), covar_pop(Quantity,InvoiceNo),covar_samp(Quantity,InvoiceNo) from dfTable").show()

+---------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------------------+
|corr(CAST(Quantity AS DOUBLE), CAST(InvoiceNo AS DOUBLE))|covar_pop(CAST(Quantity AS DOUBLE), CAST(InvoiceNo AS DOUBLE))|covar_samp(CAST(Quantity AS DOUBLE), CAST(InvoiceNo AS DOUBLE))|
+---------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------------------+
|                                     4.912186085616445E-4|                                            1052.7260778701327|                                             1052.7280543862369|
+---------------------------------------------------------+--------------------------------------------------------------+---------------------------------------------------------------+



In [70]:
df.agg(collect_set("Country")).show(2,False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_set(Country)                                                                                                                                                                                                                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [71]:
df.agg(collect_list("Country")).show()

+---------------------+
|collect_list(Country)|
+---------------------+
| [United Kingdom, ...|
+---------------------+



In [75]:
df.groupby("InvoiceNo","CustomerId").count().show(5)

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   580666|   15856.0|    1|
|   580700|   14219.0|    7|
|   574948|   15805.0|    8|
|   575034|   15573.0|   48|
|   575876|   15608.0|    2|
+---------+----------+-----+
only showing top 5 rows



In [80]:
df.groupby("InvoiceNo","CustomerId").agg(count("Quantity"), expr("count(Quantity)")).show(3)

+---------+----------+---------------+---------------+
|InvoiceNo|CustomerId|count(Quantity)|count(Quantity)|
+---------+----------+---------------+---------------+
|   580666|   15856.0|              1|              1|
|   580700|   14219.0|              7|              7|
|   574948|   15805.0|              8|              8|
+---------+----------+---------------+---------------+
only showing top 3 rows



#### Grouping with Mapping

In [92]:
df.groupBy("InvoiceNo").agg(\
                            expr("avg(Quantity)"),\
                                 expr("stddev_pop(Quantity)")
                                ).show(3)

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   574966|               6.0|   3.640054944640259|
|   575091|11.552631578947368|   5.008925551458656|
|   578057| 4.607142857142857|   8.755974636597271|
+---------+------------------+--------------------+
only showing top 3 rows



In [96]:
spark.sql("select InvoiceNo, avg(Quantity), stddev_pop(Quantity) from dfTable Group by InvoiceNo").show(3)

+---------+-----------------------------+------------------------------------+
|InvoiceNo|avg(CAST(Quantity AS DOUBLE))|stddev_pop(CAST(Quantity AS DOUBLE))|
+---------+-----------------------------+------------------------------------+
|   574966|                          6.0|                   3.640054944640259|
|   575091|           11.552631578947368|                   5.008925551458656|
|   578057|            4.607142857142857|                   8.755974636597271|
+---------+-----------------------------+------------------------------------+
only showing top 3 rows



#### Window Functions

In [139]:
dfWithDate = df.withColumn("Date", to_date(col("InvoiceDate"), 'yyyy-MM-dd hh:mm:ss'))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [140]:
dfWithDate.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|      Date|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+----------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|2011-12-05|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|2011-12-05|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|2011-12-05|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|2011-12-05|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|2011-12-05|
|   580538|    21544|SKULLS  WATER TRA...|      

In [141]:
from pyspark.sql.window import Window

In [143]:
windowSpec = Window.partitionBy("Date","CustomerID").orderBy(desc("Quantity")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [144]:
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)