<p style="text-align:center; font-size: 32px">Final project</p>
<br>
<p style="font-size:15px">Sujay Bokil (ME17B120)<br>
Irfan Thayyil (ME17B112)<br>
Joel Baby Johnson (ME17B144)</p>

## 1. Data preprocessing and Modelling

In [1]:
# Linking pyspark to kafka #
# PYSPARK_SUBMIT_ARGS is handed to spark-submit when PySpark starts its JVM
# gateway, so it has to be set before the SparkSession is created. The value
# must end with the `pyspark-shell` resource; without it spark-submit has no
# application to run and the gateway fails to start.
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'
)

In [2]:
## Importing necessary libraries ##
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import col
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import PipelineModel

In [6]:
## Spark entry points: the session, then its underlying context ##
spark = (
    SparkSession.builder
    .appName('yelp_project')
    .getOrCreate()
)
sc = spark.sparkContext

In [8]:
%%time
## Load the raw reviews and discard the columns the model does not use ##
# Identifier and date columns are removed; the remaining columns are kept.
drop_list = ['business_id', 'review_id', 'user_id', 'date']
data = spark.read.json('gs://sgb1/yelp_train.json/*.json').drop(*drop_list)
data.show(5)

+----+-----+-----+--------------------+------+
|cool|funny|stars|                text|useful|
+----+-----+-----+--------------------+------+
|   0|    0|  5.0|I had my sofa, lo...|     0|
|   0|    0|  5.0|Again great servi...|     0|
|   0|    0|  4.0|Opening night, ne...|     1|
|   0|    0|  4.0|Fun times. Great ...|     1|
|   0|    0|  2.0|I wanted to like ...|     0|
+----+-----+-----+--------------------+------+
only showing top 5 rows

CPU times: user 21.2 ms, sys: 1.36 ms, total: 22.6 ms
Wall time: 45.6 s


In [4]:
# Print the column names and types of `data`.
data.printSchema()

root
 |-- cool: long (nullable = true)
 |-- funny: long (nullable = true)
 |-- stars: double (nullable = true)
 |-- text: string (nullable = true)
 |-- useful: long (nullable = true)



In [5]:
%%time
## Rename the target column to `label` and list class frequencies, largest first ##
data = data.withColumnRenamed("stars", "label")
(
    data.groupBy("label")
    .count()
    .orderBy("count", ascending=False)
    .show()
)

+-----+-------+
|label|  count|
+-----+-------+
|  5.0|3516238|
|  4.0|1640703|
|  1.0|1258657|
|  3.0| 825490|
|  2.0| 622627|
+-----+-------+

CPU times: user 10.2 ms, sys: 0 ns, total: 10.2 ms
Wall time: 14.6 s


In [6]:
%%time
## 70/30 train/test split with a fixed seed ##
trainingData, testData = data.randomSplit([0.7, 0.3], seed=100)
print("Training Dataset Count: {}".format(trainingData.count()))
print("Test Dataset Count: {}".format(testData.count()))

Training Dataset Count: 5503714
Test Dataset Count: 2360001
CPU times: user 14.4 ms, sys: 0 ns, total: 14.4 ms
Wall time: 36.8 s


In [None]:
## Preprocessing steps ##
# Split the review text on non-word characters.
regexTokenizer = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

# NOTE: supplying stopWords replaces the transformer's default list, so only
# the tokens listed here are removed.
add_stopwords = ["http","https","amp","rt","t","c","the","a","an","it","its"]
stopwordsRemover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
    stopWords=add_stopwords
)

# Count-based alternative; defined here but not one of the pipeline stages below.
countVectors = CountVectorizer(
    inputCol="filtered",
    outputCol="text_features",
    vocabSize=10000,
    minDF=5
)

# TF-IDF text features; minDocFreq drops terms seen in fewer than 5 documents.
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="t_features", minDocFreq=5)

# Combine the three vote counts with the text features into one vector.
assembler = VectorAssembler(
    inputCols=['cool', 'funny', 'useful', 't_features'],
    outputCol="features"
)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)

pipeline = Pipeline(stages=[
    regexTokenizer,
    stopwordsRemover,
    hashingTF,
    idf,
    assembler,
    lr,
])

In [None]:
%%time
## Fitting the pipeline on the training split ##
# `pipeline.fit` returns the fitted model that later cells use for transform() and save().
pipelineModel = pipeline.fit(trainingData)

CPU times: user 175 ms, sys: 16.1 ms, total: 191 ms
Wall time: 10min 1s


In [None]:
## Confirmation message only; the fit itself is the pipeline.fit(...) call in the previous cell ##
print("Model Fitting Done")

Model Fitting Done


In [None]:
## Score the held-out split with the fitted pipeline and report accuracy ##
te_predictions = pipelineModel.transform(testData)
evaluator = MulticlassClassificationEvaluator(
    labelCol="label",
    predictionCol="prediction",
    metricName="accuracy"
)
accuracy = evaluator.evaluate(te_predictions)
print("Test Accuracy = ",accuracy)

