## Instantiate Spark Session


In [17]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .appName("process_bronze") \
        .config("spark.sql.shuffle.partitions", "1") \
        .config("spark.executor.cores", "2") \
        .config("spark.executor.memory", "1g") \
        .config("spark.executor.instances", "1") \
        .config("spark.driver.memory", "700m") \
        .config("spark.sql.session.timeZone", "UTC") \
        .getOrCreate()

## Variables and Imports
Here are some common imports and variables, defined once so we don't repeat ourselves.

In [18]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType, BooleanType
from pyspark.sql.functions import length, trim, coalesce, lit, udf, count, col, row_number, date_format, avg
from pyspark.sql.window import Window
data_path = "/home/jovyan/data/bronze/demo/*/*.csv"

In [19]:
from scripts.udfs import is_null_or_empty, is_not_null_nor_empty, trim_and_lower_case

is_null_or_empty_udf = udf(is_null_or_empty, BooleanType())  
is_not_null_nor_empty_udf = udf(is_not_null_nor_empty, BooleanType())
trim_and_lower_case_udf = udf(trim_and_lower_case, StringType())
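The helpers imported from `scripts.udfs` are not shown in this notebook. The sketch below is a hypothetical version of that module, assuming each function does what its name says. It is an illustration, not the actual implementation.

```python
# Hypothetical sketch of scripts/udfs.py: the real module is not shown in this
# notebook, so these bodies are assumptions consistent with the function names.
from typing import Any, Optional


def is_null_or_empty(value: Any) -> bool:
    """Return True if value is None or contains only whitespace."""
    # str() lets the same check work for timestamp and double columns too
    return value is None or str(value).strip() == ""


def is_not_null_nor_empty(value: Any) -> bool:
    """Logical negation of is_null_or_empty."""
    return not is_null_or_empty(value)


def trim_and_lower_case(value: Optional[str]) -> Optional[str]:
    """Strip surrounding whitespace and lower-case the value; None stays None."""
    return None if value is None else str(value).strip().lower()


print(is_null_or_empty("  "), is_not_null_nor_empty(1.0), trim_and_lower_case(" Inverter "))
# True True inverter
```

For string columns, the Spark built-ins already imported above could express the same check natively, e.g. `col(c).isNotNull() & (length(trim(col(c))) > 0)`, which avoids the overhead of sending every row through a Python UDF.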

# Task 1: Familiarize yourself with the data

## Describe Data
Let's see what the data consists of to better understand the schema, null values, nullable columns, etc.

In [20]:
spark.read.option("header", True).csv(data_path).describe().show()

+-------+--------------------+-------------------+-------------------+----------+--------------------+---------------+------------------+--------------------+
|summary|            DeviceID|       DataSourceID|             SiteID|DeviceType|           Timestamp|         Metric|             Value| ProcessingTimestamp|
+-------+--------------------+-------------------+-------------------+----------+--------------------+---------------+------------------+--------------------+
|  count|              797977|             797977|             797977|    797977|              797977|         797977|            797977|              797977|
|   mean| 2.178606038862169E9|2.654070997002039E9|3.343580468943778E9|      NULL|                NULL|           NULL|  25109.5576413759|                NULL|
| stddev|1.1426866835683913E9|1.197406648356499E9|1.636059864016143E9|      NULL|                NULL|           NULL|339067.17024964694|                NULL|
|    min|          1003166906|         1020791

## Read the Data
Let's enforce a schema instead of inferring it.
- DeviceID, DataSourceID and SiteID should be strings, since we never need arithmetic on them. Ideally these columns are never null, DeviceID at the very least.
- DeviceType and Metric also need to be strings, and they must be checked for null values.
- Timestamp and ProcessingTimestamp are TimestampType and also need to be checked for null values.
- Value must be a double.
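Supplying a schema does not by itself reject bad rows. As far as we know, Spark's CSV reader in its default PERMISSIVE mode turns a field that cannot be parsed into its declared type into null, and file-based sources treat every column as nullable even when `nullable=False` is declared. The plain-Python function below (hypothetical, not Spark code) only mimics that behaviour for the `Value` column, to show why explicit null checks are still needed after reading.

```python
from typing import Optional


def parse_double_permissive(raw: Optional[str]) -> Optional[float]:
    """Roughly mimic a PERMISSIVE cast: empty or unparsable input becomes None."""
    if raw is None or raw.strip() == "":
        return None
    try:
        return float(raw)
    except ValueError:
        # A malformed value does not fail the read; it silently becomes null
        return None


# Made-up raw CSV fields, for illustration only
rows = ["1.0", "0.0", "", "n/a", "25806.67"]
parsed = [parse_double_permissive(r) for r in rows]
print(parsed)  # [1.0, 0.0, None, None, 25806.67]
```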

In [21]:
data_schema = StructType([
        StructField("DeviceID", StringType(), False),
        StructField("DataSourceID", StringType(), False),
        StructField("SiteID", StringType(), False),
        StructField("DeviceType", StringType(), True),
        StructField("Timestamp", TimestampType(), True),
        StructField("Metric", StringType(),True),
        StructField("Value", DoubleType(), True),
        StructField("ProcessingTimestamp", TimestampType(), True)
])
raw_df = spark.read.option("header", True).schema(data_schema).csv(data_path).cache()

In [22]:
raw_df.show(5, False)

+----------+------------+----------+----------+-------------------+------------+-----+-----------------------+
|DeviceID  |DataSourceID|SiteID    |DeviceType|Timestamp          |Metric      |Value|ProcessingTimestamp    |
+----------+------------+----------+----------+-------------------+------------+-----+-----------------------+
|3137912842|843004523   |4268260543|Sensor    |2021-11-06 23:00:48|digital_in_1|1.0  |2021-11-07 06:38:56.654|
|3137912842|843004523   |4268260543|Sensor    |2021-11-06 23:00:48|digital_in_2|1.0  |2021-11-07 06:38:56.654|
|3137912842|843004523   |4268260543|Sensor    |2021-11-06 23:00:48|digital_in_3|0.0  |2021-11-07 06:38:56.654|
|3137912842|843004523   |4268260543|Sensor    |2021-11-06 23:00:48|digital_in_4|1.0  |2021-11-07 06:38:56.654|
|3137912842|843004523   |4268260543|Sensor    |2021-11-06 23:00:48|digital_in_5|0.0  |2021-11-07 06:38:56.654|
+----------+------------+----------+----------+-------------------+------------+-----+-----------------------+

# Task 2: Clean the data

## Rename Columns
Rename the columns to snake_case for readability.

In [23]:
df = raw_df.select(col("DeviceID").alias("device_id"), 
                   col("DataSourceID").alias("data_source_id"), 
                   col("SiteID").alias("site_id"), 
                   col("DeviceType").alias("device_type"),
                   col("Timestamp").alias("event_timestamp"), 
                   col("Metric").alias("metric"),
                   col("Value").alias("value"),
                   col("ProcessingTimestamp").alias("processing_timestamp")
                  )

## Data Quality Checks

Let's define some rules:
- Q1: Keep only events in which no column is null or empty.
- Q2: event_timestamp cannot be later than processing_timestamp, since an event must have happened before the system processed it. Keep only events with consistent timestamps.
- Q3: There should be only one value for a metric measured at a given time; anything else is a duplicate. We apply a window over the DataFrame and, among duplicates, keep the event with the latest processing_timestamp.

Apply these filters to the raw data, write the clean records as Parquet, and keep the rejected records as CSV for future correction.
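The three rules can be sketched in plain Python on a list of dictionaries. This is only an illustration of the logic, not the Spark implementation: the sample events are made up, and, like the window in the next cell, the Q3 key here includes `value`.

```python
from datetime import datetime

# Q3 key: every column except processing_timestamp (mirrors the window partition)
KEY = ("device_id", "data_source_id", "site_id", "device_type",
       "event_timestamp", "metric", "value")


def is_blank(v):
    return v is None or str(v).strip() == ""


def apply_rules(events):
    # Q1: drop events with any null or empty field
    q1 = [e for e in events if not any(is_blank(v) for v in e.values())]
    # Q2: drop events that were processed before they happened
    q2 = [e for e in q1 if e["processing_timestamp"] >= e["event_timestamp"]]
    # Q3: per key, keep only the event with the latest processing_timestamp
    latest = {}
    for e in q2:
        k = tuple(e[c] for c in KEY)
        if k not in latest or e["processing_timestamp"] > latest[k]["processing_timestamp"]:
            latest[k] = e
    return list(latest.values())


# Made-up sample events, for illustration only
base = {"device_id": "dev-1", "data_source_id": "src-1", "site_id": "site-1",
        "device_type": "Inverter", "event_timestamp": datetime(2021, 11, 4, 11, 0),
        "metric": "ac_active_power", "value": 100.0,
        "processing_timestamp": datetime(2021, 11, 4, 12, 0)}
events = [
    base,
    {**base, "processing_timestamp": datetime(2021, 11, 4, 13, 0)},  # duplicate, processed later
    {**base, "metric": "  "},  # fails Q1
    {**base, "value": 5.0, "processing_timestamp": datetime(2021, 11, 4, 10, 0)},  # fails Q2
]
clean = apply_rules(events)
print(len(clean), clean[0]["processing_timestamp"])  # 1 2021-11-04 13:00:00
```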

### Implement Quality Checks

In [24]:
# Q1: no null or empty values in any column
from functools import reduce
filter_not_empty_nor_null_q1 = reduce(lambda a, b: a & b, [is_not_null_nor_empty_udf(col(c)) for c in df.columns])

# Q2: processing_timestamp must be greater than or equal to event_timestamp
# (>= rather than >, since both can be equal at the stored precision)
filter_correct_date_q2 = col("processing_timestamp") >= col("event_timestamp")

# Q3: among events that match on device_id, data_source_id, site_id, device_type,
# event_timestamp, metric and value, keep the one processed last.
# Note: because "value" is part of the key, two readings of the same metric at the
# same time with different values are both kept.
window_last_processed_q3 = Window.partitionBy("device_id", "data_source_id", "site_id", "device_type", "event_timestamp", "metric", "value") \
    .orderBy(col("processing_timestamp").desc())

### Write clean data 
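Write the clean data as Parquet files in the silver layer, partitioned by processing date. \
The data is saved in data/silver/demo \
We run on a single-node cluster and spark.sql.shuffle.partitions is set to 1, so the shuffle for the Q3 window leaves clean_df in a single partition and no coalesce is needed to avoid small files.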
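`partitionBy("Date")` writes one subdirectory per distinct value, named `Date=<value>` (Hive-style layout); the `Date` column itself is not stored inside the Parquet files and is recovered from the path when the data is read back. The plain-Python sketch below, using made-up timestamps, shows which directory each record would land in. It assumes the timestamps are already in the session time zone, which is UTC in this notebook.

```python
from collections import Counter
from datetime import datetime


def partition_dir(base_path: str, processing_ts: datetime) -> str:
    # Same value as date_format(processing_timestamp, "yyyy-MM-dd"), assuming
    # the timestamp is already expressed in the session time zone (UTC here)
    return f"{base_path.rstrip('/')}/Date={processing_ts.strftime('%Y-%m-%d')}"


# Made-up processing timestamps, for illustration only
timestamps = [
    datetime(2021, 11, 7, 6, 38, 56),
    datetime(2021, 11, 7, 23, 59, 0),
    datetime(2021, 11, 8, 0, 1, 0),
]
layout = Counter(partition_dir("/home/jovyan/data/silver/demo/", ts) for ts in timestamps)
for directory, n_rows in sorted(layout.items()):
    print(directory, n_rows)
# /home/jovyan/data/silver/demo/Date=2021-11-07 2
# /home/jovyan/data/silver/demo/Date=2021-11-08 1
```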

In [25]:
# Final clean DataFrame with all three quality checks applied; used for the aggregations below and saved to silver
clean_df = df.filter(filter_not_empty_nor_null_q1) \
        .filter(filter_correct_date_q2) \
        .withColumn("row_number", row_number().over(window_last_processed_q3)) \
        .filter(col("row_number") == 1) \
        .drop("row_number").cache()

clean_df.withColumn("Date", date_format(col("processing_timestamp"), "yyyy-MM-dd")) \
        .write.partitionBy("Date") \
        .mode("overwrite")\
        .parquet("/home/jovyan/data/silver/demo/")


### Save Corrupt results
Save the corrupt records as CSV so they can be inspected and corrected later if needed. \
The data is saved in data/corrupt/demo
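`exceptAll` computes a multiset difference (SQL `EXCEPT ALL`): each row of `clean_df` cancels one matching row of `df`, so duplicates removed by Q3 still end up in the corrupt set. `subtract` would instead compute `EXCEPT DISTINCT` and lose them. The plain-Python sketch below, with made-up rows, mimics both using `collections.Counter` and sets.

```python
from collections import Counter

# Made-up rows: "a" appears twice in the raw data, "c" three times
raw = ["a", "a", "b", "c", "c", "c"]
clean = ["a", "b", "c"]  # e.g. after Q3 kept one row per key

# exceptAll: multiset difference, each clean row cancels one matching raw row
corrupt_all = sorted((Counter(raw) - Counter(clean)).elements())
# subtract (EXCEPT DISTINCT): set difference, duplicates are lost
corrupt_distinct = sorted(set(raw) - set(clean))

print(corrupt_all)       # ['a', 'c', 'c']
print(corrupt_distinct)  # []
print(len(clean) + len(corrupt_all) == len(raw))  # True
```

This multiset behaviour is what lets the clean and corrupt counts add up to the raw count.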

In [26]:
# Records rejected by the quality checks, saved separately for later analysis and correction. Better safe than sorry.
corrupt_df = df.exceptAll(clean_df)
corrupt_df.withColumn("Date", date_format(col("processing_timestamp"), "yyyy-MM-dd")) \
    .write.partitionBy("Date") \
    .mode("overwrite") \
    .csv("/home/jovyan/data/corrupt/demo/")

### Task 6: Tests

In [27]:
# Do some checks:
assert clean_df.count() + corrupt_df.count() == raw_df.count(), "Total number of clean and corrupt events does not add up to total number of events"
assert clean_df.count() == clean_df.dropDuplicates(["device_id", "data_source_id", "site_id", "device_type", "event_timestamp", "metric", "value"]).count(), "Cleaned dataframe has duplicates"
assert clean_df.filter(filter_not_empty_nor_null_q1).count() == clean_df.count(), "Cleaned dataframe has null or empty values"
assert clean_df.filter(filter_correct_date_q2).count() == clean_df.count(), "Cleaned dataframe has incorrect dates"

# Task 3: Compute the "Inverter AC Hourly Yield"

Measurements are taken every 15 minutes, so each hour should contain 4 data points. For inverter 1054530426 at 2021-11-04 11:00 their average should be 39206.5325.
```python
clean_df.withColumn("event_hour", date_format(col("event_timestamp"), "yyyy-MM-dd HH:00")).filter((col("device_id") == 1054530426)
                & (trim_and_lower_case_udf(col("device_type")) == "inverter")
                & (trim_and_lower_case_udf(col("metric")) == "ac_active_power")
                & (col("event_hour") == "2021-11-04 11:00")).show()
+----------+--------------+----------+-----------+-------------------+---------------+--------+--------------------+----------------+
| device_id|data_source_id|   site_id|device_type|    event_timestamp|         metric|   value|processing_timestamp|      event_hour|
+----------+--------------+----------+-----------+-------------------+---------------+--------+--------------------+----------------+
|1054530426|     843004523|4268260543|   Inverter|2021-11-04 11:00:31|ac_active_power|55342.67|2021-11-04 12:41:...|2021-11-04 11:00|
|1054530426|     843004523|4268260543|   Inverter|2021-11-04 11:15:35|ac_active_power|35217.33|2021-11-04 12:41:...|2021-11-04 11:00|
|1054530426|     843004523|4268260543|   Inverter|2021-11-04 11:30:35|ac_active_power|25806.67|2021-11-04 12:41:...|2021-11-04 11:00|
|1054530426|     843004523|4268260543|   Inverter|2021-11-04 11:45:37|ac_active_power|40459.46|2021-11-04 12:41:...|2021-11-04 11:00|
+----------+--------------+----------+-----------+-------------------+---------------+--------+--------------------+----------------+
```
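As a quick arithmetic check, the four `ac_active_power` values listed above can be averaged directly in plain Python:

```python
import math

# The four ac_active_power readings shown above for 2021-11-04 11:00
values = [55342.67, 35217.33, 25806.67, 40459.46]
avg_value = sum(values) / len(values)

print(len(values), round(avg_value, 4))  # 4 39206.5325
# Compare with a tolerance: the float sum may not equal the decimal result exactly
print(math.isclose(avg_value, 39206.5325, abs_tol=1e-6))  # True
```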

In [28]:
avg_df = clean_df.withColumn("event_hour", date_format(col("event_timestamp"), "yyyy-MM-dd HH:00"))\
        .groupBy("device_type", "metric", "device_id", "event_hour").agg(avg("value").alias("avg_value"), count("*").alias("num_points")).cache()
avg_inverter_df = avg_df.filter((col("device_id") == 1054530426) & (col("event_hour") == "2021-11-04 11:00")
              & (trim_and_lower_case_udf(col("device_type")) == "inverter") 
              & (trim_and_lower_case_udf(col("metric")) == "ac_active_power") )
avg_inverter_df.show()

+-----------+---------------+----------+----------------+----------+----------+
|device_type|         metric| device_id|      event_hour| avg_value|num_points|
+-----------+---------------+----------+----------------+----------+----------+
|   Inverter|ac_active_power|1054530426|2021-11-04 11:00|39206.5325|         4|
+-----------+---------------+----------+----------------+----------+----------+



In [29]:
assert avg_inverter_df.select("num_points").first()["num_points"] == 4, "number of data points does not match"
assert abs(avg_inverter_df.first()["avg_value"] - 39206.5325) < 1e-6, "average value does not match"

# Task 4: Compute the "Satellite/Sensor Hourly Irradiance"

The satellite also reports 4 measurements per hour, so there should be 4 data points with an average of 49.415. There is no sensor data for this device, however, not even in raw_df.
```python
clean_df.withColumn("event_hour", date_format(col("event_timestamp"), "yyyy-MM-dd HH:00")).filter((col("device_id") == 3258837907)
                & (trim_and_lower_case_udf(col("metric")) == "irradiance")
                & (col("event_hour") == "2021-11-04 11:00")).show()
+----------+--------------+----------+-----------+-------------------+----------+-----+--------------------+----------------+
| device_id|data_source_id|   site_id|device_type|    event_timestamp|    metric|value|processing_timestamp|      event_hour|
+----------+--------------+----------+-----------+-------------------+----------+-----+--------------------+----------------+
|3258837907|    2805564098|4268260543|  Satellite|2021-11-04 11:00:00|irradiance| 69.6|2021-11-05 09:45:...|2021-11-04 11:00|
|3258837907|    2805564098|4268260543|  Satellite|2021-11-04 11:15:00|irradiance|31.11|2021-11-05 10:00:...|2021-11-04 11:00|
|3258837907|    2805564098|4268260543|  Satellite|2021-11-04 11:30:00|irradiance| 17.5|2021-11-05 10:15:...|2021-11-04 11:00|
|3258837907|    2805564098|4268260543|  Satellite|2021-11-04 11:45:00|irradiance|79.45|2021-11-05 10:30:...|2021-11-04 11:00|
+----------+--------------+----------+-----------+-------------------+----------+-----+--------------------+----------------+
```
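The hourly aggregation in `avg_df` buckets each event by `date_format(event_timestamp, "yyyy-MM-dd HH:00")` and then averages per bucket. A plain-Python sketch of the same bucketing, applied to the four satellite readings above:

```python
from collections import defaultdict
from datetime import datetime
from statistics import mean

# The four irradiance readings shown above for device 3258837907
readings = [
    (datetime(2021, 11, 4, 11, 0, 0), 69.6),
    (datetime(2021, 11, 4, 11, 15, 0), 31.11),
    (datetime(2021, 11, 4, 11, 30, 0), 17.5),
    (datetime(2021, 11, 4, 11, 45, 0), 79.45),
]

buckets = defaultdict(list)
for ts, value in readings:
    # Same bucket label as date_format(event_timestamp, "yyyy-MM-dd HH:00")
    buckets[ts.strftime("%Y-%m-%d %H:00")].append(value)

# One (avg_value, num_points) pair per hour, like the groupBy/agg in avg_df
hourly = {hour: (mean(vals), len(vals)) for hour, vals in buckets.items()}
avg_value, num_points = hourly["2021-11-04 11:00"]
print(num_points, round(avg_value, 3))  # 4 49.415
```

With this convention a reading at exactly 12:00:00 would start the 12:00 bucket rather than close the 11:00 one.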

In [30]:
avg_sat_sens_df = avg_df.filter((col("device_id") == 3258837907) & (col("event_hour") == "2021-11-04 11:00")
              & (trim_and_lower_case_udf(col("device_type")).isin("satellite", "sensor"))
              & (trim_and_lower_case_udf(col("metric")) == "irradiance") )
avg_sat_sens_df.show()

+-----------+----------+----------+----------------+---------+----------+
|device_type|    metric| device_id|      event_hour|avg_value|num_points|
+-----------+----------+----------+----------------+---------+----------+
|  Satellite|irradiance|3258837907|2021-11-04 11:00|   49.415|         4|
+-----------+----------+----------+----------------+---------+----------+



In [31]:
assert avg_sat_sens_df.select("num_points").first()["num_points"] == 4, "number of data points does not match"
assert abs(avg_sat_sens_df.first()["avg_value"] - 49.415) < 1e-6, "average value does not match"

# Task 5: Store the result
I have used the device_type and metric columns as partitions. Partitioning by date is not a good idea: a device_id + metric + date combination holds only 4 * 24 = 96 raw readings (and just 24 rows once aggregated hourly), which is far too little data per partition.
Records will usually be queried by device_type and metric, so partitioning by those two columns is a good fit.

In [32]:
avg_df.withColumn("device_type", trim(col("device_type")))\
    .withColumn("metric", trim(col("metric")))\
    .write.partitionBy("device_type", "metric").mode("overwrite").parquet("/home/jovyan/data/gold/demo/")
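The write step trims `device_type` and `metric` after `avg_df` has been grouped. If those columns ever contain whitespace variants of the same value (the `trim_and_lower_case` filters above suggest this is a concern), trimming after the aggregation only relabels the groups, leaving two rows with the same key in one partition. A possible fix would be to apply `trim` before `groupBy`. The plain-Python sketch below, with made-up rows, shows the difference.

```python
from collections import defaultdict

# Made-up hourly inputs: the same device type with and without a leading space
rows = [(" Inverter", "ac_active_power", 10.0), ("Inverter", "ac_active_power", 30.0)]


def hourly_avg(rows, normalize_first):
    groups = defaultdict(list)
    for device_type, metric, value in rows:
        if normalize_first:
            # Trim before grouping so variants fall into the same group
            device_type, metric = device_type.strip(), metric.strip()
        groups[(device_type, metric)].append(value)
    # Trim after aggregation, as the write step does; this only relabels groups
    return [((dt.strip(), m.strip()), sum(v) / len(v)) for (dt, m), v in groups.items()]


print(hourly_avg(rows, normalize_first=False))
# [(('Inverter', 'ac_active_power'), 10.0), (('Inverter', 'ac_active_power'), 30.0)]
print(hourly_avg(rows, normalize_first=True))
# [(('Inverter', 'ac_active_power'), 20.0)]
```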