# Tasks:
## 1. Build a pipeline of regex tokenization, stop-word removal, and feature-vector creation using
###     a. CountVectorizer
###     b. HashingTF and IDF
###     c. Word2Vec
## 2. Convert the multi-label classification into 20 binary classification problems by
###     a. Iterating over the list of available labels
###     b. Checking whether the label appears in a row: if yes, replace it with 1, otherwise 0
## 3. Fit logistic regression and convert the results into an output-compatible dataframe
###     a. Storing the prediction column of each model in a dictionary
###     b. Converting the dictionary into a dataframe and writing it out as an output-compatible CSV file
###     c. Storing 3 different CSVs, one for each of the three feature vectorization methods used

Run the cell below only once per notebook session.


Specifically, line 13 of that cell must be run only once.

In [1]:
import pyspark
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import RegexTokenizer,CountVectorizer
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import StopWordsRemover
# sentenceDataFrame = spark.createDataFrame -- to create dataframe
from pyspark.ml.feature import Word2Vec
from pyspark.ml.feature import NGram
from pyspark.ml.classification import LogisticRegression, OneVsRest
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.ml.classification import LogisticRegression
from pyspark import SparkContext
from pyspark import SQLContext
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType
import numpy as np
#Vector assembler , string indexer and index to string from spark ML
# spark.driver.maxResultSize
# Start a local Spark session that uses every available core. getOrCreate() returns the
# already-running session instead of building a new one if a session exists.
spark = SparkSession.builder.master("local[*]")\
        .config("spark.executor.memory", "16g")\
        .config("spark.driver.memory", "16g")\
        .config("spark.memory.offHeap.enabled",'true')\
        .config("spark.memory.offHeap.size","8g")\
        .getOrCreate()
# SQLContext's first parameter is a SparkContext, not a SparkSession, so pass the session's
# context and the session itself explicitly. Later cells use sqlContext.createDataFrame.
sqlContext = SQLContext(spark.sparkContext, spark)

Spark version check

In [None]:
# Print the version reported by whichever `spark-submit` the shell resolves (IPython `!` escape).
# NOTE(review): this may differ from the pyspark package the kernel imported; compare with spark.version.
# Disabled alternative, kept for reference: launch pyspark with a MySQL JDBC driver on the classpath.
# !pyspark --driver-class-path /usr/share/java/mysql-connector-java.jar
!spark-submit --version

In [2]:
# Load the training and test CSVs with pandas, then convert them to Spark DataFrames.
# DATA_DIR defaults to the original author's local folder; set the DIC_DATA_DIR environment
# variable to point at another copy of the data instead of editing the paths below.
import os
from pathlib import Path

import pandas as pd

DATA_DIR = Path(os.environ.get("DIC_DATA_DIR", "/Users/karunparashar/Downloads/Assignment 3 DIC"))
TRAIN_CSV = DATA_DIR / "train2.csv"
TEST_CSV = DATA_DIR / "dic487-587" / "test.csv"

df = pd.read_csv(TRAIN_CSV)
# Later cells tokenize 'plot' and build labels from 'genre'; fail here with a clear message.
missing_train_cols = {"plot", "genre"} - set(df.columns)
assert not missing_train_cols, f"training CSV is missing columns: {sorted(missing_train_cols)}"
dataframe = sqlContext.createDataFrame(df)

df2 = pd.read_csv(TEST_CSV)
assert "plot" in df2.columns, "test CSV is missing the 'plot' column"
dataframe2 = sqlContext.createDataFrame(df2)

In [5]:
# Text-processing stages shared by the model-fitting cell below:
#   plot -> "words" (tokens) -> "filtered" (tokens minus stop words) -> "features" (term counts).
# StopWordsRemover.setStopWords() *replaces* its stop-word list, so the extra, domain-specific
# words are appended to Spark's built-in English list rather than used on their own.
add_stopwords = ["http","https","amp","rt","t","c","the"]
default_stopwords = StopWordsRemover.loadDefaultStopWords("english")
all_stopwords = default_stopwords + [w for w in add_stopwords if w not in default_stopwords]

