#### IMPORT SPARK

In [1]:
import findspark

In [2]:
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')

In [3]:
import pyspark
from pyspark.sql import SparkSession
import csv

In [4]:
sc = pyspark.SparkContext()

In [5]:
pyspark

<module 'pyspark' from '/home/cse587/spark-2.4.0-bin-hadoop2.7/python/pyspark/__init__.py'>

In [6]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

#### INPUT DATA PROCESSING

In [7]:
import pandas as pd
pandas_df = pd.read_csv("train.csv")
#RDD = sc.textFile("train.csv")

In [8]:
spark_df = sqlContext.createDataFrame(pandas_df)
rdd_df = spark_df.rdd

In [9]:
spark_df.show(3)

+--------+----------------+--------------------+--------------------+
|movie_id|      movie_name|                plot|               genre|
+--------+----------------+--------------------+--------------------+
|23890098|      Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|The Hunger Games|The nation of Pan...|['Action/Adventur...|
|20663735|      Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|
+--------+----------------+--------------------+--------------------+
only showing top 3 rows



In [10]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

In [11]:
tokenizer = Tokenizer(inputCol = 'plot', outputCol = 'plot_terms')

In [12]:
regex_tokenizer = RegexTokenizer(inputCol = 'plot', outputCol = 'plot_terms', pattern = '\\W')

In [13]:
count_tokens = udf(lambda plot_terms: len(plot_terms), IntegerType())

In [14]:
tokenized = tokenizer.transform(spark_df)

In [15]:
#tokenized.show(1)
tokenized.select("plot_terms").show(2)

+--------------------+
|          plot_terms|
+--------------------+
|[shlykov,, a, har...|
|[the, nation, of,...|
+--------------------+
only showing top 2 rows



In [16]:
regex_tokenized = regex_tokenizer.transform(spark_df)
regex_tokenized.show(2)

+--------+----------------+--------------------+--------------------+--------------------+
|movie_id|      movie_name|                plot|               genre|          plot_terms|
+--------+----------------+--------------------+--------------------+--------------------+
|23890098|      Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, a, hard...|
|31186339|The Hunger Games|The nation of Pan...|['Action/Adventur...|[the, nation, of,...|
+--------+----------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [17]:
from pyspark.ml.feature import StopWordsRemover

In [21]:
remover = StopWordsRemover(inputCol = 'plot_terms', outputCol = 'filtered')

In [22]:
stopword_tokenized_withnum = remover.transform(regex_tokenized)

In [23]:
stopword_tokenized_withnum.show(2)

+--------+----------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|      movie_name|                plot|               genre|          plot_terms|            filtered|
+--------+----------------+--------------------+--------------------+--------------------+--------------------+
|23890098|      Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, a, hard...|[shlykov, hard, w...|
|31186339|The Hunger Games|The nation of Pan...|['Action/Adventur...|[the, nation, of,...|[nation, panem, c...|
+--------+----------------+--------------------+--------------------+--------------------+--------------------+
only showing top 2 rows



In [24]:
def is_not_digit(rec):
    list1 = []
    for i in rec:
        if i.isdigit():
            continue;
        else:
            list1.append(i)
    return list1
            
stopword_without_numbers = stopword_tokenized_withnum.rdd.map(lambda x: [x[0],x[1],x[2],x[3],x[4],is_not_digit(x[5])])#, x[2], x[3]])
stopword_tokenized= sqlContext.createDataFrame(stopword_without_numbers,['movie_id','movie_name','plot','genre','plot_terms','filtered'])
stopword_tokenized.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|          plot_terms|            filtered|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov, a, hard...|[shlykov, hard, w...|
|31186339|  The Hunger Games|The nation of Pan...|['Action/Adventur...|[the, nation, of,...|[nation, panem, c...|
|20663735|        Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|[poovalli, induch...|[poovalli, induch...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|[the, lemon, drop...|[lemon, drop, kid...|
|  595909| A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|[seventh, day, ad...|[seventh, day, ad...|
+--------+------------------+--------------------+--------------------+-----------------

#### THIS IS FOR CREATING TF USING HASHINGTF or COUNTVECTORIZER

In [None]:
# from pyspark.ml.feature import HashingTF, IDF, Tokenizer

In [None]:
# hashing_tf = HashingTF(inputCol = 'filtered', outputCol = 'rawFeatures')

In [None]:
#USES HASHING TO CALCULATE TF. A BETTER APPROACH BUT CONFUSING.
# featurized_df = hashing_tf.transform(stopword_tokenized)
# featurized_df.show(2)

In [None]:
# featurized_df.show(2)

In [25]:
from pyspark.ml.feature import CountVectorizer,IDF

In [26]:
cv = CountVectorizer(inputCol = 'filtered', vocabSize = 10000, minDF = 5.0, outputCol = 'raw_features')

In [27]:
cv_model = cv.fit(stopword_tokenized)
cv_df = cv_model.transform(stopword_tokenized)

In [None]:
#cv_df.select("raw_features").show(2, truncate = False)

In [None]:
#cv_df.take(1)

In [28]:
idf = IDF(inputCol = 'raw_features', outputCol = 'features')
idf_model = idf.fit(cv_df)
final_features = idf_model.transform(cv_df)
final_features = final_features.select("genre","features")

#final_features = cv_df.select("genre", "features")

In [29]:
final_features.show()

+--------------------+--------------------+
|               genre|            features|
+--------------------+--------------------+
|['World cinema', ...|(10000,[10,129,18...|
|['Action/Adventur...|(10000,[2,6,7,10,...|
|['Musical', 'Acti...|(10000,[1,3,8,10,...|
|          ['Comedy']|(10000,[7,9,12,15...|
|['Crime Fiction',...|(10000,[2,8,9,14,...|
|['Action/Adventur...|(10000,[2,4,5,12,...|
|['Thriller', 'Dra...|(10000,[0,1,2,3,4...|
|           ['Drama']|(10000,[0,1,2,5,7...|
|['Black-and-white...|(10000,[0,4,9,17,...|
|['Animation', 'Sh...|(10000,[11,31,38,...|
|          ['Comedy']|(10000,[1,3,4,7,9...|
|['Crime Fiction',...|(10000,[0,2,14,32...|
|          ['Comedy']|(10000,[4,5,15,17...|
|          ['Comedy']|(10000,[0,1,2,3,6...|
|          ['Horror']|(10000,[0,8,54,71...|
|['Crime Fiction',...|(10000,[5,14,61,8...|
|           ['Drama']|(10000,[2,7,21,25...|
|['Crime Fiction',...|(10000,[0,2,4,8,1...|
|  ['Indie', 'Drama']|(10000,[1,2,4,10,...|
|           ['Drama']|(10000,[0,

In [30]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline

In [31]:
label_stringIdx = StringIndexer(inputCol = 'genre', outputCol = 'label')

In [32]:
pipeline = Pipeline(stages=[label_stringIdx])

In [33]:
# pipelineFit = pipeline.fit(final_features)
dataset = pipeline.fit(final_features).transform(final_features)
dataset.show(5)

+--------------------+--------------------+------+
|               genre|            features| label|
+--------------------+--------------------+------+
|['World cinema', ...|(10000,[10,129,18...|   4.0|
|['Action/Adventur...|(10000,[2,6,7,10,...|1603.0|
|['Musical', 'Acti...|(10000,[1,3,8,10,...| 316.0|
|          ['Comedy']|(10000,[7,9,12,15...|   1.0|
|['Crime Fiction',...|(10000,[2,8,9,14,...| 113.0|
+--------------------+--------------------+------+
only showing top 5 rows



In [34]:
test = final_features


In [35]:
def splitter(rec):
    rec = rec.replace('[','')
    rec = rec.replace(']','')
    rec = rec.replace('\'','')
#     rec = [item.replace("[", "") for item in rec]
#     rec = [item.replace("]", "") for item in rec]
#     rec = [item.replace("\'", "") for item in rec]
    return rec
test1 = test.rdd.map(lambda x: [splitter(x[0]), x[1]])#, x[2], x[3]])
test2= sqlContext.createDataFrame(test1,['genre','features'])
test2.show(5)

+--------------------+--------------------+
|               genre|            features|
+--------------------+--------------------+
| World cinema, Drama|(10000,[10,129,18...|
|Action/Adventure,...|(10000,[2,6,7,10,...|
|Musical, Action, ...|(10000,[1,3,8,10,...|
|              Comedy|(10000,[7,9,12,15...|
|Crime Fiction, Wo...|(10000,[2,8,9,14,...|
+--------------------+--------------------+
only showing top 5 rows



In [36]:
import pyspark.sql.functions as f
test3 = test2.select("features", f.split('genre',',').alias("genre"), f.posexplode(f.split("genre",",")).alias("pos", "string_label"))
test3.show(2)

+--------------------+--------------------+---+------------+
|            features|               genre|pos|string_label|
+--------------------+--------------------+---+------------+
|(10000,[10,129,18...|[World cinema,  D...|  0|World cinema|
|(10000,[10,129,18...|[World cinema,  D...|  1|       Drama|
+--------------------+--------------------+---+------------+
only showing top 2 rows



In [37]:
#dataset = test3.select("features", f.concat(f.lit("genre"),f.col('pos').cast("string")).alias("name"),f.expr("genre[pos]").alias("string_label")).groupBy("features").pivot("name").agg(f.first("string_label"))


In [38]:
#dataset.show(1, truncate=False)
#dataset.show(1)

+--------------------+------------+------+------+------+------+------+------+------+------+------+
|            features|      genre0|genre1|genre2|genre3|genre4|genre5|genre6|genre7|genre8|genre9|
+--------------------+------------+------+------+------+------+------+------+------+------+------+
|(10000,[0,1,2,3,4...|Romance Film| Drama|  null|  null|  null|  null|  null|  null|  null|  null|
+--------------------+------------+------+------+------+------+------+------+------+------+------+
only showing top 1 row



In [39]:
final_dataset = test3.select("features", "string_label")

In [None]:
#final_dataset.show(4)

In [40]:
label_stringIdx = StringIndexer(inputCol = 'string_label', outputCol = 'label')

In [None]:
# final_dataset1 = label_stringIdx.fit(fd2).transform(fd2)
# final_dataset1.show(5)

In [41]:
def label_mod(rec):
    rec = rec.strip()
    return rec
fd1_rdd = final_dataset.rdd.map(lambda x: [x[0],label_mod(x[1])])#, x[2], x[3]])
fd2= sqlContext.createDataFrame(fd1_rdd,['features','string_label'])
fd2.show(5)

+--------------------+----------------+
|            features|    string_label|
+--------------------+----------------+
|(10000,[10,129,18...|    World cinema|
|(10000,[10,129,18...|           Drama|
|(10000,[2,6,7,10,...|Action/Adventure|
|(10000,[2,6,7,10,...|          Action|
|(10000,[2,6,7,10,...| Science Fiction|
+--------------------+----------------+
only showing top 5 rows



In [42]:
final_dataset2 = label_stringIdx.fit(fd2).transform(fd2)
final_dataset2.show(5)

+--------------------+----------------+-----+
|            features|    string_label|label|
+--------------------+----------------+-----+
|(10000,[10,129,18...|    World cinema|  5.0|
|(10000,[10,129,18...|           Drama|  0.0|
|(10000,[2,6,7,10,...|Action/Adventure| 10.0|
|(10000,[2,6,7,10,...|          Action|  4.0|
|(10000,[2,6,7,10,...| Science Fiction| 17.0|
+--------------------+----------------+-----+
only showing top 5 rows



In [43]:
pivotDF = final_dataset2.groupBy("features").pivot("string_label").sum('label')
#pivotDF.show(10)

In [44]:
from pyspark.sql.functions import when
cols = pivotDF.columns[1:]
#cols

for col in cols:
    pivotDF = pivotDF.withColumn(col, when(pivotDF[col]>=0,1).otherwise(0))

In [None]:
#pivotDF.show(10)

In [45]:
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol='label',featuresCol='features',maxIter = 100)

In [46]:
train = pivotDF

In [47]:
train_1 = train.select('features','Drama').withColumnRenamed("Drama","label") 
lr_model_1 = lr.fit(train_1)
#train_test_1 = train_test.select('features','Drama').withColumnRenamed("Drama","label") 
#predictions_1 = lr_model_1.transform(train_test_1)

In [None]:
#train_test_1.show()

In [None]:
#predictions_1.show(3)

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

#evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")
#print("Test Accuracy: " + str(evaluator.evaluate(predictions_1)))

In [None]:
#### Model- 2 - Comedy

In [48]:
train_2 = train.select('features','Comedy').withColumnRenamed("Comedy","label") 
lr_model_2 = lr.fit(train_2)
#train_test_2 = train_test.select('features','Comedy').withColumnRenamed("Comedy","label") 
#predictions_2 = lr_model_2.transform(train_test_2)

In [None]:
#evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")
#print("Test Accuracy: " + str(evaluator.evaluate(predictions_2)))

In [None]:
#### Model -3 - Romance Film

In [49]:
train_3 = train.select('features','Romance Film').withColumnRenamed("Romance Film","label") 
lr_model_3 = lr.fit(train_3)
#train_test_3 = train_test.select('features','Romance Film').withColumnRenamed("Romance Film","label") 
#predictions_3 = lr_model_3.transform(train_test_3)

In [None]:
#evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")
#print("Test Accuracy: " + str(evaluator.evaluate(predictions_3)))

In [None]:
#### Model -4 - Thriller

In [50]:
train_4 = train.select('features','Thriller').withColumnRenamed("Thriller","label") 
lr_model_4 = lr.fit(train_4)
#train_test_4 = train_test.select('features','Thriller').withColumnRenamed("Thriller","label") 
#predictions_4 = lr_model_4.transform(train_test_4)

In [None]:
#evaluator = BinaryClassificationEvaluator(rawPredictionCol = "rawPrediction")
#print("Test Accuracy: " + str(evaluator.evaluate(predictions_4)))

In [None]:
#### Model -5 - Action

In [51]:
train_5 = train.select('features','Action').withColumnRenamed("Action","label") 
lr_model_5 = lr.fit(train_5)
#train_test_5 = train_test.select('features','Action').withColumnRenamed("Action","label") 
#predictions_5 = lr_model_5.transform(train_test_5)

In [None]:
#### Model -6 - World cinema

In [52]:
train_6 = train.select('features','World cinema').withColumnRenamed("World cinema","label") 
lr_model_6 = lr.fit(train_6)
#train_test_6 = train_test.select('features','World cinema').withColumnRenamed("World cinema","label") 
#predictions_6 = lr_model_6.transform(train_test_6)

In [None]:
#### Model -7 - Crime Fiction

In [53]:
train_7 = train.select('features','Crime Fiction').withColumnRenamed("Crime Fiction","label") 
lr_model_7 = lr.fit(train_7)
#train_test_7 = train_test.select('features','Crime Fiction').withColumnRenamed("Crime Fiction","label") 
#predictions_7 = lr_model_7.transform(train_test_7)

In [None]:
#### Model -8 - Horror

In [54]:
train_8 = train.select('features','Horror').withColumnRenamed("Horror","label") 
lr_model_8 = lr.fit(train_8)
#train_test_8 = train_test.select('features','Horror').withColumnRenamed("Horror","label") 
#predictions_8 = lr_model_8.transform(train_test_8)

In [None]:
#### Model -9 - Black-and-white

In [55]:
train_9 = train.select('features','Black-and-white').withColumnRenamed("Black-and-white","label") 
lr_model_9 = lr.fit(train_9)

In [None]:
#### Model - 10 -Indie

In [56]:
train_10 = train.select('features','Indie').withColumnRenamed("Indie","label") 
lr_model_10 = lr.fit(train_10)

In [None]:
#### Model - 11 -Action/Adventure

In [57]:
train_11 = train.select('features','Action/Adventure').withColumnRenamed("Action/Adventure","label") 
lr_model_11 = lr.fit(train_11)

In [None]:
#### Model - 12 -Adventure

In [58]:
train_12 = train.select('features','Adventure').withColumnRenamed("Adventure","label") 
lr_model_12 = lr.fit(train_12)

In [None]:
#### Model - 13 - Family Film

In [59]:
train_13 = train.select('features','Family Film').withColumnRenamed("Family Film","label") 
lr_model_13 = lr.fit(train_13)

In [None]:
#### Model - 14 -Short Film

In [60]:
train_14 = train.select('features','Short Film').withColumnRenamed("Short Film","label") 
lr_model_14 = lr.fit(train_14)

In [None]:
#### Model -15 - Romantic drama

In [61]:
train_15 = train.select('features','Romantic drama').withColumnRenamed("Romantic drama","label") 
lr_model_15 = lr.fit(train_15)

In [None]:
#### Model -16 - Animation

In [62]:
train_16 = train.select('features','Animation').withColumnRenamed("Animation","label") 
lr_model_16 = lr.fit(train_16)

In [None]:
#### Model -17 - Musical

In [63]:
train_17 = train.select('features','Musical').withColumnRenamed("Musical","label") 
lr_model_17 = lr.fit(train_17)

In [None]:
#### Model -18 -Science Fiction

In [64]:
train_18 = train.select('features','Science Fiction').withColumnRenamed("Science Fiction","label") 
lr_model_18 = lr.fit(train_18)

In [None]:
#### Model -19 -Mystery

In [65]:
train_19 = train.select('features','Mystery').withColumnRenamed("Mystery","label") 
lr_model_19 = lr.fit(train_19)

In [None]:
#### Model -20 -Romantic comedy

In [66]:
train_20 = train.select('features','Romantic comedy').withColumnRenamed("Romantic comedy","label") 
lr_model_20 = lr.fit(train_20)

# Test Data

In [67]:
test_pandas_df = pd.read_csv("test.csv")

In [68]:
test_spark_df = sqlContext.createDataFrame(test_pandas_df)
test_rdd_df = test_spark_df.rdd

In [69]:
test_spark_df.show(3)

+--------+--------------------+--------------------+
|movie_id|          movie_name|                plot|
+--------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|
|29062594|A la salida nos v...|A group of teenag...|
| 9252321|   Come Back, Africa|This story of a Z...|
+--------+--------------------+--------------------+
only showing top 3 rows



In [70]:
test_regex_tokenized = regex_tokenizer.transform(test_spark_df)

In [71]:
test_stopword_tokenized_withnum = remover.transform(test_regex_tokenized)

In [72]:
def is_not_digit(rec):
    list1 = []
    for i in rec:
        if i.isdigit():
            continue;
        else:
            list1.append(i)
    return list1
            
test_stopword_without_numbers = test_stopword_tokenized_withnum.rdd.map(lambda x: [x[0],x[1],x[2],x[3],is_not_digit(x[4])])#, x[2], x[3]])
test_stopword_tokenized= sqlContext.createDataFrame(test_stopword_without_numbers,['movie_id','movie_name','plot','plot_terms','filtered'])
test_stopword_tokenized.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|          plot_terms|            filtered|
+--------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|[the, film, is, b...|[film, based, eve...|
|29062594|A la salida nos v...|A group of teenag...|[a, group, of, te...|[group, teenagers...|
| 9252321|   Come Back, Africa|This story of a Z...|[this, story, of,...|[story, zulu, fam...|
|13455076|       A Merry Mixup|The Stooges play ...|[the, stooges, pl...|[stooges, play, t...|
|24165951|        Getting Even|A soldier-of-fort...|[a, soldier, of, ...|[soldier, fortune...|
+--------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [73]:
test_cv = CountVectorizer(inputCol = 'filtered', vocabSize = 10000, minDF = 5.0, outputCol = 'raw_features')

In [74]:
test_cv_model = test_cv.fit(test_stopword_tokenized)
test_cv_df = test_cv_model.transform(test_stopword_tokenized)

In [75]:
test_idf = IDF(inputCol = 'raw_features', outputCol = 'features')
test_idf_model = test_idf.fit(test_cv_df)
test_final_features_all = idf_model.transform(test_cv_df)

In [76]:
test_final_features = test_final_features_all.select("features")

In [77]:
test_final_features.show()

+--------------------+
|            features|
+--------------------+
|(10000,[0,3,6,7,8...|
|(10000,[7,12,56,6...|
|(10000,[4,7,8,10,...|
|(10000,[0,76,78,7...|
|(10000,[19,392,43...|
|(10000,[0,1,2,3,4...|
|(10000,[0,5,6,9,1...|
|(10000,[13,21,191...|
|(10000,[4,8,11,13...|
|(10000,[55,56,222...|
|(10000,[1,3,4,7,8...|
|(10000,[6,7,16,20...|
|(10000,[3,5,8,9,1...|
|(10000,[0,1,6,16,...|
|(10000,[2,3,9,10,...|
|(10000,[0,2,3,5,9...|
|(10000,[0,1,2,3,4...|
|(10000,[0,1,3,5,7...|
|(10000,[9,18,26,3...|
|(10000,[0,3,4,9,1...|
+--------------------+
only showing top 20 rows



In [78]:

test_final_1 = test_final_features.withColumn('label',f.lit(0))
test_final_1.show()

+--------------------+-----+
|            features|label|
+--------------------+-----+
|(10000,[0,3,6,7,8...|    0|
|(10000,[7,12,56,6...|    0|
|(10000,[4,7,8,10,...|    0|
|(10000,[0,76,78,7...|    0|
|(10000,[19,392,43...|    0|
|(10000,[0,1,2,3,4...|    0|
|(10000,[0,5,6,9,1...|    0|
|(10000,[13,21,191...|    0|
|(10000,[4,8,11,13...|    0|
|(10000,[55,56,222...|    0|
|(10000,[1,3,4,7,8...|    0|
|(10000,[6,7,16,20...|    0|
|(10000,[3,5,8,9,1...|    0|
|(10000,[0,1,6,16,...|    0|
|(10000,[2,3,9,10,...|    0|
|(10000,[0,2,3,5,9...|    0|
|(10000,[0,1,2,3,4...|    0|
|(10000,[0,1,3,5,7...|    0|
|(10000,[9,18,26,3...|    0|
|(10000,[0,3,4,9,1...|    0|
+--------------------+-----+
only showing top 20 rows



In [79]:
test_predictions_1 = lr_model_1.transform(test_final_1)


In [80]:
test_predictions_1 = test_predictions_1.select('features','prediction','label').withColumnRenamed('prediction','column')
test_predictions_1 = test_predictions_1.withColumn("column_1",test_predictions_1["column"].cast('int'))
test_predictions_1 = test_predictions_1.select('features', 'column_1','label')
test_predictions_1.show()

+--------------------+--------+-----+
|            features|column_1|label|
+--------------------+--------+-----+
|(10000,[0,3,6,7,8...|       0|    0|
|(10000,[7,12,56,6...|       0|    0|
|(10000,[4,7,8,10,...|       0|    0|
|(10000,[0,76,78,7...|       0|    0|
|(10000,[19,392,43...|       0|    0|
|(10000,[0,1,2,3,4...|       0|    0|
|(10000,[0,5,6,9,1...|       1|    0|
|(10000,[13,21,191...|       1|    0|
|(10000,[4,8,11,13...|       1|    0|
|(10000,[55,56,222...|       0|    0|
|(10000,[1,3,4,7,8...|       0|    0|
|(10000,[6,7,16,20...|       0|    0|
|(10000,[3,5,8,9,1...|       0|    0|
|(10000,[0,1,6,16,...|       1|    0|
|(10000,[2,3,9,10,...|       1|    0|
|(10000,[0,2,3,5,9...|       0|    0|
|(10000,[0,1,2,3,4...|       0|    0|
|(10000,[0,1,3,5,7...|       1|    0|
|(10000,[9,18,26,3...|       0|    0|
|(10000,[0,3,4,9,1...|       1|    0|
+--------------------+--------+-----+
only showing top 20 rows



In [81]:
test_final_2 = test_predictions_1

In [82]:
test_predictions_2 = lr_model_2.transform(test_final_2)

In [83]:
test_predictions_2 = test_predictions_2.select('features','column_1','prediction','label').withColumnRenamed('prediction','column')
test_predictions_2 = test_predictions_2.withColumn("column_2",test_predictions_2["column"].cast('int'))
test_predictions_2 = test_predictions_2.select('features', 'column_1','column_2','label')
test_predictions_2.show()

+--------------------+--------+--------+-----+
|            features|column_1|column_2|label|
+--------------------+--------+--------+-----+
|(10000,[0,3,6,7,8...|       0|       0|    0|
|(10000,[7,12,56,6...|       0|       1|    0|
|(10000,[4,7,8,10,...|       0|       1|    0|
|(10000,[0,76,78,7...|       0|       0|    0|
|(10000,[19,392,43...|       0|       0|    0|
|(10000,[0,1,2,3,4...|       0|       1|    0|
|(10000,[0,5,6,9,1...|       1|       0|    0|
|(10000,[13,21,191...|       1|       0|    0|
|(10000,[4,8,11,13...|       1|       0|    0|
|(10000,[55,56,222...|       0|       0|    0|
|(10000,[1,3,4,7,8...|       0|       1|    0|
|(10000,[6,7,16,20...|       0|       1|    0|
|(10000,[3,5,8,9,1...|       0|       0|    0|
|(10000,[0,1,6,16,...|       1|       0|    0|
|(10000,[2,3,9,10,...|       1|       1|    0|
|(10000,[0,2,3,5,9...|       0|       1|    0|
|(10000,[0,1,2,3,4...|       0|       0|    0|
|(10000,[0,1,3,5,7...|       1|       1|    0|
|(10000,[9,18

In [84]:
test_final_3 = test_predictions_2

In [85]:
test_predictions_3 = lr_model_3.transform(test_final_3)

In [86]:
test_predictions_3 = test_predictions_3.select('features','column_1','column_2','prediction','label').withColumnRenamed('prediction','column')
test_predictions_3 = test_predictions_3.withColumn("column_3",test_predictions_3["column"].cast('int'))
test_predictions_3 = test_predictions_3.select('features', 'column_1','column_2','column_3','label')
test_predictions_3.show()

+--------------------+--------+--------+--------+-----+
|            features|column_1|column_2|column_3|label|
+--------------------+--------+--------+--------+-----+
|(10000,[0,3,6,7,8...|       0|       0|       1|    0|
|(10000,[7,12,56,6...|       0|       1|       0|    0|
|(10000,[4,7,8,10,...|       0|       1|       0|    0|
|(10000,[0,76,78,7...|       0|       0|       0|    0|
|(10000,[19,392,43...|       0|       0|       0|    0|
|(10000,[0,1,2,3,4...|       0|       1|       1|    0|
|(10000,[0,5,6,9,1...|       1|       0|       1|    0|
|(10000,[13,21,191...|       1|       0|       0|    0|
|(10000,[4,8,11,13...|       1|       0|       0|    0|
|(10000,[55,56,222...|       0|       0|       0|    0|
|(10000,[1,3,4,7,8...|       0|       1|       1|    0|
|(10000,[6,7,16,20...|       0|       1|       0|    0|
|(10000,[3,5,8,9,1...|       0|       0|       0|    0|
|(10000,[0,1,6,16,...|       1|       0|       0|    0|
|(10000,[2,3,9,10,...|       1|       1|       0

In [87]:
test_final_4 = test_predictions_3
test_predictions_4 = lr_model_4.transform(test_final_4)

In [88]:
test_predictions_4 = test_predictions_4.select('features','column_1','column_2','column_3','prediction','label').withColumnRenamed('prediction','column')
test_predictions_4 = test_predictions_4.withColumn("column_4",test_predictions_4["column"].cast('int'))
test_predictions_4 = test_predictions_4.select('features', 'column_1','column_2','column_3','column_4','label')
#test_predictions_4.show()

In [89]:
test_final_5 = test_predictions_4
test_predictions_5 = lr_model_5.transform(test_final_5)

In [90]:
test_predictions_5 = test_predictions_5.select('features','column_1','column_2','column_3','column_4','prediction','label').withColumnRenamed('prediction','column')
test_predictions_5 = test_predictions_5.withColumn("column_5",test_predictions_5["column"].cast('int'))
test_predictions_5 = test_predictions_5.select('features', 'column_1','column_2','column_3','column_4','column_5','label')

In [91]:
test_final_6 = test_predictions_5
test_predictions_6 = lr_model_6.transform(test_final_6)

In [92]:
test_predictions_6 = test_predictions_6.select('features','column_1','column_2','column_3','column_4','column_5','prediction','label').withColumnRenamed('prediction','column')
test_predictions_6 = test_predictions_6.withColumn("column_6",test_predictions_6["column"].cast('int'))
test_predictions_6 = test_predictions_6.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','label')

In [93]:
test_final_7 = test_predictions_6
test_predictions_7 = lr_model_7.transform(test_final_7)

In [94]:
test_predictions_7 = test_predictions_7.select('features','column_1','column_2','column_3','column_4','column_5','column_6','prediction','label').withColumnRenamed('prediction','column')
test_predictions_7 = test_predictions_7.withColumn("column_7",test_predictions_7["column"].cast('int'))
test_predictions_7 = test_predictions_7.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','label')

In [95]:
test_final_8 = test_predictions_7
test_predictions_8 = lr_model_8.transform(test_final_8)

In [96]:
test_predictions_8 = test_predictions_8.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','prediction','label').withColumnRenamed('prediction','column')
test_predictions_8 = test_predictions_8.withColumn("column_8",test_predictions_8["column"].cast('int'))
test_predictions_8 = test_predictions_8.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','label')

In [97]:
test_final_9 = test_predictions_8
test_predictions_9 = lr_model_9.transform(test_final_9)

In [98]:
test_predictions_9 = test_predictions_9.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','prediction','label').withColumnRenamed('prediction','column')
test_predictions_9 = test_predictions_9.withColumn("column_9",test_predictions_9["column"].cast('int'))
test_predictions_9 = test_predictions_9.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','label')

In [99]:
test_final_10 = test_predictions_9
test_predictions_10 = lr_model_10.transform(test_final_10)

In [100]:
test_predictions_10 = test_predictions_10.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','prediction','label').withColumnRenamed('prediction','column')
test_predictions_10 = test_predictions_10.withColumn("column_10",test_predictions_10["column"].cast('int'))
test_predictions_10 = test_predictions_10.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','label')

In [101]:
test_final_11 = test_predictions_10
test_predictions_11 = lr_model_11.transform(test_final_11)

In [102]:
test_predictions_11 = test_predictions_11.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','prediction','label').withColumnRenamed('prediction','column')
test_predictions_11 = test_predictions_11.withColumn("column_11",test_predictions_11["column"].cast('int'))
test_predictions_11 = test_predictions_11.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','label')

In [103]:
test_final_12 = test_predictions_11
test_predictions_12 = lr_model_12.transform(test_final_12)

In [104]:
test_predictions_12 = test_predictions_12.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','prediction','label').withColumnRenamed('prediction','column')
test_predictions_12 = test_predictions_12.withColumn("column_12",test_predictions_12["column"].cast('int'))
test_predictions_12 = test_predictions_12.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','label')

In [105]:
test_final_13 = test_predictions_12
test_predictions_13 = lr_model_13.transform(test_final_13)

In [106]:
test_predictions_13 = test_predictions_13.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','prediction','label').withColumnRenamed('prediction','column')
test_predictions_13 = test_predictions_13.withColumn("column_13",test_predictions_13["column"].cast('int'))
test_predictions_13 = test_predictions_13.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','label')

In [107]:
test_final_14 = test_predictions_13
test_predictions_14 = lr_model_14.transform(test_final_14)

In [108]:
test_predictions_14 = test_predictions_14.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','prediction','label').withColumnRenamed('prediction','column')
test_predictions_14 = test_predictions_14.withColumn("column_14",test_predictions_14["column"].cast('int'))
test_predictions_14 = test_predictions_14.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','label')

In [109]:
test_final_15 = test_predictions_14
test_predictions_15 = lr_model_15.transform(test_final_15)

In [110]:
test_predictions_15 = test_predictions_15.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','prediction','label').withColumnRenamed('prediction','column')
test_predictions_15 = test_predictions_15.withColumn("column_15",test_predictions_15["column"].cast('int'))
test_predictions_15 = test_predictions_15.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','label')

In [111]:
test_final_16 = test_predictions_15
test_predictions_16 = lr_model_16.transform(test_final_16)

In [112]:
test_predictions_16 = test_predictions_16.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','prediction','label').withColumnRenamed('prediction','column')
test_predictions_16 = test_predictions_16.withColumn("column_16",test_predictions_16["column"].cast('int'))
test_predictions_16 = test_predictions_16.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','label')

In [113]:
test_final_17 = test_predictions_16
test_predictions_17 = lr_model_17.transform(test_final_17)

In [114]:
test_predictions_17 = test_predictions_17.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','prediction','label').withColumnRenamed('prediction','column')
test_predictions_17 = test_predictions_17.withColumn("column_17",test_predictions_17["column"].cast('int'))
test_predictions_17 = test_predictions_17.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','label')

In [115]:
test_final_18 = test_predictions_17
test_predictions_18 = lr_model_18.transform(test_final_18)

In [116]:
test_predictions_18 = test_predictions_18.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','prediction','label').withColumnRenamed('prediction','column')
test_predictions_18 = test_predictions_18.withColumn("column_18",test_predictions_18["column"].cast('int'))
test_predictions_18 = test_predictions_18.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','column_18','label')

In [117]:
test_final_19 = test_predictions_18
test_predictions_19 = lr_model_19.transform(test_final_19)

In [118]:
test_predictions_19 = test_predictions_19.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','column_18','prediction','label').withColumnRenamed('prediction','column')
test_predictions_19 = test_predictions_19.withColumn("column_19",test_predictions_19["column"].cast('int'))
test_predictions_19 = test_predictions_19.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','column_18','column_19','label')

In [119]:
test_final_20 = test_predictions_19
test_predictions_20 = lr_model_20.transform(test_final_20)

In [120]:
test_predictions_20 = test_predictions_20.select('features','column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','column_18','column_19','prediction','label').withColumnRenamed('prediction','column')
test_predictions_20 = test_predictions_20.withColumn("column_20",test_predictions_20["column"].cast('int'))
test_predictions_20 = test_predictions_20.select('features', 'column_1','column_2','column_3','column_4','column_5','column_6','column_7','column_8','column_9','column_10','column_11','column_12','column_13','column_14','column_15','column_16','column_17','column_18','column_19','column_20','label')

In [121]:
test_predictions_20.show()

+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-----+
|            features|column_1|column_2|column_3|column_4|column_5|column_6|column_7|column_8|column_9|column_10|column_11|column_12|column_13|column_14|column_15|column_16|column_17|column_18|column_19|column_20|label|
+--------------------+--------+--------+--------+--------+--------+--------+--------+--------+--------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+---------+-----+
|(10000,[0,3,6,7,8...|       0|       0|       1|       0|       0|       0|       0|       0|       0|        0|        0|        0|        0|        0|        0|        0|        0|        0|        0|        0|    0|
|(10000,[7,12,56,6...|       0|       1|       0|       0|       0|       0|       0|       0|       0|        0|       

In [122]:
test_predictions = test_predictions_20.withColumn('predictions',f.concat(f.col('column_1'),f.lit(' '),f.col('column_2'),f.lit(' '),f.col('column_3'),f.lit(' '),f.col('column_4'),f.lit(' '),f.col('column_5'),f.lit(' '),f.col('column_6'),f.lit(' '),f.col('column_7'),f.lit(' '),f.col('column_8'),f.lit(' '),f.col('column_9'),f.lit(' '),f.col('column_10'),f.lit(' '),f.col('column_11'),f.lit(' '),f.col('column_12'),f.lit(' '),f.col('column_13'),f.lit(' '),f.col('column_14'),f.lit(' '),f.col('column_15'),f.lit(' '),f.col('column_16'),f.lit(' '),f.col('column_17'),f.lit(' '),f.col('column_18'),f.lit(' '),f.col('column_19'),f.lit(' '),f.col('column_20')))
test_predictions = test_predictions.select('features','predictions')

In [123]:
test_predictions.show()

+--------------------+--------------------+
|            features|         predictions|
+--------------------+--------------------+
|(10000,[0,3,6,7,8...|0 0 1 0 0 0 0 0 0...|
|(10000,[7,12,56,6...|0 1 0 0 0 0 0 0 0...|
|(10000,[4,7,8,10,...|0 1 0 0 0 0 0 0 0...|
|(10000,[0,76,78,7...|0 0 0 0 0 0 0 0 0...|
|(10000,[19,392,43...|0 0 0 0 0 0 0 0 0...|
|(10000,[0,1,2,3,4...|0 1 1 0 0 0 0 0 0...|
|(10000,[0,5,6,9,1...|1 0 1 0 0 1 1 0 0...|
|(10000,[13,21,191...|1 0 0 0 0 0 0 0 0...|
|(10000,[4,8,11,13...|1 0 0 0 0 0 1 1 0...|
|(10000,[55,56,222...|0 0 0 0 0 0 0 0 1...|
|(10000,[1,3,4,7,8...|0 1 1 0 1 0 0 0 0...|
|(10000,[6,7,16,20...|0 1 0 0 0 0 0 0 1...|
|(10000,[3,5,8,9,1...|0 0 0 1 1 1 1 0 0...|
|(10000,[0,1,6,16,...|1 0 0 1 0 0 0 0 0...|
|(10000,[2,3,9,10,...|1 1 0 0 0 0 0 0 0...|
|(10000,[0,2,3,5,9...|0 1 0 0 1 1 0 1 0...|
|(10000,[0,1,2,3,4...|0 0 0 0 0 0 0 0 0...|
|(10000,[0,1,3,5,7...|1 1 0 1 0 1 0 0 0...|
|(10000,[9,18,26,3...|0 1 0 0 0 0 0 0 0...|
|(10000,[0,3,4,9,1...|1 0 0 0 1 

In [124]:
#test_predictions.show(truncate = False)
#test_cv_df = test_cv_df.alias('df1')
#test_predictions = test_predictions.alias('df2')
#output_task_1=test_cv_df.join(test_predictions,test_cv_df.features == test_predictions.features).select('df1.movie_id','df2.predictions')
test_final_features.show()

+--------------------+
|            features|
+--------------------+
|(10000,[0,3,6,7,8...|
|(10000,[7,12,56,6...|
|(10000,[4,7,8,10,...|
|(10000,[0,76,78,7...|
|(10000,[19,392,43...|
|(10000,[0,1,2,3,4...|
|(10000,[0,5,6,9,1...|
|(10000,[13,21,191...|
|(10000,[4,8,11,13...|
|(10000,[55,56,222...|
|(10000,[1,3,4,7,8...|
|(10000,[6,7,16,20...|
|(10000,[3,5,8,9,1...|
|(10000,[0,1,6,16,...|
|(10000,[2,3,9,10,...|
|(10000,[0,2,3,5,9...|
|(10000,[0,1,2,3,4...|
|(10000,[0,1,3,5,7...|
|(10000,[9,18,26,3...|
|(10000,[0,3,4,9,1...|
+--------------------+
only showing top 20 rows



In [125]:
output_intermediate = test_final_features_all.join(test_predictions, on = ['features'],how = 'inner')

In [126]:
output_intermediate.show()

+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|            features|movie_id|          movie_name|                plot|          plot_terms|            filtered|        raw_features|         predictions|
+--------------------+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|(10000,[0,1,2,4,8...|  709391|Around the World ...|A Chinese man rob...|[a, chinese, man,...|[chinese, man, ro...|(10000,[0,1,2,4,8...|0 0 1 1 0 0 0 0 0...|
|(10000,[0,1,3,5,6...|29542934|Children Who Chas...|Asuna is a young ...|[asuna, is, a, yo...|[asuna, young, gi...|(10000,[0,1,3,5,6...|0 0 0 0 0 0 0 0 0...|
|(10000,[0,1,6,105...|22365569|Recovered: Journe...|Recovered: Journe...|[recovered, journ...|[recovered, journ...|(10000,[0,1,6,105...|0 0 0 0 0 0 0 0 1...|
|(10000,[0,1,10,12...|23726874|          Spy School|

In [127]:
output_intermediate = output_intermediate.distinct()
output_intermediate.count()

7777

In [128]:
output_task_2 = output_intermediate.select('movie_id','predictions')

In [None]:
output_task_2.toPandas().to_csv("task_2.csv")