### Dependencies

In [1]:
# Use these commands to install required dependencies if necessary.

# !pip install pandas findspark py4j seaborn numpy
# !pip install torch==1.11.0+cu113 torchvision==0.12.0+cu113 torchaudio===0.11.0+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html
# !pip install transformers
# !pip install tqdm
# !pip install ipywidgets
# !jupyter nbextension enable --py widgetsnbextension

# Use this command if the above installation of PyTorch fails.

# !pip install torch torchvision torchaudio

### Spark Set-Up

In [1]:
import findspark

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import desc
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [2]:
# Constants for FILE PATHS

import os

# Location of the Spark installation that is handed to findspark.init().
# An existing SPARK_HOME environment variable takes precedence so the notebook is
# not tied to one machine; the original hard-coded path is kept as the fallback.
SPARK_PATH = os.environ.get('SPARK_HOME', '/home/vishakan/spark-3.2.1-bin-hadoop3.2')

In [3]:
# Point findspark at the Spark installation given by SPARK_PATH.
findspark.init(SPARK_PATH)
# Register the Kafka source package; init_df_table() reads with .format("kafka").
findspark.add_packages("org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")    #Required dependency

In [4]:
# Create (or reuse) the SparkSession named "FYP"; the bare `spark` expression displays it.
spark = SparkSession.builder.appName("FYP").getOrCreate()
spark

22/05/09 19:18:40 WARN Utils: Your hostname, Legion resolves to a loopback address: 127.0.1.1; using 192.168.1.2 instead (on interface wlp7s0)
22/05/09 19:18:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/vishakan/spark-3.2.1-bin-hadoop3.2/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vishakan/.ivy2/cache
The jars for the packages stored in: /home/vishakan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bef370c5-8a48-41da-a8fe-a61a844b4070;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.kafka#kafka-clients;2.8.0 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in cen

In [5]:
# Run this only once, restart kernel if errors
# Keep a handle on the session's SparkContext; consumer_call() uses it for sc.parallelize().
sc = spark.sparkContext
sc

#### Code To Ignore Warning Messages

In [6]:
#Doesn't seem to work here properly

import warnings
warnings.filterwarnings('ignore')

warnings.filterwarnings(action='once')

In [7]:
%%javascript
// Adds a link to this cell's output that toggles a CSS rule hiding stderr output areas.
// `on` is the state applied on the next click; it starts as true, so the programmatic
// click at the end hides stderr immediately.
(function(on) {
// Toggle link; the initial text is replaced as soon as the click handler runs once.
const e=$( "<a>Setup failed</a>" );
// Id of the <style> element that holds the toggled rule.
const ns="js_jupyter_suppress_warnings";
var cssrules=$("#"+ns);
// Create the <style> element (with a placeholder rule) the first time this cell runs.
if(!cssrules.length) cssrules = $("<style id='"+ns+"' type='text/css'>div.output_stderr { } </style>").appendTo("head");
e.click(function() {
    var s='Showing';  
    // Start from an empty stylesheet; with no rule present stderr is visible.
    cssrules.empty()
    if(on) {
        s='Hiding';
        // Hide both stderr selectors used in the rule below.
        cssrules.append("div.output_stderr, div[data-mime-type*='.stderr'] { display:none; }");
    }
    e.text(s+' warnings (click to toggle)');
    // Flip the state so the next click does the opposite.
    on=!on;
}).click();
// `element` is not defined in this cell; it is expected to be supplied by the notebook
// front end as the output area of the cell.
$(element).append(e);
})(true);

<IPython.core.display.Javascript object>

In [8]:
class StopExecution(Exception):
    """Exception used to stop the running cell without a traceback being displayed.

    IPython consults ``_render_traceback_`` on an exception, when the method exists,
    to obtain the traceback lines to show. Returning no lines makes the stop silent.
    """

    def _render_traceback_(self):
        """Return an empty list of traceback lines."""
        # The hook is expected to return a list of strings. The original body was
        # `pass`, which returned None instead of a (possibly empty) list.
        return []

