In [9]:
from kafka import KafkaConsumer
import time
from time import sleep
import json
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
from pyspark import SparkContext
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, lit, to_date, to_timestamp, year, avg, min, max, desc

In [2]:
consumer = KafkaConsumer('my_topic')
cluster = Cluster()
session = cluster.connect()
spark = SparkSession.builder.getOrCreate()

In [3]:

query_keyspace = SimpleStatement(\
    "CREATE KEYSPACE first WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};",\
    consistency_level=ConsistencyLevel.QUORUM)

session.execute(query_keyspace)


AlreadyExists: Keyspace 'first' already exists

In [5]:
session.set_keyspace('first')
query_create = SimpleStatement(\
     """
    CREATE TABLE emp(EventId TEXT PRIMARY KEY, Type TEXT ,Severity TEXT,StartTime TEXT, 
    EndTime TEXT, TimeZone TEXT, LocationLat TEXT, LocationLng TEXT, AirportCode TEXT,
    City TEXT, County TEXT, State TEXT, ZipCode TEXT);
    """,\
    consistency_level=ConsistencyLevel.QUORUM)
session.execute(query_create)

AlreadyExists: Table 'first.emp' already exists

In [6]:
schema = ['EventId', 'Type', 'Severity', 'StartTime', 'EndTime', 'TimeZone', 'LocationLat',\
          'LocationLng', 'AirportCode', 'City', 'County', 'State', 'ZipCode']
N = 5

In [None]:
while True:
    sleep(1)
    dict_msg = consumer.poll(timeout_ms=1000, max_records=500)
    rows = next(iter(dict_msg.values()))
    table = []
    for row in rows:
        string_row = row.value.decode('UTF-8')
        tuple_row = tuple(string_row.split(", ")) 
        table.append(tuple_row)
    df = spark.createDataFrame(table,schema)
    start_time = time.time()
    df_agg = df.groupBy("County", year(df.StartTime))\
    .agg(avg("LocationLat").alias("avg_latitude"), \
        min("LocationLat").alias("min_latitude"), \
        max("LocationLat").alias("max_latitude"), \
        avg("LocationLng").alias("avg_longitude"), \
        min("LocationLng").alias("min_longitude"), \
        max("LocationLng").alias("max_longitude")) \
        .orderBy("County").show(truncate=False)
    print("Aggregating: --- %s seconds ---" % (time.time() - start_time))
    start_time = time.time()
    df_top = df.groupBy("Type").count().orderBy(desc("count")).limit(N).show(truncate=False)
    print("Top N: --- %s seconds ---" % (time.time() - start_time))
    
    query_insert = SimpleStatement(
        """
    INSERT INTO emp(EventId, Type, Severity, StartTime, EndTime, 
    TimeZone, LocationLat, LocationLng, AirportCode, City, County, State, ZipCode) 
    VALUES (%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s, %s,%s)
    """,
    consistency_level=ConsistencyLevel.QUORUM)
    start_time = time.time()
    for item in table:
        session.execute(query_insert,
                        (item[0], item[1], item[2], item[3], item[4], item[5],
                        item[6], item[7], item[8], item[9], item[10], item[11], item[12]))
    print("Inserting: --- %s seconds ---" % (time.time() - start_time))

+--------+---------------+------------------+------------+------------+------------------+-------------+-------------+
|County  |year(StartTime)|avg_latitude      |min_latitude|max_latitude|avg_longitude     |min_longitude|max_longitude|
+--------+---------------+------------------+------------+------------+------------------+-------------+-------------+
|Muskogee|2018           |35.657699999999984|35.6577     |35.6577     |-95.36580000000002|-95.3658     |-95.3658     |
|Muskogee|2017           |35.65769999999992 |35.6577     |35.6577     |-95.36579999999968|-95.3658     |-95.3658     |
+--------+---------------+------------------+------------+------------+------------------+-------------+-------------+

Aggregating: --- 4.531867742538452 seconds ---
+-------------+-----+
|Type         |count|
+-------------+-----+
|Rain         |423  |
|Fog          |73   |
|Snow         |2    |
|Cold         |1    |
|Precipitation|1    |
+-------------+-----+

Top N: --- 3.094186305999756 seconds --

In [45]:
rows = session.execute("SELECT Type FROM emp LIMIT 10")
for user_row in rows:
    print(user_row)

Row(type='Rain')
Row(type='Rain')
Row(type='Rain')
Row(type='Rain')
Row(type='Rain')
Row(type='Rain')
Row(type='Snow')
Row(type='Rain')
Row(type='Rain')
Row(type='Rain')


In [23]:
consumer.close()

In [29]:
metrics = consumer.metrics()
print(json.dumps(metrics, indent=4, sort_keys=True))

{
    "consumer-coordinator-metrics": {
        "assigned-partitions": 1.0,
        "commit-latency-avg": 0.0,
        "commit-latency-max": -Infinity,
        "commit-rate": 0.0,
        "heartbeat-rate": 0.0,
        "heartbeat-response-time-max": -Infinity,
        "join-rate": 0.0,
        "join-time-avg": 0.0,
        "join-time-max": -Infinity,
        "last-heartbeat-seconds-ago": Infinity,
        "sync-rate": 0.0,
        "sync-time-avg": 0.0,
        "sync-time-max": -Infinity
    },
    "consumer-fetch-manager-metrics": {
        "bytes-consumed-rate": 0.0,
        "fetch-latency-avg": 0.0,
        "fetch-latency-max": -Infinity,
        "fetch-rate": 0.0,
        "fetch-size-avg": 0.0,
        "fetch-size-max": -Infinity,
        "fetch-throttle-time-avg": 0.0,
        "fetch-throttle-time-max": -Infinity,
        "records-consumed-rate": 0.0,
        "records-lag-max": -Infinity,
        "records-per-request-avg": 0.0
    },
    "consumer-metrics": {
        "connection-cl