In [14]:
import findspark
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
import pyspark
from pyspark.sql import *
from pyspark.ml.feature import StopWordsRemover,RegexTokenizer, StringIndexer, CountVectorizer
import pyspark.sql.functions as f
from pyspark.sql.types import *

In [15]:
# Create the notebook's SparkSession, or reuse one that is already running.
# NOTE(review): "spark.some.config.option" looks like a documentation
# placeholder rather than a real setting - confirm before relying on it.
spark = (
    SparkSession.builder
    .appName("PA3")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [16]:
import pandas as pd
# Each CSV is parsed by pandas on the driver and then handed to Spark.
traindataframe = spark.createDataFrame(pd.read_csv('train.csv'))

# `df` is left bound to the pandas test frame once this cell has run.
df = pd.read_csv('test.csv')
testdataframe = spark.createDataFrame(df)

In [17]:
# Stateless stages: split "plot" on non-word characters, then drop stop words.
regexTokenizer = RegexTokenizer(pattern="\\W", inputCol="plot", outputCol="words")
swremover = StopWordsRemover(inputCol="words", outputCol="filteredwords")

traindataframe = swremover.transform(regexTokenizer.transform(traindataframe))
testdataframe = swremover.transform(regexTokenizer.transform(testdataframe))

# The vocabulary is fitted on the training rows only and the fitted model is
# then applied to both sets, so train and test share one feature space.
# After this line `countvectorizer` holds the fitted model, not the estimator.
countvectorizer = CountVectorizer(inputCol="filteredwords", outputCol="features").fit(traindataframe)
traindataframe = countvectorizer.transform(traindataframe)
testdataframe = countvectorizer.transform(testdataframe)

In [18]:
# Preview the id, term-count vector and raw genre string of the training rows.
traindataframe.select(['movie_id', 'features', 'genre']).show()

+--------+--------------------+--------------------+
|movie_id|            features|               genre|
+--------+--------------------+--------------------+
|23890098|(119399,[10,129,1...|['World cinema', ...|
|31186339|(119399,[2,6,7,10...|['Action/Adventur...|
|20663735|(119399,[1,3,8,10...|['Musical', 'Acti...|
| 2231378|(119399,[7,9,12,1...|          ['Comedy']|
|  595909|(119399,[2,8,9,14...|['Crime Fiction',...|
| 5272176|(119399,[2,4,5,12...|['Action/Adventur...|
| 1952976|(119399,[0,1,2,3,...|['Thriller', 'Dra...|
|24225279|(119399,[0,1,2,5,...|           ['Drama']|
| 2462689|(119399,[0,4,9,17...|['Black-and-white...|
|20532852|(119399,[11,31,38...|['Animation', 'Sh...|
|15401493|(119399,[1,3,4,7,...|          ['Comedy']|
|18188932|(119399,[0,2,14,3...|['Crime Fiction',...|
| 2940516|(119399,[4,5,15,1...|          ['Comedy']|
| 1480747|(119399,[0,1,2,3,...|          ['Comedy']|
|24448645|(119399,[0,8,54,7...|          ['Horror']|
|15072401|(119399,[5,14,61,...|['Crime Fiction

In [19]:
# Preview the id and term-count vector of the test rows.
testdataframe.select(['movie_id', 'features']).show()

+--------+--------------------+
|movie_id|            features|
+--------+--------------------+
| 1335380|(119399,[0,3,4,7,...|
|29062594|(119399,[7,8,56,7...|
| 9252321|(119399,[5,7,11,1...|
|13455076|(119399,[0,66,81,...|
|24165951|(119399,[18,379,4...|
| 1925869|(119399,[0,1,2,3,...|
|10799612|(119399,[0,4,6,8,...|
|28238240|(119399,[15,21,19...|
|17124781|(119399,[5,9,11,1...|
|28207941|(119399,[49,56,21...|
|19174305|(119399,[1,3,5,7,...|
|18392317|(119399,[4,7,13,1...|
|34420857|(119399,[3,6,10,1...|
| 4039635|(119399,[0,1,4,17...|
| 8034072|(119399,[2,3,8,10...|
| 4016437|(119399,[0,2,3,6,...|
| 1520023|(119399,[0,1,2,3,...|
|24589422|(119399,[0,1,3,6,...|
|35068740|(119399,[10,20,28...|
|21132951|(119399,[0,3,5,8,...|
+--------+--------------------+
only showing top 20 rows



In [20]:
# Read the genre lookup table; the header row is used for column names and
# column types are inferred.
mapping = (
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferschema", "true")
    .option("header", "true")
    .load("./mapping.csv")
)
# Collect to a plain dict on the driver: key = column "_c0", value = column "0".
genre_map = mapping.select("_c0", "0").rdd.collectAsMap()
genre_map

{0: 'Drama',
 1: 'Comedy',
 2: 'Romance Film',
 3: 'Thriller',
 4: 'Action',
 5: 'World cinema',
 6: 'Crime Fiction',
 7: 'Horror',
 8: 'Black-and-white',
 9: 'Indie',
 10: 'Action/Adventure',
 11: 'Adventure',
 12: 'Family Film',
 13: 'Short Film',
 14: 'Romantic drama',
 15: 'Animation',
 16: 'Musical',
 17: 'Science Fiction',
 18: 'Mystery',
 19: 'Romantic comedy'}

In [21]:
import pyspark.sql.functions as F

from pyspark.sql.types import *
def generateLabels(row, i): 
    """Return 1 if the genre list in ``row`` contains genre number ``i``, else 0.

    Parameters:
        row: the ``genre`` cell, a string shaped like ``"['Drama', 'Comedy']"``.
             ``None`` or an empty string is treated as "no genres".
        i:   key into the module-level ``genre_map`` (index -> genre name).

    Returns:
        int: 1 when the name ``genre_map.get(i)`` appears in the list, 0 otherwise.
    """
    # A null genre would otherwise raise TypeError inside the UDF and fail the job.
    if not row:
        return 0
    target = genre_map.get(i)  # looked up once instead of once per element
    # row[1:-1] drops the surrounding brackets; x.strip()[1:-1] drops the quotes.
    for x in row[1:-1].split(","):
        if x.strip()[1:-1] == target:
            return 1
    return 0
# Expose the labeller to Spark as an integer-returning UDF.
udfFunc = F.udf(generateLabels, IntegerType())

# One binary column per genre index: label0, label1, ...
for i in range(len(genre_map)):
    traindataframe = traindataframe.withColumn(
        "label{}".format(i),
        udfFunc(F.col("genre"), F.lit(i)),
    )
traindataframe.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- genre: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filteredwords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label0: integer (nullable = true)
 |-- label1: integer (nullable = true)
 |-- label2: integer (nullable = true)
 |-- label3: integer (nullable = true)
 |-- label4: integer (nullable = true)
 |-- label5: integer (nullable = true)
 |-- label6: integer (nullable = true)
 |-- label7: integer (nullable = true)
 |-- label8: integer (nullable = true)
 |-- label9: integer (nullable = true)
 |-- label10: integer (nullable = true)
 |-- label11: integer (nullable = true)
 |-- label12: integer (nullable = true)
 |-- label13: integer (nullable = true)
 |-- label14: integer (nullable = true)
 |-- label15: integer (nullable = tr

In [22]:
def generateLabelCol(): 
    """Return the empty string used to seed the ``predictions`` column."""
    return ""

# Seed the column with a literal instead of a Python UDF: the value is a
# constant, so there is no reason to send every row through a Python worker.
testdataframe = testdataframe.withColumn("predictions", F.lit(generateLabelCol()))
testdataframe.printSchema()

root
 |-- movie_id: long (nullable = true)
 |-- movie_name: string (nullable = true)
 |-- plot: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filteredwords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- predictions: string (nullable = true)



In [23]:
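# Training step, kept for reference: it fits one LogisticRegressionWithLBFGS model per
# genre and saves it under ./Models/Part1/, the same path the next cell loads from.
# from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel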
# from pyspark.mllib.regression import LabeledPoint
# from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors

# def parsePoint(line):
#     return LabeledPoint(line.label, MLLibVectors.fromML(line.features))
# for i in range(len(genre_map)):
#     print('classifying Genre '+str(i+1)+ " : "+genre_map.get(i))
#     parsedData = traindataframe.selectExpr("label"+str(i)+" as label", "features").rdd.map(parsePoint)
#     model = LogisticRegressionWithLBFGS.train(parsedData)
#     model.save(spark,'./Models/Part1/model'+str(i))
#     labelsAndPreds = testdataframe.rdd.map(lambda p: (p.movie_id, model.predict(MLLibVectors.fromML(p.features))))
#     label_map = labelsAndPreds.collectAsMap()
#     def addLabel(m_id, row): 
#         row= row + str(label_map.get(m_id))+" "
#         return row
#     udfFunc = F.udf(addLabel, StringType())
#     testdataframe = testdataframe.withColumn("predictions", udfFunc("movie_id","predictions"))
    

In [24]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vector as MLLibVector, Vectors as MLLibVectors

def parsePoint(line):
    """Build an MLlib ``LabeledPoint`` from a row with ``label`` and ``features``.

    ``line.features`` is passed through ``MLLibVectors.fromML`` before being
    paired with ``line.label``.
    """
    mllib_features = MLLibVectors.fromML(line.features)
    return LabeledPoint(line.label, mllib_features)
# Load every per-genre model first, then score the test set in a single pass.
# Scoring one genre at a time through rdd.map + collectAsMap re-evaluates the
# whole, growing lineage of `testdataframe` on each iteration and pulls every
# prediction back to the driver.
genre_models = []
for i in range(len(genre_map)):
    print('classifying Genre '+str(i+1)+ " : "+genre_map.get(i))
    # The documented signature is load(sc, path) with a SparkContext, so pass
    # the session's context rather than the SparkSession itself.
    genre_models.append(
        LogisticRegressionModel.load(spark.sparkContext, './Models/Part1/model'+str(i)))

def predictAllGenres(features):
    """Return every genre model's prediction for one feature vector.

    Parameters:
        features: the row's ``features`` vector (converted with
                  ``MLLibVectors.fromML`` before prediction).

    Returns:
        str: the predictions in ``genre_models`` order, each followed by a
        single space (the format of the ``predictions`` column).
    """
    vec = MLLibVectors.fromML(features)
    return "".join(str(m.predict(vec)) + " " for m in genre_models)

udfFunc = F.udf(predictAllGenres, StringType())
# The column is replaced, not appended to, so re-running this cell is safe.
testdataframe = testdataframe.withColumn("predictions", udfFunc("features"))
    

classifying Genre 1 : Drama
classifying Genre 2 : Comedy
classifying Genre 3 : Romance Film
classifying Genre 4 : Thriller
classifying Genre 5 : Action
classifying Genre 6 : World cinema
classifying Genre 7 : Crime Fiction
classifying Genre 8 : Horror
classifying Genre 9 : Black-and-white
classifying Genre 10 : Indie
classifying Genre 11 : Action/Adventure
classifying Genre 12 : Adventure
classifying Genre 13 : Family Film
classifying Genre 14 : Short Film
classifying Genre 15 : Romantic drama
classifying Genre 16 : Animation
classifying Genre 17 : Musical
classifying Genre 18 : Science Fiction
classifying Genre 19 : Mystery
classifying Genre 20 : Romantic comedy


In [25]:
# Show the first 20 prediction strings in full (no cell truncation).
testdataframe.select(['movie_id', 'predictions']).show(n=20, truncate=False)

+--------+----------------------------------------+
|movie_id|predictions                             |
+--------+----------------------------------------+
|1335380 |1 0 0 0 0 1 0 0 0 0 1 0 0 0 0 0 0 0 0 0 |
|29062594|0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|9252321 |0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|13455076|0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|24165951|0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|1925869 |1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|10799612|1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|28238240|0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|17124781|0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 |
|28207941|0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 |
|19174305|0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|18392317|0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 |
|34420857|0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|4039635 |0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|8034072 |1 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 |
|4016437 |1 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 |
|1520023 |1 

In [26]:
# Overwrite instead of append: with "append", every re-run of this cell adds
# another full copy of the predictions to outputs/output1.
(
    testdataframe.select('movie_id','predictions')
    .write.format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("outputs/output1")
)