### Testing Event Hub

In [0]:
from azure.eventhub import EventHubProducerClient, EventData
import json

# Event Hub configuration.
# The connection string is read through dbutils.secrets; the scope and key are
# blank here (presumably redacted placeholders) — TODO fill in before running.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(scope="", key="")
# Name of the target event hub; also blank in this notebook — TODO set.
EVENT_HUB_NAME = ""

# Initialize the Event Hub producer used by send_event() below.
producer = EventHubProducerClient.from_connection_string(EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME)

#Function to send events to event hub
def send_event(data):
    """Serialize ``data`` as JSON and send it to Event Hub in its own batch.

    Args:
        data: JSON-serializable object to publish.

    Raises:
        Whatever ``json.dumps`` or the module-level ``producer`` raises;
        nothing is caught here.
    """
    event_data_batch = producer.create_batch()
    # Serialize the argument itself; the previous body read the global
    # ``event`` and silently ignored whatever the caller passed in.
    event_data_batch.add(EventData(json.dumps(data)))
    producer.send_batch(event_data_batch)

# Sample JSON event
event = {
    "event_id": 2222,
    "event_name": "Key Vault Test Event",
}

# Send the event. The producer is closed in ``finally`` so it is released
# even when send_event() raises.
try:
    send_event(event)
finally:
    producer.close()

### Sending the complete data to the event hub

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# Event Hub configuration.
# The connection string is read through dbutils.secrets; the scope and key are
# blank here (presumably redacted placeholders) — TODO fill in before running.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(scope="", key="")
# Name of the target event hub; also blank in this notebook — TODO set.
EVENT_HUB_NAME = ""

# Initialize the Event Hub producer used by send_event() below.
producer = EventHubProducerClient.from_connection_string(EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME)

def send_event(event):
    """Publish ``event`` to Event Hub as a JSON payload in a one-event batch.

    Any exception raised while building or sending the batch is caught and
    reported with ``print``; nothing is re-raised and nothing is returned.
    """
    try:
        batch = producer.create_batch()
        message = EventData(json.dumps(event))
        batch.add(message)
        producer.send_batch(batch)
    except Exception as err:
        print(f"Error sending event: {str(err)}")

# Function to handle API response
def handle_response(response):
    """Decode a successful API response, or report why it could not be used.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``
            (here, the result of ``requests.get``).

    Returns:
        The decoded JSON body when the status code is 200 and the body
        parses; otherwise ``None`` after printing a diagnostic.
    """
    if response.status_code != 200:
        print(f"API Error: {response.status_code}, {response.text}")
        return None

    try:
        return response.json()
    except ValueError as err:
        # A 200 with a non-JSON body (e.g. an HTML error page) makes json()
        # raise a ValueError subclass; report it like any other API failure.
        print(f"API Error: invalid JSON in response: {err}")
        return None

# Function to fetch End-of-Day stock price data from MarketStack
def get_stock_data(base_url, api_key, symbol, timeout=10):
    """Request end-of-day price data for ``symbol`` from ``{base_url}/eod``.

    Args:
        base_url: API root, without a trailing slash.
        api_key: Value sent as the ``access_key`` query parameter.
        symbol: Value sent as the ``symbols`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response, or
        ``None`` when the request itself fails (connection error, timeout).
    """
    eod_url = f"{base_url}/eod"
    params = {
        "access_key": api_key,  # MarketStack uses 'access_key'
        "symbols": symbol,
    }
    try:
        response = requests.get(eod_url, params=params, timeout=timeout)
    except requests.exceptions.RequestException as err:
        # Print only the exception type: the full message can embed the
        # request URL, which carries the access key in its query string.
        print(f"API Error: request failed ({type(err).__name__})")
        return None
    return handle_response(response)

