<a href="https://colab.research.google.com/github/memrranmian/my-first-repo1/blob/main/MSDSF25A007_Assignment6-1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Q1. Multi-Source Data Consolidation + Inconsistency Resolution
# You are given three datasets of hospital patient records:
# ‚Ä¢ patients_master.csv: patient_id, name, dob, gender
# ‚Ä¢ visit_records.csv: visit_id, patient_id, visit_date, diagnosis, doctor
# ‚Ä¢ billing.csv: visit_id, amount, discount, paid_status
# Tasks:
# ‚Ä¢ Merge the three datasets into a single patient-level dataframe such that:
# o All visits and billings must be preserved (even if no matching patient exists).
# o For missing patient info, create a placeholder row with "Unknown" fields.
# ‚Ä¢ Identify conflicting patient information (e.g., same patient_id with different gender or
# dob).
# Write Pandas code to:
# o Detect conflicts
# o Resolve them by keeping the most frequent value per patient
# o Log all conflicts to a separate DataFrame for auditing
# ‚Ä¢ Find the top 10 patients by total billed amount, accounting for:
# o Missing discount (treat as 0)
# o Missing amount (set to column median)



# First, we need to install the required libraries
# We'll use these commands in our terminal (command prompt):
# pip install pandas numpy faker

# Now let's start writing our code
import pandas as pd
import numpy as np
from faker import Faker

# ============================================
# PART 1: CREATE FAKE DATA
# ============================================

# Let me explain what we're doing here:
# 1. We're importing pandas (for data handling), numpy (for numbers), and Faker (to create fake data)
# 2. Faker helps us create realistic but fake data for practice

# Create a Faker object to generate fake data
fake = Faker()

# Set a random seed so we get the same results each time
np.random.seed(42)

# Let's create some fake patients first
patients_data = []
for i in range(1, 101):  # Creating 100 patients
    patients_data.append({
        'patient_id': f'P{i:03d}',  # Creates IDs like P001, P002, etc.
        'name': fake.name(),
        'dob': fake.date_of_birth(minimum_age=18, maximum_age=90).strftime('%Y-%m-%d'),
        'gender': np.random.choice(['M', 'F', 'Other'], p=[0.48, 0.48, 0.04])
    })

# Create the patients_master dataframe
patients_master = pd.DataFrame(patients_data)

# Now let's create visit records
visit_data = []
visit_ids = []
for i in range(1, 201):  # Creating 200 visits
    visit_id = f'V{i:03d}'
    visit_ids.append(visit_id)

    # Randomly pick a patient (some might not exist in patients_master)
    patient_id = f'P{np.random.randint(1, 120):03d}'  # Goes up to 120, but we only have 100 patients

    visit_data.append({
        'visit_id': visit_id,
        'patient_id': patient_id,
        'visit_date': fake.date_between(start_date='-2y', end_date='today').strftime('%Y-%m-%d'),
        'diagnosis': np.random.choice(['Flu', 'Cold', 'Injury', 'Checkup', 'Surgery', 'Other']),
        'doctor': fake.name()
    })

# Create the visit_records dataframe
visit_records = pd.DataFrame(visit_data)

# Now let's create billing records
billing_data = []
for visit_id in visit_ids:
    # Create some randomness in billing
    amount = np.random.choice([100, 200, 300, 400, 500, 1000, np.nan], p=[0.2, 0.2, 0.2, 0.1, 0.1, 0.1, 0.1])
    discount = np.random.choice([0, 5, 10, 20, np.nan], p=[0.6, 0.1, 0.1, 0.1, 0.1])
    paid_status = np.random.choice(['Paid', 'Unpaid', 'Partial'], p=[0.7, 0.2, 0.1])

    billing_data.append({
        'visit_id': visit_id,
        'amount': amount,
        'discount': discount,
        'paid_status': paid_status
    })

# Create the billing dataframe
billing = pd.DataFrame(billing_data)

# Let's add some conflicts in patient data (for demonstration)
# We'll create duplicate patient records with different information
conflict_data = [
    {'patient_id': 'P001', 'name': 'John Conflict', 'dob': '1990-01-01', 'gender': 'M'},
    {'patient_id': 'P001', 'name': 'John Doe', 'dob': '1990-01-01', 'gender': 'F'},  # Gender conflict
    {'patient_id': 'P002', 'name': 'Jane Smith', 'dob': '1985-05-15', 'gender': 'F'},
    {'patient_id': 'P002', 'name': 'Jane Smith', 'dob': '1980-05-15', 'gender': 'F'},  # DOB conflict
]

# Add these to patients_master to create conflicts
patients_master = pd.concat([patients_master, pd.DataFrame(conflict_data)], ignore_index=True)

# ============================================
# PART 2: MERGE THE DATASETS
# ============================================

print("Step 1: Merging all the datasets together")
print("-" * 50)

# First, let me show you what each dataset looks like
print("\nPatients Master (first 5 rows):")
print(patients_master.head())
print("\nVisit Records (first 5 rows):")
print(visit_records.head())
print("\nBilling Records (first 5 rows):")
print(billing.head())

# Now, let's merge visit records with billing records
# We use 'visit_id' as the common column
print("\nMerging visit records with billing records...")
visits_with_billing = pd.merge(visit_records, billing, on='visit_id', how='left')
print("Visits with billing (first 5 rows):")
print(visits_with_billing.head())

# Now, let's merge this with patient information
# We use 'patient_id' as the common column
print("\nMerging with patient information...")
# We use 'how=left' to keep all visits even if patient doesn't exist
all_data = pd.merge(visits_with_billing, patients_master, on='patient_id', how='left')
print("All merged data (first 5 rows):")
print(all_data.head())

# For patients who don't exist in patients_master, fill with "Unknown"
print("\nFilling missing patient info with 'Unknown'...")
all_data['name'] = all_data['name'].fillna('Unknown')
all_data['dob'] = all_data['dob'].fillna('Unknown')
all_data['gender'] = all_data['gender'].fillna('Unknown')

print("\nFinal merged dataset (first 10 rows):")
print(all_data.head(10))

# ============================================
# PART 3: FIND AND FIX CONFLICTS
# ============================================

print("\n\nStep 2: Finding and fixing conflicting information")
print("-" * 50)

# First, let's find all unique patient IDs
unique_patients = patients_master['patient_id'].unique()
print(f"\nWe have {len(unique_patients)} unique patient IDs in our master data")

# Let's check for conflicts - patients with different information
conflicts = []

# We'll check each patient one by one
for patient_id in unique_patients:
    # Get all records for this patient
    patient_records = patients_master[patients_master['patient_id'] == patient_id]

    # If patient has more than one record
    if len(patient_records) > 1:
        # Check if there are differences in gender
        unique_genders = patient_records['gender'].unique()
        if len(unique_genders) > 1:
            conflicts.append({
                'patient_id': patient_id,
                'field': 'gender',
                'values': list(unique_genders),
                'conflict_type': 'Multiple values'
            })

        # Check if there are differences in date of birth
        unique_dobs = patient_records['dob'].unique()
        if len(unique_dobs) > 1:
            conflicts.append({
                'patient_id': patient_id,
                'field': 'dob',
                'values': list(unique_dobs),
                'conflict_type': 'Multiple values'
            })

        # Check if there are differences in name
        unique_names = patient_records['name'].unique()
        if len(unique_names) > 1:
            conflicts.append({
                'patient_id': patient_id,
                'field': 'name',
                'values': list(unique_names),
                'conflict_type': 'Multiple values'
            })