### Imports

In [9]:
import pkg_resources

import pandas as pd
import numpy as np

from time import sleep
import json
import os

from collections import namedtuple
import sqlite3

from tqdm.notebook import tqdm

from transformers import AutoTokenizer, AutoModelForSequenceClassification

  import imp


In [10]:
# Do not truncate long cell values when pandas displays a DataFrame.
pd.set_option('display.max_colwidth', None)

In [11]:
# Constants for Spark processing

# Numeric suffix appended to IN_MEM_TABLENAME to name each streaming query / in-memory
# table; consumer_call() resets it to 1 and increments it after every poll.
TABLE_COUNT = 0
# Name prefix of the in-memory table that the streaming query writes to.
IN_MEM_TABLENAME = "StockData"
# Table in ../Database/results.sqlite that receives the aggregated percentages.
SQLITE_TABLENAME = "scored_stocks"
# Kafka starting offset used for partition "0" of TOPIC; loaded by check_offset_status().
OFFSET = 0
# Kafka topic to subscribe to; also the key used in the OFFSET_FINDER table.
TOPIC = "evs-stocks"

In [15]:
def check_offset_status():
    """Load the stored Kafka starting offset for TOPIC into the global OFFSET.

    Looks TOPIC up in the OFFSET_FINDER table of ``../Database/cache.sqlite``
    (resolved against the current working directory). If a row exists, its
    ``offsetval`` becomes OFFSET. Otherwise a row ``(TOPIC, 0)`` is inserted and
    OFFSET is set to 0.

    Side effects: assigns the global OFFSET, may insert one row, prints the offset.

    Raises:
        sqlite3.Error: if the database or the OFFSET_FINDER table cannot be used.
    """

    global OFFSET

    connection = sqlite3.connect(os.path.join(os.getcwd(), '../Database/cache.sqlite'))
    try:
        cursor = connection.cursor()
        try:
            # Exact match instead of LIKE: with LIKE, '_' and '%' in a topic name act
            # as wildcards and matching is case-insensitive, so a different topic's
            # row could be picked up.
            query = "SELECT offsetval FROM OFFSET_FINDER WHERE topic = ?"
            rows = cursor.execute(query, [TOPIC]).fetchall()

            if rows:
                OFFSET = rows[0][0]
            else:
                insert_query = "INSERT INTO OFFSET_FINDER VALUES(?, ?)"
                cursor.execute(insert_query, (TOPIC, 0))
                connection.commit()
                # Keep the in-memory value in step with the row just written, even if
                # OFFSET was changed earlier in this kernel session.
                OFFSET = 0

            print({f"Starting Offset for {TOPIC}": OFFSET})
        finally:
            cursor.close()
    finally:
        # Always release the database handle, including when a query fails.
        connection.close()

In [16]:
# Load the persisted starting offset for TOPIC into OFFSET before any stream is created.
check_offset_status()

{'Starting Offset for evs-stocks': 0}


In [15]:
# df = spark \
#   .readStream \
#   .format("kafka") \
#   .option("kafka.bootstrap.servers", "localhost:9092") \
#   .option("subscribe", TOPIC) \
#   .option("startingOffsets", f""" {{"{TOPIC}":{{"0":{OFFSET}}}}} """) \
#   .load()

# schema_str = "Data STRING"

# df = df.selectExpr("CAST(value AS STRING)")
# df = df.select(from_csv(col("value"),schema_str).alias("Table"))
# df = df.selectExpr("Table.*")
# df.printSchema()
# #option("truncate", "false")

In [16]:
# query = df.writeStream.trigger(processingTime='5 seconds').queryName(f"{IN_MEM_TABLENAME}{TABLE_COUNT}").format('memory').outputMode("append").start()