# With RegexTokenizer's default gaps=True, the pattern "\W" is used as the token delimiter.
regexTokenizer = RegexTokenizer(inputCol="plot", outputCol="words", pattern="\\W")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(all_stopwords)
# Vocabulary: at most 10,000 terms, each appearing in at least 1,000 documents (minDF >= 1 is a count).
countVectors = CountVectorizer(inputCol="filtered", vocabSize=10000, outputCol="features", minDF=1000)

Using CountVectorizer features (this cell took about 20 minutes to run).

In [6]:
from pyspark.sql.types import IntegerType
import re

# The 20 target genres. Only the keys are used below; their insertion order becomes the
# column order of the prediction table built from nd1 later on.
mapping_dict = {'Drama':0,
               'Comedy':1,
               'Romance Film':2,
               'Thriller':3,
               'Action':4,
               'World cinema':5,
               'Crime Fiction':6,
               'Horror':7,
               'Black-and-white':8,
               'Indie':9,
               'Action/Adventure':10,
               'Adventure':11,
               'Family Film':12,
               'Short Film':13,
               'Romantic drama':14,
               'Animation':15,
               'Musical':16,
               'Science Fiction':17,
               'Mystery':18,
               'Romantic comedy':19}

# The text pipeline does not look at the label, so fit it once and reuse the resulting
# features for all 20 binary problems instead of re-fitting and re-applying it per genre.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors])
pipelineFit = pipeline.fit(dataframe)
train_features = pipelineFit.transform(dataframe).cache()
dataframetest_0 = pipelineFit.transform(dataframe2).cache()

# Same hyper-parameters for every genre.
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0)


def _genre_pattern(genre_name):
    """Compile a regex that finds `genre_name` as a whole label inside a genre string.

    A bare substring test (`genre_name in text`) also fires for 'Action' and 'Adventure'
    on rows whose only label is 'Action/Adventure', so the match may not touch a word
    character or '/' on either side.
    NOTE(review): assumes labels in the genre column are separated by something other than
    letters, digits, '_' or '/' (e.g. quotes, commas, spaces) -- confirm against the data.
    """
    return re.compile(r"(?<![\w/])" + re.escape(genre_name) + r"(?![\w/])")


nd1 = {}
for genre_name in mapping_dict:
    # Bind the pattern as a default argument so each UDF carries its own genre, instead of
    # depending on when the closure's reference to the loop variable gets resolved.
    def _has_genre(text, _pattern=_genre_pattern(genre_name)):
        # Non-string values (e.g. a missing genre) count as "not this genre" instead of raising.
        return 1 if isinstance(text, str) and _pattern.search(text) else 0

    udf = UserDefinedFunction(_has_genre, IntegerType())
    # Binary target for this genre: 1 if the row's genre string contains it, else 0.
    dataset = train_features.withColumn('label', udf(train_features.genre))
    lrModel = lr.fit(dataset)
    pred1 = lrModel.transform(dataframetest_0)
    # Plain Python ints, one per test row, in the order collect() returns them.
    nd1[genre_name] = [int(row.prediction) for row in pred1.select('prediction').collect()]

In [10]:
# Turn each genre's stored prediction sequence into a plain Python list, updating nd1 in place.
nd1.update({genre_name: list(predictions) for genre_name, predictions in nd1.items()})

In [13]:
# Build the output CSV: one row per test example, holding the list of 0/1 predictions for all
# genres. Columns follow nd1's insertion order. pandas is used only for this step.
# np.asarray(...).ravel() flattens each genre's predictions to 1-D, whether they are stored as
# plain numbers or as 1-element arrays (one per collected Row).
a = pd.DataFrame({genre_name: np.asarray(preds).ravel()
                  for genre_name, preds in nd1.items()}).astype(int)
# .tolist() yields plain Python ints, so each cell is written as "[0, 1, ...]" on any NumPy
# version (NumPy >= 2 renders np.int64 inside a list as "np.int64(0)").
predds = pd.Series(a.values.tolist())
pd.DataFrame(predds).to_csv('intermediate_count_vectorizerlogreg.csv',index=False)