In [0]:
#Load data
df = spark.read.format("csv")\
    .option("header","true")\
    .option("inferSchema","true")\
    .load("dbfs:/FileStore/shared_uploads/xavier211192@gmail.com/data/retail-data/all/online_retail_dataset.csv").coalesce(5)
df.cache()
df.createOrReplaceTempView('dfTable')

## Basic aggregations ##

In [0]:
##Basic Aggregation
#count #df.count() is an action df.select(count) is a transformation
from pyspark.sql.functions import count,countDistinct,approx_count_distinct
df.select(count("StockCode")).show()

#count distinct
df.select(countDistinct("StockCode")).show()

#count approx distinct
df.select(approx_count_distinct("StockCode",0.05)).show()

In [0]:
from pyspark.sql.functions import first,last,min,max
#first and last
df.select(first("StockCode"),last("StockCode")).show()
#min and max
df.select(min("Quantity"),max("Quantity")).show()

In [0]:
from pyspark.sql.functions import sum,sumDistinct
#sum
df.select(sum("Quantity")).show()
#sum distinct
df.select(sumDistinct("Quantity")).show()

In [0]:
from pyspark.sql.functions import sum, count, avg, expr
#avg and mean are the same: syntax is different.
df.select(
    count("Quantity").alias("total"),
    sum("Quantity").alias("sum"),
    avg("Quantity").alias("avg"),
    expr("mean(Quantity)").alias("mean"))\
    .selectExpr(
    "total/sum","avg","mean").show()

In [0]:
#sample and population variance and std deviation
from pyspark.sql.functions import var_pop,stddev_pop
from pyspark.sql.functions import var_samp,stddev_samp

df.select(var_pop("Quantity"),var_samp("Quantity"),
         stddev_pop("Quantity"),stddev_samp("Quantity")).show()

In [0]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"),kurtosis("Quantity")).show()

In [0]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo","Quantity"),
         covar_pop("InvoiceNo", "Quantity")).show()

In [0]:
from pyspark.sql.functions import collect_list,collect_set
#All values
df.select(collect_list("Country")).show()
#unique values
df.select(collect_set("Country")).show()

## Groupings ##

In [0]:
#Simple grouping
df.groupBy("InvoiceNo","CustomerId").count().show()

In [0]:
#Grouping with Expressions
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(count("Quantity").alias("cnt_quantity"), expr("count(Quantity)")).show()

In [0]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show()

## Window functions ##

In [0]:
#Add a new date column
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
from pyspark.sql.functions import col, to_date, desc, max, dense_rank
from pyspark.sql.window import Window
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))

#1 define a window - use all preceding rows up and until current row
windowSpec = Window\
            .partitionBy("CustomerID","date")\
            .orderBy(desc("Quantity"))\
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
#2 define aggregation maxPurchaseQuantity
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
print(maxPurchaseQuantity)

#2.1 purchaseRank
purchaseDenseRank = dense_rank().over(windowSpec)
print(purchaseDenseRank)

In [0]:
# 3 Perform a Select 
dfWithDate.where("CustomerId IS NOT NULL AND ").orderBy("CustomerId")\
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()


In [0]:
# 1 define window spec
windowSpec = Window.partitionBy("CustomerId","date").orderBy("Quantity").rowsBetween(Window.unboundedPreceding, Window.currentRow)
# 2 rank and min purchase quantity
minPurchaseQuantity = min("Quantity").over(windowSpec)
reverseRank = dense_rank().over(windowSpec)

dfWithDate.where("CustomerId is NOT NULL").\
                 select(
                   col("CustomerId"),
                   col("date"),
                   col("Quantity"),
                   reverseRank.alias("reverseRank"),
                   minPurchaseQuantity.alias("minPurchase")
                 ).orderBy("CustomerId").show()

### Grouping Sets ###

In [0]:
#drop nulls
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView('dfNoNull')

In [0]:
from pyspark.sql.functions import  sum
dfNoNull.groupBy("CustomerId","StockCode").sum("Quantity").show()

