# Real-Time Stock Data & Sentiment Analysis Pipeline

This notebook implements a real-time pipeline that:
1.  Ingests stock price data via Finnhub WebSocket.
2.  Ingests (or simulates) news and social media text.
3.  Analyzes sentiment using FinBERT and VADER.
4.  Aggregates features in real-time.
5.  Prepares data for model inference.

In [None]:
# Install dependencies if not already installed
!pip install websocket-client transformers vaderSentiment yfinance torch

In [None]:
import websocket
import json
import threading
import time
import pandas as pd
import numpy as np
from datetime import datetime
import yfinance as yf
from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import queue

## 1. Configuration

In [None]:
# --- CONFIGURATION ---
import os

# Finnhub API token. The FINNHUB_API_KEY environment variable takes precedence
# when it is set and non-empty, so the secret does not have to be saved in the
# notebook. Otherwise the placeholder below is used; while API_KEY equals the
# placeholder the WebSocket is not started and prices are polled via yfinance.
API_KEY = os.environ.get("FINNHUB_API_KEY") or "YOUR_FINNHUB_KEY"  # <--- OR REPLACE THIS WITH YOUR KEY
SYMBOL = "AAPL"  # Ticker subscribed to / polled
FINBERT_MODEL = "ProsusAI/finbert"  # Hugging Face model id used for news sentiment

# Global buffers for real-time data.
# Producers put dict records on these queues; the main loop drains them.
price_buffer = queue.Queue()
news_buffer = queue.Queue()
social_buffer = queue.Queue()

## 2. Price Ingestion (WebSocket)

In [None]:
def on_message(ws, message):
    """Handle one WebSocket frame and queue any trades it carries.

    The frame is decoded as JSON. When its ``type`` is ``'trade'``, each entry
    of its ``data`` list becomes a record with ``symbol``, ``price``,
    ``timestamp`` (converted from milliseconds to seconds) and ``volume`` that
    is put on ``price_buffer``. Frames of any other type are ignored.

    Any exception raised while decoding or reading the frame is reported with
    ``print`` and not propagated.
    """
    try:
        payload = json.loads(message)
        if payload.get('type') != 'trade':
            return
        for tick in payload['data']:
            price_buffer.put({
                'symbol': tick['s'],
                'price': tick['p'],
                'timestamp': tick['t'] / 1000.0,  # ms -> s
                'volume': tick['v'],
            })
    except Exception as e:
        print(f"Error parsing message: {e}")

def on_error(ws, error):
    """Print the error reported for the WebSocket connection."""
    report = f"WebSocket Error: {error}"
    print(report)

def on_close(ws, close_status_code, close_msg):
    """Print a notice that the WebSocket closed; the status and message are unused."""
    notice = "WebSocket Closed"
    print(notice)

def on_open(ws):
    """Send a subscribe request for ``SYMBOL`` once the connection is open."""
    print(f"Subscribing to {SYMBOL}...")
    subscription = {"type": "subscribe", "symbol": SYMBOL}
    ws.send(json.dumps(subscription))

def start_finnhub_ws():
    """Connect to the Finnhub WebSocket and run its event loop.

    Returns immediately, after printing a warning, when ``API_KEY`` is still
    the placeholder value. Otherwise builds a ``WebSocketApp`` wired to the
    ``on_open`` / ``on_message`` / ``on_error`` / ``on_close`` callbacks and
    calls ``run_forever()`` on it.
    """
    key_is_placeholder = API_KEY == "YOUR_FINNHUB_KEY"
    if key_is_placeholder:
        print("WARNING: No API Key provided. WebSocket will not connect.")
        return

    websocket.enableTrace(False)
    callbacks = {
        'on_open': on_open,
        'on_message': on_message,
        'on_error': on_error,
        'on_close': on_close,
    }
    app = websocket.WebSocketApp(f"wss://ws.finnhub.io?token={API_KEY}", **callbacks)
    app.run_forever()

In [None]:
# Run the WebSocket client on its own thread so this cell returns right away.
# The thread is a daemon, so it does not keep the interpreter alive on exit.
ws_thread = threading.Thread(target=start_finnhub_ws)
ws_thread.daemon = True
ws_thread.start()

