In [None]:
import findspark
findspark.init('/home/cse587/spark-2.4.0-bin-hadoop2.7')
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, lower, regexp_replace, split
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.stem.snowball import SnowballStemmer
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import *
import pandas as pd
from pyspark.ml.feature import HashingTF,IDF
from pyspark.sql.functions import *
import re

 
# Spark settings for this notebook's session, applied in order to the shared
# SparkSession builder. getOrCreate() returns an already-running session if
# one exists in this kernel.
SPARK_SETTINGS = [
    ("spark.executor.memory", "4G"),
    ("spark.driver.memory", "18G"),
    ("spark.executor.cores", "7"),
    ("spark.python.worker.memory", "4G"),
    ("spark.driver.maxResultSize", "0"),  # "0" means no limit per Spark config docs
    ("spark.sql.crossJoin.enabled", "true"),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.default.parallelism", "2"),
    ("spark.speculation", "false"),
]

# NOTE(review): the app name looks like a leftover from another project (this
# notebook predicts movie genres); kept unchanged.
session_builder = (SparkSession.builder
                   .appName('Toxic Comment Classification')
                   .enableHiveSupport())
for setting_key, setting_value in SPARK_SETTINGS:
    session_builder = session_builder.config(setting_key, setting_value)
spark = session_builder.getOrCreate()

# Optional tuning, currently disabled:
# spark.conf.set("spark.sql.shuffle.partitions", 6)

In [None]:
import os

# Directory holding train.csv and test.csv; change this one constant to run the
# notebook from another location instead of editing every path.
DATA_DIR = '/home/cse587'

pd_df = pd.read_csv(os.path.join(DATA_DIR, 'train.csv'))
# NOTE(review): `test_df` is rebound to a Spark DataFrame built from this pandas
# frame in the next cell, so re-run this cell before re-running that one.
test_df = pd.read_csv(os.path.join(DATA_DIR, 'test.csv'))

# Genre vocabulary. Its order fixes the position of each 0/1 flag in the
# indicator string produced by col_convert for the output file.
genre_all = ['Drama', 'Comedy', 'Romance Film', 'Thriller', 'Action',
             'World cinema', 'Crime Fiction', 'Horror', 'Black-and-white',
             'Indie', 'Action/Adventure', 'Adventure', 'Family Film',
             'Short Film', 'Romantic drama', 'Animation', 'Musical',
             'Science Fiction', 'Mystery', 'Romantic comedy']

In [None]:
def _shared_movie_fields():
    """Return fresh StructFields for the columns common to train and test data.

    createDataFrame applies a schema to pandas rows positionally, so this order
    presumably matches the CSV column order -- TODO confirm against the files.
    """
    return [
        StructField("movie_id", LongType()),
        StructField("movie_name", StringType()),
        StructField("plot", StringType()),
    ]


# Training rows additionally carry the `genre` label column.
mySchema = StructType(_shared_movie_fields() + [StructField("genre", StringType())])
testSchema = StructType(_shared_movie_fields())

# Convert the pandas frames into Spark DataFrames with explicit schemas.
# NOTE(review): `test_df` is rebound from the pandas frame to the Spark frame, so
# this cell fails if re-run without first re-running the CSV-loading cell.
df = spark.createDataFrame(pd_df, schema=mySchema)
test_df = spark.createDataFrame(test_df, schema=testSchema)

In [None]:
# Plot preprocessing shared by the training and test frames:
#   1. drop every character that is not an ASCII letter or whitespace, lower-case;
#   2. split into tokens (`words_token`);
#   3. remove stop words (`plot_clean`).
# One transformer instance of each kind serves both frames, so train and test are
# guaranteed to be processed identically (previously each was copy-pasted twice).
PLOT_STRIP_PATTERN = "[^a-zA-Z\\s]"

# NOTE(review): Tokenizer splits on single whitespace characters, so runs of
# whitespace (e.g. left behind after stripping punctuation) yield empty "" tokens.
# Left unchanged because the pre-trained classifier was built on these features.
tokenizer = Tokenizer(inputCol='plot', outputCol='words_token')
remover = StopWordsRemover(inputCol='words_token', outputCol='plot_clean')

df_clean = df.select('movie_id', 'movie_name',
                     lower(regexp_replace('plot', PLOT_STRIP_PATTERN, "")).alias('plot'),
                     'genre')
tdf_clean = test_df.select('movie_id', 'movie_name',
                           lower(regexp_replace('plot', PLOT_STRIP_PATTERN, "")).alias('plot'))

df_words_token = tokenizer.transform(df_clean).select('movie_id', 'movie_name', 'words_token', 'genre')
tdf_words_token = tokenizer.transform(tdf_clean).select('movie_id', 'movie_name', 'words_token')

