In [0]:
import os
# Install pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp==2.4.5

openjdk version "1.8.0_252"
OpenJDK Runtime Environment (build 1.8.0_252-8u252-b09-1~18.04-b09)
OpenJDK 64-Bit Server VM (build 25.252-b09, mixed mode)
Processing /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471/pyspark-2.4.4-py2.py3-none-any.whl
Collecting py4j==0.10.7
  Using cached https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


Collecting spark-nlp==2.4.5
  Using cached https://files.pythonhosted.org/packages/31/46/5c5a2bda407f693126386da5378f132e5e163fa6dfa46109548270348786/spark_nlp-2.4.5-py2.py3-none-any.whl
Installing collected packages: spark-nlp
Successfully installed spark-nlp-2.4.5


In [0]:
import findspark
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')

import sparknlp
spark = sparknlp.start()

print("Spark NLP version")
sparknlp.version()
print("Apache Spark version")
spark.version

Spark NLP version
Apache Spark version


'2.4.4'

In [0]:
from pyspark import SparkContext
from pyspark import SQLContext

sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

In [0]:
from pyspark.sql.types import IntegerType,ArrayType, StringType
from pyspark.sql.functions import udf
from pyspark.sql.functions import col
from copy import deepcopy

import ast

In [0]:
import sparknlp
from sparknlp.base import *
from sparknlp.annotator import *

spark = sparknlp.start()

data = sqlContext.read.csv('train.csv',header=True)

In [0]:
document = DocumentAssembler().setInputCol('plot').setOutputCol('document').setCleanupMode('shrink')
tokenizer = Tokenizer().setInputCols(['document']).setOutputCol('tokens')
stopWords = StopWordsCleaner().setInputCols('tokens').setOutputCol('clean')
lemma = LemmatizerModel().pretrained('lemma_antbnc').setInputCols(['clean']).setOutputCol('lemma')
embeddings = WordEmbeddingsModel.pretrained().setInputCols(['document','lemma']).setOutputCol('embeddings')
sentence = SentenceEmbeddings().setInputCols(['document','embeddings']).setOutputCol('sentence_embeddings').setPoolingStrategy('AVERAGE')


lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


In [0]:
clf_pipeline = Pipeline (
    stages = [
              document,
              tokenizer,
              stopWords,
              lemma,
              embeddings,
              sentence
    ])

model = clf_pipeline.fit(data)
dataset = model.transform(data)

In [0]:
dataset

DataFrame[movie_id: string, features: vector, genre: string, label: array<bigint>, Drama: bigint, Comedy: bigint, Romance Film: bigint, Thriller: bigint, Action: bigint, World cinema: bigint, Crime Fiction: bigint, Horror: bigint, Black-and-white: bigint, Indie: bigint, Action/Adventure: bigint, Adventure: bigint, Family Film: bigint, Short Film: bigint, Romantic drama: bigint, Animation: bigint, Musical: bigint, Science Fiction: bigint, Mystery: bigint, Romantic comedy: bigint]

In [0]:
from pyspark.ml.linalg import Vectors,DenseVector, VectorUDT
from pyspark.sql.functions import col,udf

to_vector_udf = udf(lambda vs: Vectors.dense(vs), VectorUDT())
dataset = dataset.withColumn('features',to_vector_udf(col('sentence_embeddings')[0].embeddings))

dataset = dataset.select('movie_id','features','genre')

In [0]:
import pandas as pd

d = pd.read_csv('mapping.csv',index_col=0)
df = d.to_dict()['0']

dic = dict((v,k) for k,v in df.items())
encode = [0] * len(dic.keys())


from ast import literal_eval
from copy import deepcopy

def kHotEncode(x):
    global dic
    global encode

    key = deepcopy(encode)
    try:
        labs = list(literal_eval(x))
        res = []
        for lab in labs:
            if lab in dic.keys(): key[dic[lab]] = 1
        return key
    except:
        return [0] * 20

