## Setup

In [0]:
%pip install ydata-profiling
%pip install --upgrade Pillow
%restart_python

## Ingestion

#### Read from source: online file

In [0]:
from pyspark import SparkFiles
import urllib

DATASET_FILE = "ts-spark_ch2_ds2.csv"
DATASET_URL = f"https://github.com/PacktPublishing/Time-Series-Analysis-with-Spark/raw/main/ch2/{DATASET_FILE}"

# Read from a csv file
print(f"Ingesting from: {DATASET_URL}")

# option 1 - using sparkContext
#spark.sparkContext.addFile(DATASET_URL)
#df = spark.read.csv("file:///" + SparkFiles.get(DATASET_FILE), header=True, sep=";", inferSchema=True)
# option 2 - using urllib
urllib.request.urlretrieve(DATASET_URL, f"/tmp/{DATASET_FILE}")
df = spark.read.csv(f"file:/tmp/{DATASET_FILE}", header=True, sep=";", inferSchema=True)
#

df.cache()
df.count()

print("Ingestion result:")
df.display()
print("Inferred schema:")
df.printSchema()

#### Data Prep

In [0]:
from pyspark.sql import functions as F

# Fix Time column
df = df.withColumn('Time', F.date_format('Time', 'HH:mm:ss'))
# Create timestamp column
df = df.withColumn('timestamp', F.concat(df.Date, F.lit(" "), df.Time))
df = df.withColumn('timestamp', F.to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))
# Fix data types
#df = df.dropna() \
df = df \
    .withColumn('Global_active_power', df.Global_active_power.cast('double')) \
    .withColumn('Global_reactive_power', df.Global_reactive_power.cast('double')) \
    .withColumn('Voltage', df.Voltage.cast('double')) \
    .withColumn('Global_intensity', df.Global_intensity.cast('double')) \
    .withColumn('Sub_metering_1', df.Sub_metering_1.cast('double')) \
    .withColumn('Sub_metering_2', df.Sub_metering_2.cast('double')) \
    .withColumn('Sub_metering_3', df.Sub_metering_3.cast('double'))  

print("Ingestion result - after initial prep:")
df.display()
print("Schema:")
df.printSchema()

## Statistical Summary and Visualisation

#### Summary Statistics

In [0]:
df.summary().display()

#### Data Profiling

In [0]:
from pyspark.sql import functions as F
from ydata_profiling import ProfileReport

# Fix Date, Time columns for ydata correlations calc
df = df.withColumn('Date', F.to_timestamp(df.Date, 'yyyy-MM-dd'))
df = df.withColumn('Time', F.to_timestamp(df.Time, 'HH:mm:ss'))

df.display()

pdf = df.toPandas()

# Perform data profiling
profile = ProfileReport(pdf,
                title='Time Series Data Profiling',
                tsmode=True,
                sortby='timestamp',
                infer_dtypes=False,
                interactions=None,
                missing_diagrams=None,
                correlations={"auto": {"calculate": False},
                              "pearson": {"calculate": True},
                              "spearman": {"calculate": True}})

# Save the profiling report to an HTML file
#profile.to_file("time_series_data_profiling_report.html")

# Show the profiling report in the notebook
report_html = profile.to_html()
displayHTML(report_html)

#### Gap Analysis

In [0]:
import numpy as np

pdf = df.toPandas()

# add some random gaps
length = pdf.shape[0]
to_drop = np.unique(np.sort(np.random.randint(0, length, size=3))).tolist()
pdf = pdf.drop(to_drop).reset_index(drop=True)

In [0]:
import pyspark.pandas as ps

# test for gaps
pdf['gap_val'] = pdf['timestamp'].sort_values().diff()
pdf['gap'] = pdf['gap_val'] > ps.to_timedelta('1 minute')
pdf[pdf.gap]

#### Distribution Analysis

In [0]:
from pyspark.sql import functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pyspark.pandas as ps

# Extract day and hour
df = df.withColumn("dayOfWeek", F.dayofweek(F.col("timestamp")))
df = df.withColumn("hour", F.hour(F.col("timestamp")))

