In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas.tseries.offsets import DateOffset
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, lag, last, row_number
from pyspark.sql.window import Window
from statsmodels.tsa.stattools import adfuller

# pyspark.ml.regression ships no ARMA estimator; guard the import so that a
# failure here does not abort every other import in this cell.
try:
    from pyspark.ml.regression import ARMA
except ImportError:
    ARMA = None

In [None]:
# Create (or reuse) the Spark session that backs this notebook
spark = (
    SparkSession.builder
    .appName("TimeSeriesAnalysis")
    .getOrCreate()
)

In [None]:
# Read the raw CSV: the first row supplies column names, types are inferred
csv_path = '/home/skystone/Documents/TimeSeries/T1.csv'
read_options = {"header": True, "inferSchema": True}
df = spark.read.csv(csv_path, **read_options)


In [None]:
# Keep only the timestamp and wind-speed columns, under shorter names
column_aliases = {"Date/Time": "timeStamp", "Wind Speed (m/s)": "windSpeed"}
dataset = df.select([df[source].alias(target) for source, target in column_aliases.items()])


In [None]:
# Limit dataset to a fixed number of rows to keep the analysis small.
# NOTE(review): limit() is applied before any sort, so which rows are kept is
# not guaranteed to be the earliest ones — confirm this is acceptable.
MAX_ROWS = 5000
dataset = dataset.limit(MAX_ROWS)

# Convert timeStamp format.
# date_format lives in pyspark.sql.functions and must be imported for this
# cell to run; it renders the column as a "yyyy-MM-dd HH:mm:ss" string.
# NOTE(review): if the raw column is a string in a layout Spark cannot cast to
# a timestamp, date_format presumably yields nulls — verify against the file,
# and parse with the file's own pattern if so.
time_format = "yyyy-MM-dd HH:mm:ss"
dataset = dataset.withColumn("timeStamp", date_format(col("timeStamp"), time_format))

In [None]:
# Convert timeStamp column to timestamp type
dataset = dataset.withColumn("timeStamp", col("timeStamp").cast("timestamp"))

# Spark DataFrames have no index, so derive an ordering column instead: the
# timestamp cast to long. Both columns are kept (not dropped): later cells
# order their windows by "index" and read "timeStamp" for plotting/forecasting.
dataset = dataset.withColumn("index", col("timeStamp").cast("long")).sort("index")

In [None]:
# Fill missing values using forward fill.
# Column has no fillna method, and lag cannot be evaluated over an explicit
# row frame, so carry the most recent non-null reading forward instead:
# last(..., ignorenulls=True) over "every row up to and including this one".
# windowSpec holds only the ordering so it stays usable with offset functions
# such as lag; both windows need an "index" ordering column on dataset.
windowSpec = Window.orderBy("index")
ffill_window = windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
dataset = dataset.withColumn(
    "windSpeed_imputed",
    last("windSpeed", ignorenulls=True).over(ffill_window),
)

In [None]:
# Generate a column for ticks: the 1-based position of each row in "index"
# order. The plots use it as their x-axis, so it has to be a running counter
# rather than a null flag. Needs an "index" ordering column on dataset.
dataset = dataset.withColumn("Ticks", row_number().over(Window.orderBy("index")))

In [None]:
# Pull the prepared data into pandas and draw the imputed series against ticks
original_data = dataset.toPandas()
fig, ax = plt.subplots(figsize=(20, 10))
ax.plot(original_data["Ticks"], original_data["windSpeed_imputed"])
ax.set_xlabel("Ticks")
ax.set_ylabel("Wind Speed (m/s)")
ax.set_title("Original Plot")
plt.show()

In [None]:
# Define a function for stationarity check using ADF test
def stationarity_check(df):
    """Run an Augmented Dickey-Fuller test on ``windSpeed_imputed`` and print it.

    The column is collected to pandas, missing values are removed (the test
    cannot be computed on NaN), and the test statistic, p-value and critical
    values are printed.

    Args:
        df: Spark DataFrame exposing a ``windSpeed_imputed`` column.

    Raises:
        ValueError: if the column holds no non-null values.
    """
    series = df.select("windSpeed_imputed").toPandas()["windSpeed_imputed"].dropna()
    if series.empty:
        raise ValueError("windSpeed_imputed has no non-null values to test")

    result = adfuller(series)
    print('Augmented Dickey-Fuller test:')
    print(f'Test Statistic: {result[0]}')
    print(f'p-value: {result[1]}')
    print('Critical Values:')
    # result[4] maps each significance level to its critical value
    for key, value in result[4].items():
        print(f'   {key}: {value}')

# Apply stationarity check to the prepared dataset; this prints the ADF test
# statistic, p-value and critical values for the windSpeed_imputed column
stationarity_check(dataset)

