In [None]:
import pandas as pd
import json
from google.colab import files
from datetime import datetime, timedelta
import numpy as np
from IPython.display import display, Markdown

class CustomJSONEncoder(json.JSONEncoder):
    """JSON encoder for the NumPy / pandas values produced by the analysis.

    The standard encoder rejects NumPy scalars, NumPy arrays and datetime
    objects. This subclass converts them to their closest JSON-native form.
    """

    def default(self, obj):
        """Return a JSON-serialisable stand-in for ``obj``.

        Args:
            obj: A value the base encoder could not serialise on its own.

        Returns:
            ``bool``/``int``/``float`` for NumPy scalars, a (nested) list for
            NumPy arrays, or an ISO-8601 string for datetime-like values.

        Raises:
            TypeError: Raised by the base class for any other unsupported type.
        """
        if isinstance(obj, np.bool_):
            return bool(obj)
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.ndarray):
            # tolist() also converts the elements to plain Python scalars.
            return obj.tolist()
        if isinstance(obj, (datetime, pd.Timestamp)):
            return obj.isoformat()
        return super().default(obj)

def calculate_frequency_metrics(data, source_name, dataset_min_date, dataset_max_date):
    """Summarise how often one piece of equipment raised alarms.

    Args:
        data: DataFrame containing at least 'SOURCE NAME' and 'EVENT DAY'.
        source_name: 'SOURCE NAME' value to report on (exact match).
        dataset_min_date: Kept for interface compatibility; no metric uses it.
        dataset_max_date: Reference timestamp; the 7 and 30 day windows end here.

    Returns:
        ``None`` when ``data`` has no row for ``source_name``; otherwise a dict
        with the average number of alarms per day on which at least one alarm
        occurred, and the number of alarms strictly later than 7 and 30 days
        before ``dataset_max_date``.
    """
    source_data = data[data['SOURCE NAME'] == source_name]
    if source_data.empty:
        return None

    # Work on a separate Series so the caller's frame is never modified.
    event_times = pd.to_datetime(source_data['EVENT DAY'])

    alarm_days = event_times.dt.normalize().nunique()
    total_alarms = len(source_data)

    def alarms_within(days):
        """Count alarms strictly after ``dataset_max_date - days``."""
        cutoff = dataset_max_date - timedelta(days=days)
        return int((event_times > cutoff).sum())

    return {
        "Frequency of Alarms in Active day": round(total_alarms / alarm_days, 2) if alarm_days > 0 else 0,
        # NOTE: the misspelled key is kept on purpose; exported JSON already uses it.
        "Frequncy in 7 days": alarms_within(7),
        "Frequency in 30 days": alarms_within(30),
    }

def load_and_clean_data():
    """Prompt for an Excel upload, then parse and enrich the alarm log.

    The workbook is read with row index 3 (0-based) as the header row. The
    columns 'EVENT DAY', 'RESOLVED TIME', 'SOURCE NAME', 'NAME' and 'PATH'
    are required; 'CLIENT NAME' is optional.

    Returns:
        A 4-tuple ``(df, valid_resolutions, min_date, max_date)``:
        ``df`` is the full cleaned frame, ``valid_resolutions`` keeps only rows
        whose 'RESOLVED TIME' is present and not earlier than 'EVENT DAY', and
        the two dates are the earliest/latest parsed 'EVENT DAY'.
        On any upload, read or validation failure the error is printed and
        ``(None, None, None, None)`` is returned.
    """
    print("Please upload your Excel file:")
    try:
        uploaded = files.upload()
        if not uploaded:
            raise ValueError("No file uploaded.")
        # Only the first uploaded file is analysed.
        filename = next(iter(uploaded))
        df = pd.read_excel(filename, header=3)
    except Exception as e:
        # Deliberately broad: any upload/parse problem is reported, not raised.
        print(f"Error uploading or reading file: {e}")
        return None, None, None, None

    # Validate required columns
    required_columns = ['EVENT DAY', 'RESOLVED TIME', 'SOURCE NAME', 'NAME', 'PATH']
    missing_cols = [col for col in required_columns if col not in df.columns]
    if missing_cols:
        print(f"Missing required columns: {', '.join(missing_cols)}")
        return None, None, None, None

    # Data cleaning; unparseable timestamps become NaT instead of raising.
    df['EVENT DAY'] = pd.to_datetime(df['EVENT DAY'], format='%d %b %Y %H:%M:%S', errors='coerce', dayfirst=True)
    df['RESOLVED TIME'] = pd.to_datetime(df['RESOLVED TIME'], errors='coerce')

    # 'CLIENT NAME' is optional. df.get() with a string default would hand back
    # a plain str whose .replace() rejects a dict, so branch explicitly.
    if 'CLIENT NAME' in df.columns:
        df['CLIENT NAME'] = df['CLIENT NAME'].replace({'americanauae': 'UAE', 'americanaksa': 'KSA'})
    else:
        df['CLIENT NAME'] = ''

    df['SOURCE NAME'] = df['SOURCE NAME'].str.strip()
    df['resolution_minutes'] = (df['RESOLVED TIME'] - df['EVENT DAY']).dt.total_seconds() / 60
    df['month'] = df['EVENT DAY'].dt.strftime('%B')
    # Days 1-7 -> week 1, 8-14 -> week 2, and so on.
    df['week_of_month'] = (df['EVENT DAY'].dt.day - 1) // 7 + 1
    df['week'] = df['EVENT DAY'].dt.isocalendar().week

    # Get dataset date range
    min_date = df['EVENT DAY'].min()
    max_date = df['EVENT DAY'].max()

    # Filter valid resolutions: resolved, and not resolved before the event.
    valid_resolutions = df.dropna(subset=['RESOLVED TIME'])
    valid_resolutions = valid_resolutions[valid_resolutions['RESOLVED TIME'] >= valid_resolutions['EVENT DAY']]

    return df, valid_resolutions, min_date, max_date

