In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar -xvf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

spark-2.4.5-bin-hadoop2.7/
spark-2.4.5-bin-hadoop2.7/licenses/
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-jtransforms.html
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-zstd-jni.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-xmlenc.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-vis.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-spire.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sorttable.js.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-slf4j.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-scopt.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-scala.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-sbt-launch-lib.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-respond.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-reflectasm.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-pyrolite.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-py4j.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-protobuf.txt
spark-2.4.5-bin-hadoop2.7/licenses/LICENSE-pmml-model

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
#Importing required libraries
import findspark
findspark.init()
import pyspark
import string
import re
import numpy
from pyspark.sql.functions import substring, length, expr, size, col, split, udf, UserDefinedFunction
from pyspark.sql.types import StringType, DoubleType, IntegerType
from pyspark.sql import functions as F
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.classification import LogisticRegression

In [0]:
from pyspark.sql import *

In [0]:
spark = SparkSession.builder.appName("Project3")\
        .config("spark.some.config.option","some-value")\
        .getOrCreate()

In [0]:
#loading train data
train_df = spark.read.load("train.csv",\
                           sep = ",",\
                           format = "csv",\
                           inferSchema = True,\
                           header = True,\
                           escape ='"')


In [7]:
train_df.printSchema()
train_df.show(5)

root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)

