In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, col, when
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder,CrossValidator, TrainValidationSplit
import numpy as np
from IPython.display import Image
from IPython.display import display

In [3]:
from pyspark.sql.functions import expr
from pyspark.sql.window import Window

In [4]:
spark  = SparkSession.builder.appName("Loan Recommendation Fine Tunning using hyperparameters_2").getOrCreate()

In [5]:
spark

In [6]:
loans_df = spark.read.csv("Loan_recommedation_demo_version.csv",inferSchema = True, header=True)
loans_df.printSchema()

root
 |-- Contact__c: string (nullable = true)
 |-- Min_IT_Loan_ID__c: integer (nullable = true)
 |-- Opp_Number__c: integer (nullable = true)
 |-- Id: string (nullable = true)
 |-- AccountID: string (nullable = true)
 |-- Number_Of_Loans_Granted__c: integer (nullable = true)
 |-- Num_Of_Loans_Paid__c: double (nullable = true)
 |-- Purpose_of_Loan__c: string (nullable = true)
 |-- Total_Repayments__c: integer (nullable = true)
 |-- Amount: integer (nullable = true)
 |-- Term_in_Weeks__c: double (nullable = true)
 |-- Payment_Frequency__c: string (nullable = true)
 |-- StageName: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- loanId: integer (nullable = true)
 |-- count: double (nullable = true)
 |-- LoanIdFormat: integer (nullable = true)



In [7]:
loans_df.show(10)

+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+-------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+
|        Contact__c|Min_IT_Loan_ID__c|Opp_Number__c|                Id|         AccountID|Number_Of_Loans_Granted__c|Num_Of_Loans_Paid__c| Purpose_of_Loan__c|Total_Repayments__c|Amount|Term_in_Weeks__c|Payment_Frequency__c|StageName|userId|loanId|count|LoanIdFormat|
+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+-------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+
|0032x000003RvgkAAC|          5145623|      4369500|0062x00000D178JAAR|0012x000004VG7MAAW|                         7|                 4.0|Relocation Expenses|                  2|   200|           6.8

In [8]:
loans_df.columns

['Contact__c',
 'Min_IT_Loan_ID__c',
 'Opp_Number__c',
 'Id',
 'AccountID',
 'Number_Of_Loans_Granted__c',
 'Num_Of_Loans_Paid__c',
 'Purpose_of_Loan__c',
 'Total_Repayments__c',
 'Amount',
 'Term_in_Weeks__c',
 'Payment_Frequency__c',
 'StageName',
 'userId',
 'loanId',
 'count',
 'LoanIdFormat']

In [9]:
loans_df.printSchema()

root
 |-- Contact__c: string (nullable = true)
 |-- Min_IT_Loan_ID__c: integer (nullable = true)
 |-- Opp_Number__c: integer (nullable = true)
 |-- Id: string (nullable = true)
 |-- AccountID: string (nullable = true)
 |-- Number_Of_Loans_Granted__c: integer (nullable = true)
 |-- Num_Of_Loans_Paid__c: double (nullable = true)
 |-- Purpose_of_Loan__c: string (nullable = true)
 |-- Total_Repayments__c: integer (nullable = true)
 |-- Amount: integer (nullable = true)
 |-- Term_in_Weeks__c: double (nullable = true)
 |-- Payment_Frequency__c: string (nullable = true)
 |-- StageName: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- loanId: integer (nullable = true)
 |-- count: double (nullable = true)
 |-- LoanIdFormat: integer (nullable = true)



In [10]:
loans_df.show()

+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+--------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+
|        Contact__c|Min_IT_Loan_ID__c|Opp_Number__c|                Id|         AccountID|Number_Of_Loans_Granted__c|Num_Of_Loans_Paid__c|  Purpose_of_Loan__c|Total_Repayments__c|Amount|Term_in_Weeks__c|Payment_Frequency__c|StageName|userId|loanId|count|LoanIdFormat|
+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+--------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+
|0032x000003RvgkAAC|          5145623|      4369500|0062x00000D178JAAR|0012x000004VG7MAAW|                         7|                 4.0| Relocation Expenses|                  2|   200|          

In [11]:
loans_df.describe(["userId","LoanIdFormat","count"]).show()

