# In [None]:
import json
import urllib.parse
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, year, month, dayofmonth, from_unixtime, to_date
from pyspark.ml.feature import StringIndexer, StandardScaler, VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml import Pipeline
from pyspark.sql.types import StructType, StructField, StringType, FloatType, LongType

# Build the MongoDB connection URL from the account credentials
def get_mongo_url(username=None, password=None, host=None):
    """Build the ``mongodb+srv://`` connection URL for the Steam games cluster.

    Each credential is resolved in this order: the explicit argument, then the
    ``MONGO_USERNAME`` / ``MONGO_PASSWORD`` / ``MONGO_HOST`` environment
    variable, then the built-in default.

    Args:
        username: MongoDB user name.
        password: MongoDB password.
        host: SRV host name of the cluster (no port).

    Returns:
        The connection string. The user name and password are percent-encoded,
        so reserved characters such as ``@``, ``:`` and ``/`` cannot corrupt
        the URI.
    """
    import os  # local import: the module header does not import os

    def _resolve(value, env_var, default):
        # An explicit argument wins, even an empty string; None means "not given".
        if value is not None:
            return value
        return os.environ.get(env_var, default)

    # NOTE(review): these defaults are credentials committed to source. They
    # appear to be a shared guest account; prefer supplying real credentials
    # through the environment variables above.
    username = _resolve(username, "MONGO_USERNAME", "guest_account")
    password = _resolve(password, "MONGO_PASSWORD", "e8RZETLbPWzhgvXH")
    host = _resolve(host, "MONGO_HOST", "cluster1.tgo3l.mongodb.net")

    # quote_plus escapes "/" as well; quote() leaves it alone by default, which
    # would end the authority part of the URI early. No port is added because
    # mongodb+srv URIs must not specify one.
    escaped_user = urllib.parse.quote_plus(username)
    escaped_password = urllib.parse.quote_plus(password)
    return (
        f"mongodb+srv://{escaped_user}:{escaped_password}@{host}"
        "/?retryWrites=true&w=majority"
    )

# Fetch data from MongoDB using pymongo
def convert_bson_types(record):
    """Convert BSON-specific values in a MongoDB document to Python-native types.

    ``_id`` (an ObjectId) is replaced by its string form. A non-null
    ``release_date`` (e.g. ``bson.int64.Int64``) becomes a plain ``int``.
    A null or missing ``release_date`` is left as-is; ``int(None)`` would
    raise ``TypeError``.

    The record is modified in place and also returned.
    """
    record["_id"] = str(record["_id"])
    if record.get("release_date") is not None:
        record["release_date"] = int(record["release_date"])
    return record


def fetch_data_with_pymongo(choice=None):
    """Download game documents from the ``steam.games`` MongoDB collection.

    Args:
        choice: ``"0"`` (or ``0``) fetches every document, and ``"1"`` fetches
            the first 5000. Any other value falls back to 5000 with a warning.
            When omitted (``None``), the user is prompted interactively.

    Returns:
        A list of document dicts, each passed through ``convert_bson_types``.
    """
    print("Fetching data from MongoDB...")
    if choice is None:
        choice = input("Enter 0 to fetch all records or 1 to fetch 5000 records: ")
    choice = str(choice).strip()

    # None means "no limit".
    if choice == "0":
        print("Fetching all records from MongoDB.")
        limit = None
    elif choice == "1":
        print("Fetching 5000 records from MongoDB.")
        limit = 5000
    else:
        print("Invalid choice. Defaulting to 5000 records.")
        limit = 5000

    client = MongoClient(get_mongo_url())
    try:
        cursor = client["steam"]["games"].find()
        if limit is not None:
            cursor = cursor.limit(limit)
        data = [convert_bson_types(record) for record in cursor]
    finally:
        # Close the client even if the query or the conversion raises.
        client.close()

    print(f"Fetched {len(data)} records from MongoDB.")
    return data

