# **Setup Pyspark on Colab**

In [0]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# !tar xf spark-2.4.5-bin-hadoop2.7.tgz
# !pip install -q findspark

In [0]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# **Importing modules**

In [0]:
import pandas as pd
import findspark
#findspark.init('/content/spark-2.4.5-bin-hadoop2.7')
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
import pyspark
from pyspark import *
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
from pyspark.sql.functions import (col, max as max_, struct, monotonically_increasing_id,concat_ws,udf,lower, regexp_replace)
from pyspark.ml.feature import StringIndexer, VectorIndexer, Tokenizer, StopWordsRemover,CountVectorizer,Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF as MLHashingTF
from pyspark.ml.feature import IDF as MLIDF 
from functools import reduce

In [0]:
spark=SparkSession\
  .builder\
  .appName("Project3")\
  .config("spark.some.config.option", "some-value")\
  .getOrCreate()
  
sqlContext=SQLContext(spark)

# **Importing Data**

In [0]:
# train_data=pd.read_csv("/content/train.csv")
# mapper_data=pd.read_csv("/content/mapping.csv")
# test_data=pd.read_csv("/content/test.csv")

train_data=pd.read_csv("/home/cse587/Downloads/assignment3/train.csv")
mapper_data=pd.read_csv("/home/cse587/Downloads/assignment3/mapping.csv")
test_data=pd.read_csv("/home/cse587/Downloads/assignment3/test.csv")

In [0]:
movie_train=spark.createDataFrame(train_data,schema=None,verifySchema=True)
movie_map=spark.createDataFrame(mapper_data)
movie_test=spark.createDataFrame(test_data)

In [20]:
movie_train.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)



# **Mapping genres into a list**

In [0]:
# create list of all genres in train
genre_list=[]
for i in movie_train.select('genre').collect():
    genre_list.append(i['genre'])

# create list of distinct genres in map
map_list=[]
for i in movie_map.select('0').collect():
    map_list.append(i['0'])

# mapping 0's and 1's to genres 
final=[]
for i in range(len(genre_list)):
    final.append([])
    temp=genre_list[i].split(',')
    for j in range(len(temp)-1):
        final[i].append(temp[j][2:-1])
    final[i].append(temp[-1][2:-2])

pred_list = []
for i in range(len(final)):
    pred_list.append([])
    for j in range(len(map_list)):
        if map_list[j] in final[i]:
            pred_list[i].append(1)
        else:
            pred_list[i].append(0)

In [0]:
names=[]
for i in map_list:
  string=str(i)
  names.append(string)

# **Separating Mappings to 20 different Columns**

In [0]:
genres=[]
for i in map_list:
 genres.append(StructField(i, IntegerType()))
cSchema = StructType(genres)
test_list = pred_list
temp = spark.createDataFrame(test_list,schema=cSchema) 

In [10]:
movie_train = movie_train.withColumn("idx", F.monotonically_increasing_id())
genre_mapped = temp.withColumn("idx", F.monotonically_increasing_id())
windowSpec = W.orderBy("idx")
movie_train  = movie_train .withColumn("idx", F.row_number().over(windowSpec))
genre_mapped = genre_mapped.withColumn("idx", F.row_number().over(windowSpec))
train_df = movie_train.join(genre_mapped, movie_train.idx == genre_mapped.idx).drop("idx")
train_df.show()

+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|          movie_name|                plot|               genre|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|    1|     0|           0|       0|     0|

# **Data Cleaning : Tokenizing and StopWordRemoval**

In [13]:
df_clean = train_df.select('movie_id','plot')
df_clean = df_clean.select('movie_id',(lower(regexp_replace('plot', "[^a-zA-Z\\s]", "")).alias('plot')))

# Tokenize text
tokenizer = Tokenizer(inputCol='plot', outputCol='tokens')
df_clean = tokenizer.transform(df_clean).select('movie_id','tokens')

# Remove stop words
remover = StopWordsRemover(inputCol='tokens', outputCol='pro_plot')
train_cleaned = remover.transform(df_clean).select('movie_id','pro_plot')

train_cleaned.show()