# Create a conflicts dataframe for logging
if conflicts:
    conflicts_df = pd.DataFrame(conflicts)
    print("\nFound these conflicts:")
    print(conflicts_df)
else:
    conflicts_df = pd.DataFrame()  # Empty dataframe if no conflicts
    print("\nNo conflicts found!")

# Now, let's resolve conflicts by keeping the most frequent value
print("\nResolving conflicts by keeping the most common value...")

# Create a clean version of patients_master
clean_patients = patients_master.copy()

# We'll fix each conflict one by one
if not conflicts_df.empty:
    for _, conflict in conflicts_df.iterrows():
        patient_id = conflict['patient_id']
        field = conflict['field']

        # Get all values for this field for this patient
        field_values = patients_master[patients_master['patient_id'] == patient_id][field]

        # Find the most common value
        most_common_value = field_values.mode()[0]  # mode() finds most frequent value

        # Keep only one record per patient with the most common value
        # First, get all records for this patient
        patient_mask = clean_patients['patient_id'] == patient_id

        # Keep the first record and update with most common value
        first_index = clean_patients[patient_mask].index[0]
        clean_patients.loc[first_index, field] = most_common_value

        # Remove duplicate records
        # We'll keep the first record and drop the rest
        duplicates_to_drop = clean_patients[patient_mask].index[1:]
        clean_patients = clean_patients.drop(duplicates_to_drop)

print("\nCleaned patient data (showing only previously conflicting patients):")
conflicting_ids = conflicts_df['patient_id'].unique() if not conflicts_df.empty else []
if len(conflicting_ids) > 0:
    print(clean_patients[clean_patients['patient_id'].isin(conflicting_ids)])

# ============================================
# PART 4: FIND TOP 10 PATIENTS BY BILLED AMOUNT
# ============================================

print("\n\nStep 3: Finding top 10 patients by total billed amount")
print("-" * 50)

# First, let's handle missing values in the billing data
print("\nHandling missing values in billing data...")

# Make a copy of our merged data to work with
analysis_data = all_data.copy()

# Handle missing discount - treat as 0
print("1. Missing discounts will be treated as 0")
analysis_data['discount'] = analysis_data['discount'].fillna(0)

# Handle missing amount - set to median
print("2. Missing amounts will be set to the median amount")
median_amount = analysis_data['amount'].median()
analysis_data['amount'] = analysis_data['amount'].fillna(median_amount)

print(f"Median amount used for missing values: ${median_amount:.2f}")

# Calculate net amount after discount
print("\n3. Calculating net amount (amount - discount)...")
analysis_data['net_amount'] = analysis_data['amount'] - analysis_data['discount']

# Group by patient to get total amount per patient
print("\n4. Grouping by patient to get totals...")
patient_totals = analysis_data.groupby('patient_id').agg({
    'name': 'first',  # Get the patient name
    'net_amount': 'sum'  # Sum up all net amounts
}).reset_index()

# Sort by total amount in descending order
patient_totals = patient_totals.sort_values('net_amount', ascending=False)

print("\nTop 10 patients by total billed amount:")
top_10_patients = patient_totals.head(10)
print(top_10_patients)

# ============================================
# PART 5: SAVE RESULTS
# ============================================

print("\n\nStep 4: Saving the results")
print("-" * 50)

# Save the merged data
all_data.to_csv('merged_patient_data.csv', index=False)
print("‚úì Saved merged data to 'merged_patient_data.csv'")

# Save the conflicts log
if not conflicts_df.empty:
    conflicts_df.to_csv('conflicts_log.csv', index=False)
    print("‚úì Saved conflicts log to 'conflicts_log.csv'")
else:
    print("‚úì No conflicts to save")

# Save the cleaned patient data
clean_patients.to_csv('cleaned_patients_master.csv', index=False)
print("‚úì Saved cleaned patient data to 'cleaned_patients_master.csv'")

# Save top 10 patients
top_10_patients.to_csv('top_10_patients_by_billing.csv', index=False)
print("‚úì Saved top 10 patients to 'top_10_patients_by_billing.csv'")

print("\n" + "="*50)
print("PROCESS COMPLETE!")
print("="*50)

ModuleNotFoundError: No module named 'faker'

In [1]:
# Q2. Time-Series Irregularity Detection + Resampling
# A sensor dataset contains data from 40 IoT devices:
# Columns:
# device_id, timestamp, temperature, humidity
# The sampling rate varies between devices because of connectivity issues.
# Tasks:
# ‚Ä¢ For each device, resample data to 1-minute intervals using:
# o Forward-fill for temperature
# o Cubic interpolation for humidity
# ‚Ä¢ Detect abnormal devices where:
# o More than 25% of data points were interpolated, OR
# o Any continuous gap > 20 minutes occurred
# ‚Ä¢ For each abnormal device, compute the time windows of instability and export them to
# a dataframe with:
# o device_id, gap_start, gap_end, gap_duration_minutes



"""
Time-Series Irregularity Detection + Resampling
A sensor dataset contains data from 40 IoT devices with varying sampling rates.
"""

# Import Required Modules
import pandas as pd
import numpy as np
from faker import Faker
import random
from datetime import datetime, timedelta

# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)
fake = Faker()
Faker.seed(42)


def generate_sensor_data(num_devices=40, base_start_date='2024-01-01', duration_hours=24):
    """
    Generate synthetic sensor data with irregular sampling rates for IoT devices.

    Parameters:
    - num_devices: Number of IoT devices
    - base_start_date: Starting date for data generation
    - duration_hours: Duration of data collection in hours

    Returns:
    - DataFrame with columns: device_id, timestamp, temperature, humidity
    """
    data = []
    start_date = pd.to_datetime(base_start_date)

    for device_id in range(1, num_devices + 1):
        current_time = start_date
        end_time = start_date + timedelta(hours=duration_hours)

        # Randomly decide if this device will have connectivity issues
        has_issues = random.random() < 0.3  # 30% of devices have issues

        while current_time < end_time:
            # Generate temperature and humidity readings
            temperature = round(random.uniform(15.0, 35.0), 2)
            humidity = round(random.uniform(30.0, 90.0), 2)

            data.append({
                'device_id': f'device_{device_id:03d}',
                'timestamp': current_time,
                'temperature': temperature,
                'humidity': humidity
            })

            # Variable sampling rate (simulate connectivity issues)
            if has_issues and random.random() < 0.15:  # Occasional large gaps
                gap_minutes = random.choice([25, 30, 35, 40])  # Large gaps
                current_time += timedelta(minutes=gap_minutes)
            elif has_issues and random.random() < 0.3:  # Frequent small gaps
                gap_minutes = random.uniform(2, 10)
                current_time += timedelta(minutes=gap_minutes)
            else:  # Normal sampling
                gap_minutes = random.uniform(0.5, 3)
                current_time += timedelta(minutes=gap_minutes)

    df = pd.DataFrame(data)
    return df.sort_values(['device_id', 'timestamp']).reset_index(drop=True)