# Initialize SparkSession
def create_spark_session():
    """Return the SparkSession used by the pipeline, creating it if needed.

    The builder requests 200 shuffle partitions and 4g of executor and driver
    memory under the app name "MongoDBIntegration". ``getOrCreate`` hands back
    an already-running session when there is one.
    NOTE(review): in that case the memory settings presumably have no effect,
    because they apply at JVM launch. Confirm if it matters.
    """
    print("Initializing Spark session...")
    settings = {
        "spark.sql.shuffle.partitions": "200",
        "spark.executor.memory": "4g",
        "spark.driver.memory": "4g",
    }
    builder = SparkSession.builder.appName("MongoDBIntegration")
    for key, value in settings.items():
        builder = builder.config(key, value)
    session = builder.getOrCreate()
    print("Spark session initialized.")
    return session

# Define a schema for the data
def get_schema():
    """Return the explicit Spark schema for the MongoDB game documents.

    Every field is nullable. Integer counts and ``release_date`` use LongType
    so 64-bit values fit. ``release_date`` holds a Unix timestamp in
    milliseconds.
    """
    columns = [
        ("_id", StringType()),
        ("name", StringType()),
        ("detailed_description", StringType()),
        ("price", FloatType()),
        ("dlc_count", LongType()),
        ("release_date", LongType()),  # Unix timestamp in milliseconds
        ("genres", StringType()),
        ("developers", StringType()),
        ("publishers", StringType()),
        ("peak_ccu", LongType()),
    ]
    return StructType(
        [StructField(field_name, field_type, True) for field_name, field_type in columns]
    )

# Convert pymongo data to PySpark DataFrame
def convert_to_spark_dataframe(spark, data):
    """Build a Spark DataFrame from the documents returned by pymongo.

    Args:
        spark: The active SparkSession.
        data: A list of dict records, e.g. from ``fetch_data_with_pymongo``.

    Returns:
        A DataFrame with the schema from ``get_schema()``.

    ``createDataFrame`` verifies each value's exact Python type against the
    schema. FloatType/DoubleType accept only ``float``, and LongType/IntegerType
    accept only ``int`` itself, not subclasses such as ``bson.int64.Int64``.
    Integral values in numeric fields are therefore normalised to the expected
    type first. The caller's dicts are copied, never modified.
    """
    print("Converting pymongo data to PySpark DataFrame...")
    schema = get_schema()

    # Map each numeric field name to the Python type Spark's verifier expects.
    numeric_targets = {}
    for field in schema.fields:
        type_name = field.dataType.typeName()
        if type_name in ("float", "double"):
            numeric_targets[field.name] = float
        elif type_name in ("long", "integer", "short", "byte"):
            numeric_targets[field.name] = int

    def _normalise(record):
        # Only dict rows are adjusted; anything else goes to Spark unchanged.
        if not isinstance(record, dict):
            return record
        patched = None
        for name, target in numeric_targets.items():
            value = record.get(name)
            # bool is an int subclass; leave it for Spark to accept or reject.
            if (
                isinstance(value, int)
                and not isinstance(value, bool)
                and type(value) is not target
            ):
                if patched is None:
                    patched = dict(record)
                patched[name] = target(value)
        return record if patched is None else patched

    if isinstance(data, (list, tuple)):
        rows = [_normalise(record) for record in data]
    else:
        rows = data
    df = spark.createDataFrame(rows, schema=schema)
    print("Data successfully converted to PySpark DataFrame.")
    return df

# Transformation 1: replace empty names with nulls, add a 'dlc_check' flag, split release_date into year/month/day, and drop rows missing critical fields
def transform_data(df):
    """Clean the raw games DataFrame and derive release-date parts.

    Steps:
      * empty-string ``name`` values become null;
      * ``dlc_check`` is 1 when ``dlc_count`` > 1, otherwise 0 (a null count
        gives 0);
      * ``release_date`` (epoch milliseconds) is split into ``release_year``,
        ``release_month`` and ``release_day``, and the original column is
        dropped;
      * rows with a null ``name``, ``price`` or ``peak_ccu`` are removed.

    NOTE(review): ``dlc_check`` flags only games with more than one DLC. If the
    intent is "has any DLC", the threshold should be ``> 0``. Confirm.
    """
    print("Starting data transformation...")
    # Epoch milliseconds -> epoch seconds -> calendar date.
    release_as_date = to_date(from_unixtime(col("release_date") / 1000))
    cleaned = (
        df.withColumn("name", when(col("name") == "", None).otherwise(col("name")))
        .withColumn("dlc_check", when(col("dlc_count") > 1, 1).otherwise(0))
        .withColumn("release_year", year(release_as_date))
        .withColumn("release_month", month(release_as_date))
        .withColumn("release_day", dayofmonth(release_as_date))
        .drop("release_date")
    )
    # Rows without a name, a price or the peak_ccu value are discarded.
    cleaned = cleaned.dropna(subset=["name", "price", "peak_ccu"])
    print("Data transformation completed.")
    return cleaned

