# Read Sub Demographic Data

In [0]:
from pyspark.sql.types import *

import pyspark.sql.functions as F

from pyspark import SparkContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my project 2")\
    .config("spark.kryoserializer.buffer.max", "512m")\
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0')\
        .getOrCreate()
sc = spark.sparkContext


for_students_demo_path = "/mnt/ddscoursedatastorage/ro/fwm-stb-data/proj_B_demographic"
demographic_df = spark.read.parquet(for_students_demo_path)
display(demographic_df)

household_id,household_size,num_adults,num_generations,marital_status,race_code,dwelling_type,home_owner_status,length_residence,home_market_value,net_worth,gender_individual,education_highest
85,2,1,2,B,W,S,O,15,0.125,0.05,F,1
2073,1,1,2,M,H,S,O,15,0.15,0.1,F,1
2523,7,6,3,M,W,S,O,15,0.1,0.1,M,2
2717,3,2,2,S,W,S,O,11,0.125,0.2,M,3
3364,2,2,2,M,W,S,O,15,0.1,0.1,M,1
4046,4,3,3,M,W,S,O,6,0.075,0.05,F,1
4303,1,1,1,S,W,S,O,15,0.15,0.2,M,1
4559,3,2,2,S,W,S,O,12,0.175,0.2,F,2
5277,3,2,2,M,W,S,R,15,0.125,0.02,M,2
5440,1,1,1,S,W,S,O,8,0.225,0.2,F,1


# Read Static viewing data from Kafka

In [0]:

kafka_server = "kafka96224.eastus.cloudapp.azure.com:29092" 
topic='viewstatic'
OFFSETS_PER_TRIGGER = 50000
SCHEMA = "device_id STRING, event_date STRING, event_time STRING, station_num STRING, prog_code STRING, household_id LONG"

static_df = spark.read\
                  .format("kafka")\
                  .option("kafka.bootstrap.servers", kafka_server)\
                  .option("subscribe", topic)\
                  .option("startingOffsets", "earliest")\
                  .option("failOnDataLoss",False)\
                  .load()

static_view_data = static_df.select(F.from_csv(F.decode("value", "US-ASCII"), schema=SCHEMA).alias("value")).select("value.*").na.drop()
display(static_view_data)

device_id,event_date,event_time,station_num,prog_code,household_id
000000033449,20150114,0,59444,MV000963020000,1496400
000000033633,20150114,0,16374,EP018001900333,1477911
0000000792ce,20150114,0,60179,EP018001900333,1464390
00000007a06a,20150114,0,19746,SH000299490000,1491604
00000007a196,20150114,0,61812,EP002654380201,1468157
00000007a236,20150114,0,11207,EP003169780032,3250955
0000000a0707,20150114,0,79051,EP006331691288,1462990
0000000cb631,20150114,0,21247,EP005927330074,1478303
0000000dee65,20150114,0,10142,EP003118650048,1492519
0000000e5290,20150114,0,11066,EP000548690053,1479402


In [0]:
static_view_data.count()

Out[4]: 7223153

# Read Streaming viewing data from Kafka

In [0]:
topic = "viewstream" 
streaming_df = spark.readStream\
                  .format("kafka")\
                  .option("kafka.bootstrap.servers", kafka_server)\
                  .option("subscribe", topic)\
                  .option("startingOffsets", "earliest")\
                  .option("failOnDataLoss",False)\
                  .option("maxOffsetsPerTrigger", OFFSETS_PER_TRIGGER )\
                  .load()\
                  .select(F.from_csv(F.decode("value", "US-ASCII"), schema=SCHEMA).alias("value")).select("value.*")

In [0]:
# Of course that the following will change after you implement the instructions - this is just an example

count_events_per_station = streaming_df.groupBy('station_num').count()
count_events_query = count_events_per_station.writeStream\
.queryName('num_events')\
.format("memory")\
.outputMode("complete")\
.start()

In [0]:
from IPython.display import display, clear_output
import time
# Waiting for stream to initialize...
time.sleep(17)
# Starting to access the data stream
for i in range(5):
    print(f"Iteration: {i}")
    print(count_events_query.status)
    spark.sql('SELECT * FROM num_events').show()
    time.sleep(2)
    
count_events_query.stop()

Iteration: 0
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
+-----------+-----+
|station_num|count|
+-----------+-----+
+-----------+-----+

Iteration: 1
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
+-----------+-----+
|station_num|count|
+-----------+-----+
|      16576|    9|
|      10436|   16|
|      14899|   35|
|      45540|    1|
|      18726|    1|
|      12394|   37|
|      57569|    4|
|      65348|   15|
|      42975|   62|
|      11313|    4|
|      30750|   34|
|      18429|   33|
|      63391|    1|
|      10730|   32|
|      69130|   13|
|      47794|    2|
|      10801|   71|
|      16619|   14|
|      26079|    5|
|      50001|   23|
+-----------+-----+
only showing top 20 rows

Iteration: 2
{'message': 'Processing new data', 'isDataAvailable': True, 'isTriggerActive': True}
+-----------+-----+
|station_num|count|
+-----------+-----+
|      16576|    9|
|      10436|   16|
|      14899|   35|

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<command-2192792069347694>[0m in [0;36m<cell line: 12>[0;34m()[0m
[1;32m     10[0m     [0mtime[0m[0;34m.[0m[0msleep[0m[0;34m([0m[0;36m2[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[1;32m     11[0m [0;34m[0m[0m
[0;32m---> 12[0;31m [0mcount_events_query[0m[0;34m.[0m[0mstop[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/databricks/spark/python/pyspark/sql/streaming/query.py[0m in [0;36mstop[0;34m(self)[0m
[1;32m    152[0m     [0;32mdef[0m [0mstop[0m[0;34m([0m[0mself[0m[0;34m)[0m [0;34m->[0m [0;32mNone[0m[0;34m:[0m[0;34m[0m[0;34m[0m[0m
[1;32m    153[0m         [0;34m"""Stop this streaming query."""[0m[0;34m[0m[0;34m[0m[0m
[0;32m--> 154[0;31m         [0mself[0m[0;34m.[0m[0m_jsq[0m[0;34m.[0m[0mstop[0m[0;34m([0m[0;34m)[