# Day 5 - Doing Some Math
[pyspark Doc](https://spark.apache.org/docs/2.4.5/api/python/index.html)

Doing some calculations is a common task for me when I'm writing ETL jobs, e.g. when amounts need to be aligned to the domestic currency (for me mostly EUR) or when I need to unify the scaling of numeric values. 

Applying math functions becomes even more important to me when it comes to analytical queries and Key Performance Indicator calculation. So today, I want to have a closer look at the following pyspark sub-modules and classes: 
* `pyspark.sql.functions`
* `pyspark.sql.GroupedData`
* `pyspark.sql.DataFrameStatFunctions`

## Some Data Preparation

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession\
   .builder\
   .getOrCreate()

In [2]:
df = spark.read\
   .format("csv")\
   .option("header", "true")\
   .option("inferSchema", "true")\
   .load("./data/retail-data/by-day/*.csv")

In [3]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [4]:
df.show(10)

+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|   580538|    23084|  RABBIT NIGHT LIGHT|      48|2011-12-05 08:38:00|     1.79|   14075.0|United Kingdom|
|   580538|    23077| DOUGHNUT LIP GLOSS |      20|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22906|12 MESSAGE CARDS ...|      24|2011-12-05 08:38:00|     1.65|   14075.0|United Kingdom|
|   580538|    21914|BLUE HARMONICA IN...|      24|2011-12-05 08:38:00|     1.25|   14075.0|United Kingdom|
|   580538|    22467|   GUMBALL COAT RACK|       6|2011-12-05 08:38:00|     2.55|   14075.0|United Kingdom|
|   580538|    21544|SKULLS  WATER TRA...|      48|2011-12-05 08:38:00|     0.85|   14075.0|United Kingdom|
|   580538|    23126|FELTCRA

Looking at the derived schema and a small data sample shows that Spark interprets the Customer ID as decimal numbers (double). Actually they are integers, so I want to get rid of the decimals. 

Rounding to zero decimals is not a good option, because the result is still a *double* with a decimal point.

In [5]:
from pyspark.sql.functions import round

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice", 
          round("CustomerID",0), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+--------------------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|round(CustomerID, 0)|       Country|
+---------+---------+--------+---------+--------------------+--------------+
|   580538|    23084|      48|     1.79|             14075.0|United Kingdom|
|   580538|    23077|      20|     1.25|             14075.0|United Kingdom|
|   580538|    22906|      24|     1.65|             14075.0|United Kingdom|
|   580538|    21914|      24|     1.25|             14075.0|United Kingdom|
|   580538|    22467|       6|     2.55|             14075.0|United Kingdom|
|   580538|    21544|      48|     0.85|             14075.0|United Kingdom|
|   580538|    23126|       8|     4.95|             14075.0|United Kingdom|
|   580538|    21833|      24|     1.69|             14075.0|United Kingdom|
|   580539|    21479|       4|     4.25|             18180.0|United Kingdom|
|   580539|   84030E|       4|     4.25|             18180.0|United Kingdom|

Since this is more a data type issue than a calculation problem, type casting is more appropriate here.

In [6]:
from pyspark.sql.functions import col 

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice", 
          col("CustomerID").cast("integer"), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+----------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|CustomerID|       Country|
+---------+---------+--------+---------+----------+--------------+
|   580538|    23084|      48|     1.79|     14075|United Kingdom|
|   580538|    23077|      20|     1.25|     14075|United Kingdom|
|   580538|    22906|      24|     1.65|     14075|United Kingdom|
|   580538|    21914|      24|     1.25|     14075|United Kingdom|
|   580538|    22467|       6|     2.55|     14075|United Kingdom|
|   580538|    21544|      48|     0.85|     14075|United Kingdom|
|   580538|    23126|       8|     4.95|     14075|United Kingdom|
|   580538|    21833|      24|     1.69|     14075|United Kingdom|
|   580539|    21479|       4|     4.25|     18180|United Kingdom|
|   580539|   84030E|       4|     4.25|     18180|United Kingdom|
+---------+---------+--------+---------+----------+--------------+
only showing top 10 rows
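
To convince myself of the difference between rounding and casting, here is a minimal sketch in standalone plain Python (not Spark; run it in a fresh interpreter, because the notebook has replaced the built-in `round` with the pyspark function). The variable names are my own. As far as I know, a cast to an integer type drops the fractional part in the same way `int()` does, which is harmless here because the IDs have no fractional part.

```python
# Plain-Python analogy (not Spark): rounding keeps the float type,
# converting to int changes the type.
customer_id = 14075.0

rounded = round(customer_id, 0)  # still a float
casted = int(customer_id)        # an int without a decimal part

print(type(rounded).__name__, rounded)  # float 14075.0
print(type(casted).__name__, casted)    # int 14075
```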



Next I want to calculate the amount for each invoice position.

In [7]:
from pyspark.sql.functions import col 

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice",
          (col("Quantity") * col("UnitPrice")).alias("Amount"),
          col("CustomerID").cast("integer"), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+------------------+----------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|            Amount|CustomerID|       Country|
+---------+---------+--------+---------+------------------+----------+--------------+
|   580538|    23084|      48|     1.79|             85.92|     14075|United Kingdom|
|   580538|    23077|      20|     1.25|              25.0|     14075|United Kingdom|
|   580538|    22906|      24|     1.65|39.599999999999994|     14075|United Kingdom|
|   580538|    21914|      24|     1.25|              30.0|     14075|United Kingdom|
|   580538|    22467|       6|     2.55|15.299999999999999|     14075|United Kingdom|
|   580538|    21544|      48|     0.85|              40.8|     14075|United Kingdom|
|   580538|    23126|       8|     4.95|              39.6|     14075|United Kingdom|
|   580538|    21833|      24|     1.69|             40.56|     14075|United Kingdom|
|   580539|    21479|       4|     4.25|              

Ok, I forgot to round the amount to two decimals.

In [8]:
from pyspark.sql.functions import col, round 

df.select("InvoiceNO", 
          "StockCode", 
          "Quantity", 
          "UnitPrice",
          round((col("Quantity") * col("UnitPrice")), 2).alias("Amount"),
          col("CustomerID").cast("integer"), 
          "Country")\
    .show(10)

+---------+---------+--------+---------+------+----------+--------------+
|InvoiceNO|StockCode|Quantity|UnitPrice|Amount|CustomerID|       Country|
+---------+---------+--------+---------+------+----------+--------------+
|   580538|    23084|      48|     1.79| 85.92|     14075|United Kingdom|
|   580538|    23077|      20|     1.25|  25.0|     14075|United Kingdom|
|   580538|    22906|      24|     1.65|  39.6|     14075|United Kingdom|
|   580538|    21914|      24|     1.25|  30.0|     14075|United Kingdom|
|   580538|    22467|       6|     2.55|  15.3|     14075|United Kingdom|
|   580538|    21544|      48|     0.85|  40.8|     14075|United Kingdom|
|   580538|    23126|       8|     4.95|  39.6|     14075|United Kingdom|
|   580538|    21833|      24|     1.69| 40.56|     14075|United Kingdom|
|   580539|    21479|       4|     4.25|  17.0|     18180|United Kingdom|
|   580539|   84030E|       4|     4.25|  17.0|     18180|United Kingdom|
+---------+---------+--------+--------
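
The long tails like 39.599999999999994 in the unrounded Amount column are not a Spark problem but a property of binary floating point numbers. A minimal sketch in standalone plain Python (fresh interpreter, not the notebook session) reproduces the value. The `Decimal` part is only my assumption of how exact decimal arithmetic could look; it is not used in this notebook.

```python
from decimal import Decimal

# The same IEEE 754 double arithmetic in plain Python (not Spark).
raw = 24 * 1.65
print(raw)            # 39.599999999999994, as in the unrounded Amount column
print(round(raw, 2))  # 39.6

# Assumption for illustration: exact decimal arithmetic as an alternative.
exact = Decimal("24") * Decimal("1.65")
print(exact)          # 39.60
```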

Now I save the intermediate results of my data preparation in a variable to keep the further analytical queries simpler. By decomposing my query into a preparation part and an analytical part, I get the option to check that the intermediate results are correct, so that they are an appropriate foundation for my analysis.

In [9]:
preparedDf = df.select(
    "InvoiceNO", 
    "InvoiceDate",
    "StockCode", 
    "Quantity", 
    "UnitPrice",
    round((col("Quantity") * col("UnitPrice")), 2).alias("Amount"),
    col("CustomerID").cast("integer"), 
    "Country")

preparedDf.show(10)

+---------+-------------------+---------+--------+---------+------+----------+--------------+
|InvoiceNO|        InvoiceDate|StockCode|Quantity|UnitPrice|Amount|CustomerID|       Country|
+---------+-------------------+---------+--------+---------+------+----------+--------------+
|   580538|2011-12-05 08:38:00|    23084|      48|     1.79| 85.92|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    23077|      20|     1.25|  25.0|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    22906|      24|     1.65|  39.6|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    21914|      24|     1.25|  30.0|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    22467|       6|     2.55|  15.3|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    21544|      48|     0.85|  40.8|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    23126|       8|     4.95|  39.6|     14075|United Kingdom|
|   580538|2011-12-05 08:38:00|    21833|      24|     1.69|

## Aggregations
### Aggregating on DataFrames
First I want to apply several aggregation functions on the entire dataset in the `DataFrame` to do some data profiling. To make the output more readable, I switch `show()` to vertical output to get many rows instead of many columns.

In [10]:
# Note: the wildcard import shadows Python built-ins such as sum, min, max and round
from pyspark.sql.functions import *

preparedDf\
    .select(
        count("Amount").alias("count"), 
        countDistinct("Amount").alias("countDistinct"),
        approx_count_distinct("Amount", rsd=0.1).alias("approx_count_distinct"),
        sum("Amount").alias("sum"), 
        sumDistinct("Amount").alias("sumDistinct"),
        min("Amount").alias("min"), 
        max("Amount").alias("max"), 
        avg("Amount").alias("avg"), 
        mean("Amount").alias("mean"), 
        variance("Amount").alias("variance"), 
        var_samp("Amount").alias("var_samp"),
        var_pop("Amount").alias("var_pop"),
        stddev("Amount").alias("stddev"),
        kurtosis("Amount").alias("kurtosis"),
        skewness("Amount").alias("skewness"),
        first("Amount").alias("first"),
        last("Amount").alias("last")
    )\
    .show(vertical=True)

-RECORD 0------------------------------------
 count                 | 541909              
 countDistinct         | 5827                
 approx_count_distinct | 5417                
 sum                   | 9747747.929999992   
 sumDistinct           | 863586.9200000005   
 min                   | -168469.6           
 max                   | 168469.6            
 avg                   | 17.987794869618316  
 mean                  | 17.987794869618316  
 variance              | 143497.64000554013  
 var_samp              | 143497.64000554013  
 var_pop               | 143497.37520528768  
 stddev                | 378.81082350632505  
 kurtosis              | 151196.60137753483  
 skewness              | -0.9643865070858197 
 first                 | 3.26                
 last                  | 176.48              



There are further aggregation functions for which I don't have a use case in my example here, like:
* **corr(col1, col2)** - returns a new Column for the Pearson Correlation Coefficient for col1 and col2
* **covar_pop(col1, col2)** - returns a new Column for the population covariance of col1 and col2
* **covar_samp(col1, col2)** - returns a new Column for the sample covariance of col1 and col2

Most of the aggregation function names are self-explanatory, so nothing to comment on. Just `first()` and `last()` are a bit special. In contrast to most of the other aggregation functions, `first()` and `last()` both refer to the **position** of a value in the dataset and not to its **magnitude**, like `min()` and `max()` do. So of the functions shown above, `first()` and `last()` are the ones affected by data sorting.
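
A tiny sketch in standalone plain Python (not Spark, fresh interpreter) shows the idea with a made-up list of amounts: reordering the data changes the first and last element but not the minimum and maximum.

```python
# Plain-Python analogy (not Spark): min/max depend only on the values,
# first/last depend on the order in which the values arrive.
amounts = [3.26, -5.0, 176.48, 40.8]  # hypothetical values
reordered = sorted(amounts)

print(min(amounts) == min(reordered))  # True
print(max(amounts) == max(reordered))  # True
print(amounts[0], reordered[0])        # 3.26 -5.0
print(amounts[-1], reordered[-1])      # 40.8 176.48
```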

Back to data profiling. The ratio between count and countDistinct is an important indicator to identify key candidate columns. For primary keys, the ratio must be 1, i.e. countDistinct must equal the total count of values, so its cardinality, as I use the term here (count divided by countDistinct), must also be 1 to ensure uniqueness. 

Even though they are not unique, columns with low cardinality are still candidates for performant data access patterns. The cardinality is a measure of the average number of rows I will get when filtering on a single value of such a column.

The reciprocal of the cardinality, which I call entropy here, is an indicator of how compressible a column is. `DataFrames` having many columns with low entropy benefit much from a columnar storage format. At the other extreme, primary key columns are barely compressible.

In [11]:
preparedDf\
    .select(
        count("Amount").alias("count"), 
        countDistinct("Amount").alias("countDistinct")
    )\
    .withColumn("cardinality", col("count") / col("countDistinct"))\
    .withColumn("entropy", col("countDistinct") / col("count"))\
    .show(vertical=True)

-RECORD 0-----------------------------
 count         | 541909               
 countDistinct | 5827                 
 cardinality   | 92.99965677020765    
 entropy       | 0.010752727856522036 
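
To make the two ratios tangible, here is a minimal sketch in standalone plain Python (not Spark). The helper `profile()` and the toy columns are hypothetical; like the aggregation function `count()`, the helper ignores missing values.

```python
def profile(values):
    """Return (count, distinct, count/distinct, distinct/count) for non-null values."""
    non_null = [v for v in values if v is not None]
    count = len(non_null)
    distinct = len(set(non_null))
    return count, distinct, count / distinct, distinct / count

# Hypothetical toy column: six rows, three distinct values.
print(profile([1.25, 1.25, 4.25, 4.25, 4.25, 0.85]))
# (6, 3, 2.0, 0.5)

# A key-like column: every value is unique, so both ratios are 1.
print(profile(["a", "b", "c"]))
# (3, 3, 1.0, 1.0)
```

The sketch does not guard against an empty column, where both divisions would fail.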



Not surprisingly, the amount column is not a key candidate, but it is very interesting that the cardinality is quite high. Maybe there are only a few standard unit prices and/or lot sizes that orders can be placed for. Let's compare it with the *InvoiceNO* column. The column name sounds like a key.

In [12]:
preparedDf\
    .select(
        count("InvoiceNo").alias("count"), 
        countDistinct("InvoiceNo").alias("countDistinct")
    )\
    .withColumn("cardinality", col("count") / col("countDistinct"))\
    .withColumn("entropy", col("countDistinct") / col("count"))\
    .show(vertical=True)

-RECORD 0-----------------------------
 count         | 541909               
 countDistinct | 25900                
 cardinality   | 20.923127413127414   
 entropy       | 0.047794002314041656 



Ok, this column has a much lower cardinality but it is still not a unique key. The reason is that the retail dataset is denormalized and the granularity is not based on invoices but on stock items. Since each invoice can list multiple stock items, I need to combine InvoiceNo and StockCode to get a unique key. Let's check if this solves my problem.

In [13]:
keyedDf = preparedDf.select(
    concat_ws('-', "InvoiceNO","StockCode").alias("Key"),
    "InvoiceNO", 
    "InvoiceDate",
    "StockCode", 
    "Quantity", 
    "UnitPrice",
    "Amount",
    "CustomerID", 
    "Country")

keyedDf.show(10)

+-------------+---------+-------------------+---------+--------+---------+------+----------+--------------+
|          Key|InvoiceNO|        InvoiceDate|StockCode|Quantity|UnitPrice|Amount|CustomerID|       Country|
+-------------+---------+-------------------+---------+--------+---------+------+----------+--------------+
| 580538-23084|   580538|2011-12-05 08:38:00|    23084|      48|     1.79| 85.92|     14075|United Kingdom|
| 580538-23077|   580538|2011-12-05 08:38:00|    23077|      20|     1.25|  25.0|     14075|United Kingdom|
| 580538-22906|   580538|2011-12-05 08:38:00|    22906|      24|     1.65|  39.6|     14075|United Kingdom|
| 580538-21914|   580538|2011-12-05 08:38:00|    21914|      24|     1.25|  30.0|     14075|United Kingdom|
| 580538-22467|   580538|2011-12-05 08:38:00|    22467|       6|     2.55|  15.3|     14075|United Kingdom|
| 580538-21544|   580538|2011-12-05 08:38:00|    21544|      48|     0.85|  40.8|     14075|United Kingdom|
| 580538-23126|   580538|201

In [14]:
keyedDf\
    .select(
        count("Key").alias("count"), 
        countDistinct("Key").alias("countDistinct")
    )\
    .withColumn("cardinality", col("count") / col("countDistinct"))\
    .withColumn("entropy", col("countDistinct") / col("count"))\
    .show(vertical=True)

-RECORD 0---------------------------
 count         | 541909             
 countDistinct | 531225             
 cardinality   | 1.0201120052708363 
 entropy       | 0.9802845127133891 



Hm, I'm getting close but there are still some duplicates. Maybe there are some Null values in these columns. I would need to investigate this further, but I leave this for now because another phenomenon confuses me: there are two versions of counting in Spark:
* `DataFrame.count()`
* `pyspark.sql.functions.count()`

The `DataFrame.count()` method is always applied to the entire `DataFrame` and counts the total number of rows in it. Additionally, this method is an action and not a transformation, because the count is determined and returned directly. On the other hand, `pyspark.sql.functions.count()` is an aggregation function counting non-Null values; it is applied to the entire `DataFrame`, as in the profiling queries above, or to grouped data defined by a grouping key in `DataFrame.groupBy()` or by a window. Aggregation functions define lazily evaluated transformations. 
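
The difference between counting rows and counting values can be sketched in standalone plain Python (not Spark). The third row with the missing CustomerID is made up for illustration.

```python
# Plain-Python analogy (not Spark): a row with a missing value still counts
# as a row, but it does not count as a value of that column.
rows = [
    {"InvoiceNo": "580538", "CustomerID": 14075},
    {"InvoiceNo": "580539", "CustomerID": 18180},
    {"InvoiceNo": "580540", "CustomerID": None},  # hypothetical row
]

# Analogous to DataFrame.count(): every row counts.
row_count = len(rows)

# Analogous to the aggregation function count("CustomerID"): only non-null values count.
value_count = len([r for r in rows if r["CustomerID"] is not None])

print(row_count, value_count)  # 3 2
```
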
## Aggregating on Grouped Data
Aggregating on the entire DataFrame will show me just one row.

In [15]:
preparedDf\
    .select(
        sum("Amount").alias("sum"), 
        min("Amount").alias("min"), 
        max("Amount").alias("max"), 
        avg("Amount").alias("avg"), 
        mean("Amount").alias("mean")
    )\
    .show()

+-----------------+---------+--------+------------------+------------------+
|              sum|      min|     max|               avg|              mean|
+-----------------+---------+--------+------------------+------------------+
|9747747.929999745|-168469.6|168469.6|17.987794869617858|17.987794869617858|
+-----------------+---------+--------+------------------+------------------+



Such highly aggregated data does not provide me much business insight, so I want to see the results for each country. So the first thing I have to do is to define a grouping key to arrange the data into one group for each country. Then I can aggregate on each group separately.

In [16]:
preparedDf.groupBy("Country").count().show(10)

+------------------+-----+
|           Country|count|
+------------------+-----+
|            Sweden|  462|
|         Singapore|  229|
|           Germany| 9495|
|               RSA|   58|
|            France| 8557|
|            Greece|  146|
|European Community|   61|
|           Belgium| 2069|
|           Finland|  695|
|             Malta|  127|
+------------------+-----+
only showing top 10 rows



As I already know, rearranging data means that Spark is shuffling partitions around. The explain plan confirms this (*Exchange hashpartitioning*).

In [17]:
preparedDf.groupBy("Country").count().explain()

== Physical Plan ==
*(2) HashAggregate(keys=[Country#17], functions=[count(1)])
+- Exchange hashpartitioning(Country#17, 200)
   +- *(1) HashAggregate(keys=[Country#17], functions=[partial_count(1)])
      +- *(1) FileScan csv [Country#17] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/home/oli/github/pyspark-tutorial/data/day-005/retail-data/by-day/2011-09-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Country:string>
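
Reading the plan bottom-up, I see three steps: a partial count per input partition, an exchange by key, and a final count. The following standalone plain-Python sketch (not Spark, fresh interpreter) mimics these steps under my own assumptions: the input partitions are made up, and the function `target()` is only a stand-in for the hash function Spark really uses.

```python
from collections import Counter

# Hypothetical input, already split into two partitions of Country values.
partitions = [
    ["Germany", "France", "Germany"],
    ["France", "Germany", "Sweden"],
]
num_shuffle_partitions = 2  # the plan above uses 200

# Step 1: partial count inside each input partition (no data movement).
partials = [Counter(p) for p in partitions]

# Step 2: exchange. Every partial result is routed to a target partition
# chosen by a function of the key, so equal keys end up together.
def target(key):
    # Illustration only: a deterministic stand-in for a real hash function.
    return sum(map(ord, key)) % num_shuffle_partitions

shuffled = [[] for _ in range(num_shuffle_partitions)]
for partial in partials:
    for country, n in partial.items():
        shuffled[target(country)].append((country, n))

# Step 3: final count per key inside each target partition.
result = Counter()
for bucket in shuffled:
    for country, n in bucket:
        result[country] += n

print(sorted(result.items()))
# [('France', 2), ('Germany', 3), ('Sweden', 1)]
```

The point of the partial step is that only one small record per key and partition has to be moved, not every row.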


In [18]:
preparedDf\
    .groupBy("Country")\
    .agg(
        sum("Amount").alias("sum"), 
        min("Amount").alias("min"), 
        max("Amount").alias("max"), 
        avg("Amount").alias("avg"), 
        mean("Amount").alias("mean")
    )\
    .show(10)

+------------------+------------------+--------+-------+------------------+------------------+
|           Country|               sum|     min|    max|               avg|              mean|
+------------------+------------------+--------+-------+------------------+------------------+
|            Sweden|36595.909999999996| -1188.0| 1188.0|  79.2119264069264|  79.2119264069264|
|         Singapore|           9120.39|-3949.32|3949.32| 39.82703056768559| 39.82703056768559|
|           Germany|         221698.21|  -599.5|  876.0|23.348942601369142|23.348942601369142|
|               RSA|1002.3099999999998|     0.0|  38.25|17.281206896551723|17.281206896551723|
|            France|197403.90000000008|-8322.12|4161.06|   23.069288301975|   23.069288301975|
|            Greece|           4710.52|   -50.0|  175.2| 32.26383561643836| 32.26383561643836|
|European Community|1291.7500000000002|    -8.5|   60.0|21.176229508196727|21.176229508196727|
|           Belgium|          40910.96|  -19.95|  

And just to remind myself: I could do this all using my good-old SQL. (ref. <a href="https://docs.databricks.com/spark/latest/spark-sql/language-manual/index.html">SQL Language Reference</a> provided by Databricks)

In [19]:
preparedDf.createOrReplaceTempView("retailTable")

spark.sql("""
    SELECT 
        Country, sum(Amount) as sum, min(Amount) as min, max(Amount) as max, 
        avg(Amount) as avg, mean(Amount) as mean 
    FROM retailTable
    GROUP BY Country""").show(10)

+------------------+------------------+--------+-------+------------------+------------------+
|           Country|               sum|     min|    max|               avg|              mean|
+------------------+------------------+--------+-------+------------------+------------------+
|            Sweden|36595.909999999996| -1188.0| 1188.0|  79.2119264069264|  79.2119264069264|
|         Singapore|           9120.39|-3949.32|3949.32| 39.82703056768559| 39.82703056768559|
|           Germany|         221698.21|  -599.5|  876.0|23.348942601369142|23.348942601369142|
|               RSA|1002.3099999999998|     0.0|  38.25|17.281206896551723|17.281206896551723|
|            France|197403.90000000008|-8322.12|4161.06|   23.069288301975|   23.069288301975|
|            Greece|           4710.52|   -50.0|  175.2| 32.26383561643836| 32.26383561643836|
|European Community|1291.7500000000002|    -8.5|   60.0|21.176229508196727|21.176229508196727|
|           Belgium|          40910.96|  -19.95|  

## Aggregating on Floating Windows

Having the average amount per country over the entire data set history of approx. one year is still very high-level. I would like to make a time-series analysis on the invoice amounts. So the first thing I have to do is to include the *InvoiceDate* column into the grouping key.

In [95]:
preparedDf\
    .groupBy("Country", "InvoiceDate")\
    .agg( 
        avg("Amount").alias("avg")
    )\
    .orderBy("Country")\
    .show(10)

+---------+-------------------+-------------------+
|  Country|        InvoiceDate|                avg|
+---------+-------------------+-------------------+
|Australia|2011-05-20 14:13:00|             191.59|
|Australia|2011-07-26 10:15:00|             -38.16|
|Australia|2011-09-28 14:26:00|               37.5|
|Australia|2011-07-19 12:26:00|-2.7833333333333328|
|Australia|2011-07-13 15:30:00| 124.32545454545456|
|Australia|2011-09-28 15:41:00|              20.86|
|Australia|2011-05-23 09:14:00| 27.769999999999996|
|Australia|2011-05-31 11:29:00|               -9.9|
|Australia|2011-09-05 09:48:00|            18.4625|
|Australia|2011-07-19 10:51:00| 2.7833333333333345|
+---------+-------------------+-------------------+
only showing top 10 rows



Since *InvoiceDate* is actually a timestamp, I now have too much detail because approximately each row represents one invoice. To reduce the noise in my time series due to volatile invoice amounts, I would like to calculate rolling 7-day average amounts. Grouping the data does not help me here because this would assign each row to exactly one group. Now each row should be a member of seven overlapping time windows, each spanning seven days.

So first, I have to define how to generate 7-day time windows. Then I can aggregate the amounts in each window, averaging them in particular. 

Ok, how can I define sliding windows? Maybe the sub-module `pyspark.sql.window` can help here?
To get sliding time windows for each country, I have to partition the data by that column. Next I have to order the data by date because the window boundaries are based on it. Finally I define the boundaries based on the date **values** relative to the date of the current row. Since there can be more than one row per day, I cannot use `rowsBetween()`, which is **position** based. Instead `rangeBetween()` seems more suitable here.

In [21]:
from pyspark.sql.window import Window

sevenDayWindows = Window\
    .partitionBy("Country")\
    .orderBy("InvoiceDate")\
    .rangeBetween(-7, Window.currentRow)
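
To make the difference between a position-based and a value-based frame concrete, here is a minimal sketch in standalone plain Python (not Spark, fresh interpreter). The rows and both helper functions are hypothetical; they only mimic the idea behind `rowsBetween()` and `rangeBetween()` for one country.

```python
from datetime import date, timedelta

# Hypothetical rows for one country, already ordered by day: (day, amount).
rows = [
    (date(2011, 12, 1), 10.0),
    (date(2011, 12, 1), 30.0),   # second invoice line on the same day
    (date(2011, 12, 5), 20.0),
    (date(2011, 12, 20), 40.0),
]

def rows_frame_avg(rows, i, preceding):
    """Position-based frame: the current row plus `preceding` rows before it."""
    frame = rows[max(0, i - preceding): i + 1]
    return sum(amount for _, amount in frame) / len(frame)

def range_frame_avg(rows, i, days):
    """Value-based frame: all rows whose day lies in [current - days, current]."""
    current = rows[i][0]
    lower = current - timedelta(days=days)
    frame = [amount for day, amount in rows if lower <= day <= current]
    return sum(frame) / len(frame)

for i, (day, _) in enumerate(rows):
    print(day, rows_frame_avg(rows, i, 1), range_frame_avg(rows, i, 7))
# 2011-12-01 10.0 20.0
# 2011-12-01 20.0 20.0
# 2011-12-05 25.0 20.0
# 2011-12-20 30.0 40.0
```

The value-based frame treats both rows of 2011-12-01 the same and ignores rows that are too old, no matter how many rows lie in between.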

Now I can apply my `avg()` aggregation on each time window.

In [22]:
preparedDf.select(
    "Country",
    "InvoiceDate",
    avg("Amount").over(sevenDayWindows).alias("7-day-avg")
).show(10)

AnalysisException: "cannot resolve '(PARTITION BY `Country` ORDER BY `InvoiceDate` ASC NULLS FIRST RANGE BETWEEN -7L FOLLOWING AND CURRENT ROW)' due to data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'bigint' which is used in the range frame.;;\n'Project [Country#17, InvoiceDate#14, avg(Amount#197) windowspecdefinition(Country#17, InvoiceDate#14 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, -7, currentrow$())) AS 7-day-avg#22450]\n+- Project [InvoiceNO#10, InvoiceDate#14, StockCode#11, Quantity#13, UnitPrice#15, round((cast(Quantity#13 as double) * UnitPrice#15), 2) AS Amount#197, cast(CustomerID#16 as int) AS CustomerID#198, Country#17]\n   +- Relation[InvoiceNo#10,StockCode#11,Description#12,Quantity#13,InvoiceDate#14,UnitPrice#15,CustomerID#16,Country#17] csv\n"

Oops, what's wrong now? The stack trace states:
> data type mismatch: The data type 'timestamp' used in the order specification does not match the data type 'bigint' which is used in the range frame

Ok, `Window.rangeBetween()` with plain integer offsets like `-7` obviously expects a numeric ordering column and does not work on a timestamp column directly. Fortunately there is another `window()` function available from sub-module `pyspark.sql.functions`. The pyspark  <a href="https://spark.apache.org/docs/2.4.5/api/python/pyspark.sql.html#module-pyspark.sql.functions">documentation</a> says:
> pyspark.sql.functions.window(*timeColumn, windowDuration, slideDuration=None, startTime=None*)

> Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive

I want to have 7-day windows sliding every day.

In [23]:
preparedDf\
    .groupBy(
        "Country", 
        window(timeColumn="InvoiceDate", windowDuration="7 days", slideDuration="1 day").alias("7-day windows")
    )\
    .agg(avg("Amount").alias("avg"))\
    .orderBy("Country", "7-day windows")\
    .show(20, truncate=False)

+---------+------------------------------------------+------------------+
|Country  |7-day windows                             |avg               |
+---------+------------------------------------------+------------------+
|Australia|[2010-11-25 01:00:00, 2010-12-02 01:00:00]|25.589285714285715|
|Australia|[2010-11-26 01:00:00, 2010-12-03 01:00:00]|25.589285714285715|
|Australia|[2010-11-27 01:00:00, 2010-12-04 01:00:00]|25.589285714285715|
|Australia|[2010-11-28 01:00:00, 2010-12-05 01:00:00]|25.589285714285715|
|Australia|[2010-11-29 01:00:00, 2010-12-06 01:00:00]|25.589285714285715|
|Australia|[2010-11-30 01:00:00, 2010-12-07 01:00:00]|25.589285714285715|
|Australia|[2010-12-01 01:00:00, 2010-12-08 01:00:00]|25.589285714285715|
|Australia|[2010-12-02 01:00:00, 2010-12-09 01:00:00]|32.362500000000004|
|Australia|[2010-12-03 01:00:00, 2010-12-10 01:00:00]|32.362500000000004|
|Australia|[2010-12-04 01:00:00, 2010-12-11 01:00:00]|32.362500000000004|
|Australia|[2010-12-05 01:00:00, 2010-
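
How one row ends up in seven overlapping windows can be sketched in standalone plain Python (not Spark). The helper `sliding_windows()` is hypothetical, and it rests on my assumption that window starts are aligned to multiples of the slide duration counted from the Unix epoch in UTC. If that assumption holds, a session time zone of UTC+1 would also explain why the window boundaries above are displayed as 01:00:00.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def sliding_windows(ts, duration, slide):
    """Return all half-open windows [start, end) that contain ts.

    Assumption for this sketch: window starts are aligned to multiples of
    `slide` counted from the Unix epoch in UTC.
    """
    # Latest aligned window start that is not after ts.
    start = EPOCH + ((ts - EPOCH) // slide) * slide
    windows = []
    # Walk backwards as long as the window still covers ts.
    while start + duration > ts:
        windows.append((start, start + duration))
        start -= slide
    return sorted(windows)

ts = datetime(2011, 12, 5, 8, 38, tzinfo=timezone.utc)
for start, end in sliding_windows(ts, timedelta(days=7), timedelta(days=1)):
    print(start.date(), "to", end.date())
# 2011-11-29 to 2011-12-06
# 2011-11-30 to 2011-12-07
# 2011-12-01 to 2011-12-08
# 2011-12-02 to 2011-12-09
# 2011-12-03 to 2011-12-10
# 2011-12-04 to 2011-12-11
# 2011-12-05 to 2011-12-12
```
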

## Aggregating on Group Sets
Now I want to drill down and analyse the biggest amounts level by level, starting at the country level.

In [84]:
preparedDf\
    .where("StockCode = 22423 or StockCode = 22086")\
    .groupBy("Country")\
    .agg(sum("Amount").alias("sum_amount"))\
    .select("Country", round("sum_amount", 2).alias("Total Amount"))\
    .orderBy("Country", desc("Total Amount"))\
    .show(10)

+---------------+------------+
|        Country|Total Amount|
+---------------+------------+
|      Australia|      2090.3|
|        Austria|        51.0|
|        Bahrain|        25.5|
|        Belgium|      599.25|
|         Brazil|       175.2|
|         Canada|       12.75|
|Channel Islands|       624.0|
|         Cyprus|       382.5|
|           EIRE|     8524.35|
|        Finland|       38.25|
+---------------+------------+
only showing top 10 rows



To go one level deeper to the StockCode level, I just need to add this column to the grouping key.

In [83]:
preparedDf\
    .where("StockCode = 22423 or StockCode = 22086")\
    .groupBy("Country", "StockCode")\
    .agg(sum("Amount").alias("sum_amount"))\
    .select("Country", "StockCode", round("sum_amount", 2).alias("Total Amount"))\
    .orderBy("Country", desc("Total Amount"))\
    .show(10)

+---------------+---------+------------+
|        Country|StockCode|Total Amount|
+---------------+---------+------------+
|      Australia|    22423|      1978.2|
|      Australia|    22086|       112.1|
|        Austria|    22423|        51.0|
|        Bahrain|    22423|        25.5|
|        Belgium|    22423|      599.25|
|         Brazil|    22423|       175.2|
|         Canada|    22423|       12.75|
|Channel Islands|    22423|       517.8|
|Channel Islands|    22086|       106.2|
|         Cyprus|    22423|       382.5|
+---------------+---------+------------+
only showing top 10 rows



By adding more and more columns, I'll get more granular figures, but I will see only the figures on the most detailed level. What should I do to get the sums on all higher levels as well?
### Rollup
All I need to do is roll up the detailed sums to the top level. Actually I just need to change one word in the code, replacing `groupBy()` by `rollup()`.

In [82]:
preparedDf\
    .where("StockCode = 22423 or StockCode = 22086")\
    .rollup("Country", "StockCode")\
    .agg(sum("Amount").alias("sum_amount"))\
    .select("Country", "StockCode", round("sum_amount", 2).alias("Total Amount"))\
    .orderBy("Country", desc("Total Amount"))\
    .show(10)

+---------+---------+------------+
|  Country|StockCode|Total Amount|
+---------+---------+------------+
|     null|     null|   228554.13|
|Australia|     null|      2090.3|
|Australia|    22423|      1978.2|
|Australia|    22086|       112.1|
|  Austria|     null|        51.0|
|  Austria|    22423|        51.0|
|  Bahrain|    22423|        25.5|
|  Bahrain|     null|        25.5|
|  Belgium|     null|      599.25|
|  Belgium|    22423|      599.25|
+---------+---------+------------+
only showing top 10 rows



### Cube
Rolling up aggregations is strictly hierarchical. Now I have the total by country and the sub-total per StockCode per country. But I need the total per StockCode across all countries as well. More generally, I want to slice and dice the data along each dimension independently from other dimensions, like I can do in relational star schemas.

Again, there is only one little piece to be changed in the code: using `cube()` instead of `rollup()`.

In [93]:
preparedDf\
    .where("StockCode = 22423 or StockCode = 22086")\
    .where("Country = 'Australia' or Country = 'Germany'")\
    .cube("Country", "StockCode")\
    .agg(sum("Amount").alias("sum_amount"))\
    .select("Country", "StockCode", round("sum_amount", 2).alias("Total Amount"))\
    .orderBy("Country", desc("Total Amount"))\
    .show(10)

+---------+---------+------------+
|  Country|StockCode|Total Amount|
+---------+---------+------------+
|     null|     null|    10436.15|
|     null|    22423|    10235.55|
|     null|    22086|       200.6|
|Australia|     null|      2090.3|
|Australia|    22423|      1978.2|
|Australia|    22086|       112.1|
|  Germany|     null|     8345.85|
|  Germany|    22423|     8257.35|
|  Germany|    22086|        88.5|
+---------+---------+------------+



Now the second row shows the total for StockCode 22423 across all countries (Country is null), whereas the fourth row shows the total across StockCodes (StockCode is null) for Australia.
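
To make the difference between the two functions concrete, here is a minimal plain-Python sketch of which column combinations get aggregated. It does not call Spark; the helper names `rollup_grouping_sets` and `cube_grouping_sets` are my own and only illustrate the idea: `rollup()` keeps the hierarchy (prefixes of the column list), while `cube()` covers every subset.

```python
from itertools import combinations


def rollup_grouping_sets(columns):
    """Hierarchical grouping sets: every prefix of the column list, down to the grand total."""
    return [tuple(columns[:i]) for i in range(len(columns), -1, -1)]


def cube_grouping_sets(columns):
    """All grouping sets: every subset of the columns, down to the grand total."""
    return [
        combo
        for size in range(len(columns), -1, -1)
        for combo in combinations(columns, size)
    ]


dims = ["Country", "StockCode"]
print(rollup_grouping_sets(dims))  # no ("StockCode",) entry: no StockCode total across countries
print(cube_grouping_sets(dims))    # includes ("StockCode",) as well
```

The empty tuple stands for the overall total, i.e. the row where both Country and StockCode are null.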

Another nice feature is `grouping_id()`, which is a special aggregation function. Starting with 0 at the lowest level, `grouping_id()` provides the aggregation level in ascending order, so I can easily filter on specific detail levels.

In [92]:
preparedDf\
    .where("StockCode = 22423 or StockCode = 22086")\
    .where("Country = 'Australia' or Country = 'Germany'")\
    .cube("Country", "StockCode")\
    .agg(sum("Amount").alias("sum_amount"), grouping_id().alias("Level"))\
    .select("Country", "StockCode", round("sum_amount", 2).alias("Total Amount"), "Level")\
    .orderBy("Country", desc("Total Amount"))\
    .show(10)

+---------+---------+------------+-----+
|  Country|StockCode|Total Amount|Level|
+---------+---------+------------+-----+
|     null|     null|    10436.15|    3|
|     null|    22423|    10235.55|    2|
|     null|    22086|       200.6|    2|
|Australia|     null|      2090.3|    1|
|Australia|    22423|      1978.2|    0|
|Australia|    22086|       112.1|    0|
|  Germany|     null|     8345.85|    1|
|  Germany|    22423|     8257.35|    0|
|  Germany|    22086|        88.5|    0|
+---------+---------+------------+-----+



In my example the aggregation hierarchy looks like this:

3: overall total

2: StockCode total

1: Country total

0: StockCode & Country total

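Ok, why does the StockCode total have a higher aggregation level than the Country total? The answer is that `grouping_id()` encodes the grouping columns as a bit vector, in the order given in `cube("Country", "StockCode")`. Country comes first and is therefore the most significant bit: rows where Country is aggregated away (the StockCode totals) get 2, whereas rows where StockCode is aggregated away (the Country totals) get 1.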
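
The numbering can be reproduced with a few lines of plain Python. This is only a sketch of the bit-vector logic, not Spark code; the function name `grouping_id_sketch` is made up for illustration. Each grouping column contributes one bit that is set when the column is aggregated away (shown as null in the result), and the first column passed to `cube()` is the most significant bit.

```python
def grouping_id_sketch(aggregated_flags):
    """Compute the level from one flag per grouping column, in cube() order.

    A flag is True when the column is aggregated away (null in the subtotal row).
    """
    level = 0
    for flag in aggregated_flags:
        # Shift the bits collected so far and append the bit for this column
        level = (level << 1) | int(flag)
    return level


# Flags are given in the order (Country, StockCode)
print(grouping_id_sketch([False, False]))  # detail row per Country and StockCode
print(grouping_id_sketch([False, True]))   # Country total
print(grouping_id_sketch([True, False]))   # StockCode total
print(grouping_id_sketch([True, True]))    # overall total
```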

### Pivot

Finally, I want to analyse the data for two countries, Germany and Australia, across all StockCodes, which can be quite a lot, and I want to compare the German with the Australian figures. Therefore, I want to have the totals by country lined up in columns. By calling the `pivot()` function, I can flip the country totals from rows to columns.

In [104]:
preparedDf\
    .where("Country = 'Australia' or Country = 'Germany'")\
    .groupBy("StockCode")\
    .pivot("Country")\
    .sum("Amount")\
    .orderBy("StockCode")\
    .show(19)

+---------+---------+------------------+
|StockCode|Australia|           Germany|
+---------+---------+------------------+
|    10002|     null|              0.85|
|    10125|     null|              84.8|
|    10135|     null|             212.0|
|    11001|     null|             54.08|
|    15034|     null|              3.36|
|    15036|    432.0| 853.3200000000002|
|    15039|     null|              0.85|
|   15044A|     null|              88.5|
|   15044B|     null|              35.4|
|   15044D|     null|53.099999999999994|
|  15056BL|    17.85|           1100.25|
|   15056N|     null|            695.65|
|   15056P|     null| 564.7500000000001|
|   15058A|     null|              15.9|
|   15058B|     null|              15.9|
|   15058C|     null|              39.5|
|   15060B|     null|              90.0|
|    16008|     null|              8.64|
|    16011|     null|             20.16|
+---------+---------+------------------+
only showing top 19 rows
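
What `groupBy().pivot().sum()` does can be sketched in plain Python. The helper `pivot_sum` and the sample rows are hypothetical (the amounts are invented, not taken from the retail data set); the sketch only mirrors the semantics: one output row per StockCode, one column per distinct Country, and `None` where a combination has no data.

```python
from collections import defaultdict


def pivot_sum(rows, group_key, pivot_key, value_key):
    """Group rows by group_key, spread pivot_key values into columns, and add up value_key."""
    columns = sorted({row[pivot_key] for row in rows})
    sums = defaultdict(lambda: defaultdict(float))
    for row in rows:
        sums[row[group_key]][row[pivot_key]] += row[value_key]
    # Combinations without any rows stay None, like the nulls in the Spark output
    return {
        group: {col: (cells[col] if col in cells else None) for col in columns}
        for group, cells in sorted(sums.items())
    }


# Invented sample rows, for illustration only
sample_rows = [
    {"StockCode": "15036", "Country": "Australia", "Amount": 432.0},
    {"StockCode": "15036", "Country": "Germany", "Amount": 400.0},
    {"StockCode": "15036", "Country": "Germany", "Amount": 53.0},
    {"StockCode": "10002", "Country": "Germany", "Amount": 0.85},
]
pivoted = pivot_sum(sample_rows, "StockCode", "Country", "Amount")
print(pivoted)
```

In pyspark itself, `pivot()` also accepts an optional list of values as a second argument, e.g. `pivot("Country", ["Australia", "Germany"])`, so that Spark does not have to determine the distinct countries first.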



## Further Statistical Functions on DataFrames
`DataFrame` objects also have some statistical methods. In contrast to the aggregation functions, they always apply to the entire DataFrame and cannot be applied to groups within a DataFrame.
* **approxQuantile(col, probabilities, relativeError)**
* **corr()**
* **count()**
* **cov()**
* **crosstab()**
* **describe()** and the more configurable **summary()**, which additionally reports approximate quartiles by default

The `describe()` method is quite interesting because it computes basic statistics (count, mean, stddev, min, and max) for numeric and string columns, even though mean, stddev, min, and max are not available as `DataFrame` object methods.

In [111]:
preparedDf.describe("Amount").show()

+-------+------------------+
|summary|            Amount|
+-------+------------------+
|  count|            541909|
|   mean|17.987794869617858|
| stddev|378.81082350632494|
|    min|         -168469.6|
|    max|          168469.6|
+-------+------------------+
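
For a single numeric column, the five figures can be reproduced in plain Python as a cross-check. This is a sketch under two assumptions of mine: that nulls are skipped, and that `stddev` means the sample standard deviation (divisor n - 1). The function name `describe_sketch` and the sample values are invented for illustration.

```python
import statistics


def describe_sketch(values):
    """Return count, mean, stddev, min and max of the non-null values."""
    present = sorted(v for v in values if v is not None)
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        # Assumption: sample standard deviation, i.e. divisor n - 1
        "stddev": statistics.stdev(present),
        "min": present[0],
        "max": present[-1],
    }


# Invented values, for illustration only
stats = describe_sketch([1.0, 2.0, None, 3.0, 4.0])
print(stats)
```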



## DataFrameStatFunctions

Each `DataFrame` object has a `stat` property referencing a `DataFrameStatFunctions` object, which provides statistical methods. Some of them are just aliases of `DataFrame` object methods, e.g.:

* df.**corr()** alias df.**stat.corr()** - calculates the Pearson Correlation Coefficient of two columns of a DataFrame as a double value
* df.**cov()** alias df.**stat.cov()** - calculates the sample covariance for the given columns, specified by their names, as a double value
* df.**crosstab()** alias df.**stat.crosstab()** - computes a pair-wise frequency table of the given columns, also known as a contingency table

I admit, I've no clue about the reason for this duplication. 
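
To recall what `corr()` and `cov()` actually return, here is a small plain-Python sketch of the two formulas: the sample covariance (divisor n - 1) and the Pearson correlation coefficient derived from it. The function names and the sample values are my own and do not come from pyspark. I use `math.fsum` instead of the built-in `sum`, because in this notebook `sum` refers to the pyspark function.

```python
import math


def sample_cov(xs, ys):
    """Sample covariance of two equally long sequences (divisor n - 1)."""
    n = len(xs)
    mean_x = math.fsum(xs) / n
    mean_y = math.fsum(ys) / n
    return math.fsum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / (n - 1)


def pearson_corr(xs, ys):
    """Pearson correlation: covariance divided by the product of the standard deviations."""
    return sample_cov(xs, ys) / math.sqrt(sample_cov(xs, xs) * sample_cov(ys, ys))


# Invented values: amounts that are exactly twice the quantities
quantities = [1.0, 2.0, 3.0, 4.0]
amounts = [2.0, 4.0, 6.0, 8.0]
print(sample_cov(quantities, amounts))
print(pearson_corr(quantities, amounts))
```

Because the second series is a linear function of the first with a positive slope, the correlation coefficient comes out as 1 (up to floating-point rounding).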