

<h1><center> CSE 487/587 Assignment 3: Predictive Analytics with Spark</center></h1>

<h1><center> PART 3 - Custom Feature Engineering<center></h1>

## Importing packages and checking the version

In [1]:
# Checking the JAVA version
!java -version

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)


In [2]:
# Import the findspark and pyspark
import findspark
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
import pyspark

In [3]:
pyspark

<module 'pyspark' from '/home/cse587/spark-2.4.0-bin-hadoop2.7/python/pyspark/__init__.py'>

## Importing Libraries

In [4]:
import pandas as pd
from pyspark.sql import SparkSession 
from pyspark.sql.functions import col, lower, regexp_replace, split, size,array
from pyspark.sql.functions import udf
from pyspark.sql.functions import UserDefinedFunction
import pyspark.sql.functions as sf
from pyspark.sql.types import StringType,IntegerType
from pyspark.ml import Pipeline
from pyspark.ml.feature import  Tokenizer, StopWordsRemover,HashingTF, IDF, Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier

In [5]:
# Setting the max memory to 8gb RAM
MAX_MEMORY = "8g"

# Creating the spark session
spark = SparkSession \
    .builder \
    .config("spark.executor.memory", MAX_MEMORY) \
    .config("spark.driver.memory", MAX_MEMORY) \
    .getOrCreate()

from pyspark.sql import SQLContext
sqlContext = SQLContext(spark)

## Importing Train, Test and Mapping datasets

In [6]:
train = pd.read_csv("train.csv")
test = pd.read_csv("test.csv")
mapping = pd.read_csv("mapping.csv",names=["genre_id", "genre"])

## Checking datasets

In [7]:
train.head()

Unnamed: 0,movie_id,movie_name,plot,genre
0,23890098,Taxi Blues,"Shlykov, a hard-working taxi driver and Lyosha...","['World cinema', 'Drama']"
1,31186339,The Hunger Games,The nation of Panem consists of a wealthy Capi...,"['Action/Adventure', 'Action', 'Science Fictio..."
2,20663735,Narasimham,Poovalli Induchoodan is sentenced for six yea...,"['Musical', 'Action', 'Drama']"
3,2231378,The Lemon Drop Kid,"The Lemon Drop Kid , a New York City swindler,...",['Comedy']
4,595909,A Cry in the Dark,Seventh-day Adventist Church pastor Michael Ch...,"['Crime Fiction', 'World cinema', 'Drama']"


In [8]:
test.head()

Unnamed: 0,movie_id,movie_name,plot
0,1335380,Exodus,The film is based on the events that happened ...
1,29062594,A la salida nos vemos,A group of teenagers at Catholic boarding scho...
2,9252321,"Come Back, Africa",This story of a Zulu family is a composite sto...
3,13455076,A Merry Mixup,The Stooges play three sets of identical tripl...
4,24165951,Getting Even,A soldier-of-fortune steals some Russian nerve...


In [9]:
mapping.head()

Unnamed: 0,genre_id,genre
0,,0
1,0.0,Drama
2,1.0,Comedy
3,2.0,Romance Film
4,3.0,Thriller


## Converting pandas datasets to PySpark

In [10]:
spark_df = sqlContext.createDataFrame(train)
spark_test_df= sqlContext.createDataFrame(test)

In [11]:
spark_df.show(2)

