# Setup and Installation

In [1]:
# !apt-get install openjdk-8-jdk-headless -qq > /dev/null
# !wget --directory-prefix=/content -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# !wget --directory-prefix=/content -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
# !tar xf spark-2.4.5-bin-hadoop2.7.tgz
# !pip install -q findspark

In [2]:
# import os
# import findspark

# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# findspark.init("/content/spark-2.4.5-bin-hadoop2.7")

# BASE_URL = "./drive/My Drive/Colab Notebooks"

# Spark Initialization

In [37]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, functions as F
from pyspark.ml.feature import StopWordsRemover, StringIndexer, Word2Vec, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit
from pyspark.sql.types import (
    StringType
)
from pyspark.ml import PipelineModel

In [6]:
spark = SparkSession \
            .builder \
            .appName("EML Batch 9") \
            .getOrCreate()

sc = spark.sparkContext

In [7]:
# train = spark.read.csv(f"{BASE_URL}/datasets/train_document.csv", header=True)
# test = spark.read.csv(f"{BASE_URL}/datasets/test_document.csv", header=True)
train = spark.read.csv("./data/train_document.csv", header=True)
test = spark.read.csv("./data/test_document.csv", header=True)
train.show(1, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 category | business                                                                                                                                                                                                                                                                                                                                                                                                                                                                        

#  Text Preprocessing

In [8]:
@F.udf(returnType=StringType())
def preprocessor(x):
  x = x.lower()
  return x

train = train.withColumn("text_cleaned", preprocessor("text"))
test = test.withColumn("text_cleaned", preprocessor("text"))

train.show(1, truncate=False, vertical=True)

-RECORD 0--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 category     | business                                                                                                                                                                                                                                                                                                                                                                                                                                                                

# Pipeline Preprocessor

In [10]:
locale = sc._jvm.java.util.Locale
locale.setDefault(locale.forLanguageTag("en-US"))

In [11]:
indexer = StringIndexer(inputCol="category", outputCol="categoryIndex") 
tokenizer = Tokenizer(inputCol="text_cleaned", outputCol="tokenizer_words")
remover = StopWordsRemover(inputCol="tokenizer_words", outputCol="filtered_words")
word2Vec = Word2Vec(vectorSize=1, minCount=15, inputCol="filtered_words", outputCol="word_vectors")
lr = LogisticRegression(maxIter=1, regParam=0.001, featuresCol = "word_vectors", labelCol="categoryIndex")
pipeline = Pipeline(stages=[tokenizer, remover, word2Vec, indexer, lr])

# Model Training and Evaluation

In [None]:
# Fit the pipeline to training documents.
model = pipeline.fit(train)

In [12]:
output = model.transform(test)
predictionAndLabels = output.rdd.map(lambda x: (x["prediction"], x["categoryIndex"]))

In [13]:
# Instantiate metrics object
metrics = MulticlassMetrics(predictionAndLabels)

In [14]:
metrics.confusionMatrix().toArray()

array([[ 7.,  0., 95.,  0.,  0.],
       [19., 54., 29.,  0.,  0.],
       [ 1.,  0., 83.,  0.,  0.],
       [ 9.,  0., 71.,  0.,  0.],
       [19.,  6., 52.,  0.,  0.]])

In [15]:
print(f"Accuracy: {metrics.accuracy}")

Accuracy: 0.3235955056179775


# Hyper Parameter Tuning

In [20]:
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.1, 0.01, 0.001]) \
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

In [26]:
tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(labelCol="categoryIndex", metricName="accuracy"),
                           # 80% of the data will be used for training, 20% for validation.
                           trainRatio=0.8)

In [27]:
# Run TrainValidationSplit, and choose the best set of parameters.
model = tvs.fit(train)

# Make predictions on test data. model is the model with combination of parameters
# that performed best.
model.transform(test)\
    .show(3, vertical=True)

-RECORD 0-------------------------------
 category        | business             
 text            | Rank 'set to sell... 
 text_cleaned    | rank 'set to sell... 
 tokenizer_words | [rank, 'set, to, ... 
 filtered_words  | [rank, 'set, sell... 
 word_vectors    | [0.09590236956247... 
 categoryIndex   | 0.0                  
 rawPrediction   | [0.33299247895591... 
 probability     | [0.24817082173529... 
 prediction      | 2.0                  
-RECORD 1-------------------------------
 category        | business             
 text            | "Turkey knocks si... 
 text_cleaned    | "turkey knocks si... 
 tokenizer_words | ["turkey, knocks,... 
 filtered_words  | ["turkey, knocks,... 
 word_vectors    | [0.08278294069611... 
 categoryIndex   | 0.0                  
 rawPrediction   | [0.30714298564165... 
 probability     | [0.24917775035830... 
 prediction      | 2.0                  
-RECORD 2-------------------------------
 category        | entertainment        
 text           

# Save model

In [29]:
import os

In [31]:
os.makedirs("model", exist_ok=True)

In [33]:
best_model = model.bestModel

In [34]:
best_model.write().overwrite().save("model/first_model.pkl")

# Load model

In [39]:
load_model = PipelineModel.load("./model/first_model.pkl/")

In [42]:
load_model.transform(test)\
    .show(3, vertical=True)

-RECORD 0-------------------------------
 category        | business             
 text            | Rank 'set to sell... 
 text_cleaned    | rank 'set to sell... 
 tokenizer_words | [rank, 'set, to, ... 
 filtered_words  | [rank, 'set, sell... 
 word_vectors    | [0.09590236956247... 
 categoryIndex   | 0.0                  
 rawPrediction   | [0.33299247895591... 
 probability     | [0.24817082173529... 
 prediction      | 2.0                  
-RECORD 1-------------------------------
 category        | business             
 text            | "Turkey knocks si... 
 text_cleaned    | "turkey knocks si... 
 tokenizer_words | ["turkey, knocks,... 
 filtered_words  | ["turkey, knocks,... 
 word_vectors    | [0.08278294069611... 
 categoryIndex   | 0.0                  
 rawPrediction   | [0.30714298564165... 
 probability     | [0.24917775035830... 
 prediction      | 2.0                  
-RECORD 2-------------------------------
 category        | entertainment        
 text           

In [43]:
spark.stop()