In [None]:
# Mount Google Drive at /content/drive so later cells can read files from it by path
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install a headless Java 8 JDK (quiet mode, output discarded)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download the Spark 3.0.0 / Hadoop 3.2 binary distribution (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# Extract the Spark archive into the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# Point JAVA_HOME and SPARK_HOME at the Java install and the extracted Spark folder
# (assumes the archive was extracted under /content -- adjust if the working directory differs)
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# Install findspark using pip
!pip install -q findspark

In [None]:
# Initialise findspark before pyspark is imported in the cells below
import findspark
findspark.init()

In [None]:
# Show the Spark installation path that findspark resolves
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2'

In [None]:

from pyspark.sql import SparkSession

# Create (or reuse) a local-mode Spark session named "Colab",
# with the Spark UI port set to 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# Display the session as the cell output
spark

## Loading Data

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Explicit schema for the SMS file: integer id, string text, integer label
schema = (
    StructType()
    .add(StructField("id", IntegerType()))
    .add(StructField("text", StringType()))
    .add(StructField("label", IntegerType()))
)

# Read the semicolon-separated file (no header row) with that schema
sms = spark.read.csv(
    '/content/drive/MyDrive/Data/sms.csv',
    sep=';',
    header=False,
    schema=schema,
)

# Confirm the column names and types that were applied
sms.printSchema()

root
 |-- id: integer (nullable = true)
 |-- text: string (nullable = true)
 |-- label: integer (nullable = true)



## Data Preparation

* remove punctuation and numbers
* tokenize (split into individual words)
* remove stop words
* apply the hashing trick
* convert to TF-IDF representation.

In [None]:
# Functions needed for text clean-up and tokenisation
from pyspark.sql.functions import regexp_replace
from pyspark.ml.feature import Tokenizer

# Patterns replaced by a single space, applied in this order:
#   1. punctuation characters
#   2. digits
#   3. runs of spaces (collapsed to one)
cleanup_patterns = ['[_():;,.!?\\-]', '[0-9]', ' +']

wrangled = sms
for pattern in cleanup_patterns:
    wrangled = wrangled.withColumn('text', regexp_replace(wrangled.text, pattern, ' '))

# Split the cleaned text into a 'words' array column
word_splitter = Tokenizer(inputCol='text', outputCol='words')
wrangled = word_splitter.transform(wrangled)

# Show the cleaned rows, then the untouched originals for comparison
wrangled.show(4, truncate=False)

sms.show(4, truncate=False)

+---+----------------------------------+-----+------------------------------------------+
|id |text                              |label|words                                     |
+---+----------------------------------+-----+------------------------------------------+
|1  |Sorry I'll call later in meeting  |0    |[sorry, i'll, call, later, in, meeting]   |
|2  |Dont worry I guess he's busy      |0    |[dont, worry, i, guess, he's, busy]       |
|3  |Call FREEPHONE now                |1    |[call, freephone, now]                    |
|4  |Win a cash prize or a prize worth |1    |[win, a, cash, prize, or, a, prize, worth]|
+---+----------------------------------+-----+------------------------------------------+
only showing top 4 rows

+---+-------------------------------------------+-----+
|id |text                                       |label|
+---+-------------------------------------------+-----+
|1  |Sorry, I'll call later in meeting          |0    |
|2  |Dont worry. I guess he's b

## Data Preparation - II

The next steps are to remove stop words, apply the hashing trick, and convert the results into a TF-IDF representation.

A quick reminder about these concepts:

* The hashing trick provides a fast and space-efficient way to map a very large (possibly infinite) set of items (in this case, all words contained in the SMS messages) onto a smaller, finite number of values.

* The TF-IDF matrix reflects how important a word is to each document. It takes into account both the frequency of the word within each document and the frequency of the word across all of the documents in the collection.

In [None]:
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF

# Drop stop words: 'words' -> 'terms'
stop_word_filter = StopWordsRemover(inputCol='words', outputCol='terms')
wrangled = stop_word_filter.transform(wrangled)

# Hashing trick: map 'terms' into a 1024-dimensional 'hash' vector
term_hasher = HashingTF(inputCol='terms', outputCol='hash', numFeatures=1024)
wrangled = term_hasher.transform(wrangled)

# Fit IDF weights on the hashed vectors and rescale them into 'features'.
# Note: the IDF model is fitted on all of `wrangled` at this point.
idf_model = IDF(inputCol='hash', outputCol='features').fit(wrangled)
tf_idf = idf_model.transform(wrangled)

# Inspect the terms next to their TF-IDF vectors
tf_idf.select('terms', 'features').show(4, truncate=False)