def analyze_equipment_source(data, full_data, min_date, max_date):
    """Build one summary record per equipment ('SOURCE NAME').

    Args:
        data: Kept for interface compatibility; every figure is computed from
            ``full_data``. NOTE(review): confirm whether resolution times
            were meant to come from this filtered frame instead.
        full_data: Cleaned alarm frame with 'SOURCE NAME', 'NAME', 'PATH',
            'month', 'week_of_month', 'resolution_minutes' and 'EVENT DAY'.
        min_date: Passed through to ``calculate_frequency_metrics``.
        max_date: Passed through to ``calculate_frequency_metrics``.

    Returns:
        A list of dicts sorted by 'Total Count' (descending) with the keys
        'Source name', 'Name', 'Path', 'Month', 'Week of Month', 'Total Count',
        'Unresolved', 'Resolved', 'Frequency' and
        'Time taken to resolve in minutes'.
    """
    # Mean resolution time per (equipment, alarm name), resolved rows only.
    condition_times = (
        full_data[full_data['resolution_minutes'].notna()]
        .groupby(['SOURCE NAME', 'NAME'])['resolution_minutes']
        .mean()
        .reset_index()
    )

    # Plain loop instead of groupby(...).apply(...), which emits a
    # DeprecationWarning about operating on the grouping columns.
    condition_times_dict = {}
    for source, name, minutes in zip(
        condition_times['SOURCE NAME'],
        condition_times['NAME'],
        condition_times['resolution_minutes'].round(2),
    ):
        condition_times_dict.setdefault(source, {})[name] = minutes

    name_count = full_data.groupby('SOURCE NAME').agg({
        'NAME': lambda x: dict(x.value_counts()),
        'PATH': 'first',
        # Rows with an unparseable event date carry NaN here; skip them so they
        # neither break the calendar sort nor show up as labels.
        'month': lambda x: sorted(
            x.dropna().unique(),
            key=lambda m: pd.to_datetime(m, format='%B').month,
        ),
        # int() keeps the label "Week 1" even when NaN rows made the column float.
        'week_of_month': lambda x: [f"Week {int(w)}" for w in sorted(x.dropna().unique())],
        'resolution_minutes': [
            'size',
            lambda x: x.isna().sum(),
            lambda x: x.notna().sum(),
            'mean'
        ]
    }).reset_index()

    # Flatten the MultiIndex columns produced by the mixed aggregation spec.
    name_count.columns = [
        'Source name', 'Name', 'Path', 'Month', 'Week of Month',
        'Total Count', 'Unresolved', 'Resolved', 'Total Resolution Time'
    ]

    name_count['Frequency'] = name_count['Source name'].apply(
        lambda x: calculate_frequency_metrics(full_data, x, min_date, max_date)
    )

    # NOTE: 'Total' holds the *mean* resolution time; the key name is kept for
    # compatibility with previously exported files.
    name_count['Time taken to resolve in minutes'] = name_count.apply(
        lambda row: {
            'Total': round(row['Total Resolution Time'], 2) if pd.notna(row['Total Resolution Time']) else None,
            'By Condition': condition_times_dict.get(row['Source name'], {})
        }, axis=1
    )

    name_count = name_count.drop(columns=['Total Resolution Time'])
    # Turn the {alarm name: count} mapping into "name: count" strings.
    name_count['Name'] = name_count['Name'].apply(lambda x: [f"{k}: {v}" for k, v in x.items()])
    name_count = name_count.sort_values('Total Count', ascending=False)

    return name_count.to_dict('records')