('Test Accuracy = ', 0.626888717420035)


In [None]:
# Saving the model
# write().overwrite() replaces a model already stored at this path, so the
# cell can be re-run; a plain save() raises when the destination exists.
pipelineModel.write().overwrite().save("gs://joel_trail/PipelineModel_LR")

In [14]:
# Confirmation message for the model-saving cell.
print('Saving Done')

Saving Done


In [None]:
%%time
## Accuracy and F1 score on both splits ##
tr_predictions = pipelineModel.transform(trainingData)
te_predictions = pipelineModel.transform(testData)

evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)
evaluator1 = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="f1"
)

# Training split
accuracy = evaluator.evaluate(tr_predictions)
fscore = evaluator1.evaluate(tr_predictions)
print("Train Accuracy = ",accuracy," ,Train F1 score =",fscore)

# Test split
accuracy = evaluator.evaluate(te_predictions)
fscore = evaluator1.evaluate(te_predictions)
print("Test Accuracy = ",accuracy," ,Test F1 score =",fscore)

('Train Accuracy = ', 0.6280051979445153, ' ,Train F score =', 0.5726721694739169)


In [16]:
## Reload the saved pipeline and score the test split with it ##
Model = PipelineModel.load("gs://joel_trail/PipelineModel_LR")
evaluator = MulticlassClassificationEvaluator(
    labelCol="label", predictionCol="prediction", metricName="accuracy"
)

df = Model.transform(testData)
accuracy = evaluator.evaluate(df)
print("Test Accuracy = ",accuracy)

('Test Accuracy = ', 0.626888717420035)


## 2. Kafka streaming

In [None]:
## Installing the kafka-python client on the Dataproc cluster
# !conda install -c conda-forge kafka-python

In [4]:
## Importing necessary libraries ##
import time
from kafka import KafkaProducer
from google.cloud import storage

import pyspark.sql.functions as F

In [5]:
## Utility functions for Kafka streaming ##
def load_model(path_to_model):
    """Load a saved pipeline model.

    Args:
        path_to_model: location handed to ``PipelineModel.load``

    Returns:
        The object returned by ``PipelineModel.load``.
    """
    loaded_model = PipelineModel.load(path_to_model)
    return loaded_model

def segment_by_tabs(line):
    """Re-delimit one whitespace-separated record with tabs.

    The line is split on whitespace and reassembled as tab-separated fields:
    tokens 0-1 as individual fields, tokens 2-3 merged with a space into one
    field, tokens 4-6 as individual fields, tokens from index 7 up to (but not
    including) the last two merged with spaces into one field, and the last
    two tokens as individual fields.

    Args:
        line (str): whitespace-separated record

    Returns:
        str: the tab-delimited record
    """
    tokens = line.split()
    groups = [
        "\t".join(tokens[:2]),
        " ".join(tokens[2:4]),
        "\t".join(tokens[4:7]),
        " ".join(tokens[7:-2]),
        "\t".join(tokens[-2:]),
    ]
    return "\t".join(groups)

### Producer

Reads data from the JSON files inside a given directory and streams them, line by line, to the given Kafka topic.

In [22]:
BROKER_IP = "10.128.0.8" # Internal IP of the Kafka VM
TOPIC = "nlp"  # Kafka topic the producer writes to
DATA_DIR = "gs://sgb1/yelp_train.json"  # GCS location searched for .json files to stream


def run_producer(broker_ip, topic, data_dir, sleep_time=0):
    """Stream the records of every JSON file under a GCS location to a Kafka topic.

    Each file is read with pandas, rendered to one text line per record,
    re-delimited with ``segment_by_tabs`` and sent as one Kafka message whose
    key is a running message counter.

    Args:
        broker_ip (str): address of the Kafka broker (port 9092 is used)
        topic (str): Kafka topic to publish to
        data_dir (str): ``gs://<bucket>/<prefix>`` location; every blob under
            the prefix whose name ends in ``.json`` is streamed
        sleep_time (float): seconds to pause after each message
    """
    # "gs://bucket/some/prefix" -> ["gs:", "", "bucket", "some", "prefix"]
    uri_parts = data_dir.split("/")
    bucket = uri_parts[2]
    # Re-join with "/" so a prefix that is more than one level deep stays valid.
    bucket_dir = "/".join(uri_parts[3:])

    producer = KafkaProducer(bootstrap_servers=[f"{broker_ip}:9092"])
    try:
        client = storage.Client()
        print("Reading the files ...\n")
        json_files = [
            f"gs://{bucket}/{blob.name}"
            for blob in client.list_blobs(bucket, prefix=bucket_dir)
            if blob.name.endswith('.json')
        ]
        print("Files read successfully!\n")

        count = 1
        n = len(json_files)

        print(f"Writing the data to Kafka {topic} ...\n")
        for fnum, fname in enumerate(json_files, start=1):
            df = pd.read_json(fname, lines=True)

            # An empty frame renders as an "Empty DataFrame ..." placeholder,
            # which must not be published as if it were records.
            if not df.empty:
                lines = df.to_string(header=False,
                                     index=False,
                                     index_names=False).split('\n')

                for line in lines:
                    producer.send(topic,
                                  key=str(count).encode(),
                                  value=segment_by_tabs(line).encode())
                    # Flush per message so each record is delivered before the pause.
                    producer.flush()
                    time.sleep(sleep_time)
                    count += 1

            print(f"File {fnum}/{n} completed.")
    finally:
        # Release the broker connection even if listing, reading or sending fails.
        producer.close()

