# <font color=blue> Real Time Stock Ticker Streaming - Kafka Producer </font>


Producing the streaming data, where you can use csv modules to read and publish the data to the Kafka stream.

### Flight Data Overview:
The flight-delays and cancellation data was collected and published by the U.S. Department of Transportation’s (DOT) Bureau of Transportation Statistics. This data records the flights operated by large air carriers and tracks the on-time performance of domestic flights. This data summarises various flight information such as the number of on-time, delayed, cancelled, and diverted flights published in DOT's monthly in 2015.

In [2]:
# import required libraries
import pandas as pd

from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import csv
from datetime import timezone
import datetime

In [3]:
import yfinance as yf
from pandas_datareader import data as pdr
from datetime import datetime
yf.pdr_override()

In [8]:
stock = 'AMAT'
def GetHistoricalData(numberOfMonths = 2):
    print(numberOfMonths)
    end = datetime.now()
    start = datetime(end.year, end.month - numberOfMonths, end.day , end.hour, end.minute,end.second)
    print(start,end)
    df_Historical = yf.download(stock, start, end)
    df_Historical = df_Historical.reset_index()
    df_Historical['Label'] = stock
    df_Historical['Date'] = pd.to_datetime(df_Historical['Date'])
    return(df_Historical)

In [9]:
df_Historical = GetHistoricalData()


2
2023-02-13 11:26:50 2023-04-13 11:26:50.121364
[*********************100%***********************]  1 of 1 completed


In [10]:
df_Historical.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Label
0,2023-02-13,115.169998,116.720001,114.550003,116.529999,116.257736,4198200,AMAT
1,2023-02-14,114.870003,119.010002,113.849998,118.440002,118.163277,4870700,AMAT
2,2023-02-15,117.0,119.580002,115.989998,119.459999,119.180885,4980000,AMAT
3,2023-02-16,116.879997,118.080002,115.139999,115.389999,115.120399,9117600,AMAT
4,2023-02-17,116.779999,117.400002,113.379997,115.440002,115.17028,9287100,AMAT


In [11]:
df_Historical.tail()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Label
36,2023-04-05,119.589996,119.690002,115.650002,117.32,117.32,6691700,AMAT
37,2023-04-06,114.18,115.290001,111.870003,114.449997,114.449997,6275000,AMAT
38,2023-04-10,112.470001,116.709999,112.449997,116.5,116.5,4096500,AMAT
39,2023-04-11,117.330002,117.830002,116.080002,116.379997,116.379997,3373000,AMAT
40,2023-04-12,117.730003,117.800003,112.870003,113.160004,113.160004,5837200,AMAT


In [12]:
def GetLiveData():
    ticker = yf.Ticker(stock).info
    market_price = ticker['regularMarketPrice']
    market_open = ticker['regularMarketOpen']
    market_High = ticker['regularMarketDayHigh']
    market_Low = ticker['regularMarketDayLow']
    market_volume = ticker['regularMarketVolume']
    market_symbol = ticker['symbol']
    #print('Ticker: AMAT')
    #print('Market Price:', market_price)
    #print('Previous Close Price:', previous_close_price)
    df_live = {'Date':datetime.now(),'Open': market_open , 'High':market_High,'Low':market_Low,'Close':market_price,'Adj Close':market_price,'Volume':market_volume,'Label':market_symbol}
    return(df_live)

In [13]:
df_live = GetLiveData()

In [14]:
df_all = df_Historical.append(df_live, ignore_index = True)


  df_all = df_Historical.append(df_live, ignore_index = True)


In [15]:
df_all.head()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Label
0,2023-02-13,115.169998,116.720001,114.550003,116.529999,116.257736,4198200,AMAT
1,2023-02-14,114.870003,119.010002,113.849998,118.440002,118.163277,4870700,AMAT
2,2023-02-15,117.0,119.580002,115.989998,119.459999,119.180885,4980000,AMAT
3,2023-02-16,116.879997,118.080002,115.139999,115.389999,115.120399,9117600,AMAT
4,2023-02-17,116.779999,117.400002,113.379997,115.440002,115.17028,9287100,AMAT


In [16]:
df_all.tail()

