In [6]:
import pandas as pd
import numpy as np
import warnings
# Suppress every Python warning for the rest of the session (this also hides
# deprecation and runtime warnings raised by the libraries used below).
warnings.filterwarnings('ignore')

class UniversalTrafficAnalyzer:
    """
    Complete Universal Traffic Pattern Analyzer for Jupyter Notebooks

    Maps the columns of a traffic dataset onto a fixed set of standard names,
    runs several group-by analyses and turns the results into short textual
    insights and recommendations.

    Public attributes:
        df                -- standardized DataFrame (None until data is loaded)
        column_mapping    -- standard column name -> original column name
        analysis_results  -- dict filled in by the ``analyze_*`` methods
        insights          -- list of strings built by ``generate_insights``
        recommendations   -- list of strings built by
                             ``generate_recommendations``
    """

    def __init__(self, config=None):
        """Create an analyzer.

        Args:
            config: optional dict with ``'required_columns'`` and
                ``'optional_columns'`` entries, each mapping a standard name
                to a list of accepted source column names. When falsy, the
                built-in defaults from ``_default_config`` are used.
        """
        self.analysis_results = {}
        self.insights = []
        self.recommendations = []
        self.df = None
        self.config = config or self._default_config()
        self.column_mapping = {}

    def _default_config(self):
        """Return the default standard-name -> candidate-column-names tables."""
        return {
            'required_columns': {
                'vehicle_count': ['vehicle_count', 'vehicles', 'count', 'traffic_volume', 'car_count'],
                'hour': ['hour', 'hr', 'time_hour', 'h'],
                'day_name': ['day_name', 'day', 'weekday', 'day_of_week_name'],
                'congestion_level': ['congestion_level', 'congestion', 'traffic_level', 'jam_level'],
                'congestion_duration': ['congestion_duration_minutes', 'duration', 'clearance_time', 'congestion_time']
            },
            'optional_columns': {
                'weather': ['weather', 'weather_condition', 'conditions'],
                'temperature': ['temperature', 'temp', 'temp_c', 'temperature_celsius'],
                'visibility': ['visibility', 'vis', 'visibility_km'],
                'is_weekend': ['is_weekend', 'weekend', 'is_wknd'],
                'day_of_week': ['day_of_week', 'dow', 'weekday_num']
            }
        }

    def _validate_and_map_columns(self, df):
        """Build a DataFrame that uses the standard column names.

        For each standard name the first candidate present in ``df`` wins.

        Args:
            df: input DataFrame with arbitrary column names.

        Returns:
            ``(std_df, mapping)`` where ``std_df`` contains only the matched
            columns under their standard names and ``mapping`` maps each
            standard name to the original column name.

        Raises:
            ValueError: if any required column has no matching candidate.
        """
        def first_present(candidates):
            return next((c for c in candidates if c in df.columns), None)

        mapping = {}
        missing = []
        for std, candidates in self.config.get('required_columns', {}).items():
            found = first_present(candidates)
            if found is None:
                missing.append(std)
            else:
                mapping[std] = found
        if missing:
            raise ValueError(f"Missing required columns: {missing}")

        # A user-supplied config may leave out the optional section entirely.
        for std, candidates in self.config.get('optional_columns', {}).items():
            found = first_present(candidates)
            if found is not None:
                mapping[std] = found

        # build standardized df
        std_df = pd.DataFrame({std: df[orig] for std, orig in mapping.items()})
        return std_df, mapping

    def _categorize_time_period(self, h):
        """Label an hour of day with a coarse time-period name.

        Half-open ranges are used so that fractional hours (for example a
        median-filled 9.5) land in a named period instead of falling into
        the gap between two integer ranges.
        """
        if pd.isna(h):
            return 'Unknown'
        if 6 <= h < 10:
            return 'Morning Rush'
        if 10 <= h < 17:
            return 'Mid-Day'
        if 17 <= h < 21:
            return 'Evening Rush'
        if 21 <= h < 24:
            return 'Evening'
        return 'Night/Early Morning'

    def _preprocess_data(self):
        """Fill gaps and derive helper columns on ``self.df`` in place.

        * numeric columns: missing values are replaced by the column median;
        * ``hour``: coerced to numeric (unparseable values become NaN);
        * ``is_weekend``: derived from ``day_of_week`` when absent;
        * ``time_period``: derived from ``hour`` when absent.
        """
        for col in self.df.select_dtypes(include=[np.number]).columns:
            if self.df[col].isnull().any():
                self.df[col] = self.df[col].fillna(self.df[col].median())
        if 'hour' in self.df:
            self.df['hour'] = pd.to_numeric(self.df['hour'], errors='coerce')
        if 'is_weekend' not in self.df and 'day_of_week' in self.df:
            # NOTE(review): treats day_of_week values >= 5 as the weekend,
            # i.e. presumably Monday == 0 -- confirm against the data source.
            self.df['is_weekend'] = (self.df['day_of_week'] >= 5).astype(int)
        if 'time_period' not in self.df and 'hour' in self.df:
            self.df['time_period'] = self.df['hour'].apply(self._categorize_time_period)

    def load_and_prepare_data(self, data):
        """Load, standardize and preprocess a dataset.

        Args:
            data: a CSV path (``str`` or path-like object) or a DataFrame.
                A DataFrame is copied, so the caller's object is not
                modified.

        Returns:
            The standardized, preprocessed DataFrame (also kept as
            ``self.df``).

        Raises:
            ValueError: if a required column is missing.
        """
        import os  # local import: only needed for the path-like check

        if isinstance(data, (str, os.PathLike)):
            df = pd.read_csv(data)
        else:
            df = data.copy()
        std_df, mapping = self._validate_and_map_columns(df)
        self.df, self.column_mapping = std_df, mapping
        # Results of a previously loaded dataset must not leak into this one
        # (e.g. a stale 'weather_impact' entry when the new data has no
        # weather column).
        self.analysis_results = {}
        self.insights = []
        self.recommendations = []
        self._preprocess_data()
        return self.df

    def analyze_peak_hours(self):
        """Aggregate vehicle counts per hour.

        Stores the per-hour table plus the five busiest and five quietest
        hours under ``analysis_results['peak_hours']``.

        Returns:
            DataFrame indexed by hour with ``mean``, ``max`` and ``std``.
        """
        stats = self.df.groupby('hour')['vehicle_count'].agg(['mean', 'max', 'std']).round(2)
        peak = stats['mean'].nlargest(5)
        low = stats['mean'].nsmallest(5)
        self.analysis_results['peak_hours'] = {'stats': stats, 'top': peak.index.tolist(), 'low': low.index.tolist()}
        return stats

    def analyze_daily_patterns(self):
        """Aggregate vehicle counts per day and compare weekdays to weekends.

        The weekday-vs-weekend difference is the percentage by which the
        weekday mean exceeds the weekend mean. It is ``None`` when there is
        no ``is_weekend`` column, when either group is absent, or when the
        weekend mean is zero.

        Returns:
            DataFrame indexed by day name with ``mean``, ``max`` and ``std``.
        """
        by_day = self.df.groupby('day_name')['vehicle_count'].agg(['mean', 'max', 'std']).round(2)
        diff = None
        # 'is_weekend' is optional and can only be derived when
        # 'day_of_week' exists, so it may legitimately be missing here.
        if 'is_weekend' in self.df:
            wk = self.df.groupby('is_weekend')['vehicle_count'].mean()
            if 0 in wk and 1 in wk and wk[1] != 0:
                diff = float((wk[0] - wk[1]) / wk[1] * 100)
        self.analysis_results['daily_patterns'] = {'by_day': by_day, 'weekday_vs_weekend_diff': diff}
        return by_day

    def analyze_weather_impact(self):
        """Compute the mean vehicle count per weather value.

        Returns:
            Series indexed by weather value, or ``None`` (and nothing is
            stored) when the dataset has no weather column.
        """
        if 'weather' not in self.df:
            return None
        wstats = self.df.groupby('weather')['vehicle_count'].mean().round(2)
        self.analysis_results['weather_impact'] = wstats
        return wstats

    def analyze_congestion_patterns(self):
        """Summarize congestion levels and durations.

        Returns:
            Series with the percentage share of each congestion level, or
            ``None`` when the dataset has no congestion-level column.
        """
        if 'congestion_level' not in self.df:
            return None
        dist = self.df['congestion_level'].value_counts(normalize=True).mul(100).round(1)
        dur = None
        if 'congestion_duration' in self.df:
            dur = self.df['congestion_duration'].describe().round(1)
        self.analysis_results['congestion_patterns'] = {'distribution_%': dist, 'duration_stats': dur}
        return dist

    def generate_insights(self):
        """Rebuild ``self.insights`` from whatever results are available."""
        self.insights = []
        top = self.analysis_results.get('peak_hours', {}).get('top', [])
        if top:
            # Hours may be floats (after coercion / median fill); ':02d'
            # only accepts integers.
            self.insights.append(f"Peak hour: {int(top[0]):02d}:00")
        diff = self.analysis_results.get('daily_patterns', {}).get('weekday_vs_weekend_diff')
        if diff is not None:
            self.insights.append(f"Weekday vs weekend diff: {diff:.1f}%")
        duration = self.analysis_results.get('congestion_patterns', {}).get('duration_stats')
        if duration is not None:
            self.insights.append(f"Avg congestion time: {duration['mean']:.1f} min")

    def generate_recommendations(self):
        """Rebuild ``self.recommendations`` from the stored results.

        Each recommendation is emitted only when its supporting result
        exists, so calling this before any analysis yields an empty list.
        """
        self.recommendations = []
        top = self.analysis_results.get('peak_hours', {}).get('top', [])
        if top:
            hours = ','.join(f"{int(h):02d}:00" for h in top[:3])
            self.recommendations.append(f"Optimize signals at {hours}")
        diff = self.analysis_results.get('daily_patterns', {}).get('weekday_vs_weekend_diff')
        if diff is not None and abs(diff) > 20:
            self.recommendations.append("Different weekend strategy")
        if 'weather_impact' in self.analysis_results:
            self.recommendations.append("Plan for adverse weather")

    def run_complete_analysis(self, data):
        """Load ``data`` and run every analysis step in order.

        Args:
            data: CSV path or DataFrame, as for ``load_and_prepare_data``.

        Returns:
            The ``analysis_results`` dict. Insights and recommendations are
            available afterwards as ``self.insights`` and
            ``self.recommendations``.
        """
        self.load_and_prepare_data(data)
        self.analyze_peak_hours()
        self.analyze_daily_patterns()
        self.analyze_weather_impact()
        self.analyze_congestion_patterns()
        self.generate_insights()
        self.generate_recommendations()
        return self.analysis_results