# Prepare data for modeling
def prepare_data_for_modeling(df):
    """Index the categorical columns, then assemble and scale a feature vector.

    Adds ``genres_index``, ``developers_index`` and ``publishers_index``, a
    ``features`` vector (those three indices followed by ``price`` and
    ``dlc_count``, in that order), and ``scaled_features`` (StandardScaler
    output).

    Invalid-data handling:
      * StringIndexer uses ``handleInvalid="keep"``. Null categories get their
        own extra index instead of failing the transform. These columns are
        not null-filtered upstream.
      * VectorAssembler uses ``handleInvalid="skip"``. Rows with a null/NaN
        numeric input (e.g. a missing ``dlc_count``) are dropped instead of
        aborting the whole pipeline.

    Args:
        df: A DataFrame containing the columns listed above.

    Returns:
        The transformed DataFrame.
    """
    print("Preparing data for modeling...")
    categorical_columns = ["genres", "developers", "publishers"]
    numerical_columns = ["price", "dlc_count"]
    indexers = [
        StringIndexer(
            inputCol=column, outputCol=f"{column}_index", handleInvalid="keep"
        )
        for column in categorical_columns
    ]
    assembler = VectorAssembler(
        inputCols=[f"{column}_index" for column in categorical_columns] + numerical_columns,
        outputCol="features",
        handleInvalid="skip",
    )
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    pipeline = Pipeline(stages=indexers + [assembler, scaler])
    model = pipeline.fit(df)
    df_preprocessed = model.transform(df)
    print("Data preparation for modeling completed.")
    return df_preprocessed

# Train the regression models and evaluate each one with MAE, RMSE and R2
def train_and_evaluate_models(df_preprocessed):
    """Fit three regressors on an 80/20 split and score them on the test part.

    Every model reads ``scaled_features`` and predicts ``peak_ccu``. The split
    is seeded (42), so it is reproducible.

    Args:
        df_preprocessed: Output of ``prepare_data_for_modeling``.

    Returns:
        ``(results, feature_importance)``. ``results`` maps each model name to
        ``{"MAE": ..., "RMSE": ..., "R2": ...}``. ``feature_importance`` is the
        Random Forest's ``featureImportances`` vector, or None if that model
        was not trained.
    """
    print("Training and evaluating models...")
    train, test = df_preprocessed.randomSplit([0.8, 0.2], seed=42)

    shared = {"featuresCol": "scaled_features", "labelCol": "peak_ccu"}
    models = {
        "Linear Regression": LinearRegression(**shared),
        "Decision Tree": DecisionTreeRegressor(**shared),
        "Random Forest": RandomForestRegressor(numTrees=100, **shared),
    }
    # Result key -> evaluator for that Spark metric, evaluated in this order.
    evaluators = {
        result_key: RegressionEvaluator(
            labelCol="peak_ccu", predictionCol="prediction", metricName=spark_metric
        )
        for result_key, spark_metric in (("MAE", "mae"), ("RMSE", "rmse"), ("R2", "r2"))
    }

    results = {}
    feature_importance = None
    for model_name, estimator in models.items():
        print(f"Training {model_name}...")
        fitted = estimator.fit(train)
        predictions = fitted.transform(test)

        scores = {key: ev.evaluate(predictions) for key, ev in evaluators.items()}
        results[model_name] = scores

        # Only the Random Forest's importances are reported.
        if model_name == "Random Forest":
            feature_importance = fitted.featureImportances

        print(f"{model_name} completed. MAE: {scores['MAE']}, RMSE: {scores['RMSE']}, R2: {scores['R2']}")

    print("Model training and evaluation completed.")
    return results, feature_importance