+--------+------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|
+--------+------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|  The Hunger Games|The nation of Pan...|['Action/Adventur...|
|20663735|        Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|
|  595909| A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|
+--------+------------------+--------------------+--------------------+
only showing top 5 rows



In [0]:
#loading test data
test_df = spark.read.load("test.csv",\
                          sep = ",",\
                          format = "csv",\
                          inferSchema = True,\
                          header = True,\
                          escape ='"')

In [9]:
test_df.printSchema()
test_df.show(5)

root
 |-- movie_id: integer (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)

+--------+--------------------+--------------------+
|movie_id|          movie_name|                plot|
+--------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|
|29062594|A la salida nos v...|A group of teenag...|
| 9252321|   Come Back, Africa|This story of a Z...|
|13455076|       A Merry Mixup|The Stooges play ...|
|24165951|        Getting Even|A soldier-of-fort...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [10]:
#Mapping labels
genreMap = spark.read.load("mapping.csv",\
                           sep = ",",format="csv",\
                           inferSchema = True,\
                           header = True,\
                           escape ='"')
genreMap = genreMap.withColumnRenamed('_c0','label')
genreMap = genreMap.withColumnRenamed('0','genre')
genreMap.show()

genreList = genreMap.select("genre").rdd.flatMap(lambda x: x).collect()
print(genreList)

+-----+----------------+
|label|           genre|
+-----+----------------+
|    0|           Drama|
|    1|          Comedy|
|    2|    Romance Film|
|    3|        Thriller|
|    4|          Action|
|    5|    World cinema|
|    6|   Crime Fiction|
|    7|          Horror|
|    8| Black-and-white|
|    9|           Indie|
|   10|Action/Adventure|
|   11|       Adventure|
|   12|     Family Film|
|   13|      Short Film|
|   14|  Romantic drama|
|   15|       Animation|
|   16|         Musical|
|   17| Science Fiction|
|   18|         Mystery|
|   19| Romantic comedy|
+-----+----------------+

['Drama', 'Comedy', 'Romance Film', 'Thriller', 'Action', 'World cinema', 'Crime Fiction', 'Horror', 'Black-and-white', 'Indie', 'Action/Adventure', 'Adventure', 'Family Film', 'Short Film', 'Romantic drama', 'Animation', 'Musical', 'Science Fiction', 'Mystery', 'Romantic comedy']


In [12]:
#Creating an array from string of labels
train_df_1 = train_df.withColumn("genre_array", expr("substring(genre, 2, length(genre)-2)"))
train_df_1.show(5)

+--------+------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|         genre_array|
+--------+------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|'World cinema', '...|
|31186339|  The Hunger Games|The nation of Pan...|['Action/Adventur...|'Action/Adventure...|
|20663735|        Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|'Musical', 'Actio...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|            'Comedy'|
|  595909| A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|'Crime Fiction', ...|
+--------+------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [13]:
train_df_2 = train_df_1.withColumn("genre_array", split(col("genre_array"), ", ").alias("genre_array"))
train_df_2 = train_df_2.select("movie_id", "movie_name", "plot", "genre", "genre_array")
train_df_2.show(5)

+--------+------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|         genre_array|
+--------+------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|['World cinema', ...|
|31186339|  The Hunger Games|The nation of Pan...|['Action/Adventur...|['Action/Adventur...|
|20663735|        Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|['Musical', 'Acti...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|          ['Comedy']|
|  595909| A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|['Crime Fiction',...|
+--------+------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [14]:
#Eliminating genre null rows
train_df_3 = train_df_2.where(size(train_df_2["genre_array"]) > 0)
print("old count: ", train_df_2.count())
print("new count: ", train_df_3.count())

old count:  31111
new count:  31109


In [0]:
#One hot encoder string generator for genre
def onehotEncoder(s):
    retVal = ""
    temp = []
    s = s[1:-1].split(", ")
    for x in s:
        x = x[1:-1]
        temp.append(x)
    s = temp
    for genre in genreList:
        if genre in s:
            retVal = retVal + "1"
        else:
            retVal = retVal + "0"
    return retVal

In [15]:
#generating one hot encoded string from genre
from pyspark.sql.functions import udf
udf_test = udf(onehotEncoder, StringType())
train_df_4 = train_df_3.withColumn("genre_string", udf_test(train_df_3['genre']))
train_df_5 = train_df_4.select("movie_id", "movie_name", "plot", "genre_string", "genre_array", "genre")
train_df_5.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|        genre_string|         genre_array|               genre|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|10000100000000000000|['World cinema', ...|['World cinema', ...|
|31186339|  The Hunger Games|The nation of Pan...|10001000001000000100|['Action/Adventur...|['Action/Adventur...|
|20663735|        Narasimham|Poovalli Induchoo...|10001000000000001000|['Musical', 'Acti...|['Musical', 'Acti...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|01000000000000000000|          ['Comedy']|          ['Comedy']|
|  595909| A Cry in the Dark|Seventh-day Adven...|10000110000000000000|['Crime Fiction',...|['Crime Fiction',...|
+--------+------------------+--------------------+--------------------+-----------------

**Part 1**

In [16]:
#Building transformers
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="tokens", pattern="\\W")
tokenizedTrainData = regexTokenizer.transform(train_df_5)
tokenizedTrainData.show(5)

countVec = CountVectorizer(inputCol="tokens", outputCol="featureVectors")
trainDatafit = countVec.fit(tokenizedTrainData)
trainData = trainDatafit.transform(tokenizedTrainData)
trainData.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|        genre_string|         genre_array|               genre|              tokens|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|10000100000000000000|['World cinema', ...|['World cinema', ...|[shlykov, a, hard...|
|31186339|  The Hunger Games|The nation of Pan...|10001000001000000100|['Action/Adventur...|['Action/Adventur...|[the, nation, of,...|
|20663735|        Narasimham|Poovalli Induchoo...|10001000000000001000|['Musical', 'Acti...|['Musical', 'Acti...|[poovalli, induch...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|01000000000000000000|          ['Comedy']|          ['Comedy']|[the, lemon, drop...|
|  595909| A Cry in the Dark|Seventh-day Adven...|10000

In [17]:
tokenizedTestData = regexTokenizer.transform(test_df)
tokenizedTestData.show(5)

testData = trainDatafit.transform(tokenizedTestData)
testData.show(5)

+--------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|              tokens|
+--------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|
| 9252321|   Come Back, Africa|This story of a Z...|[this, story, of,...|
|13455076|       A Merry Mixup|The Stooges play ...|[the, stooges, pl...|
|24165951|        Getting Even|A soldier-of-fort...|[a, soldier, of, ...|
+--------+--------------------+--------------------+--------------------+
only showing top 5 rows

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|              tokens|      featureVectors|
+--------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              

In [18]:
pipelinedData = trainData.select("featureVectors", "genre_string")
pipelinedData.show(10)

+--------------------+--------------------+
|      featureVectors|        genre_string|
+--------------------+--------------------+
|(119515,[2,3,19,2...|10000100000000000000|
|(119515,[0,1,2,3,...|10001000001000000100|
|(119515,[0,1,2,3,...|10001000000000001000|
|(119515,[0,1,2,3,...|01000000000000000000|
|(119515,[0,1,2,3,...|10000110000000000000|
|(119515,[0,1,2,3,...|10011000001000000000|
|(119515,[0,1,2,3,...|10010001000000000000|
|(119515,[0,1,2,3,...|10000000000000000000|
|(119515,[0,1,2,3,...|11100000100000000001|
|(119515,[0,1,2,3,...|00000000000011010000|
+--------------------+--------------------+
only showing top 10 rows



In [0]:
#Generating columns for each genre with corresponding label from the one hot encoded string
split_col = pyspark.sql.functions.split(pipelinedData['genre_string'], '')
for i in range(len(genreList)):
  pipelinedData = pipelinedData.withColumn(genreList[i], split_col.getItem(i))
  pipelinedData = pipelinedData.withColumn(genreList[i], pipelinedData[genreList[i]].cast(DoubleType()))

In [20]:
pipelinedData.show(5)

+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|      featureVectors|        genre_string|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|(119515,[2,3,19,2...|10000100000000000000|  1.0|   0.0|         0.0|     0.0|   0.0|         1.0|          0.0|   0.0|            0.0|  0.0|             0.0|      0.0|        0.0|       0.0|           0.0

In [0]:
#Creating model for every genre
allModels = numpy.empty(len(genreList), dtype=object) 
for i in range(len(genreList)):
  lr = LogisticRegression(labelCol=genreList[i], featuresCol="featureVectors")
  allModels[i] = lr.fit(pipelinedData)

In [0]:
#Creating a new dataframe from predicted labels and movie id.
final_df = test_df.select(test_df["movie_id"])
for i in range(len(genreList)):
  pred = allModels[i].transform(testData)
  pred = pred.select(pred["movie_id"], pred["prediction"].alias(genreList[i]))
  final_df = final_df.join(pred, on=['movie_id'], how='left_outer')

In [44]:
final_df.show(10)

+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
| 1335380|  1.0|   0.0|         0.0|     0.0|   0.0|         0.0|          0.0|   0.0|            0.0|  0.0|             0.0|      0.0|        0.0|       0.0|           0.0|      0.0|    0.0|            0.0|    0.0|            0.0|
|29062594|  0.0|   0.0|         0.0|     0.0|   0.0|         0.0|       

In [0]:
formatted_output = spark.read.load("sample.csv",\
                                   sep = ",",\
                                   format = "csv",\
                                   inferSchema = True,\
                                   header = True,\
                                   escape = '"')

In [28]:
formatted_output.printSchema()
formatted_output.show(5)

root
 |-- movie_id: integer (nullable = true)
 |-- predictions: string (nullable = true)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
| 1335380|0 0 0 0 0 0 0 0 0...|
|29062594|0 0 0 0 0 0 0 0 0...|
| 9252321|0 0 0 0 0 0 0 0 0...|
|13455076|0 0 0 0 0 0 0 0 0...|
|24165951|0 0 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 5 rows



In [0]:
for i in range(len(genreList)):
  final_df = final_df.withColumn(genreList[i],col(genreList[i]).cast(IntegerType()))

In [47]:
final_df.show(5)

+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
| 1335380|    1|     0|           0|       0|     0|           0|            0|     0|              0|    0|               0|        0|          0|         0|             0|        0|      0|              0|      0|              0|
|29062594|    0|     0|           0|       0|     0|           0|       

In [0]:
final_df_1 = final_df.withColumn('predictions', F.concat(F.col(genreList[0]),F.lit(' '), F.col(genreList[1])))
for i in range(2, len(genreList)):
  final_df_1 = final_df_1.withColumn('predictions', F.concat(F.col("predictions"),F.lit(' '), F.col(genreList[i])))

In [49]:
final_df_1.show(5)

+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|movie_id|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|         predictions|
+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
| 1335380|    1|     0|           0|       0|     0|           0|            0|     0|              0|    0|               0|        0|          0|         0|             0|        0|      0|              0|      0|              0|1 0 0 0 0 

In [50]:
output_task1 = final_df_1.select("movie_id", "predictions")
output_task1.show()

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
| 1335380|1 0 0 0 0 0 0 0 0...|
|29062594|0 0 0 0 0 0 0 0 0...|
| 9252321|0 0 0 0 0 0 0 0 0...|
|13455076|0 1 0 0 0 0 0 0 0...|
|24165951|0 0 0 0 0 0 0 0 0...|
| 1925869|1 0 0 0 0 0 0 0 0...|
|10799612|1 1 0 0 0 0 0 0 0...|
|28238240|0 1 0 0 0 0 0 0 0...|
|17124781|0 0 0 0 1 0 0 0 0...|
|28207941|0 0 0 0 0 0 0 0 0...|
|19174305|0 0 1 0 0 0 0 0 0...|
|18392317|0 0 0 0 0 0 0 0 0...|
|34420857|0 0 0 0 0 1 0 0 0...|
| 4039635|0 0 0 0 0 0 0 0 0...|
| 8034072|1 0 0 0 0 1 0 0 0...|
| 4016437|1 0 0 0 0 0 0 0 0...|
| 1520023|1 0 0 0 0 1 0 0 0...|
|24589422|1 0 0 0 0 0 0 0 0...|
|35068740|0 0 1 0 0 0 0 0 0...|
|21132951|0 0 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 20 rows



In [0]:
output_task1.write.csv('output_task1.csv')

**Part 2**

In [0]:
idf = IDF(minDocFreq=2, inputCol="featureVectors", outputCol="features_2")
idfModel = idf.fit(trainData)
tfidfTrain = idfModel.transform(trainData)
tfidfTest = idfModel.transform(testData)

In [53]:
pipelinedData = tfidfTrain.select("features_2", "genre_string")
pipelinedData.show(5)

+--------------------+--------------------+
|          features_2|        genre_string|
+--------------------+--------------------+
|(119515,[2,3,19,2...|10000100000000000000|
|(119515,[0,1,2,3,...|10001000001000000100|
|(119515,[0,1,2,3,...|10001000000000001000|
|(119515,[0,1,2,3,...|01000000000000000000|
|(119515,[0,1,2,3,...|10000110000000000000|
+--------------------+--------------------+
only showing top 5 rows



In [0]:
split_col = pyspark.sql.functions.split(pipelinedData['genre_string'], '')
for i in range(len(genreList)):
  pipelinedData = pipelinedData.withColumn(genreList[i], split_col.getItem(i))
  pipelinedData = pipelinedData.withColumn(genreList[i], pipelinedData[genreList[i]].cast(DoubleType()))

In [55]:
pipelinedData.show(5)

+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|          features_2|        genre_string|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|(119515,[2,3,19,2...|10000100000000000000|  1.0|   0.0|         0.0|     0.0|   0.0|         1.0|          0.0|   0.0|            0.0|  0.0|             0.0|      0.0|        0.0|       0.0|           0.0

In [0]:
allModels = numpy.empty(len(genreList), dtype=object) 
for i in range(len(genreList)):
  lr = LogisticRegression(labelCol=genreList[i], featuresCol="features_2")
  allModels[i] = lr.fit(pipelinedData)

In [0]:
final_df = test_df.select("movie_id")
for i in range(len(genreList)):
  pred = allModels[i].transform(tfidfTest)
  pred = pred.select("movie_id", pred["prediction"].alias(genreList[i]))
  final_df = final_df.join(pred, on=['movie_id'], how='left_outer')

In [0]:
for i in range(len(genreList)):
  final_df = final_df.withColumn(genreList[i],col(genreList[i]).cast(IntegerType()))

In [0]:
final_df = final_df.withColumn('predictions', F.concat(F.col(genreList[0]),F.lit(' '), F.col(genreList[1])))
for i in range(2, len(genreList)):
  final_df = final_df.withColumn('predictions', F.concat(F.col("predictions"),F.lit(' '), F.col(genreList[i])))

In [62]:
output_task2 = final_df.select("movie_id", "predictions")
output_task2.show(5)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
| 1335380|1 0 0 0 0 0 0 0 0...|
|29062594|0 0 0 0 0 0 0 0 0...|
| 9252321|1 0 0 0 0 0 0 0 0...|
|13455076|0 1 0 0 0 0 0 0 0...|
|24165951|0 0 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 5 rows



In [0]:
output_task2.write.csv('output_task2.csv')

**Part 3**

In [65]:
tokenizedTrainData.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|        genre_string|         genre_array|               genre|              tokens|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|10000100000000000000|['World cinema', ...|['World cinema', ...|[shlykov, a, hard...|
|31186339|  The Hunger Games|The nation of Pan...|10001000001000000100|['Action/Adventur...|['Action/Adventur...|[the, nation, of,...|
|20663735|        Narasimham|Poovalli Induchoo...|10001000000000001000|['Musical', 'Acti...|['Musical', 'Acti...|[poovalli, induch...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|01000000000000000000|          ['Comedy']|          ['Comedy']|[the, lemon, drop...|
|  595909| A Cry in the Dark|Seventh-day Adven...|10000

In [66]:
tokenizedTestData.show(5)

+--------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|              tokens|
+--------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|
| 9252321|   Come Back, Africa|This story of a Z...|[this, story, of,...|
|13455076|       A Merry Mixup|The Stooges play ...|[the, stooges, pl...|
|24165951|        Getting Even|A soldier-of-fort...|[a, soldier, of, ...|
+--------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [67]:
from pyspark.ml.feature import StopWordsRemover
swr = StopWordsRemover(inputCol="tokens", outputCol="sw_removed")
swrTrainData = swr.transform(tokenizedTrainData)
swrTrainData.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|        genre_string|         genre_array|               genre|              tokens|          sw_removed|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|10000100000000000000|['World cinema', ...|['World cinema', ...|[shlykov, a, hard...|[shlykov, hard, w...|
|31186339|  The Hunger Games|The nation of Pan...|10001000001000000100|['Action/Adventur...|['Action/Adventur...|[the, nation, of,...|[nation, panem, c...|
|20663735|        Narasimham|Poovalli Induchoo...|10001000000000001000|['Musical', 'Acti...|['Musical', 'Acti...|[poovalli, induch...|[poovalli, induch...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|01000000000000

In [68]:
swrTestData = swr.transform(tokenizedTestData)
swrTestData.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|              tokens|          sw_removed|
+--------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|
| 9252321|   Come Back, Africa|This story of a Z...|[this, story, of,...|[story, zulu, fam...|
|13455076|       A Merry Mixup|The Stooges play ...|[the, stooges, pl...|[stooges, play, t...|
|24165951|        Getting Even|A soldier-of-fort...|[a, soldier, of, ...|[soldier, fortune...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [71]:
hashingTF = HashingTF(inputCol="sw_removed", outputCol="hashfeatureVectors")
hashTrainData = hashingTF.transform(swrTrainData)
hashTrainData.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|        genre_string|         genre_array|               genre|              tokens|          sw_removed|  hashfeatureVectors|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|10000100000000000000|['World cinema', ...|['World cinema', ...|[shlykov, a, hard...|[shlykov, hard, w...|(262144,[2437,127...|
|31186339|  The Hunger Games|The nation of Pan...|10001000001000000100|['Action/Adventur...|['Action/Adventur...|[the, nation, of,...|[nation, panem, c...|(262144,[991,1739...|
|20663735|        Narasimham|Poovalli Induchoo...|10001000000000001000|['Musical', 'Acti...|['Musical', 'Acti...|[p

In [70]:
hashTestData = hashingTF.transform(swrTestData)
hashTestData.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|              tokens|          sw_removed|  hashfeatureVectors|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|(262144,[1728,261...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|(262144,[6068,191...|
| 9252321|   Come Back, Africa|This story of a Z...|[this, story, of,...|[story, zulu, fam...|(262144,[1598,208...|
|13455076|       A Merry Mixup|The Stooges play ...|[the, stooges, pl...|[stooges, play, t...|(262144,[3294,618...|
|24165951|        Getting Even|A soldier-of-fort...|[a, soldier, of, ...|[soldier, fortune...|(262144,[4098,644...|
+--------+--------------------+--------------------+--------------------

In [72]:
from pyspark.ml.feature import MaxAbsScaler

MinAbsScalerizer = MaxAbsScaler().setInputCol("hashfeatureVectors").setOutputCol("MinAbs_Scaled_features")
absTrain = MinAbsScalerizer.fit(hashTrainData).transform(hashTrainData)
absTrain.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+
|movie_id|        movie_name|                plot|        genre_string|         genre_array|               genre|              tokens|          sw_removed|  hashfeatureVectors|MinAbs_Scaled_features|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|10000100000000000000|['World cinema', ...|['World cinema', ...|[shlykov, a, hard...|[shlykov, hard, w...|(262144,[2437,127...|  (262144,[2437,127...|
|31186339|  The Hunger Games|The nation of Pan...|10001000001000000100|['Action/Adventur...|['Action/Adventur...|[the, nation, of,...|[nation, panem, c...|(262144,[991,1739...|  (262144,[991,1739...|


In [73]:
absTest = MinAbsScalerizer.fit(hashTestData).transform(hashTestData)
absTest.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+
|movie_id|          movie_name|                plot|              tokens|          sw_removed|  hashfeatureVectors|MinAbs_Scaled_features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|(262144,[1728,261...|  (262144,[1728,261...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|(262144,[6068,191...|  (262144,[6068,191...|
| 9252321|   Come Back, Africa|This story of a Z...|[this, story, of,...|[story, zulu, fam...|(262144,[1598,208...|  (262144,[1598,208...|
|13455076|       A Merry Mixup|The Stooges play ...|[the, stooges, pl...|[stooges, play, t...|(262144,[3294,618...|  (262144,[3294,618...|
|24165951|        Getting E

In [74]:
pipelinedData_hash = absTrain.select(absTrain["MinAbs_Scaled_features"], absTrain["genre_string"])
pipelinedData_hash.show(5)

+----------------------+--------------------+
|MinAbs_Scaled_features|        genre_string|
+----------------------+--------------------+
|  (262144,[2437,127...|10000100000000000000|
|  (262144,[991,1739...|10001000001000000100|
|  (262144,[119,571,...|10001000000000001000|
|  (262144,[619,1998...|01000000000000000000|
|  (262144,[1911,243...|10000110000000000000|
+----------------------+--------------------+
only showing top 5 rows



In [0]:
split_col = pyspark.sql.functions.split(pipelinedData_hash['genre_string'], '')
for i in range(len(genreList)):
  pipelinedData_hash = pipelinedData_hash.withColumn(genreList[i], split_col.getItem(i))
  pipelinedData_hash = pipelinedData_hash.withColumn(genreList[i], pipelinedData_hash[genreList[i]].cast(DoubleType()))

In [76]:
pipelinedData_hash.show(5)

+----------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|MinAbs_Scaled_features|        genre_string|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+----------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|  (262144,[2437,127...|10000100000000000000|  1.0|   0.0|         0.0|     0.0|   0.0|         1.0|          0.0|   0.0|            0.0|  0.0|             0.0|      0.0|        0.0|       0.0|      

In [0]:
allModels = numpy.empty(len(genreList), dtype=object) 
for i in range(len(genreList)):
  lr = LogisticRegression(labelCol=genreList[i], featuresCol="MinAbs_Scaled_features")
  allModels[i] = lr.fit(pipelinedData_hash)

In [0]:
final_df = test_df.select("movie_id")
for i in range(len(genreList)):
  pred = allModels[i].transform(absTest)
  pred = pred.select(pred["movie_id"], pred["prediction"].alias(genreList[i]))
  final_df = final_df.join(pred, on=['movie_id'], how='left_outer')

In [79]:
final_df.show(5)

+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
| 1335380|  1.0|   0.0|         0.0|     0.0|   0.0|         0.0|          0.0|   0.0|            0.0|  0.0|             1.0|      0.0|        0.0|       0.0|           0.0|      0.0|    0.0|            0.0|    0.0|            0.0|
|29062594|  0.0|   0.0|         0.0|     0.0|   0.0|         0.0|       

In [0]:
for i in range(len(genreList)):
  final_df = final_df.withColumn(genreList[i],col(genreList[i]).cast(IntegerType()))

In [0]:
final_df = final_df.withColumn('predictions', F.concat(F.col(genreList[0]),F.lit(' '), F.col(genreList[1])))
for i in range(2, len(genreList)):
  final_df = final_df.withColumn('predictions', F.concat(F.col("predictions"),F.lit(' '), F.col(genreList[i])))

In [83]:
output_task3 = final_df.select("movie_id", "predictions")
output_task3.show(5)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
| 1335380|1 0 0 0 0 0 0 0 0...|
|29062594|0 0 0 0 0 0 0 0 0...|
| 9252321|1 0 0 0 0 0 0 0 0...|
|13455076|0 1 0 0 0 0 0 0 0...|
|24165951|0 0 1 0 1 0 0 0 0...|
+--------+--------------------+
only showing top 5 rows



In [0]:
output_task3.write.csv('output_task3.csv')