In [1]:
from datetime import datetime
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [2]:
# List the input files available in the data directory (IPython shell escape).
! ls ../data/

T1.csv


In [3]:
# Create (or reuse, if one is already running) the Spark session for this notebook.
# NOTE(review): `sc` conventionally names a SparkContext; here it holds a SparkSession.
sc = (
    SparkSession.builder
    .appName("Cheatsheet")
    .getOrCreate()
)

In [4]:
# Format of the raw "Date/Time" strings: day month year hour:minute.
DATE_FORMAT = "%d %m %Y %H:%M"


def custom_date_parser(value: str) -> datetime:
    """Parse a single raw ``Date/Time`` string using ``DATE_FORMAT``.

    Kept as a named function (instead of a lambda) for any cell that needs to
    parse individual strings; the CSV load below parses the whole column at once.
    """
    return datetime.strptime(value, DATE_FORMAT)


# Reading CSV in pandas.
# `read_csv(date_parser=...)` is deprecated since pandas 2.0. Converting the
# column afterwards with an explicit format works on every pandas version and
# parses the whole column vectorised instead of calling Python once per row.
df = pd.read_csv("../data/T1.csv")
df["Date/Time"] = pd.to_datetime(df["Date/Time"], format=DATE_FORMAT)

# Reading CSV in PySpark: reader options are passed straight to csv() as
# keyword arguments. The first row holds the column names, column types are
# inferred, and timestamps are parsed with the same day-month-year layout as pandas.
spark_df = sc.read.csv(
    "../data/T1.csv",
    header=True,
    inferSchema=True,
    timestampFormat="dd MM y HH:mm",
)

In [5]:
df.head(n=5)

Unnamed: 0,Date/Time,LV ActivePower (kW),Wind Speed (m/s),Theoretical_Power_Curve (KWh),Wind Direction (°)
0,2018-01-01 00:00:00,380.047791,5.311336,416.328908,259.994904
1,2018-01-01 00:10:00,453.769196,5.672167,519.917511,268.641113
2,2018-01-01 00:20:00,306.376587,5.216037,390.900016,272.564789
3,2018-01-01 00:30:00,419.645905,5.659674,516.127569,271.258087
4,2018-01-01 00:40:00,380.650696,5.577941,491.702972,265.674286


In [6]:
spark_df.show(n=5)

+-------------------+-------------------+----------------+-----------------------------+------------------+
|          Date/Time|LV ActivePower (kW)|Wind Speed (m/s)|Theoretical_Power_Curve (KWh)|Wind Direction (°)|
+-------------------+-------------------+----------------+-----------------------------+------------------+
|2018-01-01 00:00:00|   380.047790527343|5.31133604049682|             416.328907824861|  259.994903564453|
|2018-01-01 00:10:00|    453.76919555664|5.67216682434082|             519.917511061494|   268.64111328125|
|2018-01-01 00:20:00|   306.376586914062|5.21603679656982|             390.900015810951|  272.564788818359|
|2018-01-01 00:30:00|   419.645904541015|5.65967416763305|             516.127568975674|  271.258087158203|
|2018-01-01 00:40:00|   380.650695800781|5.57794094085693|             491.702971953588|  265.674285888671|
+-------------------+-------------------+----------------+-----------------------------+------------------+
only showing top 5 rows



## Resampling time series

**Note:** The columns are renamed to short snake_case names so they are quicker to type in the code below.

In [7]:
# Map the verbose source column names to short snake_case identifiers.
col_rename_dict = {
    "Date/Time": "timestamp",
    "LV ActivePower (kW)": "active_pwr",
    "Wind Speed (m/s)": "wind_speed",
    "Theoretical_Power_Curve (KWh)": "theoretical_pwr",
    "Wind Direction (°)": "wind_dir"
}

# pandas: rename along the column axis; mapping keys that are not columns are ignored.
df = df.rename(col_rename_dict, axis="columns")

# PySpark: rebuild the frame with the new names in a single projection.
# Columns that are absent from the mapping keep their current name.
spark_df = spark_df.toDF(
    *[col_rename_dict.get(current, current) for current in spark_df.columns]
)

**Note:** The following cells downsample the 10-minute data to 1-hour intervals.

### Resampling in pandas

In [8]:
# `resample` needs a DatetimeIndex, so move the parsed timestamps into the index.
df_to_resample = df.set_index("timestamp")

# Downsample the 10-minute readings to hourly buckets (left-closed, labelled by
# the bucket start): power columns are summed, speed and direction are averaged.
# "1h" replaces the "1H" alias, which is deprecated since pandas 2.2; the
# lowercase spelling is also accepted by older pandas versions.
# NOTE: hours with no readings still appear here (sum -> 0, mean -> NaN),
# whereas a Spark groupBy only produces buckets that contain rows.
# NOTE(review): an arithmetic mean of wind_dir (degrees) is misleading when
# directions wrap around 0°/360° - consider a circular mean if that matters.
df_resample = df_to_resample.resample("1h").agg(
    {
        "active_pwr": "sum",
        "wind_speed": "mean",
        "theoretical_pwr": "sum",
        "wind_dir": "mean"
    }
)

### Resampling in PySpark

In [9]:
# Aggregations mirroring the pandas `.agg()` dict: power columns are summed over
# the hour, speed and direction are averaged.
agg_by_cols = [
    F.sum("active_pwr").alias("active_pwr"),
    F.mean("wind_speed").alias("wind_speed"),
    F.sum("theoretical_pwr").alias("theoretical_pwr"),
    F.mean("wind_dir").alias("wind_dir")
]

# Hourly bucket key, named "start" like the former `window.start` column.
# `F.window()` aligns its buckets to the Unix epoch in UTC, so the previous
# `F.window("timestamp", "1 hour", startTime="30 minutes")` only started buckets
# on wall-clock hours when the session time zone had a :30 offset (e.g. UTC+05:30);
# in UTC and most other zones the buckets started at :30. `date_trunc` truncates
# in the session time zone, so buckets start on the local hour everywhere,
# matching pandas' `resample("1h")`.
downsample_window = F.date_trunc("hour", F.col("timestamp")).alias("start")

spark_resampled_df = spark_df.groupBy(downsample_window).agg(*agg_by_cols)

# Round to 6 decimals so the printed values line up with the pandas display.
spark_resampled_df = spark_resampled_df.select(
    "start",
    F.round(F.col("active_pwr"), 6).alias("active_pwr"),
    F.round(F.col("wind_speed"), 6).alias("wind_speed"),
    F.round(F.col("theoretical_pwr"), 6).alias("theoretical_pwr"),
    F.round(F.col("wind_dir"), 6).alias("wind_dir")
)

# groupBy output has no guaranteed order; sort chronologically for display.
spark_resampled_df = spark_resampled_df.sort("start")

In [10]:
df_resample.head(n=5)

Unnamed: 0_level_0,active_pwr,wind_speed,theoretical_pwr,wind_dir
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2018-01-01 00:00:00,2342.882172,5.506868,2834.413361,267.118632
2018-01-01 01:00:00,2763.222748,5.644205,3080.510451,258.945546
2018-01-01 02:00:00,4401.932251,6.452037,4730.40786,268.397466
2018-01-01 03:00:00,5456.172485,6.811455,5692.760077,256.014765
2018-01-01 04:00:00,8361.109131,7.748749,8438.350772,247.652468


In [11]:
spark_resampled_df.show(n=5)

+-------------------+-----------+----------+---------------+----------+
|              start| active_pwr|wind_speed|theoretical_pwr|  wind_dir|
+-------------------+-----------+----------+---------------+----------+
|2018-01-01 00:00:00|2342.882172|  5.506868|    2834.413361|267.118632|
|2018-01-01 01:00:00|2763.222748|  5.644205|    3080.510451|258.945546|
|2018-01-01 02:00:00|4401.932251|  6.452037|     4730.40786|268.397466|
|2018-01-01 03:00:00|5456.172485|  6.811455|    5692.760077|256.014765|
|2018-01-01 04:00:00|8361.109131|  7.748749|    8438.350772|247.652468|
+-------------------+-----------+----------+---------------+----------+
only showing top 5 rows



### Nerdy Tip

* In **pandas**, `resample()` requires a datetime-like index (`DatetimeIndex`), or a datetime column passed via the `on=` argument.


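* **PySpark** has no `resample()`; instead, you group by a time bucket, for example one created with the `window()` function, which requires a timestamp (`TimestampType()`) column. Note that `window()` aligns its buckets to the Unix epoch in UTC, so in a session time zone with a non-whole-hour offset its buckets do not start on local hours. `date_trunc()` truncates in the session time zone instead.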