In [17]:
# List the tables currently registered in the Spark session.
spark.sql('SHOW TABLES').show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [17]:
def my_round(val):
    """Truncate a numeric string to at most three decimal places and return a float.

    Digits beyond the third decimal place are dropped (no rounding), e.g.
    ``"12.34567" -> 12.345`` and ``"-1.9999" -> -1.999``.

    The original implementation sliced the text after the first ``'.'``. That
    breaks for exponent notation, which ``str(float)`` produces for very small or
    very large values: ``"1e-05"`` raised ValueError and ``"1.2345e-05"`` came back
    as ``1.234``. The value is now parsed as a Decimal so the truncation is applied
    to the actual magnitude.

    Args:
        val: the number as text (anything whose ``str()`` is a number is accepted).

    Returns:
        The truncated value as a float. Non-finite values (nan/inf) are returned
        unchanged as floats.

    Raises:
        ValueError: if ``val`` is not a valid number.
    """
    from decimal import Decimal, InvalidOperation, ROUND_DOWN

    try:
        number = Decimal(str(val).strip())
    except InvalidOperation:
        raise ValueError(f"could not convert string to float: {val!r}") from None

    if not number.is_finite():
        return float(number)

    try:
        # ROUND_DOWN truncates toward zero, matching the original digit slicing.
        truncated = number.quantize(Decimal("0.001"), rounding=ROUND_DOWN)
    except InvalidOperation:
        # Too many digits for the Decimal context to keep three decimal places;
        # return the parsed value without truncating.
        return float(number)
    return float(truncated)

def calc_percent_change(open_value, close_value):
    """Return the open-to-close change as a percentage of the open value.

    The percentage is converted to text and passed through my_round() before
    being returned.
    """
    relative_change = (close_value - open_value) / open_value
    percent_text = str(100 * relative_change)
    return my_round(percent_text)

In [19]:
# sleep(10)

# tweet_dict_list = []

# value = spark.sql(f"SELECT * FROM {IN_MEM_TABLENAME}{TABLE_COUNT} LIMIT 10").collect()

# for row in value:
#     #print(row)
#     jsonCopy = json.loads(row["Data"])
#     jsonCopy['open'] = float(jsonCopy['open'])
#     jsonCopy['close'] = float(jsonCopy['close'])
#     jsonCopy['percentage'] = calc_percent_change(jsonCopy['open'], jsonCopy['close'])
#     tweet_dict_list.append(jsonCopy)
# pdd = pd.DataFrame(tweet_dict_list)

# query.awaitTermination(1)
# pdd.head(10)

In [20]:
# rdd = sc.parallelize(tweet_dict_list)

In [21]:
# %%time
# rdd.map(lambda row: (row['stockDate'], row['ticker'] , row['percentage'])).toDF().toPandas().head(10)

In [22]:
# newrdd = rdd.map(lambda row: (row['category'], row['stockDate'], row['percentage']))
# newrdd.collect()

In [23]:
# nextrdd = newrdd.map(lambda tup: ((tup[0], tup[1]), tup[2])).reduceByKey(lambda a, b: (a+b)/2).map(lambda row: (row[0], row[1]))
# nextrdd = newrdd.map(lambda tup: ((tup[0], tup[1]), (tup[2], 1))).reduceByKey(lambda a, b: (a[0]+b[0], a[1]+b[1])).map(lambda row: (row[0], row[1][0]/row[1][1]))
# nextrdd.collect()

In [24]:
# #write to db
# connection = sqlite3.connect(os.path.join(os.getcwd(), f'../Database/results.sqlite'))
# cursor = connection.cursor()

# drop_table = f'''
#             DROP TABLE IF EXISTS {SQLITE_TABLENAME};
#             '''

# cursor.execute(drop_table)


# create_table = f'''CREATE TABLE IF NOT EXISTS {SQLITE_TABLENAME} (
#                 category TEXT,
#                 date DATE,
#                 agg_percent TEXT,
#                 CONSTRAINT uniq_stock PRIMARY KEY (category, date)
#                 );
#                 '''

# cursor.execute(create_table)

