In [10]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import requests
from bs4 import BeautifulSoup
import feedparser
from urllib.parse import urlparse
import re
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from newsapi import NewsApiClient
import time
from typing import List, Dict, Optional
from dataclasses import dataclass


In [11]:
@dataclass
class PipelineConfig:
    """Configuration for the news attribution pipeline.

    Holds the API credentials plus the thresholds and limits read by the
    price detector, relevance scorer, sentiment labelling, and LLM client.
    """
    newsapi_key: str
    groq_api_key: str

    # Pipeline parameters
    abnormal_return_threshold: float = 0.05  # fraction of price, i.e. ±5% daily return
    lookback_days: int = 7  # number of most recent daily returns scanned per ticker
    relevance_thresholds: Optional[List[float]] = None  # None -> [0.70, 0.50, 0.25]
    sentiment_positive_threshold: float = 0.2
    sentiment_negative_threshold: float = -0.2
    llm_temperature: float = 0.3
    llm_max_tokens: int = 1000
    max_articles_per_source: int = 20  # NOTE(review): confirm where this limit is applied
    article_content_length: int = 2000  # max characters of scraped article text kept

    def __post_init__(self):
        # A mutable list cannot be a plain dataclass default, so fill it in here.
        if self.relevance_thresholds is None:
            self.relevance_thresholds = [0.70, 0.50, 0.25]
        else:
            # Copy so later mutation of the caller's list cannot change this config.
            self.relevance_thresholds = list(self.relevance_thresholds)


# Catalyst taxonomy offered to the LLM summarizer. The summarizer prompt numbers
# these from 1 in list order and asks the model to answer with those numbers,
# so add new categories at the end instead of reordering existing ones.
NEWS_CATALYSTS = [
    "Earnings Report",
    "Guidance/Forecast",
    "M&A",
    "Share Buyback",
    "Analyst Upgrade/Downgrade",
    "Dividend Changes",
    "FDA/Clinical Trial",
    "CEO/CFO Change",
    "Stock Split",
    "Index Rebalancing",
    "Activist Investor",
    "Short Squeeze/Meme Hype",
    "Insider Transactions",
    "Litigation/Regulatory",
    "Secondary Offering",
    "Cybersecurity Breach",
    "Trade/Tariff News",
    "Economic Data/Fed",
    "Contract Wins/Product Launch",
    "Patent/Licensing",
]

In [12]:
@dataclass
class StockMove:
    """Represents an abnormal stock price movement.

    One instance per (ticker, trading day) whose daily close-to-close return
    met the configured abnormal-return threshold.
    """
    ticker: str
    date: datetime  # row index label from the yfinance price history (logged as a tz-aware timestamp)
    percent_change: float  # daily close-to-close return in percent (e.g. 5.33 means +5.33%)
    close: float  # closing price on that day
    volume: int  # traded volume for the day; 0 when yfinance reported no value

@dataclass
class NewsArticle:
    """Represents a news article retrieved for a ticker.

    Metadata fields come from an RSS feed entry; `content` and
    `relevance_score` are filled in later stages of the pipeline.
    """
    title: str
    url: str
    source: str  # feed label, e.g. 'Google News' or 'Yahoo Finance'
    published: str  # publication date exactly as the feed provides it (not parsed)
    description: str  # feed entry summary; NOTE(review): may contain HTML markup — confirm
    content: str = ""  # scraped article text; "" when not scraped or scraping failed
    relevance_score: float = 0.0  # LLM-assigned relevance to the price move, intended range 0.0-1.0

@dataclass
class AnalysisResult:
    """Results of news analysis for a single abnormal stock move."""
    ticker: str
    date: datetime  # trading day of the move (copied from StockMove.date)
    percent_change: float  # daily return in percent (copied from StockMove)
    num_articles: int  # article count reported for this move; which set is counted depends on how far the analysis got
    sentiment_score: float  # FinBERT positive-minus-negative probability; 0.0 when sentiment was not computed
    sentiment_label: str  # 'Positive' / 'Negative' / 'Neutral', or a status label such as 'No Data'
    summary: str  # LLM summary text, or a status message when no summary was produced
    explanation: str  # explanation of the move; 'N/A' when no analysis was possible
    catalyst: str = ""  # catalyst category name(s); empty string when not determined