def analyze_condition_name(data, min_date, max_date):
    """Summarise every alarm condition ('NAME') per equipment.

    Args:
        data: Alarm frame with 'NAME', 'SOURCE NAME', 'PATH',
            'resolution_minutes' and 'EVENT DAY' columns.
        min_date: Passed through to ``calculate_frequency_metrics``.
        max_date: Passed through to ``calculate_frequency_metrics``.

    Returns:
        A dict keyed by the lower-cased condition name with spaces replaced by
        underscores. Each value is a list of records ('Source name', 'Path',
        'count', 'time taken', 'frequency') sorted by 'count' descending.
        Conditions whose names differ only by case or space/underscore share a
        key, so the later one overwrites the earlier one.
    """
    results = {}

    # Missing names are skipped: they cannot be lower-cased and an equality
    # filter on NaN would match nothing anyway.
    for cond in data['NAME'].dropna().unique():
        cond_key = str(cond).lower().replace(' ', '_')
        # Never empty: cond was taken from this very column.
        cond_data = data[data['NAME'] == cond]

        cond_analysis = cond_data.groupby('SOURCE NAME').agg({
            'PATH': 'first',
            'resolution_minutes': ['count', 'mean']
        }).reset_index()
        cond_analysis.columns = ['Source name', 'Path', 'count', 'time taken']

        # Add frequency metrics
        cond_analysis['frequency'] = cond_analysis['Source name'].apply(
            lambda x: calculate_frequency_metrics(cond_data, x, min_date, max_date)
        )

        cond_analysis['time taken'] = cond_analysis['time taken'].round(2)
        cond_analysis = cond_analysis.sort_values('count', ascending=False)
        results[cond_key] = cond_analysis.to_dict('records')

    return results

def analyze_specific_condition(data, condition, min_date, max_date):
    """Summarise a single alarm condition per equipment.

    Args:
        data: Alarm frame with 'NAME', 'SOURCE NAME', 'PATH',
            'resolution_minutes' and 'EVENT DAY' columns.
        condition: Condition name to look up; matched against 'NAME' ignoring
            case and surrounding whitespace.
        min_date: Passed through to ``calculate_frequency_metrics``.
        max_date: Passed through to ``calculate_frequency_metrics``.

    Returns:
        A list of records ('Source name', 'Path', 'count', 'time taken',
        'frequency') sorted by 'count' descending, or a message string when no
        row matches the condition.
    """
    wanted = str(condition).strip().lower()
    # Resolve the user's spelling to the exact value stored in the data.
    # Missing names are dropped first because they cannot be lower-cased.
    known_names = data['NAME'].dropna().unique()
    condition = next(
        (c for c in known_names if str(c).strip().lower() == wanted),
        condition,
    )

    cond_data = data[data['NAME'] == condition]
    if cond_data.empty:
        return f"No data available for condition: {condition}"

    results = cond_data.groupby('SOURCE NAME').agg({
        'PATH': 'first',
        'resolution_minutes': ['count', 'mean']
    }).reset_index()
    results.columns = ['Source name', 'Path', 'count', 'time taken']

    # Add frequency metrics
    results['frequency'] = results['Source name'].apply(
        lambda x: calculate_frequency_metrics(cond_data, x, min_date, max_date)
    )

    results['time taken'] = results['time taken'].round(2)
    results = results.sort_values('count', ascending=False)

    return results.to_dict('records')

def save_and_download(data, filename_prefix):
    """Write ``data`` to a timestamped JSON file and start a browser download.

    Args:
        data: JSON-serialisable structure; NumPy and datetime values are
            handled by ``CustomJSONEncoder``.
        filename_prefix: Leading part of the file name. It may contain
            user-typed text, so every character other than letters, digits,
            '-', '_' and '.' is replaced by '_' before touching the file system.
    """
    # A raw prefix such as "Equipment_A/B" would be read as a directory path
    # by open() and fail.
    safe_prefix = "".join(
        ch if ch.isalnum() or ch in "-_." else "_" for ch in str(filename_prefix)
    )
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"{safe_prefix}_{timestamp}.json"
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=2, cls=CustomJSONEncoder)
    files.download(filename)
    print(f"Downloaded: {filename}")

