# **NYC Parking Violations Data Cleaning Pipeline**

**Author: Zahra Haider**

---

## **📌 Introduction**

Parking violations in New York City can yield useful insights, but only if the data is cleaned properly. The raw datasets for 2014 to 2017 contained inconsistencies, missing values, and structural changes from year to year. This project systematically cleans and standardizes the data so that it is ready for analysis.

🔹 Start: The raw data was messy, with missing values, inconsistent formats, and duplicate entries.

🔹 Conflict: Without cleaning, analysis would be unreliable or impossible.

🔹 Solution: A structured, step-by-step pipeline that handles each issue systematically.

---

## **1. Setup & Config**

Before diving into cleaning, we set up the environment:

***Folder Structure:*** Created ``raw_data`` (input) and ``cleaned_data`` (output) directories.

***Year Configuration:*** Defined the years (2014-2017) as ``YEARS`` and the chunk size (100,000 rows) as ``CHUNK_SIZE`` for memory efficiency.

***Valid States:*** Restricted registration states to ``NY, NJ, CT, PA, FL, CA`` (2014 only).

***Why?*** Raw data was too large to load at once, and state restrictions were year-specific.
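To make the chunking idea concrete, here is a minimal sketch of how `pd.read_csv` behaves once `chunksize` is set. The tiny in-memory CSV and the chunk size of 2 are stand-ins chosen for illustration; the real files and the 100,000-row setting are the ones described above.

```python
import io
import pandas as pd

# Hypothetical stand-in for a large raw CSV (the real files hold millions of rows).
raw_csv = io.StringIO(
    "Summons Number,Registration State\n"
    "1001,NY\n1002,NJ\n1003,TX\n1004,NY\n1005,CT\n"
)

# With chunksize set, read_csv returns an iterator of DataFrames
# instead of loading the whole file into memory at once.
chunk_sizes = []
for chunk in pd.read_csv(raw_csv, chunksize=2, dtype=str):
    chunk_sizes.append(len(chunk))

print(chunk_sizes)  # [2, 2, 1]
```

Only one chunk is held in memory at a time, which is what keeps the memory footprint bounded regardless of file size.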

---

In [2]:
import pandas as pd
from pathlib import Path

# Folder setup
Path("raw_data").mkdir(exist_ok=True)
Path("cleaned_data").mkdir(exist_ok=True)

# Year configuration
YEARS = [2014, 2015, 2016, 2017]
CHUNK_SIZE = 100_000  # Process 100K rows at a time
VALID_STATES = ['NY','NJ','CT','PA','FL','CA']  # From 2014

## **2. Step-by-Step Cleaning Functions**

### **📌 STEP 1: Column Names**

***Problem:*** Column names had inconsistent formatting (e.g., ``Violation Code`` vs. ``violation_code``).

***Solution:*** Standardized to lowercase with underscores (``clean_column_names()``).

---

In [3]:
# ===== STEP 1: COLUMN NAMES =====
def clean_column_names(chunk):
    """Standardize column names (lowercase with underscores)"""
    chunk.columns = [col.strip().lower().replace(" ", "_") for col in chunk.columns]
    return chunk
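
As a quick check of the renaming rule, the sketch below restates the same one-line logic so it can run on its own and applies it to a few header names. The header names are assumed examples, not a full listing of the raw schema.

```python
import pandas as pd

def clean_column_names(chunk):
    # Same rule as above: trim, lowercase, spaces to underscores.
    chunk.columns = [col.strip().lower().replace(" ", "_") for col in chunk.columns]
    return chunk

# Assumed sample headers, including stray whitespace and punctuation.
demo = pd.DataFrame(
    columns=["Summons Number", " Plate ID ", "Street Code1", "Unregistered Vehicle?"]
)
cleaned_names = list(clean_column_names(demo).columns)
print(cleaned_names)
# ['summons_number', 'plate_id', 'street_code1', 'unregistered_vehicle?']
```

Note that punctuation is left in place, which is why a name like `unregistered_vehicle?` keeps its question mark.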

### **📌 STEP 2: Column Removal**

***Problem:*** Some columns had >80% nulls or were irrelevant (e.g., ``feet_from_curb``).

***Solution:*** Dropped useless columns (``drop_useless_columns()``).

---

In [None]:
# ===== STEP 2: COLUMN REMOVAL =====
def drop_useless_columns(chunk):
    """Drop low-value columns (2014 criteria)"""
    cols_to_drop = [
        'intersecting_street',  # 2014: Inconsistent data
        'feet_from_curb',       # 2014: >80% nulls
        'violation_legal_code', # 2014: Not used in analysis
        'time_first_observed'   # 2014: 95% nulls
    ]
    return chunk.drop(columns=[c for c in cols_to_drop if c in chunk.columns], errors='ignore')
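
The null percentages quoted in the comments can be measured with `isna().mean()`. The following is a sketch on made-up data showing one way to flag sparse columns; the 0.8 cutoff mirrors the ">80% nulls" criterion, and the sample values are hypothetical.

```python
import pandas as pd

# Hypothetical 10-row sample; None marks a missing value.
sample = pd.DataFrame({
    "summons_number": [str(i) for i in range(10)],
    "feet_from_curb": [None] * 9 + ["0"],      # 90% null
    "time_first_observed": [None] * 10,        # 100% null
})

# Share of missing values per column (0.0 to 1.0).
null_share = sample.isna().mean()

# Columns whose null share exceeds the assumed 80% threshold.
sparse_cols = null_share[null_share > 0.8].index.tolist()
print(sparse_cols)  # ['feet_from_curb', 'time_first_observed']
```

On the real data this would be run per chunk or on a sample, since the full file does not fit in memory.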

### **📌 STEP 3: Null Handling**

***Problem:*** Critical fields like ``violation_location`` had ~27% nulls.

***Solution:*** Filled nulls with placeholders (e.g., ``UNKNOWN_LOC``) (``handle_nulls()``).

---

In [5]:
# ===== STEP 3: NULL HANDLING =====
def handle_nulls(chunk):
    """Impute nulls with placeholders (2014 rules)"""
    return chunk.fillna({
        'violation_location': 'UNKNOWN_LOC',  # 2014: 26.9% nulls
        'issuer_command': 'UNKNOWN_CMD',      # 2014: 26% nulls  
        'house_number': 'N/A',                # 2014: 33% nulls
        'violation_county': 'UNKNOWN_COUNTY'  # 2014: 28.8% nulls
    })
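
A short sketch of how the dictionary form of `fillna` behaves: only the listed columns are filled, and keys for columns that are absent from the chunk are skipped. The toy frame below is hypothetical.

```python
import numpy as np
import pandas as pd

chunk = pd.DataFrame({
    "violation_location": ["0014", np.nan],
    "street_name": [np.nan, "BROADWAY"],
})

filled = chunk.fillna({
    "violation_location": "UNKNOWN_LOC",
    "issuer_command": "UNKNOWN_CMD",  # not present in this toy frame, so ignored
})

print(filled["violation_location"].tolist())  # ['0014', 'UNKNOWN_LOC']
print(filled["street_name"].isna().tolist())  # [True, False] (left untouched)
```

This is why the same function can be applied to every year even when a column is missing from a given file.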

### **📌 STEP 4: String Cleaning**

***Problem:*** Text fields had inconsistent casing and whitespace (e.g., ``" toyota "`` vs. ``"TOYOTA"``).

***Solution:*** Trimmed and uppercased strings (``clean_strings()``).

---

In [6]:
# ===== STEP 4: STRING CLEANING =====
def clean_strings(chunk):
    """Standardize text fields (2014 rules)"""
    str_cols = ['plate_id', 'registration_state', 'vehicle_make', 'vehicle_body_type']
    for col in str_cols:
        if col in chunk.columns:
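            # The nullable "string" dtype keeps missing values missing;
            # astype(str) can turn them into the literal text "NAN"
            chunk[col] = chunk[col].astype("string").str.strip().str.upper()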
    return chunk
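
One detail worth checking when cleaning text is what happens to missing values. The sketch below, on a hypothetical three-value series, compares a plain `str` cast with the nullable `"string"` dtype. On pandas 1.x and 2.x the plain cast turns a missing value into the text `nan` (so `NAN` after uppercasing); the exact behavior of the plain cast may differ in other versions, so no output is asserted for it here.

```python
import numpy as np
import pandas as pd

makes = pd.Series([" toyota ", "HONDA", np.nan])

# Plain cast: on pandas 1.x/2.x the missing value becomes ordinary text.
as_str = makes.astype(str).str.strip().str.upper()

# Nullable string dtype: the missing value stays missing.
as_string = makes.astype("string").str.strip().str.upper()

print(as_str.tolist())
print(as_string.isna().tolist())  # [False, False, True]
```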

### **📌 STEP 5: State Filter (2014 only)**

***Problem:*** 2014 data included invalid registration states.

***Solution:*** Filtered to only valid states (``filter_states()``).

---

In [7]:
# ===== STEP 5: STATE FILTER (2014 ONLY) ===== 
def filter_states(chunk):
    """Keep only rows whose registration state is in VALID_STATES (2014-specific)"""
    if 'registration_state' in chunk.columns:
        # .copy() returns an independent frame so later column assignments
        # do not trigger SettingWithCopyWarning
        chunk = chunk[chunk['registration_state'].isin(VALID_STATES)].copy()
    return chunk
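
Before discarding rows, it can help to see what is being dropped. This is a sketch on hypothetical values that counts the excluded states alongside the filter itself.

```python
import pandas as pd

VALID_STATES = ["NY", "NJ", "CT", "PA", "FL", "CA"]

# Hypothetical chunk with two values outside the allowed list.
chunk = pd.DataFrame({"registration_state": ["NY", "TX", "NJ", "99", "NY"]})

mask = chunk["registration_state"].isin(VALID_STATES)

# Tally of what the filter removes, useful as an audit trail.
dropped_counts = chunk.loc[~mask, "registration_state"].value_counts().to_dict()
kept = chunk[mask].copy()

print(dropped_counts)  # one row each for 'TX' and '99'
print(len(kept))       # 3
```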

### **📌 STEP 6: Date Processing**

***Problem:*** Dates were stored as strings (e.g., ``"01/12/2014"``).

***Solution:*** Converted to datetime and extracted features (``issue_year``, ``issue_month``, etc.) via ``fix_dates()``.

---

In [8]:
# ===== STEP 6: DATE PROCESSING =====
def fix_dates(chunk):
    """Convert and extract date features (2014 rules)"""
    if 'issue_date' in chunk.columns:
        chunk['issue_date'] = pd.to_datetime(chunk['issue_date'], errors='coerce')
        # Drop unparseable dates; .copy() avoids SettingWithCopyWarning on the assignments below
        chunk = chunk[chunk['issue_date'].notna()].copy()
        chunk['issue_year'] = chunk['issue_date'].dt.year
        chunk['issue_month'] = chunk['issue_date'].dt.month
        chunk['issue_day'] = chunk['issue_date'].dt.day
        chunk['issue_dayofweek'] = chunk['issue_date'].dt.dayofweek  # Monday=0
    return chunk
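
To illustrate what `errors='coerce'` does, here is a small sketch. It assumes the raw dates are month/day/year, which is an assumption about the files rather than something confirmed above; passing an explicit `format` is optional but removes any ambiguity in how a value like `01/12/2014` is read.

```python
import pandas as pd

raw = pd.Series(["01/12/2014", "13/45/2014", None])

# Invalid or missing dates become NaT instead of raising an error.
parsed = pd.to_datetime(raw, format="%m/%d/%Y", errors="coerce")

print(parsed.notna().tolist())     # [True, False, False]
print(parsed.dt.dayofweek.iloc[0]) # 6.0, i.e. Sunday (Monday=0)
```

Rows that end up as NaT are the ones the cleaning step removes.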

### **📌 STEP 7: Numeric Cleaning**

***Problem:*** Numeric columns (e.g., ``violation_code``) were sometimes strings.

***Solution:*** Coerced to numeric type (``fix_numerics()``).

---

In [9]:
# ===== STEP 7: NUMERIC CLEANING =====
def fix_numerics(chunk):
    """Ensure numeric columns are properly typed (2014 rules)"""
    num_cols = ['violation_code', 'street_code1', 'street_code2', 'street_code3']
    for col in num_cols:
        if col in chunk.columns:
            chunk[col] = pd.to_numeric(chunk[col], errors='coerce')
    return chunk
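
A sketch of the coercion on hypothetical values. One side effect to be aware of: because unparseable entries become NaN, the column ends up as `float64`, so a code like 21 is stored as 21.0. Casting to the nullable `Int64` dtype afterwards is one optional way to keep whole numbers; it is shown here as a suggestion, not as part of the pipeline above.

```python
import pandas as pd

codes = pd.Series(["21", "46", "abc", None])

# Non-numeric text and missing values both become NaN.
numeric = pd.to_numeric(codes, errors="coerce")
print(numeric.dtype)            # float64
print(numeric.isna().tolist())  # [False, False, True, True]

# Optional: nullable integers keep 21 as 21 rather than 21.0.
as_int = numeric.astype("Int64")
print(as_int.dtype)             # Int64
```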

### **📌 STEP 8: Duplicate Removal**

***Problem:*** Duplicate ``summons_number`` entries skewed counts.

***Solution:*** Tracked and removed duplicates across chunks (``remove_duplicates()``).

---

In [10]:
# ===== STEP 8: DUPLICATE REMOVAL =====
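def remove_duplicates(chunk, seen_summons):
    """Remove duplicate summons numbers within and across chunks (2014 rule)"""
    if 'summons_number' in chunk.columns:
        # Drop repeats inside this chunk first; the isin() check below
        # only catches summons numbers seen in earlier chunks
        chunk = chunk.drop_duplicates(subset='summons_number')
        chunk = chunk[~chunk['summons_number'].isin(seen_summons)]
        seen_summons.update(chunk['summons_number'].tolist())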
    return chunk
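
The sketch below shows chunk-aware deduplication on two hypothetical chunks. The helper name `dedupe_chunk` is made up for this example; it handles repeats inside a chunk as well as repeats of values seen in earlier chunks.

```python
import pandas as pd

def dedupe_chunk(chunk, seen):
    # Repeats inside the current chunk.
    chunk = chunk.drop_duplicates(subset="summons_number")
    # Repeats of anything seen in earlier chunks.
    chunk = chunk[~chunk["summons_number"].isin(seen)]
    seen.update(chunk["summons_number"])
    return chunk

seen = set()
first = dedupe_chunk(pd.DataFrame({"summons_number": ["A1", "A2", "A2"]}), seen)
second = dedupe_chunk(pd.DataFrame({"summons_number": ["A2", "A3"]}), seen)

print(first["summons_number"].tolist())   # ['A1', 'A2']
print(second["summons_number"].tolist())  # ['A3']
```

The `seen` set grows with every unique summons number, so for files with millions of rows it is worth keeping an eye on memory use.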

## 📆 **3. Year-Specific Adjustments**

***Problem:*** Schema changes year-over-year (e.g., column renames in 2016).

***Solution:*** Applied year-specific fixes (``apply_year_specific_rules()``):

**2015:** Fixed time format (``"A"`` → ``"AM"``).

**2016:** Renamed ``vehicle_color_desc`` to ``vehicle_color``.

---

In [11]:
def apply_year_specific_rules(chunk, year):
    """Handle schema changes per year"""
    if year == 2015:
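        # Example: Fix time format change in 2015 (trailing "A"/"P" becomes "AM"/"PM")
        if 'violation_time' in chunk.columns:
            # Anchoring on the end of the string leaves values that already
            # end in "AM"/"PM" unchanged and handles both letters in one pass
            chunk['violation_time'] = chunk['violation_time'].str.replace(r'([AP])$', r'\1M', regex=True)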
    
    elif year == 2016:
        # Example: Column renamed in 2016
        if 'vehicle_color_desc' in chunk.columns:
            chunk = chunk.rename(columns={'vehicle_color_desc': 'vehicle_color'})
    
    return chunk
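
Here is a small sketch of the time-suffix fix using an end-anchored regular expression. The sample values assume times look like `0752A`, which is an assumption about the raw format. Anchoring on the end of the string matters: a plain replacement of every `A` would turn `0915AM` into `0915AMM`.

```python
import pandas as pd

times = pd.Series(["0752A", "1130P", "0915AM", None])

# Append "M" only when the value ends in a bare "A" or "P".
fixed = times.str.replace(r"([AP])$", r"\1M", regex=True)
print(fixed.tolist()[:3])  # ['0752AM', '1130PM', '0915AM']

# For contrast, a naive replacement doubles the suffix.
print("0915AM".replace("A", "AM"))  # 0915AMM
```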

## ⚙️ **4. Chunked Processing Pipeline**

***Problem:*** Datasets were too large to load at once (roughly 8M–12M rows/year).

***Solution:*** Processed in **100K-row chunks:**

1. **Read:** Loaded CSV in chunks with error handling.

2. **Clean:** Applied all cleaning steps sequentially.

3. **Append:** Saved cleaned chunks incrementally.
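
The read, clean, append loop can be sketched end to end on a tiny in-memory CSV. Everything here is a scaled-down stand-in: the data, the chunk size of 2, the temporary output file, and the inline cleaning steps are illustrative assumptions. Reindexing to a fixed column list before each append keeps the rows aligned with the header that was written once at the start.

```python
import io
import tempfile
from pathlib import Path

import pandas as pd

raw = io.StringIO(
    "Summons Number,Plate ID\n1, abc123 \n2,xyz789\n2,xyz789\n3,def456\n"
)
columns = ["summons_number", "plate_id"]
seen = set()
total = 0

with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / "cleaned_demo.csv"
    # Write the header once; every later write appends rows only.
    pd.DataFrame(columns=columns).to_csv(out_path, index=False)

    for chunk in pd.read_csv(raw, chunksize=2, dtype=str):
        chunk.columns = [c.strip().lower().replace(" ", "_") for c in chunk.columns]
        chunk["plate_id"] = chunk["plate_id"].str.strip().str.upper()
        chunk = chunk.drop_duplicates(subset="summons_number")
        chunk = chunk[~chunk["summons_number"].isin(seen)]
        seen.update(chunk["summons_number"])
        # Align column order with the header before appending.
        chunk.reindex(columns=columns).to_csv(
            out_path, mode="a", header=False, index=False
        )
        total += len(chunk)

    result = pd.read_csv(out_path, dtype=str)

print(total)                        # 3
print(result["plate_id"].tolist())  # ['ABC123', 'XYZ789', 'DEF456']
```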

---

**Results:**

**2014:** 8.4M rows

**2015:** 11.8M rows

**2016:** 10.6M rows

**2017:** 10.8M rows

---

In [12]:
def process_year(year):
    input_path = f"raw_data/Parking_{year}.csv"
    output_path = f"cleaned_data/cleaned_{year}.csv"
    expected_cols = get_expected_columns(year)
    
    print(f"\n=== PROCESSING {year} ===")
    print(f"Input: {input_path}")
    print(f"Output: {output_path}")
    
    # Initialize with header only
    pd.DataFrame(columns=expected_cols).to_csv(output_path, index=False)
    
    # Track duplicates
    seen_summons = set()
    processed_rows = 0
    chunk_counter = 0
    
    try:
        # Read with more robust settings
        reader = pd.read_csv(
            input_path,
            chunksize=CHUNK_SIZE,
            dtype='unicode',  # Treat all as strings initially
            encoding_errors='replace',
            on_bad_lines='warn'
        )
        
        for chunk in reader:
            chunk_counter += 1
            print(f"Processing chunk {chunk_counter}...", end='\r')
            
            try:
                # Apply cleaning pipeline; year-specific rules run right after
                # the column names are standardized
                cleaned = (chunk
                    .pipe(clean_column_names)
                    .pipe(apply_year_specific_rules, year)
                    .pipe(drop_useless_columns)
                    .pipe(handle_nulls)
                    .pipe(clean_strings))
                
                if year == 2014:
                    cleaned = cleaned.pipe(filter_states)
                
                cleaned = (cleaned
                    .pipe(fix_dates)
                    .pipe(fix_numerics)
                    .pipe(lambda x: remove_duplicates(x, seen_summons)))
                
                # Align columns with the header written above. Rows are appended
                # with header=False, so a different column order would silently
                # place values under the wrong names. Columns not listed in
                # get_expected_columns() are dropped here.
                cleaned = cleaned.reindex(columns=expected_cols)
                
                # Append to output
                cleaned.to_csv(
                    output_path,
                    mode='a',
                    header=False,
                    index=False
                )
                processed_rows += len(cleaned)
                
            except Exception as e:
                print(f"\nError in chunk {chunk_counter}: {e}")
                continue
                
    except Exception as e:
        print(f"\nFatal error processing {year}: {e}")
        return None
    
    print(f"\n✅ {year} complete: {processed_rows:,} rows")
    return processed_rows

# Helper function
def get_expected_columns(year):
    """Returns expected columns after cleaning"""
    # The 2016 vehicle_color_desc column is renamed to vehicle_color by
    # apply_year_specific_rules(), so every year shares the same list;
    # `year` is kept as a parameter for any future year-specific schema.
    base_cols = [
        'summons_number', 'plate_id', 'registration_state', 
        'plate_type', 'issue_date', 'violation_code',
        'vehicle_body_type', 'vehicle_make', 'issuing_agency',
        'street_code1', 'street_code2', 'street_code3',
        'vehicle_expiration_date', 'violation_location',
        'violation_precinct', 'issuer_precinct', 'issuer_code',
        'issuer_command', 'issuer_squad', 'violation_time',
        'violation_county', 'house_number', 'street_name',
        'date_first_observed', 'law_section', 'sub_division',
        'days_parking_in_effect', 'vehicle_color',
        'unregistered_vehicle?', 'vehicle_year', 'meter_number',
        'violation_description', 'issue_year', 'issue_month',
        'issue_day', 'issue_dayofweek'
    ]
    return base_cols

# Process each year in turn
for year in YEARS:
    print(f"\n{'='*30}")
    print(f"Starting {year}")
    process_year(year)
    print(f"Completed {year}")
    print(f"{'='*30}\n")


Starting 2014

=== PROCESSING 2014 ===
Input: raw_data/Parking_2014.csv
Output: cleaned_data/cleaned_2014.csv
Processing chunk 92...
✅ 2014 complete: 8,403,201 rows
Completed 2014


Starting 2015

=== PROCESSING 2015 ===
Input: raw_data/Parking_2015.csv
Output: cleaned_data/cleaned_2015.csv
Processing chunk 119...
✅ 2015 complete: 11,809,126 rows
Completed 2015


Starting 2016

=== PROCESSING 2016 ===
Input: raw_data/Parking_2016.csv
Output: cleaned_data/cleaned_2016.csv
Processing chunk 107...
✅ 2016 complete: 10,626,899 rows
Completed 2016


Starting 2017

=== PROCESSING 2017 ===
Input: raw_data/Parking_2017.csv
Output: cleaned_data/cleaned_2017.csv
Processing chunk 109...
✅ 2017 complete: 10,803,028 rows
Completed 2017



## 🔗 **5. Combine All Years (Optional)**

***Problem:*** Yearly files were split, complicating cross-year analysis.

***Solution:*** Merged into a single dataset (``combined_2014-2017.csv``):

**Total Rows:** 41.6M

**Size:** ~5.2 GB (uncompressed)
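
The total can be cross-checked against the per-year row counts reported by the pipeline run. A quick arithmetic sketch using those four figures:

```python
# Per-year cleaned row counts as reported in the processing output.
rows_per_year = {
    2014: 8_403_201,
    2015: 11_809_126,
    2016: 10_626_899,
    2017: 10_803_028,
}

total_rows = sum(rows_per_year.values())
print(f"{total_rows:,}")  # 41,642,254
```

If the combined file's row count differs from this sum, rows were lost or duplicated during the merge.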

---

In [14]:
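import pandas as pd

YEARS = [2014, 2015, 2016, 2017]
OUTPUT_FILE = "cleaned_data/combined_2014-2017.csv"

# Step 1: Initialize with header only, taking the column order from the first year
first_file = f"cleaned_data/cleaned_{YEARS[0]}.csv"
columns = pd.read_csv(first_file, nrows=0).columns.tolist()
pd.DataFrame(columns=columns).to_csv(OUTPUT_FILE, index=False)

# Step 2: Process each year in chunks
for year in YEARS:
    input_file = f"cleaned_data/cleaned_{year}.csv"
    print(f"Processing {year}...")
    
    # dtype=str with keep_default_na=False copies values verbatim; otherwise
    # placeholders such as "N/A" would be re-read as missing values
    for chunk in pd.read_csv(input_file, chunksize=100_000, dtype=str, keep_default_na=False):
        # Align on column names so a file with a different column order (or the
        # 2016 vehicle_color_desc name) still lands under the right header
        chunk = chunk.rename(columns={'vehicle_color_desc': 'vehicle_color'}).reindex(columns=columns)
        chunk.to_csv(
            OUTPUT_FILE,
            mode='a',
            header=False,
            index=False
        )

# Step 3: Verify
with open(OUTPUT_FILE, encoding="utf-8") as f:
    row_count = sum(1 for _ in f) - 1  # Subtract header
print(f"✅ Combined dataset created: {row_count:,} rows")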

Processing 2014...
Processing 2015...
Processing 2016...
Processing 2017...
✅ Combined dataset created: 41,642,254 rows


## 🔍 **Future Analysis Opportunities**

With clean data, we can now explore:

1. **Trends:** Are violations increasing yearly?

2. **Hotspots:** Which precincts issue the most tickets?

3. **Vehicle Analysis:** Do certain car makes/colors get ticketed more?

4. **Time Patterns:** Are tickets more common on weekdays or weekends?
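
As a starting point for the first and last questions, the derived `issue_year` and `issue_dayofweek` columns make simple aggregations straightforward. The sketch below runs on a tiny made-up frame, so the numbers say nothing about the real data.

```python
import pandas as pd

# Hypothetical rows standing in for the cleaned dataset.
toy = pd.DataFrame({
    "issue_year": [2014, 2014, 2015, 2015, 2015],
    "issue_dayofweek": [0, 5, 6, 2, 3],  # Monday=0, Sunday=6
})

# Trend: number of tickets per year.
tickets_per_year = toy.groupby("issue_year").size().to_dict()

# Time pattern: share of tickets issued on Saturday or Sunday.
weekend_share = (toy["issue_dayofweek"] >= 5).mean()

print(tickets_per_year)  # 2 tickets in 2014, 3 in 2015
print(weekend_share)
```

On the full combined file, the same aggregations would need to be accumulated chunk by chunk.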

---

## 🎯 **Conclusion**

This pipeline transformed messy, fragmented data into a structured, analysis-ready dataset. By addressing inconsistencies, nulls, and schema changes, we’ve unlocked the potential for deeper insights into NYC parking violations.

**Key Takeaway:** Clean data is the foundation of impactful analysis. 🚀

---

**Tools Used:** Python, Pandas, Pathlib

**Author:** Zahra Haider

---