# Aggregates ➕➕

We have used some Spark SQL before. Today we will practice with it some more, focusing on **aggregation operations**.

## What will you learn in this course? 🧐🧐

This course will walk you through the mechanics of aggregation with Spark SQL. Here's the outline:

* Import functions
* Data
    * Schema
    * Date
* GroupedData
    * `.mean()`
    * `.sum()`
    * `.count()`
    * Other aggregations
    * `.agg(exprs)`
        * exprs as dict
        * exprs as list
        * advanced expressions

In this notebook we will be using data from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/datasets/Online%20Retail).  

We'll actually use 👉 [this version](https://www.kaggle.com/carrie1/ecommerce-data) 👈 of the dataset for convenience reasons (csv format).

In [None]:
filepath = "s3://full-stack-bigdata-datasets/Big_Data/data.csv"

sales = spark.read.format('csv')\
             .option('header', 'true')\
             .option('inferSchema', 'true')\
             .load(filepath)

## Import functions 🔧🔧

Before we get started, let's import PySpark's SQL functions. This module contains all the functions you'll need to do everything you could do with regular SQL using PySpark!

In [None]:
from pyspark.sql import functions as F

## Our data 📊📊
Let's take a quick look at the first few rows of our dataset to get a feel for what's in there before we get started.

In [None]:
sales.show(5)

We have 8 columns here: 

* InvoiceNo: the identification number for an order
* StockCode: the identification of the product
* Description: a description of the product
* Quantity: the amount of product purchased
* InvoiceDate: the day and time the order was placed
* UnitPrice: the price for one unit of the product
* CustomerID: the identification number of the customer
* Country: the customer's country of residence

We're **ready to roll!**

### Schema 📋
Let's take a look at the schema of this DataFrame to make sure everything is in order.

In [None]:
sales.printSchema()

In [None]:
sales.describe().toPandas()

| | summary | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 541909 | 541909 | 540455 | 541909.0 | 541909 | 541909.0 | 406829.0 | 541909 |
| 1 | mean | 559965.752026781 | 27623.240210938104 | 20713.0 | 9.55224954743324 | | 4.6111136260897085 | 15287.690570239583 | |
| 2 | stddev | 13428.417280796779 | 16799.737628427658 | | 218.0811578502344 | | 96.75985306117964 | 1713.6003033215982 | |
| 3 | min | 536365 | 10002 | 4 PURPLE FLOCK DINNER CANDLES | -80995.0 | 1/10/2011 10:04 | -11062.06 | 12346.0 | Australia |
| 4 | max | C581569 | m | wrongly sold sets | 80995.0 | 9/9/2011 9:52 | 38970.0 | 18287.0 | Unspecified |


Some columns contain implausible values, such as negative quantities and unit prices, so let's clean them up a little before we move on with the analysis:

In [None]:
# Keep only rows with a strictly positive quantity and unit price
sales = sales.filter((F.col("Quantity") > 0) & (F.col("UnitPrice") > 0))
sales.describe().toPandas()

| | summary | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 530104 | 530104 | 530104 | 530104.0 | 530104 | 530104.0 | 397884.0 | 530104 |
| 1 | mean | 559981.4746888812 | 27591.351654656588 | | 10.542037034242338 | | 3.907625247122361 | 15294.423452564064 | |
| 2 | stddev | 13430.049737665431 | 16756.848658106595 | | 155.52412351063666 | | 35.915681104255405 | 1713.141560439857 | |
| 3 | min | 536365 | 10002 | 4 PURPLE FLOCK DINNER CANDLES | 1.0 | 1/10/2011 10:32 | 0.001 | 12346.0 | Australia |
| 4 | max | A563185 | m | ZINC WIRE SWEETHEART LETTER TRAY | 80995.0 | 9/9/2011 9:52 | 13541.33 | 18287.0 | Unspecified |


It seems the inferred schema did not get any of the types wrong, except for the date. Let's convert the date to the proper type before we move forward.

### Date 📆

Let's work on the date data a little so it's in the proper format.

Two functions are very useful for handling dates: `F.to_timestamp(col, format="")`, which converts character strings to [timestamp type](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.TimestampType.html#pyspark.sql.types.TimestampType), and `F.to_date(col, format="")`, which converts character strings to [date type](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.types.DateType.html#pyspark.sql.types.DateType).

Here are links to the documentation if you want to go further: [to_date](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.to_date.html), [to_timestamp](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.to_timestamp.html).

The function `F.to_timestamp` can convert our string dates to timestamps. All you have to do is supply a `format` argument that matches the strings:

In [None]:
sales = sales.withColumn("InvoiceDateClean", F.to_timestamp(F.col("InvoiceDate"), format="M/d/y H:m"))
sales.show(5)
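
To see how the pattern letters line up with the raw strings, here is a small plain-Python analogue. It is only a sketch: it uses `datetime.strptime` rather than Spark's parser, the `%m/%d/%Y %H:%M` directives are assumed to be the Python counterparts of `M/d/y H:m`, and the two sample strings are the min and max `InvoiceDate` values from the first `describe()` output above.

```python
from datetime import datetime

# Python's strptime directives standing in for Spark's "M/d/y H:m" pattern:
# month / day / year, then hour : minute (zero-padding is not required).
PY_FORMAT = "%m/%d/%Y %H:%M"

samples = ["1/10/2011 10:04", "9/9/2011 9:52"]
parsed = [datetime.strptime(s, PY_FORMAT) for s in samples]

for raw, ts in zip(samples, parsed):
    print(f"{raw!r} -> {ts.isoformat(sep=' ')}")
```

Read with this pattern, `1/10/2011` is 10 January 2011, not 1 October.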

Let's drop the previous date column:

In [None]:
sales = sales.drop(F.col("InvoiceDate"))

Once the column is in date format, it is possible to extract specific information from the date:

In [None]:
from pyspark.sql.functions import year, month, dayofmonth, dayofweek, dayofyear, weekofyear

# The column name must be passed as a string
sales.select(year(F.col("InvoiceDateClean"))).show(5)
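
The other imported helpers are used the same way as `year`. As a rough plain-Python analogue (not Spark code), the sketch below computes the matching components for a single timestamp with the standard `datetime` module. The sample value is one of the dates seen in the `describe()` output, and the `dayofweek` line assumes Spark's convention of numbering days from 1 (Sunday) to 7 (Saturday), which differs from Python's.

```python
from datetime import datetime

ts = datetime(2011, 1, 10, 10, 4)  # sample timestamp (10 January 2011, a Monday)

parts = {
    "year": ts.year,
    "month": ts.month,
    "dayofmonth": ts.day,
    # isoweekday() runs Monday=1..Sunday=7; shift it to Sunday=1..Saturday=7
    "dayofweek": ts.isoweekday() % 7 + 1,
    "dayofyear": ts.timetuple().tm_yday,
    # ISO 8601 week number
    "weekofyear": ts.isocalendar()[1],
}
print(parts)
```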

## GroupedData 📊📊

What's the type of `sales.groupBy('CustomerID')`?

In [None]:
type(sales.groupBy('CustomerID'))

It is a `GroupedData` object; here's the link to the [documentation](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData).

We'll go through what we can do with this.

### `.mean()`

`.mean()` is an alias for `.avg()`.

Both compute the average of all numeric columns for each group:

In [None]:
sales.groupBy('CustomerID').avg().show(5)

In [None]:
sales.groupBy('CustomerID').mean().show(5)

### `.sum()`

In [None]:
sales.groupBy('customerID').sum().show(5)

We can select a specific column...

In [None]:
sales.groupBy('customerID').mean('Quantity').show(5)

And several columns at once...

In [None]:
sales.groupBy('customerID').mean('Quantity', 'UnitPrice').show(5)

Passing a list, a tuple or a generator directly won't work. These need to be **unpacked** with `*`:

In [None]:
# This will fail without unpacking!
col_list = ['Quantity', 'UnitPrice']
sales.groupBy('customerID').mean(*col_list).show(5)
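
Why unpacking matters can be shown without Spark. The sketch below is plain Python; `selected_columns` is a hypothetical stand-in for a varargs method such as `.mean(*cols)`, written only to illustrate how the arguments arrive with and without the `*`.

```python
def selected_columns(*cols):
    """Hypothetical stand-in for a varargs method like GroupedData.mean(*cols)."""
    # Each positional argument is expected to be a column name (a string).
    for c in cols:
        if not isinstance(c, str):
            raise TypeError(f"expected a column name, got {type(c).__name__}")
    return list(cols)


col_list = ['Quantity', 'UnitPrice']

# Unpacked: the function receives two separate string arguments.
unpacked = selected_columns(*col_list)

# Not unpacked: the function receives ONE argument, the list itself.
try:
    selected_columns(col_list)
    passed_as_list_ok = True
except TypeError:
    passed_as_list_ok = False

print(unpacked, passed_as_list_ok)
```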

### `.count()`

`.count()` is a bit different: it doesn't apply to any column, it counts the number of rows in each group.  
This is different from pandas, where `.count()` counts the number of non-null values in each column.

In [None]:
sales.groupBy('customerID').count().show(5)
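
The difference is easy to see on a toy example. The plain-Python sketch below is neither Spark nor pandas code, and its rows are made up, with `None` standing in for a null. It counts rows per customer, which is what a grouped `.count()` does in Spark, and then non-null quantities per customer, which is what a pandas-style count would report.

```python
from collections import Counter

# Hypothetical (customer_id, quantity) rows; None plays the role of a null.
rows = [(1, 5), (1, None), (2, 3), (2, 7), (2, None)]

# Row count per group: every row counts, nulls included.
rows_per_customer = Counter(cid for cid, _ in rows)

# Non-null count per group: rows with a missing quantity are skipped.
non_null_per_customer = Counter(cid for cid, qty in rows if qty is not None)

print(dict(rows_per_customer))
print(dict(non_null_per_customer))
```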

### Other aggregations 🛠️

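Other `GroupedData` methods include `.max()`, `.min()` and `.pivot()`. The last one is not an aggregation by itself: it spreads the distinct values of a column into separate columns, and an aggregation is then applied on top.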

In [None]:
# Create a pivot table: one row per customer ID, one column per distinct
# Quantity value, and cells holding the result of the aggregation function
# (here, the row count)
sales.groupBy('customerID').pivot('Quantity').count().limit(5).toPandas()

Unnamed: 0,customerID,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,...,1296,1300,1350,1356,1394,1400,1404,1412,1428,1440,1488,1500,1515,1540,1600,1728,1788,1820,1824,1878,1900,1930,1944,1992,2000,2040,2100,2160,2400,2592,2700,2880,3000,3114,3186,3906,4300,4800,74215,80995
0,16503,5.0,15,6.0,7.0,,17,,11.0,,5.0,,14,,,,1.0,,,,2.0,,,,1.0,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
1,15727,22.0,27,11.0,40.0,5.0,57,,11.0,2.0,39.0,2.0,35,,,,1.0,,1.0,,4.0,1.0,,,22.0,7.0,,,,1.0,1.0,,,,,,2.0,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
2,15447,,1,,,1.0,1,,1.0,,,,4,,,,1.0,,,,,,,,,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
3,14450,4.0,5,3.0,10.0,2.0,6,1.0,1.0,,2.0,,4,,,,,,,,,,,,1.0,1.0,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,
4,17420,1.0,4,,,,4,,,,16.0,,4,,,,,,,,,,,,1.0,,,,,,,,,,,,,,,,...,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,


### `.agg(exprs)`

And a last one: `.agg(exprs)`. This one is more general: it computes the aggregates described by the expressions given as parameters.

From the [Documentation](https://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.GroupedData.agg):

> The available aggregate functions are `avg`, `max`, `min`, `sum`, `count`.

It makes it possible to easily compute more complicated aggregations.

#### Single dict mapping str to str 

If `exprs` is a single dict mapping from string to string, then the key is the column to perform aggregation on, and the value is the aggregate function.

In [None]:
agg_dict = {'Quantity': 'mean', 'UnitPrice': 'sum'}
sales.groupBy('customerId').agg(agg_dict).show(5)

You can use `'*'` as the key when the function is `count`, but not with the other functions:

In [None]:
sales.groupBy('customerId').agg({'*': 'count'}).show(5)
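
One consequence of the dict form is worth spelling out: Python dict keys are unique, so a dict can carry only one aggregate function per column. The short sketch below is plain Python, with no Spark involved, and shows the second entry silently replacing the first.

```python
# Two entries for the same column: the later one wins.
agg_dict = {'Quantity': 'mean', 'Quantity': 'sum'}

print(agg_dict)
print(len(agg_dict))
```

To compute several aggregates on the same column, the list form described next is the better fit.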

#### List of aggregate expressions

Alternatively, `exprs` can also be a "list" of aggregate Column expressions.  

This requires **unpacking**:

In [None]:
agg_exprs = (F.mean('Quantity'), F.sum('UnitPrice'))
sales.groupBy('customerId').agg(*agg_exprs).show()

You can also rename the resulting columns with `.alias()`, because it returns a _Column Expression_ too.

In [None]:
type(F.mean('Quantity').alias('meanQuantity'))

In [None]:
sales.groupBy('customerId').agg(
    F.mean('Quantity').alias('meanQuantity'),
    F.sum('UnitPrice').alias('totalPrice')
).show(5)

#### Advanced expressions ⚗️

It is possible to compute more complicated expressions with `.agg()`, for example by combining several aggregates:

In [None]:
sales.groupBy('customerId').agg(
    (F.sum(F.col("UnitPrice")) / F.sum(F.col("Quantity"))).alias("avg_unit_price")
).show(5)