In [1]:
import findspark
import re
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
import pandas as pd
from nltk.corpus import stopwords
import nltk 

In [2]:
# Create (or reuse) the SparkSession first and take its SparkContext from it.
# Constructing a bare SparkContext beforehand made getOrCreate() reuse that
# context, so the appName below never took effect. The placeholder
# "spark.some.config.option" setting copied from the Spark docs is dropped.
spark = SparkSession \
    .builder \
    .appName("PA3") \
    .getOrCreate()
sc = spark.sparkContext

In [3]:
# Load the labelled training CSV with pandas, then convert it to a Spark DataFrame.
df = spark.createDataFrame(pd.read_csv("train.csv"))

In [4]:
# Split the rows ~70/30 into training and test sets. Without a seed Spark
# picks a random one, so the split (and every downstream result) changed on
# each Restart & Run All; a fixed seed makes it reproducible.
SPLIT_SEED = 42
df_list = df.randomSplit([.7, .3], seed=SPLIT_SEED)

In [5]:
# randomSplit returned [train_portion, test_portion]; unpack them by position.
train_data, test_data = df_list

In [6]:
#clean data
from pyspark.sql.functions import lower, regexp_replace, trim


def clean_plot(plot_col):
    """Normalize a plot column to lowercase letters separated by single spaces.

    Args:
        plot_col: Spark Column holding the raw plot text.

    Returns:
        A Column with non-letter characters removed, whitespace normalized,
        and the text lowercased.
    """
    # Turn every whitespace run (including newlines/tabs) into one space first.
    # Otherwise the letters-only filter deletes the newline and fuses the
    # neighbouring words ("end.\nThe" -> "endthe").
    spaced = regexp_replace(plot_col, r"\s+", " ")
    letters_only = regexp_replace(spaced, "[^a-zA-Z ]", "")
    # Removing punctuation can leave double spaces (e.g. " - "), which become
    # empty-string tokens after split(' '); collapse them and trim the ends.
    return lower(trim(regexp_replace(letters_only, " +", " ")))


train_data = train_data.withColumn('plot', clean_plot(train_data['plot']))
test_data = test_data.withColumn('plot', clean_plot(test_data['plot']))

In [7]:
# English stop words from NLTK. Stored as a set: membership is tested for
# every token of every plot, and set lookup is O(1) versus O(n) for a list.
stop = set(stopwords.words('english'))

In [8]:
# Tokenize the cleaned *training* plots on single spaces. Stop words are removed
# and word counts taken in later cells to pick the common words used as features.
find_common = (
    train_data
    .select('plot')
    .rdd
    .map(list)
    .map(lambda row_values: row_values[0].split(' '))
)

In [9]:
def remove_stop_words(x, stop_words=None):
    """Return the tokens of ``x`` that are not stop words.

    Args:
        x: list of token strings. It is not modified.
        stop_words: collection of words to drop. Defaults to the
            notebook-level ``stop`` collection.

    Returns:
        A new list holding every token of ``x`` not in ``stop_words``,
        in the original order. All occurrences of a stop word are dropped.
    """
    if stop_words is None:
        stop_words = stop
    # Use a set so each membership test is O(1). The previous
    # collect-then-list.remove() approach was quadratic in the token count.
    if isinstance(stop_words, (set, frozenset)):
        blocked = stop_words
    else:
        blocked = set(stop_words)
    return [word for word in x if word not in blocked]

In [10]:
find_common = find_common.map(lambda tokens: remove_stop_words(tokens))  # drop stop words per plot

In [11]:
find_common = find_common.flatMap(lambda tokens: tokens)  # one record per word

In [12]:
find_common = find_common.map(lambda word: (word, 1))  # (word, 1) pairs for counting

In [13]:
find_common = find_common.reduceByKey(lambda left, right: left + right)  # total count per word

