In [3]:
import numpy as np
import pandas as pd
import re
import unicodedata
import logging
from typing import List, Dict, Any, Optional, Tuple
import json
import matplotlib.pyplot as plt
import seaborn as sns

# Preprocessing and feature extraction libraries
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MinMaxScaler

class EnhancedDifferentialEvolutionDenoiser:
    def __init__(self,
                 population_size: int = 50,
                 max_iterations: int = 100,
                 F: float = 0.8,
                 CR: float = 0.7,
                 logging_level: int = logging.INFO):
        """
        Initialize Enhanced Differential Evolution Denoiser for Tweet Cleaning

        :param population_size: Number of solution candidates
        :param max_iterations: Maximum iterations for optimization
        :param F: Differential weight (mutation strength)
        :param CR: Crossover rate
        :param logging_level: Logging verbosity
        """
        # Logging configuration
        logging.basicConfig(level=logging_level,
                            format='%(asctime)s - %(levelname)s: %(message)s')
        self.logger = logging.getLogger(__name__)

        # DE algorithm parameters
        self.population_size = population_size
        self.max_iterations = max_iterations
        self.F = F  # Mutation scale factor
        self.CR = CR  # Crossover rate

        # Preprocessing resources
        self.stop_words = self._load_stop_words()
        self.health_keywords = self._load_health_keywords()

        # Language detection indicators
        self.language_indicators = {
            'en': ['cholera', 'outbreak', 'disease', 'water', 'hygiene'],
            'es': ['cólera', 'brote', 'enfermedad', 'agua', 'higiene'],
            'fr': ['choléra', 'épidémie', 'maladie', 'eau', 'hygiène']
        }

    # [Previous methods remain the same]
    def _load_stop_words(self) -> set:
        """
        Load comprehensive stop words with health domain specificity.

        :return: Set of stop words
        """
        basic_stop_words = {
            'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
            'of', 'with', 'by', 'from', 'up', 'about', 'into', 'over', 'after'
        }

        social_media_stop_words = {
            'rt', 'via', 'http', 'https', 'www', 'retweet', 'tweet',
            'dm', 'follow', 'unfollow'
        }

        health_stop_words = {
            'patient', 'doctor', 'hospital', 'medical', 'health',
            'treatment', 'medicine', 'care', 'clinic'
        }

        return basic_stop_words.union(social_media_stop_words).union(health_stop_words)

    def _load_health_keywords(self, custom_path: Optional[str] = None) -> set:
        """
        Load health-related keywords for cholera context.

        :param custom_path: Optional path to custom keywords JSON
        :return: Set of health-related keywords
        """
        default_keywords = {
            'cholera', 'outbreak', 'epidemic', 'water-borne', 'sanitation',
            'hygiene', 'diarrhea', 'dehydration', 'infection', 'treatment',
            'prevention', 'water', 'sewage', 'clean water', 'public health'
        }

        if custom_path:
            try:
                with open(custom_path, 'r') as f:
                    custom_keywords = set(json.load(f))
                default_keywords.update(custom_keywords)
            except Exception as e:
                self.logger.warning(f"Could not load custom keywords: {e}")

        return default_keywords

    def _preprocess_text(self, text: str) -> str:
        """
        Advanced preprocessing for health-related social media text.

        :param text: Input tweet text
        :return: Cleaned and normalized text
        """
        if not isinstance(text, str):
            text = str(text)
        # Normalize unicode characters
        text = unicodedata.normalize('NFKD', text).encode('ascii', 'ignore').decode('utf-8')

        # Remove URLs and web references
        text = re.sub(r'http\S+|www\S+|https\S+', '', text, flags=re.MULTILINE)

        # Remove mentions and hashtags
        text = re.sub(r'@\w+|#\w+', '', text)

        # Remove special characters and digits
        text = re.sub(r'[^a-zA-Z\s]', '', text)

        # Convert to lowercase
        text = text.lower()

        # Remove extra whitespaces
        text = ' '.join(text.split())

        return text

    def _calculate_domain_relevance(self, text: str) -> float:
        """
        Calculate the domain-specific relevance score for a tweet.

        :param text: Preprocessed tweet text
        :return: Relevance score (0-1 range)
        """
        # Count health-related keywords
        keyword_matches = sum(1 for keyword in self.health_keywords if keyword in text)

        # Calculate keyword density
        words = text.split()
        keyword_density = keyword_matches / len(words) if words else 0

        return min(keyword_density * 2, 1.0)  # Cap at 1.0

    def _fitness_function(self, tweets: List[str]) -> float:
        """
        Fitness function to evaluate the quality of denoised tweets.

        :param tweets: List of preprocessed tweets
        :return: Fitness score (higher is better)
        """
        # Calculate domain relevance for all tweets
        relevance_scores = [self._calculate_domain_relevance(tweet) for tweet in tweets]

        # Compute average relevance
        avg_relevance = np.mean(relevance_scores)

        # Penalize tweets with low domain relevance
        penalty = len([score for score in relevance_scores if score < 0.3])

        return avg_relevance - (penalty * 0.1)

    def _mutate(self, population: List[List[str]]) -> List[List[str]]:
        """
        Mutate the population using Differential Evolution strategy.

        :param population: Current population of tweet lists
        :return: Mutated population
        """
        mutated_population = []

        for i in range(len(population)):
            # Randomly select three distinct vectors
            candidates = list(range(len(population)))
            candidates.remove(i)

            r1, r2, r3 = np.random.choice(candidates, 3, replace=False)

            # Mutation strategy: DE/rand/1
            mutant = population[r1].copy()

            # Randomly choose a subset of tweets to mutate
            num_mutate = np.random.randint(1, len(mutant))
            mutate_indices = np.random.choice(len(mutant), num_mutate, replace=False)

            for idx in mutate_indices:
                # Apply contextual mutation based on other population members
                candidate_tweet = population[r2][idx]
                base_tweet = population[r3][idx]

                # Perform crossover and mutation
                if np.random.random() < self.CR:
                    # Remove stop words
                    candidate_words = candidate_tweet.split()
                    base_words = base_tweet.split()

                    # Combine and remove stop words
                    mutated_words = [
                        word for word in candidate_words + base_words
                        if word not in self.stop_words
                    ]

                    # Reconstruct tweet
                    mutant[idx] = ' '.join(set(mutated_words))

            mutated_population.append(mutant)

        return mutated_population

    def _select_best_population(self,
                                 original_population: List[List[str]],
                                 mutated_population: List[List[str]]) -> List[List[str]]:
        """
        Select the best population based on fitness function.

        :param original_population: Original tweet populations
        :param mutated_population: Mutated tweet populations
        :return: Selected population
        """
        selected_population = []

        for orig, mutant in zip(original_population, mutated_population):
            orig_fitness = self._fitness_function(orig)
            mutant_fitness = self._fitness_function(mutant)

            # Select population with higher fitness
            selected_population.append(
                orig if orig_fitness >= mutant_fitness else mutant
            )

        return selected_population

    def denoise_tweets(self, tweets: List[str]) -> Dict[str, Any]:
        """
        Apply Differential Evolution for tweet denoising.

        :param tweets: Input tweets
        :return: Denoised tweets and performance metrics
        """
        # Preprocess input tweets
        valid_tweets = [tweet for tweet in tweets if tweet is not None]
        preprocessed_tweets = [self._preprocess_text(tweet) for tweet in valid_tweets]

        # Initialize population
        population = [preprocessed_tweets.copy() for _ in range(self.population_size)]

        # Differential Evolution optimization
        best_population = population
        best_fitness = self._fitness_function(preprocessed_tweets)

        for iteration in range(self.max_iterations):
            # Mutation step
            mutated_population = self._mutate(best_population)

            # Selection step
            selected_population = self._select_best_population(
                best_population, mutated_population
            )

            # Update best population
            current_fitness = self._fitness_function(selected_population[0])

            if current_fitness > best_fitness:
                best_population = selected_population
                best_fitness = current_fitness

                self.logger.info(f"Iteration {iteration}: Best Fitness = {best_fitness:.4f}")

        # Calculate domain relevance scores
        domain_relevance_scores = [
            self._calculate_domain_relevance(tweet)
            for tweet in best_population[0]
        ]

        # Final denoising results
        denoising_results = {
            'original_tweets': tweets,
            'denoised_tweets': best_population[0],
            'final_fitness': best_fitness,
            'domain_relevance_scores': domain_relevance_scores
        }

        return denoising_results

    def visualize_results(self, results: Dict[str, Any], output_dir: str = '.'):
        """
        Create comprehensive visualizations for denoising results.

        :param results: Denoising results dictionary
        :param output_dir: Directory to save output files
        """
        import os
        import numpy as np

        # Ensure output directory exists
        os.makedirs(output_dir, exist_ok=True)

        # 1. Domain Relevance Distribution (Histogram)
        plt.figure(figsize=(12, 6))
        plt.subplot(1, 2, 1)
        sns.histplot(results['domain_relevance_scores'], bins=10, kde=True, color='skyblue', edgecolor='black')
        plt.title('Domain Relevance Score Distribution')
        plt.xlabel('Domain Relevance Score')
        plt.ylabel('Frequency')

        # 2. Domain Relevance Box Plot
        plt.subplot(1, 2, 2)
        sns.boxplot(x=results['domain_relevance_scores'], color='lightgreen')
        plt.title('Domain Relevance Score Box Plot')
        plt.xlabel('Domain Relevance Score')

        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'domain_relevance_analysis.png'))
        plt.close()

        # 3. Word Cloud of Denoised Tweets
        from wordcloud import WordCloud

        # Combine all denoised tweets
        all_denoised_text = ' '.join(filter(None, results['denoised_tweets']))

        wordcloud = WordCloud(width=800, height=400,
                            background_color='white',
                            min_font_size=10).generate(all_denoised_text)

        plt.figure(figsize=(16,8))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis("off")
        plt.title("Most Frequent Words in Denoised Tweets")
        plt.tight_layout(pad=0)
        plt.savefig(os.path.join(output_dir, 'denoised_tweets_wordcloud.png'))
        plt.close()

        # 4. Scatter plot of Original vs Denoised Tweet Length
        plt.figure(figsize=(10, 6))
        orig_tweets_filtered = [tweet for tweet in results['original_tweets'] if tweet is not None]
        orig_lengths = [len(tweet.split()) if isinstance(tweet, str) else 0 for tweet in orig_tweets_filtered]
        denoised_lengths = [len(tweet.split()) if isinstance(tweet, str) else 0 for tweet in results['denoised_tweets']]

        # Ensure both lists have the same length for the scatter plot
        min_len = min(len(orig_lengths), len(denoised_lengths))
        orig_lengths = orig_lengths[:min_len]
        denoised_lengths = denoised_lengths[:min_len]

        plt.scatter(orig_lengths, denoised_lengths, alpha=0.6)
        plt.plot([min(orig_lengths), max(orig_lengths)],
                [min(orig_lengths), max(orig_lengths)],
                color='red', linestyle='--')
        plt.title('Original vs Denoised Tweet Lengths')
        plt.xlabel('Original Tweet Length (words)')
        plt.ylabel('Denoised Tweet Length (words)')
        plt.tight_layout()
        plt.savefig(os.path.join(output_dir, 'tweet_length_comparison.png'))
        plt.close()

        orig_tweets_filtered = [tweet for tweet in results['original_tweets'] if tweet is not None] # Filter out None values
        min_len = min(len(orig_tweets_filtered), len(results['denoised_tweets']), len(results['domain_relevance_scores'])) # Get the minimum length

        results_df = pd.DataFrame({
            'Original Tweet': orig_tweets_filtered[:min_len], # Use filtered original tweets and slice to min_len
            'Denoised Tweet': results['denoised_tweets'][:min_len], # Slice to min_len
            'Domain Relevance': results['domain_relevance_scores'][:min_len]  # Slice to min_len
        })
        results_df.to_csv(os.path.join(output_dir, 'denoising_results.csv'), index=False)

        # Print comprehensive summary
        self._print_detailed_summary(results)

    def _print_detailed_summary(self, results: Dict[str, Any]):
        """
        Print a comprehensive summary of the denoising results.

        :param results: Denoising results dictionary
        """
        # Calculate additional statistics
        domain_relevance_scores = results['domain_relevance_scores']
        # Filter out None and non-string values before calculating lengths
        orig_lengths = [len(tweet.split()) for tweet in results['original_tweets'] if tweet is not None and isinstance(tweet, str)]
        denoised_lengths = [len(tweet.split()) for tweet in results['denoised_tweets'] if tweet is not None and isinstance(tweet, str)]
        print("\n--- Comprehensive Denoising Results Summary ---")
        print(f"Total Tweets Processed: {len(results['original_tweets'])}")
        print(f"Final Fitness Score: {results['final_fitness']:.4f}")

        # Domain Relevance Statistics
        print("\n--- Domain Relevance Analysis ---")
        print(f"Average Domain Relevance: {np.mean(domain_relevance_scores):.4f}")
        print(f"Median Domain Relevance: {np.median(domain_relevance_scores):.4f}")
        print(f"Highest Domain Relevance: {max(domain_relevance_scores):.4f}")
        print(f"Lowest Domain Relevance: {min(domain_relevance_scores):.4f}")
        print(f"Domain Relevance Standard Deviation: {np.std(domain_relevance_scores):.4f}")

        # Tweet Length Analysis
        print("\n--- Tweet Length Analysis ---")
        print(f"Average Original Tweet Length: {np.mean(orig_lengths):.2f} words")
        print(f"Average Denoised Tweet Length: {np.mean(denoised_lengths):.2f} words")
        print(f"Median Original Tweet Length: {np.median(orig_lengths):.2f} words")
        print(f"Median Denoised Tweet Length: {np.median(denoised_lengths):.2f} words")

        # Tweets with Highest and Lowest Relevance
        print("\n--- Notable Tweets ---")
        # Find index of highest and lowest relevance tweets
        max_relevance_idx = domain_relevance_scores.index(max(domain_relevance_scores))
        min_relevance_idx = domain_relevance_scores.index(min(domain_relevance_scores))

        print("\nHighest Relevance Tweet:")
        print(f"Original: {results['original_tweets'][max_relevance_idx]}")
        print(f"Denoised: {results['denoised_tweets'][max_relevance_idx]}")
        print(f"Relevance Score: {domain_relevance_scores[max_relevance_idx]:.4f}")

        print("\nLowest Relevance Tweet:")
        print(f"Original: {results['original_tweets'][min_relevance_idx]}")
        print(f"Denoised: {results['denoised_tweets'][min_relevance_idx]}")
        print(f"Relevance Score: {domain_relevance_scores[min_relevance_idx]:.4f}")

    def interpret_results(self, results: Dict[str, Any]) -> str:
        """
        Provide a comprehensive, human-readable interpretation of the denoising results.

        :param results: Denoising results dictionary
        :return: Detailed textual interpretation of the results
        """
        # Calculate key statistics
        domain_relevance_scores = results['domain_relevance_scores']
        # Filter out None values before calculating lengths
        orig_lengths = [len(tweet.split()) for tweet in results['original_tweets'] if tweet is not None and isinstance(tweet, str)]
        denoised_lengths = [len(str(tweet).split()) for tweet in results['denoised_tweets'] if tweet is not None] # Convert tweet to string if it's not

        # Interpret Domain Relevance
        avg_relevance = np.mean(domain_relevance_scores)
        relevance_interpretation = (
            "Weak Relevance" if avg_relevance < 0.3 else
            "Moderate Relevance" if avg_relevance < 0.6 else
            "Strong Relevance"
        )

        # Interpret Length Changes
        avg_orig_length = np.mean(orig_lengths)
        avg_denoised_length = np.mean(denoised_lengths)
        length_change_pct = ((avg_denoised_length - avg_orig_length) / avg_orig_length) * 100

        # Interpret Length Change
        length_interpretation = (
            "Significantly Shortened" if length_change_pct < -20 else
            "Moderately Shortened" if length_change_pct < -10 else
            "Slightly Shortened" if length_change_pct < 0 else
            "Approximately Same Length" if abs(length_change_pct) < 10 else
            "Moderately Lengthened" if length_change_pct < 20 else
            "Significantly Lengthened"
        )

        # Identify Problematic Tweets
        low_relevance_count = sum(1 for score in domain_relevance_scores if score < 0.3)
        low_relevance_pct = (low_relevance_count / len(domain_relevance_scores)) * 100

        # Construct Comprehensive Interpretation


    def visualization_insights(self, results: Dict[str, Any]) -> str:
        """
        Provide insights about the visualization outputs.

        :param results: Denoising results dictionary
        :return: Textual description of visualization insights
        """
        domain_relevance_scores = results['domain_relevance_scores']