In [0]:
%sql
--#Total quantity of all stock codes and customers
Select CustomerId,StockCode, SUM(QUANTITY) from dfNoNull
group by CustomerId,StockCode order by CustomerID desc, StockCode desc

CustomerId,StockCode,sum(QUANTITY)
18287,85173,48
18287,85040A,48
18287,85039B,120
18287,85039A,96
18287,84920,4
18287,84584,6
18287,84507C,6
18287,72351B,24
18287,72351A,24
18287,72349B,60


In [0]:
%sql
--Using Grouping sets achieve the same 
Select CustomerId,StockCode, SUM(QUANTITY) from dfNoNull
group by CustomerId,StockCode GROUPING SETS((customerId,stockCode))
order by CustomerID desc, StockCode desc

CustomerId,StockCode,sum(QUANTITY)
18287,85173,48
18287,85040A,48
18287,85039B,120
18287,85039A,96
18287,84920,4
18287,84584,6
18287,84507C,6
18287,72351B,24
18287,72351A,24
18287,72349B,60


In [0]:
#Rollup over date and country
rolledUpDF = dfNoNull.rollup("Date","Country").sum("Quantity").orderBy("Date")

In [0]:
display(rolledUpDF)
#where both are nulls is the grandtotal

Country,date,sum(Quantity)
Saudi Arabia,,75
EIRE,,142637
Bahrain,,260
United Arab Emirates,,982
Israel,,4353
Czech Republic,,592
Austria,,4827
USA,,1034
European Community,,497
Belgium,,23152


In [0]:
rolledUpDF.where("Country is NULL").show()

In [0]:
#where one of the column is null is subtotal
rolledUpDF.where("Country is NULL").show()
rolledUpDF.where("Date is NULL").show()

In [0]:
#Cube takes the rollup to a level deeper.
dfNoNull.cube("Date","Country").sum("Quantity").orderBy("Date").show()

In [0]:
from pyspark.sql.functions import grouping_id,sum,desc
dfNoNull.cube("CustomerId","StockCode")\
        .agg(grouping_id(), sum(col("Quantity")))\
        .orderBy(desc("grouping_id()")).show()

In [0]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
display(pivoted)