# After running the cell above, you can do:

# df = pd.read_csv('synthetic_traffic_data.csv')
# analyzer = UniversalTrafficAnalyzer()
# results = analyzer.run_complete_analysis(df)
# print(analyzer.insights)
# print(analyzer.recommendations)


In [7]:
# Load the sample dataset, run the full UniversalTrafficAnalyzer pipeline on
# it, and print the generated text summaries.
df = pd.read_csv('synthetic_traffic_data.csv')
analyzer = UniversalTrafficAnalyzer()
# run_complete_analysis returns the analysis_results dict; the insights and
# recommendations are stored on the analyzer instance.
results = analyzer.run_complete_analysis(df)
print("Insights:", analyzer.insights)
print("Recommendations:", analyzer.recommendations)


Insights: ['Peak hour: 17:00', 'Weekday vs weekend diff: 70.5%', 'Avg congestion time: 27.8 min']
Recommendations: ['Optimize signals at 17:00,18:00,08:00', 'Different weekend strategy', 'Plan for adverse weather']


In [8]:
import pandas as pd
import numpy as np
import warnings

In [9]:
import pandas as pd
import numpy as np
import warnings

def load_any_csv_file(file_path):
    """Load a CSV file into a DataFrame and print a short summary of it.

    Prints the path, the shape, the column names and the ``DataFrame.info()``
    report. No column detection happens here.

    Args:
        file_path: path (or anything ``pandas.read_csv`` accepts) of the CSV.

    Returns:
        The loaded DataFrame.
    """
    print(f"📂 Loading data from: {file_path}")
    df = pd.read_csv(file_path)
    print(f"✅ Dataset shape: {df.shape}")
    print(f"✅ Columns found: {list(df.columns)}")
    print("\n📊 Dataset Info:")
    # DataFrame.info() writes its report itself and returns None; wrapping it
    # in print() would add a stray "None" line after the report.
    df.info()
    return df