# Show the data
df.display()

# Convert to Pandas DataFrame for plotting
pdf = df.toPandas()

# Distribution analysis using Seaborn and Matplotlib
plt.figure(figsize=(10, 6))
sns.histplot(pdf['Global_active_power'], kde=True, bins=30)
plt.title('Distribution of Global_active_power in Time Series Data')
plt.xlabel('Global_active_power')
plt.ylabel('Frequency')
plt.show()

# Boxplot to visualize the distribution per dayOfWeek
plt.figure(figsize=(10, 6))
sns.boxplot(x='dayOfWeek', y='Global_active_power', data=pdf)
plt.title('Daily Distribution of Global_active_power in Time Series Data')
plt.xlabel('DayOfWeek')
plt.ylabel('Global_active_power')
plt.show()

# Boxplot to visualize the distribution per hour
plt.figure(figsize=(10, 6))
sns.boxplot(x='hour', y='Global_active_power', data=pdf)
plt.title('Hourly Distribution of Global_active_power in Time Series Data')
plt.xlabel('Hour')
plt.ylabel('Global_active_power')
plt.show()

#### Visualisations

In [0]:
import plotly.express as px
df_ = df['timestamp', 'Global_active_power'].toPandas()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'])
fig.show()

## Trend, Seasonality and Stationarity Analysis

#### Resampling and Aggregation

In [0]:
import plotly.express as px

# Convert to Pandas DataFrame for resampling and aggregation
pdf = df['timestamp', 'Global_active_power'].toPandas()

# Set date as index
pdf.set_index('timestamp', inplace=True)

# Resample data to hourly, daily and weekly frequency and aggregate by mean
hourly_resampled = pdf.resample('h').mean()
hourly_resampled_s = pdf.resample('h').std()
daily_resampled = pdf.resample('d').mean()
daily_resampled_s = pdf.resample('d').std()
weekly_resampled = pdf.resample('w').mean()
weekly_resampled_s = pdf.resample('w').std()

In [0]:
df_ = hourly_resampled.reset_index()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'], title="Global_active_power - hourly mean")
fig.show()

In [0]:
df_ = hourly_resampled_s.reset_index()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'], title="Global_active_power - hourly stddev")
fig.show()

In [0]:
df_ = daily_resampled.reset_index()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'], title="Global_active_power - daily mean")
fig.show()

In [0]:
df_ = daily_resampled_s.reset_index()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'], title="Global_active_power - daily stddev")
fig.show()

In [0]:
df_ = weekly_resampled.reset_index()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'], title="Global_active_power - weekly mean")
fig.show()

In [0]:
df_ = weekly_resampled_s.reset_index()
fig = px.line(df_, x=df_['timestamp'], y=df_['Global_active_power'], title="Global_active_power - weekly stddev")
fig.show()

#### Decomposition

In [0]:
import matplotlib.pyplot as plt
from statsmodels.tsa.seasonal import seasonal_decompose

# Perform seasonal decomposition
hourly_result = seasonal_decompose(hourly_resampled['Global_active_power'])
daily_result = seasonal_decompose(daily_resampled['Global_active_power'])

# Plot the decomposed components
_plot = hourly_result.plot()
_plot.set_size_inches((15, 8))
_plot.tight_layout()
plt.show()
_plot = daily_result.plot()
_plot.set_size_inches((15, 8))
_plot.tight_layout()
plt.show()

#### Stationarity

###### Check

In [0]:
from statsmodels.tsa.stattools import adfuller

# Perform Augmented Dickey-Fuller test
result = adfuller(hourly_resampled)

# if Test statistic < Critical Value and p-value < 0.05
#   reject the Null hypothesis, time series does not have a unit root
#   series is stationary
# Extract and print the ADF test results
print('ADF Statistic:', result[0])
print('p-value:', result[1])
print('Critical Values:')
for key, value in result[4].items():
    print(f'   {key}: {value}')

In [0]:
from statsmodels.tsa.stattools import adfuller

# Perform Augmented Dickey-Fuller test
result = adfuller(daily_resampled)

