In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import roc_curve, auc
from pyspark.sql.types import IntegerType, FloatType

spark = SparkSession.builder.appName('recommend-ML').getOrCreate()

In [31]:
#import test_2 ground truths
data_dir = "../data/proccessed/new.csv"
df = spark.read.csv(data_dir, header=True, inferSchema=True)
df.show(5)

+------+-------+-------+--------+-----------+------------+----------+---------+---------+---------+-----------------+------------------+------------+
|UserId|TrackId|AlbumId|ArtistId|AlbumRating|ArtistRating|TotalScore|Predictor|MinRating|MaxRating|       MeanRating|          Variance|MedianRating|
+------+-------+-------+--------+-----------+------------+----------+---------+---------+---------+-----------------+------------------+------------+
|200031|  30877| 192723|  132319|       90.0|        50.0|     140.0|        1|        0|      100|53.42664670658683|1195.6428228871598|        60.0|
|200031|   8244| 223220|  233697|       90.0|         0.0|      90.0|        1|        0|      100|55.75752773375594|1216.3168014948726|        70.0|
|200031| 130183|   None|    None|        0.0|         0.0|       0.0|        0|        0|       90|57.69230769230769| 1063.905325443787|        70.0|
|200031| 198762| 220103|  113265|        0.0|         0.0|       0.0|        0|        0|      100|6

In [32]:
df = df.withColumn("UserId", df["UserId"].cast(IntegerType())) \
    .withColumn("TrackId", df["TrackId"].cast(IntegerType())) \
    .withColumn("AlbumId", df["AlbumId"].cast(IntegerType())) \
    .withColumn("ArtistId", df["ArtistId"].cast(IntegerType())) \
    .withColumn("AlbumRating", df["AlbumRating"].cast(FloatType())) \
    .withColumn("ArtistRating", df["ArtistRating"].cast(FloatType())) \
    .withColumn("TotalScore", df["TotalScore"].cast(FloatType())) \
    .withColumn("Predictor", df["Predictor"].cast(IntegerType())) \
    .withColumn("MinRating", df["MinRating"].cast(FloatType())) \
    .withColumn("MaxRating", df["MaxRating"].cast(FloatType())) \
    .withColumn("MeanRating", df["MeanRating"].cast(FloatType())) \
    .withColumn("Variance", df["Variance"].cast(FloatType())) \
    .withColumn("MedianRating", df["MedianRating"].cast(FloatType()))


In [33]:
# Fill missing values with 0
df = df.fillna(0, subset=['AlbumId', 'TrackId', 'ArtistId'])

stages = []
numericCols = ['TrackId', 'AlbumId', 'ArtistId',
               "AlbumRating", "ArtistRating", "TotalScore", "MinRating", "MaxRating", "MeanRating", "Variance", "MedianRating"]
