In [None]:
"""
Configuration file for Financial Metrics Extraction Pipeline
"""
import os
from dataclasses import dataclass, field
from typing import List, Dict, Any

@dataclass
class APIConfig:
    """API Configuration"""
    newsapi_key: str = os.getenv('NEWSAPI_KEY', '')
    alpha_vantage_key: str = os.getenv('ALPHA_VANTAGE_KEY', '')
    finnhub_key: str = os.getenv('FINNHUB_KEY', '')

@dataclass
class DataConfig:
    """Data Collection Configuration"""
    max_articles_per_company: int = 100
    companies: List[str] = field(default_factory=lambda: [
        "Apple", "Microsoft", "Google", "Amazon", "Tesla", "Meta", "Netflix",
        "Nvidia", "JPMorgan", "Goldman Sachs", "Boeing", "Coca Cola", "Disney",
        "McDonald's", "Nike", "Walmart", "Visa", "Intel", "Adobe", "Salesforce"
    ])
    news_sources: List[str] = field(default_factory=lambda: [
        "reuters", "bloomberg", "financial-times", "wall-street-journal",
        "cnbc", "business-insider", "fortune", "marketwatch"
    ])

@dataclass
class ModelConfig:
    """Model Configuration"""
    model_name: str = "microsoft/DialoGPT-medium"
    max_length: int = 512
    batch_size: int = 16
    learning_rate: float = 5e-5
    num_epochs: int = 5
    warmup_steps: int = 100
    weight_decay: float = 0.01
    save_steps: int = 500
    eval_steps: int = 500
    save_total_limit: int = 2

@dataclass
class TrainingConfig:
    """Training Configuration"""
    train_split: float = 0.8
    val_split: float = 0.1
    test_split: float = 0.1
    seed: int = 42

@dataclass
class PathConfig:
    """Path Configuration"""
    data_dir: str = "data"
    model_dir: str = "models"
    output_dir: str = "outputs"
    raw_data_dir: str = "data/raw"
    processed_data_dir: str = "data/processed"
    model_checkpoints_dir: str = "models/checkpoints"

@dataclass
class Config:
    """Main Configuration Class"""
    api: APIConfig = field(default_factory=APIConfig)
    data: DataConfig = field(default_factory=DataConfig)
    model: ModelConfig = field(default_factory=ModelConfig)
    training: TrainingConfig = field(default_factory=TrainingConfig)
    paths: PathConfig = field(default_factory=PathConfig)

In [4]:
!pip install newsapi-python

Collecting newsapi-python
  Downloading newsapi_python-0.2.7-py2.py3-none-any.whl.metadata (1.2 kB)
Downloading newsapi_python-0.2.7-py2.py3-none-any.whl (7.9 kB)
Installing collected packages: newsapi-python
Successfully installed newsapi-python-0.2.7


In [6]:
!pip install finnhub-python

Collecting finnhub-python
  Downloading finnhub_python-2.4.23-py3-none-any.whl.metadata (9.2 kB)
Downloading finnhub_python-2.4.23-py3-none-any.whl (11 kB)
Installing collected packages: finnhub-python
Successfully installed finnhub-python-2.4.23


In [33]:
def test_api_keys(config):
    """Test API keys before starting pipeline"""
    from newsapi import NewsApiClient

    try:
        newsapi = NewsApiClient(api_key=config.api.newsapi_key)
        test_result = newsapi.get_everything(q="Apple", page_size=1)
        logger.info(f"NewsAPI test successful: {test_result['totalResults']} results available")
        return True
    except Exception as e:
        logger.error(f"NewsAPI test failed: {str(e)}")
        logger.error("Please check your NewsAPI key in config/config.py")
        return False

# Add this call in main() before data collection:
if not test_api_keys(Config):
    logger.error("API key validation failed. Using synthetic data fallback.")

ERROR:__main__:NewsAPI test failed: type object 'Config' has no attribute 'api'
ERROR:__main__:Please check your NewsAPI key in config/config.py
ERROR:__main__:API key validation failed. Using synthetic data fallback.


