In [1]:
import org.apache.spark.sql.functions.col
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.functions._

In [2]:
val conf = new SparkConf()
                .setAppName("SparkLearning_Charper_5")
                .set("spark.driver.memory","16g")
                    
val Spark = new SparkContext(conf)

conf = org.apache.spark.SparkConf@19e9f0bc
Spark = org.apache.spark.SparkContext@7c58199a


org.apache.spark.SparkContext@7c58199a

In [3]:
val df = spark.read.format("csv")
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .load("Spark-The-Definitive-Guide/data/retail-data/by-day/*.csv")
                    .coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")



df = [InvoiceNo: string, StockCode: string ... 6 more fields]


[InvoiceNo: string, StockCode: string ... 6 more fields]

In [4]:
// the first aggregation type is count
df.count()



541909

In [5]:
// count inside a expression
df.select(count("StockCode")).collect()

Array(541909)

In [6]:
%%SQL
SELECT COUNT(*) FROM dfTable

+--------+
|count(1)|
+--------+
|  541909|
+--------+



In [7]:
// count distinct
df.select(countDistinct("StockCode")).collect()

Array([4070])

In [13]:
// approx count distinct
df.select(approx_count_distinct("StockCode", 0.01)).collect()

[Stage 17:>                                                         (0 + 0) / 5]

Array([4079])

In [14]:
df.select(first("StockCode"), last("StockCode")).collect()

Array([23084,22168])

In [15]:
df.select(min("Quantity"), max("Quantity")).collect()

Array([-80995,80995])

In [16]:
// sum
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [19]:
df.select(
        count("Quantity").alias("total_transactions"),
        sum("Quantity").alias("total_purchases"),
        avg("Quantity").alias("avg_purchases"),
        expr("mean(Quantity)").alias("mean_purchases"))
    .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases")
    .collect()

Array([9.55224954743324,9.55224954743324,9.55224954743324])

In [20]:
//  Variance and Standard Deviation

df.select(
var_pop("Quantity"),
var_samp("Quantity"),
stddev_pop("Quantity"),
stddev_samp("Quantity"))
.collect()

Array([47559.30364660885,47559.39140929855,218.0809566344775,218.0811578502337])

In [21]:
// skewness and Kurtosis
df.select(
skewness("Quantity"),
kurtosis("Quantity"))
.collect()

Array([-0.264075576105286,119768.05495534562])

In [22]:
// Covariance and Correlation
df.select(
corr("InvoiceNo", "Quantity"),
covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity"))
.show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085617365E-4|             1052.7280543863135|            1052.7260778702093|
+-------------------------+-------------------------------+------------------------------+



## Aggregation to complex datatypes

In [23]:
df.agg(
collect_set("Country"),
collect_list("Country"))
.show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [24]:
df.groupBy("invoiceNo").count().show()

+---------+-----+
|invoiceNo|count|
+---------+-----+
|   574966|    8|
|   575091|   38|
|   578057|   28|
|   537252|    1|
|   578459|    8|
|  C578132|    1|
|   578292|   72|
|   576112|   20|
|   577022|   38|
|   574592|    8|
|  C576393|    2|
|   577511|   46|
|   577541|   21|
|   580739|    2|
|   580906|    4|
|   573726|    1|
|   575671|   20|
|   570264|    1|
|   570281|    3|
|   569823|   69|
+---------+-----+
only showing top 20 rows



In [25]:
df.groupBy("InvoiceNo")
.agg(
count("Quantity").alias("quan"),
expr("count(Quantity)"))
.show()


+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   574966|   8|              8|
|   575091|  38|             38|
|   578057|  28|             28|
|   537252|   1|              1|
|   578459|   8|              8|
|  C578132|   1|              1|
|   578292|  72|             72|
|   576112|  20|             20|
|   577022|  38|             38|
|   574592|   8|              8|
|  C576393|   2|              2|
|   577511|  46|             46|
|   577541|  21|             21|
|   580739|   2|              2|
|   580906|   4|              4|
|   573726|   1|              1|
|   575671|  20|             20|
|   570264|   1|              1|
|   570281|   3|              3|
|   569823|  69|             69|
+---------+----+---------------+
only showing top 20 rows



In [26]:
df.groupBy("InvoiceNo")
.agg(
"Quantity" ->"avg",
"Quantity" -> "stddev_pop")
.show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   574966|               6.0|   3.640054944640259|
|   575091|11.552631578947368|   5.008925551458656|
|   578057| 4.607142857142857|   8.755974636597271|
|   537252|              31.0|                 0.0|
|   578459|              28.0|                26.0|
|  C578132|              -1.0|                 0.0|
|   578292| 5.902777777777778|   8.759375488618884|
|   576112|              10.9|  7.4959989327640635|
|   577022| 5.131578947368421|   2.903455768848916|
|   574592|              7.25|  4.4651427748729375|
|  C576393|              -3.5|                 2.5|
|   577511|3.1739130434782608|  5.4025128928727195|
|   577541| 9.333333333333334|    9.18245393767158|
|   580739|               2.5|                 0.5|
|   580906|              27.0|  13.076696830622021|
|   573726|             -67.0|                 0.0|
|   575671| 

In [28]:
val dfWithDate = df.withColumn("date", col("InvoiceDate").cast("date"))
dfWithDate.createOrReplaceTempView("dfWithDate")

dfWithDate = [InvoiceNo: string, StockCode: string ... 7 more fields]


[InvoiceNo: string, StockCode: string ... 7 more fields]

In [30]:
import org.apache.spark.sql.expressions.Window
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

windowSpec = org.apache.spark.sql.expressions.WindowSpec@73c9b937


org.apache.spark.sql.expressions.WindowSpec@73c9b937

In [31]:
val maxPurchaseQuantity = max(col("Quantity"))
.over(windowSpec)

maxPurchaseQuantity = max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)


