In [3]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from prophet import Prophet
import numpy as np
import io
from ollama import generate

class CSVAnalyst:
    """Exploratory-analysis helper built around a single pandas DataFrame.

    The instance keeps the loaded frame in ``self.df`` and offers:

    * a plain-text summary (``analysis_report``) built on load,
    * PNG plots returned as bytes (``generate_plots``),
    * a 1-5 "emergency level" for a column (``calculate_emergency_level``),
    * a Prophet forecast for time-series data (``generate_forecast``),
    * a whole-dataset analysis dict and a text report built from it.

    ``context_lists`` maps lower-cased column names to the direction in which
    values are considered critical; ``learned_attributes`` records the names
    added at runtime through the interactive prompt.
    """

    # Share of non-null text values that must parse as dates before a text
    # column is accepted as the dataset's date column (heuristic).
    _DATE_PARSE_THRESHOLD = 0.9

    # Human-readable meaning of each emergency level.
    _LEVEL_DESCRIPTIONS = {
        1: "Normal - Minimal anomalies detected",
        2: "Low Risk - Slightly elevated anomalies",
        3: "Moderate - Significant anomalies present",
        4: "High - Critical pattern emerging",
        5: "Critical - Immediate attention required"
    }

    def __init__(self):
        self.df = None
        self.analysis_report = ""
        self.emergency_level = None
        self.is_time_series = False
        self.date_column = None
        self.context_lists = {
            'lower_is_better': {"sugars", "calories", "sodium", "pollution", "errors", "temperature"},
            'higher_is_better': {"sales", "profit", "ratings", "performance", "growth", "speed"},
            'neutral': {"pressure", "ph", "voltage", "balance"}
        }
        self.learned_attributes = {}

    # ------------------------------------------------------------------
    # Loading and basic summary
    # ------------------------------------------------------------------
    def load_csv(self, file_path):
        """Read ``file_path`` into ``self.df`` and build the basic report.

        Returns:
            ``True`` on success, otherwise an error message string. Callers
            are expected to test the result with ``is True``.
        """
        try:
            self.df = pd.read_csv(file_path)
            self._basic_analysis()
            return True
        except Exception as e:
            return f"Error loading CSV: {str(e)}"

    def _as_datetime(self, series):
        """Return ``series`` converted to datetimes, or ``None`` if it is not date-like.

        Columns that already have a datetime dtype are accepted directly.
        Text columns are test-parsed: ``pd.read_csv`` (as called in
        ``load_csv``) leaves dates as strings, so without this step no column
        would ever be recognised as a date column. A text column is accepted
        when at least ``_DATE_PARSE_THRESHOLD`` of its non-null values parse.
        This is a heuristic; unusual text that happens to parse as dates can
        still be accepted.
        """
        import warnings

        if pd.api.types.is_datetime64_any_dtype(series):
            return pd.to_datetime(series)

        looks_textual = (pd.api.types.is_object_dtype(series)
                         or pd.api.types.is_string_dtype(series))
        if not looks_textual:
            return None

        present = series.dropna()
        # Only genuine strings are test-parsed; numbers stored in an object
        # column would otherwise be read as epoch offsets.
        if present.empty or not all(isinstance(value, str) for value in present):
            return None

        try:
            with warnings.catch_warnings():
                # Format-inference warnings are expected while probing.
                warnings.simplefilter("ignore")
                parsed = pd.to_datetime(series, errors="coerce")
        except (ValueError, TypeError, OverflowError):
            return None

        if not pd.api.types.is_datetime64_any_dtype(parsed):
            return None

        parsed_share = parsed[series.notna()].notna().mean()
        if parsed_share < self._DATE_PARSE_THRESHOLD:
            return None
        return parsed

    def _basic_analysis(self):
        """Detect a date column and build the text report in ``analysis_report``.

        The first column (in frame order) that is, or parses as, a datetime
        column becomes ``date_column``; that column of ``self.df`` is replaced
        by its parsed version. Both ``is_time_series`` and ``date_column`` are
        reset first so a second load does not inherit stale state.
        """
        self.is_time_series = False
        self.date_column = None

        for col in self.df.columns:
            parsed = self._as_datetime(self.df[col])
            if parsed is not None:
                self.is_time_series = True
                self.date_column = col
                self.df[col] = parsed
                break

        analysis = [f"Time Series Detected: {self.is_time_series}"]
        if self.is_time_series:
            dates = self.df[self.date_column]
            analysis.append(f"Date Column: {self.date_column}")
            analysis.append(f"Date Range: {dates.min()} to {dates.max()}")

        analysis.extend([
            f"\nDataset Shape: {self.df.shape}",
            "\nColumn Types:",
            self.df.dtypes.to_string(),
            "\nMissing Values:",
            self.df.isnull().sum().to_string(),
            "\nBasic Statistics:",
            self.df.describe(include='all').to_string()
        ])
        self.analysis_report = "\n".join(analysis)

    # ------------------------------------------------------------------
    # Plotting
    # ------------------------------------------------------------------
    @staticmethod
    def _save_figure(fig):
        """Render ``fig`` to PNG bytes and close it, even if saving fails."""
        buf = io.BytesIO()
        try:
            fig.savefig(buf, format='png')
        finally:
            # Close this specific figure so repeated calls do not pile up.
            plt.close(fig)
        return buf.getvalue()

    def generate_plots(self):
        """Build PNG plots for every numeric column.

        Time-series data gets one line plot per numeric column
        (``timeseries_<col>``). Other data gets a histogram (``hist_<col>``)
        and a box plot (``box_<col>``) per numeric column, plus a
        ``correlation`` heatmap when there is more than one numeric column.

        Returns:
            dict mapping plot name to PNG bytes.
        """
        plots = {}
        numeric_cols = self.df.select_dtypes(include=np.number).columns

        if self.is_time_series:
            for col in numeric_cols:
                fig, ax = plt.subplots()
                sns.lineplot(x=self.df[self.date_column], y=self.df[col], ax=ax)
                ax.set_title(f'{col} over Time')
                plots[f'timeseries_{col}'] = self._save_figure(fig)
            return plots

        for col in numeric_cols:
            fig, ax = plt.subplots()
            sns.histplot(self.df[col], ax=ax)
            ax.set_title(f'Distribution of {col}')
            plots[f'hist_{col}'] = self._save_figure(fig)

            fig, ax = plt.subplots()
            sns.boxplot(y=self.df[col], ax=ax)
            ax.set_title(f'{col} Distribution')
            plots[f'box_{col}'] = self._save_figure(fig)

        if len(numeric_cols) > 1:
            fig, ax = plt.subplots(figsize=(10, 8))
            sns.heatmap(self.df[numeric_cols].corr(), annot=True, ax=ax)
            ax.set_title('Feature Correlation Matrix')
            plots['correlation'] = self._save_figure(fig)

        return plots

    # ------------------------------------------------------------------
    # Emergency level
    # ------------------------------------------------------------------
    def _recent_values(self, target_col, n_days, n_samples):
        """Return the non-null recent readings of ``target_col``.

        For time-series data this is every row dated strictly after
        ``latest date - n_days`` (the rows ``Series.last(f'{n_days}D')``
        selected, but without that deprecated call and without requiring a
        sorted index). Otherwise it is the last ``n_samples`` rows.
        """
        if self.is_time_series:
            series = self.df.set_index(self.date_column)[target_col]
            cutoff = series.index.max() - pd.Timedelta(days=n_days)
            recent = series[series.index > cutoff]
        else:
            recent = self.df[target_col].tail(n_samples)
        # A missing reading can be neither critical nor normal.
        return recent.dropna()

    def calculate_emergency_level(self, target_col, n_days=7, n_samples=30, 
                                user_avg=None, sensitivity=0.1,
                                auto_detect_direction=True):
        """Rate how many recent values of ``target_col`` are critical (1-5).

        Args:
            target_col: column to analyse.
            n_days: size of the recent window, in days (time-series data).
            n_samples: size of the recent window, in rows (other data).
            user_avg: optional reference value. When given, the thresholds are
                ``user_avg`` plus/minus ``abs(user_avg) * sensitivity``;
                otherwise the ``sensitivity`` and ``1 - sensitivity``
                quantiles of the whole column are used.
            sensitivity: relative tolerance or quantile, see ``user_avg``.
            auto_detect_direction: when the column name is not in
                ``context_lists``, ask the language model (and, on low
                confidence, the user via ``input``) which direction matters.
                When disabled, unknown columns are checked in both directions.

        Returns:
            The level as an int from 1 to 5 (also stored in
            ``self.emergency_level``), or an error message string when the
            calculation fails. Progress details are printed.
        """
        try:
            if target_col not in self.df.columns:
                raise ValueError(f"Column '{target_col}' not found")

            recent_data = self._recent_values(target_col, n_days, n_samples)

            print(f"\n{' Data Context ':=^50}")
            print(f"Data Type: {'Time Series' if self.is_time_series else 'Standard Dataset'}")
            if self.is_time_series:
                print(f"Analyzing last {n_days} days of data")
            else:
                print(f"Analyzing last {n_samples} records")

            if recent_data.empty:
                # Without this guard the proportion below is 0/0 and the
                # failure surfaces as an obscure NaN-to-int conversion error.
                raise ValueError(f"No recent data available for '{target_col}'")

            target_col_lower = target_col.lower()
            critical_direction = None

            # Known metrics: look the lower-cased name up in the context lists.
            if target_col_lower in self.context_lists['lower_is_better']:
                critical_direction = 'above'
                print("Context: Known metric where lower values are better")
            elif target_col_lower in self.context_lists['higher_is_better']:
                critical_direction = 'below'
                print("Context: Known metric where higher values are better")
            elif target_col_lower in self.context_lists['neutral']:
                critical_direction = 'both'
                print("Context: Neutral metric monitoring both directions")

            # Unknown metrics: ask the model, then the user if it is unsure.
            if not critical_direction and auto_detect_direction:
                print("\nPerforming AI context analysis...")
                critical_direction, confidence = self._detect_critical_direction(target_col)
                print(f"AI detected direction: {critical_direction} (confidence: {confidence:.0%})")

                if confidence < 0.7:
                    critical_direction, should_save = self._prompt_user_for_direction(target_col)
                    if should_save:
                        self._update_context_lists(target_col_lower, critical_direction)
                if critical_direction == 'neutral':
                    print("\nAttribute marked as non-critical - no emergency level calculated")
                    self.emergency_level = 1
                    return 1

            if user_avg is not None:
                # abs() keeps the tolerance positive for a negative reference;
                # otherwise the upper and lower thresholds swap places.
                threshold = abs(user_avg) * sensitivity
                print(f"\nUsing custom average: {user_avg:.2f}")
                print(f"Sensitivity threshold: ±{sensitivity:.0%} → ±{threshold:.2f}")
                lower_bound = user_avg - threshold
                upper_bound = user_avg + threshold
                upper_only = {'Upper Threshold': upper_bound}
                lower_only = {'Lower Threshold': lower_bound}
            else:
                historical = self.df[target_col]
                lower_bound = historical.quantile(sensitivity)
                upper_bound = historical.quantile(1 - sensitivity)
                print(f"\nUsing historical percentiles (sensitivity: {sensitivity:.0%})")
                print(f"Historical range: {historical.min():.2f} - {historical.max():.2f}")
                upper_only = {'Critical Threshold': upper_bound}
                lower_only = {'Critical Threshold': lower_bound}

            if critical_direction == 'above':
                threshold_info = upper_only
                critical_condition = recent_data > upper_bound
            elif critical_direction == 'below':
                threshold_info = lower_only
                critical_condition = recent_data < lower_bound
            else:
                # 'both', or no direction known: watch both sides.
                threshold_info = {'Lower Threshold': lower_bound, 'Upper Threshold': upper_bound}
                critical_condition = (recent_data < lower_bound) | (recent_data > upper_bound)

            print("\nThreshold Configuration:")
            for name, value in threshold_info.items():
                print(f"- {name}: {value:.2f}")

            # Map the critical share (0..1) onto levels 1..5.
            critical_count = critical_condition.sum()
            proportion = critical_count / len(recent_data)
            self.emergency_level = min(5, max(1, int(proportion * 5) + 1))

            print("\nAnalysis Results:")
            print(f"Critical values found: {critical_count}/{len(recent_data)} ({proportion:.0%})")
            print(f"Emergency level calculation: ({critical_count}/{len(recent_data)}) * 5 + 1 = {self.emergency_level}")
            print(f"{' Final Emergency Level ':=^50}")
            print(f"Level {self.emergency_level}: ", end="")
            print(self._LEVEL_DESCRIPTIONS.get(self.emergency_level, "Unknown level"))

            return self.emergency_level

        except Exception as e:
            return f"Error calculating emergency level: {str(e)}"

    @staticmethod
    def _parse_direction_response(text):
        """Extract ``(direction, confidence)`` from a model reply.

        Accepts the requested ``HIGH/LOW/BOTH,CONFIDENCE`` form anywhere in
        the reply, in any letter case, so surrounding explanation does not
        break parsing. Text inside ``<think>...</think>`` is ignored and the
        last matching pair wins. A trailing ``%`` is read as a percentage and
        the confidence is clamped to the range 0-1.

        Returns:
            ``('above' | 'below' | 'both', confidence)`` or ``(None, 0.0)``
            when no answer can be found.
        """
        import re

        if not isinstance(text, str):
            return None, 0.0

        visible = re.sub(r"<think>.*?</think>", " ", text,
                         flags=re.DOTALL | re.IGNORECASE)
        answers = re.findall(
            r"\b(high|low|both)\b\s*,\s*(\d+(?:\.\d+)?|\.\d+)\s*(%?)",
            visible, flags=re.IGNORECASE)
        if not answers:
            return None, 0.0

        word, number, percent_sign = answers[-1]
        confidence = float(number)
        if percent_sign:
            confidence /= 100.0
        confidence = min(1.0, max(0.0, confidence))

        direction_map = {'high': 'above', 'low': 'below', 'both': 'both'}
        return direction_map[word.lower()], confidence

    def _detect_critical_direction(self, target_col):
        """Ask the language model which direction of ``target_col`` is critical.

        Returns:
            ``(direction, confidence)`` as produced by
            ``_parse_direction_response``; ``(None, 0.0)`` when the model call
            fails or its reply cannot be parsed.
        """
        analysis_prompt = f"""Analyze this metric context:
        Column name: {target_col}
        First values: {self.df[target_col].head().tolist()}
        Recent trend: {self.df[target_col].tail(7).mean():.2f}
        Historical average: {self.df[target_col].mean():.2f}

        Should we alert for HIGH or LOW values? 
        Provide confidence (0-1). Format: HIGH/LOW/BOTH,CONFIDENCE"""

        try:
            response = generate(
                model='deepseek-r1:7b',
                prompt=analysis_prompt,
                stream=False
            )
            return self._parse_direction_response(response['response'])
        except Exception:
            # Best effort: any failure means "unknown", which makes the caller
            # fall back to asking the user. KeyboardInterrupt still propagates.
            return None, 0.0

    def _prompt_user_for_direction(self, target_col):
        """Ask the user interactively which direction of ``target_col`` matters.

        Returns:
            ``(direction, save)`` where ``direction`` is ``'above'``,
            ``'below'``, ``'both'`` or ``'neutral'`` and ``save`` says whether
            the choice should be remembered (never for ``'neutral'``).
        """
        column = self.df[target_col]
        # sample() raises when asked for more rows than exist.
        sample_size = min(5, len(column))
        print(f"\n⚠️ New attribute detected: '{target_col}'")
        print(f"Sample values: {column.sample(sample_size).tolist()}")

        choices = {'1': 'above', '2': 'below', '3': 'both', '4': 'neutral'}
        while True:
            print("\nShould we alert for:")
            print("1. High values")
            print("2. Low values")
            print("3. Both directions")
            print("4. Does not matter (neutral)")
            choice = input("Choice (1-4): ").strip()

            if choice in choices:
                direction = choices[choice]
                break
            print("Invalid choice. Please enter 1, 2, 3, or 4")

        if direction != 'neutral':
            answer = input("Remember this setting for future datasets? (y/n): ")
            save = answer.strip().lower() == 'y'
        else:
            save = False

        return direction, save

    def _update_context_lists(self, attribute, direction):
        """Remember ``attribute`` (lower-cased) under the list for ``direction``.

        ``'above'`` goes to ``lower_is_better``, ``'below'`` to
        ``higher_is_better`` and ``'both'`` to ``neutral``. ``'neutral'`` is
        stored nowhere. Any other value is recorded in ``learned_attributes``
        only.
        """
        attribute = attribute.lower()

        list_for_direction = {
            'above': 'lower_is_better',
            'below': 'higher_is_better',
            'both': 'neutral',
        }
        list_name = list_for_direction.get(direction)
        if list_name is not None:
            self.context_lists[list_name].add(attribute)

        if direction != 'neutral':
            self.learned_attributes[attribute] = direction
            print(f"✓ Learned new attribute: '{attribute}' -> {direction}")
        else:
            print(f"✓ Marked '{attribute}' as non-critical")

    # ------------------------------------------------------------------
    # Forecasting
    # ------------------------------------------------------------------
    def generate_forecast(self, target_col, periods=365):
        """Fit Prophet on ``target_col`` and forecast ``periods`` steps ahead.

        Returns:
            ``(plot_png_bytes, forecast_tail, None)`` on success, where
            ``forecast_tail`` is the last ``periods`` rows of the Prophet
            forecast frame, or ``(None, None, error_message)`` on failure.
        """
        if not self.is_time_series:
            return None, None, "Forecasting requires time series data"
        
        if self.date_column not in self.df.columns:
            return None, None, "No date column found for forecasting"

        if target_col not in self.df.columns:
            # Report through the error slot instead of raising KeyError.
            return None, None, f"Forecasting error: Column '{target_col}' not found"

        try:
            # Prophet expects the columns to be named 'ds' and 'y'.
            df_prophet = self.df[[self.date_column, target_col]].rename(
                columns={self.date_column: 'ds', target_col: 'y'})
            # Rows whose date could not be parsed cannot be placed in time.
            df_prophet = df_prophet.dropna(subset=['ds'])

            model = Prophet()
            model.fit(df_prophet)
            future = model.make_future_dataframe(periods=periods)
            forecast = model.predict(future)

            fig = model.plot(forecast)
            forecast_plot = io.BytesIO()
            try:
                fig.savefig(forecast_plot, format='png')
            finally:
                # Close the Prophet figure itself, not whichever is current.
                plt.close(fig)

            return forecast_plot.getvalue(), forecast.tail(periods), None
        except Exception as e:
            return None, None, f"Forecasting error: {str(e)}"

    # ------------------------------------------------------------------
    # Whole-dataset analysis
    # ------------------------------------------------------------------
    @staticmethod
    def _is_level(value):
        """True when ``value`` is a real emergency level, not an error string."""
        return isinstance(value, (int, np.integer)) and not isinstance(value, bool)

    def analyze_entire_dataset(self, emergency_threshold=3, auto_detect_direction=True):
        """Analyse the whole frame and return the findings as a dict.

        Args:
            emergency_threshold: numeric columns whose emergency level is at
                least this value are listed under ``alerts``.
            auto_detect_direction: forwarded to ``calculate_emergency_level``
                for every numeric column. Pass ``False`` for unattended runs
                so unknown columns neither call the language model nor wait
                on ``input``.

        Returns:
            dict with the keys ``overview``, ``dtype_distribution``,
            ``missing_values``, ``numerical``, ``categorical``, ``alerts`` and,
            when there is more than one numeric column, ``correlation``.
            A column's ``emergency_level`` holds an error string if its
            calculation failed; such columns are never listed as critical.
        """
        analysis = {}
        
        analysis['overview'] = {
            'num_rows': self.df.shape[0],
            'num_cols': self.df.shape[1],
            'memory_usage': self.df.memory_usage(deep=True).sum() // 1024,  # KB
            'duplicate_rows': self.df.duplicated().sum(),
            'time_series': self.is_time_series
        }

        dtype_counts = self.df.dtypes.value_counts().to_dict()
        analysis['dtype_distribution'] = {str(k): v for k, v in dtype_counts.items()}

        missing = self.df.isnull().sum()
        analysis['missing_values'] = {
            'total_missing': missing.sum(),
            'columns_with_missing': missing[missing > 0].to_dict(),
            # Stored as a fraction (0-1); the report multiplies by 100.
            'missing_percentage': (missing / len(self.df)).to_dict()
        }

        numerical_cols = self.df.select_dtypes(include=np.number).columns
        analysis['numerical'] = {}
        for col in numerical_cols:
            values = self.df[col]
            analysis['numerical'][col] = {
                'mean': values.mean(),
                'median': values.median(),
                'std': values.std(),
                'min': values.min(),
                'max': values.max(),
                'skew': values.skew(),
                'emergency_level': self.calculate_emergency_level(
                    col, n_samples=len(self.df),
                    auto_detect_direction=auto_detect_direction)
            }

        categorical_cols = self.df.select_dtypes(include='object').columns
        analysis['categorical'] = {}
        for col in categorical_cols:
            values = self.df[col]
            # Both are empty for a column holding only missing values.
            modes = values.mode()
            counts = values.value_counts()
            analysis['categorical'][col] = {
                'unique_values': values.nunique(),
                'top_value': modes.iloc[0] if not modes.empty else None,
                'top_frequency': counts.iloc[0] if not counts.empty else 0
            }

        if len(numerical_cols) > 1:
            analysis['correlation'] = {
                'pearson': self.df[numerical_cols].corr().to_dict(),
                'top_correlations': self._get_top_correlations()
            }

        # Skip columns whose level is an error string: comparing a string
        # with the numeric threshold would raise TypeError.
        critical_columns = [
            col for col in numerical_cols
            if self._is_level(analysis['numerical'][col]['emergency_level'])
            and analysis['numerical'][col]['emergency_level'] >= emergency_threshold
        ]
        analysis['alerts'] = {
            'emergency_threshold': emergency_threshold,
            'critical_columns': critical_columns,
            'total_critical': len(critical_columns)
        }

        return analysis

    def _get_top_correlations(self, n=5):
        """Return the ``n`` numeric column pairs with the largest absolute correlation.

        Each pair is formatted as ``"<col A> - <col B>"``, strongest first.
        Only the upper triangle of the matrix is used, so a pair appears once
        and a column is never paired with itself.
        """
        corr_matrix = self.df.select_dtypes(include=np.number).corr().abs()
        upper = corr_matrix.where(np.triu(np.ones(corr_matrix.shape), k=1).astype(bool))
        # dropna() removes the masked cells and undefined correlations
        # explicitly rather than relying on stack() to discard them.
        ranked = upper.stack().dropna().sort_values(ascending=False)
        top_pairs = ranked.head(n).index.tolist()
        
        return [f"{pair[0]} - {pair[1]}" for pair in top_pairs]

    def generate_comprehensive_report(self):
        """Run ``analyze_entire_dataset`` and format the result as text.

        Returns:
            The report as one newline-joined string.
        """
        analysis = self.analyze_entire_dataset()
        overview = analysis['overview']
        missing_info = analysis['missing_values']
        alerts = analysis['alerts']
        report = []

        report.append("="*50)
        report.append(" Comprehensive Dataset Analysis Report ")
        report.append("="*50)
        report.append("\nDataset Overview:")
        report.append(f"- Rows: {overview['num_rows']:,}")
        report.append(f"- Columns: {overview['num_cols']:,}")
        report.append(f"- Memory Usage: {overview['memory_usage']:,} KB")
        report.append(f"- Duplicate Rows: {overview['duplicate_rows']:,}")
        report.append(f"- Time Series Data: {overview['time_series']}")

        report.append("\nData Type Distribution:")
        for dtype, count in analysis['dtype_distribution'].items():
            report.append(f"- {dtype}: {count} columns")

        report.append("\nMissing Values Analysis:")
        report.append(f"- Total Missing Values: {missing_info['total_missing']:,}")
        if missing_info['columns_with_missing']:
            report.append("- Columns with Missing Values:")
            for col, count in missing_info['columns_with_missing'].items():
                pct = missing_info['missing_percentage'][col] * 100
                report.append(f"  - {col}: {count:,} ({pct:.1f}%)")
        else:
            report.append("- No missing values found")

        report.append("\nNumerical Columns Summary:")
        for col, stats in analysis['numerical'].items():
            level = stats['emergency_level']
            report.append(f"\n[{col}]")
            if self._is_level(level):
                report.append(f"- Emergency Level: {level}/5")
            else:
                # The calculation failed; show its message, not "<text>/5".
                report.append(f"- Emergency Level: unavailable ({level})")
            report.append(f"- Range: {stats['min']:.2f} to {stats['max']:.2f}")
            report.append(f"- Mean: {stats['mean']:.2f} | Median: {stats['median']:.2f}")
            report.append(f"- Std Dev: {stats['std']:.2f} | Skew: {stats['skew']:.2f}")

        if analysis['categorical']:
            report.append("\nCategorical Columns Summary:")
            for col, stats in analysis['categorical'].items():
                report.append(f"\n[{col}]")
                report.append(f"- Unique Values: {stats['unique_values']:,}")
                report.append(f"- Most Common: '{stats['top_value']}' ({stats['top_frequency']:,} occurrences)")

        if 'correlation' in analysis:
            report.append("\nTop Correlations:")
            for pair in analysis['correlation']['top_correlations']:
                report.append(f"- {pair}")

        report.append("\n" + "="*50)
        report.append(" Critical Alerts Summary ")
        report.append("="*50)
        if alerts['total_critical'] > 0:
            report.append(f"\n{alerts['total_critical']} columns requiring attention (emergency ≥{alerts['emergency_threshold']}):")
            for col in alerts['critical_columns']:
                report.append(f"- {col} (Level {analysis['numerical'][col]['emergency_level']})")
        else:
            report.append("\nNo critical columns detected at current threshold")

        return "\n".join(report)