In [18]:
"""
Data collection module for financial news and metrics
"""
import requests
import pandas as pd
import yfinance as yf
from newsapi import NewsApiClient
import finnhub
from typing import List, Dict, Any, Tuple
import time
import logging
from datetime import datetime, timedelta
import json
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FinancialDataCollector:
    """Collect financial news and market data"""

    def __init__(self, config):
        self.config = config
        self.newsapi = NewsApiClient(api_key=config.api.newsapi_key)
        self.finnhub_client = finnhub.Client(api_key=config.api.finnhub_key)

    def collect_news_data(self) -> List[Dict[str, Any]]:
        """Collect financial news articles with fallback strategies"""
        all_articles = []

        for company in self.config.data.companies:
            logger.info(f"Collecting news for {company}")

            try:
                # Primary strategy: Specific financial keywords
                articles = self.newsapi.get_everything(
                    q=f"{company} AND (earnings OR revenue OR profit OR financial)",
                    language='en',
                    sort_by='relevancy',
                    page_size=min(100, self.config.data.max_articles_per_company),
                    from_param=(datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')
                )

                if articles['totalResults'] == 0:
                    logger.warning(f"No articles found for {company} with financial keywords. Trying broader search...")

                    # Fallback strategy: Broader search
                    articles = self.newsapi.get_everything(
                        q=company,
                        language='en',
                        sort_by='relevancy',
                        page_size=min(50, self.config.data.max_articles_per_company),
                        from_param=(datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d')
                    )

                logger.info(f"Found {articles['totalResults']} articles for {company}")

                for article in articles['articles']:
                    if article['content'] and len(article['content']) > 100:
                        all_articles.append({
                            'company': company,
                            'title': article['title'],
                            'content': article['content'],
                            'source': article['source']['name'],
                            'published_at': article['publishedAt'],
                            'url': article['url']
                        })

                time.sleep(1)  # Rate limiting

            except Exception as e:
                logger.error(f"NewsAPI error for {company}: {str(e)}")

                # Fallback: Generate synthetic news if API fails
                logger.info(f"Generating synthetic news for {company}")
                synthetic_article = self.generate_synthetic_news(company)
                all_articles.append(synthetic_article)
                continue

        logger.info(f"Collected {len(all_articles)} articles total")

        # Ensure minimum data
        if len(all_articles) < 20:
            logger.warning("Insufficient articles collected. Generating additional synthetic data...")
            additional_needed = 20 - len(all_articles)
            for i in range(additional_needed):
                company = self.config.data.companies[i % len(self.config.data.companies)]
                synthetic_article = self.generate_synthetic_news(company)
                all_articles.append(synthetic_article)

        return all_articles

    def generate_synthetic_news(self, company: str) -> Dict[str, Any]:
        """Generate synthetic news article for training"""
        templates = [
            f"{company} Reports Strong Q3 2024 Financial Results with Record Revenue Growth",
            f"{company} Announces Quarterly Earnings Beat with Improved Profit Margins",
            f"{company} Delivers Solid Financial Performance Despite Market Challenges",
            f"{company} Posts Strong Revenue Growth and Expanded Operating Margins"
        ]

        content_templates = [
            f"{company} today announced financial results for the third quarter ended September 30, 2024. The company reported revenue of ${{revenue}} million, representing a {{growth}}% increase year-over-year. Net income was ${{net_income}} million with a profit margin of {{margin}}%. The strong results were driven by increased demand and operational efficiency improvements.",

            f"{company} delivered strong financial performance in Q3 2024 with total revenue of ${{revenue}} million and operating margin of {{operating_margin}}%. The company's net income reached ${{net_income}} million, reflecting effective cost management and strong market position.",
        ]

        # Generate random but realistic financial numbers
        revenue = np.random.uniform(5000, 100000)
        growth = np.random.uniform(3, 25)
        net_income = revenue * np.random.uniform(0.05, 0.25)
        margin = (net_income / revenue) * 100
        operating_margin = np.random.uniform(10, 30)

        title = np.random.choice(templates)
        content = np.random.choice(content_templates).format(
            revenue=f"{revenue:,.0f}",
            growth=f"{growth:.1f}",
            net_income=f"{net_income:,.0f}",
            margin=f"{margin:.1f}",
            operating_margin=f"{operating_margin:.1f}"
        )

        return {
            'company': company,
            'title': title,
            'content': content,
            'source': 'Synthetic',
            'published_at': datetime.now().isoformat(),
            'url': 'synthetic://example.com'
        }

        def collect_financial_metrics(self) -> Dict[str, Dict[str, Any]]:
            """Collect actual financial metrics from Yahoo Finance"""
            financial_data = {}

            # Convert company names to tickers
            ticker_mapping = {
                "Apple": "AAPL", "Microsoft": "MSFT", "Google": "GOOGL",
                "Amazon": "AMZN", "Tesla": "TSLA", "Meta": "META",
                "Netflix": "NFLX", "Nvidia": "NVDA", "JPMorgan": "JPM",
                "Goldman Sachs": "GS", "Boeing": "BA", "Coca Cola": "KO",
                "Disney": "DIS", "McDonald's": "MCD", "Nike": "NKE",
                "Walmart": "WMT", "Visa": "V", "Intel": "INTC",
                "Adobe": "ADBE", "Salesforce": "CRM"
            }

            for company, ticker in ticker_mapping.items():
                try:
                    print(f"Collecting financial data for {company} ({ticker})")

                    stock = yf.Ticker(ticker)
                    info = stock.info
                    financials = stock.financials

                    # Extract key metrics
                    metrics = {
                        'revenue': info.get('totalRevenue', 0),
                        'profit_margin': info.get('profitMargins', 0) * 100 if info.get('profitMargins') else 0,
                        'gross_margin': info.get('grossMargins', 0) * 100 if info.get('grossMargins') else 0,
                        'operating_margin': info.get('operatingMargins', 0) * 100 if info.get('operatingMargins') else 0,
                        'net_income': info.get('netIncomeToCommon', 0),
                        'total_debt': info.get('totalDebt', 0),
                        'market_cap': info.get('marketCap', 0),
                        'pe_ratio': info.get('trailingPE', 0),
                        'eps': info.get('trailingEps', 0),
                        'book_value': info.get('bookValue', 0),
                        'debt_to_equity': info.get('debtToEquity', 0),
                        'roe': info.get('returnOnEquity', 0) * 100 if info.get('returnOnEquity') else 0,
                        'roa': info.get('returnOnAssets', 0) * 100 if info.get('returnOnAssets') else 0
                    }

                    financial_data[company] = metrics
                    time.sleep(1)  # Rate limiting

                except Exception as e:
                    print(f"Error collecting financial data for {company}: {str(e)}")
                    continue

            return financial_data

    def save_raw_data(self, news_data: List[Dict], financial_data: Dict):
        """Save raw collected data"""
        os.makedirs(self.config.paths.raw_data_dir, exist_ok=True)

        # Save news data
        news_df = pd.DataFrame(news_data)
        news_df.to_csv(f"{self.config.paths.raw_data_dir}/news_data.csv", index=False)

        # Save financial data
        financial_df = pd.DataFrame.from_dict(financial_data, orient='index')
        financial_df.to_csv(f"{self.config.paths.raw_data_dir}/financial_data.csv")

        print("Raw data saved successfully")

In [26]:
"""
Data preprocessing module for training data preparation
"""
import pandas as pd
import numpy as np
import re
from typing import List, Dict, Any, Tuple
import json
import logging
from sklearn.model_selection import train_test_split
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FinancialDataPreprocessor:
    """Preprocess financial data for training"""

    def __init__(self, config):
        self.config = config

        # Financial metrics patterns
        self.metric_patterns = {
            'revenue': [
                r'revenue of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?',
                r'sales of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?',
                r'total revenue \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?'
            ],
            'profit_margin': [
                r'profit margin of ([\d,]+\.?\d*)%?',
                r'operating margin of ([\d,]+\.?\d*)%?',
                r'net margin of ([\d,]+\.?\d*)%?'
            ],
            'net_income': [
                r'net income of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?',
                r'profit of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?',
                r'earnings of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?'
            ],
            'market_cap': [
                r'market cap of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?',
                r'market capitalization of \$?([\d,]+\.?\d*)\s*(million|billion|thousand)?'
            ],
            'pe_ratio': [
                r'P/E ratio of ([\d,]+\.?\d*)',
                r'price-to-earnings ratio of ([\d,]+\.?\d*)',
                r'PE of ([\d,]+\.?\d*)'
            ],
            'eps': [
                r'earnings per share of \$?([\d,]+\.?\d*)',
                r'EPS of \$?([\d,]+\.?\d*)'
            ]
        }

    def load_raw_data(self) -> Tuple[pd.DataFrame, pd.DataFrame]:
        """Load raw collected data"""
        news_df = pd.read_csv(f"{self.config.paths.raw_data_dir}/news_data.csv")
        financial_df = pd.read_csv(f"{self.config.paths.raw_data_dir}/financial_data.csv", index_col=0)

        print(f"Loaded {len(news_df)} news articles and {len(financial_df)} financial records")
        return news_df, financial_df

    def extract_metrics_from_text(self, text: str) -> Dict[str, List[str]]:
        """Extract financial metrics from text using regex patterns"""
        extracted_metrics = {}

        for metric_type, patterns in self.metric_patterns.items():
            extracted_metrics[metric_type] = []

            for pattern in patterns:
                matches = re.findall(pattern, text.lower())
                for match in matches:
                    if isinstance(match, tuple):
                        value = match[0]
                        unit = match[1] if len(match) > 1 else ''
                    else:
                        value = match
                        unit = ''

                    # Normalize value
                    normalized_value = self.normalize_financial_value(value, unit)
                    if normalized_value:
                        extracted_metrics[metric_type].append(normalized_value)

        return extracted_metrics

    def normalize_financial_value(self, value: str, unit: str) -> str:
        """Normalize financial values to consistent format"""
        try:
            # Remove commas and convert to float
            numeric_value = float(value.replace(',', ''))

            # Convert based on unit
            if unit.lower() == 'billion':
                numeric_value *= 1000000000
            elif unit.lower() == 'million':
                numeric_value *= 1000000
            elif unit.lower() == 'thousand':
                numeric_value *= 1000

            return str(numeric_value)
        except:
            return None

    def create_training_examples(self, news_df: pd.DataFrame, financial_df: pd.DataFrame) -> List[Dict[str, Any]]:
        """Create training examples by combining news text with actual financial metrics"""
        training_examples = []

        for _, row in news_df.iterrows():
            company = row['company']
            text = f"{row['title']} {row['content']}"

            # Get actual financial metrics for this company
            if company in financial_df.index:
                actual_metrics = financial_df.loc[company].to_dict()

                # Extract mentioned metrics from text
                extracted_metrics = self.extract_metrics_from_text(text)

                # Create training example
                training_example = {
                    'input_text': text,
                    'company': company,
                    'actual_metrics': actual_metrics,
                    'extracted_metrics': extracted_metrics,
                    'target_output': self.format_metrics_as_json(actual_metrics)
                }

                training_examples.append(training_example)

        print(f"Created {len(training_examples)} training examples")
        return training_examples

    def format_metrics_as_json(self, metrics: Dict[str, Any]) -> str:
        """Format metrics as JSON string for training target"""
        formatted_metrics = {}

        for key, value in metrics.items():
            if pd.notna(value) and value != 0:
                if key in ['revenue', 'net_income', 'total_debt', 'market_cap']:
                    formatted_metrics[key] = f"${value:,.0f}"
                elif key in ['profit_margin', 'gross_margin', 'operating_margin', 'roe', 'roa']:
                    formatted_metrics[key] = f"{value:.2f}%"
                elif key in ['pe_ratio', 'eps', 'book_value', 'debt_to_equity']:
                    formatted_metrics[key] = f"{value:.2f}"

        return json.dumps(formatted_metrics)

    def create_conversational_format(self, training_examples: List[Dict[str, Any]]) -> List[Dict[str, str]]:
        """Convert training examples to conversational format"""
        conversational_data = []

        for example in training_examples:
            # Create instruction-following format
            instruction = f"Extract financial metrics from the following text about {example['company']}:"
            user_message = f"{instruction}\n\n{example['input_text']}"
            assistant_message = example['target_output']

            conversational_data.append({
                'user': user_message,
                'assistant': assistant_message
            })

        return conversational_data

    def split_and_save_data(self, conversational_data: List[Dict[str, str]]):
        """Split data and save to files with minimum data validation"""
        # Convert to DataFrame
        df = pd.DataFrame(conversational_data)

        # Check minimum data requirements
        min_samples = 10  # Minimum samples needed for meaningful training

        if len(df) < min_samples:
            logger.warning(f"Only {len(df)} samples collected. Minimum {min_samples} needed.")
            logger.warning("Generating synthetic examples to supplement data...")

            # Generate synthetic examples
            synthetic_data = self.generate_synthetic_examples(df, min_samples - len(df))
            df = pd.concat([df, pd.DataFrame(synthetic_data)], ignore_index=True)
            logger.info(f"Generated {len(synthetic_data)} synthetic examples")

        # Adjusted splitting for small datasets
        if len(df) < 50:
            # For very small datasets, use different split ratios
            train_split = 0.7
            val_split = 0.2
            test_split = 0.1
            logger.warning(f"Small dataset detected ({len(df)} samples). Using adjusted splits: 70/20/10")
        else:
            train_split = self.config.training.train_split
            val_split = self.config.training.val_split
            test_split = self.config.training.test_split

        # Ensure minimum 1 sample in each split
        n_samples = len(df)
        n_test = max(1, int(n_samples * test_split))
        n_val = max(1, int(n_samples * val_split))
        n_train = n_samples - n_test - n_val

        if n_train < 1:
            n_train = 1
            n_val = max(1, (n_samples - n_train) // 2)
            n_test = n_samples - n_train - n_val

        logger.info(f"Data split: Train={n_train}, Val={n_val}, Test={n_test}")

        # Manual splitting for small datasets
        df_shuffled = df.sample(frac=1, random_state=self.config.training.seed).reset_index(drop=True)

        train_data = df_shuffled[:n_train]
        val_data = df_shuffled[n_train:n_train + n_val]
        test_data = df_shuffled[n_train + n_val:]

        # Save data
        os.makedirs(self.config.paths.processed_data_dir, exist_ok=True)

        train_data.to_json(f"{self.config.paths.processed_data_dir}/train_data.json", orient='records', lines=True)
        val_data.to_json(f"{self.config.paths.processed_data_dir}/val_data.json", orient='records', lines=True)
        test_data.to_json(f"{self.config.paths.processed_data_dir}/test_data.json", orient='records', lines=True)

        logger.info(f"Data split and saved - Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}")

        return train_data, val_data, test_data

    def generate_synthetic_examples(self, original_df: pd.DataFrame, num_needed: int) -> List[Dict[str, str]]:
        """Generate synthetic training examples based on actual financial data"""
        synthetic_examples = []

        # Template patterns for synthetic data
        templates = [
            "Company {company} reported quarterly revenue of ${revenue:,.0f} million with a profit margin of {profit_margin:.1f}%.",
            "{company} announced financial results showing ${revenue:,.0f} million in revenue and ${net_income:,.0f} million in net income.",
            "Financial highlights for {company}: Revenue ${revenue:,.0f}M, Operating margin {operating_margin:.1f}%, PE ratio {pe_ratio:.1f}.",
            "{company}'s latest earnings show strong performance with ${revenue:,.0f} million revenue and {roe:.1f}% return on equity.",
            "Quarterly results: {company} generated ${revenue:,.0f} million in sales with gross margin of {gross_margin:.1f}%."
        ]

        # Sample financial metrics for synthetic generation
        sample_companies = ["TechCorp", "InnovateCo", "GlobalTech", "DataSystems", "CloudTech", "FinanceInc"]

        for i in range(num_needed):
            company = sample_companies[i % len(sample_companies)]

            # Generate realistic financial metrics
            revenue = np.random.uniform(1000, 50000)  # Million USD
            profit_margin = np.random.uniform(5, 30)
            operating_margin = np.random.uniform(8, 25)
            gross_margin = np.random.uniform(20, 60)
            net_income = revenue * (profit_margin / 100)
            pe_ratio = np.random.uniform(10, 35)
            roe = np.random.uniform(8, 25)

            # Create synthetic text
            template = templates[i % len(templates)]
            text = template.format(
                company=company,
                revenue=revenue,
                profit_margin=profit_margin,
                operating_margin=operating_margin,
                gross_margin=gross_margin,
                net_income=net_income,
                pe_ratio=pe_ratio,
                roe=roe
            )

            # Create target metrics
            target_metrics = {
                "revenue": f"${revenue:,.0f} million",
                "profit_margin": f"{profit_margin:.1f}%",
                "net_income": f"${net_income:,.0f} million"
            }

            # Format as training example
            user_message = f"Extract financial metrics from the following text about {company}:\n\n{text}"
            assistant_message = json.dumps(target_metrics)

            synthetic_examples.append({
                'user': user_message,
                'assistant': assistant_message
            })

        return synthetic_examples

In [20]:
"""
Model architecture for financial metrics extraction
"""
import torch
import torch.nn as nn
from transformers import (
    AutoTokenizer, AutoModelForCausalLM,
    TrainingArguments, Trainer, DataCollatorForLanguageModeling
)
from datasets import Dataset
import logging
from typing import Dict, List, Any
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FinancialMetricsModel:
    """Financial Metrics Extraction Model"""

    def __init__(self, config):
        self.config = config
        self.tokenizer = None
        self.model = None
        self.trainer = None

    def load_model_and_tokenizer(self):
        """Load pre-trained model and tokenizer"""
        print(f"Loading model: {self.config.model.model_name}")

        self.tokenizer = AutoTokenizer.from_pretrained(self.config.model.model_name)
        self.model = AutoModelForCausalLM.from_pretrained(self.config.model.model_name)

        # Add padding token if it doesn't exist
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token

        print("Model and tokenizer loaded successfully")

    def prepare_datasets(self, train_data, val_data, test_data):
        """Prepare datasets for training"""
        def tokenize_function(examples):
            # Combine user and assistant messages
            combined_texts = []
            for user_msg, assistant_msg in zip(examples['user'], examples['assistant']):
                combined_text = f"{user_msg}\n\nAssistant: {assistant_msg}"
                combined_texts.append(combined_text)

            # Tokenize
            tokenized = self.tokenizer(
                combined_texts,
                truncation=True,
                max_length=self.config.model.max_length,
                padding=True,
                return_tensors="pt"
            )

            # For language modeling, labels are the same as input_ids
            tokenized["labels"] = tokenized["input_ids"].clone()

            return tokenized

        # Convert to Hugging Face datasets
        train_dataset = Dataset.from_pandas(train_data)
        val_dataset = Dataset.from_pandas(val_data)
        test_dataset = Dataset.from_pandas(test_data)

        # Tokenize datasets
        train_dataset = train_dataset.map(tokenize_function, batched=True)
        val_dataset = val_dataset.map(tokenize_function, batched=True)
        test_dataset = test_dataset.map(tokenize_function, batched=True)

        return train_dataset, val_dataset, test_dataset

    def setup_trainer(self, train_dataset, val_dataset):
        """Setup trainer for model training"""
        # Training arguments
        training_args = TrainingArguments(
            output_dir=self.config.paths.model_checkpoints_dir,
            overwrite_output_dir=True,
            num_train_epochs=self.config.model.num_epochs,
            per_device_train_batch_size=self.config.model.batch_size,
            per_device_eval_batch_size=self.config.model.batch_size,
            warmup_steps=self.config.model.warmup_steps,
            weight_decay=self.config.model.weight_decay,
            learning_rate=self.config.model.learning_rate,
            logging_dir=f"{self.config.paths.output_dir}/logs",
            logging_steps=50,
            save_steps=self.config.model.save_steps,
            eval_steps=self.config.model.eval_steps,
            evaluation_strategy="steps",
            save_total_limit=self.config.model.save_total_limit,
            load_best_model_at_end=True,
            metric_for_best_model="eval_loss",
            greater_is_better=False,
            dataloader_num_workers=4,
            prediction_loss_only=True,
        )

        # Data collator
        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer,
            mlm=False,  # We're doing causal language modeling
        )

        # Initialize trainer
        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=data_collator,
            tokenizer=self.tokenizer,
        )

        print("Trainer setup completed")

    def train(self):
        """Train the model"""
        print("Starting model training...")

        # Train the model
        self.trainer.train()

        # Save the final model
        final_model_path = f"{self.config.paths.model_dir}/final_model"
        self.trainer.save_model(final_model_path)
        self.tokenizer.save_pretrained(final_model_path)

        print(f"Training completed. Model saved to {final_model_path}")

    def generate_metrics(self, text: str, max_length: int = 200) -> str:
        """Generate financial metrics for given text"""
        # Prepare input
        input_text = f"Extract financial metrics from the following text:\n\n{text}\n\nAssistant:"

        # Tokenize
        inputs = self.tokenizer(
            input_text,
            return_tensors="pt",
            truncation=True,
            max_length=self.config.model.max_length
        )

        # Generate
        with torch.no_grad():
            outputs = self.model.generate(
                inputs["input_ids"],
                max_length=inputs["input_ids"].shape[1] + max_length,
                num_return_sequences=1,
                temperature=0.7,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )

        # Decode response
        response = self.tokenizer.decode(
            outputs[0][inputs["input_ids"].shape[1]:],
            skip_special_tokens=True
        )

        return response.strip()

In [21]:
"""
Model training orchestration
"""
import pandas as pd
import logging
import os
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FinancialMetricsTrainer:
    """Orchestrate the training process"""

    def __init__(self, config):
        self.config = config
        self.model = FinancialMetricsModel(config)

    def load_processed_data(self):
        """Load processed training data"""
        train_data = pd.read_json(f"{self.config.paths.processed_data_dir}/train_data.json", lines=True)
        val_data = pd.read_json(f"{self.config.paths.processed_data_dir}/val_data.json", lines=True)
        test_data = pd.read_json(f"{self.config.paths.processed_data_dir}/test_data.json", lines=True)

        print(f"Loaded training data - Train: {len(train_data)}, Val: {len(val_data)}, Test: {len(test_data)}")
        return train_data, val_data, test_data

    def train_model(self):
        """Complete training pipeline"""
        # Load data
        train_data, val_data, test_data = self.load_processed_data()

        # Load model and tokenizer
        self.model.load_model_and_tokenizer()

        # Prepare datasets
        train_dataset, val_dataset, test_dataset = self.model.prepare_datasets(
            train_data, val_data, test_data
        )

        # Setup trainer
        self.model.setup_trainer(train_dataset, val_dataset)

        # Train model
        self.model.train()

        # Save test dataset for evaluation
        with open(f"{self.config.paths.processed_data_dir}/test_dataset.json", "w") as f:
            json.dump(test_data.to_dict('records'), f)

        print("Training pipeline completed successfully")

        return self.model

In [22]:
"""
Evaluation metrics for financial metrics extraction
"""
import json
import re
from typing import Dict, List, Tuple, Any
import pandas as pd
from sklearn.metrics import precision_recall_fscore_support
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class FinancialMetricsEvaluator:
    """Evaluate financial metrics extraction performance"""

    def __init__(self):
        self.metric_types = [
            'revenue', 'profit_margin', 'net_income', 'market_cap',
            'pe_ratio', 'eps', 'gross_margin', 'operating_margin',
            'roe', 'roa', 'debt_to_equity'
        ]

    def extract_metrics_from_json(self, json_string: str) -> Dict[str, Any]:
        """Extract metrics from JSON string"""
        try:
            return json.loads(json_string)
        except:
            # Fallback: try to extract metrics using regex
            metrics = {}

            # Revenue patterns
            revenue_patterns = [
                r'"revenue":\s*"?\$?([\d,]+\.?\d*)"?',
                r'"sales":\s*"?\$?([\d,]+\.?\d*)"?'
            ]

            # Margin patterns
            margin_patterns = [
                r'"(\w*_?margin)":\s*"?([\d,]+\.?\d*)%?"?',
                r'"(roe|roa)":\s*"?([\d,]+\.?\d*)%?"?'
            ]

            # Ratio patterns
            ratio_patterns = [
                r'"(pe_ratio|eps|debt_to_equity)":\s*"?([\d,]+\.?\d*)"?'
            ]

            for pattern_group in [revenue_patterns, margin_patterns, ratio_patterns]:
                for pattern in pattern_group:
                    matches = re.findall(pattern, json_string.lower())
                    for match in matches:
                        if isinstance(match, tuple):
                            key, value = match
                            metrics[key] = value
                        else:
                            metrics['revenue'] = match

            return metrics

    def normalize_metric_value(self, value: Any) -> float:
        """Normalize metric value to float"""
        if isinstance(value, str):
            # Remove currency symbols and commas
            cleaned = re.sub(r'[$,%]', '', value)
            try:
                return float(cleaned)
            except:
                return 0.0
        elif isinstance(value, (int, float)):
            return float(value)
        else:
            return 0.0

    def calculate_metric_accuracy(self, predicted: Dict[str, Any], actual: Dict[str, Any]) -> Dict[str, float]:
        """Calculate accuracy for each metric type"""
        accuracies = {}

        for metric_type in self.metric_types:
            if metric_type in predicted and metric_type in actual:
                pred_value = self.normalize_metric_value(predicted[metric_type])
                actual_value = self.normalize_metric_value(actual[metric_type])

                if actual_value != 0:
                    # Calculate percentage error
                    error = abs(pred_value - actual_value) / actual_value
                    accuracy = max(0, 1 - error)  # Accuracy as 1 - relative error
                    accuracies[metric_type] = accuracy
                else:
                    accuracies[metric_type] = 1.0 if pred_value == 0 else 0.0
            else:
                # Penalize missing predictions
                accuracies[metric_type] = 0.0

        return accuracies

    def calculate_extraction_metrics(self, predicted: Dict[str, Any], actual: Dict[str, Any]) -> Dict[str, float]:
        """Calculate precision, recall, F1 for metric extraction"""
        # Which metrics were predicted vs actually present
        predicted_metrics = set(predicted.keys())
        actual_metrics = set(actual.keys())

        # True positives: correctly identified metrics
        tp = len(predicted_metrics & actual_metrics)

        # False positives: predicted but not actually present
        fp = len(predicted_metrics - actual_metrics)

        # False negatives: actually present but not predicted
        fn = len(actual_metrics - predicted_metrics)

        # Calculate metrics
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

        return {
            'precision': precision,
            'recall': recall,
            'f1_score': f1,
            'true_positives': tp,
            'false_positives': fp,
            'false_negatives': fn
        }

    def evaluate_model_predictions(self, predictions: List[str], ground_truth: List[str]) -> Dict[str, Any]:
        """Evaluate model predictions against ground truth"""
        results = {
            'metric_accuracies': [],
            'extraction_metrics': [],
            'overall_accuracy': 0.0,
            'detailed_results': []
        }

        for i, (pred_str, truth_str) in enumerate(zip(predictions, ground_truth)):
            # Extract metrics from strings
            pred_metrics = self.extract_metrics_from_json(pred_str)
            truth_metrics = self.extract_metrics_from_json(truth_str)

            # Calculate metric-specific accuracies
            metric_accuracies = self.calculate_metric_accuracy(pred_metrics, truth_metrics)
            results['metric_accuracies'].append(metric_accuracies)

            # Calculate extraction metrics (precision, recall, F1)
            extraction_metrics = self.calculate_extraction_metrics(pred_metrics, truth_metrics)
            results['extraction_metrics'].append(extraction_metrics)

            # Store detailed results
            results['detailed_results'].append({
                'index': i,
                'predicted': pred_metrics,
                'actual': truth_metrics,
                'metric_accuracies': metric_accuracies,
                'extraction_metrics': extraction_metrics
            })

        # Calculate overall metrics
        if results['metric_accuracies']:
            # Average accuracy across all metrics and examples
            all_accuracies = []
            for acc_dict in results['metric_accuracies']:
                all_accuracies.extend(acc_dict.values())
            results['overall_accuracy'] = sum(all_accuracies) / len(all_accuracies) if all_accuracies else 0

            # Average extraction metrics
            avg_precision = sum(em['precision'] for em in results['extraction_metrics']) / len(results['extraction_metrics'])
            avg_recall = sum(em['recall'] for em in results['extraction_metrics']) / len(results['extraction_metrics'])
            avg_f1 = sum(em['f1_score'] for em in results['extraction_metrics']) / len(results['extraction_metrics'])

            results['average_precision'] = avg_precision
            results['average_recall'] = avg_recall
            results['average_f1'] = avg_f1

        return results

    def generate_evaluation_report(self, results: Dict[str, Any]) -> str:
        """Generate a comprehensive evaluation report"""
        report = []
        report.append("="*50)
        report.append("FINANCIAL METRICS EXTRACTION EVALUATION REPORT")
        report.append("="*50)

        # Overall metrics
        report.append(f"\nOVERALL PERFORMANCE:")
        report.append(f"Overall Accuracy: {results['overall_accuracy']:.4f}")
        report.append(f"Average Precision: {results['average_precision']:.4f}")
        report.append(f"Average Recall: {results['average_recall']:.4f}")
        report.append(f"Average F1-Score: {results['average_f1']:.4f}")

        # Metric-specific performance
        if results['metric_accuracies']:
            report.append(f"\nMETRIC-SPECIFIC ACCURACY:")
            metric_avgs = {}
            for metric_type in self.metric_types:
                accuracies = [ma.get(metric_type, 0) for ma in results['metric_accuracies']]
                avg_acc = sum(accuracies) / len(accuracies) if accuracies else 0
                metric_avgs[metric_type] = avg_acc
                report.append(f"{metric_type}: {avg_acc:.4f}")

        # Best and worst performers
        if results['detailed_results']:
            report.append(f"\nBEST PERFORMING EXAMPLES:")
            sorted_results = sorted(
                results['detailed_results'],
                key=lambda x: x['extraction_metrics']['f1_score'],
                reverse=True
            )

            for i, result in enumerate(sorted_results[:3]):
                report.append(f"\nExample {i+1} (F1: {result['extraction_metrics']['f1_score']:.4f}):")
                report.append(f"Predicted: {result['predicted']}")
                report.append(f"Actual: {result['actual']}")

        return "\n".join(report)

In [23]:
"""
Model evaluation orchestration
"""
import pandas as pd
import json
from typing import List, Dict, Any
import logging
import os

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ModelEvaluator:
    """Orchestrate model evaluation process"""

    def __init__(self, config):
        self.config = config
        self.evaluator = FinancialMetricsEvaluator()
        self.model = None

    def load_trained_model(self, model_path: str = None):
        """Load trained model for evaluation"""
        if model_path is None:
            model_path = f"{self.config.paths.model_dir}/final_model"

        self.model = FinancialMetricsModel(self.config)
        self.model.load_model_and_tokenizer()

        # Load the fine-tuned weights
        from transformers import AutoModelForCausalLM, AutoTokenizer
        self.model.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.model.tokenizer = AutoTokenizer.from_pretrained(model_path)

        print(f"Loaded trained model from {model_path}")

    def load_test_data(self) -> pd.DataFrame:
        """Load test dataset"""
        test_data = pd.read_json(f"{self.config.paths.processed_data_dir}/test_data.json", lines=True)
        print(f"Loaded {len(test_data)} test examples")
        return test_data

    def generate_predictions(self, test_data: pd.DataFrame) -> List[str]:
        """Generate predictions for test data"""
        predictions = []

        for _, row in test_data.iterrows():
            # Extract the input text (remove the instruction part)
            user_text = row['user']
            # Find the actual financial text after the instruction
            text_start = user_text.find('\n\n') + 2
            input_text = user_text[text_start:] if text_start > 1 else user_text

            # Generate prediction
            prediction = self.model.generate_metrics(input_text)
            predictions.append(prediction)

            print(f"Generated prediction {len(predictions)}/{len(test_data)}")

        return predictions

    def evaluate_model(self) -> Dict[str, Any]:
        """Complete model evaluation"""
        # Load test data
        test_data = self.load_test_data()

        # Generate predictions
        predictions = self.generate_predictions(test_data)

        # Get ground truth
        ground_truth = test_data['assistant'].tolist()

        # Evaluate predictions
        results = self.evaluator.evaluate_model_predictions(predictions, ground_truth)

        # Generate report
        report = self.evaluator.generate_evaluation_report(results)

        # Save results
        self.save_evaluation_results(results, report, predictions, ground_truth)

        return results

    def save_evaluation_results(self, results: Dict[str, Any], report: str,
                              predictions: List[str], ground_truth: List[str]):
        """Save evaluation results"""
        os.makedirs(self.config.paths.output_dir, exist_ok=True)

        # Save detailed results
        with open(f"{self.config.paths.output_dir}/evaluation_results.json", "w") as f:
            json.dump(results, f, indent=2)

        # Save report
        with open(f"{self.config.paths.output_dir}/evaluation_report.txt", "w") as f:
            f.write(report)

        # Save predictions vs ground truth
        comparison_df = pd.DataFrame({
            'prediction': predictions,
            'ground_truth': ground_truth
        })
        comparison_df.to_csv(f"{self.config.paths.output_dir}/predictions_comparison.csv", index=False)

        print("Evaluation results saved")

        # Print report
        print(report)

In [24]:
"""
Main training script for financial metrics extraction model
"""
import os
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def main():
    """Main training pipeline"""
    print("Starting Financial Metrics Extraction Model Training Pipeline")

    # Initialize configuration
    config = Config()

    # Create necessary directories
    os.makedirs(config.paths.data_dir, exist_ok=True)
    os.makedirs(config.paths.raw_data_dir, exist_ok=True)
    os.makedirs(config.paths.processed_data_dir, exist_ok=True)
    os.makedirs(config.paths.model_dir, exist_ok=True)
    os.makedirs(config.paths.output_dir, exist_ok=True)

    # Step 1: Data Collection
    print("Step 1: Collecting financial data and news...")
    collector = FinancialDataCollector(config)

    # Collect news data
    news_data = collector.collect_news_data()

    # Collect financial metrics
    financial_data = collector.collect_financial_metrics()

    # Save raw data
    collector.save_raw_data(news_data, financial_data)

    # Step 2: Data Preprocessing
    print("Step 2: Preprocessing data for training...")
    preprocessor = FinancialDataPreprocessor(config)

    # Load raw data
    news_df, financial_df = preprocessor.load_raw_data()

    # Create training examples
    training_examples = preprocessor.create_training_examples(news_df, financial_df)

    # Convert to conversational format
    conversational_data = preprocessor.create_conversational_format(training_examples)

    # Split and save data
    train_data, val_data, test_data = preprocessor.split_and_save_data(conversational_data)

    # Step 3: Model Training
    print("Step 3: Training the model...")
    trainer = FinancialMetricsTrainer(config)

    # Train model
    trained_model = trainer.train_model()

    print("Training pipeline completed successfully!")
    print(f"Model saved to: {config.paths.model_dir}/final_model")
    print(f"Training data saved to: {config.paths.processed_data_dir}")

if __name__ == "__main__":
    main()

Starting Financial Metrics Extraction Model Training Pipeline
Step 1: Collecting financial data and news...
Collecting news for Apple
Collecting news for Microsoft
Collecting news for Google
Collecting news for Amazon
Collecting news for Tesla
Collecting news for Meta
Collecting news for Netflix
Collecting news for Nvidia
Collecting news for JPMorgan
Collecting news for Goldman Sachs
Collecting news for Boeing
Collecting news for Coca Cola
Collecting news for Disney
Collecting news for McDonald's
Collecting news for Nike
Collecting news for Walmart
Collecting news for Visa
Collecting news for Intel
Collecting news for Adobe
Collecting news for Salesforce
Collected 1 articles
Collecting financial data for Apple (AAPL)
Collecting financial data for Microsoft (MSFT)
Collecting financial data for Google (GOOGL)
Collecting financial data for Amazon (AMZN)
Collecting financial data for Tesla (TSLA)
Collecting financial data for Meta (META)
Collecting financial data for Netflix (NFLX)
Collect

ValueError: With n_samples=1, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

In [16]:
print(collector)

NameError: name 'collector' is not defined