In [0]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget -q http://us.mirrors.quenda.co/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# !tar xf spark-2.4.5-bin-hadoop2.7.tgz
# !pip install -q findspark

In [0]:
# import os
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import os

import findspark

# Locate the Spark installation before any pyspark import.
# Prefer an already-exported SPARK_HOME so the notebook is not tied to one
# machine's directory layout; fall back to the original local install path
# when the variable is not set.
findspark.init(os.environ.get("SPARK_HOME", '/home/cse587/spark-2.4.0-bin-hadoop2.7'))
from pyspark.sql import SparkSession

In [0]:
# from google.colab import files
# files.upload()

In [0]:
from pyspark.sql import *
import pyspark.sql.functions as f
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel, StopWordsRemover, IDF
from pyspark.sql.types import FloatType, IntegerType
from pyspark.ml.classification import LogisticRegression

In [0]:
# Create (or reuse) the Spark session used by every cell below.
# NOTE(review): "spark.some.config.option" looks like a placeholder setting —
# confirm whether it is needed.
session_builder = SparkSession.builder.appName('DicAssign3')
session_builder = session_builder.config("spark.some.config.option", "some-value")
spark = session_builder.getOrCreate()

In [0]:
# Reader options shared by the train and test files: the first row is a
# header, column types are inferred, and '"' is the escape character.
labelled_csv_options = dict(inferSchema="true", header="true", escape='"')

# Training rows with no value in "genre" are dropped.
df = spark.read.csv("train.csv", **labelled_csv_options).na.drop(subset=["genre"])
df_test = spark.read.csv("test.csv", **labelled_csv_options)

# mapping.csv is read with a header row and without schema inference.
mappings = spark.read.csv("mapping.csv", header="true")

In [0]:
# Split the text in column "0" of mapping.csv on commas (plus any following
# whitespace) into an array column.
# The pattern is a raw string so "\s" reaches the regex engine as written
# instead of relying on Python's deprecated handling of unknown escapes.
mappings = mappings.withColumn("genre_array", f.split(f.col("0"), r",\s*"))

# Fixed genre vocabulary. The order matters: it is passed as the vocabulary of
# the genre vectoriser, so position k here is index k of the label vector.
mapping_list = [
    "Drama",
    "Comedy",
    "Romance Film",
    "Thriller",
    "Action",
    "World cinema",
    "Crime Fiction",
    "Horror",
    "Black-and-white",
    "Indie",
    "Action/Adventure",
    "Adventure",
    "Family Film",
    "Short Film",
    "Romantic drama",
    "Animation",
    "Musical",
    "Science Fiction",
    "Mystery",
    "Romantic comedy",
]

In [0]:
# Remove the square brackets and single quotes from the raw "genre" string and
# store the result in a new "genre_array" column.
genre_without_brackets = f.regexp_replace(f.col("genre"), r'[\[\]\']', '')
df1 = df.withColumn("genre_array", genre_without_brackets)

In [0]:
# Turn the ", "-separated genre string into an array column (same column name).
genre_items = f.split(f.col("genre_array"), ", ")
df2 = df1.withColumn("genre_array", genre_items)

In [0]:
# Encode each movie's genre array as a vector over the fixed vocabulary in
# mapping_list (instead of a vocabulary fitted on the data), so vector index k
# always corresponds to mapping_list[k].
# binary=True caps every entry at 1.0: without it a genre listed more than
# once in a row would yield a count above 1, and the per-genre label derived
# from this vector would no longer be a 0/1 flag.
vect = CountVectorizerModel.from_vocabulary(
    vocabulary=mapping_list,
    inputCol="genre_array",
    outputCol="ohe_genre",
    binary=True,
)
df_ohe = vect.transform(df2)

In [0]:
# clean plot
#   1. delete every character that is not an ASCII letter or a space
#   2. collapse runs of whitespace into a single space
#   3. lower-case the text
# The patterns are raw strings so "\s" is not treated as an (invalid) Python
# string escape.
# NOTE(review): step 1 deletes punctuation and line breaks rather than
# replacing them with a space, so words separated only by such characters are
# joined together — confirm this is intended.
plot_letters_only = f.regexp_replace(f.col("plot"), r'[^A-Za-z ]', '')
plot_single_spaced = f.regexp_replace(plot_letters_only, r'\s+', ' ')
df_ohe = df_ohe.withColumn("plot", f.lower(plot_single_spaced))

In [0]:
# clean test plot — same three steps as for the training plots:
#   1. delete every character that is not an ASCII letter or a space
#   2. collapse runs of whitespace into a single space
#   3. lower-case the text
# The patterns are raw strings so "\s" is not treated as an (invalid) Python
# string escape.
test_plot_letters_only = f.regexp_replace(f.col("plot"), r'[^A-Za-z ]', '')
test_plot_single_spaced = f.regexp_replace(test_plot_letters_only, r'\s+', ' ')
mappins_ohe_test = df_test.withColumn("plot", f.lower(test_plot_single_spaced))

