In [1]:
import numpy as np
import pandas as pd

import warnings
warnings.filterwarnings('ignore')

import seaborn as sns
import matplotlib.pyplot as plt
import matplotlib.animation as animation


import plotly.express as px
import plotly.graph_objects as go
import plotly.figure_factory as ff


# Step 1: Loading Dataset

In [2]:
# Load the quarterly AMZN dataset.
# A forward slash is used as the path separator: a backslash inside a normal
# string literal forms an invalid escape sequence here and only resolves on
# Windows, whereas "/" works on Windows, macOS and Linux alike.
df = pd.read_csv('Data/FS_Classification_AMZN_Historical_Quarterly_2009_2022_With_Fundamental_Data_Economic_Indicators.csv')

# Removing leading and trailing spaces from column names
df.columns = df.columns.str.strip()

# Using a regular expression to replace multiple spaces with a single space in all column names
df.columns = df.columns.str.replace(r'\s+', ' ', regex=True)

# Dropping columns that are not needed (re-assignment instead of inplace mutation)
df = df.drop(columns=["Date", "Year"])



# Step 2: Overview of Dataset

In [None]:
# Row count first, followed by a blank-line separator
num_of_rows = df.shape[0]
print(f"The number of rows is {num_of_rows}")
print('\n')

# Per-column summary of the frame (column names, non-null counts, dtypes)
df.info()

In [None]:
# Preview the first rows of the loaded dataset
df.head()

# Step 3: EDA - Missing Values Analysis 

## Step 3)i): EDA - Show Missing Values in each Column

In [5]:
def display_columns_with_null_values(df: pd.DataFrame):
    """
    Print a per-column count of null values, restricted to the columns that
    contain at least one null and ordered from most to fewest nulls.

    Parameters:
    - df (pd.DataFrame): The dataframe to inspect.

    Returns:
    - None: The report is written to standard output.
    """
    separator = '-' * 64

    # Count nulls per column, keep only non-zero counts, largest first
    null_counts = df.isnull().sum()
    null_counts = null_counts[null_counts > 0].sort_values(ascending=False)

    # The banner is the same whether or not any nulls were found
    print(separator)
    print("Total null values in each column (only columns with null values)")
    print(separator)

    # Body: either the counts themselves or a fixed "nothing found" message
    if null_counts.empty:
        print("No columns have null values.")
        return

    print(null_counts)

In [None]:
# Get percentage of null values in each column
null_values_percentage = df.isnull().mean().round(4).mul(100).sort_values(ascending=False)
print('-' * 44)
print("Percentage(%) of null values in each column")
print('-' * 44)
print(null_values_percentage)
print('\n')

# Get total null values in each column
display_columns_with_null_values(df)


## Step 3)ii): EDA - Handling Missing Values

In [None]:
# Fill Null Values in the Remaining Columns with the average of the column
numeric_df = df.select_dtypes(include=[np.number]) # Select only numeric columns
numeric_df.fillna(numeric_df.mean(), inplace=True)  # Fill missing values in numeric columns with the column mean
df[numeric_df.columns] = numeric_df # Merge back with non-numeric columns if needed

# Get total null values in each column
display_columns_with_null_values(df)


# Step 4: EDA - Duplicate Values Analysis 

## Step 4)i): EDA - Show Duplicate Values Rows

In [None]:
# Get percentage of duplicate rows
total_rows = len(df)
duplicate_rows = df.duplicated().sum()
duplicate_percentage = (duplicate_rows / total_rows) * 100

print('-' * 48)
print("Percentage(%) of duplicate rows in the DataFrame")
print('-' * 48)
print(f"{duplicate_percentage:.2f}%")
print('\n')

# Get total number of duplicate rows
print('-' * 30)
print("Total number of duplicate rows")
print('-' * 30)
print(duplicate_rows)


# Step 5: EDA - Feature Scaling

## Step 5)i): EDA - Categorical Feature Scaling

## Step 5)ii): EDA - Numerical Feature Scaling

In [None]:
# Preview the first rows of df ahead of the scaling cell below
df.head()

In [None]:
from sklearn.preprocessing import StandardScaler

def standard_scale_dataframe(dataframe, columns_to_scale):
    """
    Scales the specified columns of the DataFrame using Standard Scaling (Z-score normalization).

    The input DataFrame is never modified; a copy carrying the scaled columns
    is returned.

    :param dataframe: pandas DataFrame
    :param columns_to_scale: iterable of strings, names of columns to scale.
        A single column name given as a plain string is treated as a
        one-element list. May be empty, in which case an unscaled copy of
        ``dataframe`` is returned.
    :return: DataFrame with scaled columns
    :raises KeyError: if any requested column is not present in ``dataframe``
    """
    # Normalize the argument: a bare string would otherwise be split into
    # characters, and a pandas Index has no unambiguous truth value
    if isinstance(columns_to_scale, str):
        columns_to_scale = [columns_to_scale]
    else:
        columns_to_scale = list(columns_to_scale)

    # Create a copy of the DataFrame to avoid modifying the original one
    df_scaled = dataframe.copy()

    # Nothing to scale: the scaler cannot be fit on zero features, so return early
    if not columns_to_scale:
        return df_scaled

    # Fail early with a readable message when a requested column is absent
    missing = [column for column in columns_to_scale if column not in dataframe.columns]
    if missing:
        raise KeyError(f"Columns not found in dataframe: {missing}")

    # Perform Standard Scaling on specified columns and update the DataFrame
    scaler = StandardScaler()
    df_scaled[columns_to_scale] = scaler.fit_transform(dataframe[columns_to_scale])

    return df_scaled


