Analyze time series

In [None]:
# Notebook shell magics: install PySpark and findspark, plus hvplot for interactive plots.
!pip install -q pyspark findspark
!pip install -q hvplot

In [None]:
import findspark
# Let findspark locate the Spark installation so that `import pyspark` works;
# this runs before the pyspark imports in the following cells.
findspark.init()

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import hvplot.pandas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pyspark.sql.types as pys_types
from pyspark.sql import Window

In [None]:
# Google Drive is mounted at `mount_folder`; the project data lives under MyDrive.
mount_folder = '/content/drive'
drive_folder = '/MyDrive/Data/flow-co2/data/'
data_folder = f"{mount_folder}{drive_folder}"
print(data_folder)

# Full path of the raw CSV export that gets analysed.
import_csv = 'values.csv'
import_path = f"{data_folder}{import_csv}"
print(import_path)

In [None]:
# Colab only: mount Google Drive at mount_folder so that data_folder / import_path exist.
from google.colab import drive
drive.mount(mount_folder)

In [None]:
# Create the Spark session (getOrCreate reuses an existing one when the cell is re-run).
spark = (
    SparkSession.builder
    .appName("analyze-time-series")
    .getOrCreate()
)

In [None]:
# Load the raw CSV: first line is the header, column types are inferred from the data.
df = spark.read.options(header=True, inferSchema=True).csv(import_path)

In [None]:
# df.show()

In [None]:
# show dataframe columns type
# df.dtypes

In [None]:
# look for missing data
# look for missing data: no timestamp, no value, or a placeholder string instead of a reading
value_col = F.col("value")
missing_cond = (
    F.col("timestamp").isNull()
    | value_col.isNull()
    | (value_col == "unknown")
    | (value_col == "unavailable")
)
df_missing = df.filter(missing_cond)
df_missing.show()

In [None]:
# TODO: fill the missing readings by interpolation instead of dropping them

In [None]:
# Keep only rows that have a timestamp and a real reading (not null, not a placeholder string).
value_col = F.col("value")
is_valid = (
    F.col("timestamp").isNotNull()
    & value_col.isNotNull()
    & (value_col != "unknown")
    & (value_col != "unavailable")
)
df_valid = df.filter(is_valid)
df_valid.show()

In [None]:
# From here on, work only with the rows that passed the validity filter.
df = df_valid

In [None]:
# convert value to number
df = df.withColumn("value_double", F.col("value").cast(pys_types.DoubleType()))
df = df.drop("value")
df = df.withColumnRenamed("value_double", "value")

In [None]:
# converting to Pandas for exploratory data analysis
df_pd = df.toPandas()
df_pd["timestamp_local"] = df_pd["timestamp"].dt.tz_localize('UTC').dt.tz_convert('America/Toronto')
df_pd["timestamp"] = df_pd["timestamp"].dt.floor(freq="min")
df_pd = df_pd.set_index("timestamp")


In [None]:
# Display the pandas frame, indexed by the minute-floored timestamp.
df_pd

In [None]:
# Flag rows whose minute-floored timestamp repeats an earlier one. keep='first' leaves the
# first occurrence of each minute unflagged, so it is the one kept when duplicates are dropped.
dupl = df_pd.index.duplicated(keep='first')
# dupl is a boolean array; summing it counts the flagged rows.
count = int(dupl.sum())
print(count)

In [None]:
# Keep only the first reading observed in each minute.
df_pd = df_pd.loc[~dupl]
df_pd

In [None]:
# Re-display the de-duplicated frame.
df_pd

In [None]:
# asfreq() reindexes onto a regular 1-minute grid; with method='pad' each minute that has
# no reading is forward-filled from the last observed row. Reindexing with a fill method
# needs a monotonic index, and the row order coming back from Spark is not guaranteed,
# so sort first (this also keeps line plots of df_pd in time order).
df_pd = df_pd.sort_index()
df_pd_freq = df_pd.asfreq("min", method='pad')  # converts to specified frequency, pads missing values
# NOTE(review): padded rows also copy timestamp_local from the previous reading, so that
# column is stale on filled minutes — recompute it from the index if it is used downstream.
df_pd_freq

In [None]:
# Boolean mask over the 1-minute grid: True where the minute also exists in df_pd.
df_pd_freq.index.isin(df_pd.index)

In [None]:
# Minutes that exist only because of padding, i.e. not present in df_pd.
df_pd_freq[~df_pd_freq.index.isin(df_pd.index)]

In [None]:
# Subset of the regular grid made of padded (filled-in) minutes only.
is_observed = df_pd_freq.index.isin(df_pd.index)
df_pd_padded = df_pd_freq.loc[~is_observed]