def resample_device_data(device_df, device_id):
    """
    Resample a single device's data to 1-minute intervals.

    Parameters:
    - device_df: DataFrame for a single device
    - device_id: ID of the device

    Returns:
    - Resampled DataFrame with interpolation metadata
    """
    # Set timestamp as index
    device_df = device_df.set_index('timestamp').sort_index()

    # Create 1-minute interval range
    start_time = device_df.index.min()
    end_time = device_df.index.max()
    full_range = pd.date_range(start=start_time, end=end_time, freq='1min')

    # Reindex to 1-minute intervals
    resampled = device_df.reindex(full_range)

    # Track which values were interpolated
    original_mask = resampled['temperature'].notna()

    # Forward-fill for temperature
    resampled['temperature'] = resampled['temperature'].ffill()

    # Cubic interpolation for humidity (with fallback to linear if insufficient points)
    try:
        # Try cubic interpolation first
        resampled['humidity'] = resampled['humidity'].interpolate(method='cubic')
    except (ValueError, TypeError):
        # Fall back to polynomial or linear interpolation if cubic fails
        try:
            resampled['humidity'] = resampled['humidity'].interpolate(method='polynomial', order=2)
        except (ValueError, TypeError):
            # Final fallback to linear interpolation
            resampled['humidity'] = resampled['humidity'].interpolate(method='linear')

    # Add device_id back
    resampled['device_id'] = device_id

    # Calculate interpolation statistics
    total_points = len(resampled)
    interpolated_points = (~original_mask).sum()
    interpolation_percentage = (interpolated_points / total_points) * 100

    return resampled.reset_index().rename(columns={'index': 'timestamp'}), interpolation_percentage, original_mask


def detect_gaps(original_mask, timestamps):
    """
    Detect continuous gaps in the original data.

    Parameters:
    - original_mask: Boolean mask indicating original (non-interpolated) data points
    - timestamps: Array of timestamps

    Returns:
    - List of tuples: (gap_start, gap_end, gap_duration_minutes)
    """
    gaps = []
    in_gap = False
    gap_start = None

    for i, (is_original, timestamp) in enumerate(zip(original_mask, timestamps)):
        if not is_original and not in_gap:
            # Start of a gap
            in_gap = True
            gap_start = timestamp
        elif is_original and in_gap:
            # End of a gap
            gap_end = timestamps[i - 1]
            gap_duration = pd.Timedelta(gap_end - gap_start).total_seconds() / 60
            gaps.append((gap_start, gap_end, gap_duration))
            in_gap = False

    # Handle case where data ends in a gap
    if in_gap:
        gap_end = timestamps[-1]
        gap_duration = pd.Timedelta(gap_end - gap_start).total_seconds() / 60
        gaps.append((gap_start, gap_end, gap_duration))

    return gaps


def process_all_devices(df):
    """
    Process all devices: resample and detect abnormalities.

    Parameters:
    - df: Original sensor DataFrame

    Returns:
    - resampled_data: Dictionary of resampled DataFrames by device
    - abnormal_devices: List of abnormal device IDs
    - instability_windows: DataFrame with instability time windows
    """
    resampled_data = {}
    abnormal_devices = []
    instability_records = []

    device_ids = df['device_id'].unique()

    print(f"Processing {len(device_ids)} devices...\n")

    for device_id in device_ids:
        device_df = df[df['device_id'] == device_id].copy()

        # Resample the device data
        resampled_df, interp_pct, original_mask = resample_device_data(device_df, device_id)
        resampled_data[device_id] = resampled_df

        # Detect gaps
        gaps = detect_gaps(original_mask.values, resampled_df['timestamp'].values)

        # Check for abnormality conditions
        has_high_interpolation = interp_pct > 25
        has_large_gap = any(gap_duration > 20 for _, _, gap_duration in gaps)

        if has_high_interpolation or has_large_gap:
            abnormal_devices.append(device_id)

            print(f"[!] {device_id} - ABNORMAL")
            print(f"   Interpolation: {interp_pct:.2f}%")
            print(f"   Number of gaps: {len(gaps)}")

            # Record all gaps for abnormal devices (instability windows)
            for gap_start, gap_end, gap_duration in gaps:
                instability_records.append({
                    'device_id': device_id,
                    'gap_start': gap_start,
                    'gap_end': gap_end,
                    'gap_duration_minutes': round(gap_duration, 2)
                })
                print(f"   Gap: {gap_start} to {gap_end} ({gap_duration:.2f} min)")
            print()
        else:
            print(f"[OK] {device_id} - Normal (Interpolation: {interp_pct:.2f}%)")

    # Create instability windows DataFrame
    instability_df = pd.DataFrame(instability_records)

    return resampled_data, abnormal_devices, instability_df


def main():
    """
    Main execution function.
    """
    print("=" * 70)
    print("Time-Series Irregularity Detection + Resampling")
    print("=" * 70)
    print()

    # Step 1: Generate sensor data
    print("Step 1: Generating sensor data for 40 IoT devices...")
    sensor_data = generate_sensor_data(num_devices=40, duration_hours=24)
    print(f"Generated {len(sensor_data)} data points")
    print(f"Date range: {sensor_data['timestamp'].min()} to {sensor_data['timestamp'].max()}")
    print()

    # Display sample of original data
    print("Sample of original data:")
    print(sensor_data.head(10))
    print()

    # Step 2: Process all devices
    print("=" * 70)
    print("Step 2: Resampling and detecting abnormalities...")
    print("=" * 70)
    print()

    resampled_data, abnormal_devices, instability_df = process_all_devices(sensor_data)

    # Step 3: Summary
    print("=" * 70)
    print("Summary")
    print("=" * 70)
    print(f"Total devices: {len(sensor_data['device_id'].unique())}")
    print(f"Abnormal devices: {len(abnormal_devices)}")
    print(f"Abnormal device IDs: {', '.join(abnormal_devices)}")
    print()

    # Step 4: Display instability windows
    print("=" * 70)
    print("Instability Windows (Abnormal Devices)")
    print("=" * 70)
    if not instability_df.empty:
        print(instability_df.to_string(index=False))
        print()
        print(f"Total instability windows: {len(instability_df)}")
    else:
        print("No abnormal devices detected.")
    print()

    # Step 5: Export results
    print("=" * 70)
    print("Exporting Results")
    print("=" * 70)

    # Export instability windows
    instability_df.to_csv('instability_windows.csv', index=False)
    print("[OK] Instability windows exported to: instability_windows.csv")

    # Export sample resampled data (first abnormal device if exists)
    if abnormal_devices:
        sample_device = abnormal_devices[0]
        resampled_data[sample_device].to_csv(f'resampled_{sample_device}.csv', index=False)
        print(f"[OK] Sample resampled data exported to: resampled_{sample_device}.csv")

    # Export all resampled data
    all_resampled = pd.concat(resampled_data.values(), ignore_index=True)
    all_resampled.to_csv('all_resampled_data.csv', index=False)
    print("[OK] All resampled data exported to: all_resampled_data.csv")

    print()
    print("=" * 70)
    print("Processing Complete!")
    print("=" * 70)

    return sensor_data, resampled_data, abnormal_devices, instability_df


if __name__ == "__main__":
    sensor_data, resampled_data, abnormal_devices, instability_df = main()


ModuleNotFoundError: No module named 'faker'

In [None]:
# Q3. High-Level Grouping + Multi-Stage Aggregation
# You have an e-commerce dataset:
# Columns:
# order_id, customer_id, product_id, price, discount, category, order_timestamp
# Tasks:
# ‚Ä¢ Compute category-wise ‚ÄúEffective Price Variance,‚Äù defined as:
# ÔøΩ
# ÔøΩùëÉùëâ =ùëâùëéùëü(ùëùùëüùëñùëêùëí ‚àíùëëùëñùë†ùëêùëúùë¢ùëõùë°)
# ‚Ä¢ For each customer, compute:
# o Average inter-purchase time (in hours)
# o Category with highest spending
# o Rolling 3-purchase moving average of spending (sorted by timestamp)
# ‚Ä¢ Identify customers showing anomalous behaviour, defined as:
# o Rolling average increasing by > 300% compared to previous window
# o At least 5 purchases in last 30 days
# Create a final dataframe with these customers and their metrics.






