# Training a Batch Model on Streaming Data with Spark Streaming and Online Predictions with Structured Streaming

Apache Spark was built to analyze big data quickly. One of its most important features is the ability to run computations in memory.
PySpark is the Python API for Spark: it lets us write Spark applications in idiomatic Python.

In this notebook we mostly work with the PySpark ml machine learning library (pyspark.ml), which provides logistic regression and other machine learning models.

Why Google Colab?
Colab by Google is based on Jupyter Notebook and adds Google Docs style sharing. Because it runs on Google's servers, we don't need to install anything locally, be it Spark or a deep learning model. Its most attractive feature is the free GPU and TPU support. Because the GPUs run on Google's own servers, they can be faster than some consumer GPUs such as the Nvidia 1050Ti.

Let’s create a simple logistic regression model with PySpark in Google Colab. To open Colab Jupyter Notebook, click on this link.

# Running PySpark in Colab

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

/bin/sh: apt-get: command not found


In [None]:
import os
# Spark 2.4.x runs on Java 8, so point JAVA_HOME at the JDK 8 installed above
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"
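
Before initializing findspark, it can help to confirm that both environment variables point at directories that actually exist. The helper below is a minimal sketch; `missing_paths` is a hypothetical name and is not part of findspark or PySpark.

```python
import os

def missing_paths(var_names, environ=os.environ):
    """Return the names of variables that are unset or do not point to an existing directory."""
    return [name for name in var_names
            if not os.path.isdir(environ.get(name, ""))]
```

Calling `missing_paths(["JAVA_HOME", "SPARK_HOME"])` should return an empty list when the installation steps above succeeded; any name it returns points at a failed download or a mistyped path.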

In [None]:
import findspark
findspark.init()
#from pyspark.sql import SparkSession
#spark = SparkSession.builder.master("local[*]").getOrCreate()

Our Colab is ready to run PySpark. Let's build a simple Logistic Regression model.

# Import libraries

In [1]:
import os
import pyspark
import sys
import re

import pandas as pd
import numpy as np
import time


from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
# pyspark.streaming.kafka exists only in Spark 2.x (it was removed in Spark 3.0)
from pyspark.streaming.kafka import KafkaUtils

from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType
from pyspark.sql import functions as f

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF
from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassificationModel, RandomForestClassifier
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, TrainValidationSplit
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors, SparseVector, DenseVector


In [2]:

# location to stream train data from 
train_csv_dir = os.getcwd()+"/module11_cs1/train_data_csv"
# location to stream test data from
test_csv_dir = os.getcwd()+"/module11_cs1/test_data_csv"

In [3]:
# create training and testing dirs if they do not exist
if not os.path.exists(train_csv_dir):
    os.makedirs(train_csv_dir)

if not os.path.exists(test_csv_dir):
    os.makedirs(test_csv_dir)

# Streaming Batch Logistic Regression model 

The following example demonstrates how to load training and testing data from two different input streams of text files, parse the streams, refit a logistic regression model on the data accumulated from the first stream, and make predictions on the second stream.

We register the streams for training and testing and then start the job.

Once the job is running, we can save tab-separated text files to the training or testing folders. Whenever new files arrive in train_csv_dir, the model is retrained at the next training interval. Whenever a file is placed in test_csv_dir, predictions are printed. As more data reaches the training directory, the predictions should, in theory, get better.
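
One way to feed the watched folders is sketched below. It writes (label, message) pairs as tab-separated lines and moves the finished file into the target directory in a single step, since Spark's file sources expect files to appear atomically. The function name `drop_tsv` and the file name used in the usage note are hypothetical, and staging the temporary file in the parent directory assumes that directory is writable and on the same filesystem.

```python
import os
import tempfile

def drop_tsv(records, target_dir, filename):
    """Write (label, message) pairs as tab-separated lines and move the file into target_dir."""
    os.makedirs(target_dir, exist_ok=True)
    # Stage the file next to (not inside) the watched directory so the
    # stream never picks up a half-written file.
    staging_dir = os.path.dirname(os.path.abspath(target_dir))
    fd, tmp_path = tempfile.mkstemp(suffix=".tmp", dir=staging_dir)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        for label, message in records:
            handle.write(f"{label}\t{message}\n")
    final_path = os.path.join(target_dir, filename)
    os.replace(tmp_path, final_path)  # atomic when both paths share a filesystem
    return final_path
```

For example, `drop_tsv([("ham", "see you at 5")], train_csv_dir, "batch_001.csv")` would add one record to the training stream. Messages that themselves contain tabs or newlines would need escaping, which this sketch does not handle.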

## Training model in time intervals using Spark Streaming

For a Spark Streaming application running on a cluster to be stable, the system should be able to process data as fast as it is being received, so the batch processing time should be less than the batch interval.
The batch interval needs to be set such that the expected data rate in production can be sustained.
We will test it with a conservative batch interval (say, 5-20 seconds) and a low data rate to verify whether the system is able to keep up with the data rate.
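
The stability condition can be illustrated with a few lines of plain Python. This is a simplified model rather than anything Spark provides: the processing times would come from your own measurements (for example, the Spark UI), and both function names are hypothetical.

```python
def keeps_up(processing_times, batch_interval):
    """Return True when every observed batch finished within the batch interval."""
    return all(t < batch_interval for t in processing_times)

def backlog_after(processing_times, batch_interval):
    """Estimate the scheduling delay (in seconds) left after the given batches."""
    delay = 0.0
    for t in processing_times:
        # A slow batch adds to the delay; a fast batch lets the queue drain.
        delay = max(0.0, delay + t - batch_interval)
    return delay

print(keeps_up([4.0, 12.5, 8.1], 20))        # True
print(backlog_after([25.0, 30.0, 10.0], 20)) # 5.0
```

If the estimated backlog keeps growing over time, the batch interval is too short for the data rate (or the per-batch work is too heavy).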

In [8]:


# Function to create and set up a new StreamingContext
def functionToCreateContext():
    global spark
    global dataset_df
    global userSchema
    global train_duration
    
    sc = SparkContext("local[2]", "Batch_Model_on_Stream_Data")  # new context
    ssc = StreamingContext(sc, train_duration)  # batch interval in seconds
    
    spark = SparkSession(sc)
    sqlContext = SQLContext(sc)
    # create an empty DataFrame that will accumulate the training records
    dataset_df = sqlContext.createDataFrame(sc.emptyRDD(), userSchema)

    # A queue stream that emits an empty RDD every batch interval;
    # it acts only as a timer that triggers train()
    empty_stream = ssc.queueStream([dataset_df.limit(1).rdd], oneAtATime=True, default=dataset_df.limit(1).rdd)  # create DStream
    empty_stream.foreachRDD(train)
    ssc.start() 

    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc



In [4]:

def predict(df, epoch_id):
    # foreachBatch callback: score one micro-batch of test records
    global model
    
    if model is not None:
        print("Predictions:")
        predictions = model.transform(df).select('spam', 'message', 'label', 'prediction', 'probability')
        # print predictions to the console
        predictions.show()
        
    else:
        print("Model has not seen training data yet, so no model exists")



In [5]:
def sendPartition(df, epoch_id):
    # foreachBatch callback: append one micro-batch of training records
    global dataset_df
    
    if len(df.head(1)) > 0:  # skip empty micro-batches
        dataset_df = df.union(dataset_df)



In [6]:


def train(rdd):
    # Called once per batch interval; the rdd argument is only a timer tick and is ignored
    global dataset_df
    global model
    global prev_length
    global evaluate
    global crossval_full

    current_length = dataset_df.count()
    # retrain only when new records have arrived since the last fit
    if current_length > prev_length:
        prev_length = current_length

        if evaluate:
            # Split to train and test
            (trainingData, testData) = dataset_df.randomSplit([0.7, 0.3], seed=0)
        else:
            trainingData = dataset_df

        my_train_df = trainingData

        print("\n\nStarting to fit a model on " + str(my_train_df.count()) + " records")
        # cross-validating on full_pipeline
        model = crossval_full.fit(my_train_df)
        print("Model fit completed\n")
        
        if evaluate:
            predictions = model.transform(testData)
            # the metric is areaUnderROC on the hard 0/1 predictions, even though it is printed as "Accuracy"
            evaluator = BinaryClassificationEvaluator().setLabelCol("label").setRawPredictionCol("prediction").setMetricName("areaUnderROC")
            accuracy = evaluator.evaluate(predictions)
            print("Evaluated on " + str(predictions.count()) + " records")
            print("Accuracy", accuracy)
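
A note on the number printed by train(): because the evaluator reads the hard 0/1 "prediction" column rather than a continuous score, the ROC curve has a single operating point, and the area under it reduces to the mean of the true positive rate and the true negative rate (balanced accuracy). The plain-Python sketch below illustrates that relationship; the function name is hypothetical and the code is an illustration, not Spark's implementation.

```python
def auc_from_hard_predictions(labels, predictions):
    """Area under a ROC curve that has one operating point: (TPR + TNR) / 2."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    tpr = tp / (tp + fn)  # recall on the positive (spam) class
    tnr = tn / (tn + fp)  # recall on the negative (ham) class
    return (tpr + tnr) / 2

labels      = [1, 1, 0, 0, 0, 0]
predictions = [1, 0, 0, 0, 0, 1]
print(auc_from_hard_predictions(labels, predictions))  # 0.625
```

In this toy example plain accuracy would be 4/6, so the two numbers are not interchangeable. Pointing the evaluator at the model's raw score column instead would give a threshold-independent area under the ROC curve.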


In [7]:
# Create DataFrame representing the stream of input lines from connection to localhost:9999
#userSchema = StructType().add("spam", "string").add("message", "string")
#df = spark \
#    .readStream \
#    .format("socket") \
#    .option("host", "localhost") \
#    .option("port", 9999) \
#    .load()
#df = df.selectExpr( "CAST(value AS STRING)")


#import pyspark.sql.functions as f
# split text lines into two fields
#split_col = f.split(df['value'], '\t')
#df = df.withColumn('spam', split_col.getItem(0))
#df = df.withColumn('message', split_col.getItem(1))
#df = df.select(['spam', 'message'])


In [9]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.3 pyspark-shell'

userSchema = StructType().add("spam", "string").add("message", "string")

dataset_df = None
# alternatively, start with an initial dataset
# dataset_df = sc.textFile("gs://drive3/data/spark/8_cs1_dataset/SMSSpamCollection").map(lambda line: re.split('\t', line)).toDF(["spam", "message"])  

checkpointDirectory = "chkpnt"

spark = None

model = None

prev_length = 0

evaluate = True
# interval, in seconds, between retraining runs on the accumulated dataset
train_duration = 20  # train a model every n seconds


In [10]:
# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Start the context
#context.start()
#context.awaitTermination()

In [11]:


indexer = StringIndexer().setInputCol("spam").setOutputCol("label")
# Extract words
tokenizer = Tokenizer().setInputCol("message").setOutputCol("words")
# Remove custom stopwords
stopwords = StopWordsRemover().getStopWords() + ["-"]
remover = StopWordsRemover().setStopWords(stopwords).setInputCol("words").setOutputCol("filtered")
# create features
hashingTF = HashingTF(numFeatures=10, inputCol="filtered", outputCol="features")
rf = RandomForestClassifier().setFeaturesCol("features").setNumTrees(10)
#dt = DecisionTreeClassifier()
lr = LogisticRegression(maxIter=10)

full_pipeline = Pipeline().setStages([ indexer, tokenizer, remover, hashingTF, lr])
############################################################

paramGrid = ParamGridBuilder() \
.addGrid(hashingTF.numFeatures, [10, 100, 1000]) \
.addGrid(lr.regParam, [0.1, 0.01]) \
.build()


crossval_full = CrossValidator(estimator=full_pipeline,
                            estimatorParamMaps=paramGrid,
                            evaluator=BinaryClassificationEvaluator(),
                            numFolds=2)  # use 3+ folds in practice
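
Since the whole cross-validation is repeated on every retraining run, it is worth counting how many pipeline fits one run costs. The sketch below uses the grid values and fold count from the cell above and assumes the usual CrossValidator behavior: one fit per parameter combination per fold, plus one final refit of the best combination on the full training set.

```python
from itertools import product

num_features_grid = [10, 100, 1000]  # values given to hashingTF.numFeatures
reg_param_grid = [0.1, 0.01]         # values given to lr.regParam
num_folds = 2

# Every combination of the two grids is one candidate parameter map.
param_maps = list(product(num_features_grid, reg_param_grid))
fits_per_training_run = len(param_maps) * num_folds + 1  # +1 for the final refit

print(len(param_maps))         # 6
print(fits_per_training_run)   # 13
```

Adding grid values or folds multiplies this count, which matters here because each retraining run has to finish within the training interval to keep the streaming job stable.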



Structured Streaming is a fast, scalable, and fault-tolerant stream processing engine built on the Spark SQL engine.

In [12]:

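# watch the training directory for new tab-separated files
# (start() returns a StreamingQuery handle, not a DataFrame)
train_query = spark \
    .readStream \
    .option("sep", "\t") \
    .schema(userSchema)  \
    .csv(train_csv_dir) \
    .writeStream.foreachBatch(sendPartition)\
    .start()

# give the training stream time to collect data and fit a first model
time.sleep(train_duration*4)

# watch the testing directory and print predictions for each micro-batch
test_query = spark \
    .readStream \
    .option("sep", "\t") \
    .schema(userSchema)  \
    .csv(test_csv_dir) \
    .writeStream.foreachBatch(predict).start()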



Starting to fit a model on 3920 records
Model fit completed

Evaluated on 1654 records
Accuracy 0.8891976062530532


In [None]:
#ssc.start() 
# Start the computation
context.awaitTermination()  # Wait for the computation to terminate
#print(flag)

Predictions:
+----+--------------------+-----+----------+--------------------+
|spam|             message|label|prediction|         probability|
+----+--------------------+-----+----------+--------------------+
| ham|Go until jurong p...|  0.0|       0.0|[0.98222810319386...|
| ham|Ok lar... Joking ...|  0.0|       0.0|[0.98225861109283...|
|spam|Free entry in 2 a...|  1.0|       1.0|[0.01885752181723...|
| ham|U dun say so earl...|  0.0|       0.0|[0.97771260955088...|
| ham|Nah I don't think...|  0.0|       0.0|[0.94934090451217...|
|spam|FreeMsg Hey there...|  1.0|       1.0|[0.37203591496283...|
| ham|Even my brother i...|  0.0|       0.0|[0.96752277542991...|
| ham|As per your reque...|  0.0|       0.0|[0.94607020920184...|
|spam|WINNER!! As a val...|  1.0|       1.0|[0.05238823021501...|
|spam|Had your mobile 1...|  1.0|       1.0|[0.06098116990971...|
| ham|I'm gonna be home...|  0.0|       0.0|[0.99097672028864...|
|spam|SIX chances to wi...|  1.0|       1.0|[0.12092943424657..

# Streaming from a Kafka topic

In [None]:
#flume-ng agent --conf conf --conf-file /usr/local/Cellar/flume/1.9.0/libexec/conf/flume-sample.conf  -Dflume.root.logger=DEBUG,console --name a1 -Xmx512m -Xms256m

In [None]:
#!flume-ng agent --conf conf --conf-file /Users/val/Documents/code/spark/m11_to_Upload/netcat.conf.txt  -Dflume.root.logger=DEBUG,console --name NetcatAgent -Xmx512m -Xms256m

In [3]:
# Subscribe to 1 topic
#df = spark \
#    .readStream \
#    .format("kafka") \
#    .option("kafka.bootstrap.servers", "localhost:9092") \
#    .option("subscribe", "sample-topic") \
#    .option("startingOffsets", "earliest") \
#    .load()

#df = df.selectExpr( "CAST(value AS STRING)")


#import pyspark.sql.functions as f
# split text lines into two fields
#split_col = f.split(df['value'], '\t')
#df = df.withColumn('spam', split_col.getItem(0))
#df = df.withColumn('message', split_col.getItem(1))
#df = df.select(['spam', 'message'])

# convert label: spam = 1, ham = 0 
#from pyspark.sql import functions as f
#df = df.withColumn("label", f.when(f.col('spam') == "spam", 1).otherwise(0))