+--------------------------------+----------------------------------------------------------------------------------------------------+
|terms                           |features                                                                                            |
+--------------------------------+----------------------------------------------------------------------------------------------------+
|[sorry, call, later, meeting]   |(1024,[138,384,577,996],[2.273418200008753,3.6288353225642043,3.5890949939146903,4.104259019279279])|
|[dont, worry, guess, busy]      |(1024,[215,233,276,329],[3.9913186080986836,3.3790235241678332,4.734227298217693,4.58299632849377]) |
|[call, freephone]               |(1024,[133,138],[5.367951058306837,2.273418200008753])                                              |
|[win, cash, prize, prize, worth]|(1024,[31,47,62,389],[3.6632029660684124,4.754846585420428,4.072170704727778,7.064594791043114])    |
+--------------------------------+--------------

## Fitting Logistic Regression Model

* Split the TF-IDF data into training and testing sets.
* Use the training data to fit a Logistic Regression model.
* Finally, evaluate the performance of that model on the testing data by preparing a confusion matrix.

In [None]:
from pyspark.ml.classification import LogisticRegression

# 80/20 train/test split of the TF-IDF data, seeded for repeatability
splits = tf_idf.randomSplit([0.8,0.2],seed=13)
sms_train, sms_test = splits

# Regularised logistic regression, fitted on the training portion
lr_estimator = LogisticRegression(regParam=0.2)
logistic = lr_estimator.fit(sms_train)

# Score the held-out portion
prediction = logistic.transform(sms_test)

# Confusion matrix: row counts for each (label, prediction) pair
confusion = prediction.groupBy('label', 'prediction').count()
confusion.show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   41|
|    0|       0.0|  948|
|    1|       1.0|  105|
|    0|       1.0|    2|
+-----+----------+-----+



## Classification Model Pipeline

Steps to be considered while preparing the pipeline

* Split the text into tokens
* Remove stop words
* Apply the hashing trick
* Convert the data from counts to TF-IDF
* Train a logistic regression model.

In [None]:
# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Data/sms.csv', sep=';', header=False, schema=schema)

# Split the raw data into training and testing sets; all feature
# engineering happens inside the pipeline stages defined below
sms_train, sms_test = sms.randomSplit([0.8,0.2],seed=13)

# Import class for creating a pipeline
from pyspark.ml import Pipeline

from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Lowercase the text and split it into tokens on whitespace
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol='terms', outputCol="hash")
idf = IDF(inputCol='hash', outputCol="features")

# Create a logistic regression object and add everything to a pipeline
logistic = LogisticRegression(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[tokenizer,remover, hasher, idf, logistic])

# Train the pipeline on the training data
# (`pipeline` is rebound from the unfitted Pipeline to the fitted PipelineModel)
pipeline = pipeline.fit(sms_train)

# Make predictions on the testing data
predictions = pipeline.transform(sms_test)

# Create a confusion matrix, comparing predictions to known labels.
# Uses this cell's `predictions`, not the `prediction` DataFrame left over
# from the earlier stand-alone logistic regression cell.
predictions.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   41|
|    0|       0.0|  948|
|    1|       1.0|  105|
|    0|       1.0|    2|
+-----+----------+-----+



## Grid Search Optimization with Pipeline

Error reference : https://stackoverflow.com/questions/61257344/attributeerror-pipelinemodel-object-has-no-attribute-fitmultiple

In [None]:
# Re-read the raw SMS file (semicolon-separated, no header, explicit schema)
sms = spark.read.csv(
    '/content/drive/MyDrive/Data/sms.csv',
    sep=';',
    header=False,
    schema=schema,
)

# Seeded 80/20 split into training and testing sets
train_test = sms.randomSplit([0.8,0.2],seed=13)
sms_train, sms_test = train_test

# Peek at the first two training rows
sms_train.show(2)

+---+--------------------+-----+
| id|                text|label|
+---+--------------------+-----+
|  1|Sorry, I'll call ...|    0|
|  3|Call FREEPHONE 08...|    1|
+---+--------------------+-----+
only showing top 2 rows



In [None]:

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator that scores predictions against the 'label' column
evaluator = BinaryClassificationEvaluator(labelCol='label')

# Stage 1: split the raw text into word tokens (Tokenizer splits on whitespace)
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Stage 2: drop stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Stages 3 and 4: hash the terms, then rescale to TF-IDF
hasher = HashingTF(outputCol="hash", inputCol='terms')
idf = IDF(outputCol="features", inputCol='hash')

# Stage 5: logistic regression on the TF-IDF features
logistic = LogisticRegression(labelCol='label', featuresCol='features')

# Assemble the (unfitted) pipeline from the stages above
text_stages = [tokenizer, remover, hasher, idf]
pipeline = Pipeline(stages=text_stages + [logistic])


In [None]:
# Hyper-parameter grid for the logistic regression stage:
# 4 regParam values x 3 elasticNetParam values.
# A grid over the hashing stage is left disabled:
#   .addGrid(hasher.numFeatures, [1024, 4096, 16384])
#   .addGrid(hasher.binary, [True,False])
params = (
    ParamGridBuilder()
    .addGrid(logistic.regParam, [0.01, 0.1, 1.0 ,10.0])
    .addGrid(logistic.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)
print('Number of models to be tested: ', len(params))

# 5-fold cross-validation of the whole pipeline over the grid
cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=5,
)