# insert_records = f'''INSERT INTO {SQLITE_TABLENAME} (category, date, agg_percent) VALUES(?, ?, ?)
#                         ON CONFLICT(category, date) DO 
#                         UPDATE SET agg_percent = (agg_percent + excluded.agg_percent)/2
#                         WHERE {SQLITE_TABLENAME}.category LIKE ? AND {SQLITE_TABLENAME}.date LIKE ? '''
    

# contents = []
# for row in nextrdd.collect():
#     contents.append((row[0][0], row[0][1], row[1], row[0][0], row[0][1]))
    
# try:
#     cursor.executemany(insert_records, contents)
#     connection.commit()

#     rows = cursor.execute(f"SELECT * FROM {SQLITE_TABLENAME}").fetchall()
#     for row in rows:
#         print(row)
# except sqlite3.Error as error:
#     print({error})
# finally:
#     cursor.close()
#     connection.close()

### Helper Methods 

In [18]:
def init_df_table():
    """Start a streaming query that copies Kafka records for TOPIC into an in-memory table.

    Reads TOPIC from the broker at localhost:9092, starting at OFFSET on partition
    "0", parses each record's value with the CSV schema ``Data STRING`` and writes
    the result, in append mode with a 5-second trigger, to the memory sink named
    ``{IN_MEM_TABLENAME}{TABLE_COUNT}``. Prints the parsed schema and the current
    table list.

    Returns:
        The started streaming query.
    """

    starting_offsets = f""" {{"{TOPIC}":{{"0":{OFFSET}}}}} """

    reader = spark.readStream.format("kafka")
    reader = reader.option("kafka.bootstrap.servers", "localhost:9092")
    reader = reader.option("subscribe", TOPIC)
    reader = reader.option("startingOffsets", starting_offsets)
    kafka_stream = reader.load()

    # The Kafka value is cast to text, parsed as a one-column CSV, then flattened.
    value_text = kafka_stream.selectExpr("CAST(value AS STRING)")
    parsed = value_text.select(from_csv(col("value"), "Data STRING").alias("Table"))
    flattened = parsed.selectExpr("Table.*")
    flattened.printSchema()

    sink_name = f"{IN_MEM_TABLENAME}{TABLE_COUNT}"
    writer = flattened.writeStream.trigger(processingTime='5 seconds')
    writer = writer.queryName(sink_name).format('memory').outputMode("append")
    query = writer.start()

    spark.sql('SHOW TABLES').show()
    return query

In [19]:
def delete_spark_sql_table():
    """Reset the results table in permanent storage.

    Despite its name, this function does not touch Spark: it drops the
    SQLITE_TABLENAME table in ``../Database/results.sqlite`` (resolved against the
    current working directory) if it exists, and recreates it empty with the
    columns ``category``, ``date`` and ``agg_percent`` and a primary key on
    ``(category, date)``. All previously stored results are lost.

    Raises:
        sqlite3.Error: if the database cannot be opened or modified.
    """

    connection = sqlite3.connect(os.path.join(os.getcwd(), '../Database/results.sqlite'))
    try:
        cursor = connection.cursor()
        try:
            drop_table = f'''
            DROP TABLE IF EXISTS {SQLITE_TABLENAME};
            '''

            cursor.execute(drop_table)

            # NOTE(review): agg_percent is declared TEXT although write_to_db() does
            # arithmetic on it; kept as-is so the existing schema does not change.
            create_table = f'''CREATE TABLE IF NOT EXISTS {SQLITE_TABLENAME} (
                category TEXT,
                date DATE,
                agg_percent TEXT,
                CONSTRAINT uniq_stock PRIMARY KEY (category, date)
                );
                '''

            cursor.execute(create_table)
            connection.commit()
        finally:
            cursor.close()
    finally:
        # Always release the database handle, including when a statement fails.
        connection.close()