+--------+----------------+--------------------+--------------------+
|movie_id|      movie_name|                plot|               genre|
+--------+----------------+--------------------+--------------------+
|23890098|      Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|The Hunger Games|The nation of Pan...|['Action/Adventur...|
+--------+----------------+--------------------+--------------------+
only showing top 2 rows



In [12]:
spark_test_df.show(2)

+--------+--------------------+--------------------+
|movie_id|          movie_name|                plot|
+--------+--------------------+--------------------+
| 1335380|              Exodus|The film is based...|
|29062594|A la salida nos v...|A group of teenag...|
+--------+--------------------+--------------------+
only showing top 2 rows



## select particular columns

In [13]:
spark_df.select("movie_id","movie_name","plot","genre").show() 

+--------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               genre|
+--------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|
|  595909|   A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|
| 5272176|            End Game|The president is ...|['Action/Adventur...|
| 1952976|          Dark Water|{{plot}} The film...|['Thriller', 'Dra...|
|24225279|                Sing|The story begins ...|           ['Drama']|
| 2462689|       Meet John Doe|Infuriated at bei...|['Black-and-white...|
|20532852|Destination Meatball|A line of people ...|['Animation', 'Sh...|
|15401493|    Husband for Hire|Lola  a

## Data Preprocessing

### Data Cleaning 

In [14]:
# Using pyspark.sql.functions regexp_replace to remove punctuations and special characters
def clean_text(c):
    c = lower(c)
    c = regexp_replace(c, "^rt ", "")
    c = regexp_replace(c, "(https?\://)\S+", "")
    c = regexp_replace(c, "[^a-zA-Z0-9\\s]", "")
    return c

# We will do this for only "plot" column of the dataframe
# Making the change in the train data
clean_text_df = spark_df.select(("movie_id"),("movie_name"),clean_text(col("plot")).alias("plot"),("genre"))
# Making the change in the test data
clean_test_df = spark_test_df.select(("movie_id"),("movie_name"),clean_text(col("plot")).alias("plot"))
clean_text_df.printSchema()
clean_text_df.show(5)
clean_test_df.printSchema()
clean_test_df.show(5)

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)

+--------+------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|
+--------+------------------+--------------------+--------------------+
|23890098|        Taxi Blues|shlykov a hardwor...|['World cinema', ...|
|31186339|  The Hunger Games|the nation of pan...|['Action/Adventur...|
|20663735|        Narasimham|poovalli induchoo...|['Musical', 'Acti...|
| 2231378|The Lemon Drop Kid|the lemon drop ki...|          ['Comedy']|
|  595909| A Cry in the Dark|seventhday advent...|['Crime Fiction',...|
+--------+------------------+--------------------+--------------------+
only showing top 5 rows

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)

+--------+--------------------+---------------

## We will first create the pipeline model which defines the step by step approach towards feature engineering

In [15]:
# Step 1 - Tokenizer
# This will separate each word of the plot into a list of words
tokenizer = Tokenizer(inputCol="plot", outputCol="words")

#Step 2 - Removing stopwords from the tokenized words
remover = StopWordsRemover()
stopwords = remover.getStopWords() 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stopwords)

#Step 3 - Creating distributed vector representation of words after removing stopwords using Word2Vec
w2v = Word2Vec(vectorSize=100, minCount=5, inputCol="filtered", outputCol="rawFeatures", stepSize=0.01)

#Step 4 - Creating the pipeline for the above approach
word2vec_pipeline = Pipeline(stages =[tokenizer, stopwordsRemover,w2v])

## Preparing Feature data for Train data

In [16]:
#Fitting the pipeline to the train data
model1 = word2vec_pipeline.fit(clean_text_df) 
# Transforming the train data and creating separate dataframe of it
featurizedData = model1.transform(clean_text_df)
featurizedData.show(5)

+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|               words|            filtered|         rawFeatures|
+--------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|23890098|        Taxi Blues|shlykov a hardwor...|['World cinema', ...|[shlykov, a, hard...|[shlykov, hardwor...|[-0.0180836420195...|
|31186339|  The Hunger Games|the nation of pan...|['Action/Adventur...|[the, nation, of,...|[nation, panem, c...|[-0.0162740596882...|
|20663735|        Narasimham|poovalli induchoo...|['Musical', 'Acti...|[poovalli, induch...|[poovalli, induch...|[-4.7364569269899...|
| 2231378|The Lemon Drop Kid|the lemon drop ki...|          ['Comedy']|[the, lemon, drop...|[lemon, drop, kid...|[-0.0150217633932...|
|  595909| A Cry in the Dark|seventhday advent...|['Cri

## Preparing Feature data for Test data

In [17]:
#Fitting the pipeline to the train data
model2 = word2vec_pipeline.fit(clean_test_df) 
# Transforming the train data and creating separate dataframe of it
testData = model2.transform(clean_test_df)
testData.show(5)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               words|            filtered|         rawFeatures|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+
| 1335380|              Exodus|the film is based...|[the, film, is, b...|[film, based, eve...|[-0.0033346106192...|
|29062594|A la salida nos v...|a group of teenag...|[a, group, of, te...|[group, teenagers...|[-0.0115854096007...|
| 9252321|   Come Back, Africa|this story of a z...|[this, story, of,...|[story, zulu, fam...|[-0.0074041764421...|
|13455076|       A Merry Mixup|the stooges play ...|[the, stooges, pl...|[stooges, play, t...|[-0.0170712757489...|
|24165951|        Getting Even|a soldieroffortun...|[a, soldieroffort...|[soldieroffortune...|[0.00455537089534...|
+--------+--------------------+--------------------+--------------------

In [18]:
# Selecting only necessary columns
featurizedData= featurizedData.select("movie_id","movie_name","plot","genre","rawFeatures")

## **Feature vector with movie id**

In [19]:
featurizedData.show()

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               genre|         rawFeatures|
+--------+--------------------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|shlykov a hardwor...|['World cinema', ...|[-0.0180836420195...|
|31186339|    The Hunger Games|the nation of pan...|['Action/Adventur...|[-0.0162740596882...|
|20663735|          Narasimham|poovalli induchoo...|['Musical', 'Acti...|[-4.7364569269899...|
| 2231378|  The Lemon Drop Kid|the lemon drop ki...|          ['Comedy']|[-0.0150217633932...|
|  595909|   A Cry in the Dark|seventhday advent...|['Crime Fiction',...|[-0.0021270738290...|
| 5272176|            End Game|the president is ...|['Action/Adventur...|[-0.0362848561934...|
| 1952976|          Dark Water|plot the film ope...|['Thriller', 'Dra...|[-0.0394205021706...|
|24225279|                Sing|the story begins ..

## Label Processing start

In [20]:
# We will use mapping.csv to process our genrelist and convert it to the labels having binary number like string telling specifying the multilabels of the given movie plot
mapping=mapping[1:]
mapping_spark = sqlContext.createDataFrame(mapping)
mapping_spark.toPandas().set_index('genre_id').T.to_dict()

#taking the genre column in the list
mapping_genre_list=mapping_spark.select("genre").collect()

#extracting the genre name into the list
genreList=[]
for i in range(0,mapping_spark.count()):
    genreList.append(mapping_genre_list[i][0])

In [21]:
genreList

['Drama',
 'Comedy',
 'Romance Film',
 'Thriller',
 'Action',
 'World cinema',
 'Crime Fiction',
 'Horror',
 'Black-and-white',
 'Indie',
 'Action/Adventure',
 'Adventure',
 'Family Film',
 'Short Film',
 'Romantic drama',
 'Animation',
 'Musical',
 'Science Fiction',
 'Mystery',
 'Romantic comedy']

In [22]:
# This will convert the genre to the binary like string(1 and 0) specifying multilabels of the given movie_id
def oneHotEncoding(x):
    indexList=" "
    for genre in genreList:
        if genre in x:
            indexList=indexList+"1"
        else:
            indexList=indexList+"0"
    temp=(indexList.replace("", " ")[1: -1])
    return temp.strip(" ")

## Mapping Label with feature vectors

In [23]:
myfunction = UserDefinedFunction(lambda x: oneHotEncoding(x), StringType())
featurizedData = featurizedData.select(*[myfunction(col).alias("labels") if col == "genre" else col for col in featurizedData.columns])
featurizedData.show()

+--------+--------------------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|              labels|         rawFeatures|
+--------+--------------------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|shlykov a hardwor...|1 0 0 0 0 1 0 0 0...|[-0.0180836420195...|
|31186339|    The Hunger Games|the nation of pan...|1 0 0 0 1 0 0 0 0...|[-0.0162740596882...|
|20663735|          Narasimham|poovalli induchoo...|1 0 0 0 1 0 0 0 0...|[-4.7364569269899...|
| 2231378|  The Lemon Drop Kid|the lemon drop ki...|0 1 0 0 0 0 0 0 0...|[-0.0150217633932...|
|  595909|   A Cry in the Dark|seventhday advent...|1 0 0 0 0 1 1 0 0...|[-0.0021270738290...|
| 5272176|            End Game|the president is ...|1 0 0 1 1 0 0 0 0...|[-0.0362848561934...|
| 1952976|          Dark Water|plot the film ope...|1 0 0 1 0 0 0 1 0...|[-0.0394205021706...|
|24225279|                Sing|the story begins ..

In [24]:
label=featurizedData.select("movie_id","labels")

In [25]:
label.show()

+--------+--------------------+
|movie_id|              labels|
+--------+--------------------+
|23890098|1 0 0 0 0 1 0 0 0...|
|31186339|1 0 0 0 1 0 0 0 0...|
|20663735|1 0 0 0 1 0 0 0 0...|
| 2231378|0 1 0 0 0 0 0 0 0...|
|  595909|1 0 0 0 0 1 1 0 0...|
| 5272176|1 0 0 1 1 0 0 0 0...|
| 1952976|1 0 0 1 0 0 0 1 0...|
|24225279|1 0 0 0 0 0 0 0 0...|
| 2462689|1 1 1 0 0 0 0 0 1...|
|20532852|0 0 0 0 0 0 0 0 0...|
|15401493|0 1 0 0 0 0 0 0 0...|
|18188932|1 1 0 0 0 1 1 0 0...|
| 2940516|0 1 0 0 0 0 0 0 0...|
| 1480747|0 1 0 0 0 0 0 0 0...|
|24448645|0 0 0 0 0 0 0 1 0...|
|15072401|0 0 0 1 0 0 1 1 0...|
| 4018288|1 0 0 0 0 0 0 0 0...|
| 4596602|0 0 1 1 1 0 1 0 0...|
|15224586|1 0 0 0 0 0 0 0 0...|
|15585766|1 0 0 0 0 0 0 0 0...|
+--------+--------------------+
only showing top 20 rows



## Splitting Label columns

In [26]:
from pyspark.sql.types import StringType,IntegerType
split_col = pyspark.sql.functions.split(label['labels'],' ')
for i in range(0,len(mapping_genre_list)):
    label = label.withColumn('label{}'.format(i), split_col.getItem(i).cast(IntegerType()))

In [27]:
label.show()

+--------+--------------------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|movie_id|              labels|label0|label1|label2|label3|label4|label5|label6|label7|label8|label9|label10|label11|label12|label13|label14|label15|label16|label17|label18|label19|
+--------+--------------------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|23890098|1 0 0 0 0 1 0 0 0...|     1|     0|     0|     0|     0|     1|     0|     0|     0|     0|      0|      0|      0|      0|      0|      0|      0|      0|      0|      0|
|31186339|1 0 0 0 1 0 0 0 0...|     1|     0|     0|     0|     1|     0|     0|     0|     0|     0|      1|      1|      0|      0|      0|      0|      0|      1|      0|      0|
|20663735|1 0 0 0 1 0 0 0 0...|     1|     0|     0|     0|     1|     0|     0|     0|   

## Joining Feature vector column with label columns

In [28]:
featurizedData=featurizedData.drop('labels')
df  = featurizedData.join(label, label.movie_id == featurizedData.movie_id).drop(label.movie_id)

In [29]:
df.show()

+--------------------+--------------------+--------------------+--------+--------------------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|          movie_name|                plot|         rawFeatures|movie_id|              labels|label0|label1|label2|label3|label4|label5|label6|label7|label8|label9|label10|label11|label12|label13|label14|label15|label16|label17|label18|label19|
+--------------------+--------------------+--------------------+--------+--------------------+------+------+------+------+------+------+------+------+------+------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|The Night of the ...|the film is set i...|[-0.0182564130088...|   75264|1 0 0 1 0 0 1 0 1...|     1|     0|     0|     1|     0|     0|     1|     0|     1|     0|      0|      0|      0|      0|      0|      0|      0|      0|      1|      0|
|  One Foot in Heave

## Function for Random Forest Classifier

In [30]:
def fit_and_classify(df, labelcol, featurescol):
    RandomForest_classifier = RandomForestClassifier(labelCol = labelcol, featuresCol= featurescol,numTrees=3,maxDepth=2)
    RandomForest_classifier_pipeline = Pipeline(stages=[RandomForest_classifier])
    RandomForest_model = RandomForest_classifier_pipeline.fit(df)
    return RandomForest_model

## Training the Model

In [31]:
# We will train the model for each label column and take the prediction
# As we are using Random Forest Classifier we will need to balance our data so that our model performs better
# So here we will do undersamlpling to make the data balanced

label_to_append = []
for i in range(20):
    filter_df = df.select('movie_id','rawFeatures','label{}'.format(i))
    #Dividing the filtered_df to check for the ratio
    major_df = filter_df.filter(filter_df['label{}'.format(i)] == 1) # Selecting the rows with labels only 1
    minor_df = filter_df.filter(filter_df['label{}'.format(i)] == 0) # Selecting the rows with labels only 0
    if minor_df.count() > major_df.count(): 
        ratio = minor_df.count() / major_df.count()
        sampled_df = minor_df.sample(False, 1/ratio) # Sampling the data as per the ratio and setting withReplacement to False
        final_df = sampled_df.union(major_df) # creating a final dataframe for our  random forest classifer model
    else:
        ratio = major_df.count() / minor_df.count()
        sampled_df = major_df.sample(False, 1/ratio)# Sampling the data as per the ratio and setting withReplacement to False
        final_df = sampled_df.union(minor_df) # creating a final dataframe for our  random forest classifer model
    RandomForest_model = fit_and_classify(final_df, labelcol='label{}'.format(i), featurescol='rawFeatures')
    predictions = RandomForest_model.transform(testData) # Getting the predictions
    label_to_append.append(predictions.select('movie_id', 'prediction')) # Selecting only movie_id and prediction column

In [32]:
# Preparing the header row for each of the predicted label column (so total 20 prediction column)

for i in range(len(label_to_append)):
    label_to_append[i] = label_to_append[i].withColumnRenamed('prediction','prediction{}'.format(i))

In [33]:
# Joining the data

joined_df = label_to_append[0]
for i in range(1,len(label_to_append)):
    joined_df = joined_df.join(label_to_append[i],on = ['movie_id'], how = 'inner')

## Assembling the predictions

In [34]:
from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols = ["prediction0","prediction1","prediction2","prediction3","prediction4","prediction5","prediction6","prediction7","prediction8","prediction9","prediction10","prediction11","prediction12","prediction13","prediction14","prediction15","prediction16","prediction17","prediction18","prediction19"], outputCol = 'Predictions')
joined_df = vecAssembler.transform(joined_df)

In [35]:
joined_df.show(5)

+--------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------------------+
|movie_id|prediction0|prediction1|prediction2|prediction3|prediction4|prediction5|prediction6|prediction7|prediction8|prediction9|prediction10|prediction11|prediction12|prediction13|prediction14|prediction15|prediction16|prediction17|prediction18|prediction19|         Predictions|
+--------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+------------+------------+------------+------------+--------------------+
|   62693|        0.0|        1.0|        0.0|        1.0|        0.0|        0.0|        1.0|        0.0|        0.0|        0.0|         0.0|         1.

In [36]:
result_df = joined_df.withColumn('predictions', 
                    sf.concat((sf.col('prediction0')).cast(IntegerType()),sf.lit(' '), sf.col('prediction1').cast(IntegerType()),sf.lit(' '), sf.col('prediction2').cast(IntegerType()),sf.lit(' '), sf.col('prediction3').cast(IntegerType()),sf.lit(' '), sf.col('prediction4').cast(IntegerType()),sf.lit(' '), sf.col('prediction5').cast(IntegerType()),sf.lit(' '), sf.col('prediction6').cast(IntegerType()),sf.lit(' '),sf.col('prediction7').cast(IntegerType()),sf.lit(' '), sf.col('prediction8').cast(IntegerType()),sf.lit(' '), sf.col('prediction9').cast(IntegerType()),sf.lit(' '), sf.col('prediction10').cast(IntegerType()),sf.lit(' '), sf.col('prediction11').cast(IntegerType()),sf.lit(' '), sf.col('prediction12').cast(IntegerType()),sf.lit(' '), sf.col('prediction13').cast(IntegerType()),sf.lit(' '),sf.col('prediction14').cast(IntegerType()),sf.lit(' '), sf.col('prediction15').cast(IntegerType()),sf.lit(' '), sf.col('prediction16').cast(IntegerType()),sf.lit(' '), sf.col('prediction17').cast(IntegerType()),sf.lit(' '), sf.col('prediction18').cast(IntegerType()),sf.lit(' '), sf.col('prediction19').cast(IntegerType())))


In [37]:
result_df=result_df.select("movie_id","predictions")

In [38]:
result_df.show(5)

+--------+--------------------+
|movie_id|         predictions|
+--------+--------------------+
|   62693|0 1 0 1 0 0 1 0 0...|
|  296252|0 1 0 1 0 0 1 0 0...|
| 1356971|0 1 0 1 1 0 0 0 0...|
| 1428872|0 1 0 1 0 0 1 0 0...|
| 1582173|0 1 0 1 0 0 1 0 0...|
+--------+--------------------+
only showing top 5 rows



## Saving the result dataframe into CSV file

In [39]:
result_df.coalesce(1).write.format("csv").option("header", "true").save("DIC_Assignment_3_Part3")