In [1]:
# findspark.init is called with the local Spark distribution path before
# pyspark is imported below.
import findspark
findspark.init('/Users/vamshi/Documents/spark-3.0.0-preview2-bin-hadoop2.7')
# Alternative install location used on another machine:
# findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
from pyspark.sql import SparkSession

# Single shared session for the whole notebook, running with a local[*]
# master and 16g for executor, driver and off-heap memory.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .config("spark.memory.offHeap.enabled", 'true')
    .config("spark.memory.offHeap.size", "16g")
    .getOrCreate()
)



In [2]:
import numpy as np
import pandas as pd
from ast import literal_eval 
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import col
from pyspark.sql.functions import monotonically_increasing_id
from functools import reduce
from pyspark.sql.functions import concat_ws
from pyspark.ml.classification import LogisticRegression

In [3]:
# Load the training set. The Spark copy is created before `genre` is parsed
# below (presumably so it keeps the raw string form of that column).
train_pd_data_frame = pd.read_csv(r'train.csv')
train_spark_data_frame = spark.createDataFrame(train_pd_data_frame)

# Replace each `genre` cell (a Python literal stored as text) with the
# evaluated object, then keep the per-row values as a plain list in `s`.
genre_column = train_pd_data_frame['genre'].apply(literal_eval)
train_pd_data_frame['genre'] = genre_column
s = genre_column.to_list()

# Mapping table: one row per genre; the genre name is read from the second
# column (position 1) of each row.
mapping_df = pd.read_csv("mapping.csv")
mapping_spark_df = spark.createDataFrame(mapping_df)

# Collect the rows once. The previous loop called `take(20)` on every
# iteration (one Spark job per genre) and raised IndexError as soon as the
# mapping held more than 20 rows.
genres = [row[1] for row in mapping_spark_df.collect()]

# Multi-hot label matrix: one row per training example, one column per entry
# of `genres`; a cell is 1 when that genre appears in the example's list.
matrix = np.zeros((len(train_pd_data_frame), len(genres)))

# Map each genre name to every column position carrying that name, so each
# lookup is a dict access instead of a scan over all genres.
# Assumes genre values are hashable (strings) - TODO confirm against the data.
genre_columns = {}
for k, name in enumerate(genres):
    genre_columns.setdefault(name, []).append(k)

for i, genre in enumerate(s):
    for g in genre:
        # Names missing from the mapping are ignored, as before.
        for k in genre_columns.get(g, ()):
            matrix[i][k] = 1

# Build the CSV header straight from the names. Going through `str(list)`
# put a space after every comma (giving column names a leading blank) and
# stripped any "[", "]" or "'" that was part of a genre name.
genre_csv = ",".join(str(name) for name in genres)

np.savetxt("genres.csv", matrix, delimiter=",", fmt='%d', header=genre_csv, comments='')


In [4]:
# Label matrix written to genres.csv, reloaded with its header row, plus its
# Spark counterpart.
labels_data_frame = pd.read_csv(r'genres.csv')
labels_spark_df = spark.createDataFrame(labels_data_frame)

# Test set and its Spark counterpart.
test_pd_data_frame = pd.read_csv(r'test.csv')
test_spark_data_frame = spark.createDataFrame(test_pd_data_frame)

In [5]:
# Attach the label columns to the training rows by pairing both frames on a
# generated row id.
# NOTE(review): monotonically_increasing_id() is unique and increasing but
# depends on partition layout; the pairing is only correct while both frames
# have the same row count and partitioning - confirm, or join on a real key.
dataframe1 = train_spark_data_frame.withColumn("row_id", monotonically_increasing_id())
dataframe2 = labels_spark_df.withColumn("row_id", monotonically_increasing_id())
dataframe3 = test_spark_data_frame.withColumn("row_id", monotonically_increasing_id())
final_dataframe = dataframe1.join(dataframe2, "row_id").drop("row_id")

# The test set must not be joined with the training labels: the labels belong
# to different rows, and the inner join on generated ids can silently drop
# test rows whose id has no counterpart in the label frame. Prediction only
# needs the test columns themselves.
test_df = test_spark_data_frame


# Preprocessing

In [6]:
# Text featurisation: split `plot` on non-word characters, drop stop words,
# then hash the remaining tokens into term-frequency vectors (`features`).
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
hashingTF = HashingTF(inputCol="filtered", outputCol="features")
pipeline = Pipeline(stages=[regexTokenizer, remover, hashingTF])

# Fit on the training data only.
model = pipeline.fit(final_dataframe)
train_dataset = model.transform(final_dataframe)

# Apply the SAME fitted pipeline to the test data instead of fitting a second
# one on it: this keeps train and test features in one feature space and
# avoids learning anything from the test set if a fitted stage is added later.
model2 = model  # alias kept so existing references to `model2` still resolve
test_dataset = model.transform(test_df)

# Basic Model

In [7]:
# One binary logistic regression per label column, trained on the hashed
# term-frequency features; keeps (movie_id, prediction) for each label.
labelCols = labels_spark_df.columns
lr = LogisticRegression(featuresCol='features', maxIter=50)

