## Technical Description of the Machine Learning Project

This project aims to develop a robust machine learning model capable of forecasting the future share price of Hafez Tile. We will leverage historical stock data, harnessing the power of time-series analysis techniques to uncover patterns and trends that can inform our predictions. 

## Key Objectives:

Data Acquisition and Preparation: Collect and preprocess historical stock data for Hafez Tile, ensuring its quality and suitability for analysis.
Exploratory Data Analysis (EDA): Gain insights into the statistical properties of the data, identify potential outliers or anomalies, and visualize key trends and patterns.
Feature Engineering : Create additional features, such as lagged variables or technical indicators, to potentially enhance model performance.
Model Selection and Training: Train and evaluate a variety of time-series models, including both classical (ARIMA) and potentially deep learning approaches (LSTM).
Model Evaluation and Selection: Compare the performance of different models using appropriate metrics (e.g., MAE, RMSE) and select the most accurate model for forecasting.
Visualization and Interpretation: Present model predictions and relevant insights through clear and informative visualizations.

This project will focus exclusively on time-series data and quantitative modeling techniques. While fundamental analysis and other factors (such as news sentiment or macroeconomic trends) can play a significant role in stock price movements, they are beyond the scope of this initial investigation.

# Importing Libraries 

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.optimizers import Adam

from statsmodels.tsa.arima.model import ARIMA

import statsmodels.api as sm
from prophet import Prophet


# Loading Data

In [None]:
def load_all_data(data_directory="/kaggle/input/khafez-cl/KHAFEZ_CL.xlsx"):
    notebook_directory = os.getcwd()
    data_directory = os.path.join(notebook_directory, data_directory)

    dataframes = {}
    loaded_dataframes_list = []

    if data_directory.endswith('.xlsx'):
        try:
            df = pd.read_excel(data_directory)
            key = os.path.splitext(os.path.basename(data_directory))[0]
            dataframes[key] = df
            loaded_dataframes_list.append(df)
        except Exception as e:
            print(f"Error loading XLSX file {data_directory}: {e}")
    else:
        print(f"Unsupported file format: {data_directory}")

    print("Loaded DataFrames Summary:")
    for name, df in dataframes.items():
        print(f"- {name}: {df.shape[0]} rows, {df.shape[1]} columns")

    return dataframes, loaded_dataframes_list

all_data_dict, all_data_list = load_all_data()

# Print the first few rows of the loaded dataframe
if all_data_list:
    print("\nFirst few rows of the loaded dataframe:")
    print(all_data_list[0].head())
else:
    print("No dataframes were loaded.")

In [None]:
header_rows = {
    'KHAFEZ_CL': 0,  # Header row for KHAFEZ_CL is row 0
}

def clean_and_display_columns(dataframes, header_rows):
    cleaned_dataframes = {}

    for filename, df in dataframes.items():
        # Display original column names
        print(f"\nOriginal columns in '{filename}':\n{df.columns.tolist()}")

        header_row = header_rows.get(filename, 0)  # Default to 0 if not specified

        # Use the specified row as header
        df.columns = df.iloc[header_row]
        
        # Remove rows before and including the header row
        df = df.iloc[header_row + 1:].reset_index(drop=True)

        # Rename columns
        column_translations = {
            '<TICKER>': 'Ticker',
            '<DTYYYYMMDD>': 'Date',
            '<FIRST>': 'First_Price',
            '<HIGH>': 'High_Price',
            '<LOW>': 'Low_Price',
            '<CLOSE>': 'Close_Price',
            '<VALUE>': 'Trade_Value',
            '<VOL>': 'Volume',
            '<OPENINT>': 'Open_Interest',
            '<PER>': 'Price_Earnings_Ratio',
            '<OPEN>': 'Open_Price',
            '<LAST>': 'Last_Price'
        }
        df.rename(columns=column_translations, inplace=True)

        cleaned_dataframes[filename] = df

        # Display new column names
        print(f"\nNew columns in '{filename}':\n{df.columns.tolist()}")

    return cleaned_dataframes

