In [None]:
import pandas as pd
from datetime import datetime

def load_weather_warnings(file_path):
    """
    Read the weather-warnings CSV at ``file_path`` into a DataFrame.

    All columns found in the file are kept as-is; only the three timestamp
    columns ('Issue Time', 'Valid From', 'Valid To') are parsed into pandas
    datetimes.

    Returns:
        pandas.DataFrame with the timestamp columns converted.
    """
    raw = pd.read_csv(file_path)

    # Parse each timestamp column and swap the parsed versions in, keeping
    # the original column order.
    parsed = {
        name: pd.to_datetime(raw[name])
        for name in ('Issue Time', 'Valid From', 'Valid To')
    }
    return raw.assign(**parsed)

def consolidate_warnings(df):
    """
    Consolidate weather warnings by grouping re-issues of the same event.

    Rows sharing the same 'Valid To', 'Warning Element', 'Warning Colour' and
    'Warning Text' are treated as one event. For each event the first
    non-null value of every base and county column is kept, and three
    metrics are added:

    * ``issue_count`` - number of rows (issues) in the event
    * ``first_issue`` - earliest 'Issue Time' in the event
    * ``last_issue``  - latest 'Issue Time' in the event

    The input DataFrame is not modified, so the function can safely be called
    more than once on the same frame.

    Args:
        df: DataFrame containing the base warning columns plus any number of
            additional (county) columns.

    Returns:
        A new DataFrame with one row per event, sorted by ``event_key``, with
        the warning columns renamed to ``warning_type``, ``warning_text``,
        ``location`` and ``warning_colour``.
    """
    base_columns = ['Issue Time', 'Valid From', 'Valid To', 'Warning Element',
                    'Warning Text', 'WhereToText', 'Warning Colour']

    # Every non-base column is treated as a county column.  A pre-existing
    # 'event_key' column is excluded: it is the grouping key and must not be
    # aggregated as if it were a county.
    county_columns = [col for col in df.columns
                      if col not in base_columns and col != 'event_key']

    # Work on a copy so the caller's DataFrame does not gain an 'event_key'
    # column as a side effect.
    work = df.copy()

    # Build the grouping key.  Iterating the columns in lockstep yields the
    # same strings as a row-wise apply without the per-row Series overhead.
    work['event_key'] = [
        f"{valid_to}_{element}_{colour}_{text}"
        for valid_to, element, colour, text in zip(
            work['Valid To'],
            work['Warning Element'],
            work['Warning Colour'],
            work['Warning Text'],
        )
    ]

    # Flat aggregation: keep the first value of every base and county column.
    agg_dict = dict.fromkeys(base_columns + county_columns, 'first')

    grouped = work.groupby('event_key')
    df_consolidated = grouped.agg(agg_dict)

    # The metrics share the same 'event_key' index, so they align on
    # assignment and no merge is needed.
    df_consolidated['issue_count'] = grouped.size()
    df_consolidated['first_issue'] = grouped['Issue Time'].min()
    df_consolidated['last_issue'] = grouped['Issue Time'].max()
    df_consolidated = df_consolidated.reset_index()

    # Rename columns to match desired output
    df_consolidated = df_consolidated.rename(columns={
        'Warning Element': 'warning_type',
        'Warning Text': 'warning_text',
        'WhereToText': 'location',
        'Warning Colour': 'warning_colour',
    })

    # Arrange columns in the desired order
    column_order = [
        'event_key',
        'Issue Time',
        'issue_count',
        'first_issue',
        'last_issue',
        'Valid From',
        'Valid To',
        'warning_type',
        'warning_text',
        'location',
        'warning_colour',
    ] + county_columns

    # Only select columns that exist
    final_columns = [col for col in column_order if col in df_consolidated.columns]
    return df_consolidated[final_columns]

def analyze_warnings(df):
    """
    Generate summary statistics about consolidated weather warnings.

    Args:
        df: Consolidated DataFrame with one row per event; must contain the
            columns 'event_key', 'warning_type', 'location' and
            'issue_count'.

    Returns:
        dict with the keys:

        * ``total_warnings`` - total number of individual issues, i.e. the
          sum of ``issue_count`` over all events
        * ``unique_events`` - number of distinct ``event_key`` values
        * ``warning_types`` - mapping of warning type to number of events
        * ``most_reissued`` - dict with 'warning_type', 'location' and
          'issue_count' for the event issued most often; for an empty frame
          the two text fields are None and the count is 0
        * ``avg_issues_per_event`` - mean of ``issue_count`` (NaN when the
          frame is empty)
    """
    if df.empty:
        # nlargest(...)[0] would raise IndexError on an empty frame; return
        # a placeholder with the same keys so callers can still format it.
        most_reissued = {'warning_type': None, 'location': None, 'issue_count': 0}
    else:
        most_reissued = (
            df.nlargest(1, 'issue_count')[['warning_type', 'location', 'issue_count']]
            .to_dict('records')[0]
        )

    return {
        # Each row is one consolidated event, so len(df) would only repeat
        # the event count; the number of warnings issued is the sum of the
        # per-event issue counts.
        'total_warnings': int(df['issue_count'].sum()),
        'unique_events': df['event_key'].nunique(),
        'warning_types': df['warning_type'].value_counts().to_dict(),
        'most_reissued': most_reissued,
        'avg_issues_per_event': df['issue_count'].mean(),
    }

