In [1]:
# Import necessary libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score

import sys
import os

# Append the parent of the current working directory to the import search path.
# Presumably the local `utills` module imported below lives there — TODO confirm.
current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, '..'))
sys.path.append(parent_dir)

from utills import *

# Exploratory Data Analysis
---

## Load the Data

In [None]:
# Load the dataset
df = pd.read_csv('../data/youtube_data_processed.csv')

df.head()

## Understanding the Data

In [None]:
df.shape

In [None]:
df.describe()

In [None]:
df.info()

In [6]:
# Remove the columns that are not needed for the rest of the analysis
unused_columns = [
    'video_id', 'title', 'thumbnail_url',
    'likes', 'comments', 'tags', 'category',
]
df = df.drop(columns=unused_columns)

In [None]:
df.info()

# Data Cleaning 

## Handle missing values

In [None]:
df.isnull().sum()

In [None]:
sns.heatmap(df.isnull(), yticklabels=False, cbar=False, cmap='viridis')

In [10]:
# Replace missing emotion scores with 0 in every '<emotion>_emotion' column
columns_to_fill = [
    f'{emotion}_emotion'
    for emotion in ('angry', 'disgust', 'fear', 'happy', 'sad', 'surprise', 'neutral')
]
df[columns_to_fill] = df[columns_to_fill].fillna(0)

In [None]:
sns.heatmap(df.isnull(), yticklabels=False, cbar=False, cmap='viridis')

## Feature engineering

### Categorizing views

In [12]:
from datetime import datetime, timezone

# Parse 'date_posted'; values that cannot be parsed become NaT.
# utc=True converts tz-aware (and mixed-offset) timestamps to UTC, so the
# column always ends up with a datetime64 dtype and the .dt accessor works.
df['date_posted'] = pd.to_datetime(df['date_posted'], errors='coerce', utc=True)

# Drop the timezone marker (values stay expressed in UTC) so that plain
# datetime subtraction works below
df['date_posted'] = df['date_posted'].dt.tz_localize(None)

# Reference "now" in UTC as well, so both sides of the subtraction agree.
# NOTE: this makes the result depend on the day the notebook is run.
now_utc = datetime.now(timezone.utc).replace(tzinfo=None)

# Number of whole days since each video was posted. Clipped at 0 so that a
# future-dated row cannot make the denominator below zero or negative.
df['days_since_posted'] = (now_utc - df['date_posted']).dt.days.clip(lower=0)

# Age-Weighted Views (AWV): views divided by sqrt(days since posted + 1)
df['age_weighted_views'] = df['views'] / (df['days_since_posted'] + 1)**0.5

In [None]:
awv = df['age_weighted_views']

# Bin edges: 0, the 10th / 25th / 75th / 90th percentiles, and the maximum.
# Resulting bands: 0 = Very Low, 1 = Low, 2 = Medium, 3 = High, 4 = Very High.
percentile_edges = [awv.quantile(q) for q in (0.10, 0.25, 0.75, 0.90)]
bins = [0, *percentile_edges, awv.max()]
labels = [0, 1, 2, 3, 4]

# Assign every row to its band (the lowest edge is inclusive)
df['views_category'] = pd.cut(awv, bins=bins, labels=labels, include_lowest=True)

# Show how many rows landed in each band
df['views_category'].value_counts()

In [None]:
# Visually check the counts of each category
sns.countplot(x='views_category', data=df)

In [None]:
# Drop the raw view count and the date-derived columns computed earlier
view_helper_columns = ['date_posted', 'days_since_posted', 'age_weighted_views', 'views']
df = df.drop(columns=view_helper_columns)
df.columns

### Categorizing colors

In [17]:
# Apply color categorization for the top 5 dominant colors
for i in range(1, 6):
    rgb_columns = [f'color_{i}_r', f'color_{i}_g', f'color_{i}_b']

    # One (r, g, b) tuple per row, each passed to closest_color_name (from utills)
    matches = [
        closest_color_name(rgb)
        for rgb in df[rgb_columns].itertuples(index=False, name=None)
    ]

    # The two values returned per row become the two new columns
    df[[f'dominant_color_{i}', f'dominant_color_{i}_name']] = pd.DataFrame(matches, index=df.index)

