In [None]:
import datetime
import json
import logging
import os
import requests
import psycopg2
import psycopg2.pool
import psycopg2.extras
import threading
import signal
from concurrent.futures import ThreadPoolExecutor
import traceback
import pandas as pd
import numpy as np
import re
from textblob import TextBlob
from datetime import datetime
import time
from typing import List, Tuple, Dict, Optional

In [None]:
# Environment Configuration
# Connection settings for QuestDB's PostgreSQL wire-protocol endpoint. Every
# value can be overridden with an environment variable of the same name; the
# fallbacks target a local QuestDB with its stock credentials, so set
# QDB_PG_PASSWORD explicitly for anything other than local development.
QDB_PG_NAME = os.environ.get('QDB_PG_NAME', 'qdb')
QDB_PG_USER = os.environ.get('QDB_PG_USER', 'admin')
QDB_PG_PASSWORD = os.environ.get('QDB_PG_PASSWORD', 'quest')
QDB_PG_HOST = os.environ.get('QDB_PG_HOST', 'localhost')
QDB_PG_PORT = os.environ.get('QDB_PG_PORT', '8812')  # kept as a string, as read from the environment

In [None]:
# Configuration Constants
BATCH_SIZE = 1_000   # rows cleaned, scored and inserted per batch
MAX_WORKERS = 4      # NOTE(review): not referenced by the processing code in this notebook
RETRY_ATTEMPTS = 3   # NOTE(review): not referenced by the processing code in this notebook

In [None]:
# Logging Configuration: send INFO-and-above records both to
# twitter_sentiment.log and to the console (StreamHandler writes to stderr).
# Note: basicConfig does nothing if the root logger already has handlers,
# e.g. when this cell is re-run in the same kernel.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=(
        logging.FileHandler('twitter_sentiment.log'),
        logging.StreamHandler(),
    ),
)
logger = logging.getLogger(__name__)

