In [1]:
# get spark
# Locate the local Spark installation so that `import pyspark` works.
# The SPARK_HOME environment variable takes precedence; the hard-coded
# Homebrew Cellar path is only a fallback for when SPARK_HOME is unset.
import os

import findspark

SPARK_HOME = os.environ.get('SPARK_HOME', '/usr/local/Cellar/apache-spark/2.4.5/libexec')
findspark.init(SPARK_HOME)

In [2]:
# import required files
import ast

import pandas as pd
import pyspark
import pyspark.sql.functions as f
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql import *
from pyspark.sql.functions import lower
from pyspark.sql.types import *

In [3]:
# Create the SparkContext with the application name and memory settings up
# front. The SparkSession builder used afterwards calls getOrCreate(), which
# reuses an already-running context. Driver memory only takes effect when the
# JVM is launched, i.e. here, so a bare SparkContext() would leave those
# builder settings without effect.
spark_conf = (pyspark.SparkConf()
              .setAppName("Assignment3")
              .set("spark.driver.memory", "4g")
              .set("spark.executor.memory", "4g"))
sc = pyspark.SparkContext(conf=spark_conf)

In [4]:
# Build (or reuse, via getOrCreate) the SparkSession used by the rest of the
# notebook, requesting 4g of driver and executor memory.
spark = (SparkSession.builder
         .appName("Assignment3")
         .config("spark.driver.memory", "4g")
         .config("spark.executor.memory", "4g")
         .getOrCreate())

In [5]:
# Load the training, genre-mapping and test CSVs with pandas, then convert
# each one into a Spark DataFrame. The mapping frame's columns are named
# explicitly as (id, genre).
train_dframe = pd.read_csv('train.csv')
train_data = spark.createDataFrame(train_dframe)

map_dframe = pd.read_csv('mapping.csv')
mapping_data = spark.createDataFrame(map_dframe, ['id', 'genre'])

test_dframe = pd.read_csv('test.csv')
test_data = spark.createDataFrame(test_dframe)

In [6]:
# Preview the training and test frames. The number of rows in the mapping
# table is used below as the number of genre labels.
for preview in (train_data, test_data):
    preview.show()
genre_count = mapping_data.count()