# if Test statistic < Critical Value and p-value < 0.05
#   reject the Null hypothesis, time series does not have a unit root
#   series is stationary
# Extract and print the ADF test results
print('ADF Statistic:', result[0])
print('p-value:', result[1])
print('Critical Values:')
for key, value in result[4].items():
    print(f'   {key}: {value}')

In [0]:
from statsmodels.tsa.stattools import kpss
import pandas as pd

# if Test Statistic > Critical Value and p-value < 0.05
#   reject the Null hypothesis
#   series is non-stationary
print ('Results of KPSS Test:')
kpsstest = kpss(hourly_resampled, regression='c', nlags="auto")
kpss_output = pd.Series(kpsstest[0:3], index=['Test Statistic','p-value','#Lags Used'])
for key,value in kpsstest[3].items():
    kpss_output['Critical Value (%s)'%key] = value
print (kpss_output)

In [0]:
from statsmodels.tsa.stattools import kpss
import pandas as pd

# if Test Statistic > Critical Value and p-value < 0.05
#   reject the Null hypothesis
#   series is non-stationary
print ('Results of KPSS Test:')
kpsstest = kpss(daily_resampled, regression='c', nlags="auto")
kpss_output = pd.Series(kpsstest[0:3], index=['Test Statistic','p-value','#Lags Used'])
for key,value in kpsstest[3].items():
    kpss_output['Critical Value (%s)'%key] = value
print (kpss_output)

%md
###### Check - another dataset

In [0]:
import plotly.express as px
import pyspark.pandas as ps

DATASET_FILE = "ts-spark_ch1_ds1.csv"
DATASET_URL = f"https://raw.githubusercontent.com/PacktPublishing/Time-Series-Analysis-with-Spark/main/ch1/{DATASET_FILE}"

spark.sparkContext.addFile(DATASET_URL)
df1 = spark.read.format("csv").option("header", "true").load("file:///" + SparkFiles.get(DATASET_FILE))
df1.createOrReplaceTempView("temperatures")

df2 = spark.sql("select to_date(Category) as year, float(`Annual Mean`) as annual_mean from temperatures where Category > '1950'")
df2_pd = df2.toPandas()
df2_pd['year'] = ps.to_datetime(df2_pd['year'])
#display(df2_pd)

fig = px.scatter(df2_pd, x="year", y="annual_mean", trendline="ols", title='Average Temperature - Mauritius (from 1950)')
fig.update_traces(mode = 'lines')
fig.data[-1].line.color = 'red'
fig.data[-1].line.dash = 'dash'
fig.show()

In [0]:
from statsmodels.tsa.stattools import adfuller

# Perform Augmented Dickey-Fuller test
result = adfuller(df2_pd['annual_mean'])

# if Test statistic < Critical Value and p-value < 0.05
#   reject the Null hypothesis, time series does not have a unit root
#   series is stationary
# Extract and print the ADF test results
print('ADF Statistic:', result[0])
print('p-value:', result[1])
print('Critical Values:')
for key, value in result[4].items():
    print(f'   {key}: {value}')

###### Differencing

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Calculate the difference (differencing)
window = Window.orderBy("year")
df2_ = df2.withColumn("annual_mean_diff", F.col("annual_mean") - F.lag(F.col("annual_mean"), 1).over(window))

# Drop the first row with null difference
df2_ = df2_.na.drop()

# Show the differenced data
df2_.display()

# Convert to Pandas DataFrame for visualization
pdf2 = df2_.toPandas()

# Set date as index
pdf2.set_index('year', inplace=True)

# Plot the original and differenced time series
plt.figure(figsize=(12, 6))

plt.subplot(2, 1, 1)
plt.plot(pdf2.index, pdf2['annual_mean'], marker='o')
plt.title('Original Time Series')
plt.xlabel('year')
plt.ylabel('annual_mean')

plt.subplot(2, 1, 2)
plt.plot(pdf2.index, pdf2['annual_mean_diff'], marker='o', color='orange')
plt.title('Differenced Time Series')
plt.xlabel('year')
plt.ylabel('differenced value')

