<a href="https://colab.research.google.com/github/varsha6319/Product_Rec/blob/main/Step_2_with_weights_RF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive


In [2]:
!pip install pyspark==3.3.0



In [3]:
!pip install xgboost



In [4]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.functions import concat_ws, col, when
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel
from pyspark.ml import Pipeline, PipelineModel
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score
import matplotlib.pyplot as plt
import pandas as pd

In [5]:

# # Download the RAPIDS Accelerator jar
# !wget https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/23.02.0/rapids-4-spark_2.12-23.02.0.jar

# # Download the cuDF jar
# !wget https://repo1.maven.org/maven2/ai/rapids/cudf/23.02.0/cudf-23.02.0-cuda11.jar


In [5]:
spark = SparkSession.builder \
    .appName("RandomForest_GPU") \
    .config("spark.executor.memory", "64g") \
    .config("spark.executor.cores", "16") \
    .config("spark.driver.memory", "64g") \
    .config("spark.ui.port", "4040").getOrCreate()

In [6]:
parquet_path1 = '/content/drive/My Drive/santander-product-recommendation/train_data.parquet'
parquet_path2 = '/content/drive/My Drive/santander-product-recommendation/transformed_intermediate_data_0528.parquet'
parquet_path3 = '/content/drive/My Drive/santander-product-recommendation/transformed_test_data.parquet'
parquet_path4 = '/content/drive/My Drive/santander-product-recommendation/transformed_train_data_for_original_intermediate.parquet'

# Read Parquet files into Spark DataFrames
train_data = spark.read.parquet(parquet_path1)
transformed_intermediate_data_0528 = spark.read.parquet(parquet_path2)
test_data = spark.read.parquet(parquet_path3)
original_intermediate_data_0528 = spark.read.parquet(parquet_path4)


In [7]:
# train_data = train_data.select("customer_code","features","multi_label","label")
transformed_intermediate_data_0528 = transformed_intermediate_data_0528.select("customer_code","features")
test_data = test_data.select("customer_code","features")
original_intermediate_data_0528 = original_intermediate_data_0528.select("customer_code","label")

In [8]:
# Load a pretained model from drive
model_path =  "/content/drive/My Drive/santander-product-recommendation/model/"

trained_model = RandomForestClassificationModel.load(model_path)

In [11]:
# mapped_new_products = map_prediction_to_product(combined_data, "new_products")
train_data2 = train_data.withColumn("predicted_product", map_prediction_to_products("label", label_to_products_mapping))

In [12]:
train_data2.show()

+----------+-------------+--------------+-----------------+---+---+-------------------+------------------+------------------+----------------------+-------------------------+----------------------------------+---------------+---------------+-------------+--------------+------------+-------------+-------------+-----------------------+----------------+----------------+---------------+----------+----------------+----------------+---------------+--------------+----------------------+------------------+-----------------------+-------------------+--------------------+------------------+---------+-----+--------+-------------+-----+-----+-----------+----------+------------+-------+------------+------------+-----------+-----------------------+--------------------+--------------------+-----------------------+---------+------------------------+----------------------------+-------------------+-------------------------------+----------------------------------------+---------------------+-----------

In [12]:
grouped_df = train_data2.groupBy("predicted_product") \
               .count() \
               .orderBy("predicted_product")

In [16]:
grouped_df.show(24)

+--------------------+-------+
|   predicted_product|  count|
+--------------------+-------+
|         credit_card|  50919|
|    current_accounts| 643007|
|    derivada_account| 382618|
|        direct_debit|  39633|
|           e_account|  66725|
|               funds|  62679|
|          guarantees|2311257|
|        home_account|  46151|
|      junior_account| 179264|
|               loans|  52833|
|  long_term_deposits|  86735|
|mas_particular_ac...| 120626|
|medium_term_deposits|  86921|
|            mortgage|  55180|
|  particular_account| 117308|
|particular_plus_a...| 105151|
|             payroll|  42067|
|     payroll_account| 379664|
|       pension_plans|  40725|
|        pensions_nom|  41815|
|     savings_account|5682032|
|          securities|  46272|
| short_term_deposits| 101391|
|               taxes|  52050|
+--------------------+-------+
only showing top 24 rows



In [13]:
# Convert DataFrame to dictionary
product_dict = grouped_df.rdd.collectAsMap()
# Print the dictionary
print(product_dict)

{'credit_card': 50919, 'current_accounts': 643007, 'derivada_account': 382618, 'direct_debit': 39633, 'e_account': 66725, 'funds': 62679, 'guarantees': 2311257, 'home_account': 46151, 'junior_account': 179264, 'loans': 52833, 'long_term_deposits': 86735, 'mas_particular_account': 120626, 'medium_term_deposits': 86921, 'mortgage': 55180, 'particular_account': 117308, 'particular_plus_account': 105151, 'payroll': 42067, 'payroll_account': 379664, 'pension_plans': 40725, 'pensions_nom': 41815, 'savings_account': 5682032, 'securities': 46272, 'short_term_deposits': 101391, 'taxes': 52050, 'unknown': 1883171}


In [14]:
total_count = sum(product_dict.values())

# Assign class weights inversely proportional to counts
class_weights = {label: total_count / count for label, count in product_dict.items()}

# Print the class weights dictionary
print(class_weights)

