In [2]:
// Locations are taken from the process environment. `sys.env(key)` is a plain
// Map lookup, so an unset variable fails here with NoSuchElementException.
val spark_home: String = sys.env("SPARK_HOME")
val spark_data: String = sys.env("SPARK_DATA")

spark_home = /Users/fanzhenxin/bigData/spark-2.4.4-bin-hadoop2.7
spark_data = /Users/fanzhenxin/bigData/spark-2.4.4-bin-hadoop2.7


/Users/fanzhenxin/bigData/spark-2.4.4-bin-hadoop2.7

In [3]:
// Read every CSV under <spark_data>/data/retail-data/all into one DataFrame,
// treating the first line of each file as a header and inferring column types,
// then coalesce the result to 5 partitions.
// The path uses the `s` interpolator rather than `f`: no format specifier is
// involved, and `f` would give `%` in the literal part of the path a special meaning.
val df = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(s"$spark_data/data/retail-data/all/*.csv")
  .coalesce(5)

df = [InvoiceNo: string, StockCode: string ... 6 more fields]


[InvoiceNo: string, StockCode: string ... 6 more fields]

In [4]:
// cache() returns the same Dataset, so the temp view can be registered on it directly.
// The view name "dfTable" is what the %%SQL cells query.
df.cache().createOrReplaceTempView("dfTable")

In [5]:
// Sanity check on the number of rows loaded; count() returns a Long.
df.count() == 541909L

true

In [6]:
import org.apache.spark.sql.functions.count

// Print a one-row table with count("StockCode") over the whole DataFrame.
df
  .select(count("StockCode"))
  .show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [7]:
%%SQL
-- Total number of rows in the dfTable temp view.
SELECT COUNT(*) FROM dfTable

+--------+
|count(1)|
+--------+
|  541909|
+--------+



In [8]:
import org.apache.spark.sql.functions.countDistinct

// Exact number of distinct StockCode values.
df
  .select(countDistinct("StockCode"))
  .show(2)

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [9]:
import org.apache.spark.sql.functions.approx_count_distinct

// Approximate distinct count of StockCode; the second argument is the
// function's `rsd` parameter (here 0.02).
df
  .select(approx_count_distinct("StockCode", rsd = 0.02))
  .show(2)

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            4104|
+--------------------------------+



In [10]:
import org.apache.spark.sql.functions.{first, last}

// first/last StockCode as seen by the aggregation. No ordering is imposed
// here, so the values reflect whatever row order the DataFrame happens to have.
df
  .select(
    first("StockCode"),
    last("StockCode"))
  .show(2)

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



In [13]:
import org.apache.spark.sql.functions.{avg, count, expr, sum}

// Step 1: aggregate Quantity four ways (count, sum, avg, and SQL `mean`).
// Step 2: derive sum/count as `t_t` and print it next to avg and mean so the
// three columns can be compared side by side.
df
  .select(
    count("Quantity").as("total_transactions"),
    sum("Quantity").as("total_purchases"),
    avg("Quantity").as("avg_purchases"),
    expr("mean(Quantity)").as("mean_purchases"))
  .selectExpr(
    "total_purchases/total_transactions as t_t",
    "avg_purchases",
    "mean_purchases")
  .show(2)

+----------------+----------------+----------------+
|             t_t|   avg_purchases|  mean_purchases|
+----------------+----------------+----------------+
|9.55224954743324|9.55224954743324|9.55224954743324|
+----------------+----------------+----------------+



lastException: Throwable = null


In [15]:
import org.apache.spark.sql.functions.{stddev_pop, stddev_samp, var_pop, var_samp}

// Population and sample variance / standard deviation of Quantity, in one row.
df
  .select(
    var_pop("Quantity"),
    var_samp("Quantity"),
    stddev_pop("Quantity"),
    stddev_samp("Quantity"))
  .show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660923| 47559.39140929892|  218.08095663447835|   218.08115785023455|
+-----------------+------------------+--------------------+---------------------+



In [None]:
import org.apache.spark.sql.functions.{collect_list, collect_set}