In [None]:
class TwitterSentimentProcessor:
    """Download a tweet CSV, score each tweet with TextBlob and store the results in QuestDB.

    Processing is sequential: tweets are cleaned and scored ``BATCH_SIZE`` rows
    at a time, and each batch is written with one multi-row INSERT through
    QuestDB's PostgreSQL wire protocol (psycopg2).

    Counters (reset at the start of every ``process_dataset`` run):
        processed_count: rows successfully inserted into QuestDB.
        failed_count:    rows that raised while being scored, plus rows of
                         batches whose INSERT failed.
        skipped_count:   rows whose text was empty after ``clean_tweet``.
    """

    # Column order shared by the INSERT statement and the per-row value tuples.
    _INSERT_COLUMNS = (
        'id', 'original_tweet', 'cleaned_tweet', 'sentiment_label',
        'sentiment_score', 'confidence_score', 'word_count', 'char_count',
        'processing_time_ms', 'created_at', 'processed_at',
    )

    # Polarity strictly above / below these bounds is positive / negative;
    # anything in between is neutral.
    POSITIVE_THRESHOLD = 0.1
    NEGATIVE_THRESHOLD = -0.1

    # UTF-8 first; Latin-1 maps every byte to a character, so it always
    # succeeds and any further fallback encoding would be unreachable.
    _CSV_ENCODINGS = ('utf-8', 'latin1')

    # Log download progress roughly once per this many bytes.
    _DOWNLOAD_PROGRESS_STEP = 1024 * 1024

    def __init__(self):
        """Initialize the Twitter Sentiment Processor and open the connection pool.

        Raises:
            Exception: whatever ``setup_connection_pool`` re-raises when the
                pool cannot be created (e.g. QuestDB unreachable).
        """
        self.connection_pool = None
        self.processed_count = 0
        self.failed_count = 0
        self.skipped_count = 0
        self.start_time = None
        self.setup_connection_pool()

    def setup_connection_pool(self):
        """Create a psycopg2 ``SimpleConnectionPool`` (1-10 connections) from the QDB_PG_* settings.

        Raises:
            Exception: any pool-creation error, after logging it.
        """
        try:
            self.connection_pool = psycopg2.pool.SimpleConnectionPool(
                1, 10,
                dbname=QDB_PG_NAME,
                user=QDB_PG_USER,
                password=QDB_PG_PASSWORD,
                host=QDB_PG_HOST,
                port=QDB_PG_PORT
            )
            logger.info("✅ Database connection pool initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing connection pool: {e}")
            raise

    def _release_conn(self, conn) -> None:
        """Return ``conn`` to the pool, discarding it instead if it has been closed/broken."""
        self.connection_pool.putconn(conn, close=bool(conn.closed))

    @staticmethod
    def _safe_rollback(conn) -> None:
        """Best-effort rollback that logs, rather than raises, if the rollback itself fails."""
        try:
            conn.rollback()
        except Exception as rollback_error:
            logger.warning(f"Rollback failed: {rollback_error}")

    @staticmethod
    def _safe_ratio(numerator: float, denominator: float) -> float:
        """Return ``numerator / denominator``, or 0.0 when the denominator is not positive."""
        return numerator / denominator if denominator > 0 else 0.0

    def download_dataset(self, url: str, filename: str) -> bool:
        """Stream ``url`` to ``filename``, logging progress roughly every MiB.

        The body is written to ``<filename>.part`` and renamed to ``filename``
        only after the transfer completes. An interrupted download therefore
        never leaves a truncated file that ``run_complete_pipeline`` would
        later mistake for a finished dataset.

        Returns:
            True on success; False on any network or I/O error (errors are logged).
        """
        partial_filename = f"{filename}.part"
        step = self._DOWNLOAD_PROGRESS_STEP
        try:
            logger.info(f"Starting download from: {url}")

            # Add headers to avoid being blocked
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }

            # Using the response as a context manager releases the connection
            # even if streaming fails part-way through.
            with requests.get(url, stream=True, headers=headers, timeout=30) as response:
                response.raise_for_status()

                # Content-Length may be missing (0 => no percentage). For
                # compressed responses it can also be smaller than the decoded
                # byte count, hence the clamp below.
                total_size = int(response.headers.get('content-length', 0))
                downloaded_size = 0
                next_report = step

                with open(partial_filename, "wb") as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        file.write(chunk)
                        downloaded_size += len(chunk)

                        # Threshold-based: `% 1MiB == 0` only fires when a chunk
                        # boundary happens to land exactly on a MiB.
                        if downloaded_size >= next_report:
                            if total_size > 0:
                                progress = min((downloaded_size / total_size) * 100, 100.0)
                                logger.info(f"Download progress: {progress:.1f}%")
                            next_report = (downloaded_size // step + 1) * step

            # Atomic on the same filesystem: `filename` appears only when complete.
            os.replace(partial_filename, filename)

            logger.info(f"Dataset downloaded successfully as {filename}")
            logger.info(f"File size: {downloaded_size / (1024*1024):.2f} MB")
            return True

        except requests.exceptions.RequestException as e:
            logger.error(f"Network error downloading dataset: {e}")
            return False
        except Exception as e:
            logger.error(f"Error downloading dataset: {e}")
            logger.error(traceback.format_exc())
            return False
        finally:
            # Only present if the download did not complete.
            if os.path.exists(partial_filename):
                try:
                    os.remove(partial_filename)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove partial download {partial_filename}: {cleanup_error}")

    def create_questdb_table(self) -> bool:
        """Create the ``twitter_sentiment`` table if it does not exist.

        The table uses ``processed_at`` as its designated timestamp and is
        partitioned by day.

        Returns:
            True if the DDL succeeded; False otherwise (errors are logged).
        """
        conn = self.connection_pool.getconn()
        try:
            with conn.cursor() as cur:
                # Drop table if exists (for testing - remove in production)
                # cur.execute("DROP TABLE IF EXISTS twitter_sentiment;")

                create_table_sql = """
                CREATE TABLE IF NOT EXISTS twitter_sentiment (
                    id LONG,
                    original_tweet STRING,
                    cleaned_tweet STRING,
                    sentiment_label STRING,
                    sentiment_score DOUBLE,
                    confidence_score DOUBLE,
                    word_count INT,
                    char_count INT,
                    processing_time_ms LONG,
                    created_at TIMESTAMP,
                    processed_at TIMESTAMP
                ) TIMESTAMP(processed_at) PARTITION BY DAY;
                """
                cur.execute(create_table_sql)
            conn.commit()
            logger.info("✅ QuestDB table created/verified successfully")
            return True

        except Exception as e:
            logger.error(f"Error creating QuestDB table: {e}")
            logger.error(traceback.format_exc())
            # Don't hand a connection with a failed transaction back to the pool.
            self._safe_rollback(conn)
            return False
        finally:
            self._release_conn(conn)

    def clean_tweet(self, text: str) -> str:
        """Normalise raw tweet text for sentiment scoring.

        The text is lower-cased and URLs and @mentions are removed. Hashtag
        words are kept without the ``#``. Characters other than word
        characters, whitespace and ``!?.,`` are dropped, and runs of
        whitespace are collapsed.

        Args:
            text: Raw tweet. Non-string values (NaN, None, lists, ...) are treated as empty.

        Returns:
            The cleaned text, or ``""`` if fewer than 3 characters remain.
        """
        # isinstance alone covers NaN/None (they are not str). Unlike pd.isna,
        # it never returns an array for list-like input.
        if not isinstance(text, str):
            return ""

        text = text.lower()
        text = re.sub(r'http\S+|www\S+', '', text)  # URLs ("https..." is matched by http\S+)
        text = re.sub(r'@\w+', '', text)            # user mentions
        text = re.sub(r'#(\w+)', r'\1', text)       # keep hashtag content
        text = re.sub(r'[^\w\s!?.,]', '', text)     # keep punctuation that affects sentiment
        text = ' '.join(text.split())                # collapse whitespace

        # Very short remnants carry no usable sentiment.
        return text if len(text) >= 3 else ""

    def get_enhanced_sentiment(self, text: str) -> Tuple[str, float, float]:
        """Classify ``text`` with TextBlob polarity.

        Returns:
            ``(label, polarity, confidence)``:

            - label is 'positive' when polarity > POSITIVE_THRESHOLD, 'negative'
              when polarity < NEGATIVE_THRESHOLD, and 'neutral' otherwise.
            - For positive/negative labels, confidence is ``min(2*|polarity|, 1)``.
            - For neutral labels, confidence is ``1 - |polarity|``.

            Empty or very short text, or a TextBlob error, yields
            ``('neutral', 0.0, 0.0)``.
        """
        if not text or len(text.strip()) < 3:
            return 'neutral', 0.0, 0.0

        try:
            polarity = TextBlob(text).sentiment.polarity
        except Exception as e:
            logger.warning(f"Error in sentiment analysis for text: '{text[:50]}...': {e}")
            return 'neutral', 0.0, 0.0

        if polarity > self.POSITIVE_THRESHOLD:
            return 'positive', polarity, min(polarity * 2, 1.0)
        if polarity < self.NEGATIVE_THRESHOLD:
            return 'negative', polarity, min(abs(polarity) * 2, 1.0)
        return 'neutral', polarity, 1.0 - abs(polarity)

    def process_tweet_batch(self, tweet_batch: List[Dict]) -> List[Dict]:
        """Clean and score a batch of ``{'id': ..., 'tweet': ...}`` records.

        Side effects:
            - Increments ``skipped_count`` for tweets that are empty after cleaning.
            - Increments ``failed_count`` for records that raise while being processed.

        Returns:
            One dict per successfully scored tweet, keyed by the table's column names.
        """
        processed_tweets = []

        for tweet_data in tweet_batch:
            try:
                started = time.perf_counter()  # monotonic clock for durations

                original_tweet = tweet_data['tweet']
                cleaned_tweet = self.clean_tweet(original_tweet)

                # Counted separately so the final statistics add up.
                if not cleaned_tweet:
                    self.skipped_count += 1
                    continue

                sentiment_label, sentiment_score, confidence_score = self.get_enhanced_sentiment(cleaned_tweet)
                processing_time_ms = int((time.perf_counter() - started) * 1000)

                # NOTE: the source rows carry no tweet timestamp, so created_at is
                # the processing time as well; one shared value keeps them equal.
                now = datetime.now()
                processed_tweets.append({
                    # int() so psycopg2 never receives a numpy integer it cannot adapt.
                    'id': int(tweet_data['id']),
                    'original_tweet': original_tweet,
                    'cleaned_tweet': cleaned_tweet,
                    'sentiment_label': sentiment_label,
                    'sentiment_score': sentiment_score,
                    'confidence_score': confidence_score,
                    'word_count': len(cleaned_tweet.split()),
                    'char_count': len(cleaned_tweet),
                    'processing_time_ms': processing_time_ms,
                    'created_at': now,
                    'processed_at': now,
                })

            except Exception as e:
                logger.warning(f"Error processing tweet {tweet_data.get('id', 'unknown')}: {e}")
                self.failed_count += 1

        return processed_tweets

    def batch_insert_to_questdb(self, processed_tweets: List[Dict]) -> bool:
        """Insert ``processed_tweets`` with ``psycopg2.extras.execute_values`` and commit.

        On success, ``processed_count`` grows by the batch size. On failure,
        the transaction is rolled back and ``failed_count`` grows by the batch
        size.

        Returns:
            True on success (or for an empty batch); False if the insert failed.
        """
        if not processed_tweets:
            return True

        conn = self.connection_pool.getconn()
        try:
            insert_sql = (
                f"INSERT INTO twitter_sentiment ({', '.join(self._INSERT_COLUMNS)}) VALUES %s"
            )
            rows = [
                tuple(tweet[column] for column in self._INSERT_COLUMNS)
                for tweet in processed_tweets
            ]

            with conn.cursor() as cur:
                psycopg2.extras.execute_values(cur, insert_sql, rows, page_size=1000)
            conn.commit()

            self.processed_count += len(processed_tweets)
            logger.info(f"Batch inserted {len(processed_tweets)} tweets. Total: {self.processed_count}")
            return True

        except Exception as e:
            logger.error(f"Error in batch insert: {e}")
            logger.error(traceback.format_exc())
            self._safe_rollback(conn)
            # These rows never reached the table; count them so the stats add up.
            self.failed_count += len(processed_tweets)
            return False
        finally:
            self._release_conn(conn)

    def _read_csv_with_fallback(self, filename: str) -> Optional[pd.DataFrame]:
        """Read ``filename`` trying each of ``_CSV_ENCODINGS`` in order; None if all fail."""
        for encoding in self._CSV_ENCODINGS:
            try:
                df = pd.read_csv(filename, encoding=encoding)
            except UnicodeDecodeError:
                continue
            logger.info(f"Dataset loaded successfully with {encoding} encoding")
            return df
        return None

    def _prepare_dataframe(self, df: pd.DataFrame, sample_size: Optional[int]) -> Optional[pd.DataFrame]:
        """Normalise the tweet column, drop empty rows, optionally sample, and number rows from 1.

        Returns:
            The prepared frame, or None (after logging) if no tweet-like column exists.
        """
        if 'tweet' not in df.columns:
            # Fall back to the first column whose name mentions text/tweet.
            text_columns = [col for col in df.columns
                            if 'text' in str(col).lower() or 'tweet' in str(col).lower()]
            if not text_columns:
                logger.error("No tweet column found in dataset")
                return None
            df = df.rename(columns={text_columns[0]: 'tweet'})

        # Remove null / blank tweets
        df = df.dropna(subset=['tweet'])
        df = df[df['tweet'].astype(str).str.strip() != '']

        if sample_size and len(df) > sample_size:
            df = df.sample(n=sample_size, random_state=42)
            logger.info(f"Using sample of {sample_size} tweets")

        # assign() returns a new frame, avoiding SettingWithCopyWarning on the
        # filtered slice. Any existing 'id' column is overwritten.
        return df.assign(id=range(1, len(df) + 1))

    def process_dataset(self, filename: str, sample_size: Optional[int] = None) -> bool:
        """Load ``filename``, score every tweet batch by batch, and insert the results.

        Batches are processed sequentially, ``BATCH_SIZE`` rows at a time. A
        failed batch insert is logged and counted, but does not stop the run.

        Args:
            filename: CSV path. It must contain a 'tweet' column, or a column
                whose name contains 'text' or 'tweet'.
            sample_size: If truthy and smaller than the dataset, score a seeded random sample.

        Returns:
            True once all batches were attempted; False if loading failed or an
            unexpected error occurred.
        """
        try:
            logger.info(f"📖 Loading dataset: {filename}")

            df = self._read_csv_with_fallback(filename)
            if df is None:
                logger.error("Could not load dataset with any encoding")
                return False

            df = self._prepare_dataframe(df, sample_size)
            if df is None:
                return False

            total_tweets = len(df)
            logger.info("Dataset info:")
            logger.info(f"Total tweets: {total_tweets}")
            logger.info(f"Columns: {list(df.columns)}")
            if total_tweets == 0:
                logger.warning("No tweets left to process after cleaning the dataset")

            # Start processing
            self.start_time = time.time()
            self.processed_count = 0
            self.failed_count = 0
            self.skipped_count = 0
            failed_batches = 0

            total_batches = (total_tweets + BATCH_SIZE - 1) // BATCH_SIZE

            for batch_idx in range(0, total_tweets, BATCH_SIZE):
                batch_df = df.iloc[batch_idx:batch_idx + BATCH_SIZE]
                batch_num = (batch_idx // BATCH_SIZE) + 1

                logger.info(f"Processing batch {batch_num}/{total_batches} ({len(batch_df)} tweets)")

                processed_tweets = self.process_tweet_batch(batch_df.to_dict('records'))

                if processed_tweets and not self.batch_insert_to_questdb(processed_tweets):
                    failed_batches += 1
                    logger.warning(f"Failed to insert batch {batch_num}")

                elapsed_time = time.time() - self.start_time
                tweets_per_second = self._safe_ratio(self.processed_count, elapsed_time)
                logger.info(f"Progress: {self.processed_count}/{total_tweets} tweets processed ({tweets_per_second:.1f}/sec)")

            if failed_batches:
                logger.warning(f"{failed_batches}/{total_batches} batches failed to insert")

            self.print_final_statistics(total_tweets)
            return True

        except Exception as e:
            logger.error(f"Error processing dataset: {e}")
            logger.error(traceback.format_exc())
            return False

    def print_final_statistics(self, total_tweets: int):
        """Log counters, success rate and throughput, then the table's sentiment distribution.

        Rates are reported as 0 when ``total_tweets`` or the elapsed time is 0,
        instead of raising ZeroDivisionError.
        """
        elapsed_time = time.time() - self.start_time if self.start_time else 0

        logger.info("=" * 50)
        logger.info("PROCESSING COMPLETE - FINAL STATISTICS")
        logger.info("=" * 50)
        logger.info(f"Successfully processed: {self.processed_count} tweets")
        logger.info(f"Failed to process: {self.failed_count} tweets")
        logger.info(f"Skipped (empty after cleaning): {self.skipped_count} tweets")
        logger.info(f"Success rate: {self._safe_ratio(self.processed_count, total_tweets) * 100:.1f}%")
        logger.info(f"Total processing time: {elapsed_time:.2f} seconds")
        logger.info(f"Average speed: {self._safe_ratio(self.processed_count, elapsed_time):.1f} tweets/second")

        # Get sentiment distribution from database
        self.print_sentiment_distribution()

    @staticmethod
    def _summarize_distribution(results) -> List[Tuple[str, int, float]]:
        """Turn ``(label, count)`` rows into ``(label, count, percent_of_all_rows)``.

        Percentages are relative to the sum of the counts, so they total 100%.
        """
        total = sum(count for _, count in results)
        return [
            (str(label), count, (count / total) * 100 if total > 0 else 0.0)
            for label, count in results
        ]

    def print_sentiment_distribution(self):
        """Log per-label counts and percentages for every row in ``twitter_sentiment``.

        The query covers the whole table, which accumulates rows across runs.
        Percentages are therefore computed against the table total, not this
        run's ``processed_count``.
        """
        conn = self.connection_pool.getconn()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    SELECT sentiment_label, COUNT(*) as count 
                    FROM twitter_sentiment 
                    GROUP BY sentiment_label 
                    ORDER BY count DESC
                """)
                results = cur.fetchall()

            if results:
                logger.info("Sentiment Distribution (entire table):")
                for label, count, percentage in self._summarize_distribution(results):
                    logger.info(f"   {label.capitalize()}: {count} ({percentage:.1f}%)")

        except Exception as e:
            logger.warning(f"Could not retrieve sentiment distribution: {e}")
            self._safe_rollback(conn)
        finally:
            self._release_conn(conn)

    def run_complete_pipeline(self, url: str, filename: str, sample_size: Optional[int] = None):
        """Download (if needed), ensure the table exists, then process the dataset.

        An existing ``filename`` is reused without re-downloading.

        Returns:
            True if every step succeeded; False otherwise, including a
            KeyboardInterrupt (which is logged, not re-raised).
        """
        logger.info("🚀 Starting Twitter Sentiment Analysis Pipeline")
        logger.info("=" * 60)

        try:
            # Step 1: Download dataset
            if not os.path.exists(filename):
                if not self.download_dataset(url, filename):
                    logger.error("Failed to download dataset")
                    return False
            else:
                logger.info(f"📁 Using existing dataset: {filename}")

            # Step 2: Create database table
            if not self.create_questdb_table():
                logger.error("Failed to create database table")
                return False

            # Step 3: Process dataset
            if not self.process_dataset(filename, sample_size):
                logger.error("Failed to process dataset")
                return False

            logger.info("Pipeline completed successfully!")
            return True

        except KeyboardInterrupt:
            logger.info("Pipeline interrupted by user")
            return False
        except Exception as e:
            logger.error(f"Pipeline failed with error: {e}")
            logger.error(traceback.format_exc())
            return False

In [None]:
def main():
    """Run the sentiment pipeline end to end with this notebook's default settings.

    The processor is created inside the ``try`` because constructing it opens
    the QuestDB connection pool, which can fail. The pool is always closed
    afterwards.

    Returns:
        True if the pipeline reported success; False otherwise, including when
        the processor could not be created.
    """
    # Configuration
    url = "https://raw.githubusercontent.com/sharmaroshan/Twitter-Sentiment-Analysis/refs/heads/master/train_tweet.csv"
    filename = "twitter_dataset.csv"
    sample_size = 10000  # Process only 10K tweets for testing (set to None for full dataset)

    processor = None
    success = False
    try:
        # Initialize processor (opens the connection pool)
        processor = TwitterSentimentProcessor()

        # Run the complete pipeline
        success = processor.run_complete_pipeline(url, filename, sample_size)

        if success:
            logger.info("All operations completed successfully!")
        else:
            logger.error("Pipeline failed")

    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        logger.error(traceback.format_exc())
    finally:
        # Without this, pooled connections stay open for the kernel's lifetime.
        if processor is not None and processor.connection_pool is not None:
            processor.connection_pool.closeall()

    return success


if __name__ == "__main__":
    main()