In [0]:
# Tokenise the cleaned plot text by splitting on single spaces; the same
# column expression is applied to the train and the test frame.
plot_token_col = f.split(f.col("plot"), " ")
df_ohe1 = df_ohe.withColumn("plot_tokens", plot_token_col)
mappins_ohe_test1 = mappins_ohe_test.withColumn("plot_tokens", plot_token_col)

In [0]:
# Filter stop words out of "plot_tokens" into "filtered_plot", using the
# remover's default word list. One remover instance handles both splits.
remover = StopWordsRemover(inputCol="plot_tokens", outputCol="filtered_plot")
filtered_df, filtered_df_test = [
    remover.transform(frame) for frame in (df_ohe1, mappins_ohe_test1)
]

In [0]:
# Term counts over a vocabulary of at most 15000 terms. The vocabulary is
# fitted on the training plots only and then applied to both splits.
plot_vectorizer = CountVectorizer(
    inputCol="filtered_plot",
    outputCol="matrix",
    vocabSize=15000,
)
plot_vect_model = plot_vectorizer.fit(filtered_df)
term_doc_matrix, term_doc_matrix_test = [
    plot_vect_model.transform(frame) for frame in (filtered_df, filtered_df_test)
]

In [0]:
# Rescale the raw term counts ("matrix") into "features" with IDF weights.
# The weights are fitted on the training matrix and reused for the test matrix.
idf = IDF(inputCol="matrix", outputCol="features")
idfModel = idf.fit(term_doc_matrix)
rescaledData, rescaledData_test = [
    idfModel.transform(frame) for frame in (term_doc_matrix, term_doc_matrix_test)
]

In [21]:
# Peek at a few feature vectors. Each vector spans the 15000-term vocabulary,
# so show only 5 rows and cut each cell at 120 characters instead of printing
# every vector in full.
rescaledData.select("features").show(5, truncate=120)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [22]:
# Inspect the raw term-count vector of the first training row.
rescaledData.select("matrix").take(1)

[Row(matrix=SparseVector(15000, {125: 1.0, 179: 1.0, 497: 1.0, 534: 1.0, 574: 1.0, 1863: 1.0, 2080: 1.0, 3127: 1.0, 5251: 1.0, 8211: 1.0, 13625: 1.0}))]

In [23]:
# Keep only what the classifiers need: the movie id, the TF-IDF features and
# (for training) the genre vector renamed to "label".
# Both frames are cached: tfidf is fitted against once per genre and
# tfidf_test is scored once per genre, so without caching the whole
# read/clean/vectorise pipeline would be recomputed for every model.
tfidf = rescaledData.selectExpr("movie_id","features", "ohe_genre as label").cache()
tfidf_test = rescaledData_test.selectExpr("movie_id","features").cache()

# Row count of the training frame; this action also populates the tfidf cache.
tfidf.count()

31109

In [0]:
# Train one logistic-regression model per genre. The label vector has one
# entry per item of mapping_list, so iterate over that list's length rather
# than a hard-coded count.
lrModels = []
for i in range(len(mapping_list)):
    # Bind i as a default argument so the UDF keeps this iteration's index
    # no matter when the lambda is serialised or evaluated.
    genre_flag = f.udf(lambda v, i=i: float(v[i]), FloatType())

    # Replace the genre vector with the scalar flag for genre i.
    new_df = tfidf.withColumn("label", genre_flag("label"))
    lr = LogisticRegression(maxIter=100, regParam=0.3, elasticNetParam=0)
    lrModels.append(lr.fit(new_df))

In [0]:
# Score the test set with every per-genre model, in model order.
# (The previous version also rebuilt a label UDF and a relabelled training
# frame on each iteration; neither was used, so they are gone.)
predictions = []
for genre_model in lrModels:
    predicted = genre_model.transform(tfidf_test)
    # Cast "prediction" to an integer so it renders as 0/1 when the columns
    # are concatenated into the output string.
    predicted = predicted.withColumn("prediction", predicted["prediction"].cast(IntegerType()))
    predictions.append(predicted)

In [0]:
# Build one row per movie_id whose "predictions" column is the space-separated
# sequence of per-genre predictions, in model order.
# Start from the first model's output...
final_df = predictions[0].selectExpr("movie_id","prediction as predictions")

# ...then join each remaining model's prediction on movie_id and append it.
for idx, genre_predictions in enumerate(predictions[1:], start=1):
    genre_col_name = "prediction" + str(idx)
    genre_frame = genre_predictions.selectExpr("movie_id", "prediction as " + genre_col_name)
    joined = final_df.join(genre_frame, ["movie_id"])
    appended = f.concat(f.col("predictions"), f.lit(" "), f.col(genre_col_name))
    # Keep only the id and the running string so the frame stays two columns wide.
    final_df = joined.withColumn("predictions", appended).select("movie_id","predictions")

In [0]:
# Display the first 20 rows with untruncated cell contents.
final_df.show(20, False)

In [0]:
# Collect the predictions to the driver as a pandas frame and write them to
# pred2.csv without the pandas index column.
predictions_pdf = final_df.toPandas()
predictions_pdf.to_csv("pred2.csv", index=False)