# Scale every numeric feature except the target label.
# The feature list is built as a new list (instead of aliasing `columns` and
# removing from it, which also mutated `columns`), and only numeric dtypes are
# selected so the scaler is never handed text columns.
target_column = 'Hybrid_Price_Movement_Class'
columns = list(df.columns)

numerical_features = [
    column
    for column in df.select_dtypes(include=[np.number]).columns
    if column != target_column
]

# NOTE(review): the scaler is fit on the whole dataset, before the train/test
# split that happens later in this notebook, so test-set statistics influence
# the scaling. Consider fitting on the training rows only.
scaled_df = standard_scale_dataframe(dataframe=df,
                                    columns_to_scale=numerical_features)

df = scaled_df
df.head()

# Step 9) Train Test Split

In [None]:
# Number of rows per target class, ordered by class label
df['Hybrid_Price_Movement_Class'].value_counts().sort_index()

In [12]:
from sklearn.model_selection import train_test_split

# Separate the class label from the feature matrix
y = df["Hybrid_Price_Movement_Class"]
X = df.drop(columns=["Hybrid_Price_Movement_Class"])

# Hold out 20% of the rows for testing; shuffle=False means the rows are not
# shuffled before splitting, so the split follows the frame's existing row order
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    shuffle=False,
)

# Step 10) XGBoost Model

In [13]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

def Confusion_Matrix_For_Multi_Class_With_Overview(title, y_test, y_pred):
    """
    Plot an annotated confusion matrix for multi-class classification.

    Every cell shows its raw count plus one percentage:
    - diagonal cell (i, i), labelled "TP": correct predictions for class i
      divided by the number of actual samples of class i (the row total).
      Shown as "N/A" when class i never occurs in ``y_test``.
    - off-diagonal cell (i, j), labelled "FP": all samples wrongly predicted
      as class j (column total minus the diagonal) divided by the total number
      of samples. The same value therefore appears in every off-diagonal cell
      of a column.

    Parameters:
    - title: Title for the confusion matrix plot.
    - y_test: True labels of the test data.
    - y_pred: Predicted labels of the test data.

    Returns:
    - The matplotlib Axes that holds the seaborn heatmap. The figure is also
      displayed via ``plt.show()``.
    """
    # Creating the confusion matrix
    cf_matrix = confusion_matrix(y_test, y_pred)

    # Determine class labels (sorted union of actual and predicted labels)
    class_labels = np.unique(np.concatenate((y_test, y_pred)))

    # Aggregates are computed once instead of once per class
    row_totals = cf_matrix.sum(axis=1)
    col_totals = cf_matrix.sum(axis=0)
    diagonal = np.diag(cf_matrix)
    grand_total = cf_matrix.sum()

    # A class that only appears in y_pred has a zero row total; report "N/A"
    # rather than dividing by zero and printing "nan%"
    TP_percentages = [
        "{0:.2%}".format(hits / support) if support else "N/A"
        for hits, support in zip(diagonal, row_totals)
    ]
    FP_percentages = [
        "{0:.2%}".format((predicted - hits) / grand_total)
        for hits, predicted in zip(diagonal, col_totals)
    ]

    # Build one annotation string per cell, in row-major order
    combined_info = []
    for i in range(cf_matrix.shape[0]):
        for j in range(cf_matrix.shape[1]):
            value = cf_matrix[i, j]
            if i == j:  # True Positive
                combined_info.append(f"{value}\n(TP: {TP_percentages[i]})")
            else:  # False Positive
                combined_info.append(f"{value}\n(FP: {FP_percentages[j]})")

    labels = np.asarray(combined_info).reshape(cf_matrix.shape)

    # Plotting the heatmap
    plt.figure(figsize=(8, 8))
    ax = sns.heatmap(cf_matrix, annot=labels, fmt='', cmap='Blues', xticklabels=class_labels, yticklabels=class_labels)
    ax.set_title(f'{title}\n\n')
    ax.set_xlabel('\nPredicted Values')
    ax.set_ylabel('Actual Values')

    # Show the plot
    plt.show()

    return ax

In [None]:
import xgboost as xgb
from sklearn.metrics import confusion_matrix, classification_report
import pandas as pd