# Flatten and merge the financial data
def flatten_data(stock_data):
    """Reduce an API payload to a list of flat per-record dicts.

    Args:
        stock_data: Decoded response body; expected to be a mapping with a
            ``"data"`` list of per-record mappings.

    Returns:
        One dict per record with the keys ``symbol``, ``date``, ``open``,
        ``high``, ``low``, ``close``, ``volume`` and ``adjusted_close``.
        Fields missing from a record become ``None``; ``adjusted_close``
        falls back to ``close`` when the record has no ``adj_close`` key.
        Returns ``[]`` when the payload is empty, has no ``"data"`` key, or
        its ``"data"`` value is empty/``None``.
    """
    if not stock_data or "data" not in stock_data:
        return []

    # "data" may be present but null; treat that the same as "no records"
    # instead of failing when iterating over None.
    entries = stock_data["data"] or []

    return [
        {
            "symbol": entry.get("symbol"),
            "date": entry.get("date"),
            "open": entry.get("open"),
            "high": entry.get("high"),
            "low": entry.get("low"),
            "close": entry.get("close"),
            "volume": entry.get("volume"),
            # MarketStack uses 'adj_close'
            "adjusted_close": entry.get("adj_close", entry.get("close")),
        }
        for entry in entries
    ]

# Main function to fetch and stream financial data
def fetch_financial_data():
    """Fetch AAPL end-of-day records once and forward each one to Event Hub.

    Prints a message and returns without sending anything when the API call
    yields no payload or the payload flattens to an empty list.
    """
    api_root = "http://api.marketstack.com/v1"
    ticker = "AAPL"  # Example stock symbol (Apple Inc.)
    access_key = dbutils.secrets.get(scope="", key="")

    payload = get_stock_data(api_root, access_key, ticker)
    if payload:
        records = flatten_data(payload)
        if records:
            # One send_event() call per flattened record.
            for record in records:
                send_event(record)
        else:
            print("No valid stock data to send")
    else:
        print("No data received from API")

# Execute the data fetch. The producer is closed in ``finally`` so it is
# released even when the fetch raises.
try:
    fetch_financial_data()
finally:
    producer.close()


# Sending the data in Streaming Fashion

# Sending data to Event Hub every 30 seconds

In [0]:
import requests
import json
from azure.eventhub import EventHubProducerClient, EventData

# Event Hub configuration.
# The connection string is read through dbutils.secrets; the scope and key are
# blank here (presumably redacted placeholders) — TODO fill in before running.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(scope="", key="")
# Name of the target event hub; also blank in this notebook — TODO set.
EVENT_HUB_NAME = ""

# Initialize the Event Hub producer used by send_event() below.
producer = EventHubProducerClient.from_connection_string(EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME)

def send_event(event):
    """Wrap ``event`` (JSON-encoded) in a fresh batch and send it to Event Hub.

    Failures at any step are caught, printed, and otherwise ignored; the
    function always returns ``None``.
    """
    try:
        single_batch = producer.create_batch()
        encoded = EventData(json.dumps(event))
        single_batch.add(encoded)
        producer.send_batch(single_batch)
    except Exception as failure:
        print(f"Error sending event: {str(failure)}")

# Function to handle API response
def handle_response(response):
    """Decode a successful API response, or report why it could not be used.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``
            (here, the result of ``requests.get``).

    Returns:
        The decoded JSON body when the status code is 200 and the body
        parses; otherwise ``None`` after printing a diagnostic.
    """
    if response.status_code != 200:
        print(f"API Error: {response.status_code}, {response.text}")
        return None

    try:
        return response.json()
    except ValueError as err:
        # A 200 with a non-JSON body (e.g. an HTML error page) makes json()
        # raise a ValueError subclass; report it like any other API failure.
        print(f"API Error: invalid JSON in response: {err}")
        return None

# Function to fetch End-of-Day stock price data from MarketStack
def get_stock_data(base_url, api_key, symbol, timeout=10):
    """Request end-of-day price data for ``symbol`` from ``{base_url}/eod``.

    Args:
        base_url: API root, without a trailing slash.
        api_key: Value sent as the ``access_key`` query parameter.
        symbol: Value sent as the ``symbols`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response, or
        ``None`` when the request itself fails (connection error, timeout).
    """
    eod_url = f"{base_url}/eod"
    params = {
        "access_key": api_key,  # MarketStack uses 'access_key'
        "symbols": symbol,
    }
    try:
        response = requests.get(eod_url, params=params, timeout=timeout)
    except requests.exceptions.RequestException as err:
        # Print only the exception type: the full message can embed the
        # request URL, which carries the access key in its query string.
        print(f"API Error: request failed ({type(err).__name__})")
        return None
    return handle_response(response)