"""
Detailed Analysis of Anomalous Customers
This script provides deeper insights into the anomalous customer behavior.
"""

import pandas as pd
import numpy as np

def analyze_anomalous_customers():
    """
    Load and analyze anomalous customers in detail.
    """
    # Load data
    ecommerce_df = pd.read_csv('ecommerce_data.csv')
    ecommerce_df['order_timestamp'] = pd.to_datetime(ecommerce_df['order_timestamp'])
    ecommerce_df['effective_price'] = ecommerce_df['price'] - ecommerce_df['discount']

    anomalous_df = pd.read_csv('anomalous_customers.csv')

    print("=" * 80)
    print("Detailed Analysis of Anomalous Customers")
    print("=" * 80)
    print()

    for idx, row in anomalous_df.iterrows():
        customer_id = row['customer_id']

        print(f"\n{'=' * 80}")
        print(f"Customer: {customer_id}")
        print(f"{'=' * 80}")

        # Get customer's orders
        customer_orders = ecommerce_df[ecommerce_df['customer_id'] == customer_id].sort_values('order_timestamp')

        print(f"\nTotal Purchases: {len(customer_orders)}")
        print(f"Total Spending: ${customer_orders['effective_price'].sum():.2f}")
        print(f"Average Order Value: ${customer_orders['effective_price'].mean():.2f}")
        print(f"Date Range: {customer_orders['order_timestamp'].min()} to {customer_orders['order_timestamp'].max()}")

        # Calculate rolling average
        customer_orders['rolling_avg_3'] = customer_orders['effective_price'].rolling(window=3, min_periods=1).mean()
        customer_orders['prev_rolling_avg'] = customer_orders['rolling_avg_3'].shift(1)
        customer_orders['pct_change'] = ((customer_orders['rolling_avg_3'] - customer_orders['prev_rolling_avg']) /
                                          customer_orders['prev_rolling_avg'] * 100)

        print(f"\n--- Purchase Timeline (Last 10 Purchases) ---")
        recent_orders = customer_orders.tail(10)[['order_timestamp', 'category', 'effective_price', 'rolling_avg_3', 'pct_change']]
        recent_orders['order_timestamp'] = recent_orders['order_timestamp'].dt.strftime('%Y-%m-%d %H:%M')
        print(recent_orders.to_string(index=False))

        # Find the spike
        max_spike_idx = customer_orders['pct_change'].idxmax()
        if pd.notna(max_spike_idx):
            spike_row = customer_orders.loc[max_spike_idx]
            print(f"\n--- Maximum Spike Details ---")
            print(f"Date: {spike_row['order_timestamp']}")
            print(f"Category: {spike_row['category']}")
            print(f"Order Value: ${spike_row['effective_price']:.2f}")
            print(f"Rolling Avg (3-purchase): ${spike_row['rolling_avg_3']:.2f}")
            print(f"Previous Rolling Avg: ${spike_row['prev_rolling_avg']:.2f}")
            print(f"Percentage Increase: {spike_row['pct_change']:.2f}%")

        # Category breakdown
        print(f"\n--- Spending by Category ---")
        category_spending = customer_orders.groupby('category')['effective_price'].agg(['sum', 'count', 'mean'])
        category_spending.columns = ['Total Spending', 'Num Orders', 'Avg Order Value']
        category_spending = category_spending.sort_values('Total Spending', ascending=False)
        print(category_spending.to_string())

        # Recent activity (last 30 days)
        last_date = ecommerce_df['order_timestamp'].max()
        recent_30 = customer_orders[customer_orders['order_timestamp'] >= (last_date - pd.Timedelta(days=30))]
        print(f"\n--- Last 30 Days Activity ---")
        print(f"Number of Purchases: {len(recent_30)}")
        print(f"Total Spending: ${recent_30['effective_price'].sum():.2f}")
        print(f"Average Order Value: ${recent_30['effective_price'].mean():.2f}")

        print()

    print("\n" + "=" * 80)
    print("Comparison: Anomalous vs Normal Customers")
    print("=" * 80)

    # Load all customer metrics
    all_metrics = pd.read_csv('customer_metrics.csv')
    normal_customers = all_metrics[all_metrics['is_anomalous'] == False]

    print(f"\nNormal Customers (n={len(normal_customers)}):")
    print(f"  Avg Total Spending: ${normal_customers['total_spending'].mean():.2f}")
    print(f"  Avg Purchases: {normal_customers['total_purchases'].mean():.2f}")
    print(f"  Avg Inter-Purchase Time: {normal_customers['avg_inter_purchase_time_hours'].mean():.2f} hours")
    print(f"  Avg Max Spike: {normal_customers['max_rolling_avg_spike_pct'].mean():.2f}%")

    print(f"\nAnomalous Customers (n={len(anomalous_df)}):")
    print(f"  Avg Total Spending: ${anomalous_df['total_spending'].mean():.2f}")
    print(f"  Avg Purchases: {anomalous_df['total_purchases'].mean():.2f}")
    print(f"  Avg Inter-Purchase Time: {anomalous_df['avg_inter_purchase_time_hours'].mean():.2f} hours")
    print(f"  Avg Max Spike: {anomalous_df['max_rolling_avg_spike_pct'].mean():.2f}%")

    print("\n" + "=" * 80)
    print("Analysis Complete!")
    print("=" * 80)


if __name__ == "__main__":
    analyze_anomalous_customers()


In [None]:
# Q4. Multi-Key Merge + Window Analytics
# A transportation dataset contains:
# 1. trips.csv
# trip_id, driver_id, start_time, end_time, distance_km, rating
# 2. fuel_logs.csv
# driver_id, log_time, fuel_liters, fuel_price
# 3. incidents.csv
# driver_id, incident_time, incident_type
# Tasks:
# ‚Ä¢ Combine all data to produce a per-driver performance dataframe with:
# o Total trips
# o Average rating (exclude NaN)
# o Total incidents
# o Fuel consumption aligned to the nearest trip end_time using a custom merge-as
# of tolerance of 30 minutes
# ‚Ä¢ Using window functions, compute:
# o Rolling 7-trip average rating per driver
# o Rolling sum of incidents per 10 trips
# ‚Ä¢ Identify drivers with declining performance, defined as:
# o Last 7-trip rating average < 3
# o Incident rate in last 30 days > 2
# o Fuel consumption increasing by > 50% compared to previous month




"""
Q4. Multi-Key Merge + Window Analytics
Transportation dataset analysis with merge-asof and window functions.
"""

# Import Required Modules
import pandas as pd
import numpy as np
from faker import Faker
import random
from datetime import datetime, timedelta

# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)
fake = Faker()
Faker.seed(42)


def generate_trips_data(num_drivers=50, num_trips=2000):
    """
    Generate trips.csv data.
    Columns: trip_id, driver_id, start_time, end_time, distance_km, rating
    """
    trips = []
    start_date = datetime(2024, 1, 1)

    for trip_id in range(1, num_trips + 1):
        driver_id = f'D{random.randint(1, num_drivers):03d}'

        # Random start time within the year
        days_offset = random.uniform(0, 365)
        start_time = start_date + timedelta(days=days_offset,
                                            hours=random.randint(0, 23),
                                            minutes=random.randint(0, 59))

        # Trip duration: 10 minutes to 3 hours
        duration_minutes = random.uniform(10, 180)
        end_time = start_time + timedelta(minutes=duration_minutes)

        # Distance: 1 to 100 km
        distance_km = round(random.uniform(1, 100), 2)

        # Rating: 1-5, with some NaN values (10% missing)
        if random.random() < 0.1:
            rating = np.nan
        else:
            # Some drivers are better rated
            driver_num = int(driver_id[1:])
            if driver_num <= 10:  # First 10 drivers have declining performance
                # Recent trips have lower ratings
                base_rating = 4.5 - (days_offset / 365) * 2  # Decline over time
                rating = max(1, min(5, base_rating + random.uniform(-0.5, 0.5)))
            else:
                rating = random.uniform(3.5, 5.0)
            rating = round(rating, 1)

        trips.append({
            'trip_id': f'T{trip_id:06d}',
            'driver_id': driver_id,
            'start_time': start_time,
            'end_time': end_time,
            'distance_km': distance_km,
            'rating': rating
        })

    df = pd.DataFrame(trips)
    return df.sort_values(['driver_id', 'end_time']).reset_index(drop=True)