// Gather Country values into a set and into a list, and print the single
// result row without truncating the cell contents.
df
  .agg(
    collect_set("Country"),
    collect_list("Country"))
  .show(1, truncate = false)

# Grouping Data

In [19]:
// Number of rows for each (InvoiceNo, CustomerId) pair.
df
  .groupBy("InvoiceNo", "CustomerId")
  .count()
  .show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   545165|     16339|   20|
|   545289|     14732|   30|
+---------+----------+-----+
only showing top 20 rows



In [23]:
%%SQL
-- Number of rows for each (InvoiceNo, CustomerId) pair in dfTable.
SELECT InvoiceNo, CustomerId, COUNT(*) AS count
FROM dfTable
GROUP BY InvoiceNo, CustomerId


+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
+---------+----------+-----+
only showing top 10 rows



In [25]:
// `expr` is imported here as well as `count`: the cell calls both, and should
// not depend on an earlier cell having brought `expr` into the session.
import org.apache.spark.sql.functions.{count, expr}

// Per-invoice count of Quantity, written twice: once with the `count`
// function and once as a SQL expression string, so the two columns
// (`quan`, `quan2`) can be compared.
df.groupBy("InvoiceNo")
  .agg(
    count("Quantity").alias("quan"),
    expr("count(Quantity) as quan2"))
  .show()

+---------+----+-----+
|InvoiceNo|quan|quan2|
+---------+----+-----+
|   536596|   6|    6|
|   536938|  14|   14|
|   537252|   1|    1|
|   537691|  20|   20|
|   538041|   1|    1|
|   538184|  26|   26|
|   538517|  53|   53|
|   538879|  19|   19|
|   539275|   6|    6|
|   539630|  12|   12|
|   540499|  24|   24|
|   540540|  22|   22|
|  C540850|   1|    1|
|   540976|  48|   48|
|   541432|   4|    4|
|   541518| 101|  101|
|   541783|  35|   35|
|   542026|   9|    9|
|   542375|   6|    6|
|  C542604|   8|    8|
+---------+----+-----+
only showing top 20 rows



In [26]:
// Per-invoice aggregates given as (column -> aggregate-function-name) pairs.
df
  .groupBy("InvoiceNo")
  .agg(
    "Quantity" -> "avg",
    "Quantity" -> "stddev_pop")
  .show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

In [27]:
import org.apache.spark.sql.functions.{col, to_date}

// Add a `date` column parsed from the InvoiceDate string and register the
// result as the temp view "dfWithDate".
// The month is written as a single `M` (previously `MM`) to match the
// unpadded `d` and `H` fields. java.text.SimpleDateFormat ignores the letter
// count when parsing numbers, so one- and two-digit months both parse as
// before, while stricter java.time-based parsers read `MM` as exactly two digits.
// NOTE(review): assumes InvoiceDate values may carry one-digit months — confirm against the data.
val dfWithDate = df.withColumn(
  "date",
  to_date(col("InvoiceDate"), "M/d/yyyy H:mm"))

dfWithDate.createOrReplaceTempView("dfWithDate")

dfWithDate = [InvoiceNo: string, StockCode: string ... 7 more fields]


[InvoiceNo: string, StockCode: string ... 7 more fields]

In [30]:
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col

// Window definition: rows are partitioned by (CustomerId, date), ordered by
// Quantity descending, and the frame runs from the start of the partition up
// to and including the current row.
val windowSpec =
  Window
    .partitionBy("CustomerId", "date")
    .orderBy(col("Quantity").desc)
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)


windowSpec = org.apache.spark.sql.expressions.WindowSpec@764f2ce1


org.apache.spark.sql.expressions.WindowSpec@764f2ce1

In [31]:
import org.apache.spark.sql.functions.max

// Column expression: max(Quantity) evaluated over `windowSpec`.
// Nothing is computed until the column is used in a select.
val maxPurchaseQuantity =
  max(col("Quantity"))
    .over(windowSpec)

maxPurchaseQuantity = max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)


max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)