In [None]:
df[['dominant_color_1', 'dominant_color_1_name', 'dominant_color_2', 'dominant_color_2_name']].sample(5)

In [None]:
# Drop the raw RGB channels of the five dominant colors
rgb_columns = [
    f'color_{i}_{channel}'
    for i in range(1, 6)
    for channel in ('r', 'g', 'b')
]
df = df.drop(columns=rgb_columns)
df.columns

In [None]:
# Drop the textual color-name columns of the five dominant colors
color_name_columns = [f'dominant_color_{i}_name' for i in range(1, 6)]
df = df.drop(columns=color_name_columns)
df.columns

In [None]:
df.info()

### Dealing with skewed data

In [None]:
# Bar chart of the skewness of every numeric column
fig, ax = plt.subplots(figsize=(10, 6))
column_skewness = df.select_dtypes(include=['number']).skew()
column_skewness.plot(kind='bar', color='skyblue', ax=ax)
ax.set_title('Skewness of Columns')
ax.set_xlabel('Columns')
ax.set_ylabel('Skewness')
plt.show()

In [None]:
def skewness_check(data):
    """Print every numeric column whose skewness lies outside [-1, 1].

    Skewness is rounded to 3 decimals. A value below -1 is reported as
    left skewed, a value above 1 as right skewed. A summary with the
    totals is printed at the end. Returns None.
    """
    left_count = 0
    right_count = 0

    numeric_columns = data.select_dtypes(include=['number']).columns
    for name in numeric_columns:
        skew_value = round(data[name].skew(), 3)

        if skew_value < -1:
            print(f"{name} : {skew_value} (Left skewed)")
            left_count += 1
        elif skew_value > 1:
            print(f"{name} : {skew_value} (Right skewed)")
            right_count += 1

    print('\n')
    print(f'Total skewed columns: {left_count + right_count}')
    print(f'Total left skewed columns: {left_count}')
    print(f'Total right skewed columns: {right_count}')

skewness_check(df)

In [None]:
def transform_data(data):
    """Return a copy of `data` with strongly skewed numeric columns transformed.

    For every numeric column the skewness (rounded to 3 decimals) is checked:

    * skewness < -1 (left skewed): reflect-and-log, ``-log1p(max - x)``.
      The reflection turns the long left tail into a right tail that the log
      can compress; the final negation restores the original value order.
    * skewness > 1 (right skewed): square root. If the column contains
      negative values it is first shifted so its minimum is 0, because the
      square root of a negative number is NaN.
    * otherwise the column is left untouched.

    The input frame is not modified.
    """
    # Work on a copy so the caller's frame stays unchanged
    data_copy = data.copy()

    for column in data_copy.select_dtypes(include=['number']):
        values = data_copy[column]
        skewness = round(values.skew(), 3)

        if skewness < -1:
            # max - x is always >= 0, so log1p is defined for every row
            data_copy[column] = -np.log1p(values.max() - values)
        elif skewness > 1:
            lowest = values.min()
            if lowest < 0:
                # Shift so the smallest value becomes 0 before taking the root
                values = values - lowest
            data_copy[column] = np.sqrt(values)

    print(f'Transformed completed.')
    return data_copy


# Transforming data to fix skewness of dataset columns
transformed_data = transform_data(df)

In [None]:
# Rechecking skewness of transformed data
skewness_check(transformed_data)

In [None]:
# Showing visually before and after transformation, skewed fixed.
def plot_before_after_transformation(original_data, transformed_data, title):
    """Draw side-by-side histograms of one column before and after transformation.

    Each panel is annotated with the skewness (rounded to 2 decimals) of the
    series it shows; `title` is appended to both panel titles.
    """
    series_pair = (original_data, transformed_data)
    stage_names = ("Before Transformation", "After Transformation")

    # Skewness is computed up front, before any figure is created
    skew_values = [round(series.skew(), 2) for series in series_pair]

    fig, axs = plt.subplots(nrows=1, ncols=2)

    for ax, stage, series, skew_value in zip(axs, stage_names, series_pair, skew_values):
        ax.hist(series, edgecolor='black')
        ax.set_title(f"{stage} - {title}")
        ax.annotate(f"Skewness: {skew_value}", xy=(0.5, 0.96), xycoords='axes fraction', ha='center')
    