def _equipment_menu(df, equipment_results):
    """Run the equipment sub-menu until the user chooses to go back."""
    while True:
        print("\nALARM EQUIPMENT ANALYSIS")
        print("1. All equipment analysis")
        print("2. Specific equipment analysis")
        print("3. Back to main menu")

        equip_choice = input("Select analysis: ")

        if equip_choice == "1":
            display(Markdown("## All Equipment Analysis"))
            display(pd.DataFrame(equipment_results))
            save_and_download(equipment_results, "Equipment_All_Analysis")
        elif equip_choice == "2":
            # dropna/astype(str) so missing names cannot break str.join.
            available = df['SOURCE NAME'].dropna().astype(str).unique()
            print("Available equipment:", ", ".join(available))
            source_name = input("Enter equipment name: ")
            # No per-equipment analysis function exists in this file, so pick
            # the record out of the already computed results (case-insensitive,
            # ignoring surrounding whitespace).
            wanted = source_name.strip().lower()
            specific_results = [
                record for record in equipment_results
                if str(record['Source name']).strip().lower() == wanted
            ]
            if not specific_results:
                print(f"No data available for equipment: {source_name}")
            else:
                display(Markdown(f"## Analysis for: {source_name}"))
                display(pd.DataFrame(specific_results))
                save_and_download(specific_results, f"Equipment_{source_name.replace(' ', '_')}_Analysis")
        elif equip_choice == "3":
            break
        else:
            print("Invalid selection")


def _condition_menu(df, condition_results, min_date, max_date):
    """Run the alarm-condition sub-menu until the user chooses to go back."""
    while True:
        print("\nALARM CONDITION ANALYSIS")
        print("1. All condition analysis")
        print("2. Specific condition analysis")
        print("3. Back to main menu")

        cond_choice = input("Select analysis: ")

        if cond_choice == "1":
            display(Markdown("## All Alarm Conditions"))
            for cond, result in condition_results.items():
                display(Markdown(f"### {cond.replace('_', ' ').title()}"))
                if isinstance(result, str):
                    print(result)
                else:
                    display(pd.DataFrame(result))
            save_and_download(condition_results, "Condition_All_Analysis")
        elif cond_choice == "2":
            available = df['NAME'].dropna().astype(str).unique()
            print("Available conditions:", ", ".join(available))
            condition = input("Enter condition name: ")
            # NOTE(review): this uses the full frame while the "all conditions"
            # report is built from the valid-resolution subset; confirm intent.
            specific_results = analyze_specific_condition(df, condition, min_date, max_date)
            if isinstance(specific_results, str):
                print(specific_results)
            else:
                display(Markdown(f"## Analysis for: {condition}"))
                display(pd.DataFrame(specific_results))
                save_and_download(specific_results, f"Condition_{condition.replace(' ', '_')}_Analysis")
        elif cond_choice == "3":
            break
        else:
            print("Invalid selection")


def main():
    """Load the alarm log, precompute both analyses and run the text menu.

    The equipment and condition reports are computed once after loading; the
    menu then displays and exports them until the user selects "Exit".
    Returns early with a message when the data cannot be loaded.
    """
    df, valid_resolutions, min_date, max_date = load_and_clean_data()
    if df is None or valid_resolutions is None:
        print("Exiting due to data loading error.")
        return

    equipment_results = analyze_equipment_source(valid_resolutions, df, min_date, max_date)
    condition_results = analyze_condition_name(valid_resolutions, min_date, max_date)

    while True:
        print("\nMAIN MENU")
        print("1. Alarm Equipment Analysis")
        print("2. Alarm Condition Name Analysis")
        print("3. Export Full Results")
        print("4. Exit")

        main_choice = input("Select option: ")

        if main_choice == "1":
            _equipment_menu(df, equipment_results)
        elif main_choice == "2":
            _condition_menu(df, condition_results, min_date, max_date)
        elif main_choice == "3":
            full_results = {
                "equipment_analysis": equipment_results,
                "condition_analysis": condition_results
            }
            save_and_download(full_results, "Full_Analysis_Results")
            print("Full results exported")
        elif main_choice == "4":
            print("Exiting program...")
            break
        else:
            print("Invalid selection")

# Start the interactive menu only when this module is executed directly,
# not when it is imported.
if __name__ == "__main__":
    main()

Please upload your Excel file:


Saving testx.xlsx to testx.xlsx


  condition_times_dict = condition_times.groupby('SOURCE NAME').apply(



MAIN MENU
1. Alarm Equipment Analysis
2. Alarm Condition Name Analysis
3. Export Full Results
4. Exit
Select option: 1

ALARM EQUIPMENT ANALYSIS
1. All equipment analysis
2. Specific equipment analysis
3. Back to main menu
Select analysis: 3

MAIN MENU
1. Alarm Equipment Analysis
2. Alarm Condition Name Analysis
3. Export Full Results
4. Exit
Select option: 2

ALARM CONDITION ANALYSIS
1. All condition analysis
2. Specific condition analysis
3. Back to main menu
Select analysis: 3

MAIN MENU
1. Alarm Equipment Analysis
2. Alarm Condition Name Analysis
3. Export Full Results
4. Exit
Select option: 4
Exiting program...
