This notebook demonstrates a data receiver built on the AMQP connector for Spark Streaming provided in https://github.com/radanalyticsio/streaming-amqp.

For each message received via AMQP, the matching record in a Postgres database is updated. Postgres is only a stand-in here and is meant to be replaced by JBoss Data Grid (JDG).

In [None]:
import pixiedust
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

import json
import psycopg2
from amqp import AMQPUtils

The cell below defines:

getSale - decodes the JSON message received over AMQP and returns the item ID stored in its body.section field

storeSale - checks the sales table for the item; if it is not there, inserts it with quantity 1, otherwise adds 1 to the existing quantity

sendSale - applies storeSale to every record of an RDD, i.e. to every sale in one micro-batch
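To make the message format and the insert-or-increment rule concrete, here is a minimal pure-Python model of getSale and storeSale. It assumes each AMQP message arrives as a JSON string whose body.section field holds the item ID (the only part of the structure the notebook relies on). The sample messages, the store_sale_in_memory helper, and the dict standing in for the sales table are all hypothetical.

```python
import json

def getSale(jsonMsg):
    # Same decoding as the notebook: the item ID lives at body.section.
    data = json.loads(jsonMsg)
    return data["body"]["section"]

def store_sale_in_memory(table, itemID):
    # Hypothetical stand-in for the sales table: itemid -> quantity.
    # Mirrors storeSale: insert with quantity 1 if absent, else add 1.
    if itemID not in table:
        table[itemID] = 1
    else:
        table[itemID] += 1

# Hypothetical sample messages; only the body.section layout comes from the notebook.
messages = [
    json.dumps({"body": {"section": "widget-7"}}),
    json.dumps({"body": {"section": "gadget-3"}}),
    json.dumps({"body": {"section": "widget-7"}}),
]

sales = {}
for msg in messages:
    store_sale_in_memory(sales, getSale(msg))

print(sales)  # {'widget-7': 2, 'gadget-3': 1}
```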

In [2]:
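def getSale(jsonMsg):
    # The AMQP receiver delivers each message as a JSON string;
    # the item ID is carried in the "section" field of the message body.
    data = json.loads(jsonMsg)
    return data["body"]["section"]

def storeSale(itemID):
    # Opens one connection per record: simple, but costly at high message rates.
    conn = psycopg2.connect(
        "dbname=salesdb user=daikon password=daikon host=postgresql port=5432")
    try:
        with conn.cursor() as cur:
            # Note: SELECT-then-INSERT is not atomic; concurrent tasks may race on a new item.
            cur.execute("SELECT 1 FROM sales WHERE itemid = %s;", (itemID,))
            if cur.fetchone() is None:
                # First sale of this item: create its row.
                cur.execute(
                    "INSERT INTO sales(itemid, quantity) VALUES (%s, %s);",
                    (itemID, 1))
            else:
                # Item already recorded: increment its quantity.
                cur.execute(
                    "UPDATE sales SET quantity = quantity + 1 WHERE itemid = %s;",
                    (itemID,))
        conn.commit()
    finally:
        # Always release the connection, even if a query fails.
        conn.close()

def sendSale(rdd):
    # Runs storeSale on the executors for every item ID in the micro-batch.
    rdd.foreach(storeSale)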
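Opening a connection for every record gets expensive as message rates grow, and the SELECT-then-INSERT in storeSale can race when two tasks see the same new item. One possible alternative, sketched below under stated assumptions, handles a whole partition with a single connection: it counts the item IDs in the partition with collections.Counter and applies each count with PostgreSQL's INSERT ... ON CONFLICT upsert. This assumes PostgreSQL 9.5 or later and a unique constraint (or primary key) on sales.itemid; store_partition, send_sale_by_partition, and the connect parameter are hypothetical names, not part of the notebook.

```python
from collections import Counter

# Assumes a unique constraint on sales.itemid (required by ON CONFLICT).
UPSERT_SQL = """
    INSERT INTO sales(itemid, quantity)
    VALUES (%s, %s)
    ON CONFLICT (itemid)
    DO UPDATE SET quantity = sales.quantity + EXCLUDED.quantity;
"""

def store_partition(records, connect):
    # records: iterator over the item IDs in one partition.
    # connect: hypothetical zero-argument callable returning a DB-API
    # connection, e.g. a wrapper around psycopg2.connect.
    counts = Counter(records)
    if not counts:
        return  # empty partition: do not open a connection at all
    conn = connect()
    try:
        cur = conn.cursor()
        for itemID, qty in counts.items():
            # One upsert per distinct item, adding the partition's count.
            cur.execute(UPSERT_SQL, (itemID, qty))
        cur.close()
        conn.commit()  # one transaction per partition
    finally:
        conn.close()

def send_sale_by_partition(rdd, connect):
    # Hypothetical replacement for sendSale: one connection per partition.
    rdd.foreachPartition(lambda records: store_partition(records, connect))
```

In makeStream, the output step could then become `receiveStream.map(getSale).foreachRDD(lambda rdd: send_sale_by_partition(rdd, makeConn))`, where makeConn is a hypothetical function wrapping the notebook's psycopg2.connect call. Aggregating before writing also means a burst of sales for one item costs a single statement per partition instead of one round trip per message.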

Below, the Spark Streaming AMQP receiver is created and wired up to process messages: each message is decoded with getSale, and the resulting sale is recorded in Postgres through sendSale, as described above.

In [3]:
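batchIntervalSeconds = 5

def makeStream():
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batchIntervalSeconds)
    # The receiver write-ahead log (enabled on the SparkSession below) requires a
    # checkpoint directory; it is also what getActiveOrCreate recovers from.
    ssc.checkpoint("/tmp/spark-streaming-amqp")

    # Running-total helper for updateStateByKey; not used by the pipeline below.
    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)

    # Receive messages from the "salesq" address on the AMQP broker.
    receiveStream = AMQPUtils.createStream(ssc,
            "broker-amq-amqp",  # broker host
            5672,               # AMQP port
            "daikon",           # username
            "daikon",           # password
            "salesq")           # queue address

    # Decode each message to an item ID and record the sale in Postgres.
    # foreachRDD is an output operation and returns None, so nothing is assigned.
    receiveStream.map(getSale).foreachRDD(sendSale)

    return ssc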
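makeStream defines updateFunc but never uses it. If the intent was to keep running totals inside Spark (for example with DStream.updateStateByKey on (itemID, 1) pairs, which also needs the checkpoint directory), Spark would call it once per key per batch with that batch's new values and the previous state (None for a key seen for the first time), and also for keys that received no new values, passing an empty list. The pure-Python simulation below illustrates the totals it would produce; run_batches and the sample batches are hypothetical and only model the semantics, they do not call Spark.

```python
from collections import defaultdict

def updateFunc(new_values, last_sum):
    # Same function as in makeStream: add this batch's values to the running total.
    return sum(new_values) + (last_sum or 0)

def run_batches(batches):
    # Hypothetical helper simulating updateStateByKey over micro-batches of item IDs.
    state = {}
    for batch in batches:
        # Pair each sale with a count of 1 and group the values by item ID.
        new_values = defaultdict(list)
        for itemID in batch:
            new_values[itemID].append(1)
        # Keys already in the state are updated even when this batch has no values for them.
        for key in set(state) | set(new_values):
            state[key] = updateFunc(new_values.get(key, []), state.get(key))
    return state

totals = run_batches([["widget-7", "gadget-3", "widget-7"], ["gadget-3"], []])
print(sorted(totals.items()))  # [('gadget-3', 2), ('widget-7', 2)]
```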



In [4]:
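# Enable the receiver write-ahead log so received messages are not lost if the driver fails.
spark = (SparkSession.builder
         .appName("equoid-data-handler")
         .config("spark.streaming.receiver.writeAheadLog.enable", "true")
         .getOrCreate())

# Recover the streaming context from the checkpoint directory if one exists,
# otherwise build a fresh one with makeStream.
ssc = StreamingContext.getActiveOrCreate("/tmp/spark-streaming-amqp", makeStream)
ssc.start()
# Block for two batch intervals; returns False if the context is still running.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)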


False

In [None]:
ssc.stop(stopSparkContext=False)

In [None]:
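# Stop any StreamingContext still active in the JVM (e.g. if the Python handle
# was lost), leaving the SparkContext running.
sc = spark.sparkContext
activeContext = sc._jvm.StreamingContext.getActive()
if not activeContext.isEmpty():
    activeContext.get().stop(False)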

In [None]:
!rm -rf /tmp/spark-st*