In [1]:
# Useful starting lines
%matplotlib inline

import numpy as np
import scipy
import scipy.io
import scipy.sparse as sp
import matplotlib.pyplot as plt
%load_ext autoreload
%autoreload 2

In [1]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = sc.textFile("data/sample_movielens_ratings.txt")
parts = lines.map(lambda l: l.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=long(p[3])))
ratings = sqlContext.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
rawPredictions = model.transform(test)
predictions = rawPredictions\
    .withColumn("rating", rawPredictions.rating.cast("double"))\
    .withColumn("prediction", rawPredictions.prediction.cast("double"))
evaluator =\
    RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

NameError: name 'sc' is not defined

In [None]:
als = ALS(maxIter=5, regParam=0.01, implicitPrefs=True,
          userCol="userId", itemCol="movieId", ratingCol="rating")

In [202]:
from pyspark.sql import SparkSession

# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# $example off$

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .getOrCreate()

    # $example on$
    lines = spark.read.text("data/wtf.txt").rdd
    parts = lines.map(lambda row: row.value.split("::"))


In [165]:
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

In [187]:
    # Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
model = als.fit(training)

IllegalArgumentException: 'requirement failed: Column userId must be of type NumericType but was actually of type StringType.'

In [None]:
    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse))
    # $example off$
spark.stop()

### My example - Arrange csv file

In [215]:
import fileinput

# DON'T FORGET TO REMOVE HEADER Id,Prediction
with fileinput.FileInput("data/data_train_for_fun.csv", inplace=True, backup='.txt') as file:
    for line in file:
        print(line.replace("_", ","), end='')
        
with fileinput.FileInput("data/data_train_for_fun.csv", inplace=True, backup='.txt') as file:
    for line in file:
        print(line.replace("r", ""), end='')
        
with fileinput.FileInput("data/data_train_for_fun.csv", inplace=True, backup='.txt') as file:
    for line in file:
        print(line.replace("c", ""), end='')

### My example - Use pyspark ALS

In [36]:
from pyspark.sql import Row 
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .getOrCreate()

lines = spark.read.text("data/data_train_for_fun.csv").rdd #sc.textFile("data/try.csv")

In [14]:
parts = lines.map(lambda row: row.value.split(",")) # (lambda l: l.split(",")) 
ratingsRDD = parts.map(lambda p: Row(movieId=int(p[0]), userId=int(p[1]), 
                                     rating=float(p[2])))

In [15]:
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

In [16]:
from pyspark.ml.recommendation import ALS
# Build the recommendation model using ALS on the training data
als = ALS(rank=20, maxIter=10, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")  
model = als.fit(training)

In [17]:
predictions = model.transform(training)

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")

In [7]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))
# $example off$
spark.stop()

Root-mean-square error = 0.7966449638529294


In [70]:
predictions

DataFrame[movieId: bigint, rating: double, userId: bigint, prediction: float]

In [28]:
predictions.count()

941689

### My example - Prepare file for submission

In [72]:
# save DataFrame as csv
predictions.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("data/myfile.csv") # predictions.write.csv('data/pred_als_spark.csv')

AnalysisException: 'path file:/home/vincent/Documents/machine_learning/project2/data/myfile.csv already exists.;'

### My example - Prepare file for submission

In [71]:
# convert to rdd in order
predictionsRDD = predictions.rdd
# remove the column of previous raitngs
submissionRDD = predictionsRDD.map(lambda row: Row(movieId=int(row[0]), userId=int(row[2]), 
                                     pred=float(row[3])))

In [72]:
# save DataFrame as csv
predictions.coalesce(1).write.format("com.databricks.spark.csv").option("header", "true").save("data/myfile.csv") # predictions.write.csv('data/pred_als_spark.csv')

AnalysisException: 'path file:/home/vincent/Documents/machine_learning/project2/data/myfile.csv already exists.;'

### My example - prepare for submission: reload submission file from prediction done in rdd format

In [67]:
# REMOVE HEADER BEFORE
# load data from csv file into DataFrame type in order to use pyspark
def load_sampleSub_data(path_dataset): # data/sampleSubmission.csv
    """Load data in text format, one rating per line, as in the kaggle competition."""
    lines = spark.read.text(path_dataset).rdd
    parts = lines.map(lambda row: row.value.split(",")) 
    # rduce previous RDD to take only userId, movieId and predictions
    ratingsRDD = parts.map(lambda p: Row(movieId=int(p[0]), userId=int(p[2]), 
                                     pred=round(float(p[3]))))
    ratings = spark.createDataFrame(ratingsRDD)
    return ratings

In [68]:
#from helpers_als import load_sampleSub_data
pred_DF = load_sampleSub_data("data/myfile.csv")

In [None]:
# fro removingheader, doesn't work
def f(idx, iter_): 
    if (idx == 0): 
        iter_.drop(1) 
    else: 
        iter_

parts_2 = parts.mapPartitionsWithIndex(f)