In [31]:
%load_ext lab_black

In [32]:
import findspark

findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

spark = SparkSession.builder.master("local[1]").appName("Lidl").getOrCreate()

In [67]:
path = "./events.csv"
schema = StructType(
    [
        StructField("time", TimestampType()),
        StructField("action", StringType()),
    ]
)
df = spark.read.schema(schema).csv(path, header=True)

In [40]:
WINDOW_SIZE = 10
# 10 minute aggregation
ten_minutes = (
    df.groupBy(window("time", f"{WINDOW_SIZE} minute")).pivot("action").count()
)
ten_minutes.orderBy(asc("window")).show()

+--------------------+-----+----+
|              window|Close|Open|
+--------------------+-----+----+
|{2016-07-26 02:40...| null|  32|
|{2016-07-26 02:50...|   11| 147|
|{2016-07-26 03:00...|   19| 162|
|{2016-07-26 03:10...|   42| 169|
|{2016-07-26 03:20...|   44| 170|
|{2016-07-26 03:30...|   58| 157|
|{2016-07-26 03:40...|   84| 173|
|{2016-07-26 03:50...|   97| 170|
|{2016-07-26 04:00...|   96| 165|
|{2016-07-26 04:10...|  109| 164|
|{2016-07-26 04:20...|  137| 164|
|{2016-07-26 04:30...|  172| 167|
|{2016-07-26 04:40...|  142| 170|
|{2016-07-26 04:50...|  159| 169|
|{2016-07-26 05:00...|  170| 166|
|{2016-07-26 05:10...|  165| 169|
|{2016-07-26 05:20...|  179| 167|
|{2016-07-26 05:30...|  158| 153|
|{2016-07-26 05:40...|  181| 176|
|{2016-07-26 05:50...|  150| 169|
+--------------------+-----+----+
only showing top 20 rows



In [46]:
# 10 minute aggregation per minute
per_minute_aggregation = (
    ten_minutes.withColumn("Close/Minute", col("Close") / WINDOW_SIZE)
    .withColumn("Open/Minute", col("Open") / WINDOW_SIZE)
    .drop("Open", "Close")
)
per_minute_aggregation.show()

+--------------------+------------+-----------+
|              window|Close/Minute|Open/Minute|
+--------------------+------------+-----------+
|{2016-07-27 06:00...|        18.4|       17.9|
|{2016-07-26 05:40...|        18.1|       17.6|
|{2016-07-27 00:00...|        17.0|       18.4|
|{2016-07-27 10:30...|        15.6|       16.5|
|{2016-07-26 19:10...|        19.3|       16.8|
|{2016-07-28 04:30...|        14.9|       17.2|
|{2016-07-27 05:10...|        15.3|       16.9|
|{2016-07-27 01:10...|        16.8|       17.4|
|{2016-07-27 18:20...|        19.3|       16.9|
|{2016-07-26 23:30...|        13.8|       17.1|
|{2016-07-27 10:50...|        17.5|       16.7|
|{2016-07-26 08:10...|        14.4|       15.5|
|{2016-07-26 23:10...|        16.3|       17.1|
|{2016-07-27 12:30...|        17.9|       17.4|
|{2016-07-27 23:10...|        16.9|       16.6|
|{2016-07-27 16:20...|        15.3|       16.3|
|{2016-07-27 01:40...|        17.9|       15.8|
|{2016-07-27 08:30...|        18.2|     

In [56]:
# compute the average number of actions each 10 minutes.
close_average = ten_minutes.select(
    mean("Open"),
    mean("Close"),
).show()

+------------------+------------------+
|         avg(Open)|        avg(Close)|
+------------------+------------------+
|165.56291390728478|160.25641025641025|
+------------------+------------------+



In [57]:
# compute the top 10 minutes with a bigger amount of "open" action.
ten_minutes.withColumn("diff", col("Open") - col("Close")).orderBy(desc("diff")).show(
    10
)

+--------------------+-----+----+----+
|              window|Close|Open|diff|
+--------------------+-----+----+----+
|{2016-07-26 03:00...|   19| 162| 143|
|{2016-07-26 02:50...|   11| 147| 136|
|{2016-07-26 03:10...|   42| 169| 127|
|{2016-07-26 03:20...|   44| 170| 126|
|{2016-07-26 03:30...|   58| 157|  99|
|{2016-07-26 03:40...|   84| 173|  89|
|{2016-07-26 03:50...|   97| 170|  73|
|{2016-07-26 04:00...|   96| 165|  69|
|{2016-07-26 04:10...|  109| 164|  55|
|{2016-07-27 21:20...|  127| 176|  49|
+--------------------+-----+----+----+
only showing top 10 rows



### Unit Test Proposal

Using pytest/unitest packages it is trivial to generate unit tests for spark dataframes. 
Simple define some assertions about your data,for example, that action type is always a value in set("Open", "Close") 
and run an assert statement such as:

```python
assert set(["Open", "Close"]) == set(
    [x.action for x in df.select("action").distinct().collect()]
)

```

Another assertion could be related to the time values ensuring that timestamps are within an expected date-range.

### Integration Testing

Typically every transformation you apply to the spark df should have a unit test for the logic. But every ETL pipeline also includes
reading data from a datasource and writing data to a database. It is these "integration" points that need to be tested in 
integration tests. For example, what happens if the data is malformed in the source csv, how does your code handle these errors.

Step one is define a set of immuatable test data with known features and then simpley run the transformations against the known data against the expected outcome. For example, provide a the `spark.read.csv` method call a path to a csv file that has an invalid schema and expect to receive an error.

### Test Suite Implemntation

Generally speaking tests should be ran at the point of raising a PR. Nothing should merged into the main/master branch without first passing the test suite. Most modern git platforms, github/gitlab etc have CI/CD actions built into the platform. Dockerizing the spark application and running the tests as a pre-requisite to merge acceptance is the generally accepted method of running tests.


