In [1]:
import findspark
import numpy as np
from nltk.stem.snowball import SnowballStemmer

In [2]:
# Point findspark at the local Spark 2.4 install so that `import pyspark` in the next cell resolves.
findspark.init(spark_home='/home/cse587/spark-2.4.0-bin-hadoop2.7')

In [3]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
import pyspark.sql.types as tp
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, CountVectorizer,IDF

# Spark session shared by every later cell.
# NOTE(review): "spark.some.config.option" looks like placeholder text; nothing visible reads it.
spark = (
    SparkSession.builder
    .appName("Data preprocessing")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


def _read_quoted_csv(path):
    """Read a headered CSV with '"' as the escape character, inferring column types."""
    return spark.read.csv(path, header=True, inferSchema=True, escape="\"")


# Training data; rows missing genre, plot or movie_id cannot be used downstream.
dataframe = _read_quoted_csv("/home/cse587/shathesh/Assignment/dic487-587/train.csv")
dataframe = dataframe.na.drop(subset=["genre", "plot", "movie_id"])
dataframe.printSchema()
# NOTE(review): the printed schema contains extra _c4.._c12 columns, which suggests some
# records are being split into too many fields (possibly multi-line plots) — consider
# multiLine=True; TODO confirm against the raw file.
# df_mapping is not used by any cell shown here.
df_mapping = _read_quoted_csv("/home/cse587/shathesh/Assignment/dic487-587/mapping.csv")


root
 |-- movie_id: string (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)
 |-- _c9: string (nullable = true)
 |-- _c10: string (nullable = true)
 |-- _c11: string (nullable = true)
 |-- _c12: string (nullable = true)



In [4]:
#clean text
# plot: drop everything except letters and whitespace, then lower-case.
# genre: additionally keep '-', '/' and ',' so labels such as "black-and-white" and
# "action/adventure", and the comma separators, survive for replacelabel.
# Raw strings avoid the invalid "\-" escape in a normal Python literal; the regex values
# passed to Spark are unchanged.
df_clean = dataframe.select(
    'movie_id',
    'movie_name',
    lower(regexp_replace('plot', r"[^a-zA-Z\s]", "")).alias('plot'),
    lower(regexp_replace("genre", r"[^a-zA-Z\-/,\s]", "")).alias("genre"),
)

In [5]:
# Genre name (lower-cased, as produced by the cleaning cell) -> numeric genre id.
# Id k is the genre whose indicator column is col_k.
_GENRE_TO_ID = {
    'drama': 0,
    'comedy': 1,
    'romance film': 2,
    'thriller': 3,
    'action': 4,
    'world cinema': 5,
    'crime fiction': 6,
    'horror': 7,
    'black-and-white': 8,
    'indie': 9,
    'action/adventure': 10,
    'adventure': 11,
    'family film': 12,
    'short film': 13,
    'romantic drama': 14,
    'animation': 15,
    'musical': 16,
    'science fiction': 17,
    'mystery': 18,
    'romantic comedy': 19,
}


def replacelabel(x):
    """Map a comma-separated genre string to the list of known numeric genre ids.

    Args:
        x: genre string such as "drama, comedy"; may be None or empty.

    Returns:
        List of ids in input order, duplicates kept; genres not in _GENRE_TO_ID
        are skipped. None or "" yields [].
    """
    if not x:
        return []
    genre_ids = []
    # Split on bare commas and strip, so "drama,comedy" and "drama, comedy" agree.
    for raw_label in x.split(","):
        genre_id = _GENRE_TO_ID.get(raw_label.strip())
        if genre_id is not None:
            genre_ids.append(genre_id)
    return genre_ids

In [6]:
# Wrap replacelabel as a Spark UDF returning array<int>; attach the numeric genre ids.
label_udf = udf(replacelabel, returnType=ArrayType(IntegerType()))
df_clean = df_clean.withColumn('genre_value', label_udf(df_clean['genre']))

In [8]:
#Tokenize Plot Text
# Split the cleaned plot into tokens (plot_token), keeping ids and genre info alongside.
tokenizer = Tokenizer(inputCol='plot', outputCol='plot_token')
token_columns = ["movie_id", "movie_name", "plot_token", "genre", "genre_value"]
df_words_token = tokenizer.transform(df_clean).select(*token_columns)

In [9]:
#Remove StopWords
# Drop stop words (StopWordsRemover's default list) from the plot tokens; movie_name is dropped here.
remover = StopWordsRemover(inputCol='plot_token', outputCol='plot_clean')
df_words_token_rem_stopwor = (
    remover
    .transform(df_words_token)
    .select("movie_id", "plot_clean", "genre", "genre_value")
)

In [10]:
#Text Stemming
# English Snowball stemmer applied token-by-token through a UDF; stem_udf is reused for the test set.
stemmer = SnowballStemmer(language='english')


def _stem_tokens(tokens):
    """Return the Snowball stem of every token, preserving order."""
    return [stemmer.stem(token) for token in tokens]


stem_udf = udf(_stem_tokens, ArrayType(StringType()))
# Result columns: movie_id, plot (stemmed token array), genre, label (genre id array).
df_stemmed = (
    df_words_token_rem_stopwor
    .withColumn("words_stemmed", stem_udf("plot_clean"))
    .select('movie_id', "words_stemmed", "genre", "genre_value")
    .withColumnRenamed("words_stemmed", "plot")
    .withColumnRenamed("genre_value", "label")
)

In [11]:
# Add integer indicator columns col_0 ... col_19 (one per genre id), all starting at 0.
for genre_idx in range(20):
    df_stemmed = df_stemmed.withColumn("col_" + str(genre_idx), lit(0))

In [12]:
# Schema used to convert the pandas one-hot frame back into a Spark DataFrame.
# `plot` holds the stemmed token list produced by stem_udf, so it must be array<string>.
# Declaring it StringType would coerce each list to its string form, and the later
# token-length filter would then iterate characters instead of words.
my_schema = tp.StructType(
    [
        tp.StructField(name='movie_id', dataType=tp.StringType(), nullable=True),
        tp.StructField(name='plot', dataType=tp.ArrayType(tp.StringType()), nullable=True),
        tp.StructField(name='genre', dataType=tp.StringType(), nullable=True),
        tp.StructField(name='label', dataType=tp.ArrayType(tp.IntegerType()), nullable=True),
    ]
    # col_0 ... col_19: 0/1 genre indicators, in genre-id order.
    + [
        tp.StructField(name='col_%d' % genre_idx, dataType=tp.IntegerType(), nullable=True)
        for genre_idx in range(20)
    ]
)

In [13]:
%%time
result_df = df_stemmed.select("*").toPandas()
for index, row in result_df.iterrows():
    label_arr = row['label']
    for i in label_arr:
        if i == 0:
            result_df.loc[index, "col_0"] = 1
        if i == 1:
            result_df.loc[index, "col_1"] = 1
        if i == 2:
            result_df.loc[index, "col_2"] = 1
        if i == 3:
            result_df.loc[index, "col_3"] = 1
        if i == 4:
            result_df.loc[index, "col_4"] = 1
        if i == 5:
            result_df.loc[index, "col_5"] = 1
        if i == 6:
            result_df.loc[index, "col_6"] = 1
        if i == 7:
            result_df.loc[index, "col_7"] = 1
        if i == 8:
            result_df.loc[index, "col_8"] = 1
        if i == 9:
            result_df.loc[index, "col_9"] = 1
        if i == 10:
            result_df.loc[index, "col_10"] = 1
        if i == 11:
            result_df.loc[index, "col_11"] = 1
        if i == 12:
            result_df.loc[index, "col_12"] = 1
        if i == 13:
            result_df.loc[index, "col_13"] = 1
        if i == 14:
            result_df.loc[index, "col_14"] = 1
        if i == 15:
            result_df.loc[index, "col_15"] = 1
        if i == 16:
            result_df.loc[index, "col_16"] = 1
        if i == 17:
            result_df.loc[index, "col_17"] = 1
        if i == 18:
            result_df.loc[index, "col_18"] = 1
        if i == 19:
            result_df.loc[index, "col_19"] = 1

CPU times: user 49.4 s, sys: 163 ms, total: 49.5 s
Wall time: 4min 56s


In [14]:
# Back to Spark using the explicit schema (ids, stemmed plot tokens, genre, labels, col_0..col_19).
df_stemmed = spark.createDataFrame(data=result_df, schema=my_schema)

In [15]:
# Drop zero-length tokens from the stemmed plot into `plot_length` (Word2Vec input).
# NOTE(review): only empty strings are removed; if a minimum length (e.g. > 3) was
# intended, change the predicate in _drop_empty_tokens.
def _drop_empty_tokens(tokens):
    """Return `tokens` without zero-length entries; a None token list yields []."""
    if tokens is None:
        return []
    return [token for token in tokens if len(token) > 0]


filter_length_udf = udf(_drop_empty_tokens, ArrayType(StringType()))
df_stemmed = df_stemmed.withColumn('plot_length', filter_length_udf(col('plot')))
data = df_stemmed.select("*")


In [16]:
from pyspark.ml.feature import Word2Vec

In [17]:
%%time
cv_model = Word2Vec( vectorSize=100, seed=100, inputCol="plot_length", outputCol="features")
cv_model = cv_model.fit(data)
cv_result=cv_model.transform(data)

CPU times: user 61.3 ms, sys: 491 µs, total: 61.8 ms
Wall time: 7min 37s


In [18]:
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [19]:
from pyspark.ml.classification import RandomForestClassifier

In [20]:
%%time
RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_0", seed=42)
lrModel=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_1", seed=42)
lrModel1=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_2", seed=42)
lrModel2=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_3", seed=42)
lrModel3=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_4", seed=42)
lrModel4=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_5", seed=42)
lrModel5=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_6", seed=42)
lrModel6=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_7", seed=42)
lrModel7=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_8", seed=42)
lrModel8=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_9", seed=42)
lrModel9=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_10", seed=42)
lrModel10=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_11", seed=42)
lrModel11=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_12", seed=42)
lrModel12=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_13", seed=42)
lrModel13=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_14", seed=42)
lrModel14=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_15", seed=42)
lrModel15=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_16", seed=42)
lrModel16=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_17", seed=42)
lrModel17=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_18", seed=42)
lrModel18=RF.fit(cv_result)
print('done')

RF=RandomForestClassifier(numTrees=3, maxDepth=10, featuresCol='features',labelCol="col_19", seed=42)
lrModel19=RF.fit(cv_result)
print('done')



done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
done
CPU times: user 637 ms, sys: 37 ms, total: 674 ms
Wall time: 45min 4s


In [31]:
# Persist the 20 per-genre forests. Directory suffix k holds the model for col_{k-1}:
# lrModel -> -1, lrModel1 -> -2, ..., lrModel19 -> -20.
# write().overwrite() lets the cell be re-run; plain .save() fails if the directory exists.
MODEL_SAVE_PREFIX = "/home/cse587/shathesh/Assignment/RFModel-bonus-"
rf_models_to_save = [
    lrModel, lrModel1, lrModel2, lrModel3, lrModel4,
    lrModel5, lrModel6, lrModel7, lrModel8, lrModel9,
    lrModel10, lrModel11, lrModel12, lrModel13, lrModel14,
    lrModel15, lrModel16, lrModel17, lrModel18, lrModel19,
]
for suffix, rf_model in enumerate(rf_models_to_save, start=1):
    rf_model.write().overwrite().save(MODEL_SAVE_PREFIX + str(suffix))

In [21]:
#clean text

# Test split: same CSV options as training; there is no genre column, so only plot and
# movie_id are required. The plot gets the same letters-and-whitespace cleaning as in training.
# NOTE(review): movie_id is inferred as integer here but string in train.csv.
test_df = spark.read.csv(
    "/home/cse587/shathesh/Assignment/dic487-587/test.csv",
    header=True,
    inferSchema=True,
    escape="\"",
)
test_df = test_df.na.drop(subset=["plot", "movie_id"])
test_df.printSchema()
cleaned_plot = lower(regexp_replace('plot', "[^a-zA-Z\\s]", "")).alias('plot')
test_df_clean = test_df.select('movie_id', 'movie_name', cleaned_plot)


root
 |-- movie_id: integer (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)



In [22]:

#Tokenize Plot Text
# Reuse the training Tokenizer and StopWordsRemover so test plots are processed identically.
test_df_words_token = (
    tokenizer.transform(test_df_clean)
    .select("movie_id", "movie_name", "plot_token")
)

#Remove StopWords
test_df_words_token_rem_stopwor = (
    remover.transform(test_df_words_token)
    .select("movie_id", "movie_name", "plot_clean")
)

In [23]:

#Text Stemming
# Same stemming and empty-token filtering as the training pipeline, then the fitted Word2Vec model.
test_df_stemmed = (
    test_df_words_token_rem_stopwor
    .withColumn("words_stemmed", stem_udf("plot_clean"))
    .select('movie_id', "words_stemmed")
    .withColumnRenamed("words_stemmed", "plot")
    .withColumn('plot_length', filter_length_udf(col('plot')))
)
test_data = test_df_stemmed.select("movie_id", "plot", "plot_length")

#transform data to word2vec
test_cv_result = cv_model.transform(test_data)

In [None]:
# >>>>>>MODEL LOADING<<<<<<
# Optional: reload previously saved per-genre forests instead of retraining. Disabled by
# default, so running this cell changes nothing unless the flag is flipped.
LOAD_SAVED_MODELS = False

if LOAD_SAVED_MODELS:
    # The models are random forests, so they must be loaded with the matching model class.
    from pyspark.ml.classification import RandomForestClassificationModel

    # NOTE(review): the save cell writes to /home/cse587/shathesh/Assignment/RFModel-bonus-<k>;
    # this path assumes those directories were copied under ./Bonus — adjust if not.
    savePath = './Bonus/RFModel-bonus'
    # Suffix k holds the model for col_{k-1}: lrModel <- -1, lrModel1 <- -2, ..., lrModel19 <- -20.
    (lrModel, lrModel1, lrModel2, lrModel3, lrModel4,
     lrModel5, lrModel6, lrModel7, lrModel8, lrModel9,
     lrModel10, lrModel11, lrModel12, lrModel13, lrModel14,
     lrModel15, lrModel16, lrModel17, lrModel18, lrModel19) = [
        RandomForestClassificationModel.load(savePath + '-' + str(suffix))
        for suffix in range(1, 21)
    ]

In [24]:
%%time
predictions = lrModel.transform(test_cv_result)
predictions1 = lrModel1.transform(test_cv_result)
predictions2 = lrModel2.transform(test_cv_result)
predictions3 = lrModel3.transform(test_cv_result)
predictions4 = lrModel4.transform(test_cv_result)
predictions5 = lrModel5.transform(test_cv_result)
predictions6 = lrModel6.transform(test_cv_result)
predictions7 = lrModel7.transform(test_cv_result)
predictions8 = lrModel8.transform(test_cv_result)
predictions9 = lrModel9.transform(test_cv_result)
predictions10 = lrModel10.transform(test_cv_result)
predictions11 = lrModel11.transform(test_cv_result)
predictions12 = lrModel12.transform(test_cv_result)
predictions13 = lrModel13.transform(test_cv_result)
predictions14 = lrModel14.transform(test_cv_result)
predictions15 = lrModel15.transform(test_cv_result)
predictions16 = lrModel16.transform(test_cv_result)
predictions17 = lrModel17.transform(test_cv_result)
predictions18 = lrModel18.transform(test_cv_result)
predictions19 = lrModel19.transform(test_cv_result)

CPU times: user 165 ms, sys: 16.4 ms, total: 181 ms
Wall time: 1.22 s


In [25]:
from pyspark.sql import functions as F

In [None]:
# NOTE(review): `dict` shadows the built-in type for the rest of the session. It is kept
# only because the CSV-writing cell stores movie_id -> prediction string pairs in it;
# prefer a descriptive name in both places.
dict = {}

# Test movie ids in the order the first model's output yields them; every predN list
# below is aligned to this order.
movie_id = [row['movie_id'] for row in predictions.select('movie_id').collect()]


def _predictions_aligned_to(frame, ids):
    """Return `frame`'s `prediction` values ordered to match `ids`, joined on movie_id.

    Each predictions frame is collected in a separate Spark job, so positional alignment
    across them is not guaranteed; keying by movie_id removes that assumption.
    Assumes movie_id is unique in the test set — TODO confirm.
    """
    by_movie = {
        row['movie_id']: row['prediction']
        for row in frame.select('movie_id', 'prediction').collect()
    }
    return [by_movie[movie] for movie in ids]


# predN holds the 0.0/1.0 predictions of the model for col_{N-1}.
(pred1, pred2, pred3, pred4, pred5,
 pred6, pred7, pred8, pred9, pred10,
 pred11, pred12, pred13, pred14, pred15,
 pred16, pred17, pred18, pred19, pred20) = [
    _predictions_aligned_to(frame, movie_id)
    for frame in (
        predictions, predictions1, predictions2, predictions3, predictions4,
        predictions5, predictions6, predictions7, predictions8, predictions9,
        predictions10, predictions11, predictions12, predictions13, predictions14,
        predictions15, predictions16, predictions17, predictions18, predictions19,
    )
]

In [27]:
from csv import writer

In [28]:
def append_list_as_row(filename, elements):
    """Append `elements` as a single CSV row to `filename`, creating the file if missing."""
    with open(filename, mode='a+', newline='') as handle:
        writer(handle).writerow(elements)

In [29]:
# Write one CSV row per test movie: movie_id, then a space-separated string of the 20
# per-genre 0/1 predictions (pred1 -> col_0, ..., pred20 -> col_19). The same string is
# recorded in `dict` under the movie id.
# The file is opened once for all rows, instead of once per row, and in append mode as
# before. NOTE(review): re-running this cell appends a second copy of every row — delete
# the file first when regenerating.
genre_prediction_lists = [
    pred1, pred2, pred3, pred4, pred5,
    pred6, pred7, pred8, pred9, pred10,
    pred11, pred12, pred13, pred14, pred15,
    pred16, pred17, pred18, pred19, pred20,
]
with open("/home/cse587/shathesh/Assignment/preds_part_1.csv", 'a+', newline='') as preds_file:
    preds_writer = writer(preds_file)
    for i in range(len(pred1)):
        # Predictions are floats (0.0/1.0); int() renders them as "0"/"1".
        p = " ".join(str(int(genre_preds[i])) for genre_preds in genre_prediction_lists)
        dict[movie_id[i]] = p
        preds_writer.writerow([movie_id[i], p])