Aggregating is the act of collecting something together and is a cornerstone of big data analytics.
In an aggregation, you will specify a key or grouping and an aggregation function that specifies
how you should transform one or more columns. This function must produce one result for each
group, given multiple input values. Spark’s aggregation capabilities are sophisticated and mature,
with a variety of different use cases and possibilities.

* Spark supports the following grouping types:
  * The **simplest grouping** is to just summarize a complete DataFrame by performing an aggregation in a select statement.
  * A **group by** allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns.
  * A **window** gives you the ability to specify one or more keys as well as one or more aggregation functions to transform the value columns. However, the rows input to the function are somehow related to the current row.
  * A **grouping set** lets you aggregate at multiple different levels. Grouping sets are available as a primitive in SQL and via rollups and cubes in DataFrames.
  * A **rollup** makes it possible for you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized hierarchically.
  * A **cube** allows you to specify one or more keys as well as one or more aggregation functions to transform the value columns, which will be summarized across all combinations of columns.

* Each grouping returns a RelationalGroupedDataset on which we specify our aggregations.
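
The difference between a rollup and a cube is easiest to see by listing the key combinations each one aggregates over. The following is a plain-Python sketch of that idea, not Spark code; the helper names `rollup_sets` and `cube_sets` are hypothetical and exist only for this illustration.

```python
from itertools import combinations

def rollup_sets(keys):
    """Grouping sets of a rollup: every prefix of the key list, longest first."""
    return [tuple(keys[:i]) for i in range(len(keys), -1, -1)]

def cube_sets(keys):
    """Grouping sets of a cube: every subset of the key list, largest first."""
    return [combo
            for size in range(len(keys), -1, -1)
            for combo in combinations(keys, size)]

keys = ["Country", "CustomerId"]
print(rollup_sets(keys))  # hierarchical: both keys, then Country only, then the grand total
print(cube_sets(keys))    # all combinations, including CustomerId on its own
```

The empty tuple stands for the grand total over the whole DataFrame. A rollup over n keys yields n + 1 grouping sets, while a cube yields 2^n.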

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Chapter 7").getOrCreate()



In [2]:
spark

In [8]:
df = spark.read.format("csv").\
    option("header","true").\
    option("inferSchema","true").\
    load("/home/fm-pc-lt-342/Documents/Spark Docx/datasets/datas/retail-data/online-retail-dataset.csv").coalesce(5)

                                                                                

In [9]:
df.cache()
df.createOrReplaceTempView("dfTable")

In [10]:
df.count()

                                                                                

541909

### Aggregation Functions

* ### count

The first function worth going over is count, except that in this example it acts as a transformation instead of an action. We can do one of two things: specify a specific column to count, or count all rows by using count(*) or count(1), which counts every row as the literal one. Note that count(*) counts rows that contain nulls, whereas counting an individual column skips the null values in that column. Counting a single column looks like this:

In [11]:
from pyspark.sql.functions import count

df.select(count("StockCode")).show()

+----------------+
|count(StockCode)|
+----------------+
|          541909|
+----------------+
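
In SQL semantics, count(*) counts every row, while count(column) skips rows where that column is null. The sketch below mimics the two behaviours in plain Python on a few made-up rows; the rows and the helper names `count_all` and `count_column` are hypothetical and not part of Spark.

```python
# Hypothetical rows; None stands in for a SQL null.
rows = [
    {"StockCode": "85123A", "Description": "WHITE HANGING HEART"},
    {"StockCode": "71053", "Description": None},
    {"StockCode": None, "Description": None},
]

def count_all(rows):
    """Like count(*): every row counts, nulls included."""
    return len(rows)

def count_column(rows, column):
    """Like count(column): only rows with a non-null value count."""
    return sum(1 for row in rows if row[column] is not None)

print(count_all(rows))
print(count_column(rows, "StockCode"))
print(count_column(rows, "Description"))
```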



* ### count distinct

Sometimes, the total number is not relevant; rather, it’s the number of unique groups that you
want. To get this number, you can use the countDistinct function. This is a bit more relevant
for individual columns:

In [12]:
from pyspark.sql.functions import countDistinct

df.select(countDistinct("StockCode")).show()



+-------------------------+
|count(DISTINCT StockCode)|
+-------------------------+
|                     4070|
+-------------------------+



                                                                                

* ### approx_count_distinct

Often, we find ourselves working with large datasets and the exact distinct count is irrelevant.
There are times when an approximation to a certain degree of accuracy will work just fine, and
for that, you can use the approx_count_distinct function:

In [13]:
from pyspark.sql.functions import approx_count_distinct

df.select(approx_count_distinct("StockCode",0.1)).show()



+--------------------------------+
|approx_count_distinct(StockCode)|
+--------------------------------+
|                            3364|
+--------------------------------+



                                                                                

You will notice that approx_count_distinct took a second parameter, with which you can
specify the maximum relative standard deviation (estimation error) allowed. In this case, we
specified a rather large error of 0.1 and thus receive an answer that is quite far off (3364
versus the exact 4070) but that completes more quickly than countDistinct.
You will see much greater performance gains with larger datasets.

* ### first and last

You can get the first and last values from a DataFrame by using these two obviously named
functions. This will be based on the rows in the DataFrame, not on the values in the DataFrame:

In [14]:
from pyspark.sql.functions import first, last
df.select(first("StockCode").alias("firstvalue"), last("StockCode").alias("lastvalue")).show()

+----------+---------+
|firstvalue|lastvalue|
+----------+---------+
|    85123A|    22138|
+----------+---------+



* ### min and max

To extract the minimum and maximum values from a DataFrame, use the min and max functions:

In [15]:
from pyspark.sql.functions import min, max, col

df.select(min(col("Quantity")),max("Quantity")).show()

+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
|       -80995|        80995|
+-------------+-------------+



* ### sum

Another simple task is to add all the values in a column using the sum function:

In [16]:
from pyspark.sql.functions import sum

df.select(sum("Quantity")).show()

+-------------+
|sum(Quantity)|
+-------------+
|      5176450|
+-------------+



* ### sum distinct

In addition to summing a total, you can also sum only the distinct values by using the
sumDistinct function (named sum_distinct from Spark 3.2 onward, with sumDistinct kept as a deprecated alias):

In [17]:
from pyspark.sql.functions import sumDistinct

df.select(sumDistinct("Quantity")).show() # 29310



+----------------------+
|sum(DISTINCT Quantity)|
+----------------------+
|                 29310|
+----------------------+
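
To make the difference from a plain sum concrete, here is a small plain-Python sketch on made-up values (not the retail data): duplicates are dropped before the values are added.

```python
# Hypothetical quantities; 6 and 8 each appear twice.
quantities = [6, 6, 8, 2, 8, -1]

total = sum(quantities)                # every value contributes
distinct_total = sum(set(quantities))  # each distinct value contributes once

print(total, distinct_total)
```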



                                                                                

* ### avg

Although you can calculate average by dividing sum by count, Spark provides an easier way to
get that value via the avg or mean functions. In this example, we use alias in order to more
easily reuse these columns later:

In [18]:
from pyspark.sql.functions import sum, count, avg, expr

df.select(
    count("Quantity").alias("total_transactions"),
    sum("Quantity").alias("total_purchases"),
    avg("Quantity").alias("avg_purchases"),
    expr("mean(Quantity)").alias("mean_purchases"))\
  .selectExpr(
    "total_purchases/total_transactions",
    "avg_purchases",
    "mean_purchases").show()

+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)|   avg_purchases|  mean_purchases|
+--------------------------------------+----------------+----------------+
|                      9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+



* ### Variance and Standard Deviation

Calculating the mean naturally brings up questions about the variance and standard deviation.
These are both measures of the spread of the data around the mean. The variance is the average
of the squared differences from the mean, and the standard deviation is the square root of the
variance. You can calculate these in Spark by using their respective functions. Note, however,
that Spark provides both the sample and the population versions of each. These are different
statistical formulae (the sample version divides by n - 1, the population version by n), and we
need to differentiate between them. By default, the variance and stddev functions compute the
sample variance and the sample standard deviation.
You can also request either version explicitly:

In [19]:
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp

df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()

+-----------------+------------------+--------------------+---------------------+
|var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quantity)|
+-----------------+------------------+--------------------+---------------------+
| 47559.3036466091|  47559.3914092988|  218.08095663447807|   218.08115785023426|
+-----------------+------------------+--------------------+---------------------+
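
The only difference between the population and sample formulas is the divisor. The plain-Python sketch below spells that out on a few made-up values and compares the result with Python's `statistics` module. The local helpers `var_pop` and `var_samp` merely borrow the Spark function names for readability.

```python
import math
import statistics

# Hypothetical quantities, not taken from the retail dataset.
quantities = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]

def var_pop(values):
    """Population variance: squared deviations divided by n."""
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values) / len(values)

def var_samp(values):
    """Sample variance: squared deviations divided by n - 1."""
    mean = sum(values) / len(values)
    return sum((v - mean) ** 2 for v in values) / (len(values) - 1)

# Standard deviation is the square root of the matching variance.
print("population:", var_pop(quantities), math.sqrt(var_pop(quantities)))
print("sample:    ", var_samp(quantities), math.sqrt(var_samp(quantities)))

# Cross-check against the standard library's definitions.
print(statistics.pvariance(quantities), statistics.variance(quantities))
```

With only eight values the two results differ noticeably. With the 541,909 rows of the retail data the divisor barely matters, which is why the two columns in the output above are nearly identical.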



* ### skewness and kurtosis

Skewness and kurtosis are both measurements of extreme points in your data. Skewness
measures the asymmetry of the values in your data around the mean, whereas kurtosis is a
measure of the tail of data. These are both relevant specifically when modeling your data as a
probability distribution of a random variable. Although here we won’t go into the math behind
these specifically, you can look up definitions quite easily on the internet. You can calculate
these by using the functions:

In [20]:
from pyspark.sql.functions import skewness, kurtosis

df.select(skewness("Quantity"), kurtosis("Quantity")).show()

+--------------------+------------------+
|  skewness(Quantity)|kurtosis(Quantity)|
+--------------------+------------------+
|-0.26407557610529564|119768.05495534712|
+--------------------+------------------+
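
For readers who want the math after all, the sketch below computes skewness and kurtosis from central moments in plain Python. It uses the common population-moment definitions, with kurtosis reported as excess kurtosis (a normal distribution scores 0). That Spark's functions follow these exact definitions is an assumption here; check the API documentation before relying on it.

```python
def central_moment(values, k):
    """k-th central moment: the mean of (value - mean) ** k."""
    mean = sum(values) / len(values)
    return sum((v - mean) ** k for v in values) / len(values)

def skewness(values):
    """Third central moment scaled by the variance to the power 1.5."""
    return central_moment(values, 3) / central_moment(values, 2) ** 1.5

def excess_kurtosis(values):
    """Fourth central moment scaled by the squared variance, minus 3."""
    return central_moment(values, 4) / central_moment(values, 2) ** 2 - 3.0

symmetric = [1, 2, 3, 4, 5]   # hypothetical, evenly spread values
long_tail = [1, 1, 1, 10]     # hypothetical, one large outlier on the right

print(skewness(symmetric), excess_kurtosis(symmetric))
print(skewness(long_tail), excess_kurtosis(long_tail))
```

Symmetric data has zero skewness, and a single large outlier pulls the skewness positive.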



* ### Covariance and Correlation

So far we have discussed single-column aggregations, but some functions compare the interactions of the
values in two different columns. Two such measures are covariance and correlation, computed with
covar_samp or covar_pop and with corr, respectively. corr returns the Pearson correlation
coefficient, which is scaled between –1 and +1. The covariance is scaled according to the inputs
in the data and, like variance, comes in sample and population versions.

In [21]:
from pyspark.sql.functions import corr, covar_pop, covar_samp

df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")).show()


+-------------------------+-------------------------------+------------------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceNo, Quantity)|
+-------------------------+-------------------------------+------------------------------+
|     4.912186085642775E-4|             1052.7280543915654|            1052.7260778754612|
+-------------------------+-------------------------------+------------------------------+
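
The relationship between the three measures can be sketched in plain Python. The local helpers below only borrow the Spark function names for readability, and the two input lists are made up for illustration.

```python
import math

def covar_pop(xs, ys):
    """Population covariance: mean product of deviations (divide by n)."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    return sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / n

def covar_samp(xs, ys):
    """Sample covariance: same sum of products, divided by n - 1."""
    n = len(xs)
    return covar_pop(xs, ys) * n / (n - 1)

def corr(xs, ys):
    """Pearson correlation: covariance rescaled into the range [-1, 1]."""
    return covar_pop(xs, ys) / math.sqrt(covar_pop(xs, xs) * covar_pop(ys, ys))

# Hypothetical columns in which ys is exactly twice xs.
xs = [1, 2, 3, 4, 5]
ys = [2, 4, 6, 8, 10]

print(covar_pop(xs, ys), covar_samp(xs, ys), corr(xs, ys))
```

Doubling every value in `ys` would double both covariances but leave the correlation unchanged, which is what "scaled according to the inputs" means in practice.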



                                                                                

### Aggregating to Complex Types

In Spark, you can perform aggregations not just on numerical values using formulas; you can also
aggregate to complex types. For example, we can collect a list of the values present in a given
column, or only the unique values by collecting to a set.
You can use this to carry out some more programmatic access later on in the pipeline or pass the
entire collection to a user-defined function (UDF):

In [24]:
from pyspark.sql.functions import collect_set, collect_list

df.agg(collect_set("Country"),collect_list("Country")).show()

+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+



### Grouping
Thus far, we have performed only DataFrame-level aggregations. A more common task is to
perform calculations based on groups in the data. This is typically done on categorical data, for
which we group our data on one column and perform some calculations on the other columns
that end up in that group.

The best way to explain this is to begin performing some groupings. The first will be a count,
just as we did before. We will group by each unique invoice number and get the count of items
on that invoice. Note that this returns another DataFrame and is lazily performed.

We do this grouping in two phases. First we specify the column(s) on which we would like to
group, and then we specify the aggregation(s). The first step returns a
RelationalGroupedDataset, and the second step returns a DataFrame.
As mentioned, we can specify any number of columns on which we want to group:

In [25]:
df.groupBy("InvoiceNo","CustomerId").count().show()



+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
|   536846|     14573|   76|
|   537026|     12395|   12|
|   537883|     14437|    5|
|   538068|     17978|   12|
|   538279|     14952|    7|
|   538800|     16458|   10|
|   538942|     17346|   12|
|  C539947|     13854|    1|
|   540096|     13253|   16|
|   540530|     14755|   27|
|   541225|     14099|   19|
|   541978|     13551|    4|
|   542093|     17677|   16|
|   543188|     12567|   63|
|   543590|     17377|   19|
|  C543757|     13115|    1|
|  C544318|     12989|    1|
|   544578|     12365|    1|
|   536596|      null|    6|
|   537252|      null|    1|
+---------+----------+-----+
only showing top 20 rows
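
Conceptually, grouping on several columns treats each distinct combination of key values as one group. The plain-Python sketch below reproduces a grouped count on a handful of made-up (InvoiceNo, CustomerId) pairs. The values are hypothetical, and `Counter` stands in for the two-step groupBy and count.

```python
from collections import Counter

# Hypothetical (InvoiceNo, CustomerId) pairs, one per line item.
line_items = [
    ("536365", 17850),
    ("536365", 17850),
    ("536366", 17850),
    ("536367", 13047),
    ("536367", 13047),
    ("536367", 13047),
]

# Step 1: the grouping key is the full tuple of key columns.
# Step 2: the aggregation counts the rows that share each key.
counts = Counter(line_items)

for (invoice_no, customer_id), n in counts.items():
    print(invoice_no, customer_id, n)
```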



                                                                                

### Grouping with Expressions
Rather than passing count as an expression into a select statement, we can specify it within agg. This makes it possible to pass in arbitrary expressions, as long as each one specifies an aggregation. You can also alias a column after aggregating it:

In [28]:
df.groupBy("InvoiceNo").agg(
    count("Quantity").alias("quan"),
    avg("Quantity").alias("avg_quantity"),
    expr("count(Quantity)")).show(3)

+---------+----+------------------+---------------+
|InvoiceNo|quan|      avg_quantity|count(Quantity)|
+---------+----+------------------+---------------+
|   536596|   6|               1.5|              6|
|   536938|  14|33.142857142857146|             14|
|   537252|   1|              31.0|              1|
+---------+----+------------------+---------------+
only showing top 3 rows



### Grouping with Maps
Sometimes, it can be easier to specify your transformations as a map for which the key
is the column and the value is the aggregation function (as a string) that you would like to
perform. In PySpark this is a dict passed to agg, such as {"Quantity": "avg"}. Because a dict cannot
repeat a key, applying several functions to the same column is easier with inline expressions, as here:

In [29]:
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)")).show()



+---------+------------------+--------------------+
|InvoiceNo|     avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
|   536596|               1.5|  1.1180339887498947|
|   536938|33.142857142857146|  20.698023172885524|
|   537252|              31.0|                 0.0|
|   537691|              8.15|   5.597097462078001|
|   538041|              30.0|                 0.0|
|   538184|12.076923076923077|   8.142590198943392|
|   538517|3.0377358490566038|  2.3946659604837897|
|   538879|21.157894736842106|  11.811070444356483|
|   539275|              26.0|  12.806248474865697|
|   539630|20.333333333333332|  10.225241100118645|
|   540499|              3.75|  2.6653642652865788|
|   540540|2.1363636363636362|  1.0572457590557278|
|  C540850|              -1.0|                 0.0|
|   540976|10.520833333333334|   6.496760677872902|
|   541432|             12.25|  10.825317547305483|
|   541518| 23.10891089108911|  20.550782784878713|
|   541783|1

                                                                                

### Window Functions

Window functions carry out aggregations on a specific **window** of data, which you define relative to the current row.

The window specification determines which rows are passed in to the function.

With a groupBy, every row goes into exactly one group. A window function instead calculates a return value for every input row of a table based on a group of rows, called a frame. Each row can fall into one or more frames.

Spark supports three kinds of window functions:
* ranking functions: rank, dense_rank, row_number, and so on
* analytic functions: lead, lag, first_value, last_value, and so on
* aggregate functions: sum, avg, count, min, max, and so on

The OVER clause defines the partitioning and ordering of rows (that is, a window) for the above functions to operate on, which is why they are called window functions. The OVER clause accepts the following three arguments:

* ORDER BY - Defines the logical order of the rows within each partition.
* PARTITION BY - Divides the query result set into partitions. The window function is applied to each partition separately.
* ROWS or RANGE clause - Further limits the rows within the partition by specifying start and end points within the partition.
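
The three parts of a window definition can be imitated in plain Python to show that, unlike a groupBy, every input row produces one output row. This is only a sketch of the idea, not how Spark executes it. The helper `window_aggregate` and the sample rows are hypothetical, and the frame is fixed to "unbounded preceding through current row" with a descending sort.

```python
from collections import defaultdict

def window_aggregate(rows, partition_key, order_key, aggregate):
    """Apply `aggregate` over a frame that runs from the start of each
    partition to the current row, with rows ordered descending."""
    # PARTITION BY: split the rows by key.
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)

    output = []
    for partition_rows in partitions.values():
        # ORDER BY ... DESC: sort within the partition.
        ordered = sorted(partition_rows, key=lambda r: r[order_key], reverse=True)
        for position, row in enumerate(ordered):
            # ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
            frame = ordered[: position + 1]
            value = aggregate(r[order_key] for r in frame)
            output.append((row[partition_key], row[order_key], value))
    return output

# Hypothetical rows, not taken from the retail dataset.
rows = [
    {"CustomerId": 1, "Quantity": 5},
    {"CustomerId": 1, "Quantity": 8},
    {"CustomerId": 1, "Quantity": 3},
    {"CustomerId": 2, "Quantity": 2},
]

for line in window_aggregate(rows, "CustomerId", "Quantity", sum):
    print(line)  # one output row per input row, with a running total
```

Passing `max` instead of `sum` gives the partition maximum on every row, because the descending sort puts the largest value first in every frame.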

In [31]:
from pyspark.sql.functions import to_date


In [51]:
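# "yyyy" is the calendar year; "YYYY" is the week-based year, which Spark 3's datetime parser does not accept.
# The pattern assumes InvoiceDate strings such as "12/1/2010 8:26".
dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"), "M/d/yyyy H:mm"))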
dfWithDate.createOrReplaceTempView('dfWithDate')

In [53]:
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec = Window.partitionBy("CustomerId","date").orderBy(desc("Quantity")).rowsBetween(Window.unboundedPreceding, Window.currentRow)

In [54]:
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)


You will notice that this returns a column (or expression). We can now use this in a DataFrame
select statement. Before doing so, though, we will create the purchase quantity rank. To do that
we use the dense_rank function to determine which date had the maximum purchase quantity
for every customer. We use dense_rank as opposed to rank to avoid gaps in the ranking sequence when there are tied values:

In [55]:
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

This also returns a column that we can use in select statements. Now we can perform a select to
view the calculated window values:

In [59]:
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
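
The difference between rank and dense_rank only shows up when values tie. The following plain-Python sketch computes both for one already-sorted partition; the helper `rank_and_dense_rank` and its input are hypothetical and stand in for what the two window functions return.

```python
def rank_and_dense_rank(values_desc):
    """Return (value, rank, dense_rank) for values sorted in descending order."""
    results = []
    rank = dense_rank = 0
    previous = object()  # sentinel that equals no real value
    for position, value in enumerate(values_desc, start=1):
        if value != previous:
            rank = position   # rank jumps to the row position, leaving gaps after ties
            dense_rank += 1   # dense_rank always advances by exactly one
            previous = value
        results.append((value, rank, dense_rank))
    return results

# Hypothetical quantities for one customer and date, sorted descending.
for row in rank_and_dense_rank([10, 10, 8, 5]):
    print(row)
```

The two tied rows share rank 1 under both schemes. The next row then gets rank 3 but dense rank 2.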

In [64]:
spark.sql('''
          SELECT CustomerId, date, Quantity,
            rank(Quantity) OVER (PARTITION BY CustomerId, date
            ORDER BY Quantity DESC NULLS LAST
            ROWS BETWEEN
            UNBOUNDED PRECEDING AND
            CURRENT ROW) as rank,
            dense_rank(Quantity) OVER (PARTITION BY CustomerId, date
            ORDER BY Quantity DESC NULLS LAST
            ROWS BETWEEN
            UNBOUNDED PRECEDING AND
            CURRENT ROW) as dRank,
            max(Quantity) OVER (PARTITION BY CustomerId, date
            ORDER BY Quantity DESC NULLS LAST
            ROWS BETWEEN
            UNBOUNDED PRECEDING AND
            CURRENT ROW) as maxPurchase
            FROM dfWithDate WHERE CustomerId IS NOT NULL ORDER BY CustomerId
          ''')

DataFrame[CustomerId: int, date: date, Quantity: int, rank: int, dRank: int, maxPurchase: int]