+--------+--------------------+--------------------+--------------------+
|movie_id|          movie_name|                plot|               genre|
+--------+--------------------+--------------------+--------------------+
|23890098|          Taxi Blues|Shlykov, a hard-w...|['World cinema', ...|
|31186339|    The Hunger Games|The nation of Pan...|['Action/Adventur...|
|20663735|          Narasimham|Poovalli Induchoo...|['Musical', 'Acti...|
| 2231378|  The Lemon Drop Kid|The Lemon Drop Ki...|          ['Comedy']|
|  595909|   A Cry in the Dark|Seventh-day Adven...|['Crime Fiction',...|
| 5272176|            End Game|The president is ...|['Action/Adventur...|
| 1952976|          Dark Water|{{plot}} The film...|['Thriller', 'Dra...|
|24225279|                Sing|The story begins ...|           ['Drama']|
| 2462689|       Meet John Doe|Infuriated at bei...|['Black-and-white...|
|20532852|Destination Meatball|A line of people ...|['Animation', 'Sh...|
|15401493|    Husband for Hire|Lola  a

In [7]:
# Turn each training plot into TF-IDF features:
#   plot -> word tokens (ptok) -> tokens without stop words (clr)
#        -> hashed term frequencies (HTFfeature) -> IDF-weighted vector (feature).
# The transformers and the fitted IDF model are kept for the test data.
tokenizer = RegexTokenizer(inputCol="plot", outputCol="ptok", pattern="\\w+", gaps=False)
remover = StopWordsRemover(inputCol="ptok", outputCol="clr")
train_data = remover.transform(tokenizer.transform(train_data))
train_data.select('movie_id', 'clr').show()

htf = HashingTF(inputCol="clr", outputCol="HTFfeature")
train_data = htf.transform(train_data)

idf_train = IDF(inputCol="HTFfeature", outputCol="feature")
idf_train_model = idf_train.fit(train_data)
train_data = idf_train_model.transform(train_data)

+--------+--------------------+
|movie_id|                 clr|
+--------+--------------------+
|23890098|[shlykov, hard, w...|
|31186339|[nation, panem, c...|
|20663735|[poovalli, induch...|
| 2231378|[lemon, drop, kid...|
|  595909|[seventh, day, ad...|
| 5272176|[president, way, ...|
| 1952976|[plot, film, open...|
|24225279|[story, begins, h...|
| 2462689|[infuriated, told...|
|20532852|[line, people, dr...|
|15401493|[lola, attempts, ...|
|18188932|[milan, goran, tw...|
| 2940516|[bumbling, pirate...|
| 1480747|[plot, following,...|
|24448645|[despite, lucy, r...|
|15072401|[alan, colby, hei...|
| 4018288|[debbie, favorite...|
| 4596602|[ashes, ashes, se...|
|15224586|[film, follows, e...|
|15585766|[three, friends, ...|
+--------+--------------------+
only showing top 20 rows



In [8]:
# Transform the test data with the same feature pipeline as the training data.
# The IDF model fitted on the training set is reused (not re-fitted on the
# test set), so test vectors use the same term weighting the classifiers are
# trained on.
test_data = tokenizer.transform(test_data)
test_data = remover.transform(test_data)
test_data = htf.transform(test_data)
test_data = idf_train_model.transform(test_data)
# Alias kept so any later reference to idf_test_model still resolves.
idf_test_model = idf_train_model
test_data.select("movie_id", "feature").show()

+--------+--------------------+
|movie_id|             feature|
+--------+--------------------+
| 1335380|(262144,[1728,261...|
|29062594|(262144,[6068,191...|
| 9252321|(262144,[1598,208...|
|13455076|(262144,[3294,618...|
|24165951|(262144,[4098,644...|
| 1925869|(262144,[535,3294...|
|10799612|(262144,[5053,538...|
|28238240|(262144,[23060,30...|
|17124781|(262144,[5232,733...|
|28207941|(262144,[9726,626...|
|19174305|(262144,[2710,392...|
|18392317|(262144,[5213,606...|
|34420857|(262144,[11275,13...|
| 4039635|(262144,[571,1640...|
| 8034072|(262144,[991,4200...|
| 4016437|(262144,[5595,783...|
| 1520023|(262144,[14,535,5...|
|24589422|(262144,[1998,249...|
|35068740|(262144,[2710,484...|
|21132951|(262144,[1841,392...|
+--------+--------------------+
only showing top 20 rows



In [9]:
# Multi-label classification for genre: build a genre-name -> genre-id lookup
# from the mapping table.
mapper_details = {row[0]: row[1] for row in mapping_data.select("genre", "id").rdd.collect()}

def genreMapSplit(mapper_obj, mapping=None):
    """Convert a stringified genre list into a sorted list of genre ids.

    Parameters
    ----------
    mapper_obj : str or None
        Raw ``genre`` cell, a Python-list literal such as
        ``"['Comedy', 'Drama']"``.
    mapping : dict, optional
        Genre name -> integer id. Defaults to the module-level
        ``mapper_details``.

    Returns
    -------
    list of int
        Ascending ids of the listed genres. Genres missing from ``mapping``
        are skipped, because they have no label slot. An empty or missing
        genre list yields ``[]``.
    """
    if mapping is None:
        mapping = mapper_details
    if not isinstance(mapper_obj, str) or not mapper_obj.strip():
        return []
    try:
        # Parse the quoted list properly so names containing commas or
        # apostrophes (e.g. "Children's") are neither split nor truncated.
        names = ast.literal_eval(mapper_obj)
    except (ValueError, SyntaxError):
        # Not a valid literal: fall back to splitting on commas and dropping
        # one quote character from each end of every item.
        names = [item.strip()[1:-1] for item in mapper_obj.strip()[1:-1].split(",")]
    if isinstance(names, str):
        names = [names]
    return sorted(mapping[name] for name in names if name in mapping)

def resultMapLabel(mapper_obj, size=None):
    """Expand a list of genre ids into a 0/1 indicator list.

    Parameters
    ----------
    mapper_obj : list of int or None
        Genre ids, as produced by ``genreMapSplit``. ``None`` entries (genres
        not found in the mapping) are ignored. A ``None`` list is treated as
        empty.
    size : int, optional
        Length of the indicator list. Defaults to the module-level
        ``genre_count``.

    Returns
    -------
    list of int
        ``size`` entries; entry ``i`` is 1 if ``i`` occurs in ``mapper_obj``,
        otherwise 0.

    Raises
    ------
    IndexError
        If an id is out of range for ``size``, which means the mapping and
        ``genre_count`` disagree.
    """
    if size is None:
        size = genre_count
    labels = [0] * size
    for genre_id in mapper_obj or ():
        if genre_id is not None:
            labels[genre_id] = 1
    return labels

# Mapped data: parse each movie's genre string into a sorted list of genre ids.
udf_genre = f.udf(genreMapSplit, returnType=ArrayType(IntegerType()))
train_data = train_data.withColumn("mapped", udf_genre(f.col("genre")))
train_data.select("mapped").show()

# Labelled data: expand the id list into a 0/1 list with one slot per genre.
udf_label = f.udf(resultMapLabel, returnType=ArrayType(IntegerType()))
train_data = train_data.withColumn("label", udf_label(f.col("mapped")))
train_data.select("label").show()


+----------------+
|          mapped|
+----------------+
|          [0, 5]|
|  [0, 4, 10, 17]|
|      [0, 4, 16]|
|             [1]|
|       [0, 5, 6]|
|   [0, 3, 4, 10]|
|       [0, 3, 7]|
|             [0]|
|[0, 1, 2, 8, 19]|
|    [12, 13, 15]|
|             [1]|
|    [0, 1, 5, 6]|
|             [1]|
|             [1]|
|             [7]|
|   [3, 6, 7, 18]|
|             [0]|
| [2, 3, 4, 6, 9]|
|          [0, 9]|
|             [0]|
+----------------+
only showing top 20 rows

+--------------------+
|               label|
+--------------------+
|[1, 0, 0, 0, 0, 1...|
|[1, 0, 0, 0, 1, 0...|
|[1, 0, 0, 0, 1, 0...|
|[0, 1, 0, 0, 0, 0...|
|[1, 0, 0, 0, 0, 1...|
|[1, 0, 0, 1, 1, 0...|
|[1, 0, 0, 1, 0, 0...|
|[1, 0, 0, 0, 0, 0...|
|[1, 1, 1, 0, 0, 0...|
|[0, 0, 0, 0, 0, 0...|
|[0, 1, 0, 0, 0, 0...|
|[1, 1, 0, 0, 0, 1...|
|[0, 1, 0, 0, 0, 0...|
|[0, 1, 0, 0, 0, 0...|
|[0, 0, 0, 0, 0, 0...|
|[0, 0, 0, 1, 0, 0...|
|[1, 0, 0, 0, 0, 0...|
|[0, 0, 1, 1, 1, 0...|
|[1, 0, 0, 0, 0, 0...|
|[1, 0, 0, 0

In [11]:
# final output
# Train one binary LogisticRegressionWithLBFGS model per genre (one-vs-rest)
# and collect its 0/1 prediction for every test movie.
#
# The DataFrame -> MLlib conversion is done once and cached. LBFGS makes many
# passes over its input, and without caching each pass would re-run the whole
# tokenize / stop-word / TF-IDF / UDF lineage of train_data, for every genre.
train_points = (train_data.select("label", "feature").rdd
                .map(lambda row: (row.label, MLLibVectors.fromML(row.feature)))
                .cache())
test_points = (test_data.select("movie_id", "feature").rdd
               .map(lambda row: (row.movie_id, MLLibVectors.fromML(row.feature)))
               .cache())

final_result = []
for index in range(0, genre_count):
    # Bind the genre index and the model as default arguments so the closures
    # shipped to the executors never depend on a later value of a loop global.
    labelled_output = train_points.map(
        lambda pair, i=index: LabeledPoint(pair[0][i], pair[1])).cache()
    modelpred = LogisticRegressionWithLBFGS.train(labelled_output)
    labelled_output.unpersist()
    pred = test_points.map(lambda pair, model=modelpred: (pair[0], model.predict(pair[1])))
    final_result.append(pred.collect())
    print(f"Finished processing genre {index}")

train_points.unpersist()
test_points.unpersist()

# Regroup the per-genre prediction lists into one list per movie. Predictions
# are appended in genre-model order; when a movie_id occurs once per genre's
# list, position i holds the 0/1 prediction of genre model i.
final_result_obj = {}
for genre_predictions in final_result:
    for movie_id, prediction in genre_predictions:
        final_result_obj.setdefault(movie_id, []).append(int(prediction))

for movie_id, predictions in final_result_obj.items():
    print(movie_id, predictions)

Finished processing genere 0
Finished processing genere 1
Finished processing genere 2
Finished processing genere 3
Finished processing genere 4
Finished processing genere 5
Finished processing genere 6
Finished processing genere 7
Finished processing genere 8
Finished processing genere 9
Finished processing genere 10
Finished processing genere 11
Finished processing genere 12
Finished processing genere 13
Finished processing genere 14
Finished processing genere 15
Finished processing genere 16
Finished processing genere 17
Finished processing genere 18
Finished processing genere 19


In [15]:
# Write the predictions to CSV: one row per movie_id, where `predictions` is
# the space-separated list of that movie's per-genre 0/1 predictions.
df = pd.DataFrame.from_dict(final_result_obj, orient='index')
df.index.name = 'movie_id'
genre_cols = list(df.columns)
df['predictions'] = df[genre_cols].apply(
    lambda row: ' '.join(row.dropna().astype(str)), axis=1)
df.to_csv('final_csv_ans_part2.csv', columns=['predictions'])