df_words_no_stopw = remover.transform(df_words_token).select('movie_id', 'movie_name', 'plot_clean', 'genre')
tdf_words_no_stopw = remover.transform(tdf_words_token).select('movie_id', 'movie_name', 'plot_clean')

# No unpersist() calls: none of these DataFrames was ever cached, so unpersisting
# them was a no-op. The frames are lazy and the later stages still depend on them.

In [None]:
# Encode the `genre` string column as a numeric class index in `label`.
# The fitted indexer `lsm` is kept because its `.labels` are used later to map
# predicted indices back to genre strings.
# NOTE(review): despite its name, `tfidf4` holds no TF-IDF features.
label_string = StringIndexer(outputCol="label", inputCol="genre")
lsm = label_string.fit(df_words_no_stopw)
tfidf4 = lsm.transform(df_words_no_stopw)

In [None]:
# Word2Vec embedding of the stop-word-filtered plots into a `features` vector
# column, fitted on the training frame and applied to both train and test.
from pyspark.ml.feature import Word2Vec

# The RandomForest model loaded in the next cell consumes these `features`, so the
# embedding must be reproducible across kernel restarts. PySpark's default seed
# is hash(type(self).__name__), which changes between Python processes when
# string hashing is randomized (PYTHONHASHSEED unset). Pin it explicitly.
# NOTE(review): the saved classifier must be retrained (or the fitted Word2Vec
# model persisted alongside it) so that both use the same embedding.
W2V_SEED = 42

word2vec = Word2Vec(vectorSize=10, minCount=1200, seed=W2V_SEED,
                    inputCol='plot_clean', outputCol='features')
word_df = word2vec.fit(tfidf4)  # a fitted Word2VecModel, despite the name

word2vec_df = word_df.transform(tfidf4)
word2vec_tdf = word_df.transform(tdf_words_no_stopw)

In [None]:
from pyspark.ml.classification import RandomForestClassificationModel

# Pre-trained classifier. According to the training code formerly left commented
# out here, it was produced by
#   RandomForestClassifier(labelCol="label", featuresCol="features", numTrees=7).fit(word2vec_df)
# NOTE(review): it only makes sense on features from the same Word2Vec fit it
# was trained on -- confirm that before trusting these predictions.
RF_MODEL_PATH = "word2vec_trained_model"
model = RandomForestClassificationModel.load(RF_MODEL_PATH)

predictions = model.transform(word2vec_tdf)

# No unpersist() here: neither word2vec_df nor word2vec_tdf was ever cached, and
# `predictions` is a lazy plan that still depends on word2vec_tdf.

In [None]:
from pyspark.ml.feature import IndexToString

# Turn numeric `prediction` indices back into genre strings, using the label
# order learned by the StringIndexer (`lsm`) fitted on the training data.
labelConverter = IndexToString(
    labels=lsm.labels,
    inputCol="prediction",
    outputCol="predictedLabel",
)
pdf = labelConverter.transform(predictions)

def col_convert(l, genres=None):
  """Encode a predicted genre label as a space-separated 0/1 indicator string.

  The label is parsed as an optionally bracketed, comma-separated list of
  (optionally quoted) genre names, e.g. "['Drama', 'Romance Film']".
  Presumably this is the format of the training `genre` column -- TODO confirm
  against train.csv.

  Args:
    l: label string; None is treated as "no genres".
    genres: ordered genre vocabulary; defaults to the module-level `genre_all`.

  Returns:
    One "0"/"1" token per entry of `genres`, in that order, joined by spaces.
  """
  if genres is None:
    genres = genre_all
  if l is None:
    present = set()
  else:
    body = l.strip()
    if body.startswith("[") and body.endswith("]"):
      body = body[1:-1]
    # Trim whitespace and quotes around each item instead of deleting every
    # space: deleting them turned "Romance Film" into "RomanceFilm", so
    # multi-word genres could never match the vocabulary.
    present = {item.strip().strip("'\"").strip() for item in body.split(",")}
  # Output width follows the vocabulary instead of a hard-coded 20.
  return " ".join("1" if genre in present else "0" for genre in genres)

# Wrap col_convert as a Spark UDF and build one output row per test movie:
# movie_id plus the space-separated 0/1 genre indicator string in `col9`.
udf_col = udf(col_convert, StringType())
df13 = pdf.select('movie_id', 'predictedLabel')
df13 = df13.withColumn("col9", udf_col(df13['predictedLabel']))
testdf = df13.select('movie_id', 'col9')

# coalesce(1) writes a single part file inside the output *directory*
# 'test_random_word2vec.csv'. mode("overwrite") keeps the notebook re-runnable;
# the default save mode ("errorifexists") fails once the directory exists.
testdf.coalesce(1).write.mode("overwrite").csv('test_random_word2vec.csv')