Unnamed: 0,Date,Open,High,Low,Close,Adj Close,Volume,Label
37,2023-04-06 00:00:00.000000,114.18,115.290001,111.870003,114.449997,114.449997,6275000,AMAT
38,2023-04-10 00:00:00.000000,112.470001,116.709999,112.449997,116.5,116.5,4096500,AMAT
39,2023-04-11 00:00:00.000000,117.330002,117.830002,116.080002,116.379997,116.379997,3373000,AMAT
40,2023-04-12 00:00:00.000000,117.730003,117.800003,112.870003,113.160004,113.160004,5837200,AMAT
41,2023-04-13 11:27:06.787608,113.53,113.88,111.515,113.24,113.24,2015788,AMAT


In [18]:
df_all['Date'] =df_all['Date'].astype(str)

In [20]:
all_data = df_all.to_dict(orient='records')
print(all_data[0])

{'Date': '2023-02-13 00:00:00.000000', 'Open': 115.16999816894531, 'High': 116.72000122070312, 'Low': 114.55000305175781, 'Close': 116.52999877929688, 'Adj Close': 116.25773620605469, 'Volume': 4198200, 'Label': 'AMAT'}


In [21]:
len(all_data)

42

## Step 3

#### Create publisher and producer function

Functions are taken from the Week 9 Lab material provided.

In [22]:
# function that publishes the message 
def publish_message(producer_instance, topic_name, data):
    print(data)
    try:
        producer_instance.send(topic_name, data)
        
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))

        
# function that connects the kafka producer        
def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['192.168.86.48:9092'],
                                  value_serializer=lambda x: dumps(x).encode('ascii'),
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
    finally:
        return _producer
 

In [23]:
producer = connect_kafka_producer()

## Step 4

#### Send desired data batches 

Using the above created functions we create the desired flight data batches and send them so the consumers can receive the data.

In [25]:
if __name__ == '__main__':
   
    
    ## SET TOPIC AND DATA TO BE SENT
    
    topic = 'stock_ticker'
    
    #all_data = df.to_dict(orient='records')
    
    ## SET THE PRODUCERS
    
    #producer = connect_kafka_producer()
    
    
    
    ## GET DATA AND META DATA FOR EACH KEY
    
    
    iteration_counter = 0
    

    # start the data publishing process
    print('Publishing records..')
    
    # set a continous loop to produce and publish data
    df_Historical = GetHistoricalData()
    while True: 
            df_live = GetLiveData()
            df_all = df_Historical.append(df_live, ignore_index = True)
            df_all['Date'] =df_all['Date'].astype(str)
            all_data = df_all.to_dict(orient='records')
            publish_message(producer, topic, all_data)
                
         
            
            if iteration_counter > 100:
                break
                
            iteration_counter += 1   
            
            # send producer to sleep 
            sleep(120)