In [20]:
def write_to_db(rdd):
    """Upsert aggregated percentages from an RDD into the SQLite results table.

    Each RDD element must be shaped ``((category, date), agg_percent)``. A new
    ``(category, date)`` pair is inserted; for an existing pair the stored
    ``agg_percent`` is replaced by the average of the stored and the incoming value.

    SQLite errors during the write are reported with ``print`` and not re-raised.
    NOTE(review): the caller therefore cannot tell that a write failed - confirm
    that this best-effort behaviour is intended.

    Args:
        rdd: RDD of ``((category, date), agg_percent)`` tuples.
    """

    insert_records = f'''INSERT INTO {SQLITE_TABLENAME} (category, date, agg_percent) VALUES(?, ?, ?)
                        ON CONFLICT(category, date) DO 
                        UPDATE SET agg_percent = (agg_percent + excluded.agg_percent)/2
                        WHERE {SQLITE_TABLENAME}.category LIKE ? AND {SQLITE_TABLENAME}.date LIKE ? '''

    # Collect before opening the database so that a failing Spark job cannot leave a
    # connection open. The key is passed twice: once for VALUES, once for the WHERE.
    contents = [
        (row[0][0], row[0][1], row[1], row[0][0], row[0][1])
        for row in rdd.collect()
    ]
    if not contents:
        return

    connection = sqlite3.connect(os.path.join(os.getcwd(), '../Database/results.sqlite'))
    try:
        cursor = connection.cursor()
        try:
            cursor.executemany(insert_records, contents)
            connection.commit()
        except sqlite3.Error as error:
            # Discard the partially applied batch and report the problem.
            connection.rollback()
            print({"SQLite error": str(error)})
        finally:
            cursor.close()
    finally:
        connection.close()

In [21]:
def update_offset_table():
    """Persist the current OFFSET for TOPIC, then stop the running cell.

    Writes the global OFFSET to the OFFSET_FINDER table in
    ``../Database/cache.sqlite`` (resolved against the current working directory),
    reads the stored value back into OFFSET (-1 if TOPIC has no row) and prints it.

    Raises:
        StopExecution: always, after the database work has finished. This is how
            the calling cell is halted.
        sqlite3.Error: if the database or the OFFSET_FINDER table cannot be used.
    """

    global OFFSET

    connection = sqlite3.connect(os.path.join(os.getcwd(), '../Database/cache.sqlite'))
    try:
        cursor = connection.cursor()
        try:
            # Both values are bound as parameters instead of being formatted into the
            # SQL text. The topic is matched exactly: with LIKE, '_' and '%' in a
            # topic name act as wildcards and could update other topics' rows.
            query = "UPDATE OFFSET_FINDER SET offsetval = ? WHERE topic = ?"
            cursor.execute(query, (OFFSET, TOPIC))
            connection.commit()

            query = "SELECT offsetval FROM OFFSET_FINDER WHERE topic = ?"
            rows = cursor.execute(query, [TOPIC]).fetchall()

            if rows:
                OFFSET = rows[0][0]
            else:
                OFFSET = -1

            print({f"Updated Starting Offset for {TOPIC}": OFFSET})
        finally:
            cursor.close()
    finally:
        # Always release the database handle, including when a statement fails.
        connection.close()

    raise StopExecution

