In [1]:
#Import all the required libraries
import findspark
import pandas as pd
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.ml.feature import Tokenizer, StopWordsRemover,CountVectorizer,StringIndexer,VectorAssembler
from pyspark.mllib.classification import LogisticRegressionWithSGD
from pyspark.mllib import linalg as mllib_linalg
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.types import IntegerType,FloatType,StructType
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF


In [2]:
# Point findspark at the Spark 2.4.0 distribution directory, then start (or reuse)
# a local SparkSession with two worker threads and 8g of driver memory.
findspark.init('spark-2.4.0-bin-hadoop2.7')

spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("Movie Genre prediction")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)
sc = spark.sparkContext
# Legacy SQLContext entry point; the loading cell converts pandas frames through it.
sqlContext = SQLContext(sc)



In [3]:
# Load the three input CSVs with pandas and convert each to a Spark DataFrame,
# previewing the first 5 rows of each.
def _csv_to_spark(path):
    """Read `path` with pandas.read_csv and return it as a Spark DataFrame via sqlContext."""
    return sqlContext.createDataFrame(pd.read_csv(path))

# Training data
df_train = _csv_to_spark('train.csv')
df_train.show(5)

# Label mapping (genre vocabulary)
df_mapping = _csv_to_spark('mapping.csv')
df_mapping.show(5)

# Test data
df_test = _csv_to_spark('test.csv')
df_test.show(5)