# Visualize the transformation for all the columns which are skewed
def visualize_transformation(original_data, transformed_data):
    """Plot before/after histograms for every numeric column that was skewed.

    A column is plotted when the skewness of its original values (rounded to
    3 decimals) is below -1 or above 1.
    """
    numeric_columns = original_data.select_dtypes(include=['number']).columns

    for name in numeric_columns:
        skew_value = round(original_data[name].skew(), 3)

        # Written as "not > 1" so a NaN skewness is skipped as well
        if not abs(skew_value) > 1:
            continue

        plot_before_after_transformation(original_data[name], transformed_data[name], name)

# Visualize the transformation
visualize_transformation(df, transformed_data)


In [None]:
# Replacing transform data into original data
df = transformed_data

# Rechecking skewness to make sure
skewness_check(df)

## Dealing with Outliers

In [None]:
plt.figure(figsize=(20, 20))
sns.boxplot(data=df, orient="h")
plt.title("Boxplot of Features")
plt.show()

In [None]:
import numpy as np

def fix_outliers(data):
    """Return a copy of `data` with outliers clipped to the 1.5 * IQR fences.

    For every float64 / int64 column, values below ``Q1 - 1.5 * IQR`` or above
    ``Q3 + 1.5 * IQR`` are replaced by the respective fence.

    Columns whose IQR is 0 (or undefined) are left untouched: with a zero IQR
    both fences equal the quartile value, so clipping would overwrite every
    row with the same constant and erase the feature (typical for indicator
    or mostly-zero columns).

    The shape is printed before and after; the input frame is not modified.
    """
    # Work on a copy so the caller's frame stays unchanged
    data_copy = data.copy()

    print(f"Before fixing outliers - Shape: {data_copy.shape}")

    # Select only numeric columns
    numeric_columns = data_copy.select_dtypes(include=['float64', 'int64']).columns

    for column in numeric_columns:
        q1, q3 = data_copy[column].quantile([0.25, 0.75])
        iqr = q3 - q1

        # "not > 0" also skips a NaN IQR (e.g. an all-NaN column)
        if not iqr > 0:
            continue

        lower_limit = q1 - (iqr * 1.5)
        upper_limit = q3 + (iqr * 1.5)

        # Clip to the fences; cast to float first because the fences are floats
        data_copy[column] = data_copy[column].astype('float64').clip(
            lower=lower_limit, upper=upper_limit
        )

    print(f'Outliers fixed.')
    print(f"After fixing outliers - Shape: {data_copy.shape}")
    return data_copy

# Fixing outliers using IQR
df_without_outliers = fix_outliers(df)

In [None]:
# Showing visually before and after outlier fixed
def plot_before_after_outlier_fix(original_data, transformed_data):
    """Show, for every column, stacked horizontal boxplots before and after the outlier fix.

    One figure is created per column of `original_data`: the top panel shows
    the original values, the bottom panel the same column from
    `transformed_data`.
    """
    for column in original_data:
        fig, (before_ax, after_ax) = plt.subplots(2, 1, figsize=(10, 3))

        sns.boxplot(data=original_data[column], orient="h", ax=before_ax)
        before_ax.set_title(f"Before outlier fix - {column}")

        sns.boxplot(data=transformed_data[column], orient="h", ax=after_ax)
        after_ax.set_title(f"After outlier fix - {column}")

        fig.tight_layout()
        plt.show()

# Visualizing before and after fixing outliers
plot_before_after_outlier_fix(df, df_without_outliers)

In [31]:
df = df_without_outliers

## Feature Selection

In [None]:
# Visualize correlation matrix heatmap
def plot_correlation_matrix_heatmap(data, threshold = 0):
    """Plot the correlation matrix of `data` as an annotated heatmap.

    Correlations are rounded to 2 decimals. When `threshold` is greater than
    0, every cell whose absolute correlation is not above the threshold is
    blanked out (NaN), so only the strong relationships remain visible.
    """
    corr = data.corr().round(2)

    if threshold > 0:
        # Keep only cells with |correlation| > threshold; the rest become NaN
        corr = corr.where(corr.abs() > threshold)

    plt.figure(figsize=(18, 18))
    sns.heatmap(corr, annot=True, linewidths=.5, fmt='.1f')
    plt.title("Correlation Matrix Heatmap")
    plt.show()