def process_weather_warnings(input_file, output_file):
    """
    Run the full pipeline: load, consolidate, summarise and export.

    Reads the raw warnings CSV from ``input_file``, consolidates it, prints a
    human-readable summary to stdout and writes the consolidated table to
    ``output_file`` as CSV (without the index).

    Returns:
        The consolidated DataFrame that was written to ``output_file``.
    """
    print(f"Loading data from {input_file}...")
    raw_warnings = load_weather_warnings(input_file)

    print("Consolidating warnings...")
    consolidated = consolidate_warnings(raw_warnings)

    print("Analyzing results...")
    summary = analyze_warnings(consolidated)

    # --- Console report -------------------------------------------------
    print("\nWeather Warnings Analysis Summary:")
    print(f"Total warnings issued: {summary['total_warnings']}")
    print(f"Number of unique events: {summary['unique_events']}")

    print(f"\nWarning types frequency:")
    for kind, occurrences in summary['warning_types'].items():
        print(f"- {kind}: {occurrences}")

    print(f"\nMost reissued warning:")
    top_event = summary['most_reissued']
    print(f"- Type: {top_event['warning_type']}")
    print(f"- Location: {top_event['location']}")
    print(f"- Times issued: {top_event['issue_count']}")

    print(f"\nAverage issues per event: {summary['avg_issues_per_event']:.2f}")

    # --- Export ---------------------------------------------------------
    print(f"\nExporting consolidated data to {output_file}...")
    consolidated.to_csv(output_file, index=False)
    print("Export complete!")

    return consolidated

from pathlib import Path
# Driver for this cell: consolidate the warnings CSV and write the result
# next to it.
# NOTE(review): both paths are absolute and machine-specific; adjust them
# before running this cell elsewhere.
input_file = "/mnt/hgfs/shared/ul_project_Msc_AI/data/met_eireann/eda/output.csv"
output_file = "/mnt/hgfs/shared/ul_project_Msc_AI/data/met_eireann/eda/consolidated_weather_warnings.csv"
# Runs the whole pipeline (prints a summary, writes output_file) and keeps
# the consolidated DataFrame available at module level.
df_xml_consolidated = process_weather_warnings(input_file, output_file)

In [None]:
def severity_to_color(severity):
    """
    Translate a severity label into its warning colour.

    'Extreme' -> 'Red', 'Severe' -> 'Orange', 'Moderate' -> 'Yellow'.
    Any other value (including None) yields the sentinel 'notmapped'.
    """
    colour_by_severity = {
        'Extreme': 'Red',
        'Severe': 'Orange',
        'Moderate': 'Yellow'
    }
    try:
        return colour_by_severity[severity]
    except KeyError:
        # Unknown or missing severity.
        return 'notmapped'

def get_element_text(element, path, namespace):
    """
    Return the text of the first child of ``element`` matching ``path``.

    ``namespace`` is the prefix-to-URI mapping passed to ``element.find``.
    Returns None when no child matches; a matching child without text also
    yields None because its ``.text`` is None.
    """
    node = element.find(path, namespace)
    if node is None:
        return None
    return node.text

