In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

import findspark
findspark.init()
from pyspark.sql import SparkSession
# spark = SparkSession.builder.master("local[*]").getOrCreate()
spark = SparkSession.builder.master("local[*]")\
        .config("spark.executor.memory", "16g")\
        .config("spark.driver.memory", "16g")\
        .config("spark.memory.offHeap.enabled",'true')\
        .config("spark.memory.offHeap.size","16g")\
        .getOrCreate()

!ls

drive		  spark-2.4.0-bin-hadoop2.7	 train.csv
genre_lables.csv  spark-2.4.0-bin-hadoop2.7.tgz
sample_data	  test.csv


In [None]:
from ast import literal_eval
from functools import reduce

import numpy as np
import pandas as pd

import pyspark.sql.functions as F
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.sql import Window
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import regexp_replace,col,array_contains,explode
from pyspark.sql.functions import split
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, ArrayType

In [None]:
# Load the training set and hand the untouched frame to Spark. This happens
# before `genre` is parsed, so the Spark copy keeps `genre` as the raw string.
pd_df = pd.read_csv(r'train.csv')
data_spark_df = spark.createDataFrame(pd_df)

# Each `genre` cell holds the text of a Python literal; parse it into a real object.
# NOTE(review): assumes the parsed value iterates over genre-name strings - confirm.
pd_df['genre'] = pd_df['genre'].apply(literal_eval)
all_genre = pd_df['genre'].to_list()

# The 20 target genres; a genre's position in this list is its column in the label matrix.
names = ['Drama', 'Comedy', 'Romance Film', 'Thriller', 'Action', 'World cinema',
         'Crime Fiction', 'Horror', 'Black-and-white', 'Indie', 'Action/Adventure',
         'Adventure', 'Family Film', 'Short Film', 'Romantic drama', 'Animation',
         'Musical', 'Science Fiction', 'Mystery', 'Romantic comedy']
genre_column = {name: k for k, name in enumerate(names)}

# Multi-hot label matrix: matrix[i][k] == 1 when movie i is tagged with names[k].
# Tags that are not among the target genres are ignored.
matrix = np.zeros((len(pd_df), len(names)))
for i, genre in enumerate(all_genre):
    for g in genre:
        k = genre_column.get(g)
        if k is not None:
            matrix[i][k] = 1

# comments='' stops savetxt from prefixing the header with "# ", and joining
# without padding keeps the column names exactly equal to the genre names when
# the file is read back (previously they came back as "# Drama ", " Comedy ", ...).
np.savetxt("genre_lables.csv", matrix, delimiter=",", fmt='%d',
           header=",".join(names), comments='')


In [None]:
# Bring the saved label matrix and the test set into Spark.
lables_df = pd.read_csv(r'genre_lables.csv')
test_pd_df = pd.read_csv(r'test.csv')
lables_spark_df = spark.createDataFrame(lables_df)
test_spark_df = spark.createDataFrame(test_pd_df)


def _with_row_id(sdf):
    """Return `sdf` with a gap-free, 0-based `row_id` column that follows row order.

    monotonically_increasing_id() alone is unique and increasing but not
    consecutive (its value depends on the partition layout), so two frames only
    get matching ids by coincidence. Ranking by it yields 0..n-1 regardless of
    how each frame is partitioned. The un-partitioned window pulls the rows
    into a single partition, which is acceptable at this data size.
    """
    ordering = Window.orderBy(monotonically_increasing_id())
    return sdf.withColumn("row_id", F.row_number().over(ordering) - 1)


# Pair every training row with its label row by position.
ddf1 = _with_row_id(data_spark_df)
ddf2 = _with_row_id(lables_spark_df)
df = ddf1.join(ddf2, "row_id").drop("row_id")

# The test set has no labels of its own. Joining it to the *training* labels
# attached unrelated labels and could drop test rows whose id had no match,
# so the test frame is used as-is; prediction only needs `movie_id` and `plot`.
test_df = test_spark_df

In [None]:
%%time
tokenizer = Tokenizer(inputCol="plot", outputCol="words")
word2Vec = Word2Vec(inputCol="words", outputCol="features", minCount=1)
pipeline = Pipeline(stages=[tokenizer, word2Vec])

model = pipeline.fit(df)
dataset = model.transform(df)

model2 = pipeline.fit(test_df)
test_dataset = model2.transform(test_df)

CPU times: user 276 ms, sys: 65.5 ms, total: 342 ms
Wall time: 7min 38s


In [None]:
# Weights [1.0, 0.0]: the whole dataset is assigned to the training split, so
# the validation split should receive no rows. The seed keeps the split repeatable.
split_weights = [1.0, 0.0]
train_dataset, validation_dataset = dataset.randomSplit(split_weights, seed=123)

In [None]:
%%time
dfList = []
labelCols = lables_spark_df.columns
lr = LogisticRegression(featuresCol = 'features',maxIter=700)
for labelCol in labelCols:
    lr.setLabelCol(labelCol)
    lrModel = lr.fit(train_dataset)
    predictions = lrModel.transform(test_dataset)
    predictions = predictions.drop("features",labelCol,"rawPrediction","probability")
    predictions = predictions.withColumn("prediction",F.col("prediction").cast(IntegerType()))
    dfList.append(predictions.select("movie_id","prediction"))

CPU times: user 1min 7s, sys: 22.2 s, total: 1min 29s
Wall time: 46min 12s


In [None]:
%%time
dfs_renamed = [df.selectExpr('movie_id', f'prediction as prediction_{i}') for i, df in enumerate(dfList)]
temp_df = reduce(lambda x, y: x.join(y, ['movie_id'], how='full'), dfs_renamed)
col_list = ['prediction_%d' % i for i in range(len(dfList))]
temp_df = temp_df.withColumn('predictions',concat_ws(" ",*col_list)).drop(*col_list).toPandas().to_csv("predictions_part3.csv",index=False)

CPU times: user 1.51 s, sys: 375 ms, total: 1.88 s
Wall time: 2min 48s
