# 📈 Real-Time Stock Price ELT & Analytics Pipeline with Kafka, Snowflake & Streamlit

This project illustrates how to stream real-time stock price data from Yahoo Finance into Snowflake using Kafka and visualize it using a Snowflake-hosted Streamlit app.

---

## 🚀 Overview

- **Setup**: Set up Kafka and the Snowflake Kafka Connector for real-time data streaming.
- **Ingest**: Publish live stock data from Yahoo Finance to a Kafka topic.
- **Load**: Stream the data into a Snowflake table using the Kafka Connector, and Snowflake Streams and Tasks.
- **Transform**: Run real-time or near-real-time analytics on stock prices in Snowflake.
- **Visualize**: Deliver live metrics in a Streamlit dashboard.

---


## Set up Kafka and Snowflake Kafka Connector for real-time data streaming


__1. In a Snowflake worksheet, create the database, schema, and target table, then create a dedicated Kafka role and Kafka connector user and grant the required privileges.__

```sql
USE ROLE SYSADMIN;

CREATE OR REPLACE DATABASE KAFKA_STREAMING;

CREATE OR REPLACE SCHEMA YAHOO_FINANCE;

-- Create target table
CREATE OR REPLACE TABLE KAFKA_STREAMING.YAHOO_FINANCE.STOCK_PRICES (
  symbol STRING,
  price FLOAT,
  currency STRING,
  time STRING
);

-- Create and grant a custom kafka role

USE ROLE ACCOUNTADMIN;

CREATE ROLE kafka_role;

-- Grant required permissions
GRANT ROLE KAFKA_ROLE TO ROLE SYSADMIN;


GRANT USAGE ON DATABASE KAFKA_STREAMING TO ROLE kafka_role;
GRANT USAGE ON SCHEMA KAFKA_STREAMING.YAHOO_FINANCE TO ROLE kafka_role;
GRANT INSERT ON TABLE KAFKA_STREAMING.YAHOO_FINANCE.STOCK_PRICES TO ROLE kafka_role;

GRANT OWNERSHIP ON DATABASE KAFKA_STREAMING TO ROLE kafka_role REVOKE CURRENT GRANTS;
GRANT OWNERSHIP ON SCHEMA KAFKA_STREAMING.YAHOO_FINANCE TO ROLE kafka_role REVOKE CURRENT GRANTS;
GRANT OWNERSHIP ON TABLE KAFKA_STREAMING.YAHOO_FINANCE.STOCK_PRICES TO ROLE kafka_role REVOKE CURRENT GRANTS;

-- Create kafka connector user
CREATE USER kafka_connector_user
  PASSWORD = '****'
  DEFAULT_ROLE = kafka_role
  MUST_CHANGE_PASSWORD = FALSE;

-- Assign role to user
GRANT ROLE kafka_role TO USER kafka_connector_user;

SHOW USERS IN ACCOUNT;
```

__2. Download Kafka to the local machine from https://kafka.apache.org/downloads.__

__3. Start `zookeeper` in a new terminal.__

```bash
cd kafka/bin
./zookeeper-server-start.sh ../config/zookeeper.properties
```

__4. Start `kafka server` in a new terminal.__

```bash
cd kafka/bin
./kafka-server-start.sh ../config/server.properties
```

__5. Create `kafka topic` in a new terminal.__
```bash
cd kafka/bin
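# --bootstrap-server replaces the --zookeeper flag, which was removed from kafka-topics.sh in Kafka 3.0
./kafka-topics.sh --create --topic yahoo-finance-topic --bootstrap-server localhost:9092 --partitions 2 --replication-factor 1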
```







__6. Set up `Private Key Authentication` for the Snowflake Kafka Connector__

- Generate `private key` in a new terminal.

```bash
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out snowflake-kafka-connector-private-rsa-key.p8 -nocrypt
```

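To generate an encrypted private key instead, remove the `-nocrypt` option. OpenSSL then prompts for an encryption passphrase, which the connector also needs in its `snowflake.private.key.passphrase` property.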

- Generate `public key`.

```bash
openssl rsa -in snowflake-kafka-connector-private-rsa-key.p8 -pubout -out snowflake-kafka-connector-public-rsa-key.pub
```
Save the keys for later use.

__7. Add the public key to the `kafka_connector_user`.__

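In the SQL worksheet from step 1, set the public key on the user. Paste the key without the `-----BEGIN PUBLIC KEY-----` and `-----END PUBLIC KEY-----` delimiter lines: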

```sql
ALTER USER kafka_connector_user SET RSA_PUBLIC_KEY='MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB.....';
```
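
Both the `RSA_PUBLIC_KEY` value above and the connector's `snowflake.private.key` setting take only the base64 body of the key file, without the delimiter lines. A minimal sketch of extracting that body in Python follows; the helper name `pem_body` and the sample text are illustrative assumptions, and the sample is not a real key.

```python
def pem_body(pem_text: str) -> str:
    """Return the base64 payload of a PEM block as a single line."""
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    # Drop the "-----BEGIN ...-----" and "-----END ...-----" delimiter lines
    payload = [line for line in lines if line and not line.startswith("-----")]
    return "".join(payload)


# Dummy PEM text for illustration only (not a real key)
sample = """-----BEGIN PUBLIC KEY-----
QUJD
REVG
-----END PUBLIC KEY-----
"""

print(pem_body(sample))  # prints QUJDREVG
```

In practice the input would be the contents of `snowflake-kafka-connector-public-rsa-key.pub` (or the `.p8` file for the private key), read with `open(...).read()`.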

__8. Set up the Snowflake Kafka Connector__

- Download the `Snowflake Kafka Connector` JAR file from the [Maven Repository](https://mvnrepository.com/artifact/com.snowflake/snowflake-kafka-connector/3.1.0) and copy it into the `kafka/libs/` folder.

- Start the `Kafka Connect` worker on the local machine in a new terminal.

```bash
cd kafka/bin
./connect-distributed.sh ../config/connect-distributed.properties
```

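- Create the Kafka Connect `Sink Connector Configuration` file (`snowflake-kafka-connector-config.json`). Replace `ACCOUNT-IDENTIFIER` with your Snowflake account identifier and set `snowflake.private.key` to the body of the private key file from step 6, without its header and footer lines.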

```json
{
  "name": "snowflake-kafka-yahoo-finance-connector",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "1",
    "topics": "yahoo-finance-topic",

    "snowflake.url.name": "ACCOUNT-IDENTIFIER.snowflakecomputing.com:443",
    "snowflake.user.name": "kafka_connector_user",
    "snowflake.private.key": "MIIEuwIBADANBgkqhkiG9w0BAQEFAASCBKUwggShA.....",
    "snowflake.database.name": "KAFKA_STREAMING",
    "snowflake.schema.name": "YAHOO_FINANCE",
    "snowflake.table.name": "stock_prices",
    "snowflake.role.name": "kafka_role",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "buffer.count.records": "1000",
    "buffer.flush.time": "10",
    "buffer.size.bytes": "5000000",

    "behavior.on.null.values": "IGNORE"
  }
}
```
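
Generating the file from Python is one way to make sure the payload is valid JSON and the placeholders are filled in. The sketch below reuses the settings from the configuration above; the function names `build_connector_config` and `write_connector_config`, the account identifier `myorg-myaccount`, and the key string are hypothetical placeholders.

```python
import json


def build_connector_config(account_identifier: str, private_key_body: str) -> dict:
    """Assemble the connector payload with the same settings as the JSON file."""
    return {
        "name": "snowflake-kafka-yahoo-finance-connector",
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "tasks.max": "1",
            "topics": "yahoo-finance-topic",
            "snowflake.url.name": f"{account_identifier}.snowflakecomputing.com:443",
            "snowflake.user.name": "kafka_connector_user",
            "snowflake.private.key": private_key_body,
            "snowflake.database.name": "KAFKA_STREAMING",
            "snowflake.schema.name": "YAHOO_FINANCE",
            "snowflake.table.name": "stock_prices",
            "snowflake.role.name": "kafka_role",
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "value.converter.schemas.enable": "false",
            "buffer.count.records": "1000",
            "buffer.flush.time": "10",
            "buffer.size.bytes": "5000000",
            "behavior.on.null.values": "IGNORE",
        },
    }


def write_connector_config(path: str, payload: dict) -> None:
    """Write the payload to disk; json.dump cannot emit comments or trailing commas."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2)


# "myorg-myaccount" and the key string are placeholders, not real values
payload = build_connector_config("myorg-myaccount", "<private-key-body>")
print(payload["config"]["snowflake.url.name"])  # prints myorg-myaccount.snowflakecomputing.com:443
```

Calling `write_connector_config("snowflake-kafka-connector-config.json", payload)` would then produce the file used in the next step.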

__9. Deploy configuration and create the Kafka Connector via REST API.__

In a new terminal:

```bash
curl -X POST -H "Content-Type: application/json" --data @snowflake-kafka-connector-config.json http://localhost:8083/connectors
```
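
The same deployment can be scripted. The following is a minimal sketch using only the Python standard library; it builds the `POST /connectors` request that the `curl` command sends and the URL of the Kafka Connect `GET /connectors/{name}/status` endpoint for checking the connector afterwards. The helper names are assumptions, and the actual network call is left commented out because it needs a running Kafka Connect worker.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # Kafka Connect REST address used by the curl command


def build_create_request(payload: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the POST /connectors request for a connector payload."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def status_url(name: str, base_url: str = CONNECT_URL) -> str:
    """URL of the Kafka Connect status endpoint for one connector."""
    return f"{base_url}/connectors/{name}/status"


# Empty "config" used here only to keep the illustration short
request = build_create_request({"name": "snowflake-kafka-yahoo-finance-connector", "config": {}})
print(request.get_method(), request.full_url)
print(status_url("snowflake-kafka-yahoo-finance-connector"))

# To send the request against a running worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```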

## Publish live stock data from Yahoo Finance to yahoo-finance-topic Kafka topic


__Create `kafka producer` to fetch live stock prices and publish them to Kafka.__

__1. Install required Python packages.__

```bash
pip install yfinance confluent_kafka
```

__2. Create Python script `kafka-producer.py` to fetch data and publish.__

```python
import yfinance as yf 
from confluent_kafka import Producer 
import json
import time

# Kafka config
producer = Producer({'bootstrap.servers': 'localhost:9092'})
topic = 'yahoo-finance-topic'

# List of stock symbols to track
symbols = ['SNOW', 'AMZN', 'GOOGL', 'MSFT']

def acked(err, msg):
    if err is not None:
        print(f"Failed to deliver message: {err}")
    else:
        print(f"Published to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}")

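while True:
    for symbol in symbols:
        try:
            data = yf.Ticker(symbol).info  # full quote metadata
        except Exception as exc:  # e.g. network error or rate limiting
            print(f"Failed to fetch {symbol}: {exc}")
            continue

        price = data.get('regularMarketPrice')
        if price is None:
            # Skip incomplete quotes instead of publishing a null price
            print(f"No price available for {symbol}, skipping")
            continue

        message = {
            'symbol': symbol,
            'price': price,
            'currency': data.get('currency'),
            'time': time.strftime('%Y-%m-%d %H:%M:%S'),  # local time of the fetch
        }

        producer.produce(topic, value=json.dumps(message), key=symbol, callback=acked)

    producer.flush()  # wait for delivery and run the acked callbacks
    time.sleep(30)  # Fetch every 30 seconds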
```

__3. Run kafka-producer.py script.__

```bash
python kafka-producer.py
```
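
The message shape can be checked without Kafka or a network connection by isolating it in a small function. This is a sketch under the assumption that the quote dictionary carries the `regularMarketPrice` and `currency` keys the producer reads; `build_message` and the sample quote are made up for illustration.

```python
import json
from datetime import datetime
from typing import Optional


def build_message(symbol: str, info: dict, now: datetime) -> Optional[dict]:
    """Build the payload published to the topic, or None if no price is available."""
    price = info.get("regularMarketPrice")
    if price is None:
        return None
    return {
        "symbol": symbol,
        "price": float(price),
        "currency": info.get("currency"),
        "time": now.strftime("%Y-%m-%d %H:%M:%S"),
    }


# Made-up quote data for illustration
sample_info = {"regularMarketPrice": 150.25, "currency": "USD"}
message = build_message("SNOW", sample_info, datetime(2024, 1, 2, 15, 30, 0))
print(json.dumps(message))
# prints {"symbol": "SNOW", "price": 150.25, "currency": "USD", "time": "2024-01-02 15:30:00"}
```

The four keys match the columns of `stock_prices`, which is what the later `INSERT ... SELECT` from `RECORD_CONTENT` relies on.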

## Stream the data into a Snowflake table using the Kafka Connector


Once the Kafka producer is publishing Yahoo Finance data to the Kafka topic and the Snowflake Kafka Connector is running, the connector loads the data into Snowflake automatically.

__With the configuration above, the Snowflake Kafka Connector does not write into the typed columns of the target table `stock_prices`. Each message is loaded as semi-structured data into a table with two `VARIANT` columns, `RECORD_METADATA` and `RECORD_CONTENT`.__

__Because schema-matching mode is not enabled, the connector (not Kafka itself) auto-creates the following objects__ in the KAFKA_STREAMING.YAHOO_FINANCE schema:

- __A `staging table`__: 
```sql
CREATE OR REPLACE TABLE KAFKA_STREAMING.YAHOO_FINANCE.YAHOO_FINANCE_TOPIC_1140052305 (
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT)
```

- __An `internal stage` with client-side encryption and directory disabled__: 
```sql
CREATE OR REPLACE STAGE SNOWFLAKE_KAFKA_CONNECTOR_SNOWFLAKE_KAFKA_YAHOO_FINANCE_CONNECTOR_430186219_STAGE_YAHOO_FINANCE_TOPIC_1140052305;
```

- __A `Pipe`__: 

```sql
CREATE OR REPLACE PIPE KAFKA_STREAMING.YAHOO_FINANCE.SNOWFLAKE_KAFKA_CONNECTOR_SNOWFLAKE_KAFKA_YAHOO_FINANCE_CONNECTOR_430186219_PIPE_YAHOO_FINANCE_TOPIC_1140052305_0
  auto_ingest = false
AS
COPY INTO yahoo_finance_topic_1140052305 (RECORD_METADATA, RECORD_CONTENT)
FROM (
  SELECT $1:meta, $1:content
  FROM @SNOWFLAKE_KAFKA_CONNECTOR_snowflake_kafka_yahoo_finance_connector_430186219_STAGE_yahoo_finance_topic_1140052305 t
)
FILE_FORMAT = (TYPE = 'json');
```

__Automate the movement of data from the Kafka Connector auto-created staging table into the stock_prices target table using Snowflake Streams and Tasks.__ 

This approach:

- Avoids directly ingesting into the stock_prices curated table

- Keeps staging and production concerns cleanly separated

- Runs automatically at intervals (e.g., every 1 minute)

__1. Identify the Auto-Generated Table__

```sql
SHOW TABLES LIKE '%_TOPIC_%';

SELECT * FROM YAHOO_FINANCE_TOPIC_1140052305;

-- TABLE name from SHOW TABLES
SET kafka_staging_table = 'YAHOO_FINANCE_TOPIC_1140052305';
```
__2. Create a Stream on the Auto Table__

The stream tracks new rows inserted into the staging table by the Kafka Connector.

```sql
CREATE OR REPLACE STREAM kafka_finance_stream
ON TABLE IDENTIFIER($kafka_staging_table);
```

__3. Create a Task to Copy Data Every Minute__

```sql
CREATE OR REPLACE TASK move_kafka_data_to_snowflake_stock_prices
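  WAREHOUSE = COMPUTE_WH
  SCHEDULE = '1 MINUTE'
  -- Skip the run (and the warehouse resume) when the stream has no new rows
  WHEN SYSTEM$STREAM_HAS_DATA('kafka_finance_stream')
AS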
INSERT INTO stock_prices (symbol, price, currency, time)
SELECT
  RECORD_CONTENT:"symbol"::STRING,
  RECORD_CONTENT:"price"::FLOAT,
  RECORD_CONTENT:"currency"::STRING,
  RECORD_CONTENT:"time"::STRING
FROM kafka_finance_stream;
```

__4. Start the task__

```sql
ALTER TASK move_kafka_data_to_snowflake_stock_prices RESUME;
```
Now, Snowflake will:

- Continuously ingest data into the auto-generated Kafka staging table

- Use the stream to detect changes

- Copy those rows into the curated stock_prices table every minute
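
The three steps above can be pictured with a toy in-memory model. This is only a sketch of the idea, not how Snowflake implements streams: a real stream tracks table versions and advances its offset when the consuming DML commits. `ToyStream` and `run_task` are hypothetical names.

```python
class ToyStream:
    """Toy model of an insert-only stream: remembers how many staged rows were consumed."""

    def __init__(self, staging_table: list):
        self.staging_table = staging_table
        self.offset = 0

    def has_data(self) -> bool:
        return self.offset < len(self.staging_table)

    def consume(self) -> list:
        """Return rows added since the last consume and advance the offset."""
        rows = self.staging_table[self.offset:]
        self.offset = len(self.staging_table)
        return rows


def run_task(stream: ToyStream, target: list) -> int:
    """Mimic the task: project RECORD_CONTENT fields into typed target rows."""
    copied = 0
    for content in stream.consume():
        target.append(
            (content["symbol"], float(content["price"]), content["currency"], content["time"])
        )
        copied += 1
    return copied


# Made-up staged record standing in for one RECORD_CONTENT value
staging = [{"symbol": "SNOW", "price": 150.25, "currency": "USD", "time": "2024-01-02 15:30:00"}]
stream = ToyStream(staging)
stock_prices = []

print(run_task(stream, stock_prices))  # prints 1: the first run copies the staged row
print(run_task(stream, stock_prices))  # prints 0: nothing new, so nothing is copied twice
```

The second call returning 0 is the property that matters: each staged row reaches `stock_prices` once, no matter how often the task fires.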

__5. Verify the load (after about one minute)__

```sql
SELECT * FROM stock_prices ORDER BY time DESC;
```





## Real-time or near-real-time analytics on stock prices

```sql
USE ROLE ACCOUNTADMIN;
USE DATABASE KAFKA_STREAMING;
USE SCHEMA YAHOO_FINANCE;
```

__1. Latest price per symbol__

```sql
CREATE OR REPLACE VIEW vw_latest_stock_prices AS
SELECT symbol, price, time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY time DESC) AS rn
  FROM stock_prices
)
WHERE rn = 1;
```

__2. Moving average (trailing 5-row window per symbol, ordered by minute bucket)__

```sql
CREATE OR REPLACE DYNAMIC TABLE dt_moving_avg
TARGET_LAG = '1 minute'
WAREHOUSE = COMPUTE_WH
AS
WITH recent AS (
  SELECT 
    symbol,
    DATE_TRUNC('minute', time::TIMESTAMP_NTZ) AS minute_bucket,    
    AVG(price) OVER (PARTITION BY symbol ORDER BY minute_bucket ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_price_5min
  FROM stock_prices
)
SELECT *
FROM recent;
```
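
The window in `dt_moving_avg` is defined in rows (`ROWS BETWEEN 4 PRECEDING AND CURRENT ROW`), so each value averages the current price and up to four earlier prices for the same symbol. A plain-Python sketch of that calculation is shown below, assuming rows arrive as `(symbol, time, price)` tuples and are ordered by their full timestamp; the function name and the sample prices are made up.

```python
from collections import defaultdict, deque


def trailing_average(rows, window=5):
    """Average of the current and up to window-1 preceding prices per symbol."""
    recent = defaultdict(lambda: deque(maxlen=window))  # last `window` prices per symbol
    result = []
    for symbol, ts, price in sorted(rows, key=lambda r: (r[0], r[1])):
        recent[symbol].append(price)
        prices = recent[symbol]
        result.append((symbol, ts, sum(prices) / len(prices)))
    return result


# Made-up prices, one row every 30 seconds
rows = [
    ("SNOW", "2024-01-02 15:30:00", 10.0),
    ("SNOW", "2024-01-02 15:30:30", 20.0),
    ("SNOW", "2024-01-02 15:31:00", 30.0),
    ("SNOW", "2024-01-02 15:31:30", 40.0),
    ("SNOW", "2024-01-02 15:32:00", 50.0),
    ("SNOW", "2024-01-02 15:32:30", 60.0),
]

for symbol, ts, avg in trailing_average(rows):
    print(symbol, ts, avg)
# averages printed in order: 10.0, 15.0, 20.0, 25.0, 30.0, 40.0
```

With one row every 30 seconds, five rows span roughly two and a half minutes, which is worth keeping in mind when reading the `avg_price_5min` column.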

__3. Anomaly detection - Price spike/dip detection (5% deviation from 5-row moving avg)__

```sql
CREATE OR REPLACE DYNAMIC TABLE dt_price_anomalies
TARGET_LAG = '1 minute'
WAREHOUSE = COMPUTE_WH
AS
WITH recent AS (
  SELECT 
    symbol,
    price,
    time,
    AVG(price) OVER (PARTITION BY symbol ORDER BY time ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS moving_avg
  FROM stock_prices
)
SELECT *
FROM recent
WHERE ABS(price - moving_avg) / NULLIF(moving_avg, 0) > 0.05;
```
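
The filter can be traced by hand with a short Python sketch of the same rule: compute the trailing 5-row average including the current row, then flag the row when its relative deviation exceeds 5%. The function name, tuple layout, and prices are illustrative assumptions.

```python
from collections import defaultdict, deque


def find_anomalies(rows, window=5, threshold=0.05):
    """Flag rows whose price deviates from the trailing average by more than threshold."""
    recent = defaultdict(lambda: deque(maxlen=window))
    flagged = []
    for symbol, ts, price in sorted(rows, key=lambda r: (r[0], r[1])):
        recent[symbol].append(price)  # the current row is part of its own average
        moving_avg = sum(recent[symbol]) / len(recent[symbol])
        # Mirrors NULLIF(moving_avg, 0): a zero average never produces a match
        if moving_avg != 0 and abs(price - moving_avg) / moving_avg > threshold:
            flagged.append((symbol, ts, price, moving_avg))
    return flagged


# Made-up prices: four flat quotes followed by a jump
rows = [
    ("SNOW", "2024-01-02 15:30:00", 100.0),
    ("SNOW", "2024-01-02 15:30:30", 100.0),
    ("SNOW", "2024-01-02 15:31:00", 100.0),
    ("SNOW", "2024-01-02 15:31:30", 100.0),
    ("SNOW", "2024-01-02 15:32:00", 130.0),
]

print(find_anomalies(rows))
# prints [('SNOW', '2024-01-02 15:32:00', 130.0, 106.0)]
```

Because the current price is included in the average, the spike pulls the average toward itself. In this example a final price of 106 (a 6% jump over the previous quotes) would not be flagged, since its deviation from the resulting average of 101.2 is about 4.7%.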

__4. Stock leaderboard by latest price - ranks symbols by their most recent price to surface the highest-priced stocks__

```sql
CREATE OR REPLACE VIEW vw_price_leaderboard AS
SELECT symbol, price, RANK() OVER (ORDER BY price DESC) AS price_rank
FROM (
  SELECT symbol, price
  FROM stock_prices
  QUALIFY ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY time DESC) = 1
);
```

The Snowflake `QUALIFY` clause filters on the result of a window function, much as `HAVING` filters on the result of an aggregate. It is evaluated after the window functions are computed, so the `ROW_NUMBER() ... = 1` condition can be written inline. Without `QUALIFY`, the same filter needs a wrapping subquery with `WHERE rn = 1`, as in the `vw_latest_stock_prices` view above. The main benefit is a shorter, more readable query with one less level of nesting.
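
For readers less familiar with window functions, the leaderboard logic can be sketched in plain Python: keep the latest row per symbol (the `QUALIFY ROW_NUMBER() ... = 1` step), then assign ranks by price with ties sharing a rank (the `RANK()` step). The function name and the sample rows are made up for illustration.

```python
def leaderboard(rows):
    """Return [(symbol, price, rank)] for the latest price of each symbol."""
    latest = {}
    for symbol, ts, price in rows:
        # Keep only the most recent row per symbol
        if symbol not in latest or ts > latest[symbol][0]:
            latest[symbol] = (ts, price)

    ordered = sorted(
        ((symbol, price) for symbol, (_, price) in latest.items()),
        key=lambda item: item[1],
        reverse=True,
    )

    ranked = []
    for position, (symbol, price) in enumerate(ordered, start=1):
        # RANK() semantics: equal prices share a rank and the next rank is skipped
        rank = ranked[-1][2] if ranked and ranked[-1][1] == price else position
        ranked.append((symbol, price, rank))
    return ranked


# Made-up quotes; AMZN has an older row that must be ignored
rows = [
    ("AMZN", "2024-01-02 15:30:00", 90.0),
    ("AMZN", "2024-01-02 15:30:30", 50.0),
    ("MSFT", "2024-01-02 15:30:30", 70.0),
    ("SNOW", "2024-01-02 15:30:30", 50.0),
    ("GOOGL", "2024-01-02 15:30:30", 40.0),
]

print(leaderboard(rows))
# prints [('MSFT', 70.0, 1), ('AMZN', 50.0, 2), ('SNOW', 50.0, 2), ('GOOGL', 40.0, 4)]
```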

Check refresh history:

```sql
SHOW DYNAMIC TABLES;

SELECT *
FROM TABLE(
  INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'KAFKA_STREAMING.YAHOO_FINANCE.dt_moving_avg',
    RESULT_LIMIT => 20
  )
)
ORDER BY DATA_TIMESTAMP DESC;

SELECT *
FROM TABLE(
  INFORMATION_SCHEMA.DYNAMIC_TABLE_REFRESH_HISTORY(
    NAME => 'KAFKA_STREAMING.YAHOO_FINANCE.dt_price_anomalies',
    RESULT_LIMIT => 20
  )
)
ORDER BY DATA_TIMESTAMP DESC;
```

## Deliver live metrics in a Streamlit dashboard


```python
import streamlit as st
import pandas as pd
from snowflake.snowpark.context import get_active_session
import plotly.express as px

# Reuse the Snowpark session provided by Streamlit in Snowflake
session = get_active_session()
   
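if st.button("🔄 Refresh Now"):
    st.cache_data.clear()  # drop cached results so the rerun queries Snowflake again
    st.rerun()

# Cached data loading functions (each result is reused for up to 30 seconds)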
@st.cache_data(ttl=30)
def get_latest():
    return session.table("vw_latest_stock_prices").to_pandas()

@st.cache_data(ttl=30)
def get_moving_avg():
    return session.table("dt_moving_avg").to_pandas()

@st.cache_data(ttl=30)
def get_anomalies():
    return session.table("dt_price_anomalies").to_pandas()

@st.cache_data(ttl=30)
def get_leaderboard():
    return session.table("vw_price_leaderboard").to_pandas()

# Load data
latest = get_latest()
moving_avg = get_moving_avg()
anomalies = get_anomalies()
leaderboard = get_leaderboard()

# Streamlit UI components
st.title("📈 Yahoo Finance Dashboard")

# Latest prices
st.subheader("💲 Latest Prices")
st.dataframe(latest)

# Leaderboard
st.subheader("🏆 Price Leaderboard")
st.dataframe(leaderboard)


# Moving Average Chart
st.subheader("📉 5-Minute Moving Averages")

# Date filter
min_date = pd.to_datetime(moving_avg["MINUTE_BUCKET"]).min().to_pydatetime()
max_date = pd.to_datetime(moving_avg["MINUTE_BUCKET"]).max().to_pydatetime()

# Set date slider
date_range = st.slider(
    "Date Range",
    min_value=min_date,
    max_value=max_date,
    value=(min_date, max_date),
    format="YYYY-MM-DD HH:mm"
)

# Set price filter (defaults to the full observed range, so the default stays within bounds)
min_price = float(moving_avg["AVG_PRICE_5MIN"].min())
max_price = float(moving_avg["AVG_PRICE_5MIN"].max())
price_range = st.slider("Price Range", min_price, max_price, (min_price, max_price))

# Symbol multi-select
available_symbols = sorted(moving_avg['SYMBOL'].unique().tolist())
selected_symbols = st.multiselect("Select Symbols", options=available_symbols, 
                                  default=available_symbols[:2])

# Filter the DataFrame
start_date, end_date = date_range

filtered_data = moving_avg[
    (moving_avg["SYMBOL"].isin(selected_symbols)) &
    (moving_avg["MINUTE_BUCKET"] >= start_date) &
    (moving_avg["MINUTE_BUCKET"] <= end_date) &
    (moving_avg["AVG_PRICE_5MIN"] >= price_range[0]) &
    (moving_avg["AVG_PRICE_5MIN"] <= price_range[1])
]

fig = px.line(
    filtered_data.sort_values(["SYMBOL", "MINUTE_BUCKET"]),
    x="MINUTE_BUCKET",
    y="AVG_PRICE_5MIN",
    color="SYMBOL",
    title="Multi-Symbol 5-Minute Moving Averages",
    render_mode='svg'
)
fig.update_layout(xaxis_title="Time", yaxis_title="5-Min Avg Price", height=500)
st.plotly_chart(fig, use_container_width=True)

# Anomalies
st.subheader("🚨 Anomalies")
st.dataframe(anomalies[anomalies['SYMBOL'].isin(selected_symbols)])

# Dashboard SQL (Copyable)
st.markdown("### 🧾 Dashboard SQL")

sql_script = """
-- Latest Price
CREATE OR REPLACE VIEW vw_latest_stock_prices AS
SELECT symbol, price, time
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY time DESC) AS rn
  FROM stock_prices
)
WHERE rn = 1;

-- 5-Min Moving Average
CREATE OR REPLACE DYNAMIC TABLE dt_moving_avg
TARGET_LAG = '1 minute'
WAREHOUSE = COMPUTE_WH
AS
WITH recent AS (
  SELECT 
    symbol,
    DATE_TRUNC('minute', time::TIMESTAMP_NTZ) AS minute_bucket,    
    AVG(price) OVER (PARTITION BY symbol ORDER BY minute_bucket ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS avg_price_5min
  FROM stock_prices
)
SELECT *
FROM recent;

-- Price Anomalies
CREATE OR REPLACE DYNAMIC TABLE dt_price_anomalies
TARGET_LAG = '1 minute'
WAREHOUSE = COMPUTE_WH
AS
WITH recent AS (
  SELECT 
    symbol,
    price,
    time,
    AVG(price) OVER (PARTITION BY symbol ORDER BY time ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS moving_avg
  FROM stock_prices
)
SELECT *
FROM recent
WHERE ABS(price - moving_avg) / NULLIF(moving_avg, 0) > 0.05;

-- Leaderboard by Latest Price
CREATE OR REPLACE VIEW vw_price_leaderboard AS
SELECT symbol, price, RANK() OVER (ORDER BY price DESC) AS price_rank
FROM (
  SELECT symbol, price
  FROM stock_prices
  QUALIFY ROW_NUMBER() OVER (PARTITION BY symbol ORDER BY time DESC) = 1
);
"""

with st.expander("📋 SQL Script"):
    st.code(sql_script, language="sql")
```