# Flatten and merge the financial data
def flatten_data(stock_data):
    """Reduce an API payload to a list of flat per-record dicts.

    Args:
        stock_data: Decoded response body; expected to be a mapping with a
            ``"data"`` list of per-record mappings.

    Returns:
        One dict per record with the keys ``symbol``, ``date``, ``open``,
        ``high``, ``low``, ``close``, ``volume`` and ``adjusted_close``.
        Fields missing from a record become ``None``; ``adjusted_close``
        falls back to ``close`` when the record has no ``adj_close`` key.
        Returns ``[]`` when the payload is empty, has no ``"data"`` key, or
        its ``"data"`` value is empty/``None``.
    """
    if not stock_data or "data" not in stock_data:
        return []

    # "data" may be present but null; treat that the same as "no records"
    # instead of failing when iterating over None.
    entries = stock_data["data"] or []

    return [
        {
            "symbol": entry.get("symbol"),
            "date": entry.get("date"),
            "open": entry.get("open"),
            "high": entry.get("high"),
            "low": entry.get("low"),
            "close": entry.get("close"),
            "volume": entry.get("volume"),
            # MarketStack uses 'adj_close'
            "adjusted_close": entry.get("adj_close", entry.get("close")),
        }
        for entry in entries
    ]

# Main function to fetch and stream financial data
def fetch_financial_data():
    """Run one fetch-flatten-send pass for the AAPL end-of-day data.

    Nothing is sent, and a message is printed instead, when the API returns
    no payload or when flattening the payload produces no records.
    """
    endpoint_root = "http://api.marketstack.com/v1"
    symbol = "AAPL"  # Example stock symbol (Apple Inc.)
    key = dbutils.secrets.get(scope="", key="")

    response_body = get_stock_data(endpoint_root, key, symbol)
    if response_body:
        rows = flatten_data(response_body)
        if rows:
            # Each row goes out through its own send_event() call.
            for row in rows:
                send_event(row)
        else:
            print("No valid stock data to send")
    else:
        print("No data received from API")

# Execute the data fetch. The producer is closed in ``finally`` so it is
# released even when the fetch raises.
try:
    fetch_financial_data()
finally:
    producer.close()


### 3 days update

In [0]:
import requests
import json
import time
from datetime import datetime, timedelta
from azure.eventhub import EventHubProducerClient, EventData

# Event Hub configuration.
# The connection string is read through dbutils.secrets; the scope and key are
# blank here (presumably redacted placeholders) — TODO fill in before running.
EVENT_HUB_CONNECTION_STRING = dbutils.secrets.get(scope="", key="")
# Name of the target event hub; also blank in this notebook — TODO set.
EVENT_HUB_NAME = ""

# Initialize the Event Hub producer used by send_event() below.
producer = EventHubProducerClient.from_connection_string(EVENT_HUB_CONNECTION_STRING, eventhub_name=EVENT_HUB_NAME)

def send_event(event):
    """Send one JSON-encoded ``event`` to Event Hub in a batch of its own.

    Errors raised while creating, filling or sending the batch are printed
    rather than propagated, so the caller never sees an exception from here.
    """
    try:
        outgoing = producer.create_batch()
        body = EventData(json.dumps(event))
        outgoing.add(body)
        producer.send_batch(outgoing)
    except Exception as problem:
        print(f"Error sending event: {str(problem)}")

# Function to handle API response
def handle_response(response):
    """Decode a successful API response, or report why it could not be used.

    Args:
        response: Object exposing ``status_code``, ``text`` and ``json()``
            (here, the result of ``requests.get``).

    Returns:
        The decoded JSON body when the status code is 200 and the body
        parses; otherwise ``None`` after printing a diagnostic.
    """
    if response.status_code != 200:
        print(f"API Error: {response.status_code}, {response.text}")
        return None

    try:
        return response.json()
    except ValueError as err:
        # A 200 with a non-JSON body (e.g. an HTML error page) makes json()
        # raise a ValueError subclass. In the polling loop that would abort
        # every later iteration, so report it and return None instead.
        print(f"API Error: invalid JSON in response: {err}")
        return None