Publishing records..
[{'Date': '2023-02-13 00:00:00.000000', 'Open': 115.16999816894531, 'High': 116.72000122070312, 'Low': 114.55000305175781, 'Close': 116.52999877929688, 'Adj Close': 116.25773620605469, 'Volume': 4198200, 'Label': 'AMAT'}, {'Date': '2023-02-14 00:00:00.000000', 'Open': 114.87000274658203, 'High': 119.01000213623047, 'Low': 113.8499984741211, 'Close': 118.44000244140625, 'Adj Close': 118.16327667236328, 'Volume': 4870700, 'Label': 'AMAT'}, {'Date': '2023-02-15 00:00:00.000000', 'Open': 117.0, 'High': 119.58000183105469, 'Low': 115.98999786376953, 'Close': 119.45999908447266, 'Adj Close': 119.1808853149414, 'Volume': 4980000, 'Label': 'AMAT'}, {'Date': '2023-02-16 00:00:00.000000', 'Open': 116.87999725341797, 'High': 118.08000183105469, 'Low': 115.13999938964844, 'Close': 115.38999938964844, 'Adj Close': 115.12039947509766, 'Volume': 9117600, 'Label': 'AMAT'}, {'Date': '2023-02-17 00:00:00.000000', 'Open': 116.77999877929688, 'High': 117.4000015258789, 'Low': 113.3799

[{'Date': '2023-02-13 00:00:00.000000', 'Open': 115.16999816894531, 'High': 116.72000122070312, 'Low': 114.55000305175781, 'Close': 116.52999877929688, 'Adj Close': 116.25773620605469, 'Volume': 4198200, 'Label': 'AMAT'}, {'Date': '2023-02-14 00:00:00.000000', 'Open': 114.87000274658203, 'High': 119.01000213623047, 'Low': 113.8499984741211, 'Close': 118.44000244140625, 'Adj Close': 118.16327667236328, 'Volume': 4870700, 'Label': 'AMAT'}, {'Date': '2023-02-15 00:00:00.000000', 'Open': 117.0, 'High': 119.58000183105469, 'Low': 115.98999786376953, 'Close': 119.45999908447266, 'Adj Close': 119.1808853149414, 'Volume': 4980000, 'Label': 'AMAT'}, {'Date': '2023-02-16 00:00:00.000000', 'Open': 116.87999725341797, 'High': 118.08000183105469, 'Low': 115.13999938964844, 'Close': 115.38999938964844, 'Adj Close': 115.12039947509766, 'Volume': 9117600, 'Label': 'AMAT'}, {'Date': '2023-02-17 00:00:00.000000', 'Open': 116.77999877929688, 'High': 117.4000015258789, 'Low': 113.37999725341797, 'Close': 

[{'Date': '2023-02-13 00:00:00.000000', 'Open': 115.16999816894531, 'High': 116.72000122070312, 'Low': 114.55000305175781, 'Close': 116.52999877929688, 'Adj Close': 116.25773620605469, 'Volume': 4198200, 'Label': 'AMAT'}, {'Date': '2023-02-14 00:00:00.000000', 'Open': 114.87000274658203, 'High': 119.01000213623047, 'Low': 113.8499984741211, 'Close': 118.44000244140625, 'Adj Close': 118.16327667236328, 'Volume': 4870700, 'Label': 'AMAT'}, {'Date': '2023-02-15 00:00:00.000000', 'Open': 117.0, 'High': 119.58000183105469, 'Low': 115.98999786376953, 'Close': 119.45999908447266, 'Adj Close': 119.1808853149414, 'Volume': 4980000, 'Label': 'AMAT'}, {'Date': '2023-02-16 00:00:00.000000', 'Open': 116.87999725341797, 'High': 118.08000183105469, 'Low': 115.13999938964844, 'Close': 115.38999938964844, 'Adj Close': 115.12039947509766, 'Volume': 9117600, 'Label': 'AMAT'}, {'Date': '2023-02-17 00:00:00.000000', 'Open': 116.77999877929688, 'High': 117.4000015258789, 'Low': 113.37999725341797, 'Close': 

[{'Date': '2023-02-13 00:00:00.000000', 'Open': 115.16999816894531, 'High': 116.72000122070312, 'Low': 114.55000305175781, 'Close': 116.52999877929688, 'Adj Close': 116.25773620605469, 'Volume': 4198200, 'Label': 'AMAT'}, {'Date': '2023-02-14 00:00:00.000000', 'Open': 114.87000274658203, 'High': 119.01000213623047, 'Low': 113.8499984741211, 'Close': 118.44000244140625, 'Adj Close': 118.16327667236328, 'Volume': 4870700, 'Label': 'AMAT'}, {'Date': '2023-02-15 00:00:00.000000', 'Open': 117.0, 'High': 119.58000183105469, 'Low': 115.98999786376953, 'Close': 119.45999908447266, 'Adj Close': 119.1808853149414, 'Volume': 4980000, 'Label': 'AMAT'}, {'Date': '2023-02-16 00:00:00.000000', 'Open': 116.87999725341797, 'High': 118.08000183105469, 'Low': 115.13999938964844, 'Close': 115.38999938964844, 'Adj Close': 115.12039947509766, 'Volume': 9117600, 'Label': 'AMAT'}, {'Date': '2023-02-17 00:00:00.000000', 'Open': 116.77999877929688, 'High': 117.4000015258789, 'Low': 113.37999725341797, 'Close': 

KeyboardInterrupt: 