# Analysis of Save Functions - Optimization Opportunities

## Functions Analyzed
1. `save_stations_by_category()`
2. `save_processed_data_by_weekday_to_dataframe()`

## Current Performance
- **Total runtime**: ~1 full day (24 hours) to process and save all station data
- This is unacceptable for production use and iterative development

---

## Function 1: `save_stations_by_category()` - Batch Processing Orchestrator

### Current Implementation Analysis

#### ✅ Already Optimized:
1. **Pre-loads all data once** - Schedule and incident data are loaded once and reused for all stations
2. **Avoids redundant file I/O** - No reloading for each station
3. **Simple sequential control flow** - Processes stations one by one, which is easy to reason about but is also the main bottleneck (see item 1 below)

#### ⚠️ Major Bottlenecks:

### 1. **NO PARALLEL PROCESSING**
- **Current**: Sequential loop through all stations (one at a time)
- **Problem**: Only uses 1 CPU core out of potentially 8-16 available
- **Impact**: Linear scaling - processing 100 stations takes 100x longer than 1 station

**Optimization Opportunity:**
```python
# Instead of:
for st_code in stations:
    process_single_station(st_code)

# Use parallel processing:
from multiprocessing import Pool
from functools import partial

# Create worker function
worker_func = partial(process_single_station, 
                      schedule_data=schedule_data_loaded,
                      stanox_ref=stanox_ref,
                      tiploc_to_stanox=tiploc_to_stanox,
                      incident_data=incident_data_loaded)

# Process in parallel with 8 workers
with Pool(processes=8) as pool:
    results = pool.map(worker_func, stations)
```

**Expected speedup**: up to 6-8x on an 8-core machine, assuming the per-station work is CPU-bound and each worker has enough memory for the shared data
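One caveat with binding large pre-loaded data through `functools.partial` is that the bound arguments may be serialized along with each batch of tasks sent to the workers. A minimal sketch of an alternative, assuming the data can be handed to each worker once through the `initializer` argument of `multiprocessing.Pool`, is shown below. The names `init_worker`, `process_station`, `run_parallel`, and the row layout are hypothetical stand-ins, not the project's real functions.

```python
from multiprocessing import Pool

# Module-level slot that each worker process fills once at start-up.
_SHARED = {}


def init_worker(schedule_data, incident_data):
    """Runs once per worker process and stores the pre-loaded data for reuse."""
    _SHARED["schedule_data"] = schedule_data
    _SHARED["incident_data"] = incident_data


def process_station(st_code):
    """Hypothetical stand-in for the real per-station processing.

    Here it only counts the schedule rows that belong to the station.
    """
    rows = [r for r in _SHARED["schedule_data"] if r["station"] == st_code]
    return st_code, len(rows)


def run_parallel(stations, schedule_data, incident_data, processes=4):
    """Process stations in parallel, handing the shared data to each worker once."""
    with Pool(
        processes=processes,
        initializer=init_worker,
        initargs=(schedule_data, incident_data),
    ) as pool:
        return dict(pool.map(process_station, stations))
```

`run_parallel()` should be called from code guarded by `if __name__ == "__main__":`, since platforms that start workers with the spawn method re-import the main module in every worker.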

---

### 2. **Inefficient File Cleanup**
- **Current**: Uses `shutil.rmtree()` for EACH station folder individually
- **Problem**: Lots of filesystem operations in a loop

**Optimization Opportunity:**
```python
# Instead of removing each station folder individually,
# remove the entire processed_data directory once:
if os.path.exists(output_dir):
    shutil.rmtree(output_dir)
os.makedirs(output_dir, exist_ok=True)
```

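**Expected speedup**: Marginal, but cleaner code. This is only safe when every station under `output_dir` is regenerated by the same run; if the directory also holds output from other categories, or if resume support (item 4) is wanted, the per-station cleanup should stay.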

---

### 3. **Progress Tracking Overhead**
- **Current**: Extensive print statements for every operation
- **Problem**: Console I/O can slow down processing, especially when running hundreds of stations

**Optimization Opportunity:**
```python
# Use logging with levels instead of print statements
import logging
logging.basicConfig(level=logging.INFO)  # Can be changed to WARNING for production

# Or use tqdm for progress bars
from tqdm import tqdm
for st_code in tqdm(stations, desc="Processing stations"):
    process_single_station(st_code)
```

**Expected speedup**: 5-10% when processing many stations

---

### 4. **No Checkpointing/Resume Capability**
- **Current**: If processing crashes at station 80/100, you start over from station 1
- **Problem**: Wastes time re-processing already completed stations

**Optimization Opportunity:**
```python
# Check which stations are already processed.
# has_all_weekday_files() is a helper that still needs to be written.
completed_stations = set()
for st_code in stations:
    station_folder = os.path.join(output_dir, st_code)
    if os.path.exists(station_folder) and has_all_weekday_files(station_folder):
        completed_stations.add(st_code)

# Only process remaining stations (set membership keeps this pass linear)
remaining_stations = [s for s in stations if s not in completed_stations]
print(f"Skipping {len(completed_stations)} already completed stations")
```

**Expected benefit**: Fault tolerance - can resume after crashes
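Checking for a complete set of weekday files can misfire if a station legitimately has no data for some days or if a crash leaves a partially written file. A minimal sketch of a different technique, a completion marker written only after a station finishes, is shown below. The marker name `_DONE` and all three function names are hypothetical and not part of the existing code.

```python
from pathlib import Path

MARKER_NAME = "_DONE"  # hypothetical marker file name


def mark_station_done(station_folder):
    """Create an empty marker file; call this after all files are written."""
    Path(station_folder, MARKER_NAME).touch()


def is_station_done(station_folder):
    """A station counts as complete only if its marker file exists."""
    return Path(station_folder, MARKER_NAME).is_file()


def split_stations(stations, output_dir):
    """Return (done, remaining) station codes, preserving input order."""
    done, remaining = [], []
    for st_code in stations:
        folder = Path(output_dir, st_code)
        (done if is_station_done(folder) else remaining).append(st_code)
    return done, remaining
```

With this approach the batch loop would process only `remaining` and call `mark_station_done()` as the last step for each station. Resuming also requires that the run does not delete existing station folders at start-up.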

---

### 5. **Memory Management**
- **Current**: Loads ALL schedule data (potentially GB) and ALL incident data into memory
- **Problem**: May cause memory issues on machines with limited RAM

**Note**: This is actually necessary for the optimization strategy (load once, reuse), so it's a trade-off.

**Alternative Approach** (if memory is a problem):
- Process in batches of stations (e.g., 50 at a time)
- Load data → Process 50 stations → Clear memory → Repeat
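The batching idea above can be sketched as follows. This is an illustration only: `batched` and `process_in_batches` are hypothetical names, and `process_batch` stands for any callable that loads what it needs, processes one batch of station codes, and lets the data go out of scope when it returns.

```python
def batched(items, batch_size):
    """Yield consecutive slices of `items` with at most `batch_size` elements."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def process_in_batches(stations, process_batch, batch_size=50):
    """Call `process_batch` on each slice and collect the per-station results."""
    results = []
    for batch in batched(stations, batch_size):
        # Data loaded inside process_batch becomes unreachable once it
        # returns, so it can be freed before the next batch starts.
        results.extend(process_batch(batch))
    return results
```

The batch size of 50 mirrors the figure in the text and would need tuning against the memory actually available.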

---

## Function 2: `save_processed_data_by_weekday_to_dataframe()` - Single Station Processor

### Current Implementation Analysis

#### ✅ Already Optimized:
1. **Accepts pre-loaded data** - Avoids redundant file I/O when called in batch mode
2. **Uses optimized delay processing** - `process_delays_optimized()` instead of legacy version

#### ⚠️ Major Bottlenecks:

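### 1. **Deduplication Has High Per-Entry Overhead**
- **Current**: For every entry, an inner loop sorts the fields and builds a hashable tuple key
- **Problem**: The cost is linear in the number of entries (roughly O(n · k log k) for n entries with k fields each), not quadratic, but the pure-Python work per entry is expensive, so stations with 10,000+ entries become very slow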

**Current Code:**
```python
seen = set()
deduplicated = []
for entry in schedule_timeline_adjusted:
    key_fields = []
    for k, v in sorted(entry.items()):  # <-- Sorting every item for every entry
        if isinstance(v, (str, int, float, type(None))):
            key_fields.append((k, v))
        elif isinstance(v, list):
            key_fields.append((k, tuple(v)))
    entry_key = tuple(key_fields)
    if entry_key not in seen:
        seen.add(entry_key)
        deduplicated.append(entry)
```

**Optimization Opportunity:**
```python
# Use pandas for deduplication (vectorized, typically much faster)
df_temp = pd.DataFrame(schedule_timeline_adjusted)

# Build a hashable copy used only to detect duplicates: list values become
# tuples, so the original list columns in df_temp stay untouched.
# Assumes all remaining values are hashable (no dicts or sets).
df_keys = df_temp.apply(
    lambda col: col.map(lambda x: tuple(x) if isinstance(x, list) else x)
)

# Keep the first occurrence of each distinct row
df_dedup = df_temp[~df_keys.duplicated()]

# Convert back to list of dicts (keys missing from an entry come back as NaN)
schedule_timeline_adjusted = df_dedup.to_dict('records')
```

**Expected speedup**: 10-50x faster for large datasets

---

### 2. **Weekday Organization Creates Unnecessary Copies**
- **Current**: Manually loops through entries and creates copies for multi-day schedules
- **Problem**: Uses `.copy()` extensively which is slow

**Optimization Opportunity:**
```python
# Instead of looping and copying:
# Use pandas explode() to expand multi-day schedules efficiently
df = pd.DataFrame(schedule_timeline_adjusted)

# Classify each schedule BEFORE exploding, while ENGLISH_DAY_TYPE is still a list
# (after explode() each value is a single day, so len() would no longer count days)
df['DATASET_TYPE'] = df['ENGLISH_DAY_TYPE'].apply(
    lambda days: 'SINGLE_DAY' if len(days) == 1 else 'MULTI_DAY'
)

# Explode ENGLISH_DAY_TYPE to create one row per day
df_expanded = df.explode('ENGLISH_DAY_TYPE')
df_expanded['WEEKDAY'] = df_expanded['ENGLISH_DAY_TYPE']

# Group by WEEKDAY
weekday_dataframes = {day: group for day, group in df_expanded.groupby('WEEKDAY')}
```

**Expected speedup**: 5-10x faster for this step

---

### 3. **Custom Sorting Function is Inefficient**
- **Current**: Custom `safe_sort_key()` function called for every entry

**Current Code:**
```python
def safe_sort_key(x):
    actual_calls = x.get("ACTUAL_CALLS")
    if actual_calls is None or actual_calls == "NA":
        return 0
    # ... lots of type checking
    
entries.sort(key=safe_sort_key)
```

**Optimization Opportunity:**
```python
# Use pandas sorting (faster, handles type conversion automatically)
df['ACTUAL_CALLS_NUMERIC'] = pd.to_numeric(df['ACTUAL_CALLS'], errors='coerce').fillna(0)
df = df.sort_values('ACTUAL_CALLS_NUMERIC', kind='mergesort')  # stable sort, like list.sort()
df = df.drop(columns=['ACTUAL_CALLS_NUMERIC'])  # Remove temp column
```

**Expected speedup**: 3-5x faster for this step

---

### 4. **Excessive Print Statements**
- **Current**: 6 print statements per station, each with string formatting
- **Problem**: When processing 200+ stations, this adds up

**Optimization Opportunity:**
```python
# Use a verbosity flag
def save_processed_data_by_weekday_to_dataframe(st_code, verbose=True, **other_params):  # other parameters unchanged
    if verbose:
        print(f"Processing data for STANOX: {st_code}")
    # ...

# When called in batch mode, set verbose=False
# Only log major milestones or errors
```

---

### 5. **Parquet Writing Could Be Optimized**
- **Current**: Writes each weekday file individually with default settings

**Optimization Opportunity:**
```python
# Make the engine and compression explicit
# (pandas already picks pyarrow when it is installed and uses snappy by default)
df.to_parquet(
    filename,
    index=False,
    engine='pyarrow',
    compression='snappy'  # Good balance of speed and size
)
```

**Expected benefit**: Little change if the pandas defaults are already in effect; the main gain is that the settings are explicit and stay fixed across environments

---

## Summary: Optimization Priority Ranking

### 🔴 **CRITICAL - Implement First** (Expected 6-8x speedup)
1. **Parallel processing in `save_stations_by_category()`**
   - Use `multiprocessing.Pool` with 8 workers
   - Expected speedup: 6-8x (24 hours → 3-4 hours)

### 🟡 **HIGH PRIORITY** (Expected 3-5x additional speedup)
2. **Replace deduplication with pandas in `save_processed_data_by_weekday_to_dataframe()`**
   - Expected speedup: 10-50x for this step
   
3. **Replace weekday organization logic with pandas operations**
   - Expected speedup: 5-10x for this step

### 🟢 **MEDIUM PRIORITY** (Expected 10-20% additional speedup)
4. **Reduce print statements / use logging levels**
5. **Optimize sorting with pandas**
6. **Add checkpointing/resume capability**

### 🔵 **LOW PRIORITY** (Marginal improvements)
7. **Optimize file cleanup**
8. **Add parquet compression settings**

---

## Overall Expected Improvement

**Current**: ~24 hours

**After Critical + High Priority optimizations**:
- Parallel processing: 6x speedup → 4 hours
- Pandas-based deduplication/organization: 3x speedup → 1.3 hours
- Other optimizations: 1.2x speedup → **~1 hour total**

**Expected final runtime: 1-2 hours** (from 24 hours)
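The estimate above multiplies the three speedup factors together. The short calculation below reproduces that arithmetic; it assumes each factor applies to the whole remaining runtime, which is the same simplification the estimate makes.

```python
baseline_hours = 24.0
speedups = [
    ("parallel processing", 6.0),
    ("pandas dedup/organization", 3.0),
    ("other optimizations", 1.2),
]


def compound_runtime(baseline, factors):
    """Return the runtime remaining after each successive speedup factor."""
    runtimes = []
    current = baseline
    for _, factor in factors:
        current /= factor
        runtimes.append(current)
    return runtimes


runtimes = compound_runtime(baseline_hours, speedups)
for (name, factor), hours in zip(speedups, runtimes):
    print(f"{name}: {factor}x -> {hours:.2f} h")
# parallel processing: 6.0x -> 4.00 h
# pandas dedup/organization: 3.0x -> 1.33 h
# other optimizations: 1.2x -> 1.11 h
```

A per-step speedup such as 10-50x for deduplication only translates into a large overall factor if that step accounts for most of the per-station time, so profiling a few stations first would show whether the combined 3x assumption is realistic.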

---

## Additional Considerations

### Memory Usage
- Pre-loading all data is memory-intensive but necessary for speed
- If memory becomes an issue, consider batch processing (50 stations at a time)
- Monitor memory usage with `memory_profiler` package

### Data Validation
- Currently no validation that parquet files were written successfully
- Consider adding file existence/size checks after writing
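The existence and size checks suggested above could look like the following sketch. The function names and the `min_bytes` threshold are hypothetical, and a size check only catches missing or empty files, not corrupted content.

```python
from pathlib import Path


def verify_written(path, min_bytes=1):
    """Return True if `path` is a regular file of at least `min_bytes` bytes."""
    p = Path(path)
    return p.is_file() and p.stat().st_size >= min_bytes


def find_missing_outputs(paths, min_bytes=1):
    """Return the paths (as strings) that are missing or smaller than expected."""
    return [str(p) for p in paths if not verify_written(p, min_bytes)]
```

Calling `find_missing_outputs()` on the list of files a station was expected to produce gives a list that can be logged or added to the run summary.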

### Error Handling
- Current error handling catches exceptions and continues
- With parallel processing, need to ensure errors in one worker don't crash entire job
- Consider using `try/except` within worker functions
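When `pool.map()` is used, an uncaught exception in any worker is re-raised in the parent and the results for the other stations are lost. A minimal sketch of the `try/except` wrapper suggested above follows; `safe_worker` and `summarize` are hypothetical names, and `process_fn` stands for the real per-station function.

```python
import traceback


def safe_worker(process_fn, st_code):
    """Run process_fn(st_code) and return a status record instead of raising."""
    try:
        result = process_fn(st_code)
        return {"station": st_code, "ok": True, "result": result, "error": None}
    except Exception:
        # Keep the full traceback as text so it survives the trip back
        # from the worker process to the parent.
        return {
            "station": st_code,
            "ok": False,
            "result": None,
            "error": traceback.format_exc(),
        }


def summarize(records):
    """Count successes and list the station codes that failed."""
    failed = [r["station"] for r in records if not r["ok"]]
    return {"succeeded": len(records) - len(failed), "failed": failed}
```

With a pool, the wrapper can be bound with `functools.partial(safe_worker, process_fn)` and passed to `pool.map()`, provided `process_fn` is defined at module level so it can be pickled.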

### Testing
- Before implementing optimizations, create tests to verify output remains identical
- Test with a small subset of stations first (e.g., 5 stations)
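- Compare outputs against the original implementation to ensure correctness; comparing the loaded DataFrames is more reliable than comparing files byte-by-byte, because Parquet bytes can change with writer version or settings even when the data is identical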
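One way to check that an optimized run reproduces the original output is to compare the data itself. The sketch below assumes both versions of a weekday file have already been loaded into DataFrames (for example with `pd.read_parquet`); `frames_match` and its `sort_by` parameter are hypothetical helpers, not part of the existing test suite.

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def frames_match(expected, actual, sort_by=None):
    """Return True if both DataFrames hold the same data.

    Column order is ignored. Row order is ignored only when `sort_by`
    names the columns to sort on before comparing.
    """
    if sort_by:
        expected = expected.sort_values(sort_by).reset_index(drop=True)
        actual = actual.sort_values(sort_by).reset_index(drop=True)
    try:
        assert_frame_equal(expected, actual, check_like=True)
    except AssertionError:
        return False
    return True
```

Since the entries are expected to be sorted by `ACTUAL_CALLS`, leaving `sort_by` unset also verifies that the row order is preserved.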

---

## Implementation Roadmap

### Phase 1: Parallel Processing (Week 1)
1. Refactor `save_processed_data_by_weekday_to_dataframe()` to be process-safe
2. Implement multiprocessing in `save_stations_by_category()`
3. Test with 10 stations, then 50, then all
4. Measure speedup

### Phase 2: Pandas Optimizations (Week 2)
1. Replace deduplication logic with pandas
2. Replace weekday organization with pandas explode/groupby
3. Replace custom sorting with pandas
4. Validate outputs match original implementation

### Phase 3: Polish (Week 3)
1. Add logging levels
2. Add checkpointing
3. Add progress bars with tqdm
4. Optimize parquet writing settings
5. Add comprehensive error handling

### Phase 4: Validation & Deployment (Week 4)
1. Run full test suite
2. Compare outputs with original implementation
3. Measure final speedup
4. Document changes
5. Deploy optimized version

---

## ✅ Test Results Summary

### New Save Function Tests (`test_save_functions.py`)

**Result**: 15/15 PASSED ✨

All the new tests for the save functions pass:

#### ✅ `save_processed_data_by_weekday_to_dataframe()` - 6 tests
- Returns dict of DataFrames
- Returns None when no schedule data
- Deduplication works correctly
- Multi-day schedules create entries for all days
- DataFrames have correct columns
- Entries are sorted by ACTUAL_CALLS

#### ✅ `save_stations_by_category()` - 5 tests
- Returns summary dict
- Returns None when no stations found
- Handles processing failures gracefully
- Creates parquet files for each day
- Cleans up existing station folders

#### ✅ `load_stations()` - 2 tests
- Filters by category correctly
- Excludes empty categories when loading all

#### ✅ Baseline comparison - 2 tests (placeholder stubs)

**Note**: The 2 errors for `TestPerformanceBenchmarks` are expected - they require the optional `pytest-benchmark` plugin, which isn't installed. These tests are for performance measurement only.