### Alternative: Polling with yfinance (for testing without an API key)

In [None]:
def poll_yfinance():
    """Fetch the latest 1-minute bar for ``SYMBOL`` via yfinance and queue it.

    Puts one record with ``symbol``, ``price`` (the bar's close), ``timestamp``
    (the current wall-clock time, not the bar time) and ``volume`` on
    ``price_buffer``. Nothing is queued when no data comes back. No
    de-duplication is done, so polling again before a new bar exists queues
    the same bar again.

    Any exception is reported with ``print`` and not propagated.
    """
    # Simulate real-time by fetching latest minute bar
    try:
        df = yf.download(SYMBOL, period="1d", interval="1m", progress=False)
        if df is None or df.empty:
            return

        # Newer yfinance releases return MultiIndex columns (field, ticker)
        # even for a single symbol. Keep only the field level so that
        # 'Close' / 'Volume' select scalars instead of one-element Series,
        # which float()/int() no longer accept cleanly.
        if isinstance(df.columns, pd.MultiIndex):
            df = df.copy()
            df.columns = df.columns.get_level_values(0)

        latest = df.iloc[-1]
        record = {
            'symbol': SYMBOL,
            'price': float(latest['Close']),
            'timestamp': time.time(),  # Approximate current time
            'volume': int(latest['Volume'])
        }
        price_buffer.put(record)
    except Exception as e:
        print(f"Polling error: {e}")

## 3. Sentiment Analysis Models

In [None]:
# --- FinBERT Setup ---
# Load the tokenizer and the sequence-classification model named by
# FINBERT_MODEL, then wrap both in a sentiment-analysis pipeline.
print("Loading FinBERT...")
tokenizer = AutoTokenizer.from_pretrained(FINBERT_MODEL)
model = AutoModelForSequenceClassification.from_pretrained(FINBERT_MODEL)
finbert_pipeline = pipeline(
    task="sentiment-analysis",
    tokenizer=tokenizer,
    model=model,
)
print("FinBERT Loaded.")

# --- VADER Setup ---
# Single shared analyzer instance used by score_social().
vader_analyzer = SentimentIntensityAnalyzer()

In [None]:
def score_news(text):
    """Score a piece of news text with FinBERT as a signed confidence.

    Args:
        text: Headline or article text to classify.

    Returns:
        float: ``+score`` when FinBERT labels the text ``'positive'``,
        ``-score`` when it labels it ``'negative'``, and ``0.0`` for any other
        label, for empty or whitespace-only text, or when the pipeline raises
        (the error is printed).
    """
    # Nothing to classify: treat as neutral without invoking the model.
    if not text or (isinstance(text, str) and not text.strip()):
        return 0.0

    try:
        # Truncate to the BERT input limit so long articles are scored on
        # their leading tokens instead of raising inside the model.
        result = finbert_pipeline(text, truncation=True, max_length=512)[0]
        label = result['label']
        if label == 'positive':
            return float(result['score'])
        if label == 'negative':
            return -float(result['score'])
        return 0.0
    except Exception as e:
        print(f"FinBERT error: {e}")
        return 0.0

def score_social(text):
    """Return the ``'compound'`` entry of VADER's polarity scores for ``text``."""
    polarity = vader_analyzer.polarity_scores(text)
    return polarity['compound']

## 4. Feature Aggregation Logic