cleaned_dataframes = clean_and_display_columns(all_data_dict, header_rows)

# Print the first few rows of the cleaned dataframe
if cleaned_dataframes:
    first_df_name = list(cleaned_dataframes.keys())[0]
    print(f"\nFirst few rows of the cleaned dataframe '{first_df_name}':")
    print(cleaned_dataframes[first_df_name].head())
else:
    print("No dataframes were cleaned.")

In [None]:
cleaned_dataframes = clean_and_display_columns(all_data_dict, header_rows)

In [None]:
# Assuming we're working with the first (and only) dataframe in cleaned_dataframes
df = list(cleaned_dataframes.values())[0]

# Convert Date column
df['Date'] = pd.to_datetime(df['Date'], format='%Y%m%d')

# Sort by date
df = df.sort_values('Date').reset_index(drop=True)

# Drop Price_Earnings_Ratio column if it exists
if 'Price_Earnings_Ratio' in df.columns:
    df = df.drop('Price_Earnings_Ratio', axis=1)

# Identify trading days
df['Is_Trading_Day'] = ((df['First_Price'] != 0) | (df['High_Price'] != 0) | (df['Low_Price'] != 0) | (df['Volume'] != 0)).astype(int)

# For non-trading days, fill price columns with the last available price
price_columns = ['First_Price', 'High_Price', 'Low_Price', 'Close_Price', 'Open_Price', 'Last_Price']
for col in price_columns:
    df[col] = df[col].replace(0, np.nan)
df[price_columns] = df[price_columns].ffill()

# Convert numeric columns
numeric_columns = ['First_Price', 'High_Price', 'Low_Price', 'Close_Price', 'Trade_Value', 'Volume', 'Open_Interest', 'Open_Price', 'Last_Price']
for col in numeric_columns:
    df[col] = pd.to_numeric(df[col], errors='coerce')

# Calculate returns and log returns
df['Returns'] = df['Close_Price'].pct_change()
df['Log_Returns'] = np.log(df['Close_Price'] / df['Close_Price'].shift(1))

# Calculate volatility and moving averages
df['Volatility'] = df['Returns'].rolling(window=20).std()
df['MA_5'] = df['Close_Price'].rolling(window=5).mean()
df['MA_20'] = df['Close_Price'].rolling(window=20).mean()

# Fill NaN values
df = df.fillna(method='ffill').fillna(method='bfill')

# Print info about the preprocessed dataframe
print(df.info())

# Display the first few rows to verify the changes
print(df.head())

# Check for stationarity
from statsmodels.tsa.stattools import adfuller

def test_stationarity(timeseries):
    result = adfuller(timeseries, autolag='AIC')
    print(f'ADF Statistic: {result[0]}')
    print(f'p-value: {result[1]}')
    print('Critical Values:')
    for key, value in result[4].items():
        print(f'\t{key}: {value}')

# Check if we have enough data points
if len(df) > 20:  # Assuming we need at least 20 data points for a meaningful test
    test_stationarity(df['Close_Price'])
else:
    print("Not enough data points for stationarity test")

# Analyze trading days (Saturday as first day of the week)
df['Day_of_Week'] = (df['Date'].dt.dayofweek + 1) % 7  # 0 = Saturday, 6 = Friday
trading_days_by_weekday = df[df['Is_Trading_Day'] == 1]['Day_of_Week'].value_counts().sort_index()
print("\nTrading days by day of week:")
print(trading_days_by_weekday)

# Visualize trading days
plt.figure(figsize=(10, 6))
trading_days_by_weekday.plot(kind='bar')
plt.title('Number of Trading Days by Day of Week')
plt.xlabel('Day of Week (0 = Saturday, 6 = Friday)')
plt.ylabel('Number of Trading Days')
plt.xticks(range(7), ['Sat', 'Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri'])
plt.show()

Stationarity Test:

The Augmented Dickey-Fuller test was performed on 'Close_Price'.
The p-value (0.0206) is less than 0.05, which suggests that we can reject the null hypothesis of non-stationarity.
This indicates that the 'Close_Price' series is likely stationary, which is good for many time series models.

# Visualisation

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(15, 10))

# Close Price
plt.subplot(3, 1, 1)
plt.plot(df['Date'], df['Close_Price'])
plt.title('Close Price Over Time')
plt.ylabel('Close Price')

# Volume
plt.subplot(3, 1, 2)
plt.bar(df['Date'], df['Volume'], alpha=0.7)
plt.title('Trading Volume Over Time')
plt.ylabel('Volume')

# Returns
plt.subplot(3, 1, 3)
plt.plot(df['Date'], df['Returns'])
plt.title('Daily Returns Over Time')
plt.ylabel('Returns')

plt.tight_layout()
plt.show()

# Distribution of returns
plt.figure(figsize=(10, 6))
sns.histplot(df['Returns'], kde=True, bins=50)
plt.title('Distribution of Daily Returns')
plt.xlabel('Returns')
plt.show()

# EDA : Correlation

In [None]:
import numpy as np
import pandas as pd
from scipy import stats

def calculate_correlation_stats(df, columns):
    results = []
    for i in range(len(columns)):
        for j in range(i+1, len(columns)):
            x = df[columns[i]]
            y = df[columns[j]]
            correlation, p_value = stats.pearsonr(x, y)
            results.append({
                'Variable 1': columns[i],
                'Variable 2': columns[j],
                'Correlation': correlation,
                'P-value': p_value,
                'Sample Size': len(x)
            })
    return pd.DataFrame(results)

# Select numeric columns for correlation analysis
numeric_cols = ['Close_Price', 'Volume', 'Returns', 'Volatility', 'MA_5', 'MA_20']

# Calculate correlation matrix
correlation_matrix = df[numeric_cols].corr()

# Calculate statistical measures
correlation_stats = calculate_correlation_stats(df, numeric_cols)

# Sort by absolute correlation value
correlation_stats['Abs_Correlation'] = abs(correlation_stats['Correlation'])
correlation_stats = correlation_stats.sort_values('Abs_Correlation', ascending=False)

# Display correlation matrix
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1, center=0)
plt.title('Correlation Heatmap')
plt.show()

# Display correlation statistics
pd.set_option('display.float_format', '{:.4f}'.format)
print("\nCorrelation Statistics:")
print(correlation_stats[['Variable 1', 'Variable 2', 'Correlation', 'P-value', 'Sample Size']])

# Identify significant correlations
alpha = 0.05  # significance level
significant_correlations = correlation_stats[
    (correlation_stats['P-value'] < alpha) & 
    (abs(correlation_stats['Correlation']) > 0.5)
]

print("\nStatistically Significant and Strong Correlations:")
print(significant_correlations[['Variable 1', 'Variable 2', 'Correlation', 'P-value']])

# Machine Learning : Random Forest

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
import pywt

In [None]:
# Prepare the data for machine learning
df['Close_Price_shifted'] = df['Close_Price'].shift(-1)
df.dropna(inplace=True)

# Features
X = df[['Close_Price', 'Volume', 'Volatility', 'MA_5', 'MA_20']]
y = df['Close_Price_shifted']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Feature Scaling
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Train Random Forest model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train, y_train)  # Fit on original data

# Make predictions
y_pred = model.predict(X_test)

# Evaluate the model (using original values)
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)

print('Mean Squared Error (MSE):', mse)
print('Root Mean Squared Error (RMSE):', rmse)
print('Mean Absolute Error (MAE):', mae)
print('Mean Absolute Percentage Error (MAPE):', mape)


# Machine Learning : ARIMA

In [None]:
import numpy as np
import pandas as pd
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
from pmdarima import auto_arima