+--------+--------------------+
|movie_id|            pro_plot|
+--------+--------------------+
|23890098|[shlykov, hardwor...|
|31186339|[nation, panem, c...|
|20663735|[poovalli, induch...|
| 2231378|[lemon, drop, kid...|
|  595909|[seventhday, adve...|
| 5272176|[president, way, ...|
| 1952976|[plot, film, open...|
|24225279|[story, begins, h...|
| 2462689|[infuriated, told...|
|20532852|[line, people, , ...|
|15401493|[lola, , attempts...|
|18188932|[milan, goran, tw...|
| 2940516|[bumbling, pirate...|
| 1480747|[plot, following,...|
|24448645|[despite, lucys, ...|
|15072401|[alan, colby, hei...|
| 4018288|[debbies, favorit...|
| 4596602|[ashes, ashes, se...|
|15224586|[film, follows, e...|
|15585766|[three, friends, ...|
+--------+--------------------+
only showing top 20 rows



In [14]:
test_df = movie_test.select('movie_id','plot')
df_clean = test_df.select('movie_id',(lower(regexp_replace('plot', "[^a-zA-Z\\s]", "")).alias('plot')))

# Tokenize text
tokenizer = Tokenizer(inputCol='plot', outputCol='tokens')
df_clean = tokenizer.transform(df_clean).select('movie_id','tokens')

# Remove stop words
remover = StopWordsRemover(inputCol='tokens', outputCol='pro_plot')
test_cleaned = remover.transform(df_clean).select('movie_id','pro_plot')

test_cleaned.show()

+--------+--------------------+
|movie_id|            pro_plot|
+--------+--------------------+
| 1335380|[film, based, eve...|
|29062594|[group, teenagers...|
| 9252321|[story, zulu, fam...|
|13455076|[stooges, play, t...|
|24165951|[soldieroffortune...|
| 1925869|[set, northwester...|
|10799612|[like, many, moni...|
|28238240|[mickey, scorpion...|
|17124781|[desert, wilderne...|
|28207941|[bimbo, koko, sig...|
|19174305|[tahaan, , lives,...|
|18392317|[betty, startled,...|
|34420857|[nirmal, karthik,...|
| 4039635|[group, journalis...|
| 8034072|[vaibhavari, saha...|
| 4016437|[, movies, narrat...|
| 1520023|[ninja, resurrect...|
|24589422|[spring, , ivan, ...|
|35068740|[muthu, prabhu, ,...|
|21132951|[vishwanathan, , ...|
+--------+--------------------+
only showing top 20 rows



# **Part-1**

# **Term-Document Matrix**

In [37]:
# On train data
cv = CountVectorizer(inputCol="pro_plot", outputCol="features1", vocabSize=1000, minDF=30)
model = cv.fit(train_cleaned)
tdm = model.transform(train_cleaned)
tdm_df = train_df.join(tdm.select('movie_id','features1'), on=['movie_id'])
tdm_df.show()

+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|movie_id|          movie_name|                plot|               genre|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|           features1|
+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|     330|             Actrius|In order to prepa...|

In [38]:
# On test data
cv = CountVectorizer(inputCol="pro_plot", outputCol="features1", vocabSize=1000, minDF=30)
model = cv.fit(test_cleaned)
tdm = model.transform(test_cleaned)
tdm_df_test = tdm.join(movie_test.select('movie_id'), on=['movie_id'], how='inner').drop('pro_plot')
tdm_df_test.show()

+--------+--------------------+
|movie_id|           features1|
+--------+--------------------+
|   62693|(1000,[0,2,4,5,11...|
|  296252|(1000,[0,1,2,3,4,...|
| 1356971|(1000,[0,2,8,9,16...|
| 1428872|(1000,[0,1,2,3,5,...|
| 1582173|(1000,[0,13,15,21...|
| 1595142|(1000,[0,1,2,3,4,...|
| 1600825|(1000,[0,1,2,3,5,...|
| 1681132|(1000,[0,1,2,3,4,...|
| 2268290|(1000,[0,1,2,13,1...|
| 3327607|(1000,[0,1,5,10,1...|
| 3569957|(1000,[0,1,2,3,4,...|
| 4413498|(1000,[0,2,4,7,11...|
| 4635580|(1000,[0,1,6,10,1...|
| 4950989|(1000,[0,1,2,17,1...|
| 5565692|(1000,[0,2,10,11,...|
| 7003785|(1000,[0,7,9,16,2...|
| 8269287|(1000,[0,1,2,3,4,...|
| 9560197|(1000,[0,2,5,8,13...|
|10731337|(1000,[0,25,33,71...|
|11192132|(1000,[4,15,16,21...|
+--------+--------------------+
only showing top 20 rows



# **Training and Testing Model by Logistic Regression with Term Document Matrix**

**Applying 20 Binary Classfiers**

In [0]:
predlist_1=[]
for i in range(len(map_list)):
  lr = LogisticRegression(featuresCol = 'features1', labelCol = names[i], maxIter=10,regParam=0.1)
  lrModel = lr.fit(tdm_df)

  predictions = lrModel.transform(tdm_df_test)
  predictions=predictions.withColumn('prediction',F.col("prediction").cast(IntegerType()))
  predlist_1.append(predictions.select('movie_id','prediction'))

# **Combining Predictions from 20 Binary Classifiers**

In [0]:
from functools import reduce
dfs_new=[df.selectExpr('movie_id',f'prediction as prediction_{i}') for i, df in enumerate(predlist_1)]
dfs_temp=reduce(lambda x,y:x.join(y,['movie_id'],how='full'),dfs_new)
columns = ['prediction_%d' %i for i in range(len(predlist_1))]
dfs_temp =dfs_temp.withColumn('predictions',concat_ws(" ",*columns)).drop(*columns).toPandas().to_csv('sample1.csv',index=False,header=True)

In [43]:
out1 = pd.read_csv('sample1.csv')
out1

Unnamed: 0,movie_id,predictions
0,62693,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
1,296252,1 1 1 1 0 1 1 0 0 1 0 0 0 0 1 0 0 0 0 1
2,1356971,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
3,1428872,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
4,1582173,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
...,...,...
7772,32032279,0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7773,33509716,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7774,33645448,0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7775,34195696,0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0


# **Part-2**

# **TF-IDF**

In [44]:
# On train data
tfidf=train_cleaned.withColumn("pro_plot", concat_ws(",", "pro_plot"))
tfidf = (tfidf
  .rdd
  .map(lambda x : (x.movie_id,x.pro_plot.split(",")))
  .toDF()
  .withColumnRenamed("_1","movie_id")
  .withColumnRenamed("_2","features2"))

htf = MLHashingTF(inputCol="features2", outputCol="tf")
tf = htf.transform(tfidf)
idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf_df = idf.fit(tf).transform(tf)
idf_df = train_df.join(tfidf_df.select('movie_id','idf'), on=['movie_id'])
idf_df.show()

+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|movie_id|          movie_name|                plot|               genre|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|                 idf|
+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|   75264|The Night of the ...|The film is set i...|

In [45]:
# On test data
tfidf=test_cleaned.withColumn("pro_plot", concat_ws(",", "pro_plot"))
tfidf = (tfidf
  .rdd
  .map(lambda x : (x.movie_id,x.pro_plot.split(",")))
  .toDF()
  .withColumnRenamed("_1","movie_id")
  .withColumnRenamed("_2","features2"))

htf = MLHashingTF(inputCol="features2", outputCol="tf")
tf = htf.transform(tfidf)
idf = MLIDF(inputCol="tf", outputCol="idf")
tfidf_df = idf.fit(tf).transform(tf)
idf_df_test = tfidf_df.join(movie_test.select('movie_id'), on=['movie_id']).drop('pro_plot','tf','features2')
idf_df_test.show()

+--------+--------------------+
|movie_id|                 idf|
+--------+--------------------+
|   62693|(262144,[1836,306...|
|  296252|(262144,[170,571,...|
| 1356971|(262144,[511,9129...|
| 1428872|(262144,[86,5595,...|
| 1582173|(262144,[1446,421...|
| 1595142|(262144,[14,469,1...|
| 1600825|(262144,[460,1836...|
| 1681132|(262144,[991,1998...|
| 2268290|(262144,[211,2910...|
| 3327607|(262144,[5083,538...|
| 3569957|(262144,[119,1076...|
| 4413498|(262144,[3067,530...|
| 4635580|(262144,[1998,392...|
| 4950989|(262144,[535,1769...|
| 5565692|(262144,[6446,141...|
| 7003785|(262144,[2410,306...|
| 8269287|(262144,[571,1613...|
| 9560197|(262144,[329,1652...|
|10731337|(262144,[1322,329...|
|11192132|(262144,[5595,606...|
+--------+--------------------+
only showing top 20 rows



# **Training and Testing Model by Logistic Regression with TF_IDF**

In [0]:
predlist_2=[]
for i in range(len(map_list)):
  lr = LogisticRegression(featuresCol = 'idf', labelCol = names[i], maxIter=10)
  lrModel = lr.fit(idf_df)

  predictions = lrModel.transform(idf_df_test)
  predictions=predictions.withColumn('prediction',F.col("prediction").cast(IntegerType()))
  predlist_2.append(predictions.select('movie_id','prediction'))

In [0]:
from functools import reduce
dfs_new=[df.selectExpr('movie_id',f'prediction as prediction_{i}') for i, df in enumerate(predlist_2)]
dfs_temp=reduce(lambda x,y:x.join(y,['movie_id'],how='full'),dfs_new)
columns = ['prediction_%d' %i for i in range(len(predlist_2))]
dfs_temp =dfs_temp.withColumn('predictions',concat_ws(" ",*columns)).drop(*columns).toPandas().to_csv('sample2.csv',index=False,header=True)

In [56]:
out2 = pd.read_csv('sample2.csv')
out2

Unnamed: 0,movie_id,predictions
0,62693,1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0
1,296252,1 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0
2,1356971,1 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
3,1428872,0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
4,1582173,0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0
...,...,...
7772,32032279,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7773,33509716,1 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0
7774,33645448,0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0
7775,34195696,0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0


# **Part-3**

# **Word2Vec**

In [15]:
# On train data
word2vec = Word2Vec(vectorSize = 300, minCount = 10, inputCol = 'pro_plot', outputCol = 'features3')
model = word2vec.fit(train_cleaned)
wtov = model.transform(train_cleaned)
wtov_df = train_df.join(wtov.select('movie_id','features3'), on=['movie_id'])
wtov_df.show()

+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|movie_id|          movie_name|                plot|               genre|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|           features3|
+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+--------------------+
|     330|             Actrius|In order to prepa...|

In [17]:
# On test data
word2vec = Word2Vec(vectorSize = 300, minCount = 10, inputCol = 'pro_plot', outputCol = 'features3')
model = word2vec.fit(test_cleaned)
wtov = model.transform(test_cleaned)
wtov_df_test = wtov.join(movie_test.select('movie_id'), on=['movie_id']).drop('pro_plot')
wtov_df_test.show()

+--------+--------------------+
|movie_id|           features3|
+--------+--------------------+
|   62693|[-0.0079939511846...|
|  296252|[-1.4927848791384...|
| 1356971|[-0.0261648152067...|
| 1428872|[-0.0159300268285...|
| 1582173|[-7.2602411646152...|
| 1595142|[-0.0176014661049...|
| 1600825|[-0.0030873266397...|
| 1681132|[0.00468130292569...|
| 2268290|[-0.0055558304249...|
| 3327607|[-0.0145524695582...|
| 3569957|[-0.0177347661248...|
| 4413498|[-3.1717733524620...|
| 4635580|[-0.0294743645623...|
| 4950989|[-0.0231792936837...|
| 5565692|[-0.0025541719253...|
| 7003785|[0.00933833959861...|
| 8269287|[-0.0056277224013...|
| 9560197|[-0.0035379482192...|
|10731337|[-0.0152597736673...|
|11192132|[-0.0121372614042...|
+--------+--------------------+
only showing top 20 rows



# **Training and Testing Model by Logistic Regression with word2vec**

In [0]:
predlist_3=[]
for i in range(len(map_list)):
  lr = LogisticRegression(featuresCol = 'features3', labelCol = names[i], maxIter=20,regParam=0.1)
  lrModel = lr.fit(wtov_df)

  predictions = lrModel.transform(wtov_df_test)
  predictions=predictions.withColumn('prediction',F.col("prediction").cast(IntegerType()))
  predlist_3.append(predictions.select('movie_id','prediction'))

In [0]:
from functools import reduce
dfs_new=[df.selectExpr('movie_id',f'prediction as prediction_{i}') for i, df in enumerate(predlist_3)]
dfs_temp=reduce(lambda x,y:x.join(y,['movie_id'],how='full'),dfs_new)
columns = ['prediction_%d' %i for i in range(len(predlist_3))]
dfs_temp =dfs_temp.withColumn('predictions',concat_ws(" ",*columns)).drop(*columns).toPandas().to_csv('sample3.csv',index=False,header=True)

In [28]:
out3 = pd.read_csv('sample3.csv')
out3

Unnamed: 0,movie_id,predictions
0,62693,0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
1,296252,0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
2,1356971,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
3,1428872,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
4,1582173,0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
...,...,...
7772,32032279,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7773,33509716,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7774,33645448,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
7775,34195696,1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
