In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
spark=SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load(r"C:\Users\manig\Documents\Datasets\spark-data\data\retail-data\all\*.csv")\
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dftable")
df.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 5 rows



# Count()

In [4]:
#count is a type of action which does basic aggregation
df.count()

541909

In [5]:
#count as transformation instead of action. count(*), Spark will count null values wheraes counting an individual column, 
#Spark will not count the null values
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [6]:
#in sql
spark.sql("select count(StockCode) from dftable").show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [112]:
#finding count without duplicate columns
df.distinct().count()

536641

# countDistinct()

In [4]:
#to find number of unique groups 
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [104]:
#in sql
spark.sql("select count(Distinct StockCode) from dftable").show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [9]:
#times when u want count to an approximation to a certain degree of accuracy.approx_count_distinct takes another parameter 
#with which you can specify the maximum estimation error allowed
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [10]:
#in sql
spark.sql("SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE").show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



# first(), last()

In [11]:
#You can get the first and last values from a DataFrame by using this 
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



In [12]:
#in sql
spark.sql("SELECT first(StockCode),last(StockCode) FROM DFTABLE").show()

+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
|                 85123A|                 22138|
+-----------------------+----------------------+



# min(), max()

In [26]:
#To extract the minimum and maximum values from a DataFrame
from pyspark.sql.functions import min, max
df.selectExpr("min(Quantity)", "max(Quantity)").show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [27]:
#in sql
spark.sql("SELECT min(Quantity),max(Quantity) FROM DFTABLE").show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



# sum()

In [12]:
#to add all the values in a row using the sum
from pyspark.sql.functions import sum
df.selectExpr("sum(Quantity)").show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [16]:
#in sql
spark.sql("SELECT sum(Quantity) FROM DFTABLE").show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



# sumDistinct()

In [14]:
#to sum a distinct set of values
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [18]:
#in sql
spark.sql("SELECT sum(distinct Quantity) FROM DFTABLE").show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



# avg() or mean()

In [9]:
#various ways of finding average
from pyspark.sql.functions import sum, count, avg, expr
df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))\
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



In [13]:
#in sql
spark.sql("SELECT sum(Quantity)/count(Quantity) as sum_div_count_,avg(Quantity) as avg_purchases,mean(Quantity) as mean_purchases FROM DFTABLE").show()

+----------------+----------------+----------------+
|  sum_div_count_|   avg_purchases|  mean_purchases|
+----------------+----------------+----------------+
|9.55224954743324|9.55224954743324|9.55224954743324|
+----------------+----------------+----------------+



# collect_set(), collect_list()

In [15]:
#Aggregating to Complex Types -we can collect a list of values present in a given column or only the unique values 
#by collecting to a set
from pyspark.sql.functions import collect_set, collect_list
df.select(collect_set("Country"), collect_list("Country")).show(1)

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



In [28]:
spark.sql("select collect_set(Country),collect_list(Country) from dftable").show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



# groupBy()

In [17]:
#Grouping - First we specify the column(s) on which we would like to group, and then we specify the aggregation(s).
df.groupBy("InvoiceNo", "CustomerId").count().show(5)

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
+---------+----------+-----+
only showing top 5 rows



In [30]:
df.groupBy("InvoiceNo", "CustomerId").agg(expr("count(1) as total_count")).show(5)

+---------+----------+-----------+
|InvoiceNo|CustomerId|total_count|
+---------+----------+-----------+
|   536846|     14573|         76|
|   537026|     12395|         12|
|   537883|     14437|          5|
|   538068|     17978|         12|
|   538279|     14952|          7|
+---------+----------+-----------+
only showing top 5 rows



In [31]:
#Grouping with Expressions - function. Rather than passing function as an expression into a select statement, 
#we specify it within agg
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"),expr("count(Quantity) as quan1")).show(5)

+---------+----+-----+
|InvoiceNo|quan|quan1|
+---------+----+-----+
|   536596|   6|    6|
|   536938|  14|   14|
|   537252|   1|    1|
|   537691|  20|   20|
|   538041|   1|    1|
+---------+----+-----+
only showing top 5 rows



In [28]:
df.groupBy("InvoiceNo").agg(sum("Quantity"),max("UnitPrice")).show(5)