# Function to fetch End-of-Day stock price data from MarketStack
def get_stock_data(base_url, api_key, symbol, timeout=10):
    """Request end-of-day price data for ``symbol`` from ``{base_url}/eod``.

    Args:
        base_url: API root, without a trailing slash.
        api_key: Value sent as the ``access_key`` query parameter.
        symbol: Value sent as the ``symbols`` query parameter.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely, which would
            stall the polling loop that calls this function.

    Returns:
        Whatever ``handle_response`` returns for the HTTP response, or
        ``None`` when the request itself fails (connection error, timeout).
    """
    eod_url = f"{base_url}/eod"
    params = {
        "access_key": api_key,  # MarketStack uses 'access_key'
        "symbols": symbol,
    }
    try:
        response = requests.get(eod_url, params=params, timeout=timeout)
    except requests.exceptions.RequestException as err:
        # Print only the exception type: the full message can embed the
        # request URL, which carries the access key in its query string.
        print(f"API Error: request failed ({type(err).__name__})")
        return None
    return handle_response(response)

# Flatten and merge the financial data
def flatten_data(stock_data):
    """Reduce an API payload to a list of flat per-record dicts.

    Args:
        stock_data: Decoded response body; expected to be a mapping with a
            ``"data"`` list of per-record mappings.

    Returns:
        One dict per record with the keys ``symbol``, ``date``, ``open``,
        ``high``, ``low``, ``close``, ``volume`` and ``adjusted_close``.
        Fields missing from a record become ``None``; ``adjusted_close``
        falls back to ``close`` when the record has no ``adj_close`` key.
        Returns ``[]`` when the payload is empty, has no ``"data"`` key, or
        its ``"data"`` value is empty/``None``.
    """
    if not stock_data or "data" not in stock_data:
        return []

    # "data" may be present but null; treat that the same as "no records"
    # instead of failing when iterating over None.
    entries = stock_data["data"] or []

    return [
        {
            "symbol": entry.get("symbol"),
            "date": entry.get("date"),
            "open": entry.get("open"),
            "high": entry.get("high"),
            "low": entry.get("low"),
            "close": entry.get("close"),
            "volume": entry.get("volume"),
            # MarketStack uses 'adj_close'
            "adjusted_close": entry.get("adj_close", entry.get("close")),
        }
        for entry in entries
    ]

# Main function to fetch and stream financial data
def fetch_financial_data(poll_interval=30, max_iterations=None):
    """Repeatedly fetch AAPL end-of-day data and stream it to Event Hub.

    Each iteration fetches the payload, flattens it, and calls
    ``send_event`` once per record, then sleeps before the next iteration.

    Args:
        poll_interval: Seconds to sleep between iterations (default 30,
            the value that was previously hard-coded).
        max_iterations: Stop after this many iterations. ``None`` (the
            default) keeps the original behavior of looping until
            interrupted.
    """
    # NOTE(review): plain HTTP sends the access key unencrypted; switch to
    # https if the MarketStack plan in use supports it.
    base_url = "http://api.marketstack.com/v1"
    stock_symbol = "AAPL"  # Example stock symbol (Apple Inc.)
    finance_api_key = dbutils.secrets.get(scope="", key="")

    # NOTE(review): the /eod payload presumably changes at most once per
    # trading day, so consecutive iterations likely resend the same records;
    # confirm whether downstream consumers deduplicate.
    completed = 0
    while max_iterations is None or completed < max_iterations:
        # Fetch stock data. A transient network failure must not end the
        # loop, so it is reported and treated as "no data" for this round.
        try:
            stock_data = get_stock_data(base_url, finance_api_key, stock_symbol)
        except requests.exceptions.RequestException as err:
            # Type name only: the message can contain the key-bearing URL.
            print(f"API request failed: {type(err).__name__}")
            stock_data = None

        if not stock_data:
            print("No data received from API")
        else:
            # Flatten the data
            formatted_data = flatten_data(stock_data)

            if not formatted_data:
                print("No valid stock data to send")
            else:
                # Send data to Event Hub
                for data in formatted_data:
                    send_event(data)
                print(f"Data sent successfully at {datetime.now()}")

        completed += 1
        # Wait before the next update, but not after the final iteration.
        if max_iterations is None or completed < max_iterations:
            time.sleep(poll_interval)

# Execute the data fetch. fetch_financial_data() loops until it is
# interrupted or raises, so a plain statement after the call would never
# run; closing in ``finally`` releases the producer on the way out.
try:
    fetch_financial_data()
finally:
    producer.close()