In [14]:
# Occurrence-count window (both bounds exclusive) for a word to become a feature.
# The upper bound presumably drops near-ubiquitous tokens — TODO document how
# these values were chosen.
MIN_WORD_COUNT = 200
MAX_WORD_COUNT = 91000
common_words = find_common.filter(lambda pair: MIN_WORD_COUNT < pair[1] < MAX_WORD_COUNT)

In [15]:
common_words = common_words.map(lambda pair: pair[0])  # keep just the word, drop its count

In [16]:
#Now create features for ML algo using the common words as the features as sparse vectors for each plot
from pyspark .mllib.linalg import SparseVector
from collections import Counter
import numpy as np
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import SVMWithSGD, SVMModel

In [17]:
def remove_uncommon_words(x, vocabulary=None):
    """Keep only the tokens of ``x`` that are feature words.

    Args:
        x: list of token strings. It is not modified.
        vocabulary: collection of words to keep. Defaults to the
            notebook-level ``features`` collected from ``common_words``.

    Returns:
        A new list of the tokens of ``x`` that are in the vocabulary,
        in their original order.
    """
    vocab = features if vocabulary is None else vocabulary
    if not isinstance(vocab, (set, frozenset)):
        vocab = set(vocab)
    # Build a new list instead of calling x.remove() while iterating over x.
    # Doing that skips the element after each removal, so some uncommon
    # words used to survive.
    return [word for word in x if word in vocab]

In [18]:
features = set(common_words.collect())  # set: O(1) `word in features` checks per token

In [19]:
# Keep only the (cleaned) plot column of each split.
tr_data, te_data = train_data.select('plot'), test_data.select('plot')

In [20]:
# Unwrap each single-column Row into its plot string (one record per plot).
tr_data = tr_data.rdd.flatMap(lambda row: row)

In [21]:
# Tokenize: each plot string becomes a list of words split on single spaces.
tr_data = tr_data.map(lambda plot: plot.split(' '))

In [23]:
# Same unwrap + tokenize steps for the test plots.
te_data = te_data.rdd.flatMap(lambda row: row).map(lambda plot: plot.split(' '))

In [24]:
# Remove words that are not in the common-word feature list.
tr_data, te_data = (rdd.map(remove_uncommon_words) for rdd in (tr_data, te_data))


In [28]:
# Drop stop words from the tokenized plots of both splits.
tr_data, te_data = [rdd.map(remove_stop_words) for rdd in (tr_data, te_data)]

In [29]:
# Give every distinct training token a column index. Broadcast the map and its
# size so executors can build fixed-width term-count vectors.
vocab_map = (
    tr_data
    .flatMap(lambda tokens: tokens)
    .distinct()
    .zipWithIndex()
    .collectAsMap()
)
v_map = sc.broadcast(vocab_map)
vocab_size = sc.broadcast(len(vocab_map))

# One SparseVector of raw term counts per training plot.
tdm_train = (
    tr_data
    .map(Counter)
    .map(lambda counts: {v_map.value[tok]: float(n) for tok, n in counts.items()})
    .map(lambda index_counts: SparseVector(vocab_size.value, index_counts))
)


In [31]:
# Vectorize the test plots in the *training* vocabulary's feature space.
# Previously a separate vocabulary was built from the test set, which assigned
# unrelated indices: position i of a test vector did not mean the same word as
# position i of a training vector. Tokens never seen in training are dropped.
# The *_test names are kept as aliases so existing references still resolve.
vocab_map_test = vocab_map
v_map_test = v_map
vocab_size_test = vocab_size


def _test_index_counts(counts):
    """Map a token Counter to {training column index: count}, skipping unseen tokens."""
    vocab = v_map_test.value
    return {vocab[token]: float(n) for token, n in counts.items() if token in vocab}


tdm_test = te_data \
    .map(Counter) \
    .map(_test_index_counts) \
    .map(lambda index_counts: SparseVector(vocab_size_test.value, index_counts))


In [33]:
# Genre string per training row, in the same row order as the plot vectors.
train_labels = train_data.select('genre').rdd.flatMap(lambda row: row)
test_labels = test_data.select('genre')
# NOTE(review): `test_labelslabels` looks like a typo for `test_labels` and is
# never read. `test_labels` stays a DataFrame because the zip cell calls
# `test_labels.rdd` on it.
test_labelslabels = test_labels.rdd.flatMap(lambda row: row)