In [None]:
# Create lag features: lag_i holds the imputed wind speed i rows earlier.
# lag needs a window that carries an ordering but no explicit row frame, so a
# dedicated ordering-only window is used (requires an "index" column).
lag_window = Window.orderBy("index")
lag_columns = []
for i in range(1, 5):
    lag_name = f"lag_{i}"
    dataset = dataset.withColumn(lag_name, lag("windSpeed_imputed", i).over(lag_window))
    lag_columns.append(lag_name)

# Drop rows with missing lag features (or a missing target). Restricting the
# check to these columns keeps rows whose raw windSpeed was null but has been
# imputed; a bare dropna() would throw those rows away again.
dataset = dataset.dropna(subset=["windSpeed_imputed", *lag_columns])

In [None]:
# Pack the four lag columns into a single "features" vector column
feature_cols = [f"lag_{k}" for k in range(1, 5)]
assembler = VectorAssembler(outputCol="features", inputCols=feature_cols)
dataset = assembler.transform(dataset)

In [None]:
# Fit the time-series model.
# pyspark.ml.regression provides no ARMA estimator, so the autoregressive part
# is fitted as a linear regression of windSpeed_imputed on the lag features.
# The fitted model still exposes transform() and save() for the cells below.
# NOTE(review): this has no moving-average component; if MA terms are required,
# a dedicated time-series library is needed instead.
# The variable keeps its original name so other cells referring to it still work.
arma = LinearRegression(labelCol="windSpeed_imputed", featuresCol="features")
model = arma.fit(dataset)


In [None]:
# Plot predictions.
# Both series are drawn against the same "Ticks" axis so they overlay
# correctly; the fitted rows are sorted because Spark does not guarantee the
# row order of a collected DataFrame.
predictions = model.transform(dataset)
predictions_pd = predictions.select("Ticks", "prediction").toPandas().sort_values("Ticks")

fig, ax = plt.subplots()
ax.plot(original_data["Ticks"], original_data["windSpeed_imputed"], label="Actual")
ax.plot(predictions_pd["Ticks"], predictions_pd["prediction"], label="Predicted")
ax.set_xlabel("Ticks")
ax.set_ylabel("Wind Speed (m/s)")
ax.set_title("Fitted data")
ax.legend()
plt.show()

In [None]:
# Save the model, replacing any copy written by an earlier run so that the
# notebook can be re-executed from a fresh kernel without a path-exists error.
# NOTE(review): the path says "humidity" although the target column is wind
# speed — confirm the intended name before anything else depends on it.
model.write().overwrite().save("humidityModel")

In [None]:
# Generate future dates: 24 monthly steps after the latest observed timestamp.
# max() is used instead of the last row so the result does not depend on the
# row order of original_data.
# NOTE(review): the offsets are calendar months; confirm this matches the
# sampling interval of the series the model was trained on.
last_timestamp = pd.Timestamp(original_data["timeStamp"].max())
future_dates = [last_timestamp + pd.DateOffset(months=x) for x in range(1, 25)]

# Keep every generated date (the range already starts at +1 month) and store
# the dates in the timeStamp column rather than the index, so they survive a
# later conversion to a Spark DataFrame. Columns absent from the future rows
# are filled with NaN by the concat.
future_datest_df = pd.DataFrame({"timeStamp": future_dates})
future_df = pd.concat([original_data, future_datest_df], ignore_index=True)

In [None]:
# Make predictions for future dates.
# The appended future rows carry no lag values, so they cannot be pushed
# through the assembler directly. Forecast recursively instead: seed the lag
# features with the latest observed values, predict one step, then feed that
# prediction back in as the newest lag.
n_history = len(original_data)
n_future = len(future_df) - n_history

# Most recent value first, lining up with feature_cols (lag_1, lag_2, ...).
# NOTE(review): assumes original_data rows are in chronological order — confirm.
observed_values = original_data["windSpeed_imputed"].dropna().tolist()
recent_values = [float(v) for v in reversed(observed_values[-len(feature_cols):])]
if len(recent_values) < len(feature_cols):
    raise ValueError("not enough observed values to seed the lag features")

forecast_values = []
for step in range(n_future):
    step_features = spark.createDataFrame(pd.DataFrame([recent_values], columns=feature_cols))
    next_value = (
        model.transform(assembler.transform(step_features))
        .select("prediction")
        .first()[0]
    )
    forecast_values.append(next_value)
    # Shift the lag window: the new prediction becomes lag_1, the oldest lag drops off
    recent_values = [next_value] + recent_values[:-1]

# Historical rows get NaN; only the appended future rows receive a forecast
future_df["forecast"] = [np.nan] * n_history + forecast_values