In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import VectorAssembler, StringIndexer

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("recommender_system").getOrCreate()

In [4]:
df=spark.read.csv("movie_ratings_df.csv",inferSchema=True,header=True)

In [5]:
df.count()

100000

In [6]:
df.distinct().show()

+------+--------------------+------+
|userId|               title|rating|
+------+--------------------+------+
|   913|L.A. Confidential...|     4|
|   843|Dr. Strangelove o...|     3|
|    38| Men in Black (1997)|     1|
|   276| Men in Black (1997)|     4|
|   927|Romy and Michele'...|     3|
|   280|Batman Forever (1...|     3|
|   311|     Only You (1994)|     3|
|   741|     Only You (1994)|     3|
|   904|      Sabrina (1995)|     5|
|   534|      Twister (1996)|     4|
|   661|    Toy Story (1995)|     5|
|   536|    Toy Story (1995)|     5|
|   886| Broken Arrow (1996)|     1|
|   894|  Chasing Amy (1997)|     4|
|   643|Silence of the La...|     3|
|   712|Sleepless in Seat...|     4|
|   707|   Sting, The (1973)|     4|
|   527|  Rear Window (1954)|     4|
|   427|Fly Away Home (1996)|     4|
|   460|     Cop Land (1997)|     4|
+------+--------------------+------+
only showing top 20 rows



In [7]:
df.describe()

DataFrame[summary: string, userId: string, title: string, rating: string]

In [8]:
df.describe().show()

+-------+------------------+--------------------+------------------+
|summary|            userId|               title|            rating|
+-------+------------------+--------------------+------------------+
|  count|            100000|              100000|            100000|
|   mean|         462.48475|                null|           3.52986|
| stddev|266.61442012750865|                null|1.1256735991443156|
|    min|                 1|'Til There Was Yo...|                 1|
|    max|               943|� k�ldum klaka (C...|                 5|
+-------+------------------+--------------------+------------------+



In [9]:
string_indexer=StringIndexer(inputCol="title",outputCol="title_ind")
string_indexer_model = string_indexer.fit(df).transform(df)

In [10]:
from pyspark.sql.functions import col

In [11]:
df=df.withColumn("rating", col("rating").cast("float"))
df=df.withColumn("userId", col("userId").cast("float"))

In [12]:
df.describe()

DataFrame[summary: string, userId: string, title: string, rating: string]

In [13]:
assembler = VectorAssembler(
    inputCols=["userId","rating"],
    outputCol="features"
)

In [14]:
df1 = assembler.transform(df).select('features', 'rating')

In [15]:
df1.show(4, truncate=False)

+-----------+------+
|features   |rating|
+-----------+------+
|[196.0,3.0]|3.0   |
|[63.0,3.0] |3.0   |
|[226.0,5.0]|5.0   |
|[154.0,3.0]|3.0   |
+-----------+------+
only showing top 4 rows



In [16]:
trainTest = df.randomSplit([0.8, 0.2])
trainingDF = trainTest[0]
testDF = trainTest[1]
trainingDF = assembler.transform(trainingDF)
df = assembler.transform(df)

In [17]:
evaluated = MulticlassClassificationEvaluator(labelCol='rating', metricName='accuracy')

In [18]:
from pyspark.ml.classification import LogisticRegression

In [19]:
ridge = LogisticRegression(labelCol='rating', maxIter=100, elasticNetParam=0.8,regParam=0.3)
model = ridge.fit(trainingDF)
pred = model.transform(df)
evaluated.evaluate(pred)

0.34174

In [20]:
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier

In [21]:
lasso = LogisticRegression(labelCol='rating', maxIter=200,elasticNetParam=0.8,regParam=0.3)
model = lasso.fit(trainingDF)
pred = model.transform(df)
evaluated.evaluate(pred)

0.34174