# Fit on the training data; `cv` is rebound to the fitted result
cv = cv.fit(sms_train)

Number of models to be tested:  12


In [None]:
# Best pipeline found by cross-validation
best_model = cv.bestModel

# List its fitted stages
print(best_model.stages)

# Score the held-out test set with the best model and evaluate it
test_scored = best_model.transform(sms_test)
evaluator.evaluate(test_scored)

[Tokenizer_e561c7110000, StopWordsRemover_799c60f896ca, HashingTF_9a1ec8a1a2f3, IDFModel: uid=IDF_d64b9b8341f2, numDocs=4478, numFeatures=262144, LogisticRegressionModel: uid=LogisticRegression_209601e83f66, numClasses=2, numFeatures=262144]


0.9948954578226397

## Other Classification Models

## Decision Tree Classifier

In [None]:
# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Data/sms.csv', sep=';', header=False, schema=schema)

# Split the data into training and testing sets
sms_train, sms_test = sms.randomSplit([0.8,0.2],seed=13)

# Import class for creating a pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Lowercase the text and split it into tokens on whitespace
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick and transform to TF-IDF
hasher = HashingTF(inputCol='terms', outputCol="hash")
idf = IDF(inputCol='hash', outputCol="features")

# Create a decision tree classifier and add everything to a pipeline
dt = DecisionTreeClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[tokenizer,remover, hasher, idf, dt])

# Train the pipeline on the training data
# (`pipeline` is rebound from the unfitted Pipeline to the fitted PipelineModel)
pipeline = pipeline.fit(sms_train)

# Make predictions on the testing data
predictions = pipeline.transform(sms_test)

# Create a confusion matrix, comparing predictions to known labels.
# Uses this cell's `predictions`, not the `prediction` DataFrame left over
# from the earlier stand-alone logistic regression cell.
predictions.groupBy('label', 'prediction').count().show()

+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|    1|       0.0|   41|
|    0|       0.0|  948|
|    1|       1.0|  105|
|    0|       1.0|    2|
+-----+----------+-----+



## Gradient Boosting Classifier

In [None]:
# Load data from a delimited file
sms = spark.read.csv('/content/drive/MyDrive/Data/sms.csv', sep=';', header=False, schema=schema)

# Split the data into training and testing sets
sms_train, sms_test = sms.randomSplit([0.8,0.2],seed=13)

# Import class for creating a pipeline
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF

# Lowercase the text and split it into tokens on whitespace
tokenizer = Tokenizer(inputCol='text', outputCol='words')

# Remove stop words
remover = StopWordsRemover(inputCol='words', outputCol='terms')

# Apply the hashing trick and transform to TF-IDF.
# NOTE(review): numFeatures is left at its default here; the recorded run of
# this cell was interrupted, so a smaller value may be needed to make
# training practical -- confirm.
hasher = HashingTF(inputCol='terms', outputCol="hash")
idf = IDF(inputCol='hash', outputCol="features")

# Create a gradient-boosted tree classifier and add everything to a pipeline
gbdt = GBTClassifier(featuresCol='features', labelCol='label')
pipeline = Pipeline(stages=[tokenizer,remover, hasher, idf, gbdt])

# Train the pipeline on the training data
# (`pipeline` is rebound from the unfitted Pipeline to the fitted PipelineModel)
pipeline = pipeline.fit(sms_train)

# Make predictions on the testing data
predictions = pipeline.transform(sms_test)

# Create a confusion matrix, comparing predictions to known labels.
# Uses this cell's `predictions`, not the `prediction` DataFrame left over
# from the earlier stand-alone logistic regression cell.
predictions.groupBy('label', 'prediction').count().show()

KeyboardInterrupt: ignored

In [None]:
# Compare performance of the two models.
# Both scores are stored and printed: a notebook cell only displays the value
# of its last expression, so a bare first `evaluate(...)` call would be
# silently discarded.
# NOTE(review): `tree` and `gbt` are not defined in any cell visible in this
# notebook; they must be bound to fitted models that can transform `sms_test`
# before this cell can run -- confirm where they are meant to come from.
tree_score = evaluator.evaluate(tree.transform(sms_test))
gbt_score = evaluator.evaluate(gbt.transform(sms_test))
print('Decision tree:', tree_score)
print('Gradient boosting:', gbt_score)


In [None]:
# No standalone `gbt` model is defined in this notebook. The fitted
# gradient-boosted classifier is the last stage of `pipeline`, which the
# Gradient Boosting cell rebinds to the fitted PipelineModel.
gbt_model = pipeline.stages[-1]

# List the trees in the ensemble and the relative importance of features
print(gbt_model.trees)
print(gbt_model.featureImportances)