# Day 5 - Doing Some Math
Doing calculations is a common task for me when I'm writing ETL jobs, e.g. when amounts need to be aligned to the domestic currency (for me mostly EUR) or when I need to unify the scaling of numeric values. The handling of missing values is also a regular issue here.

Applying math functions becomes even more important to me when it comes to analytical queries and Key Performance Indicator calculation. So today, I want to have a closer look at the following PySpark modules and classes:
* `pyspark.sql.functions`
* `pyspark.sql.DataFrameNaFunctions`
* `pyspark.sql.DataFrameStatFunctions`

## Some Data Preparation

In [15]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession\
   .builder\
   .getOrCreate()

In [16]:
df = spark.read\
   .format("csv")\
   .option("header", "true")\
   .option("inferSchema", "true")\
   .load("./data/day-005/retail-data/by-day/*.csv")

In [17]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: timestamp (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [18]:
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

Looking at the derived schema and a small data sample shows that Spark interprets the customer IDs as decimal numbers (double). Actually they are integers, so I want to get rid of the decimals.

Rounding to zero decimals is not a good option, because the result is still a double and is still displayed with a decimal place.

In [19]:
from pyspark.sql.functions import round

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice", 
          round("CustomerID",0), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+--------------------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|round(CustomerID, 0)|       Country|
+---------+---------+--------+---------+--------------------+--------------+
|   580538|    23084|      48|     1.79|             14075.0|United Kingdom|
|   580538|    23077|      20|     1.25|             14075.0|United Kingdom|
|   580538|    22906|      24|     1.65|             14075.0|United Kingdom|
|   580538|    21914|      24|     1.25|             14075.0|United Kingdom|
|   580538|    22467|       6|     2.55|             14075.0|United Kingdom|
|   580538|    21544|      48|     0.85|             14075.0|United Kingdom|
|   580538|    23126|       8|     4.95|             14075.0|United Kingdom|
|   580538|    21833|      24|     1.69|             14075.0|United Kingdom|
|   580539|    21479|       4|     4.25|             18180.0|United Kingdom|
|   580539|   84030E|       4|     4.25|             18180.0|United Kingdom|

Since this is more a data type issue than a calculation problem, type casting is more appropriate here.

In [20]:
from pyspark.sql.functions import col 

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice", 
          col("CustomerID").cast("integer"), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+----------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|CustomerID|       Country|
+---------+---------+--------+---------+----------+--------------+
|   580538|    23084|      48|     1.79|     14075|United Kingdom|
|   580538|    23077|      20|     1.25|     14075|United Kingdom|
|   580538|    22906|      24|     1.65|     14075|United Kingdom|
|   580538|    21914|      24|     1.25|     14075|United Kingdom|
|   580538|    22467|       6|     2.55|     14075|United Kingdom|
|   580538|    21544|      48|     0.85|     14075|United Kingdom|
|   580538|    23126|       8|     4.95|     14075|United Kingdom|
|   580538|    21833|      24|     1.69|     14075|United Kingdom|
|   580539|    21479|       4|     4.25|     18180|United Kingdom|
|   580539|   84030E|       4|     4.25|     18180|United Kingdom|
+---------+---------+--------+---------+----------+--------------+
only showing top 10 rows
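The difference between rounding and casting can also be seen without Spark. The following is only a plain-Python analogy, assuming that `int()` plays the role of `cast("integer")` and the built-in `round()` the role of the Spark function of the same name. The variable names are made up for illustration.

```python
# Plain-Python analogy (no Spark involved): rounding keeps the float type,
# while casting changes the type itself.
customer_id = 14075.0

rounded = round(customer_id, 0)  # still a float, so the ".0" stays
casted = int(customer_id)        # an int, comparable to cast("integer")

print(type(rounded).__name__, rounded)
print(type(casted).__name__, casted)

# Note: int() truncates instead of rounding, so it only suits values
# that are whole numbers already, like the customer IDs here.
```
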



Next I want to calculate the amount for each invoice position.

In [21]:
from pyspark.sql.functions import col 

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice",
          (col("Quantity") * col("UnitPrice")).alias("Amount"),
          col("CustomerID").cast("integer"), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+------------------+----------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|            Amount|CustomerID|       Country|
+---------+---------+--------+---------+------------------+----------+--------------+
|   580538|    23084|      48|     1.79|             85.92|     14075|United Kingdom|
|   580538|    23077|      20|     1.25|              25.0|     14075|United Kingdom|
|   580538|    22906|      24|     1.65|39.599999999999994|     14075|United Kingdom|
|   580538|    21914|      24|     1.25|              30.0|     14075|United Kingdom|
|   580538|    22467|       6|     2.55|15.299999999999999|     14075|United Kingdom|
|   580538|    21544|      48|     0.85|              40.8|     14075|United Kingdom|
|   580538|    23126|       8|     4.95|              39.6|     14075|United Kingdom|
|   580538|    21833|      24|     1.69|             40.56|     14075|United Kingdom|
|   580539|    21479|       4|     4.25|              

Ok, I forgot to round the amount to two decimals.

In [22]:
from pyspark.sql.functions import col, round 

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice",
          round((col("Quantity") * col("UnitPrice")), 2).alias("Amount"),
          col("CustomerID").cast("integer"), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+------+----------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|Amount|CustomerID|       Country|
+---------+---------+--------+---------+------+----------+--------------+
|   580538|    23084|      48|     1.79| 85.92|     14075|United Kingdom|
|   580538|    23077|      20|     1.25|  25.0|     14075|United Kingdom|
|   580538|    22906|      24|     1.65|  39.6|     14075|United Kingdom|
|   580538|    21914|      24|     1.25|  30.0|     14075|United Kingdom|
|   580538|    22467|       6|     2.55|  15.3|     14075|United Kingdom|
|   580538|    21544|      48|     0.85|  40.8|     14075|United Kingdom|
|   580538|    23126|       8|     4.95|  39.6|     14075|United Kingdom|
|   580538|    21833|      24|     1.69| 40.56|     14075|United Kingdom|
|   580539|    21479|       4|     4.25|  17.0|     18180|United Kingdom|
|   580539|   84030E|       4|     4.25|  17.0|     18180|United Kingdom|
+---------+---------+--------+--------
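The long tails like `39.599999999999994` are not a Spark bug but a property of binary floating point numbers. As a small sketch in plain Python, which uses the same double precision format, the product of 24 and 1.65 shows the same artifact. The `Decimal` variant is an alternative I add here for comparison; it is not what the queries above use.

```python
from decimal import Decimal

quantity = 24
unit_price = 1.65

# The double closest to 1.65 is not exactly 1.65, so the product is slightly off.
raw_amount = quantity * unit_price

# Rounding to two decimals hides the representation error for reporting.
rounded_amount = round(raw_amount, 2)

# Exact decimal arithmetic; the price is built from a string to avoid float error.
exact_amount = quantity * Decimal("1.65")

print(raw_amount)
print(rounded_amount)
print(exact_amount)
```
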

Now I save the intermediate results of my data preparation in a variable to keep the further analytical queries simpler. By decomposing my query into a preparation part and an analytical part, I get the option to check that the intermediate results are correct, so that they are an appropriate foundation for my analysis.

In [23]:
preparedDf = df.select(
    "InvoiceNO", 
    "InvoiceDate",
    "StockCode", 
    "Quantity", 
    "UnitPrice",
    round((col("Quantity") * col("UnitPrice")), 2).alias("Amount"),
    col("CustomerID").cast("integer"), 
    "Country")

preparedDf.show(10)

+---------+-------------------+---------+--------+---------+------+----------+--------------+
|InvoiceNO|        InvoiceDate|StockCode|Quantity|UnitPrice|Amount|CustomerID|       Country|
+---------+-------------------+---------+--------+---------+------+----------+--------------+
|   580538|2011-12-05 08:38:00|    23084|      48|     1.79| 85.92|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    23077|      20|     1.25|  25.0|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    22906|      24|     1.65|  39.6|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    21914|      24|     1.25|  30.0|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    22467|       6|     2.55|  15.3|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    21544|      48|     0.85|  40.8|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    23126|       8|     4.95|  39.6|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    21833|      24|     1.69|

## Aggregations
### Aggregating on DataFrames
First I want to apply several aggregation functions on the entire dataset in the `DataFrame` to do some data profiling. To make the output more readable, I switch `show()` to vertical output to get many rows instead of many columns.

In [33]:
# Note: the wildcard import shadows Python built-ins such as sum, min, max and round
from pyspark.sql.functions import *

preparedDf\
    .select(
        count("Amount").alias("count"), 
        countDistinct("Amount").alias("countDistinct"),
        approx_count_distinct("Amount", rsd=0.1).alias("approx_count_distinct"),
        sum("Amount").alias("sum"), 
        sumDistinct("Amount").alias("sumDistinct"),
        min("Amount").alias("min"), 
        max("Amount").alias("max"), 
        avg("Amount").alias("avg"), 
        mean("Amount").alias("mean"), 
        variance("Amount").alias("variance"), 
        var_samp("Amount").alias("var_samp"),
        var_pop("Amount").alias("var_pop"),
        stddev("Amount").alias("stddev"),
        kurtosis("Amount").alias("kurtosis"),
        skewness("Amount").alias("skewness"),
        first("Amount").alias("first"),
        last("Amount").alias("last")
    )\
    .show(vertical=True)

-RECORD 0------------------------------------
 count                 | 541909              
 countDistinct         | 5827                
 approx_count_distinct | 5417                
 sum                   | 9747747.929999992   
 sumDistinct           | 863586.9200000005   
 min                   | -168469.6           
 max                   | 168469.6            
 avg                   | 17.987794869618316  
 mean                  | 17.987794869618316  
 variance              | 143497.64000554013  
 var_samp              | 143497.64000554013  
 var_pop               | 143497.37520528768  
 stddev                | 378.81082350632505  
 kurtosis              | 151196.60137753483  
 skewness              | -0.9643865070858197 
 first                 | 3.26                
 last                  | 176.48              



There are further aggregation functions that I don't have a use case for in my example here, such as:
* **corr(col1, col2)** - returns a new Column for the Pearson Correlation Coefficient for col1 and col2
* **covar_pop(col1, col2)** - returns a new Column for the population covariance of col1 and col2
* **covar_samp(col1, col2)** - returns a new Column for the sample covariance of col1 and col2

Most of the aggregation function names are self-explanatory, so there is nothing to comment on. Only `first()` and `last()` are a bit special. In contrast to most of the other aggregation functions, `first()` and `last()` both refer to the **position** of a value in the dataset and not to its **magnitude**, like `min()` and `max()` do. So of the functions used above, `first()` and `last()` are the only ones affected by data sorting.
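To make the position versus magnitude point concrete, here is a minimal sketch in plain Python. The list and the `summarize` helper are made up for illustration, and list indexing only stands in for what `first()` and `last()` do on a `DataFrame`.

```python
# Hypothetical amounts in their original (unsorted) order.
amounts = [3.26, 176.48, -5.0, 85.92]

def summarize(values):
    """Return value-based (min/max) and position-based (first/last) results."""
    return {
        "min": min(values),
        "max": max(values),
        "first": values[0],
        "last": values[-1],
    }

unsorted_summary = summarize(amounts)
sorted_summary = summarize(sorted(amounts))

# min and max are identical in both cases, first and last are not.
print(unsorted_summary)
print(sorted_summary)
```
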

Back to data profiling. The ratio between count and countDistinct is an important indicator to identify key candidate columns. For primary keys, the ratio must be 1, i.e. countDistinct must equal the total count of values, so its cardinality must also be 1 to ensure uniqueness.

Even though they are not unique, columns with low cardinality are still candidates for performant data access patterns. The cardinality is a measure of the average number of rows I will get when filtering on a value of such a column.

The reciprocal of the cardinality, which I call the entropy here, is an indicator of how compressible a column is. `DataFrames` having many columns with low entropy benefit a lot from a columnar storage format. At the other extreme, primary key columns are hardly compressible at all.

In [37]:
preparedDf\
    .select(
        count("Amount").alias("count"), 
        countDistinct("Amount").alias("countDistinct")
    )\
    .withColumn("cardinality", col("count") / col("countDistinct"))\
    .withColumn("entropy", col("countdistinct") / col("count"))\
    .show(vertical=True)

-RECORD 0-----------------------------
 count         | 541909               
 countDistinct | 5827                 
 cardinality   | 92.99965677020765    
 entropy       | 0.010752727856522036 
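The two ratios are simple enough to check by hand. The following plain-Python sketch applies the same definitions to a small made-up list; the `profile` helper and the sample values are assumptions for illustration only, and `None` stands in for a Null value that `count()` would skip.

```python
def profile(values):
    """Compute count, distinct count and the two ratios used in the text."""
    non_null = [v for v in values if v is not None]  # nulls are not counted
    count = len(non_null)
    count_distinct = len(set(non_null))
    return {
        "count": count,
        "countDistinct": count_distinct,
        # average number of rows per distinct value
        "cardinality": count / count_distinct,
        # reciprocal of the cardinality; 1.0 means every value is unique
        "entropy": count_distinct / count,
    }

# Hypothetical sample: 7 non-null values, 4 of them distinct.
amounts = [25.0, 25.0, 39.6, 39.6, 39.6, 85.92, None, 17.0]
stats = profile(amounts)
print(stats)
```

A column is a key candidate under these definitions only when both ratios equal 1. The helper does not guard against an empty or all-null column, which would divide by zero.
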



Not surprisingly, the Amount column is not a key candidate, but it is very interesting that the cardinality is quite high. Maybe there are only a few standard unit prices and/or lot sizes that orders can be placed for. Let's compare it with the *InvoiceNO* column. The column name sounds like a key.

In [39]:
preparedDf\
    .select(
        count("InvoiceNo").alias("count"), 
        countDistinct("InvoiceNo").alias("countDistinct")
    )\
    .withColumn("cardinality", col("count") / col("countDistinct"))\
    .withColumn("entropy", col("countdistinct") / col("count"))\
    .show(vertical=True)

-RECORD 0-----------------------------
 count         | 541909               
 countDistinct | 25900                
 cardinality   | 20.923127413127414   
 entropy       | 0.047794002314041656 



Ok, this column has a much lower cardinality, but it is still not a unique key. The reason is that the retail dataset is denormalized and the granularity is not based on invoices but on stock items. Since each invoice can list multiple stock items, I need to combine InvoiceNo and StockCode to get a unique key. Let's check if this solves my problem.

In [47]:
keyedDf = preparedDf.select(
    concat_ws('-', "InvoiceNO","StockCode").alias("Key"),
    "InvoiceNO", 
    "InvoiceDate",
    "StockCode", 
    "Quantity", 
    "UnitPrice",
    "Amount",
    "CustomerID", 
    "Country")

keyedDf.show(10)

+-------------+---------+-------------------+---------+--------+---------+------+----------+--------------+
|          Key|InvoiceNO|        InvoiceDate|StockCode|Quantity|UnitPrice|Amount|CustomerID|       Country|
+-------------+---------+-------------------+---------+--------+---------+------+----------+--------------+
| 580538-23084|   580538|2011-12-05 08:38:00|    23084|      48|     1.79| 85.92|     14075|United Kingdom|
| 580538-23077|   580538|2011-12-05 08:38:00|    23077|      20|     1.25|  25.0|     14075|United Kingdom|
| 580538-22906|   580538|2011-12-05 08:38:00|    22906|      24|     1.65|  39.6|     14075|United Kingdom|
| 580538-21914|   580538|2011-12-05 08:38:00|    21914|      24|     1.25|  30.0|     14075|United Kingdom|
| 580538-22467|   580538|2011-12-05 08:38:00|    22467|       6|     2.55|  15.3|     14075|United Kingdom|
| 580538-21544|   580538|2011-12-05 08:38:00|    21544|      48|     0.85|  40.8|     14075|United Kingdom|
| 580538-23126|   580538|201

In [48]:
keyedDf\
    .select(
        count("Key").alias("count"), 
        countDistinct("Key").alias("countDistinct")
    )\
    .withColumn("cardinality", col("count") / col("countDistinct"))\
    .withColumn("entropy", col("countdistinct") / col("count"))\
    .show(vertical=True)

-RECORD 0---------------------------
 count         | 541909             
 countDistinct | 531225             
 cardinality   | 1.0201120052708363 
 entropy       | 0.9802845127133891 



Hm, I'm getting close, but there are still some duplicates. Maybe there are some Null values in these columns. I would need to investigate this further, but I leave it for now because another phenomenon confuses me: there are two versions of counting in Spark:
* `DataFrame.count()`
* `pyspark.sql.functions.count()`

The `DataFrame.count()` method is always applied to the entire `DataFrame` and counts the total number of physical rows in it. Additionally, this method is an action and not a transformation, because the count is determined and returned directly. On the other hand, `pyspark.sql.functions.count()` is an aggregation function that counts non-Null values. It can be applied to the whole `DataFrame`, as in the queries above, to grouped data defined by a grouping key (`DataFrame.groupBy()`), or over a window. Aggregation functions define lazily evaluated transformations.
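The difference between counting rows and counting non-Null values can be sketched in plain Python. This is only an analogy: `len()` stands in for `DataFrame.count()` and the filtered sum for `pyspark.sql.functions.count()`. The third row is hypothetical and not taken from the dataset.

```python
rows = [
    {"InvoiceNo": "580538", "CustomerID": 14075},
    {"InvoiceNo": "580539", "CustomerID": 18180},
    {"InvoiceNo": "580540", "CustomerID": None},  # hypothetical row without a customer
]

# Analogy to DataFrame.count(): every physical row counts.
row_count = len(rows)

# Analogy to functions.count("CustomerID"): only non-null values count.
non_null_count = sum(1 for row in rows if row["CustomerID"] is not None)

print(row_count, non_null_count)
```
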
### Aggregating on Grouped Data
Aggregating on the entire DataFrame returns just one row.

In [50]:
preparedDf\
    .select(
        sum("Amount").alias("sum"), 
        min("Amount").alias("min"), 
        max("Amount").alias("max"), 
        avg("Amount").alias("avg"), 
        mean("Amount").alias("mean")
    )\
    .show()

+-----------------+---------+--------+------------------+------------------+
|              sum|      min|     max|               avg|              mean|
+-----------------+---------+--------+------------------+------------------+
|9747747.929999745|-168469.6|168469.6|17.987794869617858|17.987794869617858|
+-----------------+---------+--------+------------------+------------------+



Such highly aggregated data does not provide me with much business insight, so I want to see the results for each country. The first thing I have to do is to define a grouping key to arrange the data into one group per country. Then I can aggregate on each group separately.

In [53]:
preparedDf.groupBy("Country").count().show(10)

+------------------+-----+
|           Country|count|
+------------------+-----+
|            Sweden|  462|
|         Singapore|  229|
|           Germany| 9495|
|               RSA|   58|
|            France| 8557|
|            Greece|  146|
|European Community|   61|
|           Belgium| 2069|
|           Finland|  695|
|             Malta|  127|
+------------------+-----+
only showing top 10 rows
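What a grouping key does can be shown with a few lines of plain Python. This is a sketch under the assumption that a dictionary of lists is a fair stand-in for grouped data; the rows are made up and much smaller than the retail dataset.

```python
from collections import defaultdict

# Hypothetical (country, amount) rows.
rows = [
    ("Sweden", 10.0),
    ("Germany", 23.5),
    ("Sweden", -4.0),
    ("France", 7.25),
    ("Germany", 1.5),
]

# Step 1: arrange the rows into one group per grouping key.
groups = defaultdict(list)
for country, amount in rows:
    groups[country].append(amount)

# Step 2: aggregate each group separately.
result = {
    country: {
        "count": len(amounts),
        "sum": sum(amounts),
        "min": min(amounts),
        "max": max(amounts),
        "avg": sum(amounts) / len(amounts),
    }
    for country, amounts in groups.items()
}

for country, aggregates in result.items():
    print(country, aggregates)
```
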



As I already know, rearranging data means that Spark is shuffling partitions around. The explain plan confirms this (*Exchange hashpartitioning*).

In [None]:
preparedDf.groupBy("Country").count().explain()
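The idea behind hash partitioning can be sketched in plain Python: every row is sent to the partition given by the hash of its grouping key, so all rows of one country end up together. This is only an illustration. The CRC32 hash, the partition count and the helper name are my assumptions and not what Spark uses internally.

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical number of shuffle partitions

def partition_for(key, num_partitions=NUM_PARTITIONS):
    """Map a grouping key to a partition index (CRC32 is only a stand-in hash)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions

# Hypothetical (country, amount) rows, as they might be spread over input partitions.
rows = [
    ("Sweden", 10.0),
    ("Germany", 23.5),
    ("Sweden", -4.0),
    ("France", 7.25),
    ("Germany", 1.5),
]

# The "shuffle": route every row to the partition its key hashes to.
partitions = defaultdict(list)
for country, amount in rows:
    partitions[partition_for(country)].append((country, amount))

for index in sorted(partitions):
    print(index, partitions[index])
```

Because the partition depends only on the key, each group can afterwards be aggregated inside a single partition without looking at the others.
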

In [56]:
preparedDf\
    .groupBy("Country")\
    .agg(
        sum("Amount").alias("sum"), 
        min("Amount").alias("min"), 
        max("Amount").alias("max"), 
        avg("Amount").alias("avg"), 
        mean("Amount").alias("mean")
    )\
    .show(10)

+------------------+------------------+--------+-------+------------------+------------------+
|           Country|               sum|     min|    max|               avg|              mean|
+------------------+------------------+--------+-------+------------------+------------------+
|            Sweden|36595.909999999996| -1188.0| 1188.0|  79.2119264069264|  79.2119264069264|
|         Singapore|           9120.39|-3949.32|3949.32| 39.82703056768559| 39.82703056768559|
|           Germany|         221698.21|  -599.5|  876.0|23.348942601369142|23.348942601369142|
|               RSA|1002.3099999999998|     0.0|  38.25|17.281206896551723|17.281206896551723|
|            France|197403.90000000008|-8322.12|4161.06|   23.069288301975|   23.069288301975|
|            Greece|           4710.52|   -50.0|  175.2| 32.26383561643836| 32.26383561643836|
|European Community|1291.7500000000002|    -8.5|   60.0|21.176229508196727|21.176229508196727|
|           Belgium|          40910.96|  -19.95|  

And just to remind myself: I could do all of this using my good old SQL.

In [61]:
preparedDf.createOrReplaceTempView("retailTable")

spark.sql("""
    SELECT 
        Country, sum(Amount) as sum, min(Amount) as min, max(Amount) as max, 
        avg(Amount) as avg, mean(Amount) as mean 
    FROM retailTable
    GROUP BY Country""").show(10)

+------------------+------------------+--------+-------+------------------+------------------+
|           Country|               sum|     min|    max|               avg|              mean|
+------------------+------------------+--------+-------+------------------+------------------+
|            Sweden|36595.909999999996| -1188.0| 1188.0|  79.2119264069264|  79.2119264069264|
|         Singapore|           9120.39|-3949.32|3949.32| 39.82703056768559| 39.82703056768559|
|           Germany|         221698.21|  -599.5|  876.0|23.348942601369142|23.348942601369142|
|               RSA|1002.3099999999998|     0.0|  38.25|17.281206896551723|17.281206896551723|
|            France|197403.90000000008|-8322.12|4161.06|   23.069288301975|   23.069288301975|
|            Greece|           4710.52|   -50.0|  175.2| 32.26383561643836| 32.26383561643836|
|European Community|1291.7500000000002|    -8.5|   60.0|21.176229508196727|21.176229508196727|
|           Belgium|          40910.96|  -19.95|  

### Aggregating on Floating Windows