In [1]:
# Make the local Spark installation importable as `pyspark`.
# The Spark home can be overridden through the standard SPARK_HOME environment
# variable; the original course-VM path is kept as the fallback so the notebook
# behaves as before where SPARK_HOME is unset.
import os

import findspark

SPARK_HOME = os.environ.get('SPARK_HOME', '/home/cse587/spark-2.4.0-bin-hadoop2.7')
findspark.init(SPARK_HOME)

In [None]:
# Spark entry point plus the helpers used throughout the notebook.
import pyspark
import pyspark.sql.functions as F
from ast import literal_eval
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf

# Local-mode session that uses every available core ("local[*]").
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

In [None]:
import numpy as np
import pandas as pd

In [2]:
from pyspark.sql.types import StringType, IntegerType, ArrayType
from pyspark.sql.functions import regexp_replace,col,array_contains,explode
from pyspark.sql.functions import split
from pyspark.sql.functions import monotonically_increasing_id
from functools import reduce
from pyspark.sql.functions import concat_ws

In [None]:
# Training split: pandas parses the CSV, then Spark receives a copy of it.
train_path = 'train.csv'
train_df = pd.read_csv(train_path)
train_spark_df = spark.createDataFrame(train_df)

In [4]:
# Label file (generated earlier from the training data): every column is used
# as a separate target further down. Parsed by pandas, then copied into Spark.
labels_path = 'genres.csv'
lables_df = pd.read_csv(labels_path)
lables_spark_df = spark.createDataFrame(lables_df)

In [None]:
# Test split: pandas parses the CSV, then Spark receives a copy of it.
test_path = 'test.csv'
test_pd_df = pd.read_csv(test_path)
test_spark_df = spark.createDataFrame(test_pd_df)

In [5]:
# Attach the label columns to the training rows by position.
from pyspark.sql import Window


def _with_row_index(sdf, name="row_id"):
    """Return ``sdf`` with a dense, 0-based positional index column ``name``.

    monotonically_increasing_id() is increasing but NOT consecutive: it packs
    the partition id into the upper bits. Two frames with different partition
    layouts therefore get different ids for the same row position, and an
    equi-join on those ids silently drops or misaligns rows. Ranking by the
    monotonic id keeps the original row order but yields 0..n-1 for both frames.
    """
    mono = "__mono_id"
    # No partitionBy: Spark moves the data to one partition for this window,
    # which is acceptable in local mode for a dataset of this size.
    order = Window.orderBy(mono)
    return (
        sdf.withColumn(mono, monotonically_increasing_id())
        .withColumn(name, F.row_number().over(order) - 1)
        .drop(mono)
    )


x1 = _with_row_index(train_spark_df)
x2 = _with_row_index(lables_spark_df)
# A positional join is only meaningful if both files have one row per movie.
assert x1.count() == x2.count(), "train.csv and genres.csv have different row counts"
train_df = x1.join(x2, "row_id").drop("row_id")


In [None]:
# The test split is unlabelled and needs no label columns. The feature pipeline
# reads only "plot", and the fitted LogisticRegression models read only
# "features". Joining the *training* labels onto test rows by a synthetic row id
# attached unrelated labels. Because it was an inner join, it could also silently
# drop test rows, which would then be missing from the final predictions file.
test_df = test_spark_df

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Word2Vec

In [7]:
%%time
#adding regex tokenizer for splitting into words
regex=RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")
#adding own defined set of words
add_stopwords=["i", "me", "my", "myself", "we", "our", "ours", "ourselves", "you", "your", "yours", "yourself", "yourselves", "he", "him", "his", "himself", "she", "her", "hers", "herself", "it", "its", "itself", "they", "them", "their", "theirs", "themselves", "what", "which", "who", "whom", "this", "that", "these", "those", "am", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "having", "do", "does", "did", "doing", "a", "an", "the", "and", "but", "if", "or", "because", "as", "until", "while", "of", "at", "by", "for", "with", "about", "against", "between", "into", "through", "during", "before", "after", "above", "below", "to", "from", "up", "down", "in", "out", "on", "off", "over", "under", "again", "further", "then", "once", "here", "there", "when", "where", "why", "how", "all", "any", "both", "each", "few", "more", "most", "other", "some", "such", "no", "nor", "not", "only", "own", "same", "so", "than", "too", "very", "s", "t", "can", "will", "just", "don", "should", "now"]
#removing stopwords
stopwordsRemover=StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
#using word2vec as feature engineering
word2Vec=Word2Vec(vectorSize=350,minCount=5,inputCol="filtered",outputCol="features")
#adding all these to pipeline
pipeline=Pipeline(stages=[regex,stopwordsRemover,word2Vec])
#applying this pipeline on train dataframe
op1=pipeline.fit(train_df)
train_dataset=op1.transform(train_df)
#applying this pipeline on test dataframe
op2=pipeline.fit(test_df)
test_dataset=op2.transform(test_df)

CPU times: user 184 ms, sys: 58.1 ms, total: 242 ms
Wall time: 8min 29s


In [8]:
%%time
from pyspark.ml.classification import LogisticRegression
finallist = []
i=1
labels = lables_spark_df.columns
lr = LogisticRegression(featuresCol = 'features',maxIter=700)
#iterating through lr model for 20 labels and storing predictions in list
for col in labels:
    lr.setLabelCol(col)
    lrModel = lr.fit(train_dataset)
    predictions = lrModel.transform(test_dataset)
    predictions = predictions.withColumn("prediction",F.col("prediction").cast(IntegerType()))
    finallist.append(predictions.select("movie_id","prediction"))
    print(i)
    i=i+1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
CPU times: user 2min 15s, sys: 48.8 s, total: 3min 4s
Wall time: 3h 36min 2s


In [9]:
%%time
#after making predictions which are stored in finallist now these predictions are appeneded to movie id
df = [j.selectExpr('movie_id', f'prediction as prediction_{i}') for i,j in enumerate(finallist)]
temp = reduce(lambda x, y: x.join(y, ['movie_id'], how='full'), df)
col = ['prediction_%d' % i for i in range(len(finallist))]
#appended movie id and predictions are now dumped to .csv file
temp = temp.withColumn('predictions',concat_ws(" ",*col)).drop(*col).toPandas().to_csv("part3.csv",index=False)

CPU times: user 619 ms, sys: 179 ms, total: 799 ms
Wall time: 2min 39s