ModuleNotFoundError: No module named 'prophet'

In [4]:
if __name__ == "__main__":
    # Driver: load one CSV, save its plots, rate one column and, for
    # time-series data, write a forecast to disk.
    analyst = CSVAnalyst()

    # Dataset to analyse, resolved against the current working directory.
    file_path = "solar_data_khulna_from_jan_2014_to_nov_2022.csv"
    load_status = analyst.load_csv(file_path)

    if load_status is not True:
        # load_csv hands back an error message instead of True on failure.
        print(load_status)
    else:
        # Text summary produced while loading.
        print("Basic Analysis:\n", analyst.analysis_report)

        # One PNG file per generated plot, named after the plot key.
        plots = analyst.generate_plots()
        for name, plot_data in plots.items():
            with open(f"{name}.png", "wb") as f:
                f.write(plot_data)

        # NOTE(review): with user_avg=0 the tolerance (user_avg * sensitivity)
        # is 0, so the threshold sits exactly at 0 - confirm this is intended.
        emergency_level = analyst.calculate_emergency_level(
            target_col="Temperature",  # input: column to rate
            n_days=30,
            user_avg=0
        )
        print(f"Emergency Level: {emergency_level}")

        # Forecasting only applies when a date column was detected.
        if analyst.is_time_series:
            forecast_plot, forecast_data, error = analyst.generate_forecast(
                target_col="Temperature",  # input: column to forecast
                periods=90
            )
            if not error:
                forecast_data.to_csv("forecast_results.csv")
                with open("forecast_plot.png", "wb") as f:
                    f.write(forecast_plot)
            else:
                print(f"Forecast Error: {error}")


NameError: name 'CSVAnalyst' is not defined