In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder
import pyspark.sql.functions as F

In [2]:
import os

# Interpreter PySpark should use for its Python processes. These variables must
# be set before the SparkSession cell below creates the SparkContext.
# The absolute default is machine-specific; set PYSPARK_PYTHON2_BIN in the
# environment to point at a different interpreter without editing the notebook.
PYTHON2_BIN = os.environ.get("PYSPARK_PYTHON2_BIN",
                             "/home/emil/.conda/envs/python2/bin/python2.7")

os.environ["PYSPARK_PYTHON"] = PYTHON2_BIN
# NOTE(review): PYSPARK_DRIVER_PYTHON is normally read by the pyspark launcher
# scripts; inside an already-running kernel it likely has no effect — confirm.
os.environ["PYSPARK_DRIVER_PYTHON"] = PYTHON2_BIN

In [3]:
from pyspark.sql import SparkSession

# Local Spark session shared by the whole notebook ("local" = one worker thread).
# The app name describes this notebook; the previous "Word Count" name and the
# placeholder "spark.some.config.option" setting were quick-start boilerplate.
spark = (SparkSession.builder
         .master("local")
         .appName("ALS rating prediction")
         .getOrCreate())

sc = spark.sparkContext
# Raw tab-separated lines of the training file, kept as byte strings
# (use_unicode=False); they are parsed into numbers in the next cell.
data = sc.textFile("train.txt", use_unicode=False)

In [4]:
# Turn every tab-separated line into a list of floats and expose it as a
# DataFrame with columns uid, mid, rat (the columns the ALS estimators read).
data = data.map(lambda line: list(map(float, line.split("\t"))))
df = spark.createDataFrame(data, ["uid", "mid", "rat"])

In [6]:
# Hold out 20% of the labelled ratings for a final evaluation that tuning never
# sees. A fixed seed makes the split, and every metric derived from it,
# reproducible under Restart & Run All.
(training, test) = df.randomSplit([0.8, 0.2], seed=42)

In [8]:
# Template ALS estimator for the hyper-parameter search.
# coldStartStrategy="drop" discards rows whose prediction would be NaN
# (user or item not seen during fitting), so the RMSE evaluator stays defined.
als = ALS(
    userCol="uid",
    itemCol="mid",
    ratingCol="rat",
    nonnegative=True,
    coldStartStrategy="drop",
)

In [9]:
# Hyper-parameter grid: 4 x 4 x 4 = 64 ALS configurations.
# NOTE(review): the winner printed further down (rank=13, maxIter=19,
# regParam=0.15) lies on the edge of every axis — consider widening the grid.
param_grid = (ParamGridBuilder()
              .addGrid(als.rank, [10, 11, 12, 13])
              .addGrid(als.maxIter, [16, 17, 18, 19])
              .addGrid(als.regParam, [.15, .16, .17, .18])
              .build())
evaluator = RegressionEvaluator(metricName='rmse', labelCol='rat', predictionCol='prediction')
# TrainValidationSplit randomly splits `training` again (trainRatio defaults to
# 0.75); without a seed the selected hyper-parameters can change between runs.
tvs = TrainValidationSplit(estimator=als, estimatorParamMaps=param_grid,
                           evaluator=evaluator, seed=42)

In [10]:
# Evaluate every grid point on `training`; the result exposes the winner as `.bestModel`.
model = tvs.fit(dataset=training)

In [11]:
# Score the tuned model on the 20% hold-out split that tuning never touched.
best_model = model.bestModel
predictions = best_model.transform(dataset=test)
rmse = evaluator.evaluate(dataset=predictions)
print("RMSE: %s" % rmse)

RMSE: 0.922835484741


In [12]:
# Report the winning hyper-parameters through the public tuning API rather than
# the private JVM handle (`_java_obj.parent()`), which can change between releases.
# validationMetrics has one score per param map, in grid order; pick the best one
# the same way the evaluator ranks them (RMSE: smaller is better).
metrics = model.validationMetrics
pick = max if evaluator.isLargerBetter() else min
best_index = pick(range(len(metrics)), key=lambda i: metrics[i])
best_params = model.getEstimatorParamMaps()[best_index]

print(best_params[als.rank])
print(best_params[als.maxIter])
print(best_params[als.regParam])


13
19
0.15


In [13]:
# Final estimator using the hyper-parameters reported by the tuning cell
# (rank=13, maxIter=19, regParam=0.15).
# Unlike `als`, coldStartStrategy is not set to "drop" here, so unseen users or
# items yield NaN predictions instead of disappearing. Every row of test.txt
# keeps a prediction; NaNs are filled before the submission is written.
als1 = ALS(
    userCol="uid",
    itemCol="mid",
    ratingCol="rat",
    rank=13,
    maxIter=19,
    regParam=0.15,
    nonnegative=True,
)

In [14]:
# Hyper-parameters are fixed, so refit on the full labelled set (training + hold-out).
model = als1.fit(dataset=df)

In [15]:
# Parse test.txt exactly like train.txt, but only two columns (uid, mid) are
# named: these are the pairs to score.
data_test = sc.textFile("test.txt", use_unicode=False).map(
    lambda line: list(map(float, line.split("\t"))))
df_test = spark.createDataFrame(data_test, ["uid", "mid"])

In [21]:
predictions = model.transform(df_test)
# Sanity check: row count of the scored test set.
print(predictions.count())

# Cold-start pairs come back as NaN (als1 keeps them); replace them with 1.
# NOTE(review): 1 is presumably the lowest rating; a global mean rating would
# likely be a better fallback — confirm against the scoring metric.
column = "prediction"
score = F.col(column)
predictions = predictions.withColumn(column, F.when(F.isnan(score), 1).otherwise(score))

9430


In [22]:
# Deterministic row order: the submission Ids written afterwards are 1-based
# positions in this (uid, mid) order.
# NOTE(review): assumes the grader expects Ids in sorted (uid, mid) order — confirm.
predictions = predictions.orderBy("uid", "mid")

In [23]:
# Write the submission: header "Id,Score", then one line per prediction where Id
# is the 1-based row position in the sorted frame.
# The score is selected by column name rather than tuple position (`row[2]`),
# so a change in the frame's column order cannot silently write the wrong field.
scores = predictions.select("prediction").rdd.map(lambda row: row[0])

with open("submission-3", "w") as f:
    f.write("Id,Score\n")
    for idx, score in enumerate(scores.collect(), 1):
        f.write(str(idx) + "," + str(score) + "\n")