class PriceDetector:
    """Detects abnormal daily stock price movements from yfinance price history."""

    def __init__(self, config: PipelineConfig):
        self.config = config

    def get_tickers(self) -> List[str]:
        """Return the default watch-list of tickers to scan."""
        return [
            'INTC', 'ADBE', 'CRM', 'NEE', 'LLY', 'VZ', 'GOOGL', 'NFLX',
            'AMD', 'PYPL', 'CSCO', 'TMO', 'PG'
        ]

    def _history_window_days(self) -> int:
        """Calendar-day span to download so that `lookback_days` daily returns are available."""
        # `lookback_days` is applied as a count of trading-day rows (see `tail` in
        # `_moves_from_history`), but yfinance is queried by calendar dates.
        # Weekends/holidays remove roughly 2 of every 7 days and pct_change()
        # drops the first row, so `lookback_days + 5` calendar days can yield
        # fewer returns than requested. Fetch about twice the lookback plus a
        # cushion; `tail` still limits the scan to `lookback_days` rows.
        return self.config.lookback_days * 2 + 5

    def _moves_from_history(self, ticker: str, hist: pd.DataFrame) -> List[StockMove]:
        """Return a StockMove for each recent row of `hist` whose daily return breaches the threshold.

        Args:
            ticker: symbol the history belongs to.
            hist: yfinance history frame with at least 'Close' and 'Volume' columns.
        """
        returns_pct = hist['Close'].pct_change() * 100
        with_returns = hist.assign(pct_change=returns_pct).dropna(subset=['pct_change'])
        recent = with_returns.tail(self.config.lookback_days)
        # Config stores the threshold as a fraction; returns here are in percent.
        limit_pct = self.config.abnormal_return_threshold * 100
        flagged = recent[recent['pct_change'].abs() >= limit_pct]

        moves = []
        for date, row in flagged.iterrows():
            moves.append(StockMove(
                ticker=ticker,
                date=date,
                # Cast numpy scalars to plain Python numbers to match the dataclass types.
                percent_change=float(row['pct_change']),
                close=float(row['Close']),
                volume=int(row['Volume']) if pd.notna(row['Volume']) else 0,
            ))
        return moves

    def detect_abnormal_returns(self, tickers: Optional[List[str]] = None) -> List[StockMove]:
        """Identify stocks with abnormal daily returns.

        Args:
            tickers: symbols to scan; defaults to `get_tickers()`.

        Returns:
            All abnormal moves across `tickers`, sorted by absolute percent change
            (largest first). Tickers whose download fails are logged and skipped.
        """
        if tickers is None:
            tickers = self.get_tickers()

        end_date = datetime.now()
        start_date = end_date - timedelta(days=self._history_window_days())

        print(f"[PriceDetector] Scanning {len(tickers)} tickers for abnormal returns...")

        stock_moves = []
        for ticker in tickers:
            try:
                hist = yf.Ticker(ticker).history(start=start_date, end=end_date)
                if len(hist) < 2:
                    # Need at least two closes to compute one return.
                    continue
                stock_moves.extend(self._moves_from_history(ticker, hist))
            except Exception as e:
                print(f"[PriceDetector] Error processing {ticker}: {str(e)}")

        # Sort by absolute magnitude
        stock_moves.sort(key=lambda move: abs(move.percent_change), reverse=True)

        print(f"[PriceDetector] Found {len(stock_moves)} abnormal movements")
        return stock_moves


