In [None]:
import pandas as pd
import numpy as np
from datasets import Dataset, load_dataset
from dateutil import parser
import os
import sys
from dotenv import load_dotenv
from huggingface_hub import login
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
import logging
from dataclasses import dataclass
from sqlalchemy import create_engine
import asyncio
from concurrent.futures import ThreadPoolExecutor


@dataclass
class DataConfig:
    """Configuration for data pipeline.

    The string fields are typically filled from ``os.getenv``, which returns
    ``None`` for unset variables, so they are typed ``Optional[str]``.

    Raises:
        ValueError: if ``batch_size`` or ``max_workers`` is not positive.
    """
    hf_token: Optional[str]  # Hugging Face token; not read by the pipeline classes
    db_connection_string: Optional[str]  # database URL handed to DatabaseSink
    s3_bucket: Optional[str]  # not read by the pipeline classes
    batch_size: int = 1000  # rows per write batch in DataPipeline.load_data
    max_workers: int = 4  # not read by the pipeline classes

    def __post_init__(self) -> None:
        # batch_size == 0 makes range() raise inside the batching loop, and a
        # negative value silently skips every write -- reject both up front.
        if self.batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if self.max_workers <= 0:
            raise ValueError(f"max_workers must be positive, got {self.max_workers}")


class DataProcessor(ABC):
    """Interface implemented by every processing stage of the pipeline.

    Concrete processors override :meth:`process`; ``ABC`` refuses to
    instantiate a subclass that leaves it unimplemented.
    """

    @abstractmethod
    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        """Take a DataFrame and hand back its processed form."""
        ...


class FinancialNewsProcessor(DataProcessor):
    """Processes financial news data.

    Adds ``text_length``, ``news_medium`` and ``news_type`` columns and
    normalizes ``Date`` to ``YYYY-MM-DD`` strings. A missing or unparseable
    ``Date`` falls back to a ``/YYYY/MM/DD/`` (or ``-``-separated) date found
    in the article URL.
    """

    # Date segment embedded in a URL path, e.g. /2012/08/26/ or /2012-08-26/.
    _URL_DATE_PATTERN = r'/(\d{4})[/\-](\d{2})[/\-](\d{2})/'

    @staticmethod
    def _parse_date(value):
        """Parse a free-form date into a 'YYYY-MM-DD' string, or NaN on failure."""
        if pd.isna(value):
            return np.nan
        try:
            # Format straight to a date string: this discards any timezone
            # info, so tz-aware and naive results cannot be mixed downstream.
            return parser.parse(str(value)).strftime("%Y-%m-%d")
        except (ValueError, OverflowError):
            # One malformed value must not abort the whole dataset; the URL
            # fallback in process() may still recover a date for this row.
            return np.nan

    def process(self, data: pd.DataFrame) -> pd.DataFrame:
        """Clean and standardize financial news data.

        Args:
            data: frame with at least ``text``, ``url`` and ``Date`` columns.

        Returns:
            A new DataFrame (the input is left unmodified) with the added and
            normalized columns described in the class docstring.
        """
        logger = logging.getLogger(__name__)
        logger.info("Initiating data cleaning")
        logger.info("Data shape: %s", data.shape)
        logger.debug("Sample data:\n%s", data.head())

        data = data.copy()

        # .str.len() yields NaN for missing text instead of raising TypeError.
        data['text_length'] = data['text'].str.len()

        # Host name without a leading "www." and without its final label
        # (".com", ".org", ".io", ...), rather than a fixed 4 characters.
        hosts = data['url'].str.extract(r"https?://([^/]+)")[0]
        data['news_medium'] = (
            hosts
            .str.replace(r"^www\.", "", regex=True)
            .str.replace(r"\.[^.]+$", "", regex=True)
        )

        # Explicit dates first; astype(object) so fillna with strings below
        # does not trigger a dtype-upcast warning when every value is NaN.
        parsed_dates = data['Date'].apply(self._parse_date).astype(object)

        # Dates embedded in URLs, joined as 'YYYY-MM-DD' and indexed like data.
        url_parts = data['url'].str.extract(self._URL_DATE_PATTERN).dropna()
        url_dates = url_parts[0].str.cat([url_parts[1], url_parts[2]], sep='-')
        logger.info("Dates extracted")

        combined = parsed_dates.fillna(url_dates)
        # Round-trip through datetime so impossible dates (e.g. 2018-13-45)
        # become NaT and then NaN, matching the original coercion.
        data['Date'] = pd.to_datetime(
            combined, format="%Y-%m-%d", errors='coerce'
        ).dt.strftime("%Y-%m-%d")

        # First purely alphabetic path segment, e.g. "article".
        data['news_type'] = data['url'].str.extract(r'/([a-zA-Z]+)/')[0]

        return data


