In [1]:
import os
import sys
import re
import time
import warnings
from typing import List, Dict, Tuple, Optional

import torch
import nltk
from transformers import pipeline

def _ensure_nltk_resource(resource_path: str) -> None:
    """Make sure one NLTK resource is available locally.

    Looks the resource up with ``nltk.data.find``; on ``LookupError`` it downloads
    the package named by the last path component (e.g. ``'punkt'`` for
    ``'tokenizers/punkt'``) quietly.
    """
    try:
        nltk.data.find(resource_path)
    except LookupError:
        package_name = resource_path.split('/')[-1]
        nltk.download(package_name, quiet=True)


# Suppress every Python warning for the rest of the kernel session.
warnings.filterwarnings('ignore')

# Parent of the current working directory; presumably the repository root when the
# kernel is started from a sub-folder such as notebooks/ -- verify for your setup.
PROJECT_ROOT = os.path.dirname(os.getcwd())
if PROJECT_ROOT not in sys.path:
    # Prepended so that the later `from src...` imports can be resolved.
    sys.path.insert(0, PROJECT_ROOT)

# Sentence-tokenizer data needed by nltk.sent_tokenize further down.
for resource in ('tokenizers/punkt', 'tokenizers/punkt_tab'):
    _ensure_nltk_resource(resource)

print(f"PyTorch: {torch.__version__}")
print(f"CUDA: {torch.cuda.is_available()}")

PyTorch: 2.6.0+cu124
CUDA: True


## 1. Load FinBERT

In [4]:
# Build a Hugging Face "sentiment-analysis" pipeline around the ProsusAI/finbert
# checkpoint, using the PyTorch backend (framework="pt").
# truncation / max_length are handed to the pipeline as keyword arguments;
# presumably they cap every input at 512 tokens during tokenization -- TODO confirm
# against the installed transformers version.
# No `device` argument is passed; the recorded output below shows the pipeline
# ended up on cuda:0 on the machine that produced this run.
finbert = pipeline(
    "sentiment-analysis",
    model="ProsusAI/finbert",
    framework="pt",
    truncation=True,
    max_length=512
)

print(f"FinBERT loaded on {finbert.device}")

Device set to use cuda:0


FinBERT loaded on cuda:0


## 2. Forward-Looking Detector

In [5]:
class ForwardLookingDetector:
    """Detects forward-looking statements using keywords + FinBERT sentiment.

    A sentence is considered forward-looking when it matches at least one regex
    in ``PATTERNS``. Matching sentences are then scored by the injected sentiment
    pipeline and ranked by::

        combined_score = WEIGHTS[level] * sentiment_modifier * sentiment_score

    Args:
        sentiment_pipeline: Callable that takes one string and returns a sequence
            whose first element is a mapping with ``'label'`` and ``'score'``
            keys (the shape this class reads from the pipeline result).
    """

    # Keyword tiers, checked in the order high -> medium -> low.
    # Patterns without a trailing ``\b`` deliberately match inflected forms
    # ("expect" also hits "expects"/"expected").
    PATTERNS = {
        'high': [r'\bwill\b', r'\bexpect', r'\bplan\s+to', r'\bintend', r'\bcommitted\s+to'],
        'medium': [r'\bbelieve', r'\banticipate', r'\bproject', r'\bforecast', r'\bestimate'],
        'low': [r'\bmay\b', r'\bcould\b', r'\bmight\b', r'\bpossible', r'\bwould\b']
    }

    # Base weight contributed by the keyword tier.
    WEIGHTS = {'high': 0.9, 'medium': 0.7, 'low': 0.5}

    # Multiplier applied per (lower-cased) sentiment label.
    SENTIMENT_MODIFIERS = {'positive': 1.0, 'neutral': 0.9, 'negative': 0.8}

    # Used when the pipeline returns a label that is not in SENTIMENT_MODIFIERS
    # (e.g. a model emitting 'LABEL_0'); equal to the neutral multiplier so an
    # unknown label neither boosts nor penalises more than a neutral one.
    DEFAULT_SENTIMENT_MODIFIER = 0.9

    def __init__(self, sentiment_pipeline):
        self.sentiment = sentiment_pipeline
        # Compile once; every pattern is matched case-insensitively.
        self._compiled = {
            level: [re.compile(p, re.IGNORECASE) for p in patterns]
            for level, patterns in self.PATTERNS.items()
        }

    def _classify_keyword(self, text: str) -> Tuple[Optional[str], Optional[str]]:
        """Return ``(level, matched_text)`` for the strongest keyword in ``text``.

        Tiers are tried from 'high' to 'low' and the first pattern that matches
        wins, so a sentence containing both "will" and "may" is reported as
        'high'. Returns ``(None, None)`` when nothing matches.
        """
        for level in ['high', 'medium', 'low']:
            for pattern in self._compiled[level]:
                match = pattern.search(text)
                if match:
                    return level, match.group()
        return None, None

    def detect(self, text: str) -> List[Dict]:
        """Find and rank forward-looking sentences in ``text``.

        Args:
            text: Document text; it is split into sentences with
                ``nltk.sent_tokenize``. Empty or ``None`` input yields ``[]``.

        Returns:
            One dict per matching sentence with the keys ``sentence``,
            ``keyword_level``, ``keyword``, ``sentiment``, ``sentiment_score``
            and ``combined_score``, sorted by ``combined_score`` descending
            (ties keep document order).
        """
        if not text:
            return []

        results = []
        for sent in nltk.sent_tokenize(text):
            level, keyword = self._classify_keyword(sent)
            if not level:
                continue

            # Character-level cap on what is sent to the model; this is
            # independent of any token-level truncation the pipeline applies.
            result = self.sentiment(sent[:512])[0]
            sentiment = result['label'].lower()
            score = result['score']
            # .get() instead of indexing: an unexpected label must not abort the
            # whole document with a KeyError.
            modifier = self.SENTIMENT_MODIFIERS.get(
                sentiment, self.DEFAULT_SENTIMENT_MODIFIER
            )
            combined = self.WEIGHTS[level] * modifier * score

            results.append({
                'sentence': sent,
                'keyword_level': level,
                'keyword': keyword,
                'sentiment': sentiment,
                'sentiment_score': score,
                'combined_score': combined
            })

        # reverse=True keeps the sort stable, so equal scores stay in document order.
        return sorted(results, key=lambda item: item['combined_score'], reverse=True)

## 3. Risk Detector

In [6]:
class RiskDetector:
    """Detects risk factors using patterns + FinBERT for severity.

    Every regex hit in ``PATTERNS`` is expanded to its enclosing sentence, the
    sentence is scored by the injected sentiment pipeline, and the sentiment is
    mapped to a severity band (negative -> high, neutral -> medium, anything
    else -> low).

    Args:
        sentiment_pipeline: Callable that takes one string and returns a sequence
            whose first element is a mapping with ``'label'`` and ``'score'``
            keys (the shape this class reads from the pipeline result).
    """

    # NOTE: these patterns carry no word boundaries, so they also match inside
    # longer words (e.g. 'sanction' matches "sanctions").
    PATTERNS = {
        'regulatory': [r'regulatory\s+risk', r'litigation', r'antitrust', r'compliance'],
        'financial': [r'material\s+adverse', r'liquidity', r'impairment', r'going\s+concern'],
        'operational': [r'supply\s+chain', r'cybersecurity', r'data\s+breach', r'system\s+failure'],
        'market': [r'competition', r'market\s+volatility', r'economic\s+downturn', r'currency'],
        'geopolitical': [r'geopolitical', r'trade\s+war', r'tariff', r'sanction'],
        'climate': [r'climate', r'environmental', r'ESG', r'sustainability']
    }

    def __init__(self, sentiment_pipeline):
        self.sentiment = sentiment_pipeline
        # Compile once; every pattern is matched case-insensitively.
        self._compiled = {
            cat: [re.compile(p, re.IGNORECASE) for p in patterns]
            for cat, patterns in self.PATTERNS.items()
        }

    @staticmethod
    def _sentence_spans(text: str) -> List[Tuple[int, int, str]]:
        """Tokenize ``text`` once and return ``(start, end, sentence)`` triples.

        The search for each sentence resumes where the previous one ended, so a
        sentence that occurs several times in the document is mapped to each of
        its own positions instead of always to the first occurrence.
        """
        spans = []
        cursor = 0
        for sent in nltk.sent_tokenize(text):
            pos = text.find(sent, cursor)
            if pos == -1:
                # Sentence text not found verbatim; skip it rather than let a
                # -1 offset produce a bogus span.
                continue
            end = pos + len(sent)
            spans.append((pos, end, sent))
            cursor = end
        return spans

    def _get_sentence(self, text: str, start: int,
                      spans: Optional[List[Tuple[int, int, str]]] = None) -> str:
        """Return the sentence of ``text`` that contains character offset ``start``.

        Args:
            text: The full document.
            start: Character offset of a regex match inside ``text``.
            spans: Optional precomputed result of ``_sentence_spans(text)``;
                pass it when resolving many offsets in the same document to
                avoid re-tokenizing. Computed on demand when omitted.

        Returns:
            The enclosing sentence, or a raw window of up to 50 characters before
            and 100 after ``start`` when no sentence span covers the offset.
        """
        if spans is None:
            spans = self._sentence_spans(text)
        for begin, end, sent in spans:
            if begin <= start < end:
                return sent
        return text[max(0, start-50):start+100]

    def detect(self, text: str) -> Dict[str, List[Dict]]:
        """Find risk mentions in ``text`` grouped by category.

        Args:
            text: Document text. Empty or ``None`` input yields ``{}``.

        Returns:
            ``{category: [risk, ...]}`` containing only categories with at least
            one hit. Each risk dict has the keys ``match``, ``context``,
            ``severity``, ``severity_score`` and ``sentiment``. Within a category
            a given context sentence is reported once (first match wins).
        """
        results = {}
        if not text:
            return results

        # Tokenize once per document instead of once per regex match.
        spans = self._sentence_spans(text)
        # context -> (label, score); a sentence hit by several categories is sent
        # to the model only once.
        sentiment_cache: Dict[str, Tuple[str, float]] = {}

        for category, patterns in self._compiled.items():
            risks = []
            seen = set()
            for pattern in patterns:
                for match in pattern.finditer(text):
                    context = self._get_sentence(text, match.start(), spans)
                    if context in seen:
                        continue
                    seen.add(context)

                    if context not in sentiment_cache:
                        # Character-level cap on what is sent to the model.
                        result = self.sentiment(context[:512])[0]
                        sentiment_cache[context] = (result['label'].lower(), result['score'])
                    sentiment, score = sentiment_cache[context]

                    # Each band maps model confidence into its own sub-range:
                    # high 0.7-1.0, medium 0.4-0.7, low 0.1-0.3.
                    if sentiment == 'negative':
                        severity, sev_score = 'high', 0.7 + 0.3 * score
                    elif sentiment == 'neutral':
                        severity, sev_score = 'medium', 0.4 + 0.3 * score
                    else:
                        severity, sev_score = 'low', 0.1 + 0.2 * score

                    risks.append({
                        'match': match.group(),
                        'context': context,
                        'severity': severity,
                        'severity_score': sev_score,
                        'sentiment': sentiment
                    })
            if risks:
                results[category] = risks
        return results

    def score(self, risks: Dict) -> Tuple[float, str]:
        """Collapse a ``detect`` result into a 0-100 score and a level.

        The score is 70% mean severity (scaled to 0-100) plus 30% quantity, where
        quantity saturates at 20 risks. Levels: >= 60 'High', >= 30 'Medium',
        otherwise 'Low'.

        Args:
            risks: Mapping as returned by ``detect``.

        Returns:
            ``(score, level)``; ``(0.0, 'Low')`` when there are no risks at all,
            including a mapping whose lists are all empty.
        """
        if not risks:
            return 0.0, 'Low'
        scores = [r['severity_score'] for cat in risks.values() for r in cat]
        if not scores:
            # Non-empty dict with only empty lists: avoid dividing by zero.
            return 0.0, 'Low'
        avg = sum(scores) / len(scores) * 100
        qty = min(1.0, len(scores) / 20)
        final = min(100, avg * 0.7 + qty * 100 * 0.3)
        level = 'High' if final >= 60 else 'Medium' if final >= 30 else 'Low'
        return final, level

## 4. Evaluation

In [8]:
# Fixture for ForwardLookingDetector: seven one-line sentences. The first is a
# historical statement containing none of the detector's keywords; the other six
# each contain at least one keyword from the high/medium/low tiers.
TEST_FL = """
Apple reported revenue of $94.8 billion for Q4 2024.
We expect revenue to grow 15% next fiscal year.
The company believes services will continue to drive growth.
Management anticipates margin improvement.
There may be currency headwinds.
We are committed to returning capital to shareholders.
Market conditions could adversely affect business.
"""

# Fixture for RiskDetector: seven one-line sentences. The first six each contain at
# least one risk pattern (the supply-chain sentence matches both the 'financial' and
# 'operational' lists); the last sentence matches no pattern.
TEST_RISK = """
The Company faces significant regulatory risk.
We may experience material adverse effects from supply chain disruptions.
Geopolitical tensions could impact operations.
Competition is intense with pricing pressure.
Cybersecurity threats pose operational risk.
Climate regulations may require capital investments.
Our diversified model mitigates many risks.
"""

In [9]:
# Run the FinBERT-backed forward-looking detector on the TEST_FL fixture.
# fl_detector and fl_results are reused by the comparison and timing cells below.
fl_detector = ForwardLookingDetector(finbert)
fl_results = fl_detector.detect(TEST_FL)

print("Forward-Looking Statements")
print("=" * 60)
# Results arrive sorted by combined_score (highest first).
for r in fl_results:
    # Line 1: keyword tier, matched keyword, sentiment label with score, combined score.
    print(f"[{r['keyword_level'].upper()}] '{r['keyword']}' | {r['sentiment']} ({r['sentiment_score']:.2f}) | combined={r['combined_score']:.3f}")
    # Line 2: the sentence itself, clipped to 80 characters for display.
    print(f"  {r['sentence'][:80]}")
print(f"\nTotal: {len(fl_results)}")

Forward-Looking Statements
[HIGH] 'expect' | positive (0.95) | combined=0.857
  We expect revenue to grow 15% next fiscal year.
[HIGH] 'will' | positive (0.94) | combined=0.842
  The company believes services will continue to drive growth.
[MEDIUM] 'anticipate' | positive (0.95) | combined=0.668
  Management anticipates margin improvement.
[HIGH] 'committed to' | positive (0.58) | combined=0.523
  We are committed to returning capital to shareholders.
[LOW] 'may' | negative (0.94) | combined=0.375
  There may be currency headwinds.
[LOW] 'could' | negative (0.89) | combined=0.355
  Market conditions could adversely affect business.

Total: 6


In [10]:
# Run the FinBERT-backed risk detector on the TEST_RISK fixture and collapse the
# per-category hits into one 0-100 score with a Low/Medium/High level.
# risk_detector, risk_score and risk_level are reused by later cells.
risk_detector = RiskDetector(finbert)
risk_results = risk_detector.detect(TEST_RISK)
risk_score, risk_level = risk_detector.score(risk_results)

print("Risk Detection")
print("=" * 60)
print(f"Score: {risk_score:.0f}/100 ({risk_level})\n")
# One header line per category, then one line per risk: severity band, matched text, sentiment.
for cat, risks in risk_results.items():
    print(f"{cat}: {len(risks)} risks")
    for r in risks:
        print(f"  [{r['severity'].upper()}] {r['match']} | {r['sentiment']}")

You seem to be using the pipelines sequentially on GPU. In order to maximize efficiency please use a dataset


Risk Detection
Score: 67/100 (High)

regulatory: 1 risks
  [MEDIUM] regulatory risk | neutral
financial: 1 risks
  [HIGH] material adverse | negative
operational: 2 risks
  [HIGH] supply chain | negative
  [HIGH] Cybersecurity | negative
market: 1 risks
  [MEDIUM] Competition | neutral
geopolitical: 1 risks
  [HIGH] Geopolitical | negative
climate: 1 risks
  [MEDIUM] Climate | neutral


## 5. Baseline Comparison

In [11]:
from src.risk_detector import RiskDetector as BaselineRisk
from src.metrics_extractor import ForwardLookingDetector as BaselineFL

# Instantiate the project's existing detectors with no arguments; the recorded
# output below labels them "keyword-based".
baseline_fl = BaselineFL()
baseline_risk = BaselineRisk()

# Run both baselines on the same fixtures used for the FinBERT-backed detectors.
b_fl = baseline_fl.detect(TEST_FL)
b_risk = baseline_risk.detect(TEST_RISK)
# NOTE(review): the baseline scoring method is get_risk_score, whereas the class
# defined in this notebook calls it score -- presumably the same (score, level)
# contract, given the tuple unpacking here; confirm in src/risk_detector.
b_score, b_level = baseline_risk.get_risk_score(b_risk)

print("Baseline vs Enhanced")
print("=" * 40)
# Left value is the baseline, right value is the FinBERT-backed result from the cells above.
print(f"Forward-Looking: {len(b_fl)} vs {len(fl_results)}")
print(f"Risk Score: {b_score:.0f} ({b_level}) vs {risk_score:.0f} ({risk_level})")

  ✓ Forward-looking detector ready (keyword-based)
  ✓ Risk detector ready (keyword-based)
Baseline vs Enhanced
Forward-Looking: 6 vs 6
Risk Score: 12 (Low) vs 67 (High)


## 6. Performance

In [12]:
sample_path = os.path.join(PROJECT_ROOT, 'data', 'sample_docs', 'sample_section_1A.txt')
# Prefer the sample document when it exists; otherwise fall back to TEST_RISK
# repeated five times so the cell still runs without the data folder.
if os.path.exists(sample_path):
    # Context manager closes the handle deterministically instead of leaving it
    # to the garbage collector.
    with open(sample_path) as sample_file:
        large_text = sample_file.read()
else:
    large_text = TEST_RISK * 5
print(f"Test size: {len(large_text):,} chars")

# perf_counter() is the monotonic, high-resolution clock intended for measuring
# short durations; time.time() can be coarse and may jump with wall-clock changes.
t0 = time.perf_counter()
_ = baseline_risk.detect(large_text)
_ = baseline_fl.detect(large_text)
t_baseline = time.perf_counter() - t0

t0 = time.perf_counter()
_ = risk_detector.detect(large_text)
_ = fl_detector.detect(large_text)
t_enhanced = time.perf_counter() - t0

print(f"Baseline: {t_baseline:.3f}s")
print(f"Enhanced: {t_enhanced:.3f}s")
# Guard the ratio: a baseline faster than the clock resolution must not crash the cell.
overhead = t_enhanced / t_baseline if t_baseline > 0 else float('inf')
print(f"Overhead: {overhead:.1f}x")

Test size: 5,000 chars
Baseline: 0.013s
Enhanced: 0.557s
Overhead: 43.0x


## Conclusion

- FinBERT adds sentiment-based severity scoring
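- Distinguishes risks that are merely mentioned in neutral or positive language (rated medium or low) from risks described in negative terms (rated high)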
- Trade-off: increased inference time (about 43x the keyword baseline on the 5,000-character sample above: 0.557s vs 0.013s)

**Usage**: Pass `finbert_pipeline` to `RiskDetector` or `ForwardLookingDetector` in `src/`.

In [13]:
import gc

# Drop the notebook-level references to the pipeline and to the detectors that hold it.
# pop(..., None) instead of `del` keeps the cell idempotent: re-running it, or running
# it after an earlier cell was skipped, no longer raises NameError.
for _name in ("finbert", "fl_detector", "risk_detector"):
    globals().pop(_name, None)

# Collect first so the now-unreferenced objects are actually freed, then ask
# PyTorch to release its cached GPU memory.
gc.collect()
if torch.cuda.is_available():
    torch.cuda.empty_cache()
print("Done.")

Done.