In [None]:
# Running the producer
# The last argument is sleep_time: 0.0 means no pause between messages.
run_producer(BROKER_IP, TOPIC, DATA_DIR, 0.0)

Reading the files ...

Files read successfully!

Writing the data to Kafka nlp ...



### Consumer

The consumer reads each streamed record as a string and converts the stream into a Spark dataframe. The dataframe is then passed to the saved model to obtain predictions, which are evaluated batch by batch. The code below is the same as that in the file `subscriber.py`, which is submitted as a Spark job.

In [None]:
def eval_metrics(df, epoch_id):
    """Evaluates accuracy and F1 score for a spark dataframe and prints the dataframe

    Empty batches are skipped without printing anything.

    Args:
        df (spark.DataFrame): Spark dataframe with `label` and `prediction` columns
        epoch_id (int): batch number
    """

    # The batch feeds several actions (count, show and two evaluations).
    # Persisting it avoids recomputing the batch for each of them.
    df.persist()
    try:
        n_rows = df.count()
        if n_rows == 0:
            return

        eval_acc = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
        eval_f1 = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")

        print("-"*50)
        print(f"Batch: {epoch_id}")
        print("-"*50)
        # Show every row of the batch, reusing the count taken above.
        df.show(n_rows)
        print(f"Accuracy: {eval_acc.evaluate(df):.4f}\nF1 score: {eval_f1.evaluate(df):.4f}")
    finally:
        # Always release the cached batch, including on the empty-batch return.
        df.unpersist()


def get_inferences(broker_ip, topic, model_path):
    """Reads from stream and prints evaluated metrics

    Subscribes to the Kafka topic, parses each tab-separated message into the
    model's input columns, applies the saved pipeline model and passes every
    micro-batch of predictions to ``eval_metrics``. The call blocks until the
    streaming query terminates.

    Args:
        broker_ip (str): Internal IP address of Kafka VM
        topic (str): kafka topic
        model_path (str): path to the model in GCS bucket
    """

    spark = SparkSession.builder.appName("yelp_proj").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    df = (
        spark.readStream.format("kafka")
        .option("kafka.bootstrap.servers", f"{broker_ip}:9092")
        .option("subscribe", topic)
        .load()
    )

    # Kafka delivers `value` as binary, so cast it explicitly before splitting.
    split_cols = F.split(df.value.cast("string"), '\t')

    # Position of each needed field within the tab-separated record.
    field_positions = [('cool', 1), ('funny', 3), ('stars', 5), ('text', 6), ('useful', 7)]
    for name, position in field_positions:
        df = df.withColumn(name, split_cols.getItem(position))

    # `name` rather than `col`, so the imported pyspark `col` is not shadowed.
    for name in ['cool', 'funny', 'stars', 'useful']:
        df = df.withColumn(name, df[name].cast('float'))

    df = df.withColumnRenamed("stars", "label")

    df.createOrReplaceTempView("intermediate")

    model = PipelineModel.load(model_path)

    # Records with a missing or non-numeric field would make the pipeline fail
    # and stop the streaming query, so they are dropped before scoring.
    complete_rows = df.dropna(subset=['cool', 'funny', 'label', 'text', 'useful'])
    predictions = model.transform(complete_rows)

    predictions = predictions.withColumn(
        'correct',
        F.when(F.col('prediction') == F.col('label'), 1).otherwise(0),
    )

    output_df = predictions.select('prediction', 'label', 'correct')
    output_df.createOrReplaceTempView('output')

    query = output_df.writeStream.foreachBatch(eval_metrics).start()
    query.awaitTermination()

In [None]:
# Running the subscriber
BROKER_IP = "10.128.0.8"  # same broker address as the producer cell
TOPIC = "nlp"  # must match the topic the producer writes to
DATA_DIR = "gs://sgb1/yelp_train.json"  # not passed to get_inferences below
# NOTE(review): the training section saves the model to
# gs://joel_trail/PipelineModel_LR; confirm a copy exists at this path.
MODEL_DIR = "gs://sgb1/PipelineModel_LR"

get_inferences(BROKER_IP, TOPIC, MODEL_DIR)