In [1]:
import findspark
findspark.init()
from pyspark.sql.functions import *
from pyspark.sql import SparkSession

import datetime
import time
kafka_topic_name = "TICK_DATA"
kafka_bootstrap_servers = 'localhost:9092'


In [2]:
spark = SparkSession \
    .builder \
    .appName("Structured Streaming ") \
    .master("local[*]") \
    .getOrCreate()


In [3]:
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "latest") \
    .load().selectExpr("CAST(value AS STRING)", "timestamp")


In [4]:
stock_data_schema = "timestamp STRING, instrumnet STRING, high Float, low Float, ltp Float"
df2 = df \
    .select(from_csv(col("value"), stock_data_schema)
            .alias("stock"))
df2 = df2.select("stock.*")
df2 = (df2.withColumn('timestamp',regexp_replace('timestamp',"'",""))
          .withColumn('instrumnet',regexp_replace('instrumnet',"'",""))
          .withColumn('timestamp',to_timestamp(col('timestamp'))))

# df2.writeStream.option('checkpointLocation',checkpoint_path).toTable('stock')
# spark.readStream.table('stock').writeStream.option('checkpointLocation',checkpoint_path).format('parquet').toTable('stockData')

df2.createOrReplaceTempView('stock_data')
all_data = spark.sql("SELECT * FROM stock_data")
stock_data_write_stream = all_data.writeStream \
    .trigger(processingTime='1 seconds') \
    .outputMode("append") \
    .option("truncate", "false") \
    .format("memory") \
    .queryName("stockData") \
    .start()


In [7]:
spark.sql('select * from stockData').show()

+-------------------+-----------+-------+--------+--------+
|          timestamp| instrumnet|   high|     low|     ltp|
+-------------------+-----------+-------+--------+--------+
|2022-02-15 12:32:53| Nifty Bank|37341.2|36651.85|37162.65|
|2022-02-15 12:32:54| Nifty Bank|37341.2|36651.85| 37154.0|
|2022-02-15 12:32:55| Nifty Bank|37341.2|36651.85|37154.05|
|2022-02-15 12:32:56| Nifty Bank|37341.2|36651.85| 37156.5|
|2022-02-15 12:32:57| Nifty Bank|37341.2|36651.85|37154.45|
|2022-02-15 12:32:58| Nifty Bank|37341.2|36651.85| 37157.8|
|2022-02-15 12:32:59| Nifty Bank|37341.2|36651.85|37157.35|
|2022-02-15 12:33:00| Nifty Bank|37341.2|36651.85|37160.75|
|2022-02-15 12:33:01| Nifty Bank|37341.2|36651.85|37160.15|
|2022-02-15 12:33:02| Nifty Bank|37341.2|36651.85|37159.45|
|2022-02-15 12:33:03| Nifty Bank|37341.2|36651.85| 37157.0|
|2022-02-15 12:33:04| Nifty Bank|37341.2|36651.85|37156.35|
|2022-02-15 12:33:05| Nifty Bank|37341.2|36651.85|37158.25|
|2022-02-15 12:33:06| Nifty Bank|37341.2

In [8]:
initial_time = spark.sql('select * from stockData').take(1)[0].timestamp.replace(second=0)

In [9]:
initial_time

datetime.datetime(2022, 2, 15, 12, 32)

In [10]:
spark.sql("select * from stockData").show()

+-------------------+-----------+-------+--------+--------+
|          timestamp| instrumnet|   high|     low|     ltp|
+-------------------+-----------+-------+--------+--------+
|2022-02-15 12:32:53| Nifty Bank|37341.2|36651.85|37162.65|
|2022-02-15 12:32:54| Nifty Bank|37341.2|36651.85| 37154.0|
|2022-02-15 12:32:55| Nifty Bank|37341.2|36651.85|37154.05|
|2022-02-15 12:32:56| Nifty Bank|37341.2|36651.85| 37156.5|
|2022-02-15 12:32:57| Nifty Bank|37341.2|36651.85|37154.45|
|2022-02-15 12:32:58| Nifty Bank|37341.2|36651.85| 37157.8|
|2022-02-15 12:32:59| Nifty Bank|37341.2|36651.85|37157.35|
|2022-02-15 12:33:00| Nifty Bank|37341.2|36651.85|37160.75|
|2022-02-15 12:33:01| Nifty Bank|37341.2|36651.85|37160.15|
|2022-02-15 12:33:02| Nifty Bank|37341.2|36651.85|37159.45|
|2022-02-15 12:33:03| Nifty Bank|37341.2|36651.85| 37157.0|
|2022-02-15 12:33:04| Nifty Bank|37341.2|36651.85|37156.35|
|2022-02-15 12:33:05| Nifty Bank|37341.2|36651.85|37158.25|
|2022-02-15 12:33:06| Nifty Bank|37341.2