In [13]:
class NewsRetriever:
    """Retrieves candidate news articles for a ticker from Google News and Yahoo Finance RSS."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        # NewsAPI retrieval is currently disabled, so `config.newsapi_key` is not used here.

    def get_company_name(self, ticker: str) -> str:
        """Return the company's long name from yfinance, or `ticker` if it is unavailable."""
        try:
            name = yf.Ticker(ticker).info.get('longName')
        except Exception:
            # Best-effort lookup: network or parsing failures fall back to the ticker.
            return ticker
        # `.get` returns None when the key exists with a null value; fall back then too.
        return name or ticker

    @staticmethod
    def _entries_to_articles(entries, source: str, limit: int) -> List[NewsArticle]:
        """Convert up to `limit` feedparser entries into NewsArticle objects.

        Uses `.get` so that one entry missing a field does not abort the whole feed.
        """
        return [
            NewsArticle(
                title=entry.get('title', ''),
                url=entry.get('link', ''),
                source=source,
                published=entry.get('published', ''),
                description=entry.get('summary', ''),
            )
            for entry in entries[:limit]
        ]

    def fetch_google_news(self, ticker: str, date: datetime) -> List[NewsArticle]:
        """Fetch up to 15 Google News RSS results for `ticker` within a day of `date`."""
        try:
            date_str = date.strftime('%Y-%m-%d') if isinstance(date, datetime) else str(date).split()[0]
            target_date = datetime.strptime(date_str, '%Y-%m-%d')
            after_date = (target_date - timedelta(days=1)).strftime('%Y-%m-%d')
            before_date = (target_date + timedelta(days=1)).strftime('%Y-%m-%d')

            query = f"{ticker} stock after:{after_date} before:{before_date}"
            url = f"https://news.google.com/rss/search?q={requests.utils.quote(query)}&hl=en-US&gl=US&ceid=US:en"

            feed = feedparser.parse(url)
            return self._entries_to_articles(feed.entries, 'Google News', 15)
        except Exception as e:
            print(f"[NewsRetriever] Google News error for {ticker}: {str(e)}")
            return []

    def fetch_yahoo_finance(self, ticker: str) -> List[NewsArticle]:
        """Fetch up to 10 Yahoo Finance RSS headlines for `ticker` (not filtered by date)."""
        try:
            url = f"https://finance.yahoo.com/rss/headline?s={ticker}"
            feed = feedparser.parse(url)
            return self._entries_to_articles(feed.entries, 'Yahoo Finance', 10)
        except Exception as e:
            print(f"[NewsRetriever] Yahoo Finance error for {ticker}: {str(e)}")
            return []

    def scrape_article_content(self, url: str) -> str:
        """Return the concatenated <p> text of `url`, truncated to `article_content_length`.

        Best-effort: returns "" for an empty URL, a non-2xx/3xx response, or any
        fetch/parse error.
        """
        if not url:
            return ""
        try:
            parsed = urlparse(url)
            # Fetch without the query string and fragment.
            clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
            headers = {'User-Agent': 'Mozilla/5.0'}
            response = requests.get(clean_url, headers=headers, timeout=10)
            if not response.ok:
                # Otherwise the text of 404/403/5xx error pages would be treated as the article.
                return ""
            soup = BeautifulSoup(response.content, 'html.parser')
            text = ' '.join(p.get_text() for p in soup.find_all('p'))
            return text[:self.config.article_content_length]
        except Exception:
            # `Exception`, not a bare except, so KeyboardInterrupt can still stop a long scrape.
            return ""

    def deduplicate_articles(self, articles: List[NewsArticle]) -> List[NewsArticle]:
        """Remove duplicates, keeping the first article seen.

        An article is a duplicate if its normalized title (lower-case, collapsed
        whitespace) OR its normalized URL (host + path) matches an earlier one.
        Empty titles/URLs are never treated as matches.
        """
        seen_titles = set()
        seen_urls = set()
        unique_articles = []

        for article in articles:
            norm_title = re.sub(r'\s+', ' ', article.title.lower().strip())
            parsed = urlparse(article.url)
            norm_url = f"{parsed.netloc}{parsed.path}"

            if (norm_title and norm_title in seen_titles) or (norm_url and norm_url in seen_urls):
                continue
            if norm_title:
                seen_titles.add(norm_title)
            if norm_url:
                seen_urls.add(norm_url)
            unique_articles.append(article)

        return unique_articles

    def retrieve_news(self, ticker: str, date: datetime) -> List[NewsArticle]:
        """Aggregate, deduplicate and scrape news for `ticker` around `date`."""
        # NewsAPI is disabled, so only the RSS sources are queried.
        all_articles = self.fetch_google_news(ticker, date) + self.fetch_yahoo_finance(ticker)

        unique_articles = self.deduplicate_articles(all_articles)

        for article in unique_articles:
            article.content = self.scrape_article_content(article.url)

        return unique_articles