assemblerInputs = numericCols 
assembler = VectorAssembler(
    inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# apply
cols = df.columns
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(df)
df = pipelineModel.transform(df)
selectedCols = ['features'] + cols
df = df.select(selectedCols)
df.show()


+--------------------+------+-------+-------+--------+-----------+------------+----------+---------+---------+---------+----------+----------+------------+
|            features|UserId|TrackId|AlbumId|ArtistId|AlbumRating|ArtistRating|TotalScore|Predictor|MinRating|MaxRating|MeanRating|  Variance|MedianRating|
+--------------------+------+-------+-------+--------+-----------+------------+----------+---------+---------+---------+----------+----------+------------+
|[30877.0,192723.0...|200031|  30877| 192723|  132319|       90.0|        50.0|     140.0|        1|      0.0|    100.0| 53.426647| 1195.6428|        60.0|
|[8244.0,223220.0,...|200031|   8244| 223220|  233697|       90.0|         0.0|      90.0|        1|      0.0|    100.0| 55.757526| 1216.3168|        70.0|
|(11,[0,7,8,9,10],...|200031| 130183|      0|       0|        0.0|         0.0|       0.0|        0|      0.0|     90.0| 57.692307| 1063.9053|        70.0|
|[198762.0,220103....|200031| 198762| 220103|  113265|        0.

In [34]:
from pyspark.sql.functions import max, min

# Calculate the highest and lowest user ID
max_user_id = df.agg(max("UserId")).collect()[0][0]
min_user_id = df.agg(min("UserId")).collect()[0][0]

print("Highest User ID:", max_user_id)
print("Lowest User ID:", min_user_id)


Highest User ID: 212234
Lowest User ID: 200031


In [35]:
#train = df.where(col("UserId").between(202000, 212234))
#test = df.where(col("UserId").between(200031, 201999))
#print("Training Dataset Count: " + str(train.count()))
#print("Test Dataset Count: " + str(test.count()))

# below is the typical random split
# of the train and test data sets
# HOWEVER, our testing users have 6 tracks for each
# We cannot use random split here
train, test = df.randomSplit([0.7, 0.3], seed=2018)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))


Training Dataset Count: 4260
Test Dataset Count: 1740


In [36]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr = LogisticRegression(featuresCol='features', labelCol='Predictor')

paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

cv = CrossValidator(estimator=lr,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(
                        labelCol='Predictor'),
                    numFolds=5)

cvModel = cv.fit(train)

predictions = cvModel.transform(test)
predictions.select('UserId', 'TrackId', 'Predictor', 'probability',
                   'rawPrediction', 'prediction').show(12)

evaluator = BinaryClassificationEvaluator(
    labelCol='Predictor', rawPredictionCol='rawPrediction', metricName='areaUnderROC')

lr_bestModel = cvModel.bestModel

print("Model's areaUnderROC: ", evaluator.evaluate(predictions))
print("Coefficients: " + str(cvModel.bestModel.coefficients))
print("Intercept: " + str(cvModel.bestModel.intercept))


+------+-------+---------+--------------------+--------------------+----------+
|UserId|TrackId|Predictor|         probability|       rawPrediction|prediction|
+------+-------+---------+--------------------+--------------------+----------+
|205246| 237713|        0|[0.91903502683263...|[2.42930760458673...|       0.0|
|200625|   5281|        0|[0.79099615862614...|[1.33094047982947...|       0.0|
|206067|   9408|        0|[0.79738898848394...|[1.37005468422200...|       0.0|
|210889|  11423|        0|[0.79758407381363...|[1.37126262980322...|       0.0|
|209063|  21778|        0|[0.83037296154063...|[1.58827281558701...|       0.0|
|210234|  27337|        1|[0.81637607600320...|[1.49198535155676...|       0.0|
|210746|  30548|        0|[0.81122267447125...|[1.45797443554659...|       0.0|
|207658|  46693|        0|[0.87956492352701...|[1.98831655685312...|       0.0|
|200166|  49989|        1|[0.84961383245739...|[1.73157549533631...|       0.0|
|207499|  51434|        0|[0.77958272195

In [37]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

dt = DecisionTreeClassifier(featuresCol='features', labelCol='Predictor')

paramGrid = ParamGridBuilder() \
    .addGrid(dt.maxDepth, [5]) \
    .addGrid(dt.minInstancesPerNode, [5]) \
    .build()

evaluator = MulticlassClassificationEvaluator(
    labelCol='Predictor', predictionCol='prediction', metricName='accuracy')

crossval = CrossValidator(estimator=dt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(train)

predictions = cvModel.transform(test)
predictions.select('UserId', 'TrackId', 'Predictor', 'probability',
                   'rawPrediction', 'prediction').show(12)

dt_bestModel = cvModel.bestModel
print("Best model's maxDepth: ", dt_bestModel._java_obj.getMaxDepth())
print("Best model's minInstancesPerNode: ",
      dt_bestModel._java_obj.getMinInstancesPerNode())
print("Best model's accuracy: ", evaluator.evaluate(predictions))


+------+-------+---------+--------------------+--------------+----------+
|UserId|TrackId|Predictor|         probability| rawPrediction|prediction|
+------+-------+---------+--------------------+--------------+----------+
|205246| 237713|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|200625|   5281|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|206067|   9408|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|210889|  11423|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|209063|  21778|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|210234|  27337|        1|[0.85233644859813...|[1824.0,316.0]|       0.0|
|210746|  30548|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|207658|  46693|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|200166|  49989|        1|[0.85233644859813...|[1824.0,316.0]|       0.0|
|207499|  51434|        0|[0.85233644859813...|[1824.0,316.0]|       0.0|
|204905|  56166|        0|[0.852336448

In [38]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

rf = RandomForestClassifier(featuresCol='features', labelCol='Predictor')

paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [2,3,5]) \
    .addGrid(rf.maxDepth, [1,2,3]) \
    .addGrid(rf.impurity, ['gini', 'entropy']) \
    .build()

evaluator = MulticlassClassificationEvaluator(
    labelCol='Predictor', predictionCol='prediction', metricName='accuracy')

crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(train)

predictions = cvModel.transform(test)
predictions.select('UserId', 'TrackId', 'Predictor', 'probability',
                   'rawPrediction', 'prediction').show(12)

rf_bestModel = cvModel.bestModel
print("Best model's numTrees: ", rf_bestModel._java_obj.getNumTrees())
print("Best model's maxDepth: ", rf_bestModel._java_obj.getMaxDepth())
print("Best model's impurity: ", rf_bestModel._java_obj.getImpurity())
print("Best model's accuracy: ", evaluator.evaluate(predictions))


+------+-------+---------+--------------------+--------------------+----------+
|UserId|TrackId|Predictor|         probability|       rawPrediction|prediction|
+------+-------+---------+--------------------+--------------------+----------+
|205246| 237713|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|200625|   5281|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|206067|   9408|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|210889|  11423|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|209063|  21778|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|210234|  27337|        1|[0.84589579945532...|[4.22947899727661...|       0.0|
|210746|  30548|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|207658|  46693|        0|[0.84589579945532...|[4.22947899727661...|       0.0|
|200166|  49989|        1|[0.84589579945532...|[4.22947899727661...|       0.0|
|207499|  51434|        0|[0.84589579945

In [39]:
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

gbt = GBTClassifier(featuresCol='features', labelCol='Predictor')

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxIter, [10]) \
    .addGrid(gbt.maxDepth, [5]) \
    .addGrid(gbt.stepSize, [0.1]) \
    .build()

evaluator = MulticlassClassificationEvaluator(
    labelCol='Predictor', predictionCol='prediction', metricName='accuracy')

crossval = CrossValidator(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=evaluator,
                          numFolds=5)

cvModel = crossval.fit(train)

predictions = cvModel.transform(test)
predictions.select('UserId', 'TrackId', 'Predictor', 'probability',
                   'rawPrediction', 'prediction').show(12)

gbt_bestModel = cvModel.bestModel
print("Best model's maxIter: ", gbt_bestModel._java_obj.getMaxIter())
print("Best model's maxDepth: ", gbt_bestModel._java_obj.getMaxDepth())
print("Best model's stepSize: ", gbt_bestModel._java_obj.getStepSize())
print("Best model's accuracy: ", evaluator.evaluate(predictions))


+------+-------+---------+--------------------+--------------------+----------+
|UserId|TrackId|Predictor|         probability|       rawPrediction|prediction|
+------+-------+---------+--------------------+--------------------+----------+
|205246| 237713|        0|[0.85401181151936...|[0.88320965344686...|       0.0|
|200625|   5281|        0|[0.88784061484277...|[1.03443564882892...|       0.0|
|206067|   9408|        0|[0.86202780132885...|[0.91611765815416...|       0.0|
|210889|  11423|        0|[0.71433146454814...|[0.45825745946521...|       0.0|
|209063|  21778|        0|[0.88784061484277...|[1.03443564882892...|       0.0|
|210234|  27337|        1|[0.79017076092421...|[0.66297751124812...|       0.0|
|210746|  30548|        0|[0.89428072877188...|[1.06761627193538...|       0.0|
|207658|  46693|        0|[0.85264043425887...|[0.87773115114081...|       0.0|
|200166|  49989|        1|[0.81931601363027...|[0.75586014438376...|       0.0|
|207499|  51434|        0|[0.88784061484

In [40]:
predictions_lr = lr_bestModel.transform(test)

predictions_dt = dt_bestModel.transform(test)

predictions_rf = rf_bestModel.transform(test)

predictions_gbt = gbt_bestModel.transform(test)

In [41]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

predictions_lr = predictions_lr.withColumnRenamed(
    "prediction", "lr_prediction")
predictions_dt = predictions_dt.withColumnRenamed(
    "prediction", "dt_prediction")
predictions_rf = predictions_rf.withColumnRenamed(
    "prediction", "rf_prediction")
predictions_gbt = predictions_gbt.withColumnRenamed(
    "prediction", "gbt_prediction")


combined_df = predictions_lr.select("UserId", "TrackId", "Predictor", "lr_prediction") \
    .join(predictions_dt.select("UserId", "TrackId", "dt_prediction"), ["UserId", "TrackId"]) \
    .join(predictions_rf.select("UserId", "TrackId", "rf_prediction"), ["UserId", "TrackId"]) \
    .join(predictions_gbt.select("UserId", "TrackId", "gbt_prediction"), ["UserId", "TrackId"])

combined_df.show(12)

+------+-------+---------+-------------+-------------+-------------+--------------+
|UserId|TrackId|Predictor|lr_prediction|dt_prediction|rf_prediction|gbt_prediction|
+------+-------+---------+-------------+-------------+-------------+--------------+
|205246| 237713|        0|          0.0|          0.0|          0.0|           0.0|
|200625|   5281|        0|          0.0|          0.0|          0.0|           0.0|
|206067|   9408|        0|          0.0|          0.0|          0.0|           0.0|
|210889|  11423|        0|          0.0|          0.0|          0.0|           0.0|
|209063|  21778|        0|          0.0|          0.0|          0.0|           0.0|
|210234|  27337|        1|          0.0|          0.0|          0.0|           0.0|
|210746|  30548|        0|          0.0|          0.0|          0.0|           0.0|
|207658|  46693|        0|          0.0|          0.0|          0.0|           0.0|
|200166|  49989|        1|          0.0|          0.0|          0.0|        

In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.evaluation import RegressionEvaluator
# Create a VectorAssembler which consumes columns lr_prediction, dt_prediction, rf_prediction and gbt_prediction and produces a new column "features"
assembler = VectorAssembler(inputCols=[
                            "lr_prediction", "dt_prediction", "rf_prediction", "gbt_prediction"], outputCol="features")

# Use the assembler to transform our DataFrame to the two-column format
df = assembler.transform(combined_df)

# Initialize LinearRegression model
lr = LinearRegression(featuresCol="features", labelCol="Predictor")

# Fit the model to the data
ensemble_lr = lr.fit(df)


print("Coefficients: " + str(ensemble_lr.coefficients))
print("Intercept: " + str(ensemble_lr.intercept))


df = ensemble_lr.transform(df)

evaluator = RegressionEvaluator(
    labelCol="Predictor", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

predictions = predictions.withColumn(
    "final_prediction", (col("prediction") > 0.5).cast("double"))

evaluator = MulticlassClassificationEvaluator(
    labelCol="Predictor", predictionCol="final_prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print("Accuracy = %g" % accuracy)

23/05/10 18:22:51 WARN Instrumentation: [735af279] regParam is zero, which might cause numerical instability and overfitting.


Coefficients: [0.3322246852290674,0.6629075455680206,0.14856100432541708,-0.4030910378694768]
Intercept: 0.18853145010668956
Root Mean Squared Error (RMSE) on test data = 0.381317
Accuracy = 0.854598


In [43]:
# import test_2 ground truths
data_dir = "../data/proccessed/test.csv"
test_df = spark.read.csv(data_dir, header=True, inferSchema=True)
test_df.show(5)

test_df = test_df.withColumn("UserId", test_df["UserId"].cast(IntegerType())) \
    .withColumn("TrackId", test_df["TrackId"].cast(IntegerType())) \
    .withColumn("AlbumId", test_df["AlbumId"].cast(IntegerType())) \
    .withColumn("ArtistId", test_df["ArtistId"].cast(IntegerType())) \
    .withColumn("AlbumRating", test_df["AlbumRating"].cast(FloatType())) \
    .withColumn("ArtistRating", test_df["ArtistRating"].cast(FloatType())) \
    .withColumn("TotalScore", test_df["TotalScore"].cast(FloatType())) \
    .withColumn("MinRating", test_df["MinRating"].cast(FloatType())) \
    .withColumn("MaxRating", test_df["MaxRating"].cast(FloatType())) \
    .withColumn("MeanRating", test_df["MeanRating"].cast(FloatType())) \
    .withColumn("Variance", test_df["Variance"].cast(FloatType())) \
    .withColumn("MedianRating", test_df["MedianRating"].cast(FloatType()))

+------+-------+-------+--------+-----------+------------+----------+---------+---------+------------------+------------------+------------+
|UserId|TrackId|AlbumId|ArtistId|AlbumRating|ArtistRating|TotalScore|MinRating|MaxRating|        MeanRating|          Variance|MedianRating|
+------+-------+-------+--------+-----------+------------+----------+---------+---------+------------------+------------------+------------+
|199810| 208019| 209288|    None|        0.0|         0.0|       0.0|        0|      100|49.766129032258064|1349.9533688865763|        50.0|
|199810|  74139| 277282|  271146|        0.0|         0.0|       0.0|       50|       90| 78.33333333333333|297.22222222222223|        90.0|
|199810|   9903|   None|    None|        0.0|         0.0|       0.0|        0|      100|52.858823529411765|1339.4977162629757|        50.0|
|199810| 242681| 190640|  244574|        0.0|         0.0|       0.0|        0|      100| 49.50834597875569|1692.7537239713458|        50.0|
|199810|  185

In [44]:
# Fill missing values with 0
test_df = test_df.fillna(0, subset=['AlbumId', 'TrackId', 'ArtistId'])

stages = []
numericCols = ['TrackId', 'AlbumId', 'ArtistId',
               "AlbumRating", "ArtistRating", "TotalScore", "MinRating", "MaxRating", "MeanRating", "Variance", "MedianRating"]
assemblerInputs = numericCols
assembler = VectorAssembler(
    inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

# apply
cols = test_df.columns
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(test_df)
test_df = pipelineModel.transform(test_df)
selectedCols = ['features'] + cols
test_df = test_df.select(selectedCols)
test_df.show()


+--------------------+------+-------+-------+--------+-----------+------------+----------+---------+---------+----------+---------+------------+
|            features|UserId|TrackId|AlbumId|ArtistId|AlbumRating|ArtistRating|TotalScore|MinRating|MaxRating|MeanRating| Variance|MedianRating|
+--------------------+------+-------+-------+--------+-----------+------------+----------+---------+---------+----------+---------+------------+
|(11,[0,1,7,8,9,10...|199810| 208019| 209288|       0|        0.0|         0.0|       0.0|      0.0|    100.0|  49.76613|1349.9534|        50.0|
|[74139.0,277282.0...|199810|  74139| 277282|  271146|        0.0|         0.0|       0.0|     50.0|     90.0| 78.333336|297.22223|        90.0|
|(11,[0,7,8,9,10],...|199810|   9903|      0|       0|        0.0|         0.0|       0.0|      0.0|    100.0|  52.85882|1339.4977|        50.0|
|[242681.0,190640....|199810| 242681| 190640|  244574|        0.0|         0.0|       0.0|      0.0|    100.0| 49.508347|1692.7538

In [45]:
predictions_lr = lr_bestModel.transform(test_df)

predictions_dt = dt_bestModel.transform(test_df)

predictions_rf = rf_bestModel.transform(test_df)

predictions_gbt = gbt_bestModel.transform(test_df)

In [46]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

predictions_lr = predictions_lr.withColumnRenamed(
    "prediction", "lr_prediction")
predictions_dt = predictions_dt.withColumnRenamed(
    "prediction", "dt_prediction")
predictions_rf = predictions_rf.withColumnRenamed(
    "prediction", "rf_prediction")
predictions_gbt = predictions_gbt.withColumnRenamed(
    "prediction", "gbt_prediction")


combined_df = predictions_lr.select("UserId", "TrackId", "lr_prediction") \
    .join(predictions_dt.select("UserId", "TrackId", "dt_prediction"), ["UserId", "TrackId"]) \
    .join(predictions_rf.select("UserId", "TrackId", "rf_prediction"), ["UserId", "TrackId"]) \
    .join(predictions_gbt.select("UserId", "TrackId", "gbt_prediction"), ["UserId", "TrackId"])

combined_df.show(12)


[Stage 8158:> (1 + 2) / 3][Stage 8159:> (1 + 2) / 3][Stage 8160:=>(2 + 1) / 3]

+------+-------+-------------+-------------+-------------+--------------+
|UserId|TrackId|lr_prediction|dt_prediction|rf_prediction|gbt_prediction|
+------+-------+-------------+-------------+-------------+--------------+
|199810| 208019|          0.0|          0.0|          0.0|           0.0|
|199810|  74139|          0.0|          0.0|          0.0|           0.0|
|199810|   9903|          0.0|          0.0|          0.0|           0.0|
|199810| 242681|          0.0|          0.0|          0.0|           0.0|
|199810|  18515|          1.0|          1.0|          1.0|           1.0|
|199810| 105760|          1.0|          1.0|          1.0|           1.0|
|199812| 276940|          0.0|          0.0|          0.0|           0.0|
|199812| 142408|          1.0|          1.0|          1.0|           1.0|
|199812| 130023|          1.0|          1.0|          1.0|           1.0|
|199812|  29189|          0.0|          0.0|          0.0|           0.0|
|199812| 223706|          1.0|        

                                                                                

In [47]:
assembler = VectorAssembler(inputCols=[
                            "lr_prediction", "dt_prediction", "rf_prediction", "gbt_prediction"], outputCol="features")

# Use the assembler to transform our DataFrame to the two-column format
df = assembler.transform(combined_df)
df = ensemble_lr.transform(df)

df.show()

+------+-------+-------------+-------------+-------------+--------------+-----------------+-------------------+
|UserId|TrackId|lr_prediction|dt_prediction|rf_prediction|gbt_prediction|         features|         prediction|
+------+-------+-------------+-------------+-------------+--------------+-----------------+-------------------+
|199810| 208019|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|
|199810|  74139|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|
|199810|   9903|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|
|199810| 242681|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|
|199810|  18515|          1.0|          1.0|          1.0|           1.0|[1.0,1.0,1.0,1.0]| 0.9291336473597179|
|199810| 105760|          1.0|          1.0|          1.0|           1.0|[1.0,1.0,1.0,1.0]| 0.9291336473

In [48]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, when

window = Window.partitionBy('UserId').orderBy(df['prediction'].desc())
df = df.withColumn('row_num', row_number().over(window))
df = df.withColumn('final_result', when(df['row_num'] <= 3, 1).otherwise(0))
df = df.drop('row_num')

df.show(6)

+------+-------+-------------+-------------+-------------+--------------+-----------------+-------------------+------------+
|UserId|TrackId|lr_prediction|dt_prediction|rf_prediction|gbt_prediction|         features|         prediction|final_result|
+------+-------+-------------+-------------+-------------+--------------+-----------------+-------------------+------------+
|199810|  18515|          1.0|          1.0|          1.0|           1.0|[1.0,1.0,1.0,1.0]| 0.9291336473597179|           1|
|199810| 105760|          1.0|          1.0|          1.0|           1.0|[1.0,1.0,1.0,1.0]| 0.9291336473597179|           1|
|199810| 208019|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|           1|
|199810|  74139|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|           0|
|199810|   9903|          0.0|          0.0|          0.0|           0.0|        (4,[],[])|0.18853145010668956|           0|


In [50]:
from pyspark.sql.functions import concat, col, lit

df = df.withColumn('UserId_TrackId',concat(col('UserId'), lit('_'), col('TrackId')))

df = df.select('TrackID', 'final_result')

df = df.withColumnRenamed('final_result', 'Predictor')

df.write.mode('overwrite').csv('submission.csv', header=True)