dfList = []
for labelCol in labelCols:
    # The same estimator is reused; only its label column changes per pass.
    lrModel = lr.setLabelCol(labelCol).fit(train_dataset)
    predictions = lrModel.transform(test_dataset).withColumn(
        "prediction", col("prediction").cast(IntegerType())
    )
    dfList.append(predictions.select("movie_id", "prediction"))

In [8]:
# Give every per-label prediction column a unique name so the frames can be
# joined on movie_id without clashing.
dfs_renamed = [
    df.selectExpr('movie_id', f'prediction as prediction_{i}')
    for i, df in enumerate(dfList)
]
col_list = ['prediction_%d' % i for i in range(len(dfList))]

# Join all per-label frames, then fold the label columns (in label order)
# into a single space-separated `predictions` string per movie.
temp_df = reduce(lambda x, y: x.join(y, ['movie_id'], how='full'), dfs_renamed)
temp_df = temp_df.withColumn('predictions', concat_ws(" ", *col_list)).drop(*col_list)

# Write as a separate statement: `to_csv(path)` returns None, so assigning
# its result used to overwrite `temp_df` with None.
temp_df.toPandas().to_csv("part1_finalPredictions.csv", index=False)


# TF-IDF

In [9]:
# Learn inverse-document-frequency weights from the training term
# frequencies, then rescale both splits with that one fitted model.
idf = IDF(inputCol="features", outputCol="idf_features")
idfModel = idf.fit(train_dataset)
rescaledTrainData, rescaledTestData = (
    idfModel.transform(dataset) for dataset in (train_dataset, test_dataset)
)


In [10]:
# One binary logistic regression per label column, trained on the TF-IDF
# features; keeps (movie_id, prediction) for each label.
labelCols = labels_spark_df.columns
lr = LogisticRegression(featuresCol='idf_features', maxIter=50)

dfList = []
for labelCol in labelCols:
    # The same estimator is reused; only its label column changes per pass.
    lrModel = lr.setLabelCol(labelCol).fit(rescaledTrainData)
    predictions = lrModel.transform(rescaledTestData).withColumn(
        "prediction", col("prediction").cast(IntegerType())
    )
    dfList.append(predictions.select("movie_id", "prediction"))


In [11]:
# Give every per-label prediction column a unique name so the frames can be
# joined on movie_id without clashing.
dfs_renamed = [
    df.selectExpr('movie_id', f'prediction as prediction_{i}')
    for i, df in enumerate(dfList)
]
col_list = ['prediction_%d' % i for i in range(len(dfList))]

# Join all per-label frames, then fold the label columns (in label order)
# into a single space-separated `predictions` string per movie.
temp_df = reduce(lambda x, y: x.join(y, ['movie_id'], how='full'), dfs_renamed)
temp_df = temp_df.withColumn('predictions', concat_ws(" ", *col_list)).drop(*col_list)

# Write as a separate statement: `to_csv(path)` returns None, so assigning
# its result used to overwrite `temp_df` with None.
temp_df.toPandas().to_csv("tf-idf_predictions.csv", index=False)


# Word2Vec

In [12]:
from pyspark.ml.feature import Word2Vec

# Word2Vec estimator reading the tokenised `words` column (tokens before
# stop-word removal) and writing 300-dimensional vectors to `w2v_features`.
word2Vec = Word2Vec(
    inputCol="words",
    outputCol="w2v_features",
    vectorSize=300,
    minCount=0,
)


In [13]:
# Train the embedding on the training data only, then embed both splits with
# that one fitted model.
W2V_model = word2Vec.fit(train_dataset)
W2V_train_data, W2V_test_data = (
    W2V_model.transform(dataset) for dataset in (train_dataset, test_dataset)
)


In [14]:
# One binary logistic regression per label column, trained on the Word2Vec
# features; keeps (movie_id, prediction) for each label.
labelCols = labels_spark_df.columns
lr = LogisticRegression(featuresCol='w2v_features', maxIter=1000)

dfList = []
for labelCol in labelCols:
    # The same estimator is reused; only its label column changes per pass.
    lrModel = lr.setLabelCol(labelCol).fit(W2V_train_data)
    predictions = lrModel.transform(W2V_test_data).withColumn(
        "prediction", col("prediction").cast(IntegerType())
    )
    dfList.append(predictions.select("movie_id", "prediction"))


In [15]:
# Give every per-label prediction column a unique name so the frames can be
# joined on movie_id without clashing.
dfs_renamed = [
    df.selectExpr('movie_id', f'prediction as prediction_{i}')
    for i, df in enumerate(dfList)
]
col_list = ['prediction_%d' % i for i in range(len(dfList))]

# Join all per-label frames, then fold the label columns (in label order)
# into a single space-separated `predictions` string per movie.
temp_df = reduce(lambda x, y: x.join(y, ['movie_id'], how='full'), dfs_renamed)
temp_df = temp_df.withColumn('predictions', concat_ws(" ", *col_list)).drop(*col_list)

# Write as a separate statement: `to_csv(path)` returns None, so assigning
# its result used to overwrite `temp_df` with None.
temp_df.toPandas().to_csv("word2vec_predictions.csv", index=False)