+---------+-------------+--------------+
|InvoiceNo|sum(Quantity)|max(UnitPrice)|
+---------+-------------+--------------+
|   536596|            9|         19.95|
|   536938|          464|         10.95|
|   537252|           31|          0.85|
|   537691|          163|          9.95|
|   538041|           30|           0.0|
+---------+-------------+--------------+
only showing top 5 rows



In [30]:
#in sql - group by should be specified after where or from clause
spark.sql("SELECT InvoiceNo,CustomerId,count(*) FROM dfTable GROUP BY InvoiceNo, CustomerId").show(5)

+---------+----------+--------+
|InvoiceNo|CustomerId|count(1)|
+---------+----------+--------+
|   536846|     14573|      76|
|   537026|     12395|      12|
|   537883|     14437|       5|
|   538068|     17978|      12|
|   538279|     14952|       7|
+---------+----------+--------+
only showing top 5 rows



# window functions

In [56]:
#window
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date_dt", to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate_sql")
dfWithDate.show(5)

+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|   date_dt|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|2010-12-01|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|2010-12-01|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|2010-12-01|
+---------+---------+--------------------+--------+--------------+---------+----------+-

In [57]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy("CustomerId", "date_dt")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [58]:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

In [59]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

In [60]:
from pyspark.sql.functions import col
dfp=dfWithDate.where("CustomerId IS NOT NULL").where("date_dt IS NOT NULL").orderBy("CustomerId")\
.select(
col("CustomerId"),
col("date_dt"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity"),sum("Quantity").over(windowSpec).alias("running_total"))

In [61]:
dfp.show(20,False)

+----------+----------+--------+------------+-----------------+-------------------+-------------+
|CustomerId|date_dt   |Quantity|quantityRank|quantityDenseRank|maxPurchaseQuantity|running_total|
+----------+----------+--------+------------+-----------------+-------------------+-------------+
|12347     |2010-12-07|36      |1           |1                |36                 |36           |
|12347     |2010-12-07|30      |2           |2                |36                 |66           |
|12347     |2010-12-07|24      |3           |3                |36                 |90           |
|12347     |2010-12-07|12      |4           |4                |36                 |102          |
|12347     |2010-12-07|12      |4           |4                |36                 |114          |
|12347     |2010-12-07|12      |4           |4                |36                 |126          |
|12347     |2010-12-07|12      |4           |4                |36                 |138          |
|12347     |2010-12-

# rollup()

In [78]:
#rollup : rollup is a multidimensional aggregation that performs a variety of group-by style calculations
#it does not include rows if left column in rollup  has a null value but includes if both rollup column has null value or if right 
#col alone has null value
dfNoNull = dfWithDate.na.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
from pyspark.sql.functions import sum
rolledUpDF = dfNoNull.rollup("date_dt", "Country").agg(sum("Quantity").alias("total_quantity")).orderBy("date_dt")
rolledUpDF.show(5)

+----------+---------+--------------+
|   date_dt|  Country|total_quantity|
+----------+---------+--------------+
|      null|     null|       1739779|
|2010-12-01|     null|         24032|
|2010-12-01|   Norway|          1852|
|2010-12-01|Australia|           107|
|2010-12-01|  Germany|           117|
+----------+---------+--------------+
only showing top 5 rows



In [55]:
#where you see the null values is where you’ll find the grand totals
#null in both rollup columns specifies the grand total across both of those columns
rolledUpDF.where("Country IS NULL and Date is null").show()

+----+-------+--------------+
|Date|Country|total_quantity|
+----+-------+--------------+
|null|   null|       1739779|
+----+-------+--------------+



# cube()

In [79]:
#cube:Irrespective of rollup it includes all values
dfNoNull.cube("date_dt", "Country").agg(sum("Quantity"))\
.orderBy("date_dt").show(5)

+-------+---------------+-------------+
|date_dt|        Country|sum(Quantity)|
+-------+---------------+-------------+
|   null|Channel Islands|         2260|
|   null|            USA|          897|
|   null|          Spain|         7906|
|   null|         Norway|        11001|
|   null| Czech Republic|          285|
+-------+---------------+-------------+
only showing top 5 rows