In [None]:
def aggregate_features(price_list, news_list, social_list):
    """Aggregate one batch of raw records into a single feature dict.

    This is a simplified example that aggregates over the whole batch.

    Args:
        price_list: Records with ``'price'`` and ``'volume'`` keys.
        news_list: Records with a ``'text'`` key, scored with ``score_news``.
        social_list: Records with a ``'text'`` key, scored with
            ``score_social``.

    Returns:
        dict: Always contains the same eight keys, whatever inputs are empty:
        ``price_mean``, ``price_std``, ``price_last``, ``volume_sum``,
        ``news_sent_mean``, ``news_count``, ``social_sent_mean``,
        ``social_count``. With no prices the price statistics are NaN and
        ``volume_sum`` is 0; with no texts the sentiment mean and count are 0.
    """
    features = {}

    # Price Features
    if price_list:
        df_p = pd.DataFrame(price_list)
        features['price_mean'] = df_p['price'].mean()
        # Sample standard deviation: NaN when the batch holds a single price.
        features['price_std'] = df_p['price'].std()
        features['price_last'] = df_p['price'].iloc[-1]
        features['volume_sum'] = df_p['volume'].sum()
    else:
        # Emit every price key so the feature schema is stable across windows.
        features['price_mean'] = np.nan
        features['price_std'] = np.nan
        features['price_last'] = np.nan
        features['volume_sum'] = 0

    # News Sentiment
    if news_list:
        scores = [score_news(n['text']) for n in news_list]
        features['news_sent_mean'] = np.mean(scores)
        features['news_count'] = len(scores)
    else:
        features['news_sent_mean'] = 0
        features['news_count'] = 0

    # Social Sentiment
    if social_list:
        scores = [score_social(s['text']) for s in social_list]
        features['social_sent_mean'] = np.mean(scores)
        features['social_count'] = len(scores)
    else:
        features['social_sent_mean'] = 0
        features['social_count'] = 0

    return features

## 5. Main Real-Time Loop

In [None]:
# Dummy data generator for testing sentiment pipeline
def generate_dummy_text():
    """Randomly enqueue a canned headline and/or a canned social post.

    On each call a headline is put on ``news_buffer`` with probability 0.3 and,
    independently, a post is put on ``social_buffer`` with probability 0.5.
    Every record has a ``'text'`` and a ``'timestamp'`` (current time) entry.
    """
    import random

    headline_pool = [
        "Apple reports record quarterly revenue.",
        "Tech sector faces headwinds from new regulations.",
        "Analysts upgrade AAPL price target.",
        "Supply chain issues persist for major tech companies."
    ]
    post_pool = [
        "Buying the dip! $AAPL to the moon!",
        "Market looks weak today, selling my positions.",
        "Just bought a new iPhone, love it.",
        "Why is the stock dropping??"
    ]

    # (probability of emitting, candidate texts, destination queue)
    emitters = (
        (0.3, headline_pool, news_buffer),
        (0.5, post_pool, social_buffer),
    )
    for threshold, candidates, sink in emitters:
        if random.random() < threshold:
            sink.put({'text': random.choice(candidates), 'timestamp': time.time()})

In [None]:
print("Starting Real-Time Pipeline Loop... (Stop manually to exit)")


def _drain_buffer(buffer):
    """Remove and return, in order, every item currently queued in ``buffer``."""
    drained = []
    while not buffer.empty():
        drained.append(buffer.get())
    return drained


try:
    while True:
        # 1. Ingest: the WebSocket thread fills price_buffer in the background;
        #    with the placeholder key there is no socket, so poll instead.
        if API_KEY == "YOUR_FINNHUB_KEY":
            poll_yfinance()

        # Demo-only text source
        generate_dummy_text()

        # 2. Collect everything that arrived during this window
        current_prices = _drain_buffer(price_buffer)
        current_news = _drain_buffer(news_buffer)
        current_social = _drain_buffer(social_buffer)

        # 3. Aggregate & analyze, but only when at least one buffer had data
        if any((current_prices, current_news, current_social)):
            feats = aggregate_features(current_prices, current_news, current_social)
            feats['timestamp'] = datetime.now().strftime("%H:%M:%S")

            # 4. Output / inference
            summary = (
                f"[{feats['timestamp']}] Price: {feats['price_last']:.2f}"
                f" | News Sent: {feats['news_sent_mean']:.2f} ({feats['news_count']})"
                f" | Social Sent: {feats['social_sent_mean']:.2f} ({feats['social_count']})"
            )
            print(summary)

            # Here you would feed `feats` into your ML model:
            # prediction = model.predict(feats)

        time.sleep(5)  # One pass every 5 seconds for the demo

except KeyboardInterrupt:
    print("Pipeline stopped.")