In [14]:
class LLMInterface:
    """Minimal client for Groq's OpenAI-compatible chat-completions endpoint."""

    # Statuses treated as transient: rate limiting plus common gateway/server errors.
    _RETRYABLE_STATUS = (429, 500, 502, 503, 504)

    def __init__(self, config: PipelineConfig, model: str = "llama-3.1-8b-instant",
                 request_timeout: float = 60.0):
        """
        Args:
            config: pipeline configuration (API key, temperature, max tokens).
            model: Groq model identifier sent with every request.
            request_timeout: per-request timeout in seconds, so a stalled
                connection cannot hang the pipeline indefinitely.
        """
        self.config = config
        self.api_url = "https://api.groq.com/openai/v1/chat/completions"
        self.model = model
        self.request_timeout = request_timeout

    @staticmethod
    def _backoff_seconds(attempt: int) -> float:
        """Exponential backoff for 0-based `attempt`: 10s, 20s, 40s, ... capped at 60s."""
        return min(60, 2 ** attempt * 10)

    @staticmethod
    def _retry_after_seconds(response) -> Optional[float]:
        """Parse a numeric Retry-After header (seconds, capped at 60); None if absent or non-numeric."""
        value = response.headers.get("Retry-After")
        if value is None:
            return None
        try:
            return min(60.0, max(0.0, float(value)))
        except ValueError:
            return None

    def call_with_retry(self, prompt: str, max_retries: int = 3) -> Optional[str]:
        """Send `prompt` as a single user message and return the reply text.

        Retries up to `max_retries` attempts on network errors/timeouts and on
        HTTP 429/5xx, sleeping between attempts (not after the last one).

        Returns:
            The assistant message content, or None on a non-retryable HTTP error,
            a malformed response body, or when all attempts fail.
        """
        headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.config.groq_api_key}",
        }
        payload = {
            "model": self.model,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": self.config.llm_temperature,
            "max_tokens": self.config.llm_max_tokens,
        }

        for attempt in range(max_retries):
            more_attempts = attempt < max_retries - 1
            try:
                response = requests.post(
                    self.api_url,
                    headers=headers,
                    json=payload,
                    timeout=self.request_timeout,
                )
            except requests.RequestException as e:
                # Connection errors and timeouts are usually transient.
                print(f"[LLMInterface] Request error (attempt {attempt+1}/{max_retries}): {e}")
                if more_attempts:
                    time.sleep(self._backoff_seconds(attempt))
                continue

            if response.status_code == 200:
                try:
                    return response.json()['choices'][0]['message']['content']
                except (ValueError, KeyError, IndexError, TypeError) as e:
                    print(f"[LLMInterface] Error: malformed response ({e})")
                    return None

            if response.status_code in self._RETRYABLE_STATUS:
                retry_after = self._retry_after_seconds(response)
                wait_time = retry_after if retry_after is not None else self._backoff_seconds(attempt)
                if response.status_code == 429:
                    reason = "Rate limit hit"
                else:
                    reason = f"Server error {response.status_code}"
                if more_attempts:
                    print(f"[LLMInterface] {reason}, waiting {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    print(f"[LLMInterface] {reason} on final attempt")
                continue

            # Remaining 4xx (bad key, bad request): retrying will not help.
            print(f"[LLMInterface] Error: API error: {response.status_code}")
            return None

        print("[LLMInterface] Max retries reached")
        return None

class RelevanceScorer:
    """Scores how well each article explains a price move, using LLM ratings in [0, 1]."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.llm = LLMInterface(config)

    @staticmethod
    def _parse_score(response_text: str) -> float:
        """Return the first number in `response_text` that lies in [0.0, 1.0]; 0.0 if none does.

        Accepts '0.85', '.9', '1', '1.0', or a number embedded in prose such as
        'Relevance: 0.7'. Out-of-range numbers (e.g. the 8 and 10 in '8/10', or
        '1.5') are skipped instead of being partially matched.
        """
        for token in re.findall(r'\d*\.\d+|\d+', response_text):
            value = float(token)
            if 0.0 <= value <= 1.0:
                return value
        return 0.0

    def score_article(self, article: NewsArticle, ticker: str, company_name: str, price_change: float) -> float:
        """Ask the LLM to rate one article's relevance to the move; 0.0 on any failure."""
        try:
            # When scraping returned nothing, fall back to the feed summary so the
            # LLM sees more than just the title.
            body = article.content or article.description or ""
            prompt = f"""You are a financial analyst. Rate the relevance of this news article to explain the stock price movement.

Company: {company_name} ({ticker})
Price Change: {price_change:.2f}%
Article Title: {article.title}
Article Content: {body[:1000]}

On a scale of 0.0 to 1.0, how relevant is this article to explaining the stock price movement?
- 1.0: Directly explains the price movement with company-specific news
- 0.5: Mentions the company but not directly related to price movement
- 0.0: Not related to the company or its stock price

Respond with ONLY a number between 0.0 and 1.0."""

            response_text = self.llm.call_with_retry(prompt)

            if response_text is None:
                return 0.0

            return self._parse_score(response_text.strip())

        except Exception as e:
            print(f"[RelevanceScorer] Error: {str(e)}")
            return 0.0

    def filter_relevant_articles(self, articles: List[NewsArticle], ticker: str,
                                 company_name: str, price_change: float,
                                 delay_seconds: float = 2.0) -> List[NewsArticle]:
        """Score every article, then return those passing the highest satisfiable threshold.

        Each article's `relevance_score` is set in place. Thresholds from the
        config are tried from highest to lowest; the first that at least one
        article meets determines the returned subset. Returns [] if none pass.

        Args:
            delay_seconds: pause between consecutive LLM calls (rate limiting).
        """
        print(f"[RelevanceScorer] Scoring {len(articles)} articles...")

        for i, article in enumerate(articles):
            article.relevance_score = self.score_article(article, ticker, company_name, price_change)
            if i < len(articles) - 1:
                time.sleep(delay_seconds)  # Rate limiting

        # Sorted so the strictest bar is tried first regardless of config order.
        for threshold in sorted(self.config.relevance_thresholds, reverse=True):
            relevant = [a for a in articles if a.relevance_score >= threshold]
            if relevant:
                print(f"[RelevanceScorer] {len(relevant)} articles passed {threshold} threshold")
                return relevant

        print("[RelevanceScorer] No articles met minimum relevance criteria")
        return []

In [15]:
class NewsSummarizer:
    """Generates a structured LLM summary (key points, catalyst numbers, explanation) of relevant news."""

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.llm = LLMInterface(config)

    def summarize(self, articles: List[NewsArticle], ticker: str,
                  company_name: str, price_change: float) -> str:
        """Generate a structured summary for the top 5 articles.

        Returns:
            The raw LLM reply (expected to follow the KEY POINTS / CATALYST /
            EXPLANATION format requested in the prompt), or a string starting
            with "Summary generation failed" when the call or prompt building fails.
        """
        try:
            # Use the feed summary when scraping returned no article text.
            combined_content = "\n\n".join(
                f"Article {i+1}: {a.title}\n{(a.content or a.description or '')[:500]}"
                for i, a in enumerate(articles[:5])
            )

            # Numbered from 1; the LLM is asked to answer with these numbers.
            catalyst_list = "\n".join(f"{i+1}. {cat}" for i, cat in enumerate(NEWS_CATALYSTS))

            prompt = f"""You are a financial analyst. Analyze these news articles about {company_name} ({ticker}).

Stock Price Change: {price_change:.2f}%

News Articles:
{combined_content}

News Catalyst Categories:
{catalyst_list}

Task:
1. Extract up to 3 key sentences that explain the price movement
2. Identify which catalyst category(ies) apply (use numbers from list)
3. Write a single paragraph (3-5 sentences) explaining the likely cause and overall sentiment

Format your response as:
KEY POINTS:
- [point 1]
- [point 2]
- [point 3]

CATALYST: [number(s)]

EXPLANATION:
[single paragraph]"""

            response_text = self.llm.call_with_retry(prompt)

            if response_text is None:
                return "Summary generation failed due to API limits"

            return response_text

        except Exception as e:
            print(f"[NewsSummarizer] Error: {str(e)}")
            return "Summary generation failed"

class SentimentAnalyzer:
    """Scores the financial sentiment of text with a FinBERT-style sequence classifier."""

    # (positive, negative, neutral) logit indices used when the model's id2label
    # does not name those three classes; this is the order the class has always
    # assumed for ProsusAI/finbert.
    _FALLBACK_INDICES = (0, 1, 2)

    def __init__(self, model_name: str = "ProsusAI/finbert"):
        """Load tokenizer and model for `model_name` and switch the model to eval mode."""
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.model.eval()

    def _label_indices(self) -> tuple:
        """Return the logit indices of (positive, negative, neutral) from the model's id2label."""
        model_config = getattr(self.model, "config", None)
        id2label = getattr(model_config, "id2label", None) or {}
        by_name = {str(label).lower(): int(idx) for idx, label in id2label.items()}
        if all(name in by_name for name in ("positive", "negative", "neutral")):
            return by_name["positive"], by_name["negative"], by_name["neutral"]
        return self._FALLBACK_INDICES

    def analyze(self, text: str) -> Dict[str, float]:
        """Analyze the sentiment of `text`.

        Input beyond 512 tokens is truncated, so only the start of long text
        influences the score.

        Returns:
            Dict with 'positive_prob', 'negative_prob', 'neutral_prob' (softmax
            probabilities) and 'sentiment_score' = positive_prob - negative_prob.
        """
        inputs = self.tokenizer(text or "", return_tensors="pt", truncation=True,
                                max_length=512, padding=True)

        with torch.no_grad():
            logits = self.model(**inputs).logits
        probs = torch.nn.functional.softmax(logits, dim=-1)[0]

        # Resolve class positions by name rather than assuming a fixed order.
        pos_idx, neg_idx, neu_idx = self._label_indices()
        pos_prob = probs[pos_idx].item()
        neg_prob = probs[neg_idx].item()
        neu_prob = probs[neu_idx].item()

        return {
            'positive_prob': pos_prob,
            'negative_prob': neg_prob,
            'neutral_prob': neu_prob,
            'sentiment_score': pos_prob - neg_prob,
        }

# Main pipeline

In [16]:
class NewsAttributionPipeline:
    """Main pipeline: detect abnormal moves, fetch and filter news, summarize, and score sentiment."""

    # Prefix shared by the failure strings NewsSummarizer.summarize returns.
    _SUMMARY_FAILURE_PREFIX = "Summary generation failed"

    def __init__(self, config: PipelineConfig):
        self.config = config
        self.price_detector = PriceDetector(config)
        self.news_retriever = NewsRetriever(config)
        self.relevance_scorer = RelevanceScorer(config)
        self.summarizer = NewsSummarizer(config)
        self.sentiment_analyzer = SentimentAnalyzer()

    @staticmethod
    def _result(stock_move: StockMove, num_articles: int, sentiment_label: str, summary: str,
                sentiment_score: float = 0.0, explanation: str = 'N/A',
                catalyst: str = "") -> AnalysisResult:
        """Build an AnalysisResult for `stock_move` (defaults describe a move with no analysis)."""
        return AnalysisResult(
            ticker=stock_move.ticker,
            date=stock_move.date,
            percent_change=stock_move.percent_change,
            num_articles=num_articles,
            sentiment_score=sentiment_score,
            sentiment_label=sentiment_label,
            summary=summary,
            explanation=explanation,
            catalyst=catalyst,
        )

    @staticmethod
    def _parse_summary(summary: str) -> tuple:
        """Extract (catalyst names, explanation paragraph) from the summarizer's formatted reply.

        Catalyst numbers on the 'CATALYST:' line are mapped to NEWS_CATALYSTS
        (1-based); unknown numbers are ignored. If no 'EXPLANATION:' section is
        found, the whole summary is used as the explanation.
        """
        catalyst = ""
        catalyst_match = re.search(r'CATALYST:\s*([^\n]*)', summary, flags=re.IGNORECASE)
        if catalyst_match:
            names = []
            for number in re.findall(r'\d+', catalyst_match.group(1)):
                index = int(number) - 1
                if 0 <= index < len(NEWS_CATALYSTS) and NEWS_CATALYSTS[index] not in names:
                    names.append(NEWS_CATALYSTS[index])
            catalyst = ", ".join(names)

        explanation = summary
        explanation_match = re.search(r'EXPLANATION:\s*(.+)', summary, flags=re.IGNORECASE | re.DOTALL)
        if explanation_match and explanation_match.group(1).strip():
            explanation = explanation_match.group(1).strip()

        return catalyst, explanation

    def _sentiment_label(self, sentiment_score: float) -> str:
        """Map a sentiment score to 'Positive' / 'Negative' / 'Neutral' using config thresholds."""
        if sentiment_score > self.config.sentiment_positive_threshold:
            return 'Positive'
        if sentiment_score < self.config.sentiment_negative_threshold:
            return 'Negative'
        return 'Neutral'

    def analyze_stock_move(self, stock_move: StockMove) -> AnalysisResult:
        """Analyze a single stock movement end to end.

        `num_articles` is the number of retrieved articles when none were
        relevant, and the number of relevant articles otherwise.
        """
        print(f"\n[Pipeline] Processing: {stock_move.ticker} | {stock_move.percent_change:.2f}% | {stock_move.date}")

        company_name = self.news_retriever.get_company_name(stock_move.ticker)

        # Retrieve news
        articles = self.news_retriever.retrieve_news(stock_move.ticker, stock_move.date)
        if not articles:
            return self._result(stock_move, 0, 'No Data', 'No news articles found')

        # Filter relevant articles
        relevant_articles = self.relevance_scorer.filter_relevant_articles(
            articles, stock_move.ticker, company_name, stock_move.percent_change
        )
        if not relevant_articles:
            return self._result(stock_move, len(articles), 'No Relevant News',
                                'No articles met relevancy criteria')

        # Generate summary
        summary = self.summarizer.summarize(
            relevant_articles, stock_move.ticker, company_name, stock_move.percent_change
        )
        if summary.startswith(self._SUMMARY_FAILURE_PREFIX):
            # Running FinBERT on the failure message would yield a meaningless label.
            print("[Pipeline] Summary unavailable; skipping sentiment analysis")
            return self._result(stock_move, len(relevant_articles), 'Summary Failed', summary)

        # Analyze sentiment
        sentiment_score = self.sentiment_analyzer.analyze(summary)['sentiment_score']
        sentiment_label = self._sentiment_label(sentiment_score)

        print(f"[Pipeline] Sentiment Score: {sentiment_score:.3f} ({sentiment_label})")

        catalyst, explanation = self._parse_summary(summary)
        return self._result(
            stock_move,
            len(relevant_articles),
            sentiment_label,
            summary,
            sentiment_score=sentiment_score,
            explanation=explanation,
            catalyst=catalyst,
        )

    def run(self) -> List[AnalysisResult]:
        """Run the full pipeline over every detected abnormal move.

        A failure while analyzing one move is logged and recorded as an 'Error'
        result instead of aborting the remaining moves.
        """
        stock_moves = self.price_detector.detect_abnormal_returns()

        if not stock_moves:
            print("[Pipeline] No abnormal stock movements detected")
            return []

        results = []
        for stock_move in stock_moves:
            try:
                results.append(self.analyze_stock_move(stock_move))
            except Exception as e:
                print(f"[Pipeline] Error analyzing {stock_move.ticker}: {e}")
                results.append(self._result(stock_move, 0, 'Error', f'Analysis failed: {e}'))

        return results

In [17]:
class ReportGenerator:
    """Renders pipeline results as a plain-text report or as a CSV table."""

    @staticmethod
    def generate_text_report(results: List[AnalysisResult]) -> str:
        """Build the full text report: a header, one section per result, and a footer."""
        heavy_rule = "=" * 80
        light_rule = "-" * 80
        generated_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        report_lines = [
            heavy_rule,
            "AUTOMATED NEWS ATTRIBUTION REPORT",
            heavy_rule,
            f"Generated: {generated_at}",
            f"Total Stocks Analyzed: {len(results)}",
            heavy_rule,
        ]

        for position, res in enumerate(results, start=1):
            report_lines += [
                "\n" + heavy_rule,
                f"STOCK #{position}: {res.ticker}",
                heavy_rule,
                f"Date: {res.date.strftime('%Y-%m-%d')}",
                f"Price Change: {res.percent_change:+.2f}%",
                f"Articles Found: {res.num_articles}",
                f"Sentiment: {res.sentiment_label} (Score: {res.sentiment_score:+.3f})",
                "\nSUMMARY:",
                light_rule,
                res.summary,
                light_rule,
            ]

        report_lines += ["\n" + heavy_rule, "END OF REPORT", heavy_rule]
        return "\n".join(report_lines)

    @staticmethod
    def save_report(results: List[AnalysisResult], filename: str = "news_attribution_report.txt"):
        """Write the text report for `results` to `filename` (UTF-8, overwriting)."""
        report_text = ReportGenerator.generate_text_report(results)
        with open(filename, mode='w', encoding='utf-8') as handle:
            handle.write(report_text)
        print(f"[ReportGenerator] Report saved to {filename}")

    @staticmethod
    def save_csv(results: List[AnalysisResult], filename: str = "results.csv"):
        """Write one CSV row per result (without the explanation/catalyst fields) to `filename`."""
        rows = [
            {
                'ticker': res.ticker,
                'date': res.date.strftime('%Y-%m-%d'),
                'percent_change': res.percent_change,
                'num_articles': res.num_articles,
                'sentiment_score': res.sentiment_score,
                'sentiment_label': res.sentiment_label,
                'summary': res.summary,
            }
            for res in results
        ]
        pd.DataFrame(rows).to_csv(filename, index=False)
        print(f"[ReportGenerator] CSV saved to {filename}")

In [18]:
if __name__ == "__main__":
    import os

    # Read credentials from the environment rather than hard-coding them in the notebook.
    groq_api_key = os.environ.get("GROQ_API_KEY", "")
    if not groq_api_key:
        raise RuntimeError("Set the GROQ_API_KEY environment variable before running the pipeline.")

    # Configuration
    config = PipelineConfig(
        # NewsAPI retrieval is disabled in NewsRetriever, so this key may be empty.
        newsapi_key=os.environ.get("NEWSAPI_KEY", ""),
        groq_api_key=groq_api_key,
    )

    pipeline = NewsAttributionPipeline(config)
    results = pipeline.run()

    # Date-stamp the CSV so successive runs do not overwrite each other's results.
    run_stamp = datetime.now().strftime('%Y%m%d')
    ReportGenerator.save_report(results, "news_attribution_report.txt")
    ReportGenerator.save_csv(results, f"results_{run_stamp}.csv")

[PriceDetector] Scanning 13 tickers for abnormal returns...
[PriceDetector] Found 2 abnormal movements

[Pipeline] Processing: ADBE | 5.33% | 2025-12-05 00:00:00-05:00
[RelevanceScorer] Scoring 25 articles...
[RelevanceScorer] 12 articles passed 0.7 threshold
[Pipeline] Sentiment Score: 0.925 (Positive)

[Pipeline] Processing: CRM | 5.30% | 2025-12-05 00:00:00-05:00
[RelevanceScorer] Scoring 25 articles...
[RelevanceScorer] 8 articles passed 0.7 threshold
[Pipeline] Sentiment Score: 0.939 (Positive)
[ReportGenerator] Report saved to news_attribution_report.txt
[ReportGenerator] CSV saved to results_aug7.csv