In [34]:
# Materialize the train and test count vectors on the driver (train first).
sparseVectorsList, sparseVectorsList_Test = tdm_train.collect(), tdm_test.collect()

In [35]:
# movie_id values of each split, collected to the driver as plain Python lists.
train_movie_ids = train_data.select('movie_id').rdd.flatMap(lambda row: row).collect()
test_movie_ids = test_data.select('movie_id').rdd.flatMap(lambda row: row).collect()


In [36]:
# Load mapping.csv. Its unnamed first column is renamed to 'i'. Column '0'
# holds the labels the training loop iterates over.
keylist = spark.createDataFrame(
    pd.read_csv('mapping.csv').rename(columns={'Unnamed: 0': 'i'})
)
k = keylist.select('0').rdd.flatMap(lambda row: row)
k1 = keylist.select('i').rdd.flatMap(lambda row: row)
genre_list = k1.zip(k)  # (i, label) pairs

In [37]:
def test(data, target=None):
    """Convert (genres, feature-vector) pairs into binary LabeledPoints.

    Args:
        data: sequence of pairs whose element 0 is a genre string and whose
            element 1 is a feature vector accepted by ``LabeledPoint``. The
            genre string looks like a stringified list, e.g.
            "['drama', 'comedy']" (assumed from the bracket/quote stripping
            below — TODO confirm against train.csv).
        target: genre treated as the positive class. Defaults to the
            notebook-level ``label`` variable set by the training loop,
            which is how the positive genre was chosen originally.

    Returns:
        list of LabeledPoint: label 1.0 if ``target`` is one of the pair's
        genres, otherwise 0.0.
    """
    wanted = label if target is None else target
    points = []
    for pair in data:
        genres, vector = pair[0], pair[1]
        names = genres.replace('[', '').replace(']', '').replace('\'', '').split(',')
        # strip(): "a, b" splits into ["a", " b"]. Without it, every genre
        # after the first kept a leading space and could never match.
        hit = any(name.strip() == wanted for name in names)
        points.append(LabeledPoint(1.0 if hit else 0.0, vector))
    return points

In [38]:
def makeClassifier(data, iterations=100):
    """Train a binary linear SVM on ``data`` for the current target genre.

    Args:
        data: sequence of (genre-string, feature-vector) pairs. It is passed
            to ``test``, which builds the LabeledPoints and, when called with
            only ``data``, takes the positive genre from the notebook-level
            ``label`` variable.
        iterations: number of SGD iterations (default 100, the previously
            hard-coded value).

    Returns:
        The model returned by ``SVMWithSGD.train``.
    """
    labeled_points = sc.parallelize(test(data))
    return SVMWithSGD.train(labeled_points, iterations=iterations)

In [44]:
# Pair each row's genre string with its count vector (same split lineage, so
# partitions line up for zip).
lab2sparse = train_labels.zip(tdm_train)
# test_labels is a DataFrame, so `.rdd` yields Row('genre') objects. Unwrap
# each Row so test pairs hold plain genre strings like the training pairs;
# test() calls .replace() on the label, which a Row does not support.
lab2sparse_test = test_labels.rdd.map(lambda row: row[0]).zip(tdm_test)

In [45]:
# Materialize the (label, vector) pairs on the driver, training set first.
l2sm = lab2sparse.map(tuple)
data = l2sm.collect()
l2sm_test = lab2sparse_test.map(tuple)
data_test = l2sm_test.collect()

In [None]:
# Train one binary (one-vs-rest) SVM per label listed in mapping.csv column '0'.
# makeClassifier -> test() reads the global `label`, so the loop variable must
# keep that name.
classifiers = []

for label in k.collect():
    classifiers.append(makeClassifier(data))
    # Report only after training has actually finished (previously printed before).
    print('Trained', label, 'Classifier')