In [11]:
KAFKA_OHLC_TOPIC = "OHLC_DATA"
from kafka import KafkaProducer
kafka_producer_obj = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda x: x.encode("utf-8"),
    )

In [None]:
while True:
    latest_time = initial_time + datetime.timedelta(seconds=61)
    temp_data = spark.sql(f"select * from stockData where timestamp>='{latest_time}' limit 1")
    if(len(temp_data.collect())):
        req_data = spark.sql(f"select * from stockData where timestamp >='{initial_time}' and timestamp < '{initial_time + datetime.timedelta(seconds=60)}'")
        ohlc = req_data.toPandas().set_index('timestamp')['ltp'].resample('1min').ohlc()
        d = list(ohlc.reset_index().values[0])
        d[0] = d[0].strftime('%Y-%m-%d %H:%M:%S')
        ohlc_data = ','.join([str(i) for i in d])
        print(ohlc_data)
        kafka_producer_obj.send(KAFKA_OHLC_TOPIC, ohlc_data)
        initial_time +=datetime.timedelta(minutes=1)
    else:
        continue

2022-02-15 12:32:00,37162.6484375,37162.6484375,37154.0,37157.3515625
2022-02-15 12:33:00,37160.75,37197.0,37156.3515625,37192.8984375
2022-02-15 12:34:00,37187.6015625,37211.0,37184.1015625,37208.80078125
2022-02-15 12:35:00,37208.94921875,37261.75,37202.6484375,37259.6484375
2022-02-15 12:36:00,37258.30078125,37267.8515625,37254.19921875,37263.3515625
2022-02-15 12:37:00,37266.5,37289.5,37264.1484375,37279.05078125
2022-02-15 12:38:00,37277.8515625,37280.1015625,37239.5,37265.05078125
2022-02-15 12:39:00,37270.25,37276.55078125,37249.69921875,37266.44921875
2022-02-15 12:40:00,37262.80078125,37269.0,37250.1015625,37262.30078125
2022-02-15 12:41:00,37263.6015625,37269.1015625,37246.94921875,37248.69921875
2022-02-15 12:42:00,37248.1484375,37257.6015625,37230.75,37248.0
2022-02-15 12:43:00,37254.05078125,37254.1015625,37236.1484375,37236.1484375
2022-02-15 12:44:00,37238.8984375,37258.3984375,37238.3984375,37253.05078125
2022-02-15 12:45:00,37257.0,37268.55078125,37249.3515625,37249.35

2022-02-15 14:24:00,37994.6484375,37995.3515625,37934.0,37944.3984375
2022-02-15 14:25:00,37942.0,37973.19921875,37927.8984375,37973.19921875
2022-02-15 14:26:00,37980.19921875,37985.5,37911.69921875,37922.6484375
2022-02-15 14:27:00,37916.69921875,37950.8515625,37916.69921875,37945.6015625
2022-02-15 14:28:00,37946.94921875,37984.3515625,37946.94921875,37977.80078125
2022-02-15 14:29:00,37977.05078125,37986.6484375,37965.30078125,37985.69921875
2022-02-15 14:30:00,37988.6484375,37997.25,37979.6015625,37996.80078125
2022-02-15 14:31:00,37994.3515625,38029.8984375,37994.3515625,38018.94921875
2022-02-15 14:32:00,38016.55078125,38022.3984375,37965.25,37982.1015625
2022-02-15 14:33:00,37981.05078125,38009.3515625,37981.05078125,37998.05078125
2022-02-15 14:34:00,37996.1484375,38005.05078125,37970.5,37983.3984375
2022-02-15 14:35:00,37981.94921875,38020.75,37981.94921875,37987.69921875
2022-02-15 14:36:00,37989.80078125,37999.8984375,37946.3515625,37969.44921875
2022-02-15 14:37:00,37970.3