In [None]:
# Display the padded (filled-in) rows.
df_pd_padded

In [None]:
# Put both frames in chronological order before plotting.
df_pd_freq, df_pd_padded = [frame.sort_index() for frame in (df_pd_freq, df_pd_padded)]

In [None]:
# Overlay the observed readings (df_pd) and the minutes filled in by padding (df_pd_padded)
# on one figure. Padded minutes are scattered across separate gaps, so draw them as
# markers only; a connecting line would bridge unrelated gaps and look like real data.
fig, ax = plt.subplots()
df_pd.plot(ax=ax, y="value", label="Pd", subplots=False)
df_pd_padded.plot(ax=ax, y="value", label="Padded", subplots=False, style=".")
plt.show()

In [None]:
# Interactive overlay of observed vs. padded readings. y="value" restricts both layers to the
# reading itself instead of leaving hvplot to choose columns (e.g. timestamp_local).
plot = (
    df_pd.hvplot.scatter(y="value", label="Dataset Source")
    * df_pd_padded.hvplot.scatter(y="value", label="Dataset Padded")
)
plot

In [None]:
from statsmodels.tsa.seasonal import seasonal_decompose, STL
plt.rcParams["figure.figsize"] = [10, 5]
# One season = one day of 1-minute samples.
freqseason = 60*24
# seasonal_decompose counts `period` in observations, so it needs an evenly spaced series.
# Use the padded 1-minute frame rather than df_pd: gaps in df_pd would make 1440 rows span
# more than a day. The series must also hold at least two full periods and no NaNs.
df_eda_decomposed = seasonal_decompose(df_pd_freq["value"], model='additive', period = freqseason)

In [None]:
# Interactive plot of the seasonal component (period = freqseason observations).
df_eda_decomposed.seasonal.hvplot()

In [None]:
# plot the value series with hvplot; y="value" keeps other columns (e.g. timestamp_local)
# out of the chart
df_pd.hvplot(y="value")

In [None]:
# see percentiles: 25th and 75th percentile of the value column, computed by Spark
quartile_row = df.agg(F.percentile("value", [0.25, 0.75])).first()
perc = quartile_row[0]  # the single aggregate column holds [q25, q75]
print(perc)

In [None]:
# order by timestamp
df = df.orderBy("timestamp")

# None of the windows below is partitioned, so Spark evaluates them on a single partition;
# fine for one modest series, a bottleneck for large data.
w_time = Window.orderBy("timestamp")

# define lags
# NOTE(review): lags are ROW offsets. They equal minutes only if this Spark frame has exactly
# one row per minute; the minute flooring, de-duplication and padding above were done in
# pandas only and are not applied here.
lag_list = [1, 2, 3, 5, 10, 20, 60]
df = df.withColumns({f"lag_{n}": F.lag("value", n).over(w_time) for n in lag_list})

# rolling mean + std over the 15 minutes before each row (current row excluded).
# A range frame over epoch seconds measures real elapsed time, so gaps in the data do not
# stretch the window the way a 15-row frame would. Rows without earlier readings in that
# span get null (and stddev is null for a single reading).
ts_seconds = F.col("timestamp").cast("long")
w15 = Window.orderBy(ts_seconds).rangeBetween(-15 * 60, -1)
df = df.withColumns({
    "roll15_mean": F.avg("value").over(w15),
    "roll15_std": F.stddev("value").over(w15),
})

# time-of-day features, read from the timestamp in the Spark session time zone
# (dayofweek: 1 = Sunday ... 7 = Saturday).
# NOTE(review): the pandas EDA converts to America/Toronto; if daily patterns follow local
# time, consider deriving these from F.from_utc_timestamp("timestamp", "America/Toronto").
df = df.withColumns({
    "minute": F.minute("timestamp"),
    "hour": F.hour("timestamp"),
    "dayofweek": F.dayofweek("timestamp"),
})

# target: next 10 values (row offsets again — "next 10 minutes" only for gap-free 1-minute data)
df = df.withColumns({f"target_{k}": F.lead("value", k).over(w_time) for k in range(1, 11)})

In [None]:
# Preview the engineered feature and target columns.
df.show()

In [None]:
# List (column name, type) pairs. A Spark DataFrame has no `.type` attribute; `dtypes`
# (or df.printSchema()) is the way to inspect column types.
df.dtypes

In [None]:
# Drop every row that has a null in any column. This includes the leading rows without full
# lag/rolling history and the trailing rows without all target values.
df = df.dropna(how="any")

# NOTE(review): this relative path resolves against the current working directory (likely
# /content on Colab, not the mounted Drive) — confirm that is the intended output location.
(
    df.write
    .mode("overwrite")
    .parquet("prepared_ts/")
)