date,Australia_sum(CAST(Quantity AS BIGINT)),Australia_sum(UnitPrice),Australia_sum(CAST(CustomerID AS BIGINT)),Austria_sum(CAST(Quantity AS BIGINT)),Austria_sum(UnitPrice),Austria_sum(CAST(CustomerID AS BIGINT)),Bahrain_sum(CAST(Quantity AS BIGINT)),Bahrain_sum(UnitPrice),Bahrain_sum(CAST(CustomerID AS BIGINT)),Belgium_sum(CAST(Quantity AS BIGINT)),Belgium_sum(UnitPrice),Belgium_sum(CAST(CustomerID AS BIGINT)),Brazil_sum(CAST(Quantity AS BIGINT)),Brazil_sum(UnitPrice),Brazil_sum(CAST(CustomerID AS BIGINT)),Canada_sum(CAST(Quantity AS BIGINT)),Canada_sum(UnitPrice),Canada_sum(CAST(CustomerID AS BIGINT)),Channel Islands_sum(CAST(Quantity AS BIGINT)),Channel Islands_sum(UnitPrice),Channel Islands_sum(CAST(CustomerID AS BIGINT)),Cyprus_sum(CAST(Quantity AS BIGINT)),Cyprus_sum(UnitPrice),Cyprus_sum(CAST(CustomerID AS BIGINT)),Czech Republic_sum(CAST(Quantity AS BIGINT)),Czech Republic_sum(UnitPrice),Czech Republic_sum(CAST(CustomerID AS BIGINT)),Denmark_sum(CAST(Quantity AS BIGINT)),Denmark_sum(UnitPrice),Denmark_sum(CAST(CustomerID AS BIGINT)),EIRE_sum(CAST(Quantity AS BIGINT)),EIRE_sum(UnitPrice),EIRE_sum(CAST(CustomerID AS BIGINT)),European Community_sum(CAST(Quantity AS BIGINT)),European Community_sum(UnitPrice),European Community_sum(CAST(CustomerID AS BIGINT)),Finland_sum(CAST(Quantity AS BIGINT)),Finland_sum(UnitPrice),Finland_sum(CAST(CustomerID AS BIGINT)),France_sum(CAST(Quantity AS BIGINT)),France_sum(UnitPrice),France_sum(CAST(CustomerID AS BIGINT)),Germany_sum(CAST(Quantity AS BIGINT)),Germany_sum(UnitPrice),Germany_sum(CAST(CustomerID AS BIGINT)),Greece_sum(CAST(Quantity AS BIGINT)),Greece_sum(UnitPrice),Greece_sum(CAST(CustomerID AS BIGINT)),Hong Kong_sum(CAST(Quantity AS BIGINT)),Hong Kong_sum(UnitPrice),Hong Kong_sum(CAST(CustomerID AS BIGINT)),Iceland_sum(CAST(Quantity AS BIGINT)),Iceland_sum(UnitPrice),Iceland_sum(CAST(CustomerID AS BIGINT)),Israel_sum(CAST(Quantity AS BIGINT)),Israel_sum(UnitPrice),Israel_sum(CAST(CustomerID AS BIGINT)),Italy_sum(CAST(Quantity AS BIGINT)),Italy_sum(UnitPrice),Italy_sum(CAST(CustomerID AS BIGINT)),Japan_sum(CAST(Quantity AS BIGINT)),Japan_sum(UnitPrice),Japan_sum(CAST(CustomerID AS BIGINT)),Lebanon_sum(CAST(Quantity AS BIGINT)),Lebanon_sum(UnitPrice),Lebanon_sum(CAST(CustomerID AS BIGINT)),Lithuania_sum(CAST(Quantity AS BIGINT)),Lithuania_sum(UnitPrice),Lithuania_sum(CAST(CustomerID AS BIGINT)),Malta_sum(CAST(Quantity AS BIGINT)),Malta_sum(UnitPrice),Malta_sum(CAST(CustomerID AS BIGINT)),Netherlands_sum(CAST(Quantity AS BIGINT)),Netherlands_sum(UnitPrice),Netherlands_sum(CAST(CustomerID AS BIGINT)),Norway_sum(CAST(Quantity AS BIGINT)),Norway_sum(UnitPrice),Norway_sum(CAST(CustomerID AS BIGINT)),Poland_sum(CAST(Quantity AS BIGINT)),Poland_sum(UnitPrice),Poland_sum(CAST(CustomerID AS BIGINT)),Portugal_sum(CAST(Quantity AS BIGINT)),Portugal_sum(UnitPrice),Portugal_sum(CAST(CustomerID AS BIGINT)),RSA_sum(CAST(Quantity AS BIGINT)),RSA_sum(UnitPrice),RSA_sum(CAST(CustomerID AS BIGINT)),Saudi Arabia_sum(CAST(Quantity AS BIGINT)),Saudi Arabia_sum(UnitPrice),Saudi Arabia_sum(CAST(CustomerID AS BIGINT)),Singapore_sum(CAST(Quantity AS BIGINT)),Singapore_sum(UnitPrice),Singapore_sum(CAST(CustomerID AS BIGINT)),Spain_sum(CAST(Quantity AS BIGINT)),Spain_sum(UnitPrice),Spain_sum(CAST(CustomerID AS BIGINT)),Sweden_sum(CAST(Quantity AS BIGINT)),Sweden_sum(UnitPrice),Sweden_sum(CAST(CustomerID AS BIGINT)),Switzerland_sum(CAST(Quantity AS BIGINT)),Switzerland_sum(UnitPrice),Switzerland_sum(CAST(CustomerID AS BIGINT)),USA_sum(CAST(Quantity AS BIGINT)),USA_sum(UnitPrice),USA_sum(CAST(CustomerID AS BIGINT)),United Arab Emirates_sum(CAST(Quantity AS BIGINT)),United Arab Emirates_sum(UnitPrice),United Arab Emirates_sum(CAST(CustomerID AS BIGINT)),United Kingdom_sum(CAST(Quantity AS BIGINT)),United Kingdom_sum(UnitPrice),United Kingdom_sum(CAST(CustomerID AS BIGINT)),Unspecified_sum(CAST(Quantity AS BIGINT)),Unspecified_sum(UnitPrice),Unspecified_sum(CAST(CustomerID AS BIGINT))
2011-05-06,,,,42.0,58.95,74484.0,,,,182.0,63.32,260274.0,,,,,,,,,,,,,,,,,,,1694.0,75.37,283120.0,,,,,,,,,,222.0,12.64,126857.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,88.0,113.1,187530.0,,,,,,,,,,,,,17404,6952.629999999985,18722445,,,
2011-01-30,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,3367,2321.720000000002,11334037,,,
2011-10-07,,,,,,,,,,,,,,,,,,,,,,345.0,136.51000000000002,360180.0,325.0,49.38,127810.0,637.0,27.69,74364.0,448.0,225.1,760461.0,,,,,,,527.0,88.43999999999998,483504.0,2053.0,327.79999999999995,1150698.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,212.0,25.35,140558.0,,,,,,,,,,,,,,,,,,,227.0,123.08,275880.0,,,,,,,,,,,,,25657,12425.409999999976,27566889,,,
2011-01-23,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,126.0,60.4,152724.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,5068,2551.560000000002,13409810,,,
2011-07-18,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,1377.0,146.97999999999988,999037.0,,,,,,,921.0,132.25000000000003,533282.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,2160.0,230.46,955800.0,,,,,,,,,,,,,,,,9908,26946.80000000045,16266983,,,
2011-07-07,,,,,,,,,,143.0,96.71000000000004,247257.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,550.0,175.46000000000006,763866.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,216.0,139.00000000000003,338514.0,,,,,,,,,,,,,18047,6223.269999999994,18629372,,,
2011-08-21,,,,,,,,,,,,,,,,,,,800.0,7.16,58264.0,,,,,,,,,,148.0,115.71,238576.0,,,,,,,270.0,115.96000000000002,376350.0,187.0,51.17,225324.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,1.0,25.6,68388.0,,,,,,,,,,,,,6758,3010.6999999999944,15639107,,,
2011-11-18,,,,,,,,,,315.0,87.74000000000001,210460.0,,,,,,,,,,300.0,245.99000000000004,1239100.0,-40.0,6.94,38343.0,,,,263.0,230.84999999999985,819350.0,,,,,,,550.0,167.77,593838.0,491.0,128.18000000000004,274863.0,,,,,,,,,,,,,544.0,232.32,692230.0,,,,,,,,,,,,,34.0,19.25,29292.0,242.0,186.04,435540.0,-8.0,8.190000000000001,38337.0,,,,,,,,,,,,,,,,-3.0,2.89,12697.0,669.0,78.28999999999998,547844.0,,,,,,,20178,10676.19999999988,29578557,,,
2010-12-15,,,,-48.0,0.42,12865.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,134.0,117.72,842659.0,-12.0,31.3,75882.0,,,,,,,,,,-56.0,8.06,25332.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,18211,4661.809999999966,19690139,,,
2010-12-01,107.0,73.9,174034.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,243.0,133.64,313131.0,,,,,,,449.0,55.29,251660.0,117.0,93.82000000000002,364538.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,97.0,16.85,25582.0,1852.0,102.67,907609.0,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,23949,12428.080000000024,28785059,,,


In [0]:
from pyspark.sql.functions import collect_set,collect_list
from pyspark.sql.window import Window

window = Window\
                   .partitionBy("date","Country")\
                   .orderBy(desc("Quantity"))\
                   .rowsBetween(Window.unboundedPreceding, Window.currentRow)

max_q = max(col("Quantity")).over(window)

wind_df = dfWithDate.select(col("Date"),col("Country"),col("Quantity"),max_q.alias("max_q"))

In [0]:
%sql

Select Date,Country, sum(Quantity) from dfNoNull
group by Date,Country grouping sets(Date,Country,())
order by Date

Date,Country,sum(Quantity)
,Austria,4827
,,5176450
,Switzerland,30325
,Poland,3653
,Lebanon,386
,Australia,83653
,Sweden,35637
,Spain,26824
,Bahrain,260
,Portugal,16180