{'credit_card': 248.94821186590468, 'current_accounts': 19.713928464231337, 'derivada_account': 33.13015592575363, 'direct_debit': 319.8393762773446, 'e_account': 189.97668040464595, 'funds': 202.2398889580242, 'guarantees': 5.484545422685578, 'home_account': 274.66780784815063, 'junior_account': 70.7124352909675, 'loans': 239.92947589574698, 'long_term_deposits': 146.14854441690207, 'mas_particular_account': 105.08674746737852, 'medium_term_deposits': 145.8358049263124, 'mortgage': 229.72442914099312, 'particular_account': 108.05907525488458, 'particular_plus_account': 120.55229146655762, 'payroll': 301.33344426747806, 'payroll_account': 33.38792721985756, 'pension_plans': 311.2632044198895, 'pensions_nom': 303.1494439794332, 'savings_account': 2.230926189785626, 'securities': 273.9495591286307, 'short_term_deposits': 125.0228718525313, 'taxes': 243.53878962536024, 'unknown': 6.7313026804257285}


In [15]:
class_weights_df = spark.createDataFrame(
    [(k, v) for k, v in class_weights.items()],
    ["predicted_product", "class_weight"]
)

In [22]:
class_weights_df.show()

+--------------------+------------------+
|   predicted_product|      class_weight|
+--------------------+------------------+
|         credit_card|248.94821186590468|
|    current_accounts|19.713928464231337|
|    derivada_account| 33.13015592575363|
|        direct_debit| 319.8393762773446|
|           e_account|189.97668040464595|
|               funds| 202.2398889580242|
|          guarantees| 5.484545422685578|
|        home_account|274.66780784815063|
|      junior_account|  70.7124352909675|
|               loans|239.92947589574698|
|  long_term_deposits|146.14854441690207|
|mas_particular_ac...|105.08674746737852|
|medium_term_deposits| 145.8358049263124|
|            mortgage|229.72442914099312|
|  particular_account|108.05907525488458|
|particular_plus_a...|120.55229146655762|
|             payroll|301.33344426747806|
|     payroll_account| 33.38792721985756|
|       pension_plans| 311.2632044198895|
|        pensions_nom| 303.1494439794332|
+--------------------+------------

In [16]:
data_with_weights = train_data2.join(class_weights_df, on="predicted_product", how="left")

In [24]:
data_with_weights.show()

+-------------------+----------+-------------+--------------+-----------------+---+---+-------------------+------------------+------------------+----------------------+-------------------------+----------------------------------+---------------+---------------+-------------+--------------+------------+-------------+-------------+-----------------------+----------------+----------------+---------------+----------+----------------+----------------+---------------+--------------+----------------------+------------------+-----------------------+-------------------+--------------------+------------------+---------+-----+--------+-------------+-----+-----+-----------+----------+------------+-------+------------+------------+-----------+-----------------------+--------------------+--------------------+-----------------------+---------+------------------------+----------------------------+-------------------+-------------------------------+----------------------------------------+-------------

In [17]:

rf = RandomForestClassifier(featuresCol='features', labelCol='label', numTrees=5, maxDepth=5, weightCol='class_weight')

data_with_weights = data_with_weights.select("customer_code","features","multi_label", "class_weight","label")

In [18]:
# Train the model
trained_model = rf.fit(data_with_weights)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 43712)
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
Traceback (most recent call last):
  File "/usr/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(r

Py4JError: An error occurred while calling o147.fit

In [None]:
predictions = trained_model.transform(data_with_weights)

In [None]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

precision = evaluator.evaluate(predictions, {evaluator.metricName: "weightedPrecision"})
recall = evaluator.evaluate(predictions, {evaluator.metricName: "weightedRecall"})
f1 = evaluator.evaluate(predictions, {evaluator.metricName: "f1"})

print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1-score: {f1}")


In [9]:

predictions_0528 = trained_model.transform(transformed_intermediate_data_0528)

predicted_products_0528 = predictions_0528.select("customer_code", "prediction").withColumnRenamed("prediction", "predicted_products_0528")

combined_test_data_predicted_products_0528 = test_data.join(predicted_products_0528, on='customer_code', how='left').cache()

predictions_0628 = trained_model.transform(combined_test_data_predicted_products_0528)

combined_data = predictions_0628.withColumn("new_products", col("prediction") - col("predicted_products_0528")) \
                                  .select("customer_code", "new_products")

In [10]:

product_columns = [
    "savings_account", "guarantees", "current_accounts", "derivada_account", "payroll_account",
    "junior_account", "mas_particular_account", "particular_account", "particular_plus_account",
    "short_term_deposits", "medium_term_deposits", "long_term_deposits", "e_account", "funds",
    "mortgage", "loans", "taxes", "credit_card", "securities", "home_account", "payroll",
    "pensions_nom", "pension_plans", "direct_debit"
]

label_to_products_mapping = {i: product_columns[i] for i in range(len(product_columns))}

# Define a function to map prediction index to product column name
def map_prediction_to_products(prediction_col: str, mapping: dict) -> F.Column:
    mapping_expr = F.expr(
        "CASE " +
        " ".join([
            f"WHEN {prediction_col} = {idx} THEN '{product}' "
            for idx, product in mapping.items()
        ]) +
        " ELSE 'unknown' END"
    )
    return mapping_expr


In [11]:
# mapped_new_products = map_prediction_to_product(combined_data, "new_products")
mapped_new_products = combined_data.withColumn("predicted_product", map_prediction_to_products("new_products", label_to_products_mapping))

In [None]:
mapped_new_products.show()

In [12]:
submission = mapped_new_products.withColumnRenamed("customer_code", "ncodpers").withColumnRenamed("predicted_product", "added_products") \
                                  .select("ncodpers","added_products")

In [None]:
submission.show()

In [None]:
model_path = "/content/drive/My Drive/santander-product-recommendation/model_rf_50_10_10_sqrt_classweights/"

trained_model.write().overwrite().save(model_path)

print(f"Model saved successfully at: {model_path}")