+-------+------------------+------------------+------------------+
|summary|            userId|      LoanIdFormat|             count|
+-------+------------------+------------------+------------------+
|  count|            259818|            259818|            259818|
|   mean|1434.1488503490905| 253.8277101663472| 16.81876159465472|
| stddev| 822.0854160334718|183.84693824058843|11.972384814549685|
|    min|                 1|                 1|              -4.0|
|    max|              2880|               894|              69.0|
+-------+------------------+------------------+------------------+



In [12]:
loans_df.columns

['Contact__c',
 'Min_IT_Loan_ID__c',
 'Opp_Number__c',
 'Id',
 'AccountID',
 'Number_Of_Loans_Granted__c',
 'Num_Of_Loans_Paid__c',
 'Purpose_of_Loan__c',
 'Total_Repayments__c',
 'Amount',
 'Term_in_Weeks__c',
 'Payment_Frequency__c',
 'StageName',
 'userId',
 'loanId',
 'count',
 'LoanIdFormat']

In [13]:
# loans_df = loans_df.filter(loans_df.Amount < 500)

In [14]:
# loans_df = loans_df.limit(100)

In [15]:
loans_df.select(["userId","LoanIdFormat","count"]).describe().show()

+-------+-----------------+------------------+------------------+
|summary|           userId|      LoanIdFormat|             count|
+-------+-----------------+------------------+------------------+
|  count|              100|               100|               100|
|   mean|          1474.35|              2.52|             19.51|
| stddev|755.7210668018198|0.5408560367835911|10.839969995112966|
|    min|               83|                 1|              -2.0|
|    max|             2880|                 3|              48.0|
+-------+-----------------+------------------+------------------+



In [16]:
training_df, validation_df = loans_df.randomSplit([.8,.2])

In [17]:
training_df.select(["userId","LoanIdFormat","count"]).describe().show()

+-------+------------------+------------------+------------------+
|summary|            userId|      LoanIdFormat|             count|
+-------+------------------+------------------+------------------+
|  count|                82|                82|                82|
|   mean|1448.1829268292684|               2.5|20.341463414634145|
| stddev| 755.9479394193197|0.5499719409228702|10.744185349277656|
|    min|                83|                 1|               1.0|
|    max|              2880|                 3|              48.0|
+-------+------------------+------------------+------------------+



In [18]:
training_df.show()

+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+--------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+
|        Contact__c|Min_IT_Loan_ID__c|Opp_Number__c|                Id|         AccountID|Number_Of_Loans_Granted__c|Num_Of_Loans_Paid__c|  Purpose_of_Loan__c|Total_Repayments__c|Amount|Term_in_Weeks__c|Payment_Frequency__c|StageName|userId|loanId|count|LoanIdFormat|
+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+--------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+
|0030K00001M9CHaQAN|           801951|      4131426|0062x00000CjySyAAJ|0010K00001cOXFpQAO|                        36|                33.0|     Living Expenses|                  5|   200|          

In [19]:
lls = ALS(userCol="userId", itemCol="LoanIdFormat", ratingCol="count",coldStartStrategy="drop", nonnegative=True)
paramGrid = ParamGridBuilder().addGrid(lls.regParam,[0.1,0.01,0.18,0.17,0.19])\
                              .addGrid(lls.rank, [3,4,5,7,10,12,13,14])\
                              .addGrid(lls.maxIter,[10,15,18,22])\
                              .build()
evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")
train_val_split = TrainValidationSplit(estimator = lls,
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluator)

In [20]:
# fit ALS model to training data
tvsModel = train_val_split.fit(loans_df)
# extract the best model from the tunning excercise using ParamGridBuilder
best_model  = tvsmodel.bestModel

Py4JJavaError: An error occurred while calling o1026.fit.
: org.apache.spark.SparkException: Job 41 cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1(DAGScheduler.scala:1248)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$cleanUpAfterSchedulerStop$1$adapted(DAGScheduler.scala:1246)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:1246)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:3075)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$stop$3(DAGScheduler.scala:2961)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2961)
	at org.apache.spark.SparkContext.$anonfun$stop$12(SparkContext.scala:2263)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1375)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2216)
	at org.apache.spark.SparkContext.$anonfun$new$34(SparkContext.scala:686)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:214)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$2(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
	at org.apache.spark.util.SparkShutdownHookManager.$anonfun$runAll$1(ShutdownHookManager.scala:188)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2463)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1293)
	at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:1090)
	at org.apache.spark.ml.recommendation.ALS.$anonfun$fit$1(ALS.scala:737)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:714)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python312\Lib\site-packages\py4j\clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python312\Lib\socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