In [10]:
def _first_matching_column(df, keywords, accept=None):
    """Return the first column whose lower-cased name contains a keyword.

    Keywords are tried in the order given, so an earlier keyword takes
    priority over a later one; for a single keyword the columns are scanned
    left to right. ``accept`` is an optional predicate on the column label
    used for extra checks. Returns ``None`` when nothing matches.
    """
    for word in keywords:
        for col in df.columns:
            # str() so that non-string labels (e.g. integers) do not crash.
            if word in str(col).lower() and (accept is None or accept(col)):
                return col
    return None


def detect_traffic_columns(df):
    """Automatically detect important columns in any traffic dataset

    Column names are matched by substring against keyword lists. The keyword
    lists are ordered by priority, so with both ``day_of_week`` and
    ``day_name`` present the ``day`` role resolves to ``day_name``,
    regardless of column order. Each detected column is reported on stdout.

    Args:
        df: DataFrame to inspect.

    Returns:
        dict mapping a role (``'traffic'``, ``'hour'``, ``'day'``,
        ``'weather'``, ``'congestion'``) to a column label. Roles without a
        matching column are absent from the dict.
    """
    def looks_like_hour(col):
        # An hour column must be numeric (any integer/float width, not
        # bool) and must not exceed 24.
        series = df[col]
        return (
            pd.api.types.is_numeric_dtype(series)
            and not pd.api.types.is_bool_dtype(series)
            and series.max() <= 24
        )

    # (role, label used in the message, keywords by priority, extra check)
    searches = [
        ('traffic', 'Traffic', ['vehicle', 'count', 'traffic', 'car', 'volume'], None),
        ('hour', 'Hour', ['hour', 'hr', 'time'], looks_like_hour),
        ('day', 'Day', ['day_name', 'day', 'weekday'], None),
        ('weather', 'Weather', ['weather', 'condition', 'climate'], None),
        ('congestion', 'Congestion', ['congestion', 'level', 'jam'], None),
    ]

    column_map = {}
    for role, label, keywords, accept in searches:
        col = _first_matching_column(df, keywords, accept)
        if col is not None:
            column_map[role] = col
            print(f"✓ {label} column found: {col}")

    return column_map