max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

In [32]:
val purchaseDenseRank = dense_rank()
.over(windowSpec)
val purchaseRank = rank()
.over(windowSpec)

purchaseDenseRank = DENSE_RANK() OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
purchaseRank = RANK() OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)


RANK() OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)

In [33]:
dfWithDate
.where("CustomerId IS NOT NULL")
.orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity"))
.show()

+----------+----------+--------+------------+-----------------+-------------------+
|CustomerId|      date|Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|
+----------+----------+--------+------------+-----------------+-------------------+
|   12346.0|2011-01-18|   74215|           1|                1|              74215|
|   12346.0|2011-01-18|  -74215|           2|                2|              74215|
|   12347.0|2010-12-07|      36|           1|                1|                 36|
|   12347.0|2010-12-07|      30|           2|                2|                 36|
|   12347.0|2010-12-07|      24|           3|                3|                 36|
|   12347.0|2010-12-07|      12|           4|                4|                 36|
|   12347.0|2010-12-07|      12|           4|                4|                 36|
|   12347.0|2010-12-07|      12|           4|                4|                 36|
|   12347.0|2010-12-07|      12|           4|                4|             

In [35]:
// Rollups
val rolledUpDF = dfWithDate.rollup("Date", "Country")
.agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
rolledUpDF.show(20)

+----------+--------------+--------------+
|      Date|       Country|total_quantity|
+----------+--------------+--------------+
|      null|          null|       5176450|
|2010-12-01|        France|           449|
|2010-12-01|United Kingdom|         23949|
|2010-12-01|        Norway|          1852|
|2010-12-01|     Australia|           107|
|2010-12-01|          null|         26814|
|2010-12-01|          EIRE|           243|
|2010-12-01|       Germany|           117|
|2010-12-01|   Netherlands|            97|
|2010-12-02|          EIRE|             4|
|2010-12-02|       Germany|           146|
|2010-12-02|          null|         21023|
|2010-12-02|United Kingdom|         20873|
|2010-12-03|       Belgium|           528|
|2010-12-03|        France|           239|
|2010-12-03|          EIRE|          2575|
|2010-12-03|   Switzerland|           110|
|2010-12-03|      Portugal|            65|
|2010-12-03|          null|         14830|
|2010-12-03|         Spain|           400|
+----------

rolledUpDF = [Date: date, Country: string ... 1 more field]


[Date: date, Country: string ... 1 more field]

In [36]:
dfWithDate.cube("Date", "Country")
.agg(sum(col("Quantity")))
.select("Date", "Country", "sum(Quantity)")
.orderBy("Date")
.show(20) 

+----+--------------------+-------------+
|Date|             Country|sum(Quantity)|
+----+--------------------+-------------+
|null|           Australia|        83653|
|null|                null|      5176450|
|null|            Portugal|        16180|
|null|             Finland|        10666|
|null|               Japan|        25218|
|null|             Germany|       117448|
|null|              Cyprus|         6317|
|null|         Unspecified|         3300|
|null|             Lebanon|          386|
|null|                 RSA|          352|
|null|     Channel Islands|         9479|
|null|               Spain|        26824|
|null|United Arab Emirates|          982|
|null|             Denmark|         8188|
|null|  European Community|          497|
|null|           Singapore|         5234|
|null|                 USA|         1034|
|null|              Norway|        19247|
|null|           Hong Kong|         4769|
|null|      Czech Republic|          592|
+----+--------------------+-------

In [37]:
val pivoted = dfWithDate
.groupBy("date")
.pivot("Country")
.agg("quantity" -> "sum")

pivoted = [date: date, Australia: bigint ... 37 more fields]


[date: date, Australia: bigint ... 37 more fields]

In [39]:
pivoted.columns
pivoted.select("USA").show()

| USA|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+
only showing top 20 rows

