# SE446 - Week 3B: MapReduce Practice with Crime Data

## 📚 Learning Objectives

By the end of this notebook, you will be able to:
1. Apply MapReduce to analyze real-world crime data
2. Implement various aggregation patterns (count, sum, average)
3. Chain multiple MapReduce jobs for complex analysis
4. Debug and optimize MapReduce code

---

## 1. Setup: Load the MapReduce Framework (Emulator)

### ⚠️ Reminder: Local Emulation
The code below defines a **local Python emulator** for MapReduce. 

**Purpose:**
- It allows us to run MapReduce logic on small datasets directly in this notebook.
- It mimics the core behavior of a distributed system (Map → Shuffle → Reduce) using standard Python loops and dictionaries.
- In a real project, this same logic would run on a Hadoop cluster where the "loops" are replaced by parallel processing on many machines.
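
To make the three phases concrete, here is a minimal sketch that traces a toy input through Map, Shuffle, and Reduce by hand. The four-element `records` list is a made-up example, not the notebook's dataset, and the variable names are illustrative only.

```python
from collections import defaultdict

# Toy input (hypothetical): each record is just a crime-type string.
records = ["THEFT", "ASSAULT", "THEFT", "THEFT"]

# Map: every record becomes a (key, value) pair.
mapped = [(crime_type, 1) for crime_type in records]

# Shuffle: group all values that share the same key.
shuffled = defaultdict(list)
for key, value in mapped:
    shuffled[key].append(value)

# Reduce: collapse each key's list of values into one result.
reduced = {key: sum(values) for key, values in shuffled.items()}

print("mapped:  ", mapped)
print("shuffled:", dict(shuffled))
print("reduced: ", reduced)
```

Printing the intermediate structures like this is also a simple way to debug a mapper or reducer before running it on the full dataset.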

In [None]:
from collections import defaultdict
import pandas as pd

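def map_reduce(data, mapper, reducer):
    """
    Simple local MapReduce implementation.

    mapper(record): returns one (key, value) pair, a list of pairs,
        or None to skip the record.
    reducer(key, values): returns one result, or None to drop the key.
    Returns the list of reducer results.
    """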
    # MAP PHASE
    mapped = []
    for record in data:
        result = mapper(record)
        if result is not None:
            if isinstance(result, list):
                mapped.extend(result)
            else:
                mapped.append(result)
    
    # SHUFFLE PHASE
    shuffled = defaultdict(list)
    for key, value in mapped:
        shuffled[key].append(value)
    
    # REDUCE PHASE
    results = []
    for key, values in shuffled.items():
        result = reducer(key, values)
        if result is not None:
            results.append(result)
    
    return results

print("✅ MapReduce framework loaded!")

## 2. Load the Crime Dataset (Synthetic, Chicago-Style)

In [None]:
# Generating Synthetic Crime Data 🎲
# ------------------------------------------------------------------
# Instead of downloading an external file that may be missing,
# we generate 1,000 synthetic records right here. The column names
# follow the Chicago crime dataset, and the fixed random seed below
# ensures everyone has the exact same data for practice.

print("⚠️ Generating 1,000 entries of synthetic crime data...")

crimes_df = pd.DataFrame({
    'ID': range(1, 1001),
    'Primary Type': ['THEFT']*300 + ['BATTERY']*250 + ['ASSAULT']*150 + 
                   ['CRIMINAL DAMAGE']*100 + ['BURGLARY']*100 + ['OTHER']*100,
    'District': [1]*100 + [2]*150 + [3]*200 + [4]*150 + [5]*100 + 
               [6]*100 + [7]*100 + [8]*100,
    'Arrest': [True]*350 + [False]*650,
    'Location Description': ['STREET']*400 + ['RESIDENCE']*300 + 
                           ['APARTMENT']*150 + ['STORE']*150
})

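# Shuffle the row order so records aren't grouped sequentially.
# Note: each row moves as a unit, so the columns stay correlated
# (for example, every THEFT row keeps Arrest == True).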
crimes_df = crimes_df.sample(frac=1, random_state=42).reset_index(drop=True)

print(f"✅ Created {len(crimes_df):,} sample records")

# Convert to list of dictionaries for MapReduce
crime_records = crimes_df.to_dict('records')

# Preview columns
print(f"\n📋 Columns: {list(crimes_df.columns)}")
crimes_df.head(5)

## 3. Exercise 1: Count Crimes by Type 📊

**Goal**: Count how many crimes of each type occurred.

**Key**: Crime type (e.g., "THEFT", "ASSAULT")  
**Value**: 1 (for counting)

In [None]:
# Mapper: emit (crime_type, 1) for each crime record
def crime_type_mapper(record):
    """
    Input: Crime record (dict)
    Output: (crime_type, 1)
    """
    crime_type = record['Primary Type']
    return (crime_type, 1)

# Reducer: sum all counts for each crime type
def count_reducer(key, values):
    """
    Input: crime_type, list of 1s
    Output: (crime_type, total_count)
    """
    return (key, sum(values))

# Run MapReduce
crime_counts = map_reduce(crime_records, crime_type_mapper, count_reducer)

# Display results (sorted by count)
print("\n📊 Crime Counts by Type:")
print("-" * 40)
for crime_type, count in sorted(crime_counts, key=lambda x: x[1], reverse=True):
    print(f"{crime_type:25} {count:>6,}")

## 4. Exercise 2: Crimes per District 🏢

**Goal**: Count crimes in each police district.

In [None]:
# TODO: Complete the mapper
def district_mapper(record):
    """
    Input: Crime record (dict)
    Output: (district, 1)
    """
    # Solution: key on the district and emit 1 per record
    district = record['District']
    return (district, 1)

# Run MapReduce (reuse count_reducer)
district_counts = map_reduce(crime_records, district_mapper, count_reducer)

# Display results
print("\n🏢 Crimes by District:")
print("-" * 30)
for district, count in sorted(district_counts, key=lambda x: x[1], reverse=True)[:10]:
    print(f"District {district:>3}: {count:>6,} crimes")

## 5. Exercise 3: Filter - Only Crimes with Arrests 🚔

**Goal**: Count only crimes where an arrest was made.

**Pattern**: Return `None` from the mapper to filter out a record.

In [None]:
def arrest_filter_mapper(record):
    """
    Only emit crimes where Arrest == True
    """
    # Filter: emit a pair only if an arrest was made
    if record.get('Arrest'):
        return (record['Primary Type'], 1)
    return None  # Filter out: map_reduce skips None results

# Run MapReduce
arrest_counts = map_reduce(crime_records, arrest_filter_mapper, count_reducer)

# Display results
print("\n🚔 Crimes with Arrests:")
print("-" * 40)
for crime_type, count in sorted(arrest_counts, key=lambda x: x[1], reverse=True)[:10]:
    print(f"{crime_type:25} {count:>6,}")

## 6. Exercise 4: Calculate Arrest Rate 📈

**Goal**: Calculate arrest rate (%) for each crime type.

**Key**: Crime type  
**Value**: `(arrested, total)`, a tuple of counts used to compute the rate
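
Carrying both counts, rather than a precomputed rate, matters because rates from groups of different sizes cannot simply be averaged. The sketch below illustrates this with two hypothetical partial results for a single crime type; the numbers are made up for the example.

```python
# Hypothetical partial results for one crime type, e.g. from two data splits.
# Each pair is (arrests, total_crimes).
partials = [(1, 2), (9, 10)]

# Correct: add the components separately, then divide once at the end.
arrests = sum(a for a, _ in partials)
total = sum(t for _, t in partials)
combined_rate = arrests / total * 100

# Incorrect: average the per-split rates, which ignores the split sizes.
naive_rate = sum(a / t for a, t in partials) / len(partials) * 100

print(f"combined: {combined_rate:.1f}%")
print(f"naive:    {naive_rate:.1f}%")
```

The two results differ because the second split holds five times as many records as the first, and only the `(arrested, total)` form preserves that weighting.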

In [None]:
def arrest_rate_mapper(record):
    """
    Emit (crime_type, (arrested, total))
    - arrested: 1 if arrest made, 0 otherwise
    - total: always 1 (for counting)
    """
    crime_type = record['Primary Type']
    arrested = 1 if record.get('Arrest') else 0
    return (crime_type, (arrested, 1))

def arrest_rate_reducer(crime_type, values):
    """
    Calculate arrest rate from list of (arrested, total) tuples
    """
    total_arrests = sum(v[0] for v in values)
    total_crimes = sum(v[1] for v in values)
    
    if total_crimes > 0:
        rate = (total_arrests / total_crimes) * 100
    else:
        rate = 0
    
    return (crime_type, {
        'arrests': total_arrests,
        'total': total_crimes,
        'rate': round(rate, 1)
    })

# Run MapReduce
arrest_rates = map_reduce(crime_records, arrest_rate_mapper, arrest_rate_reducer)

# Display results (sorted by rate)
print("\n📈 Arrest Rates by Crime Type:")
print("-" * 55)
print(f"{'Crime Type':25} {'Arrests':>8} {'Total':>8} {'Rate':>8}")
print("-" * 55)
for crime_type, stats in sorted(arrest_rates, key=lambda x: x[1]['rate'], reverse=True)[:10]:
    print(f"{crime_type:25} {stats['arrests']:>8,} {stats['total']:>8,} {stats['rate']:>7.1f}%")

## 7. Exercise 5: Multi-Stage - Top 5 Crime Types 🏆

**Goal**: Find the 5 most common crime types.

**Approach**: Two-stage MapReduce
1. **Stage 1**: Count crimes by type
2. **Stage 2**: Find the top 5, using the Stage 1 output as input
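
The "find the top 5" step does not strictly need a full sort. As a hedged sketch of one possible alternative, the reducer below uses `heapq.nlargest` from the Python standard library to select the largest items directly. The `counts` list, the `top_n_reducer` name, and the `n` parameter are illustrative assumptions, not part of the notebook's code.

```python
import heapq

# Hypothetical Stage 1 output: (crime_type, count) pairs.
counts = [("THEFT", 300), ("OTHER", 100), ("BATTERY", 250), ("ASSAULT", 150)]

def top_n_reducer(key, values, n=2):
    """Return the n pairs with the largest counts, largest first."""
    return (key, heapq.nlargest(n, values, key=lambda pair: pair[1]))

result = top_n_reducer("all", counts)
print(result)
```

For a handful of crime types the difference is negligible; the selection approach is mainly of interest when the single reducer receives a very large list.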

In [None]:
# Stage 1: Count by type (same job as Exercise 1, re-run here)
crime_counts = map_reduce(crime_records, crime_type_mapper, count_reducer)
print(f"Stage 1: Got {len(crime_counts)} crime types")

# Stage 2: Find top 5
def top_n_mapper(item):
    """
    Send all items to same reducer using dummy key
    """
    crime_type, count = item
    return ("all", (crime_type, count))  # Key="all" sends everything to one reducer

def top_5_reducer(key, values):
    """
    Sort and return top 5
    """
    sorted_values = sorted(values, key=lambda x: x[1], reverse=True)
    return (key, sorted_values[:5])

# Run Stage 2
top_5_result = map_reduce(crime_counts, top_n_mapper, top_5_reducer)

# Display results
print("\n🏆 Top 5 Crime Types:")
print("-" * 40)
for rank, (crime_type, count) in enumerate(top_5_result[0][1], 1):
    print(f"#{rank}: {crime_type:25} {count:>6,}")

## 8. 🎯 Challenge Exercise: Crime Hot Spots

**Goal**: Find the top 3 crime locations for each crime type.

This requires thinking carefully about keys and multi-stage processing!

In [None]:
# TODO: Implement your solution

# Stage 1: Count (crime_type, location) combinations
def type_location_mapper(record):
    crime_type = record['Primary Type']
    location = record.get('Location Description', 'UNKNOWN')
    # Composite key: (crime_type, location)
    return ((crime_type, location), 1)

# Run Stage 1
type_location_counts = map_reduce(crime_records, type_location_mapper, count_reducer)

# Stage 2: Group by crime type and find top 3 locations
def group_by_type_mapper(item):
    (crime_type, location), count = item
    return (crime_type, (location, count))

def top_3_locations_reducer(crime_type, values):
    sorted_locations = sorted(values, key=lambda x: x[1], reverse=True)[:3]
    return (crime_type, sorted_locations)

# Run Stage 2
crime_hotspots = map_reduce(type_location_counts, group_by_type_mapper, top_3_locations_reducer)

# Display results
print("\n🔥 Crime Hot Spots (Top 3 Locations per Type):")
print("=" * 60)
for crime_type, locations in sorted(crime_hotspots, key=lambda x: x[0])[:5]:  # Show first 5 types
    print(f"\n{crime_type}:")
    for location, count in locations:
        print(f"  • {location}: {count:,} incidents")

## 9. Summary: MapReduce Patterns

| Pattern | Mapper Output | Reducer Operation |
|---------|---------------|------------------|
| **Count** | `(key, 1)` | `sum(values)` |
| **Sum** | `(key, value)` | `sum(values)` |
| **Average** | `(key, (value, 1))` | `sum(values) / sum(counts)` |
| **Filter** | `(key, value)` or `None` | pass through |
| **Top N** | `("all", (key, value))` | sort and slice |
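
The Sum and Average rows can be sketched as follows. This is a minimal illustration under stated assumptions: `run_job` is a compact stand-in for the notebook's `map_reduce` that handles one pair per record, and the `calls` data (district, response minutes) is hypothetical and not part of the crime dataset.

```python
from collections import defaultdict

def run_job(data, mapper, reducer):
    """Compact stand-in for map_reduce: one (key, value) pair per record."""
    groups = defaultdict(list)
    for record in data:
        key, value = mapper(record)
        groups[key].append(value)
    return dict(reducer(key, values) for key, values in groups.items())

# Hypothetical records: (district, response_minutes).
calls = [(1, 10), (1, 20), (2, 30), (2, 50), (2, 40)]

# Sum pattern: emit the value itself, then add the values up.
total_minutes = run_job(
    calls,
    lambda r: (r[0], r[1]),
    lambda key, values: (key, sum(values)),
)

# Average pattern: emit (value, 1), add both components, divide once.
def average_reducer(key, pairs):
    value_sum = sum(v for v, _ in pairs)
    count = sum(c for _, c in pairs)
    return (key, value_sum / count)

avg_minutes = run_job(calls, lambda r: (r[0], (r[1], 1)), average_reducer)

print(total_minutes)
print(avg_minutes)
```

The Average reducer has the same shape as `arrest_rate_reducer` in Exercise 4: both sum a numerator and a denominator separately and divide only at the end.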

### Key Design Questions

1. **What is my key?** → What do I want to group by?
2. **What is my value?** → What do I want to aggregate?
3. **Do I need multiple stages?** → Complex aggregations often do

## 10. 📝 Homework: Milestone 2 Preparation

For Milestone 2, you'll implement MapReduce analysis on the crime dataset.

**Tasks to prepare:**
1. Review all patterns from this notebook
2. Clone your team's GitHub repo
3. Copy the starter notebook to your folder
4. Begin implementing the required analyses

**Remember:** Commit frequently with meaningful messages!