In [11]:
def detailed_traffic_stats(df, traffic_col):
    """Print summary statistics for a traffic column and return them.

    The report lists the record count, mean, max, min, standard deviation
    and median, followed by how many records fall above the 75th percentile,
    between the 25th and 75th percentiles (inclusive), and below the 25th.

    Args:
        df: DataFrame holding the data.
        traffic_col: name of the column with the traffic values.

    Returns:
        The ``describe()`` Series of ``df[traffic_col]``.
    """
    banner = "=" * 50
    print("\n" + banner)
    print("DETAILED TRAFFIC STATISTICS")
    print(banner)

    values = df[traffic_col]
    stats = values.describe()
    total = len(df)

    print(f"Total Records: {total}")
    print(f"Average Traffic: {stats['mean']:.1f}")
    print(f"Peak Traffic: {stats['max']:.0f}")
    print(f"Minimum Traffic: {stats['min']:.0f}")
    print(f"Standard Deviation: {stats['std']:.1f}")
    print(f"Median Traffic: {stats['50%']:.1f}")

    # Count the records in each band via boolean masks.
    lower, upper = stats['25%'], stats['75%']
    n_high = int((values > upper).sum())
    n_medium = int(values.between(lower, upper).sum())
    n_low = int((values < lower).sum())

    print(f"\nTraffic Distribution:")
    print(f"High Traffic (>75%): {n_high} records ({n_high/total*100:.1f}%)")
    print(f"Medium Traffic (25-75%): {n_medium} records ({n_medium/total*100:.1f}%)")
    print(f"Low Traffic (<25%): {n_low} records ({n_low/total*100:.1f}%)")

    return stats