def generate_fuel_logs(trips_df, num_drivers=50):
    """
    Generate fuel_logs.csv data.
    Columns: driver_id, log_time, fuel_liters, fuel_price
    """
    fuel_logs = []

    # Generate fuel logs for each driver
    for driver_id in trips_df['driver_id'].unique():
        driver_trips = trips_df[trips_df['driver_id'] == driver_id]

        # Generate fuel logs near trip end times (within 30 minutes)
        for _, trip in driver_trips.sample(frac=0.7).iterrows():  # 70% of trips have fuel logs
            # Log time is near end_time (within 30 minutes)
            offset_minutes = random.uniform(-30, 30)
            log_time = trip['end_time'] + timedelta(minutes=offset_minutes)

            # Fuel consumption based on distance
            base_fuel = trip['distance_km'] * 0.08  # ~8L per 100km

            # Some drivers have increasing fuel consumption over time
            driver_num = int(driver_id[1:])
            if driver_num <= 10:  # Declining performance drivers
                # Fuel consumption increases over time
                days_since_start = (trip['end_time'] - datetime(2024, 1, 1)).days
                fuel_multiplier = 1 + (days_since_start / 365) * 0.6  # Up to 60% increase
                fuel_liters = base_fuel * fuel_multiplier
            else:
                fuel_liters = base_fuel * random.uniform(0.9, 1.1)

            fuel_liters = round(fuel_liters, 2)
            fuel_price = round(random.uniform(1.5, 2.5), 2)  # Price per liter

            fuel_logs.append({
                'driver_id': driver_id,
                'log_time': log_time,
                'fuel_liters': fuel_liters,
                'fuel_price': fuel_price
            })

    df = pd.DataFrame(fuel_logs)
    return df.sort_values(['driver_id', 'log_time']).reset_index(drop=True)


def generate_incidents(trips_df, num_drivers=50):
    """
    Generate incidents.csv data.
    Columns: driver_id, incident_time, incident_type
    """
    incidents = []
    incident_types = ['Speeding', 'Harsh Braking', 'Accident', 'Traffic Violation', 'Customer Complaint']

    for driver_id in trips_df['driver_id'].unique():
        driver_trips = trips_df[trips_df['driver_id'] == driver_id]

        # Declining performance drivers have more incidents
        driver_num = int(driver_id[1:])
        if driver_num <= 10:
            # More incidents, especially in recent months
            num_incidents = random.randint(5, 15)
            # Concentrate incidents in last 60 days
            recent_date = datetime(2024, 12, 31)
            for _ in range(num_incidents):
                if random.random() < 0.6:  # 60% in last 60 days
                    days_back = random.uniform(0, 60)
                else:
                    days_back = random.uniform(0, 365)

                incident_time = recent_date - timedelta(days=days_back,
                                                        hours=random.randint(0, 23),
                                                        minutes=random.randint(0, 59))

                incidents.append({
                    'driver_id': driver_id,
                    'incident_time': incident_time,
                    'incident_type': random.choice(incident_types)
                })
        else:
            # Normal drivers have fewer incidents
            num_incidents = random.randint(0, 5)
            for _ in range(num_incidents):
                # Random time within the year
                random_trip = driver_trips.sample(1).iloc[0]
                offset_hours = random.uniform(-24, 24)
                incident_time = random_trip['end_time'] + timedelta(hours=offset_hours)

                incidents.append({
                    'driver_id': driver_id,
                    'incident_time': incident_time,
                    'incident_type': random.choice(incident_types)
                })

    df = pd.DataFrame(incidents)
    return df.sort_values(['driver_id', 'incident_time']).reset_index(drop=True)


def merge_asof_fuel_to_trips(trips_df, fuel_df, tolerance_minutes=30):
    """
    Merge fuel logs to trips using merge_asof with 30-minute tolerance.
    Process each driver separately to avoid sorting issues.
    """
    # Prepare dataframes
    trips = trips_df.copy()
    fuel = fuel_df.copy()

    # Ensure datetime types
    trips['end_time'] = pd.to_datetime(trips['end_time'])
    fuel['log_time'] = pd.to_datetime(fuel['log_time'])

    # Merge each driver separately
    tolerance = pd.Timedelta(minutes=tolerance_minutes)
    merged_list = []

    for driver_id in trips['driver_id'].unique():
        driver_trips = trips[trips['driver_id'] == driver_id].sort_values('end_time').reset_index(drop=True)
        driver_fuel = fuel[fuel['driver_id'] == driver_id].sort_values('log_time').reset_index(drop=True)

        if len(driver_fuel) > 0:
            # Merge asof for this driver
            driver_merged = pd.merge_asof(driver_trips, driver_fuel,
                                         left_on='end_time',
                                         right_on='log_time',
                                         tolerance=tolerance,
                                         direction='nearest',
                                         suffixes=('', '_fuel'))
        else:
            # No fuel logs for this driver
            driver_merged = driver_trips.copy()
            driver_merged['log_time'] = pd.NaT
            driver_merged['fuel_liters'] = np.nan
            driver_merged['fuel_price'] = np.nan

        # Ensure driver_id is present
        if 'driver_id' not in driver_merged.columns:
            driver_merged['driver_id'] = driver_id

        merged_list.append(driver_merged)

    # Combine all drivers
    merged = pd.concat(merged_list, ignore_index=True)

    return merged



def compute_driver_performance(trips_df, fuel_df, incidents_df):
    """
    Compute per-driver performance metrics.
    """
    # Merge fuel data to trips
    trips_with_fuel = merge_asof_fuel_to_trips(trips_df, fuel_df)

    # Compute basic metrics per driver
    driver_metrics = []

    for driver_id in trips_df['driver_id'].unique():
        driver_trips = trips_with_fuel[trips_with_fuel['driver_id'] == driver_id]
        driver_incidents = incidents_df[incidents_df['driver_id'] == driver_id]

        # Total trips
        total_trips = len(driver_trips)

        # Average rating (exclude NaN)
        avg_rating = driver_trips['rating'].mean()

        # Total incidents
        total_incidents = len(driver_incidents)

        # Total fuel consumption (sum of aligned fuel logs)
        total_fuel = driver_trips['fuel_liters'].sum()

        driver_metrics.append({
            'driver_id': driver_id,
            'total_trips': total_trips,
            'avg_rating': round(avg_rating, 2) if not pd.isna(avg_rating) else np.nan,
            'total_incidents': total_incidents,
            'total_fuel_liters': round(total_fuel, 2) if not pd.isna(total_fuel) else 0
        })

    return pd.DataFrame(driver_metrics)