plt.tight_layout()
plt.show()


In [0]:
from statsmodels.tsa.stattools import adfuller

# Perform Augmented Dickey-Fuller test
result = adfuller(pdf2['annual_mean_diff'])

# if Test statistic < Critical Value and p-value < 0.05
#   reject the Null hypothesis, time series does not have a unit root
#   series is stationary
# Extract and print the ADF test results
print('ADF Statistic:', result[0])
print('p-value:', result[1])
print('Critical Values:')
for key, value in result[4].items():
    print(f'   {key}: {value}')

## Correlation Analysis

#### Autocorrelation

In [0]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Plot Autocorrelation Function (ACF)
plt.figure(figsize=(12, 6))
plot_acf(hourly_resampled['Global_active_power'], lags=3*24)
plt.title('Autocorrelation Function (ACF)')
plt.show()

# Plot Partial Autocorrelation Function (PACF)
plt.figure(figsize=(12, 6))
plot_pacf(hourly_resampled['Global_active_power'], lags=3*24)
plt.title('Partial Autocorrelation Function (PACF)')
plt.show()

In [0]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Plot Autocorrelation Function (ACF)
plt.figure(figsize=(12, 6))
plot_acf(daily_resampled['Global_active_power'], lags=2*7)
plt.title('Autocorrelation Function (ACF)')
plt.show()

# Plot Partial Autocorrelation Function (PACF)
plt.figure(figsize=(12, 6))
plot_pacf(daily_resampled['Global_active_power'], lags=2*7)
plt.title('Partial Autocorrelation Function (PACF)')
plt.show()

In [0]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

# Convert to Pandas DataFrame for autocorrelation checks
pdf2 = df2_.toPandas()

# Set date as index
pdf2.set_index('year', inplace=True)

# Plot Autocorrelation Function (ACF)
plt.figure(figsize=(12, 6))
plot_acf(pdf2['annual_mean'], lags=30)
plt.title('Autocorrelation Function (ACF)')
plt.show()

# Plot Partial Autocorrelation Function (PACF)
plt.figure(figsize=(12, 6))
plot_pacf(pdf2['annual_mean'], lags=30)
plt.title('Partial Autocorrelation Function (PACF)')
plt.show()

#### Lag Analysis

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

hourly_df = hourly_resampled.reset_index()
hourly_df = spark.createDataFrame(hourly_df)

# Define window specification
window = Window.orderBy("timestamp")

# Create lagged features
hourly_df = hourly_df.withColumn("lag1", F.lag(F.col("Global_active_power"), 1).over(window))
hourly_df = hourly_df.withColumn("lag2", F.lag(F.col("Global_active_power"), 2).over(window))
hourly_df = hourly_df.withColumn("lag12", F.lag(F.col("Global_active_power"), 12).over(window))
hourly_df = hourly_df.withColumn("lag24", F.lag(F.col("Global_active_power"), 24).over(window))

# Show the DataFrame with lagged features
hourly_df.display()

In [0]:
from pyspark.sql.functions import corr

# Calculate autocorrelation for lag 1
df_lag1 = hourly_df.dropna(subset=["lag1"])
autocorr_lag1 = df_lag1.stat.corr("Global_active_power", "lag1")

# Calculate autocorrelation for lag 2
df_lag2 = hourly_df.dropna(subset=["lag2"])
autocorr_lag2 = df_lag2.stat.corr("Global_active_power", "lag2")

# Calculate autocorrelation for lag 12
df_lag12 = hourly_df.dropna(subset=["lag12"])
autocorr_lag12 = df_lag12.stat.corr("Global_active_power", "lag12")

# Calculate autocorrelation for lag 24
df_lag24 = hourly_df.dropna(subset=["lag24"])
autocorr_lag24 = df_lag24.stat.corr("Global_active_power", "lag24")

print(f"Autocorrelation for lag 1: {autocorr_lag1}")
print(f"Autocorrelation for lag 2: {autocorr_lag2}")
print(f"Autocorrelation for lag 12: {autocorr_lag12}")
print(f"Autocorrelation for lag 24: {autocorr_lag24}")

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Convert to Pandas DataFrame for visualization
pdf = hourly_df.toPandas()