In [12]:
def advanced_peak_analysis(df, traffic_col, hour_col):
    """Print and return per-hour traffic statistics and the rush hours.

    Rush hours are the hours whose (rounded) mean traffic is at or above the
    75th percentile of all hourly means.

    Args:
        df: DataFrame holding the data.
        traffic_col: name of the column with the traffic values.
        hour_col: name of the column to group by.

    Returns:
        ``(hourly_stats, rush_hours)``: the per-hour table with ``mean``,
        ``max``, ``std`` and ``count`` rounded to one decimal, and the
        subset of its rows that qualify as rush hours.
    """
    rule = "=" * 50
    print("\n" + rule)
    print("ADVANCED PEAK HOUR ANALYSIS")
    print(rule)

    grouped = df.groupby(hour_col)[traffic_col]
    hourly_stats = grouped.agg(['mean', 'max', 'std', 'count']).round(1)
    hourly_mean = hourly_stats['mean']

    # Identify rush hours (top 25% of traffic)
    cutoff = hourly_mean.quantile(0.75)
    rush_hours = hourly_stats.loc[hourly_mean.ge(cutoff)]

    busiest, calmest = hourly_mean.idxmax(), hourly_mean.idxmin()
    print("Peak Hours Analysis:")
    print(f"Peak Hour: {busiest}:00 ({hourly_mean.max():.1f} avg vehicles)")
    print(f"Quietest Hour: {calmest}:00 ({hourly_mean.min():.1f} avg vehicles)")

    print(f"\nRush Hours (Traffic >= {cutoff:.1f}):")
    for hour, avg_traffic, max_traffic in zip(
        rush_hours.index, rush_hours['mean'], rush_hours['max']
    ):
        print(f"  {hour}:00 - Avg: {avg_traffic:.1f}, Peak: {max_traffic:.1f}")

    return hourly_stats, rush_hours


In [13]:
def analyze_daily_patterns(df, traffic_col, day_col):
    """Print and return traffic statistics grouped by day.

    Args:
        df: DataFrame holding the data.
        traffic_col: name of the column with the traffic values.
        day_col: name of the column to group by.

    Returns:
        DataFrame indexed by day with ``mean``, ``max``, ``min`` and ``std``
        rounded to one decimal.
    """
    rule = "=" * 50
    print("\n" + rule)
    print("DAILY PATTERN ANALYSIS")
    print(rule)

    daily_stats = (
        df.groupby(day_col)[traffic_col]
        .agg(['mean', 'max', 'min', 'std'])
        .round(1)
    )
    day_means = daily_stats['mean']

    print("Traffic by Day of Week:")
    for day, avg, max_val, min_val in zip(
        daily_stats.index, day_means, daily_stats['max'], daily_stats['min']
    ):
        print(f"  {day}: Avg {avg:.1f}, Range {min_val:.0f}-{max_val:.0f}")

    busiest_day = day_means.idxmax()
    quietest_day = day_means.idxmin()

    print(f"\nBusiest Day: {busiest_day} ({day_means.loc[busiest_day]:.1f} avg)")
    print(f"Quietest Day: {quietest_day} ({day_means.loc[quietest_day]:.1f} avg)")

    return daily_stats