def compute_window_analytics(trips_df, incidents_df):
    """
    Compute window functions:
    - Rolling 7-trip average rating per driver
    - Rolling sum of incidents per 10 trips
    """
    results = []

    for driver_id in trips_df['driver_id'].unique():
        driver_trips = trips_df[trips_df['driver_id'] == driver_id].sort_values('end_time').copy()
        driver_incidents = incidents_df[incidents_df['driver_id'] == driver_id]

        # Rolling 7-trip average rating
        driver_trips['rolling_7_avg_rating'] = driver_trips['rating'].rolling(window=7, min_periods=1).mean()

        # For each trip, count incidents in the last 10 trips
        driver_trips['trip_number'] = range(1, len(driver_trips) + 1)

        # Count incidents per trip window
        incident_counts = []
        for idx, trip in driver_trips.iterrows():
            # Get last 10 trips (including current)
            trip_num = trip['trip_number']
            start_trip = max(1, trip_num - 9)

            # Get trips in this window
            window_trips = driver_trips[
                (driver_trips['trip_number'] >= start_trip) &
                (driver_trips['trip_number'] <= trip_num)
            ]

            # Count incidents during this window
            window_start = window_trips['start_time'].min()
            window_end = window_trips['end_time'].max()

            incidents_in_window = len(driver_incidents[
                (driver_incidents['incident_time'] >= window_start) &
                (driver_incidents['incident_time'] <= window_end)
            ])

            incident_counts.append(incidents_in_window)

        driver_trips['rolling_10_incidents'] = incident_counts

        # Get last trip metrics
        last_trip = driver_trips.iloc[-1]

        results.append({
            'driver_id': driver_id,
            'last_7_trip_avg_rating': round(last_trip['rolling_7_avg_rating'], 2) if not pd.isna(last_trip['rolling_7_avg_rating']) else np.nan,
            'last_10_trip_incidents': last_trip['rolling_10_incidents']
        })

    return pd.DataFrame(results)


def identify_declining_drivers(trips_df, fuel_df, incidents_df):
    """
    Identify drivers with declining performance:
    - Last 7-trip rating average < 3
    - Incident rate in last 30 days > 2
    - Fuel consumption increasing by > 50% compared to previous month
    """
    declining_drivers = []
    current_date = trips_df['end_time'].max()

    for driver_id in trips_df['driver_id'].unique():
        driver_trips = trips_df[trips_df['driver_id'] == driver_id].sort_values('end_time')
        driver_fuel = fuel_df[fuel_df['driver_id'] == driver_id].sort_values('log_time')
        driver_incidents = incidents_df[incidents_df['driver_id'] == driver_id]

        # Skip drivers with too few trips
        if len(driver_trips) < 7:
            continue

        # 1. Last 7-trip rating average < 3
        last_7_trips = driver_trips.tail(7)
        last_7_avg_rating = last_7_trips['rating'].mean()
        has_low_rating = last_7_avg_rating < 3

        # 2. Incident rate in last 30 days > 2
        last_30_days = current_date - timedelta(days=30)
        recent_incidents = driver_incidents[driver_incidents['incident_time'] >= last_30_days]
        incident_rate_30d = len(recent_incidents)
        has_high_incidents = incident_rate_30d > 2

        # 3. Fuel consumption increasing by > 50% compared to previous month
        last_month_start = current_date - timedelta(days=30)
        last_month_end = current_date
        prev_month_start = current_date - timedelta(days=60)
        prev_month_end = current_date - timedelta(days=30)

        last_month_fuel = driver_fuel[
            (driver_fuel['log_time'] >= last_month_start) &
            (driver_fuel['log_time'] <= last_month_end)
        ]['fuel_liters'].sum()

        prev_month_fuel = driver_fuel[
            (driver_fuel['log_time'] >= prev_month_start) &
            (driver_fuel['log_time'] <= prev_month_end)
        ]['fuel_liters'].sum()

        if prev_month_fuel > 0:
            fuel_increase_pct = ((last_month_fuel - prev_month_fuel) / prev_month_fuel) * 100
        else:
            fuel_increase_pct = 0

        has_fuel_increase = fuel_increase_pct > 50

        # Check if declining
        is_declining = has_low_rating and has_high_incidents and has_fuel_increase

        if is_declining:
            declining_drivers.append({
                'driver_id': driver_id,
                'last_7_trip_avg_rating': round(last_7_avg_rating, 2),
                'incidents_last_30_days': incident_rate_30d,
                'prev_month_fuel_liters': round(prev_month_fuel, 2),
                'last_month_fuel_liters': round(last_month_fuel, 2),
                'fuel_increase_pct': round(fuel_increase_pct, 2),
                'is_declining': True
            })

    return pd.DataFrame(declining_drivers)


def main():
    """
    Main execution function.
    """
    print("=" * 80)
    print("Transportation Analysis: Multi-Key Merge + Window Analytics")
    print("=" * 80)
    print()

    # Step 1: Generate datasets
    print("Step 1: Generating transportation datasets...")

    trips_df = generate_trips_data(num_drivers=50, num_trips=2000)
    print(f"Generated trips.csv: {len(trips_df)} trips")

    fuel_df = generate_fuel_logs(trips_df, num_drivers=50)
    print(f"Generated fuel_logs.csv: {len(fuel_df)} fuel logs")

    incidents_df = generate_incidents(trips_df, num_drivers=50)
    print(f"Generated incidents.csv: {len(incidents_df)} incidents")
    print()

    # Save to CSV
    trips_df.to_csv('trips.csv', index=False)
    fuel_df.to_csv('fuel_logs.csv', index=False)
    incidents_df.to_csv('incidents.csv', index=False)
    print("[OK] All datasets saved to CSV files")
    print()

    # Display samples
    print("Sample from trips.csv:")
    print(trips_df.head(5))
    print()

    print("Sample from fuel_logs.csv:")
    print(fuel_df.head(5))
    print()

    print("Sample from incidents.csv:")
    print(incidents_df.head(5))
    print()

    # Step 2: Compute driver performance
    print("=" * 80)
    print("Step 2: Computing Per-Driver Performance Metrics")
    print("=" * 80)
    print()

    driver_performance = compute_driver_performance(trips_df, fuel_df, incidents_df)
    print(driver_performance.head(10))
    print()

    # Step 3: Window analytics
    print("=" * 80)
    print("Step 3: Computing Window Analytics")
    print("=" * 80)
    print()

    window_metrics = compute_window_analytics(trips_df, incidents_df)
    print(window_metrics.head(10))
    print()

    # Step 4: Identify declining drivers
    print("=" * 80)
    print("Step 4: Identifying Drivers with Declining Performance")
    print("=" * 80)
    print()

    declining_drivers = identify_declining_drivers(trips_df, fuel_df, incidents_df)

    if len(declining_drivers) > 0:
        print(f"Found {len(declining_drivers)} drivers with declining performance:")
        print()
        print(declining_drivers.to_string(index=False))
        print()
    else:
        print("No drivers with declining performance detected.")
        print()

    # Step 5: Export results
    print("=" * 80)
    print("Exporting Results")
    print("=" * 80)
    print()

    driver_performance.to_csv('driver_performance.csv', index=False)
    print("[OK] Driver performance exported to: driver_performance.csv")

    window_metrics.to_csv('window_metrics.csv', index=False)
    print("[OK] Window metrics exported to: window_metrics.csv")

    declining_drivers.to_csv('declining_drivers.csv', index=False)
    print("[OK] Declining drivers exported to: declining_drivers.csv")

    print()
    print("=" * 80)
    print("Analysis Complete!")
    print("=" * 80)

    return trips_df, fuel_df, incidents_df, driver_performance, window_metrics, declining_drivers


if __name__ == "__main__":
    trips_df, fuel_df, incidents_df, driver_performance, window_metrics, declining_drivers = main()