# Checking correlation between each feature
plot_correlation_matrix_heatmap(df)

In [None]:
# Since there are lots of feature, Let's show only highly correlated ones only. So It's easier to understand. 
# Only showing correlation more than 0.7
plot_correlation_matrix_heatmap(df, 0.7)

As we can see, only the emotion features are highly correlated with each other. Since each of them represents a distinct emotion, we will keep all of these features.

# Model Training & Testing

In [34]:
from sklearn.metrics import mean_squared_error, precision_score, f1_score, recall_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import joblib
from sklearn import metrics
from sklearn.model_selection import train_test_split
import os

# ML models
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

def save_model(model, filename):
    """Serialize `model` with joblib to ``../model/<filename>``.

    The ``../model`` directory is created when it does not exist yet. The
    final path is printed once the dump has finished.
    """
    directory = '../model'

    # exist_ok=True creates the directory only when missing, without the
    # check-then-create race of a separate os.path.exists() test
    os.makedirs(directory, exist_ok=True)

    # Save the model to disk
    filename = directory + "/" + filename
    joblib.dump(model, filename)
    print(f"Model saved to {filename}")

# Evaluate a ML model on the dataset
def evaluate_model(model, data, features, target, is_save_model=False,
                   test_size=0.25, random_state=0, stratify=False,
                   model_filename='ytpa_model.pkl'):
    """Fit `model` on a train split of `data` and report metrics on the test split.

    Parameters
    ----------
    model : estimator exposing ``fit`` and ``predict``.
    data : DataFrame containing both the feature columns and the target column.
    features : column labels used as model inputs.
    target : name of the column to predict.
    is_save_model : when True, the fitted model is written to disk via
        ``save_model`` under `model_filename`.
    test_size : fraction of rows held out for testing (default 0.25).
    random_state : seed for the train/test split (default 0).
    stratify : when True, the split preserves the class proportions of the
        target. Defaults to False, i.e. a plain random split.
    model_filename : file name passed to ``save_model``.

    Returns
    -------
    dict with the model class name, accuracy, mean squared error, weighted
    precision / recall / F1 (all rounded to 3 decimals) and a flag telling
    whether the number of features equals the number of columns minus one.

    Side effects: prints the metrics and a classification report, shows a
    confusion-matrix heatmap and, if the model exposes
    ``feature_importances_``, a feature-importance bar plot.
    """
    # Setting X & y for the model
    X = data[features]
    y = data[target]

    # Splitting the data for train and test
    X_train, X_test, y_train, y_test = train_test_split(
        X, y,
        test_size=test_size,
        random_state=random_state,
        stratify=y if stratify else None,
    )

    # Fitting data to the model
    model.fit(X_train, y_train)

    # Make prediction on the test set
    predictions = model.predict(X_test)

    # Model Evaluation Metrics.
    # zero_division=0 yields the same value as the default for a class that is
    # never predicted (0) but without emitting a warning for every such class.
    model_name = model.__class__.__name__
    mse = round(mean_squared_error(y_test, predictions), 3)
    accuracy = round(metrics.accuracy_score(y_test, predictions), 3)
    precision = round(precision_score(y_test, predictions, average='weighted', zero_division=0), 3)
    recall = round(recall_score(y_test, predictions, average='weighted', zero_division=0), 3)
    f1 = round(f1_score(y_test, predictions, average='weighted', zero_division=0), 3)

    # Checking if selected features are all the columns in the dataset
    # (every column except the target)
    is_all_features = len(features) == (len(data.columns) - 1)

    # Result dictionary
    result = {
        'Model': model_name,
        'Accuracy': accuracy,
        'Mean Squared Error': mse,
        'Precision': precision,
        'Recall': recall,
        'F1 Score': f1,
        'All Features Used': is_all_features
    }

    print('Features Used')
    print(features)

    # Print metrics
    print(f"Accuracy: {accuracy}")
    print(f"Precision: {precision}")
    print(f"Recall: {recall}")
    print(f"F1 Score: {f1}")
    print(f'Mean Squared Error: {mse}')

    # Detailed Classification Report
    print("\nClassification Report:")
    print(classification_report(y_test, predictions))

    # Confusion Matrix
    c_matrix = confusion_matrix(y_test, predictions)
    sns.heatmap(c_matrix, annot=True, fmt='d', cmap='Blues')
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title('Confusion Matrix')
    plt.show()

    # Check if the model has feature importances and plot them
    if hasattr(model, 'feature_importances_'):
        feature_importances = model.feature_importances_
        importance_df = pd.DataFrame({
            'Feature': features,
            'Importance': feature_importances
        }).sort_values(by='Importance', ascending=False)

        # Plot feature importances
        plt.figure(figsize=(10, 6))
        sns.barplot(x='Importance', y='Feature', data=importance_df)
        plt.title(f'Feature Importance in {model_name}')
        plt.show()
    else:
        print(f"The model {model_name} does not support feature importances.")

    # Save the model if needed
    if is_save_model:
        print(f"Saving model to disk...")
        save_model(model, model_filename)

    return result