+--------+------------------+--------------------+--------------------+
|movie_id|        movie_name|                plot|               genre|
+--------+------------------+--------------------+--------------------+
|23890098|        Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|  The Hunger Games|The nation of Pan...|['Action/Adventur...|
|20663735|        Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|
| 2231378|The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|
|  595909| A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|
+--------+------------------+--------------------+--------------------+
only showing top 5 rows

+----------+------------+
|Unnamed: 0|           0|
+----------+------------+
|         0|       Drama|
|         1|      Comedy|
|         2|Romance Film|
|         3|    Thriller|
|         4|      Action|
+----------+------------+
only showing top 5 rows

+--------+--------------------+--------------------+
|movie_id|     

In [4]:
# Keep only the id and the raw plot text; every feature pipeline below starts from these two columns.
df_train_features, df_test_features = (frame.select('movie_id', 'plot') for frame in (df_train, df_test))

# Preprocessing

In [5]:
#Pre-processing

#Cleaning the plot column: keep only ASCII letters and whitespace, collapse every
#run of whitespace to a single space, trim both ends, then lowercase.
#The collapse/trim step matters: Tokenizer splits on single whitespace characters,
#so a gap such as "in  the" (left behind when "1985," is stripped) would otherwise
#yield an empty-string token, which StopWordsRemover does not remove and which then
#leaks into every downstream vectorizer.
clean_plot = lower(trim(regexp_replace(regexp_replace('plot', "[^a-zA-Z\\s]", ""), "\\s+", " "))).alias('plot')
df_train_features = df_train_features.select('movie_id', clean_plot)
df_test_features = df_test_features.select('movie_id', clean_plot)

#Tokenizing the plot column
tokenizer = Tokenizer(inputCol='plot', outputCol='tokenized_plot')
df_train_features = tokenizer.transform(df_train_features).select('movie_id','tokenized_plot')
df_test_features = tokenizer.transform(df_test_features).select('movie_id','tokenized_plot')

#Removing stopwords using StopWordsRemover's default stop-word list
remover = StopWordsRemover(inputCol='tokenized_plot', outputCol='final_tokens')
df_train_features = remover.transform(df_train_features).select('movie_id','final_tokens')
df_test_features = remover.transform(df_test_features).select('movie_id','final_tokens')

df_train_features.show(5)
df_test_features.show(5)


+--------+--------------------+
|movie_id|        final_tokens|
+--------+--------------------+
|23890098|[shlykov, hardwor...|
|31186339|[nation, panem, c...|
|20663735|[poovalli, induch...|
| 2231378|[lemon, drop, kid...|
|  595909|[seventhday, adve...|
+--------+--------------------+
only showing top 5 rows

+--------+--------------------+
|movie_id|        final_tokens|
+--------+--------------------+
| 1335380|[film, based, eve...|
|29062594|[group, teenagers...|
| 9252321|[story, zulu, fam...|
|13455076|[stooges, play, t...|
|24165951|[soldieroffortune...|
+--------+--------------------+
only showing top 5 rows



In [6]:
#Converting true genres to binary values.
#The `genre` column holds a stringified list such as "['Comedy', 'Drama']":
#strip brackets and quotes, normalise ", " separators to ",", then split into an array<string>.
df_train_genres=df_train.select('movie_id',regexp_replace('genre', r'([\[\]\']+)', '').alias('genre'))
df_train_genres=df_train_genres.select('movie_id',regexp_replace('genre',", ", ',').alias('genre'))
df_train_genres=df_train_genres.select('movie_id',split(df_train_genres['genre'],',').alias('genre'))

#Genre vocabulary, in the row order of mapping.csv (its genre-name column is called "0").
genres = df_mapping.select("0").rdd.map(lambda row: row[0]).collect()

#Add one float 0.0/1.0 indicator column per genre.
#array_contains is a built-in Spark expression: it needs no Python UDF and no closure
#over the loop variable (a UDF defined inside the loop only captured the right genre
#because PySpark pickled it immediately). A null genre array now yields null instead
#of failing the job.
for genre in genres:
    df_train_genres = df_train_genres.withColumn(genre, array_contains(col('genre'), genre).cast(FloatType()))

df_train_genres.show(2)


+--------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|               genre|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|23890098|[World cinema, Dr...|  1.0|   0.0|         0.0|     0.0|   0.0|         1.0|          0.0|   0.0|            0.0|  0.0|             0.0|      0.0|        0.0|       0.0|           0.0|      0.0|    0.0|            0.0|    0.0|     

# Part-1 Basic Model

In [7]:
# Phase 1 features: a document-term count matrix.
# The vocabulary is fitted on the training tokens only (minDF=0.05 limits it to
# tokens occurring in at least 5% of training plots). The same fitted model is then
# applied to both splits, so train and test share one feature space.
cv = CountVectorizer(inputCol="final_tokens", outputCol="features", minDF=0.05)
model = cv.fit(df_train_features)

countVectors_train, countVectors_test = (
    model.transform(tokens_df) for tokens_df in (df_train_features, df_test_features)
)

df_train_features_phase1, df_test_features_phase1 = (
    vectors_df.select('movie_id', 'features') for vectors_df in (countVectors_train, countVectors_test)
)

df_train_features_phase1.show(5)
df_test_features_phase1.show(5)

+--------+--------------------+
|movie_id|            features|
+--------+--------------------+
|23890098|(376,[125,176],[1...|
|31186339|(376,[0,3,5,8,12,...|
|20663735|(376,[0,2,6,10,12...|
| 2231378|(376,[0,8,9,13,15...|
|  595909|(376,[0,3,9,10,15...|
+--------+--------------------+
only showing top 5 rows

+--------+--------------------+
|movie_id|            features|
+--------+--------------------+
| 1335380|(376,[0,1,4,6,8,9...|
|29062594|(376,[8,10,59,73,...|
| 9252321|(376,[0,7,8,11,13...|
|13455076|(376,[0,1,69,78,9...|
|24165951|(376,[18,332,361]...|
+--------+--------------------+
only showing top 5 rows



In [8]:
#Convert training data to the LabeledPoint RDD format that the RDD-based
#LogisticRegressionWithSGD expects, for a single example genre, and show one record.
#The genre is picked explicitly (the last entry of `genres`) rather than read from a
#`genre` variable left over from an earlier loop, so the cell gives the same result
#regardless of execution order.
example_genre = genres[-1]
trainingdata_phase1 = df_train_features_phase1
labeldata = df_train_genres.select('movie_id', example_genre)
trainingdata_phase1 = trainingdata_phase1.join(labeldata, "movie_id", "inner")
trainingdata_phase1 = trainingdata_phase1.withColumnRenamed(example_genre, 'label')

trainingdata_phase1 = trainingdata_phase1.select(col("label"), col("features")).rdd\
                .map(lambda row: LabeledPoint(row.label, Vectors.fromML(row.features)))
trainingdata_phase1.first()


In [9]:
#Train one binary logistic regression model per genre (one-vs-rest) on the
#CountVectorizer features, then score the whole test set.

models_phase1 = {}
for genre in genres:
    print("Genre: "+genre)
    labeldata = df_train_genres.select('movie_id',genre)
    trainingdata_phase1=df_train_features_phase1.join(labeldata, "movie_id", "inner")
    trainingdata_phase1=trainingdata_phase1.withColumnRenamed(genre,'label')

    trainingdata_phase1=trainingdata_phase1.select(col("label"), col("features")).rdd\
                .map(lambda row: LabeledPoint(row.label, Vectors.fromML(row.features)))

    models_phase1[genre] = LogisticRegressionWithSGD.train(trainingdata_phase1,iterations=30)

#Score every test movie against all genre models in one pass, rather than building
#one prediction frame per genre and inner-joining them back together (19 joins).
#The models are bound as a default argument, so the shipped closure captures exactly
#these models instead of whatever a global name refers to when Spark pickles it.
genre_models_phase1 = [models_phase1[g] for g in genres]
test_vectors_phase1 = df_test_features_phase1.select(col("movie_id"), col("features")).rdd\
                .map(lambda row: (row.movie_id, Vectors.fromML(row.features)))
#Columns: movie_id followed by one 0/1 prediction column per genre, in `genres` order.
df_predictons_phase1 = test_vectors_phase1\
                .map(lambda pair, models=genre_models_phase1: (pair[0],) + tuple(m.predict(pair[1]) for m in models))\
                .toDF(['movie_id'] + genres)

    


Genre: Drama
Genre: Comedy
Genre: Romance Film
Genre: Thriller
Genre: Action
Genre: World cinema
Genre: Crime Fiction
Genre: Horror
Genre: Black-and-white
Genre: Indie
Genre: Action/Adventure
Genre: Adventure
Genre: Family Film
Genre: Short Film
Genre: Romantic drama
Genre: Animation
Genre: Musical
Genre: Science Fiction
Genre: Mystery
Genre: Romantic comedy


In [10]:
#Write to CSV in the format that Kaggle would expect: movie_id plus a "predictions"
#string of space-separated 0/1 flags, one per genre in `genres` order.
df_predictons_final_phase1 = df_predictons_phase1.withColumn("predictions", concat_ws(" ",*[col(genre).cast('int') for genre in genres]))
df_predictons_final_phase1 = df_predictons_final_phase1.select('movie_id','predictions')
#Spark writes a directory at this path (one part file after repartition(1)).
#mode="overwrite" lets the notebook be re-run; the default mode fails if the path exists.
df_predictons_final_phase1.repartition(1).write.csv("phase1_predictions.csv", mode="overwrite", sep=',', header='true')

# Part-2 TF-IDF

In [11]:
# Phase 2 features: TF-IDF.
# Tokens are hashed into as many buckets as the phase-1 feature vectors have
# dimensions (read from the first training row), then reweighted by IDF fitted on
# the training split. minDocFreq is not set here, so IDF uses its default.
# NOTE(review): hashing the full token set into this few buckets causes many hash
# collisions; confirm that matching phase 1's dimensionality is intentional.
phase1_vocab_size = df_train_features_phase1.take(1)[0]['features'].size
hashingTF = HashingTF(inputCol="final_tokens", outputCol="rawFeatures", numFeatures=phase1_vocab_size)
idf = IDF(inputCol="rawFeatures", outputCol="features")
pipeline = Pipeline(stages=[hashingTF, idf])

pipelineFit = pipeline.fit(df_train_features)
df_train_features_phase2, df_test_features_phase2 = (
    pipelineFit.transform(tokens_df) for tokens_df in (df_train_features, df_test_features)
)

In [12]:
#Convert TF-IDF training data to the LabeledPoint RDD format that the RDD-based
#LogisticRegressionWithSGD expects, for a single example genre, and show one record.
#The genre is picked explicitly (the last entry of `genres`) rather than read from a
#`genre` variable left over from an earlier loop, so the cell gives the same result
#regardless of execution order.
example_genre = genres[-1]
trainingdata_phase2 = df_train_features_phase2
labeldata = df_train_genres.select('movie_id', example_genre)
trainingdata_phase2 = trainingdata_phase2.join(labeldata, "movie_id", "inner")
trainingdata_phase2 = trainingdata_phase2.withColumnRenamed(example_genre, 'label')

trainingdata_phase2 = trainingdata_phase2.select(col("label"), col("features")).rdd\
                .map(lambda row: LabeledPoint(row.label, Vectors.fromML(row.features)))
trainingdata_phase2.first()


In [13]:
#Train one binary logistic regression model per genre (one-vs-rest) on the
#TF-IDF features, then score the whole test set.

models_phase2 = {}
for genre in genres:
    print("Genre: "+genre)
    labeldata = df_train_genres.select('movie_id',genre)
    trainingdata_phase2=df_train_features_phase2.join(labeldata, "movie_id", "inner")
    trainingdata_phase2=trainingdata_phase2.withColumnRenamed(genre,'label')

    trainingdata_phase2=trainingdata_phase2.select(col("label"), col("features")).rdd\
                .map(lambda row: LabeledPoint(row.label, Vectors.fromML(row.features)))

    models_phase2[genre] = LogisticRegressionWithSGD.train(trainingdata_phase2,iterations=30)

#Score every test movie against all genre models in one pass, rather than building
#one prediction frame per genre and inner-joining them back together (19 joins).
#The models are bound as a default argument, so the shipped closure captures exactly
#these models instead of whatever a global name refers to when Spark pickles it.
genre_models_phase2 = [models_phase2[g] for g in genres]
test_vectors_phase2 = df_test_features_phase2.select(col("movie_id"), col("features")).rdd\
                .map(lambda row: (row.movie_id, Vectors.fromML(row.features)))
#Columns: movie_id followed by one 0/1 prediction column per genre, in `genres` order.
df_predictons_phase2 = test_vectors_phase2\
                .map(lambda pair, models=genre_models_phase2: (pair[0],) + tuple(m.predict(pair[1]) for m in models))\
                .toDF(['movie_id'] + genres)


Genre: Drama
Genre: Comedy
Genre: Romance Film
Genre: Thriller
Genre: Action
Genre: World cinema
Genre: Crime Fiction
Genre: Horror
Genre: Black-and-white
Genre: Indie
Genre: Action/Adventure
Genre: Adventure
Genre: Family Film
Genre: Short Film
Genre: Romantic drama
Genre: Animation
Genre: Musical
Genre: Science Fiction
Genre: Mystery
Genre: Romantic comedy


In [14]:
#Write to CSV in the format that Kaggle would expect: movie_id plus a "predictions"
#string of space-separated 0/1 flags, one per genre in `genres` order.
df_predictons_final_phase2 = df_predictons_phase2.withColumn("predictions", concat_ws(" ",*[col(genre).cast('int') for genre in genres]))
df_predictons_final_phase2 = df_predictons_final_phase2.select('movie_id','predictions')
#Spark writes a directory at this path (one part file after repartition(1)).
#mode="overwrite" lets the notebook be re-run; the default mode fails if the path exists.
df_predictons_final_phase2.repartition(1).write.csv("phase2_predictions.csv", mode="overwrite", sep=',', header='true')


# Part-3 Word2Vec

In [15]:
# Phase 3 features: Word2Vec.
# Fit embeddings on the training tokens with default hyper-parameters, then map
# each split's token list to a dense "features" vector using that single model.
word2vec = Word2Vec(inputCol="final_tokens", outputCol="features")
model = word2vec.fit(df_train_features)
df_train_features_phase3, df_test_features_phase3 = (
    model.transform(tokens_df) for tokens_df in (df_train_features, df_test_features)
)


In [16]:
#Convert Word2Vec training data to the LabeledPoint RDD format that the RDD-based
#LogisticRegressionWithSGD expects, for a single example genre, and show one record.
#The genre is picked explicitly (the last entry of `genres`) rather than read from a
#`genre` variable left over from an earlier loop, so the cell gives the same result
#regardless of execution order.
example_genre = genres[-1]
trainingdata_phase3 = df_train_features_phase3
labeldata = df_train_genres.select('movie_id', example_genre)
trainingdata_phase3 = trainingdata_phase3.join(labeldata, "movie_id", "inner")
trainingdata_phase3 = trainingdata_phase3.withColumnRenamed(example_genre, 'label')

trainingdata_phase3 = trainingdata_phase3.select(col("label"), col("features")).rdd\
                .map(lambda row: LabeledPoint(row.label, Vectors.fromML(row.features)))
trainingdata_phase3.first()

In [17]:
#Train one binary logistic regression model per genre (one-vs-rest) on the
#Word2Vec features, then score the whole test set.

models_phase3 = {}
for genre in genres:
    print("Genre: "+genre)
    labeldata = df_train_genres.select('movie_id',genre)
    trainingdata_phase3=df_train_features_phase3.join(labeldata, "movie_id", "inner")
    trainingdata_phase3=trainingdata_phase3.withColumnRenamed(genre,'label')

    trainingdata_phase3=trainingdata_phase3.select(col("label"), col("features")).rdd\
                .map(lambda row: LabeledPoint(row.label, Vectors.fromML(row.features)))

    models_phase3[genre] = LogisticRegressionWithSGD.train(trainingdata_phase3,iterations=20)

#Score every test movie against all genre models in one pass, rather than building
#one prediction frame per genre and inner-joining them back together (19 joins).
#The models are bound as a default argument, so the shipped closure captures exactly
#these models instead of whatever a global name refers to when Spark pickles it.
genre_models_phase3 = [models_phase3[g] for g in genres]
test_vectors_phase3 = df_test_features_phase3.select(col("movie_id"), col("features")).rdd\
                .map(lambda row: (row.movie_id, Vectors.fromML(row.features)))
#Columns: movie_id followed by one 0/1 prediction column per genre, in `genres` order.
df_predictons_phase3 = test_vectors_phase3\
                .map(lambda pair, models=genre_models_phase3: (pair[0],) + tuple(m.predict(pair[1]) for m in models))\
                .toDF(['movie_id'] + genres)


Genre: Drama
Genre: Comedy
Genre: Romance Film
Genre: Thriller
Genre: Action
Genre: World cinema
Genre: Crime Fiction
Genre: Horror
Genre: Black-and-white
Genre: Indie
Genre: Action/Adventure
Genre: Adventure
Genre: Family Film
Genre: Short Film
Genre: Romantic drama
Genre: Animation
Genre: Musical
Genre: Science Fiction
Genre: Mystery
Genre: Romantic comedy


In [18]:
#Write to CSV in the format that Kaggle would expect: movie_id plus a "predictions"
#string of space-separated 0/1 flags, one per genre in `genres` order.
df_predictons_final_phase3 = df_predictons_phase3.withColumn("predictions", concat_ws(" ",*[col(genre).cast('int') for genre in genres]))
df_predictons_final_phase3 = df_predictons_final_phase3.select('movie_id','predictions')
#Spark writes a directory at this path (one part file after repartition(1)).
#mode="overwrite" lets the notebook be re-run; the default mode fails if the path exists.
df_predictons_final_phase3.repartition(1).write.csv("phase3_predictions.csv", mode="overwrite", sep=',', header='true')