def parse_xml(file_path_xml):
    """
    Parse one CAP 1.2 XML warning file into a flat row dictionary.

    The row holds the base fields ('Issue Time', 'Valid From', 'Valid To',
    'Warning Element', 'Warning Text', 'WhereToText', 'Warning Colour')
    followed by one 0/1 flag per county, set to 1 when the county's FIPS
    code appears among the area's geocodes.

    Returns None (rather than raising) when:
      * the file cannot be read or is not well-formed XML,
      * there is no <info> element,
      * the first 'awareness_type' parameter has a value containing '22'
        (treated by this code as an advisory to exclude),
      * there is no <area> element or it has no <geocode> children.
    """
    ns = {'cap': 'urn:oasis:names:tc:emergency:cap:1.2'}

    # FIPS geocode value -> county column name.
    county_info = {
        'EI01': 'Carlow', 'EI02': 'Cavan', 'EI03': 'Clare', 'EI04': 'Cork', 'EI32': 'Cork City',
        'EI06': 'Donegal', 'EI33': 'Dublin City', 'EI34': 'Dún Laoghaire-Rathdown', 'EI35': 'Fingal',
        'EI10': 'Galway', 'EI36': 'Galway City', 'EI11': 'Kerry', 'EI12': 'Kildare', 'EI13': 'Kilkenny',
        'EI15': 'Laois', 'EI14': 'Leitrim', 'EI42': 'Limerick', 'EI37': 'Limerick City', 'EI18': 'Longford',
        'EI19': 'Louth', 'EI20': 'Mayo', 'EI21': 'Meath', 'EI22': 'Monaghan', 'EI23': 'Offaly',
        'EI24': 'Roscommon', 'EI25': 'Sligo', 'EI39': 'South Dublin', 'EI43': 'Tipperary',
        'EI44': 'Waterford', 'EI29': 'Westmeath', 'EI30': 'Wexford', 'EI31': 'Wicklow'
    }

    try:
        # Undecodable bytes are dropped so a stray character does not make
        # the whole file unreadable.
        with codecs.open(file_path_xml, 'r', encoding='utf-8', errors='ignore') as handle:
            root = ET.fromstring(handle.read())

        info = root.find('cap:info', ns)
        if info is None:
            return None

        # Only the first 'awareness_type' parameter is inspected.
        for param in info.findall('cap:parameter', ns):
            if get_element_text(param, 'cap:valueName', ns) != 'awareness_type':
                continue
            awareness = get_element_text(param, 'cap:value', ns)
            if awareness and '22' in awareness:
                return None
            break

        area = info.find('cap:area', ns)
        if area is None:
            return None

        geocodes = area.findall('cap:geocode', ns)
        if not geocodes:
            return None

        # 'effective' is preferred; 'onset' is the fallback when it is
        # missing or empty.
        valid_from = (get_element_text(info, 'cap:effective', ns) or
                      get_element_text(info, 'cap:onset', ns))

        row = {
            'Issue Time': get_element_text(root, 'cap:sent', ns),
            'Valid From': valid_from,
            'Valid To': get_element_text(info, 'cap:expires', ns),
            'Warning Element': get_element_text(info, 'cap:event', ns),
            'Warning Text': get_element_text(info, 'cap:description', ns),
            'WhereToText': get_element_text(area, 'cap:areaDesc', ns),
            'Warning Colour': severity_to_color(get_element_text(info, 'cap:severity', ns))
        }

        # Start with every county unaffected...
        row.update(dict.fromkeys(county_info.values(), 0))

        # ...then flag the counties whose FIPS code is listed.
        for geocode in geocodes:
            if get_element_text(geocode, 'cap:valueName', ns) != 'FIPS':
                continue
            county_name = county_info.get(get_element_text(geocode, 'cap:value', ns))
            if county_name is not None:
                row[county_name] = 1

        return row

    except ET.ParseError:
        return None
    except Exception:
        return None

def process_files(data_directory):
    """
    Process all XML files in the given directory and extract weather warning data.

    Every ``*.xml`` file directly inside ``data_directory`` is passed to
    ``parse_xml``.  Files that yield a row are collected into the result
    frame.  Files that yield None are re-parsed to classify them: if the XML
    is well-formed the file was deliberately skipped ("excluded"), otherwise
    it is recorded as an error.

    Args:
        data_directory: Directory containing the XML warning files.

    Returns:
        Tuple ``(df_xml, error_files, excluded_files)`` where ``df_xml`` is a
        DataFrame with one row per parsed warning (date columns converted to
        UTC datetimes, unparseable dates coerced to NaT) and the other two
        are lists of file paths.
    """
    # Sorted so that row order is reproducible between runs and machines;
    # glob itself returns files in arbitrary order.
    file_list = sorted(glob.glob(os.path.join(data_directory, '*.xml')))

    data = []
    error_files = []
    excluded_files = []

    for file in tqdm(file_list, desc="Processing files"):
        row = parse_xml(file)
        if row is not None:
            data.append(row)
            continue

        # parse_xml returns None both for filtered-out warnings and for
        # broken files, so parse again to tell the two apart.
        try:
            with codecs.open(file, 'r', encoding='utf-8', errors='ignore') as f:
                ET.parse(f)
        except Exception:
            # Not a bare except: KeyboardInterrupt / SystemExit must still
            # be able to stop a long-running batch.
            error_files.append(file)
        else:
            excluded_files.append(file)

    # Create DataFrame
    df_xml = pd.DataFrame(data)

    # Convert date fields to datetime; the membership check keeps this safe
    # when no file produced a row and the frame has no columns.
    for col in ('Issue Time', 'Valid From', 'Valid To'):
        if col in df_xml.columns:
            df_xml[col] = pd.to_datetime(df_xml[col], utc=True, errors='coerce')

    return df_xml, error_files, excluded_files

def save_error_files(file_list, filename):
    """Write ``file_list`` to ``filename`` as a one-column CSV headed 'file'."""
    listing = pd.DataFrame({'file': file_list})
    listing.to_csv(filename, index=False)


# Driver for this cell: parse every XML warning file in the source directory.
# NOTE(review): data_directory_xml is not defined in this cell; it must
# already exist in the session before this runs.
df_xml, error_files, excluded_files = process_files(data_directory_xml)

# Save results
# The parsed warnings export is currently disabled; only the two file lists
# are written, to the current working directory.
# df_xml.to_csv('weather_warnings.csv', index=False)
save_error_files(error_files, 'error_files.csv')
save_error_files(excluded_files, 'excluded_files.csv')