# Plot model comparison results
def plot_results(results):
    """Plot MAE, RMSE and R2 for each model, one bar chart per metric.

    Args:
        results: A mapping of model name to ``{"MAE": ..., "RMSE": ..., "R2": ...}``,
            as returned by ``train_and_evaluate_models``.

    Each metric gets its own subplot and y-axis. MAE and RMSE are in the
    label's units (peak_ccu), while R2 is unitless and at most 1, so on a
    shared axis the R2 bars are flattened to invisibility.
    """
    print("Plotting results...")
    model_names = list(results.keys())
    metric_colors = [("MAE", "blue"), ("RMSE", "green"), ("R2", "red")]
    x_pos = np.arange(len(model_names))

    _, axes = plt.subplots(1, len(metric_colors), figsize=(15, 5))
    for ax, (metric, color) in zip(axes, metric_colors):
        values = [results[name][metric] for name in model_names]
        ax.bar(x_pos, values, color=color)
        ax.set_xticks(x_pos)
        ax.set_xticklabels(model_names, rotation=15)
        ax.set_xlabel("Model")
        ax.set_ylabel(metric)
        ax.set_title(f"Model Comparison: {metric}")

    plt.tight_layout()
    plt.show()
    print("Results plotted.")
# Plot the Random Forest feature importances as a ranked horizontal bar chart
def plot_feature_importance(feature_importance, feature_names):
    """Draw a horizontal bar chart of feature importances, largest first.

    Args:
        feature_importance: A vector exposing ``toArray()`` (e.g. a Spark ML
            ``featureImportances`` vector).
        feature_names: Labels paired with the importances by position. If the
            lengths differ, the extra items are silently ignored (``zip``).

    Each bar is annotated with its value to four decimal places.
    """
    print("Plotting feature importance...")
    scores = feature_importance.toArray()

    # Stable descending sort: ties keep their original relative order.
    ranked = sorted(zip(feature_names, scores), key=lambda pair: pair[1], reverse=True)
    labels = [label for label, _ in ranked]
    values = [value for _, value in ranked]

    plt.figure(figsize=(12, 6))
    positions = np.arange(len(labels))
    plt.barh(positions, values, align='center')
    plt.yticks(positions, labels)
    plt.xlabel('Feature Importance')
    plt.title('Random Forest Feature Importance')

    for row, value in enumerate(values):
        plt.text(value, row, f'{value:.4f}', va='center')

    plt.tight_layout()
    plt.show()
    print("Feature importance plot completed.")

# Entry point: run the full fetch -> transform -> model -> plot pipeline
def main():
    """Run the whole pipeline: fetch, convert, transform, model and plot.

    The Spark session created here is stopped when the pipeline finishes,
    including when a step raises.
    """
    print("Starting main process...")
    data = fetch_data_with_pymongo()
    spark = create_spark_session()
    try:
        df = convert_to_spark_dataframe(spark, data)
        df_transformed = transform_data(df)
        df_preprocessed = prepare_data_for_modeling(df_transformed)
        results, feature_importance = train_and_evaluate_models(df_preprocessed)

        # Plot model comparison
        plot_results(results)

        # If feature importance exists, plot it
        if feature_importance is not None:
            # The importances are indexed by feature-vector slot, not by
            # DataFrame column. These names must mirror the VectorAssembler
            # inputCols in prepare_data_for_modeling: index columns first,
            # then numeric ones. df_preprocessed.columns would mislabel them.
            feature_names = [
                "genres_index",
                "developers_index",
                "publishers_index",
                "price",
                "dlc_count",
            ]
            plot_feature_importance(feature_importance, feature_names)
    finally:
        spark.stop()

    print("Main process completed.")

if __name__ == "__main__":
    # Run the pipeline only when executed as a script, not when imported.
    main()