class DataSink(ABC):
    """Destination that finished batches of pipeline data are written to.

    Concrete sinks implement the coroutine :meth:`write`; ``ABC`` refuses to
    instantiate a subclass that leaves it unimplemented.
    """

    @abstractmethod
    async def write(self, data: pd.DataFrame) -> bool:
        """Persist *data*; implementations report success as a bool."""
        ...


class DatabaseSink(DataSink):
    """Writes data to analytics database via ``DataFrame.to_sql``.

    ``to_sql`` is a blocking call. It runs on a dedicated worker thread
    rather than directly inside the coroutine, so the event loop stays free
    while a batch is written. This lets ``asyncio.gather`` over several sinks
    actually overlap their writes.
    """

    def __init__(self, connection_string: str, table_name: str = 'financial_news'):
        """Create the engine for the target database.

        Args:
            connection_string: database URL passed to ``create_engine``.
            table_name: destination table; defaults to ``'financial_news'``.
        """
        self.engine = create_engine(connection_string)
        self.table_name = table_name
        # A single worker keeps this sink's writes serialized on one thread,
        # just as they were when to_sql ran inline in the coroutine.
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _write_blocking(self, data: pd.DataFrame) -> None:
        """Append *data* to the destination table (blocking)."""
        data.to_sql(self.table_name, self.engine, if_exists='append', index=False)

    async def write(self, data: pd.DataFrame) -> bool:
        """Append *data* to the destination table.

        Returns:
            True on success. False if the write raised; the error is logged
            with its traceback instead of being propagated.
        """
        loop = asyncio.get_running_loop()
        try:
            await loop.run_in_executor(self._executor, self._write_blocking, data)
        except Exception as exc:
            # Deliberate best-effort boundary: report failure via the return
            # value so the pipeline can continue with later batches.
            logging.getLogger(__name__).exception("Database write failed: %s", exc)
            return False
        return True


class DataPipeline:
    """Main data pipeline orchestrator: extract -> transform -> load."""

    def __init__(self, config: DataConfig):
        """Wire up the processor and sinks for *config*.

        Note: this calls ``logging.basicConfig(level=logging.INFO)``, which
        configures the root logger as a process-wide side effect.
        """
        self.config = config
        self.processor = FinancialNewsProcessor()
        self.sinks = [
            DatabaseSink(config.db_connection_string)
        ]

        # Setup logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def extract_data(self, random_state: Optional[int] = None) -> pd.DataFrame:
        """Load both news datasets, align their columns, merge, shuffle and dedupe.

        Args:
            random_state: seed for the shuffle. The shuffle decides which row
                survives URL de-duplication, so pass a seed for reproducible
                output. ``None`` (default) keeps the previous unseeded behavior.

        Returns:
            The merged frame with one row per distinct ``url``.
        """
        # Load datasets
        reuters_df = load_dataset("danidanou/Reuters_Financial_News")['train'].to_pandas()
        finnews_df = load_dataset("ashraq/financial-news-articles")['train'].to_pandas()

        # Standardize reuters columns to the names used by the other dataset.
        reuters_df = reuters_df.rename(columns={
            'Article': 'text',
            'Link': 'url',
            'Headline': 'title'
        })
        # errors='ignore': '__index_level_0__' is a serialization artifact that
        # may be absent in other revisions of the dataset.
        reuters_df = reuters_df.drop(
            columns=['Journalists', '__index_level_0__', 'Summary'], errors='ignore'
        )

        merged_df = pd.concat([reuters_df, finnews_df], ignore_index=True)

        # Shuffle first, so keep='first' keeps a randomly chosen duplicate.
        merged_df = merged_df.sample(frac=1, random_state=random_state).reset_index(drop=True)
        merged_df = merged_df.drop_duplicates(subset=['url'], keep='first')

        self.logger.info("Extracted %d records", len(merged_df))
        return merged_df

    def transform_data(self, data: pd.DataFrame) -> pd.DataFrame:
        """Transform data using processor"""
        return self.processor.process(data)

    async def load_data(self, data: pd.DataFrame) -> Dict[str, List[object]]:
        """Write *data* to every sink in batches of ``config.batch_size`` rows.

        Batches are written one after another. Within a batch, all sinks are
        awaited concurrently.

        Returns:
            Mapping of sink class name to a list with one entry per batch.
            Each entry is the sink's ``write`` return value, or the exception
            it raised.
        """
        results: Dict[str, List[object]] = {}
        size = self.config.batch_size

        for start in range(0, len(data), size):
            batch = data.iloc[start:start + size]

            outcomes = await asyncio.gather(
                *(sink.write(batch) for sink in self.sinks),
                return_exceptions=True,
            )

            for sink, outcome in zip(self.sinks, outcomes):
                name = type(sink).__name__
                results.setdefault(name, []).append(outcome)
                if outcome is not True:
                    # Surface failures instead of burying them in the result lists.
                    self.logger.warning(
                        "%s failed on rows %d-%d: %r",
                        name, start, start + len(batch) - 1, outcome,
                    )

        return results

    async def run(self):
        """Run extract, transform and load in order and return the load results."""
        self.logger.info("Starting data pipeline...")

        raw_data = self.extract_data()
        self.logger.info("Extraction step complete")

        processed_data = self.transform_data(raw_data)
        self.logger.info("Transform step complete")

        results = await self.load_data(processed_data)
        self.logger.info("Load step complete")

        # Log per-sink counts rather than the full per-batch list, which has
        # one entry per batch and floods the log.
        summary = {
            name: f"{sum(outcome is True for outcome in outcomes)}/{len(outcomes)} batches succeeded"
            for name, outcomes in results.items()
        }
        self.logger.info("Pipeline completed. Results: %s", summary)
        return results