# Prepare the data
features = ['Volume', 'Returns', 'Volatility', 'MA_20', 'Is_Trading_Day']
for col in ['Volume', 'Returns']:
    for lag in [1, 2, 3, 5]:
        df[f'{col}_Lag_{lag}'] = df[col].shift(lag)
        features.append(f'{col}_Lag_{lag}')

df = df.dropna()

# Set the date as the index
df.set_index('Date', inplace=True)

# Split the data
train_size = int(len(df) * 0.8)
train, test = df[:train_size], df[train_size:]

# Scale the features
scaler = StandardScaler()
train_scaled = pd.DataFrame(scaler.fit_transform(train[features]), columns=features, index=train.index)
test_scaled = pd.DataFrame(scaler.transform(test[features]), columns=features, index=test.index)

# Find the best SARIMA parameters
auto_model = auto_arima(train['Close_Price'], exogenous=train_scaled, 
                        start_p=1, start_q=1, max_p=3, max_q=3, m=5,
                        start_P=0, seasonal=True, d=1, D=1, trace=True,
                        error_action='ignore', suppress_warnings=True, stepwise=True)

# Fit the SARIMAX model
model = SARIMAX(train['Close_Price'], exog=train_scaled, 
                order=auto_model.order, seasonal_order=auto_model.seasonal_order)
results = model.fit()

# Make predictions
forecast = results.get_forecast(steps=len(test), exog=test_scaled)
y_pred = forecast.predicted_mean

# Evaluate the model
mse = mean_squared_error(test['Close_Price'], y_pred)
mae = mean_absolute_error(test['Close_Price'], y_pred)
r2 = r2_score(test['Close_Price'], y_pred)

print("Mean Squared Error:", mse)
print("Mean Absolute Error:", mae)
print("R-squared:", r2)

# Plot actual vs predicted values
plt.figure(figsize=(12, 6))
plt.plot(test.index, test['Close_Price'], label='Actual')
plt.plot(test.index, y_pred, label='Predicted')
plt.title('Actual vs Predicted Close Prices (SARIMAX)')
plt.xlabel('Date')
plt.ylabel('Close Price')
plt.legend()
plt.show()

# Print model summary
print(results.summary())

In [None]:
from statsmodels.tsa.stattools import adfuller

def test_stationarity(timeseries):
    result = adfuller(timeseries)
    print('ADF Statistic:', result[0])
    print('p-value:', result[1])
    print('Critical Values:', result[4])

# Check stationarity of differenced Close_Price
diff_close = df['Close_Price'].diff().dropna()
test_stationarity(diff_close)

# ACF and PACF

In [None]:
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
plot_acf(diff_close, ax=ax1, lags=40)
plot_pacf(diff_close, ax=ax2, lags=40)
plt.show()

# Machine Learning : Prophet

In [None]:
pip install prophet

In [None]:
from prophet import Prophet
from prophet.plot import plot_plotly, add_changepoints_to_plot
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np
import pandas as pd

# Prepare data for Prophet
prophet_data = df.reset_index()[['Date', 'Close_Price']].rename(columns={'Date': 'ds', 'Close_Price': 'y'})

# Split data
train_size = int(len(prophet_data) * 0.8)
train_prophet = prophet_data[:train_size]
test_prophet = prophet_data[train_size:]

# Fit Prophet model
model = Prophet(changepoint_prior_scale=0.05, yearly_seasonality=True, weekly_seasonality=True, daily_seasonality=False)
model.fit(train_prophet)

# Make predictions
future = model.make_future_dataframe(periods=len(test_prophet))
forecast = model.predict(future)

# Evaluate the model
y_pred = forecast['yhat'][-len(test_prophet):]
mse = mean_squared_error(test_prophet['y'], y_pred)
mae = mean_absolute_error(test_prophet['y'], y_pred)
r2 = r2_score(test_prophet['y'], y_pred)

print("Prophet Model Results:")
print("Mean Squared Error:", mse)
print("Mean Absolute Error:", mae)
print("R-squared:", r2)