def main():
    # Load error handling for CSV
    def bad_line(x):
        return None

    # Load the CSV file
    try:
        df = pd.read_csv('merge-csv.com__673838347b7e7.csv',
                         encoding='unicode_escape',
                         on_bad_lines=bad_line,
                         engine='python',
                         skiprows=3)
    except FileNotFoundError:
        print("Error: CSV file not found. Please check the file path.")
        return
    except Exception as e:
        print(f"An error occurred while reading the CSV: {e}")
        return

    # Extract tweets
    tweets = df['Text'].tolist()

    denoiser = EnhancedDifferentialEvolutionDenoiser(
        population_size=20,
        max_iterations=50,
        logging_level=logging.INFO
    )

    denoising_results = denoiser.denoise_tweets(tweets)

    denoiser.visualize_results(denoising_results)
    interpretation = denoiser.interpret_results(denoising_results)
    print(interpretation)

    viz_insights = denoiser.visualization_insights(denoising_results)
    print(viz_insights)

if __name__ == "__main__":
    main()

2025-03-12 11:06:41,199 - INFO: Iteration 0: Best Fitness = -1568.6477
2025-03-12 11:06:45,165 - INFO: Iteration 1: Best Fitness = -1526.0334
2025-03-12 11:06:48,997 - INFO: Iteration 2: Best Fitness = -1524.9318
2025-03-12 11:06:53,050 - INFO: Iteration 3: Best Fitness = -1508.9267
2025-03-12 11:06:56,543 - INFO: Iteration 4: Best Fitness = -1504.3249
2025-03-12 11:07:04,117 - INFO: Iteration 6: Best Fitness = -1503.3244
2025-03-12 11:07:07,968 - INFO: Iteration 7: Best Fitness = -1501.7242
2025-03-12 11:07:15,528 - INFO: Iteration 9: Best Fitness = -1501.6242
2025-03-12 11:07:18,880 - INFO: Iteration 10: Best Fitness = -1501.6240
2025-03-12 11:07:22,442 - INFO: Iteration 11: Best Fitness = -1501.4240
2025-03-12 11:07:30,038 - INFO: Iteration 13: Best Fitness = -1501.4240
2025-03-12 11:10:01,248 - INFO: Iteration 46: Best Fitness = -1501.4239



--- Comprehensive Denoising Results Summary ---
Total Tweets Processed: 18552
Final Fitness Score: -1501.4239

--- Domain Relevance Analysis ---
Average Domain Relevance: 0.1761
Median Domain Relevance: 0.1333
Highest Domain Relevance: 1.0000
Lowest Domain Relevance: 0.0000
Domain Relevance Standard Deviation: 0.1786

--- Tweet Length Analysis ---
Average Original Tweet Length: 25.63 words
Average Denoised Tweet Length: 16.94 words
Median Original Tweet Length: 23.00 words
Median Denoised Tweet Length: 16.00 words

--- Notable Tweets ---

Highest Relevance Tweet:
Original: Meanwhile cholera... https://t.co/52W58busdA
Denoised: cholera meanwhile
Relevance Score: 1.0000

Lowest Relevance Tweet:
Original: @K00LIP Send me some ideas!!
Denoised: me some send ideas
Relevance Score: 0.0000
None
None