udf_encode = udf(kHotEncode, ArrayType(IntegerType()))

dataset = dataset.withColumn("label", udf_encode(col("genre")))


In [0]:
cols = dataset.columns
labels = list(dic.keys())

dataset = dataset.rdd.map(lambda row: ([row[c] for c in cols]+row['label'])).toDF(cols+labels)

In [0]:
dataset.show()

+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|movie_id|            features|               genre|               label|Drama|Comedy|Romance Film|Thriller|Action|World cinema|Crime Fiction|Horror|Black-and-white|Indie|Action/Adventure|Adventure|Family Film|Short Film|Romantic drama|Animation|Musical|Science Fiction|Mystery|Romantic comedy|
+--------+--------------------+--------------------+--------------------+-----+------+------------+--------+------+------------+-------------+------+---------------+-----+----------------+---------+-----------+----------+--------------+---------+-------+---------------+-------+---------------+
|23890098|[0.04802859947085...|['World cinema', ...|[1, 0, 0, 0, 0, 1...|    1|     0|           0|       0|     0|

In [0]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel

load_model = []
try:
  for i in range(20):
    path = "lr_ml_model/"+str(i)+"/"
    load_model.append(LogisticRegressionModel.load(path))
except:
  pass

In [0]:

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
import time


label_names = list(dic.keys())
lr_models = []

if (len(load_model) != 20):
  start = time.time()
  for index,label_name in enumerate(label_names):
    print(label_name, time.time()-start)
    lr = LogisticRegression(maxIter=10, predictionCol='lr_prediction', labelCol=label_name, regParam=0.3, elasticNetParam=0)
    lr_models.append(lr.fit(dataset))
  print(time.time()-start)
else:
  lr_models = load_model

load_model

[LogisticRegressionModel: uid = LogisticRegression_21115049c225, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_837cd0f16974, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_ba94f5b88122, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_3f26494d31a8, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_5afca6cf1c1c, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_73a43171c212, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_e83efcdd7d2f, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_3552e959aede, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_642d39e0b20b, numClasses = 2, numFeatures = 100,
 LogisticRegressionModel: uid = LogisticRegression_1d7838628a3e, numClasses = 2, numFeatures = 100,


In [0]:
# for index,model in enumerate(lr_models):
#   model.save('lr_ml_model/'+str(index))

In [0]:
testing = sqlContext.read.csv('test.csv',header=True)

test_pipeline = Pipeline (
    stages = [
              document,
              tokenizer,
              stopWords,
              lemma,
              embeddings,
              sentence,
    ])

test_model = test_pipeline.fit(testing)
test_dataset = test_model.transform(testing)
test_dataset = test_dataset.withColumn('features',to_vector_udf(col('embeddings')[0].embeddings))
test_dataset = test_dataset.select('movie_id','features')



In [0]:
preds = []
start = time.time()
for i in range(len(lr_models)):
  print(i, time.time()-start, end=' ')
  start = time.time()
  pred = lr_models[i].transform(test_dataset)
  preds.append(pred.select('movie_id','lr_prediction').toPandas())
  print(time.time()-start)


In [0]:
preds_cpy = [p.copy() for p in preds]

In [0]:
preds_cpy

In [0]:
df = preds_cpy[0].copy()
df.columns = ['movie_id', '0']
df['0'] = df['0'].astype(int).astype(str).copy()
df
for i in range(1,20):
  tmp = preds_cpy[i].copy()
  tmp.columns = ['movie_id',str(i)]
  tmp[str(i)] = tmp[str(i)].astype(int).astype(str)
  df = df.merge(tmp, on='movie_id', how='left')


df['prediction'] = df[[str(i) for i in range(20)]].apply(lambda x: ' '.join(x), axis=1)

In [0]:
df_new = df[['movie_id', 'prediction']]
df_new.to_csv('result_part3.csv',header=True,index=False)