# Usage example
async def main():
    """Build a DataConfig from environment variables and run the pipeline.

    Raises:
        RuntimeError: if ``DB_URL`` is unset or empty. The database sink
            cannot be created without it, so the run fails fast here instead
            of failing later with an obscure engine error.
    """
    db_url = os.getenv("DB_URL")
    if not db_url:
        raise RuntimeError(
            "DB_URL environment variable is not set; cannot create the database sink"
        )

    config = DataConfig(
        hf_token=os.getenv("MACBOOK_HF_KEY"),
        db_connection_string=db_url,
        # NOTE(review): the S3 bucket is read from DB_TOKEN, which looks like
        # a copy-paste slip -- confirm the intended variable name. s3_bucket is
        # not read by the pipeline classes in this file.
        s3_bucket=os.getenv("DB_TOKEN"),
        batch_size=1000,
    )
    pipeline = DataPipeline(config)
    await pipeline.run()


if __name__ == "__main__":
    # Load variables from a local .env file (python-dotenv) before main()
    # reads DB_URL, MACBOOK_HF_KEY and DB_TOKEN from the environment.
    load_dotenv()
    # Top-level `await` only works where an event loop is already running,
    # e.g. IPython/Jupyter autoawait as in this notebook cell. In a plain .py
    # script this line is a SyntaxError; use `asyncio.run(main())` there.
    await main()

INFO:__main__:Starting data pipeline...
INFO:__main__:Extracted 411547 records


Extraction step complete
Initiating data cleaning
Data shape: (411547, 4)

Sample data:                                                title  \
0  AppRiver welcomes Kevin Hatch as Chief Financi...   
1  Italy president names ex-IMF official as inter...   
2  Nobel Biocare CEO aims for growth, not M&A: paper   
3  Atria Wealth Solutions Acquires Cadaret, Grant...   
4  HSBC third-quarter profit rises to $5 billion,...   

                           Date  \
0                           NaN   
1                           NaN   
2  Sun Aug 26, 2012 10:26am EDT   
3                           NaN   
4    Mon Nov 4, 2013 9:20am EST   

                                                 url  \
0  http://www.cnbc.com/2018/01/24/globe-newswire-...   
1  https://www.reuters.com/article/us-italy-polit...   
2  http://www.reuters.com/article/2012/08/26/us-n...   
3  http://www.cnbc.com/2018/04/19/business-wire-a...   
4  http://www.reuters.com/article/2013/11/04/us-h...   

                           



Dates extracted
Transform step complete


INFO:__main__:Pipeline completed. Results: {'DatabaseSink': [True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, True, Tru

Load step complete
