<a href="https://colab.research.google.com/github/jhashuva/pyspark_tutorial/blob/main/Aggregations.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Aggregations

Aggregating is the act of collecting something together and is a cornerstone of big data analytics.
- In an aggregation we will specify a ```key``` or ```grouping``` and an ```aggregation``` function that specifies how we should transform one or more columns. This function must produce one result for each group, given multiple input values.

- Spark's aggregation capabilities are sophisticated and mature, with variety of different usecases and possibilites.

Spark allows us to create the following groupings tyes:

` The simplest grouping is to just summarize a complete DataFrame by performing an aggregation in a select statement.

- A "window" gives you the ability to specify one or more keys as well  as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.

- A "grouping set", which you can use to aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrame.

- A "rollup" makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.

- A "cube" allows you to specify one or more keys as well as one or more aggregation function to transform the value columns, which will be summarized across all combinations of columns.

Each grouping returns a RelationalGroupDataset on which we specify our aggregations.

In [1]:
% pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 69 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 69.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=e628488bd88e4c0c12b5a85fefdd42ba80be190bb755cc23bcfcf62111d626b2
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql.session import SparkSession

In [3]:
conf = SparkConf().setMaster("local").setAppName("Aggregations")
sc = SparkContext(conf= conf)
spark = SparkSession(sc)

In [4]:
data = "/content/drive/MyDrive/data for spark/online-retail-dataset.csv"

In [5]:
df = spark.read.format("csv").option("header","true").option("inferSchema","true").load(data).coalesce(5)

df.cache() calls the persist() method which stores on storage level as MEMORY_AND_DISK, but you can change the storage level.

The persist() method calls sparkSession.sharedState.cacheManager.cacheQuery() 

In [6]:
df.cache()

DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: int, Country: string]

In [7]:
df.createOrReplaceTempView("dfTable")

In [8]:
df.count()== 541909

True

In [9]:
df.count()

541909

## Aggregation Functions
We can find most aggregation functions in the org.apache.spark.sql.functions package

### count
Function will perform as a transformation instead of an action.

In [10]:
from pyspark.sql.functions import count
df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+



In [None]:
--inSQL
SELECT COUNT(*) FROM dfTable

**WARNING:** When we performing a count(*), Spark will count null values(including rows containing all nulls). However, when counting an individual column, spark will not count the null values.

### countDistinct
If you want number of unique groups than you can use ```countDistinct```. 

In [11]:
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show()

+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



In [None]:
--in SQL
SELECT COUNT(DISTINCT *) FROM DFTABLE

### approx_count_distinct
There are times when an approximation to a certain degree of accuracy will work just fine and for that we can use the ```approx_count_distinct``` function.

In [12]:
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode",0.1)).show()

+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



In [None]:
--in SQL
SELECT approx_count_distinct(StockCode,0.1) FROM DFTABLE

If you see ```approx_count_distinct``` took pramaeter other than column name that parameter is the maximum estimation error. 

- with ```approx_count_distinct``` we will acheive greater performance gains with larger datasets.

## first and last
We can get the first and last values from a DataFrame by using these two named functions.
This will be based on the rows in the DataFrame, not in the values in the DataFrame:

In [13]:
from pyspark.sql.functions import first,last
df.select(first("StockCode"), last("StockCode")).show()

+----------------+---------------+
|first(StockCode)|last(StockCode)|
+----------------+---------------+
|          85123A|          22138|
+----------------+---------------+



In [None]:
--in SQL
SELECT first(StockCode),last(StockCode) FROM dfTable

### min and max
To extract the minimum and maximum values from DataFrame, use the ```min``` and ```max``` functions.

In [14]:
from pyspark.sql.functions import min,max
df.select(min("Quantity"),max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



In [None]:
--in SQL
SELECT min(Quantity), max(Quantity) FROM dfTable

SyntaxError: ignored

### sum
```sum``` is to add the all values in a row.

In [15]:
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



In [None]:
--in SQL
SELECT sum(Quantity) FROM dfTable

### sumDistinct
```sumDistinct``` function sum a distinct set of values 

In [16]:
# in python
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show()

+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+



In [None]:
-- in SQL
SELECT SUM(Quantity) FROM dfTable

### avg
We can calculate average by dividing ```sum``` by ```count```, spark provides an easier way to get that value via the ```avg``` or ```mean``` functions.

In [17]:
from pyspark.sql.functions import sum, count, avg, expr

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
      "total_purchases/total_transactions",
      "avg_purchases",
      "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



**NOTE**: We can also average all the distinct values by specifying distinct. In fact, most aggregate functions support doing so only on distinct values.

### Varience and Standard Deviation

- The variance is the average of the squared differences from the mean
- The standard deviation is the square root of the variance.
- Spark has both the formula for the sample standard deviation and the population standard deviation.
- By default Spark performs the formula for the sample standad deviation or variance if we use the *variance* or *stddev* functions.

In [18]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import stddev_samp, var_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
|47559.30364660879| 47559.39140929848|  218.08095663447733|   218.08115785023355|
+-----------------+------------------+--------------------+---------------------+



In [None]:
--in SQL
SELECT var_pop(Quantity), var_samp(Quantity), stddev_pop(Quantity), stddev_samp(Quantity) FROM dfTable

### skewness and kurtosis
- Skewness measures the asymentry of the values in your data around the mean.
- Kurtosis is a measure of the tail of data.

Thease are both relevant specifically when modeing our data as a probability distribution of a random variable.

We can import `skewness` and `kurtosis` from `pyspark.sql.functions` as mentioned below:

In [19]:
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+------------------+------------------+
|skewness(Quantity)|kurtosis(Quantity)|
+------------------+------------------+
|-0.264075576105298|119768.05495534067|
+------------------+------------------+



In [None]:
-- in SQL
SELECT skewness(Quantity), kurtosis(Quantity) FROM dfTable

### Covariance and Correlation
- Correlation measures the Pearson correlation coefficent, which is scaled between -1 and +1.
- The covarience is scaled according to the inputs in the data.
- Like the `var` function, covarience can be calculated either as the sample covarience(`covar_samp`) or the population covarience(`covar_pop`).

Let us apply correlation and covarience on the columns `InvoiceNo` and `Quantity` of the dataset `online retail data`.

In [20]:
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo","Quantity"), covar_samp("InvoiceNo","Quantity"), covar_pop("InvoiceNo","Quantity")).show()

+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085636837E-4|             1052.7280543912716|            1052.7260778751674|
+-------------------------+-------------------------------+------------------------------+



In [None]:
-- in SQL
SELECT corr(InvoiceNo, Quantity), covar_samp(InvoiceNo,Quantity), covar_pop(InvoiceNo, Quantity) from dfTable

### Aggregating to Complex Types
In spark, we can perform aggregations like collect a list of values(`collect_list`) present in a given column or unique values(`collect_set`) in a column of complex types.

In [21]:
from pyspark.sql.functions import collect_list, collect_set
df.agg(collect_list("Country"), collect_set("Country")).show()

+---------------------+--------------------+
|collect_list(Country)|collect_set(Country)|
+---------------------+--------------------+
| [United Kingdom, ...|[Portugal, Italy,...|
+---------------------+--------------------+



In [None]:

-- in SQL
SELECT collect_set(Country), collect_list(Country) FROM dfTable

### Grouping

Group our data on one column and perform some calculations on the other columns that end up in the group. 
  

In [22]:
df.groupBy("InvoiceNo","CustomerId").count().show()

+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   545165|     16339|   20|
|   545289|     14732|   30|
+---------+----------+-----+
only showing top 20 rows



In [None]:
-- in SQL
SELECT count(*) FROM dfTable GROUP BY InvoiceNo, CustomerId


SyntaxError: ignored

### Grouping with Expressions
Rather than passing that function as an expression into a `select` statement, we specify it as within `agg`. 
This makes it possible for us to pass-in arbitary expressions that just need to have some aggregation specified.
We can do things like `alias` a column after transforming it for later use in our data flow

In [23]:
from pyspark.sql.functions import count
df.groupBy("InvoiceNo").agg(count("Quantity").alias("quan"), expr("count(Quantity)")).show()

+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
|   536596|   6|              6|
|   536938|  14|             14|
|   537252|   1|              1|
|   537691|  20|             20|
|   538041|   1|              1|
|   538184|  26|             26|
|   538517|  53|             53|
|   538879|  19|             19|
|   539275|   6|              6|
|   539630|  12|             12|
|   540499|  24|             24|
|   540540|  22|             22|
|  C540850|   1|              1|
|   540976|  48|             48|
|   541432|   4|              4|
|   541518| 101|            101|
|   541783|  35|             35|
|   542026|   9|              9|
|   542375|   6|              6|
|  C542604|   8|              8|
+---------+----+---------------+
only showing top 20 rows



### Grouping with Maps
Sometimes, it can be easier to specify our transformations as a series of `Maps` for which the `key` is the column, and the value is  the aggregation function(as a string) that we would like to perform. We can reuse multiple column names if we specify them inline.

In [24]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"), expr("stddev_pop(Quantity)")).show()

+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

## Window Functions


- We use *window functions* to carry out some unique aggregations  by either computing some aggregation on specific "Window" of data, which you define by using a reference to the current data.

- A window function calculates a return value for every input row of a table based on a group of rows, called a frame.

- Spark supports three kinds of window functions: ranking functions, analytic functions, and aggregate functions.

To demonstrate we will add a date column that will convert our invoice date into a column that contains only date information:

In [25]:
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),"MM/d/yyyy H:mm"))
#dfWithDate.createOrRepalceTempView("dfWithDate")

The first step to a window function is to create a window specification. 

**Note**: The `partition` by is unrelated to the partitioning scheme concept, It is just a similar concept that describes how we will be breaking up our group.

The ordering determines the ordering within a given partition, and, finally, the frame specification(the `rowsBetween` statement) states which rows will be included in the frame based on its reference to the current input row.

In [26]:
# in python
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window.partitionBy("CustomerId","date").orderBy(desc("Quantity")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

Now we want to use an aggregation function to learn more about each specific customer.
An example here is the maximum purchase quantity over all time.

We use the same aggregation functions that we saw earlier by passing a column name or expression.

We indicate the window specification that defines to which frames of data this function will apply.

In [27]:
# in python
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)


We will notice that this returns a column(or expressions). We can now use this in DataFrame `select` statement. Before doing it, we create the purchase quantity rank. For that we use the `dense_rank` function to determine which date had the maximum purchase quantity for every customer.

- we use `dense_rank` as opposed to rank to avoid gaps in the ranking sequence when there are tied values or duplicate values.

In [28]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

This returns a column that we can use in select statements. Now we can perform a `select` to view the calculated window values.

In [None]:
#in python
from pyspark.sql.functions import col

dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId").select(col("CustomerId"),col("date"),col("Quantity"),purchaseRank.alias("quantityRank"),purchaseDenseRank.alias("quantityDenseRank"),maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()


Py4JJavaError: ignored

In [None]:
-- in SQL
SELECT CustomerId, date, Quantity, rank(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as rank, 
 dense_rank(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as dRank,
 max(Quantity) OVER (PARTITION BY CustomerId, date ORDER BY Quantity DESC NULLS LAST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as maxPurchase
 FROM dfWithDate WHERE CustomerId IS NOT NULL ORDER BY CustomerId

## Grouping Sets


Some times we want something a bit more complete- an aggregation across multiple groups. We acheive this by using grouping sets.

Grouping sets are a low-level tool for combining sets of aggregations together. They give you the ability to create arbitrary aggregations in their group-by statements.

In [None]:
dfNoNull = dfWithDate.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")

In [None]:
--in SQL
SELECT CustomerId, StockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode ORDER BY CustomerId DESC, stockCode DESC

We can do exact same thing by using a grouping set.

In [None]:
-- in SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode)) ORDER BY CustomerId DESC, stockCode DESC

**WARNING**: Grouping sets depend on null values for aggregation levels. If we do not filter-out null values, we will get incorrect results. This applies to cubes, rollups, and grouping sets.

In [None]:
--in SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),()) ORDER BY CustomerId DESC, stockCode DESC

The `GROUPING SETS` operator is only available in SQL. To perform the same in DataFrames, we use the `rollup` and `cube` operators

### Rollups
When we set our grouping keys of multiple columns, Spark looks at those as well as the actual combinations that are visible in the dataset.
A `rollup` is a multidimensional aggregation that performs a variety of group-by style calculations for us.

In [None]:
rolledUpDF = dfNoNull.rollup("Date","Country").agg(sum("Quantity")).selectExpr("Date","Country","'sum(Quantity)' as total_quantity").orderBy("Date")

In [None]:
rolledUpDF.show()

Py4JJavaError: ignored

Now where we see the null values is where we'll find the grand totals. A `null` in both `rollup` columns specifies the grand total across both of those columns.

In [None]:
rolledUpDF.where("Country IS NULL").show()

Py4JJavaError: ignored

### Cube
A cube takes the rollup to a level deeper. Rather treating elements hierarchically, a cube does the same thing across all dimensions. That means that it won't just go by date over the entire time period.

In [None]:
from pyspark.sql.functions import sum

dfNoNull.cube("Date","Country","sum(Quantity)").orderBy("Date").show()

AttributeError: ignored

## Grouping Metadata
When using cubes and rollups, we want to be able to query the aggregation levels so that we can easily filter them accordingly.

We can do this by using the `grouping_id`, which gives us a column specifying the level of aggregation that we have in our result set.


### Pivot
Pivot makes it possible for us to convert a row into a column 

In [29]:
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

In [30]:
pivoted.where("date > '2011-12-05'").select("date","'USA_sum(Quantity)'").show()

AnalysisException: ignored

## User-Defined Aggregation Functions
User-defined aggregation functions(UDAFs) are a way for users to define their own aggregation functions based on custom formulae or business rules.

- We can use UDAFs to compute custom calculations over groups of input data (as opposed to single rows).

- Spark maintains a single `AggregationBuffer` to store intermediate results for every group of input data.

- To create a UDAF, we must inherit from the `UserDefinedAggregateFunction` base class and implement the following methods:
-- `inputShema` reprsents input arguments as a `StructType`