In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
conf = pyspark.SparkConf().setAppName('SparkApp').setMaster('local')
SparkContext.setSystemProperty('spark.executor.memory', '2g')
# sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [3]:
df = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("C:/Users/Peter Lok/Desktop/Spark-The-Definitive-Guide-master/data/retail-data/all/online-retail-dataset.csv")\
  .coalesce(5)

In [4]:
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [5]:
df.createOrReplaceTempView("dfTable")

In [6]:
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



In [7]:
df.count()

541909

In [8]:
from pyspark.sql.functions import count, countDistinct, approx_count_distinct

In [9]:
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [10]:
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [11]:
df.select(approx_count_distinct("StockCode",0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [12]:
from pyspark.sql.functions import min, max

In [13]:
df.select(min("Quantity"), max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [14]:
from pyspark.sql.functions import sum, sumDistinct

In [15]:
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [16]:
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [17]:
from pyspark.sql.functions import sum, count, avg, expr

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [18]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
  stddev_pop("Quantity"), stddev_samp("Quantity")).show()


+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660879| 47559.39140929848|  218.08095663447733|   218.08115785023355|
+-----------------+------------------+--------------------+---------------------+



In [19]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+------------------+------------------+
|skewness(Quantity)|kurtosis(Quantity)|
+------------------+------------------+
|-0.264075576105298|119768.05495534067|
+------------------+------------------+



In [20]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
    covar_pop("InvoiceNo", "Quantity")).show()


+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085636837E-4|             1052.7280543912716|            1052.7260778751674|
+-------------------------+-------------------------------+------------------------------+



In [42]:
from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("Country"), collect_list("Country")).show()

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|collect_set(Country)                                                                                                                                                                                                                                                                                                                                                                              |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [44]:
from pyspark.sql.functions import count

df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity)")).show()

+---------+----+
|InvoiceNo|quan|
+---------+----+
|   536596|   6|
|   536938|  14|
|   537252|   1|
|   537691|  20|
|   538041|   1|
|   538184|  26|
|   538517|  53|
|   538879|  19|
|   539275|   6|
|   539630|  12|
|   540499|  24|
|   540540|  22|
|  C540850|   1|
|   540976|  48|
|   541432|   4|
|   541518| 101|
|   541783|  35|
|   542026|   9|
|   542375|   6|
|  C542604|   8|
+---------+----+
only showing top 20 rows



In [45]:
df.groupBy("InvoiceNo").agg(
    expr("avg(Quantity)"),
    expr("stddev_pop(Quantity)"))\
  .show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

In [29]:
from pyspark.sql.functions import col, to_date
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")

In [30]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
  .partitionBy("CustomerId", "date")\
  .orderBy(desc("Quantity"))\
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)


In [31]:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [32]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)


In [33]:
from pyspark.sql.functions import col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|     12346|2011-01-18|   74215|           1|                1|              74215|
|     12346|2011-01-18|  -74215|           2|                2|              74215|
|     12347|2010-12-07|      36|           1|                1|                 36|
|     12347|2010-12-07|      30|           2|                2|                 36|
|     12347|2010-12-07|      24|           3|                3|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|                 36|
|     12347|2010-12-07|      12|           4|                4|             

In [34]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [35]:
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
  .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
  .orderBy("Date")
rolledUpDF.show()

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|       Germany|           117|
|2010-12-01|        France|           449|
|2010-12-01|     Australia|           107|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|        Norway|          1852|
|2010-12-01|          null|         26814|
|2010-12-01|          EIRE|           243|
|2010-12-01|   Netherlands|            97|
|2010-12-02|       Germany|           146|
|2010-12-02|          EIRE|             4|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|         Spain|           400|
|2010-12-03|        France|           239|
|2010-12-03|       Germany|           170|
|2010-12-03|   Switzerland|           110|
|2010-12-03|United Kingdom|         10439|
|2010-12-03|          EIRE|          2575|
|2010-12-03|      Portugal|            65|
+----------

In [53]:
from pyspark.sql.functions import sum

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
  .select("Date", "Country", "sum(Quantity)").orderBy(col("Date").asc()).show()

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|             Denmark|         8188|
|null|             Lebanon|          386|
|null|              Canada|         2763|
|null|              Sweden|        35637|
|null|               Japan|        25218|
|null|      United Kingdom|      4263829|
|null|              Greece|         1556|
|null|             Germany|       117448|
|null|      Czech Republic|          592|
|null|              France|       110480|
|null|               Italy|         7999|
|null|              Poland|         3653|
|null|                 USA|         1034|
|null|United Arab Emirates|          982|
|null|             Iceland|         2458|
|null|           Singapore|         5234|
|null|           Lithuania|          652|
|null|               Malta|          944|
|null|  European Community|          497|
|null|             Finland|        10666|
+----+--------------------+-------

In [37]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()