In [7]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, explode, year, month, to_date
from pyspark.sql.types import DoubleType
from hdfs.ext.kerberos import KerberosClient

In [8]:
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from plotly.subplots import make_subplots
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
from scipy.stats import boxcox
from statsmodels.api import qqplot
from statsmodels.tsa.stattools import adfuller
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from pmdarima.arima import auto_arima
from pmdarima.utils import diff_inv
from sklearn.metrics import mean_squared_error
import datetime
from datetime import datetime

%matplotlib inline
warnings.filterwarnings("ignore")
sns.set(rc = {'figure.figsize': (5, 3)})
plt.style.use("fivethirtyeight")
rand_val = 765

In [9]:
from datetime import date, datetime
from dateutil.relativedelta import relativedelta

In [10]:
spark = SparkSession.builder.appName("Read from HDFS").getOrCreate()

In [11]:
hdfs_client = KerberosClient('http://10.4.41.51:9870')
all_files = hdfs_client.list('/user/bdm/formatted_data/bique.transactions')

In [12]:
file_path = 'hdfs://10.4.41.51:27000/user/bdm/formatted_data/bique.transactions/'
file_name = all_files[1]
if file_name != '_SUCCESS':
    file_path += file_name
    print(file_name[20:])

ES02JHOP23942038749660


In [14]:
len('fullDocument_source=')

20

In [15]:
data = spark.read.parquet(file_path)

In [16]:
# data transformations
data = data.withColumn("fullDocument_transactionAmount", col("fullDocument_transactionAmount_amount").cast(DoubleType()))
# data = data.withColumn("fullDocument_transactionAmount", col("fullDocument_transactionAmount_amount") * -1)
# data = data.withColumn("transactionCategory", explode("fullDocument_transactionInformation"))
data = data.withColumn("date", to_date("fullDocument_date"))

In [17]:
columns = ["date","fullDocument_transactionAmount"]
df = data.select(columns).groupBy("date").agg(sum("fullDocument_transactionAmount").alias("amount")).toPandas()

In [18]:
df['date'] = pd.to_datetime(df['date']).apply(lambda x : x.replace(day=1))
df = df.groupby("date").sum().sort_values('date', ascending = True)

In [19]:
# creating training and validation, with a dynamic percentage split of 90%-10%, depending on the values available
perc_split = int(len(df) * 0.8)
perc_split, len(df)

(24, 31)

In [20]:
# train and validation here
train_df = df.iloc[:perc_split]
test_df = df.iloc[perc_split:]

In [21]:
# adding one row for prediction of next month's data
test_df.loc[test_df.index[-1] + relativedelta(months=+1)] = 0

In [22]:
train_df.shape, test_df.shape

((24, 1), (8, 1))

In [23]:
result = adfuller(df['amount'])
print('ADF Statistic: {}'.format(result[0]))
print('p-value: {}'.format(result[1]))
print('Critical Values:')
for key, value in result[4].items():
    print('\t{}: {}'.format(key, value))

ADF Statistic: -5.836679804278832
p-value: 3.861911768577697e-07
Critical Values:
	1%: -3.6699197407407405
	5%: -2.9640707407407407
	10%: -2.621171111111111


In [24]:
model = ARIMA(train_df, order = (2,3,3)) #0,0,1

In [25]:
model_fit = model.fit()

In [26]:
predictions = model_fit.predict(start=len(train_df), end=len(train_df) + len(test_df) - 1)
predictions

2022-11-01    1385.245320
2022-12-01    -449.902670
2023-01-01     822.951683
2023-02-01     855.250806
2023-03-01     371.836414
2023-04-01    1146.314551
2023-05-01    1014.865430
2023-06-01    1096.412103
Freq: MS, Name: predicted_mean, dtype: float64

In [27]:
pred = predictions.to_frame().rename(columns = {0: 'date', 'predicted_mean': 'forecast'})
pred

Unnamed: 0,forecast
2022-11-01,1385.24532
2022-12-01,-449.90267
2023-01-01,822.951683
2023-02-01,855.250806
2023-03-01,371.836414
2023-04-01,1146.314551
2023-05-01,1014.86543
2023-06-01,1096.412103


In [28]:
# Calculate accuracy metrics
actual_values = test_df['amount']
mae = np.mean(np.abs(predictions - actual_values))
mse = np.mean((predictions - actual_values) ** 2)
rmse = np.sqrt(mse)

print(f"Mean Absolute Error (MAE): {mae}")
print(f"Mean Squared Error (MSE): {mse}")
print(f"Root Mean Squared Error (RMSE): {rmse}")

Mean Absolute Error (MAE): 748.5420235048389
Mean Squared Error (MSE): 708506.7042820407
Root Mean Squared Error (RMSE): 841.7284029198734