In [None]:
# Q5. Large-Scale Optimization + Complex Missing-Value Strategies
# A genomics lab produces a 5M-row dataset:
# Columns:
# gene_id, sample_id, expression_level, qc_flag, batch, lab_technician
# Tasks:
# ‚Ä¢ Remove batch-specific outliers using median absolute deviation (MAD) per batch.
# ‚Ä¢ Expression levels are missing (NaN) for ~18% rows.
# o Impute missing values using multi-step strategy:
# ‚ñ™ Step 1: For rows where qc_flag == "BAD", impute with batch median
# ‚ñ™ Step 2: For others, impute using a KNN-style Pandas approach based on:
# ‚Ä¢ Other genes in the same batch
# ‚Ä¢ Technicians who processed similar samples
# ‚Ä¢ The dataset must be optimized for memory:
# o Convert categorical columns to category dtype
# o Downcast floats and ints
# o Chunk-process 500k rows at a time while merging KNN imputations back
# ‚Ä¢ After cleaning, compute the gene stability index:
# ÔøΩ
# ÔøΩùë∫ùë∞ = ùë∫ùíïùíÖ(ùíÜùíôùíëùíìùíÜùíîùíîùíäùíêùíè_ùíçùíÜùíóùíÜùíç)
#  ùë¥ùíÜùíÇùíè(ùíÜùíôùíëùíìùíÜùíîùíîùíäùíêùíè_ùíçùíÜùíóùíÜùíç)
#  and identify the top 100 unstable genes



"""
Q5. Large-Scale Optimization + Complex Missing-Value Strategies
Genomics lab dataset with 5M rows - memory-optimized processing with advanced imputation.
"""

# Import Required Modules
import pandas as pd
import numpy as np
from faker import Faker
import random
from datetime import datetime
import gc
import warnings
warnings.filterwarnings('ignore')

# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)
fake = Faker()
Faker.seed(42)


