# Overview

This notebook connects to **Kafka** to send and receive messages. The messages are then sent to a **Flask app**, which exposes a RESTful interface to a **sentiment analysis** algorithm built on Python's **NLTK package** and returns a sentiment prediction for a text document given as input. The data is handled with **Spark**, specifically the PySpark SQL [Structured Streaming](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html) module, which runs in this Jupyter service instance.

Everything runs locally but can be migrated to a scalable cluster. The code in this notebook can be converted into a script *(ideally in Scala)* to be launched in production. The notebook allows prototyping of ML-based real-time solutions using Kafka and Spark streaming.

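**Note:** these queries end in `.start()` without a following `.awaitTermination()`. The results go to an in-memory table so they can be displayed in the Jupyter notebook instead of the console, and calling `.awaitTermination()` would block the notebook cell while the query runs.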

# Dependencies

## Packages

In [2]:
import os
SCALA_VERSION="2.12"
SPARK_VERSION="3.0.1"
os.environ['PYSPARK_SUBMIT_ARGS'] = f"--packages=org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION} pyspark-shell"

import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

from confluent_kafka import Producer
import uuid
import random

import pandas as pd
import requests
import json

# To display spark sql streaming output
from IPython.display import display, clear_output
from time import sleep

Expecting Python version 3.8.6

In [3]:
# Python 3.8.6
!python --version

Python 3.8.6


Expecting confluent-kafka version 1.5.0

In [4]:
# Version: 1.5.0
!pip show confluent-kafka

Name: confluent-kafka
Version: 1.5.0
Summary: Confluent's Python client for Apache Kafka
Home-page: https://github.com/confluentinc/confluent-kafka-python
Author: Confluent Inc
Author-email: support@confluent.io
License: UNKNOWN
Location: /opt/conda/lib/python3.8/site-packages
Requires: 
Required-by: 


To see which version of Kafka is running, navigate to the kafka service in Docker and open the CLI to get an interactive shell. Then run the following commands:
    
    /bin/bash 
    find /usr/share/java/kafka -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
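We are expecting to see kafka_2.11-2.2.0, that is, Kafka 2.2.0 built against Scala 2.11. This Scala version describes the broker build and does not need to match `SCALA_VERSION` above, which has to match the Scala version Spark was built with.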
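
The artifact name can also be split into its two version numbers in Python. The following is a minimal sketch; `parse_kafka_build` is a hypothetical helper that is not part of the notebook, and it assumes the name follows the `kafka_<scala>-<kafka>` pattern shown above.

```python
import re


def parse_kafka_build(name):
    """Split a Kafka artifact name into (scala_version, kafka_version)."""
    match = re.search(r"kafka_(\d+\.\d+)-(\d+(?:\.\d+)+)", name)
    if match is None:
        raise ValueError(f"not a Kafka artifact name: {name!r}")
    return match.group(1), match.group(2)


scala_version, kafka_version = parse_kafka_build("kafka_2.11-2.2.0")
print(f"Scala {scala_version}, Kafka {kafka_version}")
```

Because the pattern is searched rather than matched from the start, a full jar path should work as input too.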

## Functions

Sending Messages to Kafka

In [5]:
def delivery_report(err, msg):    
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print(f'Message delivery failed: {err}')
    else:
        print(f'Message delivered to {msg.topic()}')

def confluent_kafka_producer(messages, topic, bootstrap_servers):
    
    p = Producer({'bootstrap.servers': bootstrap_servers})
    for data in messages:
        
        record_key = str(uuid.uuid4())
        record_value = json.dumps({'data': data})
        p.produce(topic, key=record_key, value=record_value, on_delivery=delivery_report)
       
        p.poll(0)

    p.flush()
    print(f"we've sent {len(messages)} messages to {bootstrap_servers}")
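
Each record produced above has a random UUID as its key and a JSON object with a single `data` field as its value. The sketch below shows that round trip in plain Python, without a broker; `encode_record` and `decode_record` are hypothetical names used only for illustration.

```python
import json
import uuid


def encode_record(text):
    """Build the (key, value) pair the producer sends for one message."""
    key = str(uuid.uuid4())
    value = json.dumps({"data": text})
    return key, value


def decode_record(value):
    """Recover the text from a record value given as bytes or str."""
    if isinstance(value, bytes):
        value = value.decode("utf-8")
    return json.loads(value)["data"]


key, value = encode_record("I love this pony")
print(value)
print(decode_record(value.encode("utf-8")))
```

Kafka stores the value as bytes, which is why the Spark side later casts `value` to a string before parsing it.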
    

Print query output. Used for checking intermediate steps. The function polls and displays a few times because the in-memory table may still be empty on the first attempts.

In [6]:
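def display_query_results(query, query_name):
    """Show the query status and the in-memory table a few times."""
    for i in range(3):
        clear_output(wait=True)
        display(f"{query.status}")
        # show() prints the table itself and returns None, so it is not wrapped in display()
        spark.sql(f'SELECT * FROM {query_name}').show()
        sleep(4)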

Send request for sentiment analysis 

In [7]:
def apply_sentiment_analysis(data):
    
    result = requests.post(ML_PREDICT_URL, json=json.loads(data))
    return json.dumps(result.json())
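
As written, a network error or a non-JSON reply raises inside the function, and when it runs as a Spark UDF that can stop the whole streaming query. One possible variation is sketched below. `safe_sentiment`, `FakeResponse` and `fake_post` are hypothetical names, the scores returned by the fake are placeholder values, and `post` is assumed to behave like `requests.post`.

```python
import json


def safe_sentiment(data, post, url, timeout=5.0):
    """Post one JSON document to the endpoint and return the reply as a JSON string.

    Returns None on any failure so that a single bad row produces nulls
    instead of an exception.
    """
    try:
        response = post(url, json=json.loads(data), timeout=timeout)
        response.raise_for_status()
        return json.dumps(response.json())
    except Exception as exc:  # broad on purpose: network, HTTP and JSON errors
        print(f"sentiment request failed: {exc}")
        return None


class FakeResponse:
    """Stand-in for requests.Response, used only for this demonstration."""

    def raise_for_status(self):
        pass

    def json(self):
        # Placeholder scores, not real model output
        return {"neg": 0.0, "neu": 0.3, "pos": 0.7, "compound": 0.6}


def fake_post(url, **kwargs):
    """Stand-in for requests.post that never touches the network."""
    return FakeResponse()


print(safe_sentiment('{"data": "I love this pony"}', fake_post, "http://ml:9000/predict"))
```

In the notebook itself the call would pass `requests.post` and `ML_PREDICT_URL` in place of the stand-ins.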

## Constants

In [8]:
# Kafka
BOOTSTRAP_SERVERS = 'kafka:9092'
KAFKA_TOPIC = 'test'

# Flask App
ML_CONTAINER_HOSTNAME="ml"
ML_CONTAINER_PORT="9000"
ML_PREDICT_URL=f'http://{ML_CONTAINER_HOSTNAME}:{ML_CONTAINER_PORT}/predict'

# Initialize PySpark SQL Session

In [10]:
spark = SparkSession \
    .builder \
    .appName('RealtimeKafkaML') \
    .getOrCreate()

Expecting Spark 3.0.1

In [11]:
# Spark 3.0.1
print(f"Spark {spark.version}")

Spark 3.0.1


# Send Messages to Kafka

Here we create a list of text sentences and send them to a Kafka topic named `"test"`. The Kafka Docker container exposes **port 9092** under the **kafka hostname**, as defined in the Docker Compose file.

In [None]:
msg_count = 5 # Not referenced elsewhere in this notebook

In [None]:
simple_messages = [
'I love this pony',
'This restaurant is great',
'The weather is bad today',
'I will go to the beach this weekend',
'She likes to swim',
'Apple is a great company'
]

In [None]:
confluent_kafka_producer(messages=simple_messages, 
                         topic=KAFKA_TOPIC, bootstrap_servers=BOOTSTRAP_SERVERS)

# Read From Kafka to Spark

Read From Kafka Stream

In [None]:
df_raw = (
    spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', BOOTSTRAP_SERVERS)
    .option("startingOffsets", "earliest")
    .option('subscribe', KAFKA_TOPIC)
    .load())

In [None]:
df_json = df_raw.selectExpr('CAST(value AS STRING) as json')

In [None]:
query_name = 'df_json'
query = (
    df_json 
    .writeStream
    .format("memory")
    .queryName(query_name)
    .start())

Show the Kafka stream as read by Spark

In [None]:
display_query_results(query, query_name)

Parse the nested JSON into a table where each row holds one text, under the column labeled `data`.

In [None]:
schema = StructType([StructField('data', StringType())])

query_name = 'exploded_json'
query = (
    df_json.select(
        from_json(df_json.json, schema)
        .alias('raw_data'))
    .select('raw_data.data')
    .writeStream
    .trigger(once=True)
    .format("memory")
    .queryName(query_name)
    .start())

In [None]:
display_query_results(query, query_name)

## REST NLP Sentiment

User-defined function (UDF) that Spark uses to send column data to the sentiment analysis endpoint

In [None]:
vader_udf = udf(lambda data: apply_sentiment_analysis(data), StringType())

In [None]:
schema_input = StructType([StructField('data', StringType())])

schema_output = StructType(
    [StructField('neg', StringType()),
     StructField('pos', StringType()),
     StructField('neu', StringType()),
     StructField('compound', StringType())
    ])

In [None]:
query_name = 'input_output'
query = (
    df_json
    .select(
        from_json(df_json.json, schema_input)
        .alias('sentence'),
        from_json(vader_udf(df_json.json), schema_output)
        .alias('response')
    )
    .select('sentence.data', 'response.*')
    .writeStream
    .trigger(once=True)
    .format("memory")
    .queryName(query_name)
    .start()
)

Display the prediction outputs alongside the input data

In [None]:
display_query_results(query, query_name)
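
The `compound` column is usually the easiest one to turn into a label. The sketch below uses the thresholds of ±0.05 that are conventional for VADER; `label_from_compound` is a hypothetical helper and the sample scores are illustrative values, not output from this pipeline.

```python
def label_from_compound(compound, threshold=0.05):
    """Map a VADER compound score to a coarse sentiment label."""
    score = float(compound)  # the output schema stores the scores as strings
    if score >= threshold:
        return "positive"
    if score <= -threshold:
        return "negative"
    return "neutral"


for value in ["0.6369", "-0.5423", "0.0"]:
    print(value, label_from_compound(value))
```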

# Twitter Streaming

In [5]:
import tweepy
import json

In [11]:
def load_twitter_secrets(secrets_path="/home/jovyan/.secrets/twitter.json"):    
    with open(secrets_path, 'r') as f:
        twitter_secrets = json.load(f)
    return twitter_secrets

In [12]:
twitter_secrets = load_twitter_secrets()
# Credentials are read from a secrets file kept outside the notebook.
CONSUMER_KEY=twitter_secrets.get("api_key")
CONSUMER_SECRET=twitter_secrets.get("api_secret_key")
CONSUMER_TOKEN=twitter_secrets.get("access_token")
CONSUMER_TOKEN_SECRET=twitter_secrets.get("access_token_secret")
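
Because `.get()` returns `None` for a missing key, an incomplete secrets file only shows up later as an authentication error. A minimal sketch of an early check follows; `load_and_check_secrets` and `REQUIRED_KEYS` are hypothetical names, and the key names are the four read in the cell above.

```python
import json

REQUIRED_KEYS = ("api_key", "api_secret_key", "access_token", "access_token_secret")


def load_and_check_secrets(secrets_path):
    """Load the secrets file and fail early if a credential is missing or empty."""
    with open(secrets_path, "r") as f:
        secrets = json.load(f)
    missing = [key for key in REQUIRED_KEYS if not secrets.get(key)]
    if missing:
        raise KeyError(f"missing Twitter credentials: {', '.join(missing)}")
    return secrets
```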

## Auth

In [17]:
auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(key=CONSUMER_TOKEN, secret=CONSUMER_TOKEN_SECRET)
api = tweepy.API(auth)

## Stream Listener Class

In [25]:
TWITTER_TOPIC="python"

In [26]:
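# Override tweepy.StreamListener so that on_data forwards each raw message to Kafka
class MyStreamListener(tweepy.StreamListener):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Create the producer once instead of once per message
        self.producer = Producer({'bootstrap.servers': BOOTSTRAP_SERVERS})

    def on_data(self, data):
        #record_key = str(uuid.uuid4())
        record_key = TWITTER_TOPIC
        record_value = json.dumps({'data': data})
        self.producer.produce(KAFKA_TOPIC, key=record_key, value=record_value)
        # Serve delivery callbacks for messages sent so far
        self.producer.poll(0)

    def on_status(self, status):
        # Not called while on_data is overridden, because the base on_data does the dispatching
        print(status.text)

    def on_error(self, status_code):
        print(f"ERROR: {status_code}")
        return False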
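
`on_data` receives the raw JSON string of each stream message, so the listener above forwards the whole payload rather than just the tweet text. If only the text should reach the sentiment endpoint, it could be extracted first. The sketch below assumes the v1.1 payload fields `text` and `extended_tweet.full_text`; `extract_tweet_text` is a hypothetical helper and the sample payload is made up.

```python
import json


def extract_tweet_text(raw):
    """Return the tweet text from a raw streaming payload, or None if absent."""
    try:
        payload = json.loads(raw)
    except (TypeError, ValueError):
        return None
    if not isinstance(payload, dict):
        return None
    # Long tweets carry their full text in a nested object
    extended = payload.get("extended_tweet") or {}
    return extended.get("full_text") or payload.get("text")


sample = json.dumps({"id": 1, "text": "learning python today"})
record_value = json.dumps({"data": extract_tweet_text(sample)})
print(record_value)
```

Stream messages that are not tweets, such as limit or delete notices, have no text field and yield `None`, so they can be skipped before producing.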


In [27]:
myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth=api.auth, listener=myStreamListener)


In [28]:
myStream.filter(track=[TWITTER_TOPIC], is_async=True)


In [29]:
df_twitter_raw = (
    spark
    .readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', BOOTSTRAP_SERVERS)
    .option("startingOffsets", "earliest")
    .option('subscribe', KAFKA_TOPIC)
    .load())


In [31]:
query_name = 'df_twitter_raw3'
query = (
    df_twitter_raw
    .writeStream
    .format("memory")
    .queryName(query_name)
    .start())


Show the Kafka stream as read by Spark

In [33]:
display_query_results(query, query_name)

"{'message': 'Getting offsets from KafkaV2[Subscribe[test]]', 'isDataAvailable': False, 'isTriggerActive': True}"

+---+-----+-----+---------+------+---------+-------------+
|key|value|topic|partition|offset|timestamp|timestampType|
+---+-----+-----+---------+------+---------+-------------+
+---+-----+-----+---------+------+---------+-------------+



None

# Resources

## General Streaming

* [Real Time ML with Kafka and Spark](https://github.com/BogdanCojocar/medium-articles/tree/master/realtime_kafka)
* [Redis for Flask caching](https://docs.docker.com/compose/gettingstarted/)
* [Networking for flask app](https://pythonspeed.com/articles/docker-connection-refused/)

## Twitter

* http://docs.tweepy.org/en/v3.10.0/streaming_how_to.html
* https://www.bmc.com/blogs/working-streaming-twitter-data-using-kafka/
* https://towardsdatascience.com/using-kafka-to-optimize-data-flow-of-your-twitter-stream-90523d25f3e8