ConnectionResetError: [WinError 10054] An existing connection was forcibly closed by the remote host

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python312\Lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\Lenovo\AppData\Local\Programs\Python\Python312\Lib\site-package

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("ALSExample").getOrCreate()

# Assuming you have a DataFrame 'ratings' with columns: 'userId', 'movieId', 'rating'
# Replace 'ratings' with your actual DataFrame
ratings = ...

# Create an ALS instance
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")

# Define the parameter grid to search for the best parameters
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [10, 20, 30]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Define an evaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Create a CrossValidator
cross_validator = CrossValidator(estimator=als,
                               estimatorParamMaps=param_grid,
                               evaluator=evaluator,
                               numFolds=5)  # Specify the number of folds for cross-validation

# Run cross-validation and choose the best set of parameters
cv_model = cross_validator.fit(ratings)

# Get the best model from cross-validation
best_model = cv_model.bestModel

# Print the best parameters
print("Best Rank:", best_model.rank)
print("Best Max Iteration:", best_model._java_obj.parent().getMaxIter())
print("Best Regularization Parameter:", best_model._java_obj.parent().getRegParam())

# Perform predictions with the best model
predictions = best_model.transform(ratings)

# Evaluate the performance of the best model
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data:", rmse)


In [19]:
iterations = 10
regularization_parameter = 0.1
rank = 4
error = []
err = 0

In [20]:
als = ALS(maxIter = iterations,regParam = regularization_parameter,rank = rank, userCol="userId",itemCol="LoanIdFormat",ratingCol="count", coldStartStrategy="drop",nonnegative=True)
model = als.fit(training_df)
predictions = model.transform(validation_df)
new_predictions = predictions.filter(col('prediction') != np.nan)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")
rmse = evaluator.evaluate(new_predictions)
print("Root-mean-Square-error = "+ str(rmse))

Root-mean-Square-error = 3.1238672977695003


In [21]:
new_predictions.select(["Id","AccountID","Amount","Num_Of_Loans_Paid__c","Purpose_of_Loan__c","userId","count" ,"LoanIdFormat","prediction"]).show()

+------------------+------------------+------+--------------------+------------------+------+-----+------------+----------+
|                Id|         AccountID|Amount|Num_Of_Loans_Paid__c|Purpose_of_Loan__c|userId|count|LoanIdFormat|prediction|
+------------------+------------------+------+--------------------+------------------+------+-----+------------+----------+
|0062x00000D8G4aAAF|0010K00001jN1TCQA0|   200|                20.0|   Living Expenses|  1190| 13.0|           3| 13.973488|
|0062x00000DFB2xAAH|0010K00001jN1TCQA0|   200|                21.0|   Living Expenses|  1190| 15.0|           3| 13.973488|
|0062x00000DA1BDAA1|0012x000003E9qvAAC|   200|                26.0|  Medical Expenses|  1314| 23.0|           3| 25.950764|
|0062x00000CU0RqAAL|0012x000005SQ99AAG|   200|                23.0|  Vehicle Expenses|   826| 13.0|           3|  18.96402|
|0062x00000CiLOwAAN|0012x000005SQ99AAG|   200|                24.0|  Vehicle Expenses|   826| 15.0|           3|  18.96402|
|0062x00

In [22]:
new_predictions.show(10)

+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+----------+
|        Contact__c|Min_IT_Loan_ID__c|Opp_Number__c|                Id|         AccountID|Number_Of_Loans_Granted__c|Num_Of_Loans_Paid__c|Purpose_of_Loan__c|Total_Repayments__c|Amount|Term_in_Weeks__c|Payment_Frequency__c|StageName|userId|loanId|count|LoanIdFormat|prediction|
+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+----------+
|0030K00001UKD4RQAX|           845497|      4523195|0062x00000D8G4aAAF|0010K00001jN1TCQA0|                        27|                20.0|   Living Expenses|            

In [19]:
movieRecommends = model.recommendForAllItems(3)

In [28]:
movieRecommends.select("LoanIdFormat","recommendations.userId").show(10,False)

+------------+-----------------+
|LoanIdFormat|userId           |
+------------+-----------------+
|1           |[1635, 651, 1211]|
|2           |[997, 1401, 651] |
|3           |[709, 2085, 695] |
|4           |[1635, 651, 1211]|
|5           |[997, 1401, 651] |
|6           |[709, 2085, 695] |
|7           |[1635, 651, 1211]|
|8           |[997, 1401, 695] |
|9           |[709, 2085, 1543]|
|10          |[1635, 651, 1211]|
+------------+-----------------+
only showing top 10 rows