def generate_genomics_data(num_rows=5_000_000, num_genes=10000, num_samples=500):
    """
    Generate large-scale genomics dataset with memory optimization.

    Parameters:
    - num_rows: Total number of rows (default: 5M)
    - num_genes: Number of unique genes
    - num_samples: Number of unique samples

    Returns:
    - DataFrame with optimized dtypes
    """
    print("Generating genomics dataset...")
    print(f"Target size: {num_rows:,} rows")

    # Generate data in chunks to avoid memory issues
    chunk_size = 500_000
    num_chunks = num_rows // chunk_size

    batches = ['Batch_A', 'Batch_B', 'Batch_C', 'Batch_D', 'Batch_E']
    qc_flags = ['GOOD', 'BAD', 'MODERATE']
    technicians = [f'Tech_{i:02d}' for i in range(1, 21)]  # 20 technicians

    chunks = []

    for chunk_idx in range(num_chunks):
        print(f"  Generating chunk {chunk_idx + 1}/{num_chunks}...")

        # Use numpy for memory efficiency - generate in smaller batches
        batch_size = 50_000
        mini_chunks = []

        for mini_idx in range(chunk_size // batch_size):
            gene_ids = np.random.randint(1, num_genes + 1, size=batch_size)
            sample_ids = np.random.randint(1, num_samples + 1, size=batch_size)

            mini_data = {
                'gene_id': pd.Categorical([f'GENE_{gid:05d}' for gid in gene_ids]),
                'sample_id': pd.Categorical([f'SAMPLE_{sid:04d}' for sid in sample_ids]),
                'expression_level': np.random.lognormal(mean=5, sigma=2, size=batch_size),
                'qc_flag': pd.Categorical(np.random.choice(qc_flags, size=batch_size, p=[0.70, 0.15, 0.15])),
                'batch': pd.Categorical(np.random.choice(batches, size=batch_size)),
                'lab_technician': pd.Categorical(np.random.choice(technicians, size=batch_size))
            }

            mini_df = pd.DataFrame(mini_data)

            # Introduce ~18% missing values in expression_level
            missing_mask = np.random.random(batch_size) < 0.18
            mini_df.loc[missing_mask, 'expression_level'] = np.nan

            # Add some outliers (will be removed later)
            outlier_mask = np.random.random(batch_size) < 0.02
            mini_df.loc[outlier_mask, 'expression_level'] = np.random.uniform(1000, 10000, outlier_mask.sum())

            mini_chunks.append(mini_df)

            del mini_data, gene_ids, sample_ids
            gc.collect()

        # Combine mini chunks into chunk
        chunk_df = pd.concat(mini_chunks, ignore_index=True)
        chunks.append(chunk_df)

        # Clear memory
        del mini_chunks
        gc.collect()

    # Combine all chunks
    print("  Combining chunks...")
    df = pd.concat(chunks, ignore_index=True)
    del chunks
    gc.collect()

    print(f"Generated {len(df):,} rows")
    return df


def optimize_memory(df):
    """
    Optimize DataFrame memory usage.
    - Convert categorical columns to category dtype
    - Downcast floats and ints
    """
    print("\nOptimizing memory usage...")

    # Get initial memory usage
    initial_memory = df.memory_usage(deep=True).sum() / 1024**2
    print(f"  Initial memory: {initial_memory:.2f} MB")

    # Convert categorical columns
    categorical_cols = ['gene_id', 'sample_id', 'qc_flag', 'batch', 'lab_technician']
    for col in categorical_cols:
        df[col] = df[col].astype('category')

    # Downcast numeric columns
    df['expression_level'] = pd.to_numeric(df['expression_level'], downcast='float')

    # Get final memory usage
    final_memory = df.memory_usage(deep=True).sum() / 1024**2
    print(f"  Final memory: {final_memory:.2f} MB")
    print(f"  Reduction: {initial_memory - final_memory:.2f} MB ({(1 - final_memory/initial_memory)*100:.1f}%)")

    return df


def calculate_mad(data):
    """
    Calculate Median Absolute Deviation.
    MAD = median(|X - median(X)|)
    """
    median = np.nanmedian(data)
    mad = np.nanmedian(np.abs(data - median))
    return median, mad


def remove_batch_outliers(df, mad_threshold=3.5):
    """
    Remove batch-specific outliers using Median Absolute Deviation (MAD).

    Parameters:
    - df: DataFrame
    - mad_threshold: Number of MADs from median to consider outlier

    Returns:
    - DataFrame with outliers removed
    """
    print("\nRemoving batch-specific outliers using MAD...")

    initial_count = len(df)
    outlier_mask = pd.Series(False, index=df.index)

    for batch in df['batch'].cat.categories:
        batch_mask = df['batch'] == batch
        batch_data = df.loc[batch_mask, 'expression_level'].dropna()

        if len(batch_data) > 0:
            median, mad = calculate_mad(batch_data.values)

            # Calculate modified z-score
            # Modified Z-score = 0.6745 * (X - median) / MAD
            if mad > 0:
                modified_z = 0.6745 * (df.loc[batch_mask, 'expression_level'] - median) / mad
                batch_outliers = (np.abs(modified_z) > mad_threshold).fillna(False)
                outlier_mask.loc[batch_mask] = batch_outliers.values

    # Remove outliers
    df_clean = df[~outlier_mask].copy()
    outliers_removed = initial_count - len(df_clean)

    print(f"  Outliers removed: {outliers_removed:,} ({outliers_removed/initial_count*100:.2f}%)")
    print(f"  Remaining rows: {len(df_clean):,}")

    return df_clean.reset_index(drop=True)


def impute_missing_values(df):
    """
    Multi-step imputation strategy:
    Step 1: For qc_flag == "BAD", impute with batch median
    Step 2: For others, use KNN-style approach (vectorized for speed)
    """
    print("\nImputing missing values...")

    # Count missing values
    missing_count = df['expression_level'].isna().sum()
    print(f"  Missing values: {missing_count:,} ({missing_count/len(df)*100:.2f}%)")

    # Step 1: Impute BAD qc_flag rows with batch median
    print("\n  Step 1: Imputing BAD qc_flag rows with batch median...")

    bad_missing_mask = (df['qc_flag'] == 'BAD') & (df['expression_level'].isna())
    bad_missing_count = bad_missing_mask.sum()
    print(f"    BAD qc_flag missing: {bad_missing_count:,}")

    # Calculate batch medians
    batch_medians = df.groupby('batch')['expression_level'].median()

    # Impute BAD rows
    for batch in df['batch'].cat.categories:
        mask = bad_missing_mask & (df['batch'] == batch)
        if mask.any():
            df.loc[mask, 'expression_level'] = batch_medians[batch]

    print(f"    Imputed {bad_missing_count:,} values")

    # Step 2: KNN-style imputation for remaining missing values (vectorized)
    print("\n  Step 2: KNN-style imputation for remaining missing values...")

    remaining_missing = df['expression_level'].isna().sum()
    print(f"    Remaining missing: {remaining_missing:,}")

    if remaining_missing > 0:
        # Calculate mean expression by batch and technician combination
        print("    Calculating batch-technician means...")
        batch_tech_means = df.groupby(['batch', 'lab_technician'])['expression_level'].transform('mean')

        # Fill missing values with batch-technician mean
        missing_mask = df['expression_level'].isna()
        df.loc[missing_mask, 'expression_level'] = batch_tech_means[missing_mask]

        # For any still missing (rare case where batch-tech combo has no data), use batch mean
        still_missing = df['expression_level'].isna()
        if still_missing.any():
            print(f"    Using batch mean for {still_missing.sum():,} remaining values...")
            batch_means = df.groupby('batch')['expression_level'].transform('mean')
            df.loc[still_missing, 'expression_level'] = batch_means[still_missing]

        # Final fallback: global mean (very rare)
        final_missing = df['expression_level'].isna()
        if final_missing.any():
            print(f"    Using global mean for {final_missing.sum():,} final values...")
            df.loc[final_missing, 'expression_level'] = df['expression_level'].mean()

    final_missing = df['expression_level'].isna().sum()
    print(f"\n  Final missing values: {final_missing:,}")
    print(f"  Total imputed: {missing_count - final_missing:,}")

    return df


def compute_gene_stability_index(df):
    """
    Compute Gene Stability Index (GSI):
    GSI = Std(expression_level) / Mean(expression_level)

    Returns:
    - DataFrame with gene_id and GSI, sorted by GSI (descending)
    """
    print("\nComputing Gene Stability Index...")

    # Group by gene and calculate statistics
    gene_stats = df.groupby('gene_id')['expression_level'].agg(['mean', 'std']).reset_index()

    # Calculate GSI
    gene_stats['GSI'] = gene_stats['std'] / gene_stats['mean']

    # Sort by GSI (descending - higher GSI = more unstable)
    gene_stats = gene_stats.sort_values('GSI', ascending=False)

    print(f"  Computed GSI for {len(gene_stats):,} genes")

    return gene_stats


def identify_top_unstable_genes(gene_stats, top_n=100):
    """
    Identify top N most unstable genes.
    """
    print(f"\nIdentifying top {top_n} unstable genes...")

    top_unstable = gene_stats.head(top_n).copy()

    print(f"\nTop 10 most unstable genes:")
    print(top_unstable.head(10).to_string(index=False))

    return top_unstable


def main():
    """
    Main execution function.
    """
    print("=" * 80)
    print("Genomics Analysis: Large-Scale Optimization + Missing-Value Strategies")
    print("=" * 80)
    print()

    # Note: For demonstration, using 500K rows instead of 5M to avoid excessive runtime
    # Change num_rows to 5_000_000 for full-scale analysis
    NUM_ROWS = 500_000  # Use 500K for demo, change to 5M for production

    print(f"NOTE: Using {NUM_ROWS:,} rows for demonstration")
    print("      (Change NUM_ROWS to 5_000_000 for full 5M row analysis)")
    print()

    # Step 1: Generate data
    print("=" * 80)
    print("Step 1: Generating Genomics Dataset")
    print("=" * 80)

    df = generate_genomics_data(num_rows=NUM_ROWS)

    # Save raw data
    print("\nSaving raw data...")
    df.to_csv('genomics_raw.csv', index=False)
    print("[OK] Raw data saved to: genomics_raw.csv")

    # Step 2: Optimize memory
    print("\n" + "=" * 80)
    print("Step 2: Memory Optimization")
    print("=" * 80)

    df = optimize_memory(df)

    # Step 3: Remove outliers
    print("\n" + "=" * 80)
    print("Step 3: Removing Batch-Specific Outliers")
    print("=" * 80)

    df = remove_batch_outliers(df, mad_threshold=3.5)

    # Step 4: Impute missing values
    print("\n" + "=" * 80)
    print("Step 4: Multi-Step Missing Value Imputation")
    print("=" * 80)

    df = impute_missing_values(df)

    # Step 5: Compute Gene Stability Index
    print("\n" + "=" * 80)
    print("Step 5: Computing Gene Stability Index")
    print("=" * 80)

    gene_stats = compute_gene_stability_index(df)

    # Step 6: Identify top unstable genes
    print("\n" + "=" * 80)
    print("Step 6: Identifying Top Unstable Genes")
    print("=" * 80)

    top_unstable = identify_top_unstable_genes(gene_stats, top_n=100)

    # Step 7: Export results
    print("\n" + "=" * 80)
    print("Exporting Results")
    print("=" * 80)
    print()

    # Save cleaned data
    df.to_csv('genomics_cleaned.csv', index=False)
    print("[OK] Cleaned data exported to: genomics_cleaned.csv")

    # Save gene statistics
    gene_stats.to_csv('gene_stability_index.csv', index=False)
    print("[OK] Gene stability index exported to: gene_stability_index.csv")

    # Save top unstable genes
    top_unstable.to_csv('top_100_unstable_genes.csv', index=False)
    print("[OK] Top 100 unstable genes exported to: top_100_unstable_genes.csv")

    # Summary statistics
    print("\n" + "=" * 80)
    print("Summary Statistics")
    print("=" * 80)
    print(f"\nDataset Statistics:")
    print(f"  Total rows: {len(df):,}")
    print(f"  Unique genes: {df['gene_id'].nunique():,}")
    print(f"  Unique samples: {df['sample_id'].nunique():,}")
    print(f"  Batches: {df['batch'].nunique()}")
    print(f"  Technicians: {df['lab_technician'].nunique()}")

    print(f"\nExpression Level Statistics:")
    print(f"  Mean: {df['expression_level'].mean():.2f}")
    print(f"  Std: {df['expression_level'].std():.2f}")
    print(f"  Min: {df['expression_level'].min():.2f}")
    print(f"  Max: {df['expression_level'].max():.2f}")

    print(f"\nTop 5 Most Unstable Genes:")
    for idx, row in top_unstable.head(5).iterrows():
        print(f"  {row['gene_id']}: GSI = {row['GSI']:.4f}")

    print()
    print("=" * 80)
    print("Analysis Complete!")
    print("=" * 80)

    return df, gene_stats, top_unstable


if __name__ == "__main__":
    df, gene_stats, top_unstable = main()