# Plot results with uncertainty intervals
fig = model.plot(forecast)
a = add_changepoints_to_plot(fig.gca(), model, forecast)
plt.title('Prophet Forecast with Changepoints')
plt.show()

# Plot components
fig = model.plot_components(forecast)
plt.show()

# Plot actual vs predicted
plt.figure(figsize=(12, 6))
plt.plot(test_prophet['ds'], test_prophet['y'], label='Actual')
plt.plot(test_prophet['ds'], y_pred, label='Predicted')
plt.fill_between(test_prophet['ds'], 
                 forecast['yhat_lower'][-len(test_prophet):],
                 forecast['yhat_upper'][-len(test_prophet):],
                 alpha=0.3)
plt.title('Actual vs Predicted Close Prices (Prophet)')
plt.xlabel('Date')
plt.ylabel('Close Price')
plt.legend()
plt.show()

# Print the most important changepoints
changepoints = model.changepoints
changepoint_importance = np.abs(model.params['delta'])

print("Shape of changepoint_importance:", changepoint_importance.shape)
print("Type of changepoint_importance:", type(changepoint_importance))
print("First few values of changepoint_importance:", changepoint_importance[:5])

# Ensure changepoint_importance is 1-dimensional
if changepoint_importance.ndim > 1:
    changepoint_importance = changepoint_importance.flatten()

print("Shape after flattening:", changepoint_importance.shape)

top_changepoints = sorted(zip(changepoints, changepoint_importance), key=lambda x: x[1], reverse=True)[:10]
print("\nTop 10 changepoints:")
for date, importance in top_changepoints:
    print(f"Date: {date}, Importance: {importance}")

# Plot changepoint importance
plt.figure(figsize=(12, 6))
plt.bar(range(len(changepoint_importance)), changepoint_importance)
plt.title('Changepoint Importance')
plt.xlabel('Changepoint Index')
plt.ylabel('Importance')
plt.show()

# Analyze residuals
residuals = test_prophet['y'].values - y_pred
plt.figure(figsize=(12, 6))
plt.plot(test_prophet['ds'], residuals)
plt.title('Residuals Over Time')
plt.xlabel('Date')
plt.ylabel('Residual')
plt.show()

# Plot residual distribution
plt.figure(figsize=(12, 6))
sns.histplot(residuals, kde=True)
plt.title('Distribution of Residuals')
plt.xlabel('Residual')
plt.show()

# Autocorrelation of residuals
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf

plt.figure(figsize=(12, 6))
plot_acf(residuals, lags=40)
plt.title('Autocorrelation of Residuals')
plt.show()

# Cross-correlation between actual and predicted
from scipy import signal

correlation = signal.correlate(test_prophet['y'], y_pred, mode='full')
lags = signal.correlation_lags(len(test_prophet['y']), len(y_pred), mode='full')
plt.figure(figsize=(12, 6))
plt.plot(lags, correlation)
plt.title('Cross-correlation between Actual and Predicted')
plt.xlabel('Lag')
plt.ylabel('Correlation')
plt.show()

# Analyze prediction intervals
coverage = np.mean((forecast['yhat_lower'][-len(test_prophet):] <= test_prophet['y']) & 
                   (test_prophet['y'] <= forecast['yhat_upper'][-len(test_prophet):]))
print(f"\nPrediction Interval Coverage: {coverage:.2%}")

# Compute rolling forecast
window = 30  # 30-day rolling window
rolling_mse = []
for i in range(len(test_prophet) - window):
    y_true = test_prophet['y'][i:i+window]
    y_pred = forecast['yhat'][-(len(test_prophet)-i):][:window]
    rolling_mse.append(mean_squared_error(y_true, y_pred))

plt.figure(figsize=(12, 6))
plt.plot(test_prophet['ds'][window:], rolling_mse)
plt.title(f'{window}-day Rolling MSE')
plt.xlabel('Date')
plt.ylabel('MSE')
plt.show()