In [29]:
userRecommends = model.recommendForAllUsers(3)

In [35]:
userRecommends.schema

StructType([StructField('userId', IntegerType(), False), StructField('recommendations', ArrayType(StructType([StructField('LoanIdFormat', IntegerType(), True), StructField('rating', FloatType(), True)]), True), True)])

In [37]:
userRecommends.select("userId","recommendations.LoanIdFormat").show(10,False)

+------+---------------+
|userId|LoanIdFormat   |
+------+---------------+
|1     |[535, 553, 550]|
|3     |[628, 625, 622]|
|5     |[830, 827, 818]|
|6     |[358, 343, 364]|
|12    |[30, 29, 28]   |
|13    |[819, 801, 822]|
|16    |[819, 801, 822]|
|19    |[819, 801, 822]|
|20    |[628, 625, 622]|
|22    |[184, 169, 193]|
+------+---------------+
only showing top 10 rows



DataFrame[userId: int, recommendations: array<struct<LoanIdFormat:int,rating:float>>]

In [14]:
predictions.show(10)

+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+----------+
|        Contact__c|Min_IT_Loan_ID__c|Opp_Number__c|                Id|         AccountID|Number_Of_Loans_Granted__c|Num_Of_Loans_Paid__c|Purpose_of_Loan__c|Total_Repayments__c|Amount|Term_in_Weeks__c|Payment_Frequency__c|StageName|userId|loanId|count|LoanIdFormat|prediction|
+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+----------+
|0030K00001JVbSpQAL|           907654|      5059066|0062x00000DYsyxAAD|0010K00001ayVHPQA2|                        29|                24.0|    Other Expenses|            

In [113]:
def apk(actual, predicted, k=10):
    """
    Computes the average precision at k.
    This function computes the average prescision at k between two lists of
    items.
    Parameters
    ----------
    actual : list
             A list of elements that are to be predicted (order doesn't matter)
    predicted : list
                A list of predicted elements (order does matter)
    k : int, optional
        The maximum number of predicted elements
    Returns
    -------
    score : double
            The average precision at k over the input lists
    """
    if not actual:
        return 0.0

    # if len(predicted)>k:
    #     predicted = predicted[:k]

    score = 0.0
    num_hits = 0.0

    for i,p in enumerate(predicted):
        # first condition checks whether it is valid prediction
        # second condition checks if prediction is not repeated
        if p in actual and p not in predicted[:i]:
            num_hits += 1.0
            score += num_hits / (i+1.0)
            
    return score / min(len(actual), k)

def mapk(actual, predicted, k=10):
    """
    Computes the mean average precision at k.
    This function computes the mean average prescision at k between two lists
    of lists of items.
    Parameters
    ----------
    actual : list
             A list of lists of elements that are to be predicted 
             (order doesn't matter in the lists)
    predicted : list
                A list of lists of predicted elements
                (order matters in the lists)
    k : int, optional
        The maximum number of predicted elements
    Returns
    -------
    score : double
            The mean average precision at k over the input lists
    """
    return np.mean([apk(a,p,k) for a,p in zip(actual, predicted)])

In [112]:
count_list = new_predictions.select("count").toPandas()["count"].tolist()
prediction_list =  new_predictions.select("prediction").toPandas()["prediction"].tolist()

mapk(count_list,prediction_list,3 )

TypeError: 'float' object is not iterable

In [115]:
new_predictions.show(5)

+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+--------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+----------+
|        Contact__c|Min_IT_Loan_ID__c|Opp_Number__c|                Id|         AccountID|Number_Of_Loans_Granted__c|Num_Of_Loans_Paid__c|  Purpose_of_Loan__c|Total_Repayments__c|Amount|Term_in_Weeks__c|Payment_Frequency__c|StageName|userId|loanId|count|LoanIdFormat|prediction|
+------------------+-----------------+-------------+------------------+------------------+--------------------------+--------------------+--------------------+-------------------+------+----------------+--------------------+---------+------+------+-----+------------+----------+
|0030K00001sAYZzQAO|          1027125|      6560979|0062x00000Ew6DbAAJ|0010K000023zUJ8QAM|                        20|                18.0|Furniture or Appl...|    

