## Part 2 : Using TF-IDF for feature engineering and model was built using Random Forest Algorithm

In [1]:
import findspark

In [2]:
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')

In [3]:
import pyspark

In [4]:
from pyspark.sql import *

# Build the SparkSession
spark = SparkSession.builder \
   .appName("Assignment3p2") \
   .config("spark.some.config.option", "8gb") \
   .getOrCreate()
   


In [5]:
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [6]:
from pyspark.sql import SQLContext
sqlcontext=SQLContext(sc)

## Loading necessary libraries for this part

In [7]:
import numpy as np
import pandas as pd
import re
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
import re
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import udf, col, lower, regexp_replace
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer, IndexToString
from pyspark.mllib.linalg import SparseVector, DenseVector

## Loading training dataset

In [8]:

train_data = pd.read_csv("/home/cse587/Project3/train.csv",sep=",", header = 0)

In [9]:
train_data.head()

Unnamed: 0,movie_id,movie_name,plot,genre
0,23890098,Taxi Blues,"Shlykov, a hard-working taxi driver and Lyosha...","['World cinema', 'Drama']"
1,31186339,The Hunger Games,The nation of Panem consists of a wealthy Capi...,"['Action/Adventure', 'Action', 'Science Fictio..."
2,20663735,Narasimham,Poovalli Induchoodan is sentenced for six yea...,"['Musical', 'Action', 'Drama']"
3,2231378,The Lemon Drop Kid,"The Lemon Drop Kid , a New York City swindler,...",['Comedy']
4,595909,A Cry in the Dark,Seventh-day Adventist Church pastor Michael Ch...,"['Crime Fiction', 'World cinema', 'Drama']"


# Converting pandas dataframe to pyspark dataframe

In [11]:
training_df = spark.createDataFrame(train_data)
# training_df.head(1)

## Cleaning the data and creating TD-IDF

In [12]:

tokenizer = Tokenizer(inputCol="plot", outputCol="words")
wordsData = tokenizer.transform(training_df)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)
# alternatively, CountVectorizer can also be used to get term frequency vectors

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)



In [13]:
rescaledData.head(1)

[Row(movie_id=23890098, movie_name='Taxi Blues', plot="Shlykov, a hard-working taxi driver and Lyosha, a saxophonist, develop a bizarre love-hate relationship, and despite their prejudices, realize they aren't so different after all.", genre="['World cinema', 'Drama']", words=['shlykov,', 'a', 'hard-working', 'taxi', 'driver', 'and', 'lyosha,', 'a', 'saxophonist,', 'develop', 'a', 'bizarre', 'love-hate', 'relationship,', 'and', 'despite', 'their', 'prejudices,', 'realize', 'they', "aren't", 'so', 'different', 'after', 'all.'], rawFeatures=SparseVector(20, {1: 1.0, 5: 2.0, 6: 1.0, 8: 2.0, 9: 1.0, 10: 4.0, 11: 3.0, 13: 2.0, 15: 3.0, 17: 2.0, 18: 2.0, 19: 2.0}), features=SparseVector(20, {1: 0.0265, 5: 0.0471, 6: 0.0715, 8: 0.0286, 9: 0.0734, 10: 0.0045, 11: 0.2, 13: 0.0283, 15: 0.1344, 17: 0.0414, 18: 0.0773, 19: 0.1171}))]

In [13]:
rescaledData.show()

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               genre|               words|         rawFeatures|            features|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|[shlykov,, a, har...|(20,[1,5,6,8,9,10...|(20,[1,5,6,8,9,10...|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|[the, nation, of,...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|[poovalli, induch...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|[the, lemon, drop...|(20,[0,1,2,3,4,5,...|(20,[0,1,2,3,4,5,...|
|  595909|   A Cry in the Dark|Seventh-da

Converting genre into labels using StringIndexer

In [14]:
label_stringIdx = StringIndexer(inputCol = "genre", outputCol = "label")
fitter = label_stringIdx.fit(rescaledData)
rescaledData = fitter.transform(rescaledData)
labels = fitter.labels

## Fitting Machine Learning model on the training dataset

In [15]:
(training_data,testing_data) = rescaledData.randomSplit([0.7,0.3], seed=234)

In [16]:
## Fitting Naive Bayes model on the training data

# from pyspark.ml.classification import NaiveBayes
# nb = NaiveBayes(smoothing=0.1)
# model = nb.fit(training_data)

In [16]:
## Fitting Random Forest model on the training data

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=20, maxBins=100)
model = rf.fit(training_data)

## Saving the trained model

In [17]:
path = "/home/cse587/Project3/randomforest_for_part2"
model.save("/home/cse587/Project3/randomforest_for_part2")

In [14]:
## fitting the obtained model on validation test data
predictions = model.transform(training_data)


In [15]:
## Evaluating the accuracy of predicted labels
evaluator = MulticlassClassificationEvaluator(labelCol = "label", predictionCol="prediction")
evaluator.evaluate(predictions)

0.029308555639158743

## Loading test data from local using Pandas and converted into pyspark dataframe

In [16]:
test_data = pd.read_csv("/home/cse587/Project3/test.csv",sep=",", header = 0)
testing_df = spark.createDataFrame(test_data)
# testing_df.head(1)

## pre-processing the test data

In [17]:

tokenizer = Tokenizer(inputCol="plot", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
idf = IDF(inputCol="rawFeatures", outputCol="features")

pipeline = Pipeline(stages=[tokenizer, hashingTF, idf])

idfModel = pipeline.fit(training_df)
test_data2 = idfModel.transform(testing_df)




In [32]:
test_data2.head(1)



## Predicting the test labels by fitting the trained model

In [18]:
test_pred2 = model.transform(test_data2)

## converted the predicted labels to its corresponding genre
converter = IndexToString(inputCol="prediction", outputCol="originalCategory", labels = labels)
test_result = converter.transform(test_pred2)
# test_result.show(2)

## Loading the mapping.csv into a dataframe

In [19]:
mapping = pd.read_csv('/home/cse587/Project3/mapping.csv')
colnames = ['label','name']
mapping.columns =colnames
mapping_df = sqlContext.createDataFrame(mapping)
# valuelist = [row.name for row in mapping_df.collect()]
mapping.head()


Unnamed: 0,number,name
0,0,Drama
1,1,Comedy
2,2,Romance Film
3,3,Thriller
4,4,Action


In [20]:
def convert(lines):
    lines = lines[1:-1].split(', ')
    temp=[0]*20
    for i in lines:   
        for k,v in mapping.iterrows():
            if i == ('\''+ v['name'] +'\''):
                temp[k] = 1
                continue
    return " ".join(map(str,temp))

# s = "['Musical', 'Mystery']"
# print(convert(s))
converter = udf(convert)
test_result = test_result.withColumn("predictions",converter('originalCategory'))

## Extracting the final dataframe

In [21]:
part2_result = test_result.select("movie_id","predictions")
# part2_result = part2_result.withColumnRenamed("final_prediction","predictions")

## Saving the predicted results in .csv file

In [22]:
part2_result.write.csv('part2_test_result_rf.csv', header = True)

In [23]:
part2_result.show(1)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
| 1335380|1 0 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 1 row