def XGBoost_Train_Evaluate(X_train, X_test, y_train, y_test, 
                           objective='multi:softmax', num_class=None, 
                           n_estimators=100, learning_rate=0.1, max_depth=6, 
                           subsample=1.0, colsample_bytree=1.0, gamma=0, 
                           reg_alpha=0, reg_lambda=1, verbosity=1, random_state=42, verbose=False):
    """
    Fit an XGBoost classifier, plot its confusion matrix on the test split,
    print a classification report and return the actual vs. predicted labels.

    Parameters:
    - X_train, y_train: Features and labels the classifier is fitted on.
    - X_test, y_test: Features and labels used for evaluation; they are also
      passed to ``fit`` as the ``eval_set``.
    - objective, num_class, n_estimators, learning_rate, max_depth, subsample,
      colsample_bytree, gamma, reg_alpha, reg_lambda, verbosity, random_state:
      Forwarded unchanged to ``xgb.XGBClassifier``.
    - verbose: Forwarded to ``fit``.

    Returns:
    - pd.DataFrame with the columns 'Actual' and 'Predicted', one row per
      test sample.
    """
    classifier_params = dict(
        objective=objective,
        num_class=num_class,
        n_estimators=n_estimators,
        learning_rate=learning_rate,
        max_depth=max_depth,
        subsample=subsample,
        colsample_bytree=colsample_bytree,
        gamma=gamma,
        reg_alpha=reg_alpha,
        reg_lambda=reg_lambda,
        verbosity=verbosity,
        random_state=random_state,
        # NOTE(review): intended to silence a label-encoder warning; confirm
        # this flag is still accepted by the installed XGBoost version
        use_label_encoder=False,
    )
    classifier = xgb.XGBClassifier(**classifier_params)

    classifier.fit(X_train, y_train, eval_set=[(X_test, y_test)], verbose=verbose)

    # Flatten so the predictions are guaranteed to be a 1D array
    y_pred = classifier.predict(X_test).flatten()

    # Side-by-side table of true and predicted labels
    comparison = pd.DataFrame({'Actual': y_test.tolist(), 'Predicted': y_pred.tolist()})

    Confusion_Matrix_For_Multi_Class_With_Overview("XGBoost Confusion Matrix", y_test, y_pred)
    print(classification_report(y_test, y_pred))

    return comparison



"""
Best Parameters: {
    'n_estimators': 366, 
    'learning_rate': 0.4594259565422311,
    'max_depth': 3, 
    'subsample': 0.5163571458564111, 
    'colsample_bytree': 0.5547050899403205, 
    'gamma': 1.0546559043221597, 
    'reg_alpha': 0.29203039952794485, 
    'reg_lambda': 2.1708419038837645
    }
    
Best Score: 0.4429142292773435

"""


# Best Parameters example: tuned values (rounded to four decimals) gathered in
# one place, then forwarded to the training helper
best_params = dict(
    n_estimators=366,
    learning_rate=0.4594,
    max_depth=3,
    subsample=0.5163,
    colsample_bytree=0.5547,
    gamma=1.0546,
    reg_alpha=0.2920,
    reg_lambda=2.1708,
)

# NOTE(review): num_class is 5 here but 4 in the commented-out tuning cell
# further down; update based on your specific number of classes
XGBoost_df = XGBoost_Train_Evaluate(
    X_train, X_test, y_train, y_test,
    objective='multi:softmax',
    num_class=5,
    verbosity=1,
    random_state=42,
    verbose=False,
    **best_params,
)

# Step 11) XGBoost Hyperparameter Tuning

In [15]:
# import optuna
# from sklearn.model_selection import cross_val_score
# import pandas as pd


# # Suppress Optuna output
# optuna.logging.set_verbosity(optuna.logging.CRITICAL)

# # Objective function for Optuna
# def objective(trial):
#     params = {
#         'objective': 'multi:softmax',
#         'num_class': 4,  # Update based on your specific number of classes
#         'n_estimators': trial.suggest_int('n_estimators', 50, 500),
#         'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.5, log=True),
#         'max_depth': trial.suggest_int('max_depth', 3, 10),
#         'subsample': trial.suggest_float('subsample', 0.5, 1.0),
#         'colsample_bytree': trial.suggest_float('colsample_bytree', 0.5, 1.0),
#         'gamma': trial.suggest_float('gamma', 0, 10),
#         'reg_alpha': trial.suggest_float('reg_alpha', 0, 10),
#         'reg_lambda': trial.suggest_float('reg_lambda', 0, 10),
#         'random_state': 42,
#         'use_label_encoder': False
#     }
#     model = xgb.XGBClassifier(**params)
#     score = cross_val_score(model, X_train, y_train, cv=5, scoring='f1_macro', n_jobs=-1, verbose=0)
#     return score.mean()



# # Create a study and optimize the objective function
# study = optuna.create_study(direction='maximize')
# study.optimize(objective, n_trials=500, timeout=None, show_progress_bar=True)

# # Print the best parameters and the best score
# print("Best Parameters:", study.best_params)
# print("Best Score:", study.best_value)

# # Get the detailed study results
# df = study.trials_dataframe()
# df_sorted = df.sort_values('value', ascending=False)
# df_sorted.head()