<a href="https://colab.research.google.com/github/irabufan/BZFabricPub/blob/main/yfinance_kafka_stream.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install confluent_kafka



In [None]:
import time
from confluent_kafka import Producer
import json
import yfinance as yf
from google.colab import userdata


def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

# Get the Kafka broker address from the environment variable
kafka_broker = userdata.get('kafka_broker')
username = userdata.get('sasl_username')
password = userdata.get('sasl_password')

p = Producer({
    'bootstrap.servers': kafka_broker,
    'sasl.mechanisms': 'PLAIN',
    'security.protocol': 'SASL_SSL',
    'sasl.username': username,
    'sasl.password': password
})

# Define the symbols and the Kafka topic
symbols = ["EURUSD=X","USDJPY=X","USDHKD=X","JPYEUR=X"]
topic = userdata.get('kafka_topic')

# Initialize an empty DataFrame to store the previous data
prev_data = None
# Initialize a count variable to set how many times to execute
rcount = 0

while rcount < 30:
    # Download the latest market data
    data = yf.download(symbols, period="1d", interval="1m")

    # Stack and sort the data, and remove rows with missing 'Open' prices
    out = data.stack().reset_index(level=1, names=["", "Ticker"]).sort_values(by="Ticker").reset_index(level=0)
    out = out.dropna(subset=['Open']).sort_values(['Datetime'], ascending=[False], key=lambda s: s.astype("string") if s.name in ['Datetime'] else s).rename(columns={'Adj Close': 'AdjClose'})
    # only select top 5 rows for each ticker for performance
    out = out.groupby("Ticker").head(5)

    # Compare the new results with the prior download
    if prev_data is not None:
        # Get the latest timestamp in the previous data
        latest_timestamp = prev_data['Datetime'].max()

        # Keep only the new data that has a later timestamp
        new_data = out[out['Datetime'] > latest_timestamp]

        if not new_data.empty:
            # Convert the new results to JSON
            out_json = new_data.to_json(orient='records', date_format="iso")

            # Send the JSON message to the Kafka topic
            p.produce(topic, out_json)
            p.flush()

    else:
        # On initial run, just send the latest record available per Ticker
        new_data = out.groupby("Ticker").head(1)

        if not new_data.empty:
            # Convert the new results to JSON
            out_json = new_data.to_json(orient='records', date_format="iso")

            # Send the JSON message to the Kafka topic
            p.produce(topic, out_json)
            p.flush()

    # Update the previous data
    prev_data = out.copy()

    rcount +=1
    print(rcount)

    # Wait for a while before the next iteration
    time.sleep(60)


[*********************100%%**********************]  4 of 4 completed


1


[*********************100%%**********************]  4 of 4 completed


2


[*********************100%%**********************]  4 of 4 completed


3


[*********************100%%**********************]  4 of 4 completed


4


[*********************100%%**********************]  4 of 4 completed


5


[*********************100%%**********************]  4 of 4 completed


6


[*********************100%%**********************]  4 of 4 completed


7


[*********************100%%**********************]  4 of 4 completed


8


[*********************100%%**********************]  4 of 4 completed


9


[*********************100%%**********************]  4 of 4 completed


10


[*********************100%%**********************]  4 of 4 completed


11


[*********************100%%**********************]  4 of 4 completed


12


[*********************100%%**********************]  4 of 4 completed


13