In [22]:
def consumer_call():
    """Poll Kafka through Spark, aggregate the stock records and store the results.

    Repeats the following until a poll yields no rows:

    1. start a streaming query with init_df_table() and let it run for 10 seconds;
    2. collect the in-memory table, stop the query and drop the table;
    3. advance OFFSET by the number of rows read;
    4. in batches of LIMIT_COUNT rows: parse the JSON in the ``Data`` column,
       compute the open/close percentage change, average it per
       ``(category, stockDate)`` and upsert the averages with write_to_db().

    When a poll returns no rows, update_offset_table() persists OFFSET and raises
    StopExecution, which ends the loop and the cell.

    Changes from the original implementation:
    * the streaming query is now stopped after every poll; previously each
      iteration started a new query and left the earlier ones running;
    * the per-batch average is computed as sum/count. The original reduced with
      ``(a + b) / 2``, which is not associative, so the value depended on how Spark
      partitioned the records;
    * two ``collect()`` calls whose results were discarded were removed;
    * batches are taken by slicing instead of repeated ``pop(0)``.

    Side effects: modifies the globals TABLE_COUNT and OFFSET, writes to SQLite.
    """

    LIMIT_COUNT = 1000
    global TABLE_COUNT, OFFSET
    TABLE_COUNT = 1
    #delete_spark_sql_table()

    while True:
        query = init_df_table()
        table_name = f"{IN_MEM_TABLENAME}{TABLE_COUNT}"
        try:
            # Give the stream time to fill the in-memory table.
            sleep(10)
            value = spark.sql(f"SELECT * FROM {table_name}").collect()
            # Waits at most one more second; re-raises if the query has failed.
            query.awaitTermination(1)
        finally:
            # The rows are already collected (or the poll failed); either way the
            # query must not keep running in the background.
            query.stop()
        spark.sql(f"DROP TABLE {table_name}")

        TABLE_COUNT = (TABLE_COUNT+1)
        total_stock_count = len(value)
        OFFSET += total_stock_count

        print({"Stocks-data collected from SELECT query": total_stock_count})

        if(total_stock_count == 0):
            # Persists OFFSET and raises StopExecution, which ends this loop.
            update_offset_table()

        for start in range(0, total_stock_count, LIMIT_COUNT):
            batch = value[start:start + LIMIT_COUNT]
            tweet_dict_list = []

            p_bar = tqdm(enumerate(batch), total=len(batch))

            for indx, row in p_bar:
                jsonCopy = json.loads(row["Data"])
                jsonCopy['open'] = float(jsonCopy['open'])
                jsonCopy['close'] = float(jsonCopy['close'])
                jsonCopy['percentage'] = calc_percent_change(jsonCopy['open'], jsonCopy['close'])
                tweet_dict_list.append(jsonCopy)
                p_bar.set_description(f'Working on "{start + indx + 1}/{total_stock_count}"')

            print({"Number of stock records" : len(tweet_dict_list)})
            print("----------------------------------------------------------------")

            rdd = sc.parallelize(tweet_dict_list)

            # Key by (category, stockDate) and carry (sum, count) so that the
            # reduction is associative; divide once at the end to get the mean.
            keyed = rdd.map(lambda row: ((row['category'], row['stockDate']), (row['percentage'], 1)))
            totals = keyed.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
            nextrdd = totals.mapValues(lambda acc: acc[0] / acc[1])

            write_to_db(nextrdd)

In [23]:
# Start ingestion; the loop ends when a poll returns no rows and update_offset_table() raises StopExecution.
consumer_call()

root
 |-- Data: string (nullable = true)



22/05/09 19:20:31 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0d62c2c6-f006-40c5-8eb3-abfd8299c2ab. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/05/09 19:20:31 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         |stockdata1|       true|
+---------+----------+-----------+



                                                                                

{'Stocks-data collected from SELECT query': 725}


  self._sock = None


0it [00:00, ?it/s]

{'Number of stock records': 725}
----------------------------------------------------------------


  self._sock = None
  self._sock = None
  self._sock = None
22/05/09 19:20:46 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7c99072a-c7bb-40d1-b990-91b3bb8ac614. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


root
 |-- Data: string (nullable = true)

+---------+----------+-----------+
|namespace| tableName|isTemporary|
+---------+----------+-----------+
|         |stockdata2|       true|
+---------+----------+-----------+



22/05/09 19:20:46 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


{'Stocks-data collected from SELECT query': 0}
{'Updated Starting Offset for evs-stocks': 725}


  self._sock = None


**NOTE**: For re-runs of the program with offset > 0:
- cells 19-24 (from the cell that takes limited data from IN_MEM_TABLE through to the sqlite3 db connection cell): comment them out fully;
- cell 25: don't call delete_spark_sql_table().