In [14]:
def analyze_weather_impact(df, traffic_col, weather_col):
    """Print and return traffic statistics grouped by weather value.

    The value with the highest mean traffic is reported as "Best Traffic
    Weather" and the one with the lowest mean as "Worst Traffic Weather".

    Args:
        df: DataFrame holding the data.
        traffic_col: name of the column with the traffic values.
        weather_col: name of the column to group by.

    Returns:
        DataFrame indexed by weather value with ``mean``, ``count`` and
        ``std`` rounded to one decimal.
    """
    rule = "=" * 50
    print("\n" + rule)
    print("WEATHER IMPACT ANALYSIS")
    print(rule)

    per_weather = df.groupby(weather_col)[traffic_col]
    weather_stats = per_weather.agg(['mean', 'count', 'std']).round(1)

    print("Traffic by Weather Condition:")
    for weather, avg, count in zip(
        weather_stats.index, weather_stats['mean'], weather_stats['count']
    ):
        print(f"  {weather}: {avg:.1f} avg traffic ({count} observations)")

    mean_by_weather = weather_stats['mean']
    best_weather = mean_by_weather.idxmax()
    worst_weather = mean_by_weather.idxmin()

    print(f"\nBest Traffic Weather: {best_weather}")
    print(f"Worst Traffic Weather: {worst_weather}")

    return weather_stats


In [15]:
def analyze_traffic_data(csv_file_path):
    """Run the full function-based traffic analysis on one CSV file.

    Loads the file, detects the relevant columns, always computes the basic
    statistics, and then runs the hourly, daily and weather analyses for
    whichever of those columns were detected.

    Args:
        csv_file_path: path of the CSV file to analyze.

    Returns:
        ``(analysis_results, column_map)`` on success. ``analysis_results``
        may contain the keys ``'basic_stats'``, ``'hourly_stats'``,
        ``'rush_hours'``, ``'daily_patterns'`` and ``'weather_impact'``.
        When no traffic column is detected a single ``None`` is returned
        instead of a tuple, so callers that unpack two values must check
        for that case first.
    """
    print("🚗 STARTING COMPREHENSIVE TRAFFIC ANALYSIS")
    print("="*60)

    # Load the data, then work out which column plays which role.
    df = load_any_csv_file(csv_file_path)
    column_map = detect_traffic_columns(df)

    # Nothing can be computed without a traffic column.
    if 'traffic' not in column_map:
        print("❌ No traffic data column found!")
        return None

    traffic_col = column_map['traffic']

    # Basic statistics are always available.
    analysis_results = {'basic_stats': detailed_traffic_stats(df, traffic_col)}

    # Peak hour analysis
    if 'hour' in column_map:
        (
            analysis_results['hourly_stats'],
            analysis_results['rush_hours'],
        ) = advanced_peak_analysis(df, traffic_col, column_map['hour'])

    # Daily pattern analysis
    if 'day' in column_map:
        analysis_results['daily_patterns'] = analyze_daily_patterns(
            df, traffic_col, column_map['day']
        )

    # Weather analysis
    if 'weather' in column_map:
        analysis_results['weather_impact'] = analyze_weather_impact(
            df, traffic_col, column_map['weather']
        )

    print("\n🎉 ANALYSIS COMPLETED!")
    return analysis_results, column_map


In [16]:
# Run the function-based analysis on the sample CSV file.
# NOTE: analyze_traffic_data returns a bare None (not a tuple) when no traffic
# column is detected, in which case this two-value unpacking raises TypeError.
results, columns = analyze_traffic_data('synthetic_traffic_data.csv')

# To analyze your own file, change the filename:
# results, columns = analyze_traffic_data('your_traffic_file.csv')


🚗 STARTING COMPREHENSIVE TRAFFIC ANALYSIS
📂 Loading data from: synthetic_traffic_data.csv
✅ Dataset shape: (168, 12)
✅ Columns found: ['date', 'time', 'hour', 'day_of_week', 'day_name', 'vehicle_count', 'congestion_level', 'congestion_duration_minutes', 'is_weekend', 'weather', 'temperature', 'visibility']

📊 Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 168 entries, 0 to 167
Data columns (total 12 columns):
 #   Column                       Non-Null Count  Dtype  
---  ------                       --------------  -----  
 0   date                         168 non-null    object 
 1   time                         168 non-null    object 
 2   hour                         168 non-null    int64  
 3   day_of_week                  168 non-null    int64  
 4   day_name                     168 non-null    object 
 5   vehicle_count                168 non-null    int64  
 6   congestion_level             168 non-null    object 
 7   congestion_duration_minutes  168 non-null  