# Drop rows with NaN values (introduced by lagging)
pdf = pdf.dropna()

# Plot original vs lagged values
plt.figure(figsize=(12, 12))

plt.subplot(4, 1, 1)
plt.plot(pdf['timestamp'], pdf['Global_active_power'])
plt.plot(pdf['timestamp'], pdf['lag1'])
plt.title('Lag 1 vs Original')

plt.subplot(4, 1, 2)
plt.plot(pdf['timestamp'], pdf['Global_active_power'])
plt.plot(pdf['timestamp'], pdf['lag2'])
plt.title('Lag 2 vs Original')

plt.subplot(4, 1, 3)
plt.plot(pdf['timestamp'], pdf['Global_active_power'])
plt.plot(pdf['timestamp'], pdf['lag12'])
plt.title('Lag 12 vs Original')

plt.subplot(4, 1, 4)
plt.plot(pdf['timestamp'], pdf['Global_active_power'])
plt.plot(pdf['timestamp'], pdf['lag24'])
plt.title('Lag 24 vs Original')

plt.tight_layout()
plt.show()

#### Cross-correlation

In [0]:
from pyspark.sql import functions as F

# Convert to Pandas DataFrame for resampling and aggregation
pdf = df['timestamp', 'Global_active_power', 'Voltage'].toPandas()

# Set date as index
pdf.set_index('timestamp', inplace=True)

# Resample data to hourly frequency and aggregate by mean
hourly_resampled = pdf.resample('h').mean()

hourly_df = spark.createDataFrame(hourly_resampled)

# Compute cross-correlation between value1 and value2
cross_corr = hourly_df.stat.corr("Global_active_power", "Voltage")

print(f"Cross-correlation between Global_active_power and Voltage: {cross_corr}")

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import ccf

hourly_ = hourly_resampled.iloc[:36]

# Calculate cross-correlation function
ccf_values = ccf(hourly_['Global_active_power'], hourly_['Voltage'])

# Plot cross-correlation function
plt.figure(figsize=(12, 6))
plt.stem(range(len(ccf_values)), ccf_values, use_line_collection=True, markerfmt="-")
plt.title('Cross-Correlation Function (CCF)')
plt.xlabel('Lag')
plt.ylabel('Cross-Correlation')
plt.show()

In [0]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F

hourly_df = hourly_resampled.reset_index()
hourly_df = spark.createDataFrame(hourly_df)

# Define window specification
window = Window.orderBy("timestamp")

# Create lagged features for value2
hourly_df = hourly_df.withColumn("Voltage_lag1", F.lag(F.col("Voltage"), 1).over(window))
hourly_df = hourly_df.withColumn("Voltage_lag2", F.lag(F.col("Voltage"), 2).over(window))
hourly_df = hourly_df.withColumn("Voltage_lag12", F.lag(F.col("Voltage"), 12).over(window))
hourly_df = hourly_df.withColumn("Voltage_lag24", F.lag(F.col("Voltage"), 24).over(window))

# Drop rows with null values
hourly_df = hourly_df.dropna()

# Compute cross-correlation for lagged values
cross_corr_lag1 = hourly_df.stat.corr("Global_active_power", "Voltage_lag1")
cross_corr_lag2 = hourly_df.stat.corr("Global_active_power", "Voltage_lag2")
cross_corr_lag12 = hourly_df.stat.corr("Global_active_power", "Voltage_lag12")
cross_corr_lag24 = hourly_df.stat.corr("Global_active_power", "Voltage_lag24")

print(f"Cross-correlation between Global_active_power and Voltage (lag 1): {cross_corr_lag1}")
print(f"Cross-correlation between Global_active_power and Voltage (lag 2): {cross_corr_lag2}")
print(f"Cross-correlation between Global_active_power and Voltage (lag 12): {cross_corr_lag12}")
print(f"Cross-correlation between Global_active_power and Voltage (lag 24): {cross_corr_lag24}")