# Creating a list which holds each model's results
results = []


## All Features

In [35]:
all_features = df.drop(columns='views_category').columns

In [None]:
# random_state pins the forest's randomness so the reported metrics are reproducible
results.append(evaluate_model(RandomForestClassifier(random_state=0), df, all_features, 'views_category'))

In [None]:
results.append(evaluate_model(SVC(), df, all_features, 'views_category'))

In [None]:
results.append(evaluate_model(LogisticRegression(), df, all_features, 'views_category'))

In [None]:
# random_state pins the tree's tie-breaking so the reported metrics are reproducible
results.append(evaluate_model(DecisionTreeClassifier(random_state=0), df, all_features, 'views_category'))

## Selected Features

In [None]:
column_drop_list =[
    'views_category',
    'disgust_emotion',
    'is_text_present',
]
selected_features = df.drop(columns=column_drop_list).columns
print(selected_features)

In [None]:
# random_state pins the forest's randomness so the reported metrics are reproducible
results.append(evaluate_model(RandomForestClassifier(random_state=0), df, selected_features, 'views_category'))

In [None]:
results.append(evaluate_model(SVC(), df, selected_features, 'views_category'))

In [None]:
results.append(evaluate_model(LogisticRegression(), df, selected_features, 'views_category'))

In [None]:
# random_state pins the tree's tie-breaking so the reported metrics are reproducible
results.append(evaluate_model(DecisionTreeClassifier(random_state=0), df, selected_features, 'views_category'))

## Results

In [None]:
result_data = pd.DataFrame(results)
result_data.head(n=10)

In [None]:
def plot_bar_metrics(data, target_column, title):
    """Draw a grouped bar chart of every metric column in `data`.

    `data` is reshaped to long form with `target_column` as the identifier;
    all remaining columns become the 'Metric' / 'Value' pairs. The x axis
    shows `target_column`, with one bar per metric.
    """
    plt.figure(figsize=(12, 8))
    df_long = data.melt(id_vars=[target_column], var_name='Metric', value_name='Value')
    # Use the caller-supplied identifier column rather than a hard-coded name
    sns.barplot(x=target_column, y='Value', hue='Metric', data=df_long)
    plt.title(title)
    plt.ylabel('Values')
    plt.legend(title='Metrics')
    plt.show()


# Visualizing metrics using bar plots, one chart per feature set.
# result_data holds each model twice (all features / selected features);
# plotting them together would average the two runs under a single title.
for used_all_features, subset in result_data.groupby("All Features Used"):
    feature_set = 'All Features' if used_all_features else 'Selected Features'
    plot_bar_metrics(subset.drop(columns="All Features Used"), 'Model', f'Model Metrics - {feature_set}')

As per the results, the Random Forest Classifier is the best model for this dataset, and we will use all the features, since feature selection does not appear to have an effect on the results.

# Model creation

In [None]:
# Train the final Random Forest and save it to disk.
# random_state pins the forest's randomness so the saved model is reproducible.
# NOTE(review): the conclusion in the Results section says all features will be
# used, but this call trains on selected_features — confirm which is intended.
evaluate_model(RandomForestClassifier(random_state=0), df, selected_features, 'views_category', is_save_model=True)