In [131]:
# Function to calculate MAP@K
def calculate_map_at_k(predictions, ground_truth, k):
    # Inner join to get the predicted ratings for relevant items
    joined_df = predictions.join(ground_truth, on="userId")
    # Sort recommendations by predicted rating in descending order
    window_spec = Window().partitionBy("userId").orderBy(col("count").desc())
    sorted_predictions = joined_df.select(
        "userId",
        "loanId",
        "prediction",
        expr("ROW_NUMBER() OVER (PARTITION BY userId ORDER BY count DESC) AS rank")
    ).filter(col("rank") <= k)
    print(sorted_predictions.show(10))
    # Calculate precision at each position
    precision_at_k = sorted_predictions.withColumn(
        "relevant_at_k", expr("CASE WHEN loanId IN (loanId) THEN (prediction) ELSE 0 END")
    ).groupBy("userId").agg(expr(f"SUM(relevant_at_k) / {k} AS precision_at_k"))
    print(precision_at_k.show(3))
    # Calculate MAP@K
    map_at_k = precision_at_k.agg(expr("SUM(precision_at_k) / COUNT(userId) AS map_at_k")).collect()[0]["map_at_k"]
    print("map at k ",map_at_k)
    return map_at_k

In [132]:
map_at_3 = calculate_map_at_k(new_predictions.select("Id","AccountID","prediction","LoanIdFormat","userId"), loans_df, k=3)
print(f"MAP@3: {map_at_3}")

+------+------+----------+----+
|userId|loanId|prediction|rank|
+------+------+----------+----+
|     1|   757| 4.9824653|   1|
|     1|   757| 4.9746227|   2|
|     1|   757| 4.9824653|   3|
|     2|  2248| 6.9537015|   1|
|     2|  2248| 6.9573073|   2|
|     2|  2248| 6.9537015|   3|
|     3|  2521| 3.9583652|   1|
|     3|  2521| 3.9486866|   2|
|     3|  2521| 3.9583652|   3|
|     4|  4166| 24.773582|   1|
+------+------+----------+----+
only showing top 10 rows

None
+------+------------------+
|userId|    precision_at_k|
+------+------------------+
|     1| 4.979851086934407|
|     2| 6.961169083913167|
|     3|3.9515560468037925|
+------+------------------+
only showing top 3 rows

None
map at k  11.771399787381077
MAP@3: 11.771399787381077


In [65]:
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
 
def calculate_mapk(predictions, k):
    # Group by user
    user_recs = predictions.groupBy('userId').agg(F.collect_list('prediction').alias('predictions'))
    # Calculate average precision for each user
    user_avgs = user_recs.rdd.map(lambda row: (row[0], avg_precision(row[1], k))).collect()
    # Calculate MAP@K
    mapk = sum([user_avg for (_, user_avg) in user_avgs]) / len(user_avgs)
    return mapk
def avg_precision(predictions, k):
    # Sort predictions
    sorted_preds = sorted(predictions, reverse=True)
    # Initialize counters
    num_hits, sum_precs = 0, 0.0
    # Calculate average precision
    for i, pred in enumerate(sorted_preds):
        if pred == 1:
            num_hits += 1
            sum_precs += num_hits / (i+1)
    return sum_precs / min(len(sorted_preds), k)

In [32]:
userRecommends.schema

StructType([StructField('userId', IntegerType(), False), StructField('recommendations', ArrayType(StructType([StructField('movieId', IntegerType(), True), StructField('rating', FloatType(), True)]), True), True)])

In [35]:
userRecommends.select("userId","recommendations.movieID").show(10,False)

+------+--------------------------------------+
|userId|movieID                               |
+------+--------------------------------------+
|1     |[170355, 3379, 33649, 60943, 59018]   |
|2     |[3567, 5075, 141718, 72171, 60943]    |
|3     |[74754, 99764, 26865, 148881, 3837]   |
|4     |[25825, 3567, 127108, 7834, 7841]     |
|5     |[89904, 170355, 3379, 174053, 7815]   |
|6     |[82, 87234, 3925, 42730, 6732]        |
|7     |[25771, 8477, 148881, 74754, 87234]   |
|8     |[60943, 59018, 170355, 3379, 174053]  |
|9     |[112804, 60943, 59018, 106100, 174053]|
|10    |[112804, 90888, 82, 51931, 2316]      |
+------+--------------------------------------+
only showing top 10 rows

