In [8]:
import os
import traceback
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import numpy as np
from transformers import pipeline
from fpdf import FPDF
from scipy import stats
from datasets import load_dataset
import wordcloud  # For word cloud
import re  # For text cleaning

class ComprehensiveDatasetAnalyzer:
    """Exploratory analysis, plots and PDF reports for a list of named datasets.

    ``perform_comprehensive_analysis`` loads two datasets by name: ``"imdb"``
    (analysed as free text) and ``"titanic"`` (analysed as a table). Plots are
    written under ``visualizations/`` and reports under ``reports/``, both
    relative to the current working directory.
    """

    # Words removed by ``clean_text``. Built once here rather than on every
    # call, because ``clean_text`` is applied to every row of a dataset.
    _STOPWORDS = frozenset([
        'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at',
        'to', 'for', 'of', 'with', 'by', 'from', 'up', 'about',
        'into', 'over', 'after', 'is', 'are', 'was', 'were',
    ])
    # Matches every character that is neither an ASCII letter nor whitespace.
    _NON_LETTER_RE = re.compile(r'[^a-zA-Z\s]')
    # Columns with fewer non-null values than this are not normality-tested;
    # the skewness part of scipy's normaltest needs at least 8 observations.
    _MIN_NORMALTEST_SAMPLES = 8
    # Lists/arrays longer than this are truncated in the PDF report.
    _MAX_LIST_ITEMS_IN_PDF = 50
    # Lower bound for the body font, which shrinks by one point per level.
    _MIN_PDF_FONT_SIZE = 6

    def __init__(self, datasets):
        """Store the dataset names to be processed.

        Args:
            datasets: Iterable of dataset names; see
                ``perform_comprehensive_analysis`` for the recognised names.
        """
        self.datasets = datasets

    # ------------------------------------------------------------------
    # NLP analysis
    # ------------------------------------------------------------------
    def clean_text(self, text):
        """Lower-case ``text``, keep only ASCII letters, and drop stopwords.

        Non-string input is converted with ``str`` first so that a stray
        missing value does not abort processing of a whole column.

        Returns:
            The remaining words joined by single spaces.
        """
        letters_only = self._NON_LETTER_RE.sub('', str(text).lower())
        return ' '.join(
            word for word in letters_only.split() if word not in self._STOPWORDS
        )

    def generate_word_cloud(self, dataset_name, data):
        """Render a word cloud of ``data['text']`` to ``word_cloud.png``.

        The image is saved in ``visualizations/nlp_<dataset_name>/``. Nothing
        is drawn when no words survive cleaning.
        """
        output_dir = f"visualizations/nlp_{dataset_name}"
        os.makedirs(output_dir, exist_ok=True)

        # One big cleaned string is what WordCloud.generate expects.
        all_text = ' '.join(data['text'].apply(self.clean_text))
        if not all_text.strip():
            # Generating a cloud from zero words is an error in wordcloud.
            print(f"No words left after cleaning for {dataset_name}, skipping word cloud.")
            return

        cloud = wordcloud.WordCloud(
            width=800,
            height=400,
            background_color='white',
            max_words=200,
            colormap='viridis',
        ).generate(all_text)

        plt.figure(figsize=(16, 8))
        plt.imshow(cloud, interpolation='bilinear')
        plt.axis('off')
        plt.title(f'Word Cloud - {dataset_name.upper()} Dataset')
        plt.tight_layout(pad=0)
        plt.savefig(f"{output_dir}/word_cloud.png")
        plt.close()

    @staticmethod
    def _text_complexity(text):
        """Return the share of whitespace-separated words longer than 6 characters."""
        words = str(text).split()
        if not words:
            return 0
        return sum(1 for word in words if len(word) > 6) / len(words)

    def analyze_nlp_data(self, dataset_name, data):
        """Compute text statistics, write plots and a PDF report.

        Note that ``data`` is modified in place: the columns ``text_length``,
        ``word_count``, ``unique_word_count`` and ``text_complexity`` are added.

        Args:
            dataset_name: Name used in output paths and titles.
            data: DataFrame with a ``text`` column and optionally ``label``.

        Returns:
            Dict with ``text_length_stats``, ``word_count_stats``,
            ``text_complexity`` and, when a ``label`` column exists,
            ``label_distribution``.
        """
        print(f"Analyzing NLP dataset: {dataset_name}")
        results = {}

        self.generate_word_cloud(dataset_name, data)

        # str() everywhere so non-string cells are handled the same way by
        # all three derived columns.
        data['text_length'] = data['text'].apply(lambda x: len(str(x)))
        data['word_count'] = data['text'].apply(lambda x: len(str(x).split()))
        data['unique_word_count'] = data['text'].apply(lambda x: len(set(str(x).split())))

        results['text_length_stats'] = {
            'basic_stats': data['text_length'].describe(),
            'skewness': stats.skew(data['text_length']),
            'kurtosis': stats.kurtosis(data['text_length']),
        }

        results['word_count_stats'] = {
            'basic_stats': data['word_count'].describe(),
            # Mean of per-row unique/total ratios.
            'vocabulary_diversity': (data['unique_word_count'] / data['word_count']).mean(),
        }

        if 'label' in data.columns:
            results['label_distribution'] = data['label'].value_counts(normalize=True)

        data['text_complexity'] = data['text'].apply(self._text_complexity)
        results['text_complexity'] = {
            'mean_complexity': data['text_complexity'].mean(),
            'complexity_distribution': data['text_complexity'].describe(),
        }

        self._generate_nlp_visualizations(dataset_name, data, results)
        self.export_pdf_report(dataset_name, results, data_type="nlp")

        return results

    def _generate_nlp_visualizations(self, dataset_name, data, results):
        """Save length, label and complexity plots for an NLP dataset.

        Expects the ``text_length`` and ``text_complexity`` columns added by
        ``analyze_nlp_data``. ``results`` is accepted but not used.
        """
        output_dir = f"visualizations/nlp_{dataset_name}"
        os.makedirs(output_dir, exist_ok=True)

        # Text length distribution
        plt.figure(figsize=(10, 6))
        sns.histplot(data['text_length'], kde=True)
        plt.title(f"Text Length Distribution - {dataset_name}")
        plt.xlabel("Text Length")
        plt.ylabel("Frequency")
        plt.tight_layout()
        plt.savefig(f"{output_dir}/text_length_distribution.png")
        plt.close()

        # Label distribution (if exists)
        if 'label' in data.columns:
            plt.figure(figsize=(10, 6))
            data['label'].value_counts().plot(kind='bar')
            plt.title(f"Label Distribution - {dataset_name}")
            plt.xlabel("Label")
            plt.ylabel("Count")
            plt.tight_layout()
            plt.savefig(f"{output_dir}/label_distribution.png")
            plt.close()

        # Text complexity boxplot
        plt.figure(figsize=(10, 6))
        sns.boxplot(x=data['text_complexity'])
        plt.title(f"Text Complexity Distribution - {dataset_name}")
        plt.xlabel("Text Complexity (Proportion of Complex Words)")
        plt.tight_layout()
        plt.savefig(f"{output_dir}/text_complexity_boxplot.png")
        plt.close()

    # ------------------------------------------------------------------
    # Structured analysis
    # ------------------------------------------------------------------
    @staticmethod
    def _percentage(part, total):
        """Return ``part / total`` as a percentage, or 0.0 when ``total`` is 0."""
        return (part / total) * 100 if total else 0.0

    def _normality_test(self, series):
        """Run scipy's ``normaltest`` on the non-null values of ``series``.

        Returns:
            Dict with ``normality_p_value`` and ``is_normally_distributed``
            (p-value above 0.05). Both are ``None`` when there are too few
            observations for the test.
        """
        values = series.dropna()
        if len(values) < self._MIN_NORMALTEST_SAMPLES:
            return {'normality_p_value': None, 'is_normally_distributed': None}
        _, p_value = stats.normaltest(values)
        return {
            'normality_p_value': p_value,
            'is_normally_distributed': p_value > 0.05,
        }

    def analyze_structured_data(self, dataset_name, data):
        """Profile a tabular dataset, write plots and a PDF report.

        Args:
            dataset_name: Name used in output paths and titles.
            data: DataFrame to analyse.

        Returns:
            Dict with the keys ``dataset_overview``, ``null_value_analysis``,
            ``duplicate_rows_analysis``, ``numeric_stats``,
            ``statistical_tests``, ``outlier_analysis``,
            ``categorical_analysis`` and ``correlation_analysis``.
        """
        print(f"Analyzing Structured dataset: {dataset_name}")
        results = {}
        total_rows = len(data)

        results['dataset_overview'] = {
            'total_rows': total_rows,
            'total_columns': len(data.columns),
            'column_types': data.dtypes.to_dict(),
        }

        # Null values per column, with the row labels where they occur.
        # Columns without nulls are left out.
        null_analysis = {}
        for column in data.columns:
            null_mask = data[column].isnull()
            null_count = int(null_mask.sum())
            if null_count:
                null_analysis[column] = {
                    'total_null': null_count,
                    'null_percentage': self._percentage(null_count, total_rows),
                    'row_indices': data.index[null_mask].tolist(),
                }
        results['null_value_analysis'] = null_analysis

        # keep=False marks every member of a duplicate group, not just repeats.
        duplicate_rows = data[data.duplicated(keep=False)]
        results['duplicate_rows_analysis'] = {
            'total_duplicates': len(duplicate_rows),
            'duplicate_percentage': self._percentage(len(duplicate_rows), total_rows),
            'duplicate_row_indices': duplicate_rows.index.tolist(),
            'duplicate_details': duplicate_rows.to_dict(orient='records') if not duplicate_rows.empty else {},
        }

        numeric_data = data.select_dtypes(include=[np.number])
        numeric_cols = numeric_data.columns
        results['numeric_stats'] = data[numeric_cols].describe().to_dict()

        results['statistical_tests'] = {
            col: self._normality_test(data[col]) for col in numeric_cols
        }

        # Only columns that actually have outliers are reported.
        outlier_analysis = {}
        for col in numeric_cols:
            outliers_info = self._detect_outliers_with_location(data[col])
            if outliers_info['outliers']:
                outlier_analysis[col] = outliers_info
        results['outlier_analysis'] = outlier_analysis

        cat_cols = data.select_dtypes(include=['object', 'category']).columns
        results['categorical_analysis'] = {
            col: data[col].value_counts(normalize=True).to_dict() for col in cat_cols
        }

        results['correlation_analysis'] = {
            'correlation_matrix': numeric_data.corr().to_dict(),
            'high_correlations': self._find_high_correlations(numeric_data),
        }

        self._generate_structured_visualizations(dataset_name, data, results)
        self.export_pdf_report(dataset_name, results, data_type="structured")

        return results

    def _find_high_correlations(self, data, threshold=0.7):
        """List column pairs whose absolute correlation exceeds ``threshold``.

        Each pair is reported once, as ``(first, second, abs_correlation)``
        with ``first`` preceding ``second`` in column order.
        """
        corr_matrix = data.corr().abs()
        # Keep only the strict upper triangle so that self-correlations and
        # mirrored pairs become NaN and fail the comparison below.
        strictly_upper = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)
        upper = corr_matrix.where(strictly_upper)
        return [
            (first, second, corr_matrix.loc[first, second])
            for first in upper.index
            for second in upper.columns
            if upper.loc[first, second] > threshold
        ]

    def _detect_outliers(self, series):
        """Return IQR outlier values, their percentage, and the two bounds.

        This is the location-free view of ``_detect_outliers_with_location``.
        """
        info = self._detect_outliers_with_location(series)
        return {
            key: info[key]
            for key in ('outliers', 'outlier_percentage', 'lower_bound', 'upper_bound')
        }

    def _detect_outliers_with_location(self, series):
        """Find values outside 1.5 * IQR of the quartiles, with their row labels.

        Returns:
            Dict with ``outliers`` (values), ``outlier_row_indices``,
            ``outlier_details`` (one dict per outlier with ``row_index``,
            ``value`` and ``type``), ``outlier_percentage`` (relative to
            ``len(series)``; 0.0 for an empty series), ``lower_bound`` and
            ``upper_bound``.
        """
        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - 1.5 * iqr
        upper_bound = q3 + 1.5 * iqr

        outliers = series[(series < lower_bound) | (series > upper_bound)]

        return {
            'outliers': outliers.tolist(),
            'outlier_row_indices': outliers.index.tolist(),
            'outlier_details': [
                {
                    'row_index': idx,
                    'value': value,
                    'type': 'lower_outlier' if value < lower_bound else 'upper_outlier',
                }
                for idx, value in outliers.items()
            ],
            'outlier_percentage': self._percentage(len(outliers), len(series)),
            'lower_bound': lower_bound,
            'upper_bound': upper_bound,
        }

    def _generate_structured_visualizations(self, dataset_name, data, results):
        """Save a correlation heatmap, a numeric boxplot and per-category bar charts.

        Files go to ``visualizations/structured_<dataset_name>/``. The two
        numeric plots are skipped when ``data`` has no numeric columns.
        ``results`` is accepted but not used.
        """
        output_dir = f"visualizations/structured_{dataset_name}"
        os.makedirs(output_dir, exist_ok=True)

        numeric_data = data.select_dtypes(include=[np.number])
        if numeric_data.shape[1] > 0:
            # Correlation heatmap for numeric data
            plt.figure(figsize=(12, 10))
            sns.heatmap(numeric_data.corr(), annot=True, cmap='coolwarm', center=0)
            plt.title(f"Correlation Heatmap - {dataset_name}")
            plt.tight_layout()
            plt.savefig(f"{output_dir}/correlation_heatmap.png")
            plt.close()

            # Boxplot for numeric features
            plt.figure(figsize=(15, 8))
            numeric_data.boxplot()
            plt.title(f"Boxplot of Numeric Features - {dataset_name}")
            plt.xticks(rotation=45)
            plt.tight_layout()
            plt.savefig(f"{output_dir}/numeric_boxplot.png")
            plt.close()

        # Categorical variable distribution
        cat_cols = data.select_dtypes(include=['object', 'category']).columns
        for col in cat_cols:
            # A path separator inside a column name would point savefig at a
            # directory that does not exist.
            file_stem = str(col).replace(os.sep, '_').replace('/', '_')
            plt.figure(figsize=(10, 6))
            data[col].value_counts().plot(kind='bar')
            plt.title(f"Distribution of {col} - {dataset_name}")
            plt.xlabel(col)
            plt.ylabel("Count")
            plt.tight_layout()
            plt.savefig(f"{output_dir}/{file_stem}_distribution.png")
            plt.close()

    # ------------------------------------------------------------------
    # Reporting
    # ------------------------------------------------------------------
    @staticmethod
    def _pdf_safe(value):
        """Return ``str(value)`` with non-Latin-1 characters replaced by ``?``.

        The report uses a built-in PDF font, which cannot encode characters
        outside Latin-1; replacing them keeps one odd value from aborting
        the whole report.
        """
        return str(value).encode('latin-1', 'replace').decode('latin-1')

    def export_pdf_report(self, dataset_name, analysis_results, data_type="general"):
        """Export a comprehensive analysis report as a PDF.

        Nested dicts are written recursively, one heading per key. Lists and
        arrays are printed as text and truncated after 50 items. The file is
        saved as ``reports/<dataset_name>_<data_type>_analysis_report.pdf``.
        """
        pdf = FPDF()
        pdf.add_page()
        pdf.set_font("Arial", size=12)

        title = f"{data_type.upper()} Analysis Report for {dataset_name}"
        pdf.cell(200, 10, txt=self._pdf_safe(title), ln=True, align="C")
        pdf.ln(10)

        max_items = self._MAX_LIST_ITEMS_IN_PDF

        def write_dict_to_pdf(d, level=0):
            # Body text shrinks with depth; clamp it so deep nesting can
            # never request a zero or negative font size.
            body_size = max(self._MIN_PDF_FONT_SIZE, 10 - level)
            for key, value in d.items():
                pdf.set_font("Arial", style="B", size=10 + level)
                pdf.cell(200, 10, txt=self._pdf_safe("  " * level + str(key)), ln=True)
                pdf.set_font("Arial", size=body_size)

                if isinstance(value, dict):
                    write_dict_to_pdf(value, level + 1)
                elif isinstance(value, (list, np.ndarray)):
                    # Limit the number of items shown to keep the PDF readable.
                    pdf.multi_cell(0, 10, txt=self._pdf_safe(value[:max_items]))
                    if len(value) > max_items:
                        pdf.multi_cell(0, 10, txt="... (truncated)")
                else:
                    pdf.multi_cell(0, 10, txt=self._pdf_safe(value))
                pdf.ln(5)

        write_dict_to_pdf(analysis_results)

        output_path = f"reports/{dataset_name}_{data_type}_analysis_report.pdf"
        os.makedirs("reports", exist_ok=True)
        pdf.output(output_path)
        print(f"PDF report saved at {output_path}")

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------
    def perform_comprehensive_analysis(self):
        """Load and analyse every dataset named in ``self.datasets``.

        ``"imdb"`` is loaded with ``load_dataset`` (train split) and analysed
        as text; ``"titanic"`` is loaded through seaborn and analysed as a
        table. Other names are reported and skipped. An exception raised for
        one dataset is printed with its traceback and does not stop the rest.

        Returns:
            Dict mapping each successfully analysed dataset name to its
            results dict.
        """
        comprehensive_results = {}

        for dataset_name in self.datasets:
            print(f"Analyzing dataset: {dataset_name}")

            try:
                if dataset_name == "imdb":  # NLP example
                    dataset = load_dataset("imdb", split='train')
                    data = pd.DataFrame(dataset)
                    comprehensive_results[dataset_name] = self.analyze_nlp_data(dataset_name, data)

                elif dataset_name == "titanic":  # Structured example
                    data = sns.load_dataset("titanic")
                    comprehensive_results[dataset_name] = self.analyze_structured_data(dataset_name, data)

                else:
                    print(f"Unknown dataset type for {dataset_name}, skipping.")

            except Exception as e:
                # Deliberate per-dataset boundary: report and move on.
                print(f"Error analyzing {dataset_name}: {e}")
                traceback.print_exc()

        return comprehensive_results


if __name__ == "__main__":
    # Script entry point: run the analyzer over the two datasets it knows
    # how to load, then report completion.
    datasets_to_analyze = [
        "imdb",     # handled as an NLP dataset
        "titanic",  # handled as a structured dataset
    ]
    analyzer = ComprehensiveDatasetAnalyzer(datasets=datasets_to_analyze)
    results = analyzer.perform_comprehensive_analysis()
    print("Analysis complete!")

Analyzing dataset: imdb
Analyzing NLP dataset: imdb


  with pd.option_context('mode.use_inf_as_na', True):


PDF report saved at reports/imdb_nlp_analysis_report.pdf
Analyzing dataset: titanic
Analyzing Structured dataset: titanic
PDF report saved at reports/titanic_structured_analysis_report.pdf
Analysis complete!
