# MTA Direction Auto Labeler for NER and RE Training

In this notebook, I auto-label directions in MTA bus and subway alerts using rule-based pattern matching.

## Label Types:
- **COMPASS**: NORTHBOUND, SOUTHBOUND, EASTBOUND, WESTBOUND
- **BOROUGH**: MANHATTAN_BOUND, QUEENS_BOUND, BRONX_BOUND, BROOKLYN_BOUND, STATENISLAND_BOUND
- **LOCAL**: UPTOWN, DOWNTOWN
- **PLACE_BOUND**: Any other location-based direction
- **BOTH_DIRECTIONS**: Both/either direction phrases
- **UNSPECIFIED**: No direction detected


In [1]:
import csv
import json
import re
from collections import defaultdict
from pathlib import Path

import pandas as pd


## 1. Direction Classifier

Classifies detected direction targets into compass, borough, local (uptown/downtown), or place-bound labels.


In [2]:
class DirectionClassifier:
    """Maps a direction target (e.g. "north", "Queens") to a normalized label.

    Targets that are not a known compass, borough, or local term are labelled
    'PLACE_BOUND'.
    """

    # Term -> label tables, one per direction family.
    COMPASS_LABELS = {
        'north': 'NORTHBOUND',
        'south': 'SOUTHBOUND',
        'east': 'EASTBOUND',
        'west': 'WESTBOUND'
    }

    BOROUGH_LABELS = {
        'manhattan': 'MANHATTAN_BOUND',
        'queens': 'QUEENS_BOUND',
        'bronx': 'BRONX_BOUND',
        'brooklyn': 'BROOKLYN_BOUND',
        'staten island': 'STATENISLAND_BOUND'
    }

    LOCAL_LABELS = {
        'uptown': 'UPTOWN',
        'downtown': 'DOWNTOWN'
    }

    # Recognised terms per family: exactly the keys of the tables above.
    COMPASS_TERMS = set(COMPASS_LABELS)
    BOROUGH_TERMS = set(BOROUGH_LABELS)
    LOCAL_TERMS = set(LOCAL_LABELS)

    @classmethod
    def classify(cls, target_text):
        """Return the normalized label for ``target_text``.

        The text is stripped and lower-cased, then looked up family by family
        in the order COMPASS, BOROUGH, LOCAL. Anything else is 'PLACE_BOUND'.
        """
        key = target_text.strip().lower()

        families = (
            (cls.COMPASS_TERMS, cls.COMPASS_LABELS),
            (cls.BOROUGH_TERMS, cls.BOROUGH_LABELS),
            (cls.LOCAL_TERMS, cls.LOCAL_LABELS),
        )
        for known_terms, label_table in families:
            if key in known_terms:
                return label_table[key]

        return 'PLACE_BOUND'


## 2. Direction Detector

Implements pattern matching rules for detecting direction mentions.


In [3]:
class DirectionDetector:
    """Rule-based detector for direction mentions in MTA alert headers.

    "X-bound" style mentions are detected in priority order -- borough
    ("Queens-bound"), local ("Uptown"/"Downtown"), compass ("Northbound"),
    then free-form place-bound ("Jamaica-bound") -- and a character range
    claimed by an earlier rule is not reported again by a later one.
    "both/either direction(s)" phrases are detected independently.

    Every detection is a dict with keys 'text', 'start', 'end' (character
    offsets into the input, end exclusive) and 'label'.
    """

    # Rejection patterns - only reject clearly invalid patterns
    REJECTION_PATTERNS = [
        r'\bin\s+both\s+directions?\s+of\b',  # "in both direction of X" (invalid)
    ]

    # Stop words that signal a boundary (common grammatical words)
    STOP_WORDS = {
        'a', 'an', 'the',
        'and', 'or', 'but', 'nor',
        'to', 'from', 'via', 'at', 'in', 'on', 'for', 'of', 'with', 'by', 'as', 'into',
        'are', 'is', 'was', 'were', 'will', 'may', 'can', 'could', 'would', 'should',
        'be', 'been', 'being', 'have', 'has', 'had',
        'running', 'operating', 'making', 'skipping', 'stopping', 'ending', 'starting',
        'experience', 'expect', 'wait', 'stop', 'last', 'first', 'next',
        'some', 'all', 'no', 'any', 'each', 'every', 'this', 'that', 'these', 'those', 'most',
        'trains', 'train', 'buses', 'bus', 'service', 'services', 'shuttle', 'shuttles',
        'longer', 'shorter', 'local', 'express', 'limited', 'delayed', 'suspended',
        'you', 'your', 'we', 'our', 'they', 'their'
    }

    # Known abbreviations that are part of place names (not boundaries)
    KNOWN_ABBREVIATIONS = {
        'st', 'st.', 'ave', 'av', 'av.', 'ave.', 'sq', 'sq.', 'blvd', 'blvd.',
        'pkwy', 'pkwy.', 'rd', 'rd.', 'pl', 'pl.', 'ct', 'ct.', 'dr', 'dr.',
        'hwy', 'hwy.', 'jct', 'jct.', 'ctr', 'ctr.', 'pk', 'pk.',
        'sts', 'avs'
    }

    def __init__(self):
        """Create the classifier and compile every detection regex once."""
        self.classifier = DirectionClassifier()

        # Borough patterns (highest priority)
        self.borough_patterns = [
            (self._compile_bound_pattern(r'Manhattan'), 'MANHATTAN_BOUND'),
            (self._compile_bound_pattern(r'Queens'), 'QUEENS_BOUND'),
            (self._compile_bound_pattern(r'Bronx'), 'BRONX_BOUND'),
            (self._compile_bound_pattern(r'Brooklyn'), 'BROOKLYN_BOUND'),
            (self._compile_bound_pattern(r'Staten\s+Island'), 'STATENISLAND_BOUND'),
        ]

        # Local direction patterns (Uptown/Downtown)
        self.local_patterns = [
            (re.compile(r'\bUptown\b', re.IGNORECASE), 'UPTOWN'),
            (re.compile(r'\bDowntown\b', re.IGNORECASE), 'DOWNTOWN'),
        ]

        # Compass direction patterns
        self.compass_patterns = [
            (self._compile_bound_pattern(r'North'), 'NORTHBOUND'),
            (self._compile_bound_pattern(r'South'), 'SOUTHBOUND'),
            (self._compile_bound_pattern(r'East'), 'EASTBOUND'),
            (self._compile_bound_pattern(r'West'), 'WESTBOUND'),
        ]

        # Place-bound anchor pattern: finds "-bound" or " bound"
        self.place_bound_anchor_pattern = re.compile(r'[-\s]bound\b', re.IGNORECASE)

        # Both/either direction(s) pattern
        self.both_either_pattern = re.compile(r'\b(?:both|either)\s+directions?\b', re.IGNORECASE)

    @staticmethod
    def _compile_bound_pattern(name_regex):
        """Compile the case-insensitive regex for ``<name>bound`` / ``<name>-bound`` / ``<name> bound``.

        The name must start at the beginning of a line or right after a
        non-letter character; "bound" must be followed by a word boundary or
        by a character matching ``[A-Z0-9]`` (evaluated under IGNORECASE).
        """
        return re.compile(
            r'(?:^|(?<=[^a-zA-Z]))' + name_regex + r'[-\s]?bound(?:\b|(?=[A-Z0-9]))',
            re.IGNORECASE | re.MULTILINE,
        )

    def _is_abbreviation(self, word):
        """Return True if ``word`` (case-insensitive, with or without trailing dots) is in KNOWN_ABBREVIATIONS."""
        lowered = word.lower()
        return lowered.rstrip('.') in self.KNOWN_ABBREVIATIONS or lowered in self.KNOWN_ABBREVIATIONS

    def _is_stop_word(self, word):
        """Return True if ``word`` (case-insensitive) is in STOP_WORDS."""
        return word.lower() in self.STOP_WORDS

    def _tokenize_leftward(self, text, end_pos):
        """Tokenize ``text`` by scanning leftward from ``end_pos``.

        Returns a list of ``(start_pos, end_pos, token_text, is_connected)``
        tuples in left-to-right order. ``is_connected`` is True when a '-' or
        '/' was consumed between the token and whatever lies to its right.

        Scanning stops at the start of the text, at a newline or comma, or at
        any character that cannot be part of a token (tokens are runs of
        alphanumerics, optionally containing a period that directly follows a
        known abbreviation such as "St.").
        """
        tokens = []
        pos = end_pos
        prev_connected = False

        while pos > 0:
            # Skip spaces (only the space character; other whitespace ends the scan below)
            while pos > 0 and text[pos - 1] == ' ':
                pos -= 1

            if pos == 0:
                break

            # Newline or comma: clause boundary, stop scanning
            if text[pos - 1] in '\n,':
                break

            # Dash or slash: remember that the next token found is joined to its right neighbour
            if text[pos - 1] in '-/':
                prev_connected = True
                pos -= 1
                continue

            # Extract the token ending at pos
            token_end = pos
            token_start = pos

            while token_start > 0 and (text[token_start - 1].isalnum() or text[token_start - 1] == '.'):
                if text[token_start - 1] == '.':
                    # A period only belongs to the token when the letters just
                    # before it form a known abbreviation; either way the
                    # token cannot extend further left than this point.
                    temp_start = token_start - 1
                    while temp_start > 0 and text[temp_start - 1].isalpha():
                        temp_start -= 1
                    potential_abbrev = text[temp_start:token_start - 1]
                    if potential_abbrev and self._is_abbreviation(potential_abbrev):
                        token_start = temp_start
                    break
                token_start -= 1

            if token_start == token_end:
                # Nothing tokenizable here (e.g. other punctuation): stop scanning
                break

            tokens.append((token_start, token_end, text[token_start:token_end], prev_connected))
            pos = token_start
            prev_connected = False

        tokens.reverse()
        return tokens

    def _find_place_bound_start(self, text, bound_start):
        """Return the offset where the place name preceding "bound" begins.

        ``bound_start`` is the offset of the '-'/whitespace that precedes
        "bound". The place name starts right after the last boundary token to
        its left; a boundary token is one that is not dash/slash-connected
        and is either a stop word or an all-digit token that is not followed
        by a street-type abbreviation (so "34 St" stays together).
        Returns ``bound_start`` itself when no place name is found.
        """
        tokens = self._tokenize_leftward(text, bound_start)

        if not tokens:
            return bound_start

        place_start_idx = 0
        last_idx = len(tokens) - 1

        # Tokens produced by _tokenize_leftward never contain '-' or '/', so
        # the token text is used as-is.
        for i, (_, _, token_text, is_connected) in enumerate(tokens):
            if is_connected:
                # Connected tokens are never treated as boundaries
                continue

            if self._is_stop_word(token_text):
                place_start_idx = i + 1

            # Handle standalone numbers
            if token_text.isdigit():
                followed_by_abbrev = i < last_idx and self._is_abbreviation(tokens[i + 1][2])
                if not followed_by_abbrev:
                    place_start_idx = i + 1

        if place_start_idx < len(tokens):
            return tokens[place_start_idx][0]
        return bound_start

    def _check_rejection_context(self, text, match_start, match_end):
        """Return True if any REJECTION_PATTERNS entry matches around the span.

        The searched window runs from 10 characters before ``match_start`` to
        5 characters after ``match_end`` and includes the match itself.
        """
        context_start = max(0, match_start - 10)
        context = text[context_start:match_end + 5]

        return any(
            re.search(pattern, context, re.IGNORECASE)
            for pattern in self.REJECTION_PATTERNS
        )

    def _has_overlap(self, start, end, detected_ranges):
        """Return True if the half-open range [start, end) intersects any (start, end) pair in ``detected_ranges``."""
        return any(start < de and end > ds for ds, de in detected_ranges)

    def _detect_with_patterns(self, text, patterns, detected_ranges):
        """Run a list of ``(compiled_regex, label)`` rules over ``text``.

        Matches that overlap an already-claimed range or sit in a rejected
        context are skipped. Each accepted match is appended to
        ``detected_ranges`` (mutated in place) and returned as a detection.
        """
        detections = []

        for pattern, label in patterns:
            for match in pattern.finditer(text):
                start_pos, end_pos = match.start(), match.end()

                if self._has_overlap(start_pos, end_pos, detected_ranges):
                    continue
                if self._check_rejection_context(text, start_pos, end_pos):
                    continue

                detected_ranges.append((start_pos, end_pos))
                detections.append({
                    'text': match.group(0),
                    'start': start_pos,
                    'end': end_pos,
                    'label': label
                })

        return detections

    def detect_place_bound_directions(self, text, detected_ranges):
        """Detect free-form "<place>-bound" / "<place> bound" mentions.

        ``detected_ranges`` lists the (start, end) ranges already claimed by
        other rules; accepted place-bound ranges are appended to it in place.
        Returns the list of 'PLACE_BOUND' detections.
        """
        detections = []

        for match in self.place_bound_anchor_pattern.finditer(text):
            bound_start = match.start()
            bound_end = match.end()

            # The "-bound" anchor itself already belongs to another detection
            if self._has_overlap(bound_start, bound_end, detected_ranges):
                continue

            place_start = self._find_place_bound_start(text, bound_start)

            # No place name to the left of "bound"
            if place_start >= bound_start:
                continue

            if self._has_overlap(place_start, bound_end, detected_ranges):
                continue

            if self._check_rejection_context(text, place_start, bound_end):
                continue

            detected_ranges.append((place_start, bound_end))
            detections.append({
                'text': text[place_start:bound_end],
                'start': place_start,
                'end': bound_end,
                'label': 'PLACE_BOUND'
            })

        return detections

    def detect_bound_directions(self, text):
        """Detect X-bound patterns with priority: Borough > Local > Compass > Place-bound.

        Detections are returned grouped by rule priority (not sorted by
        position in ``text``).
        """
        detections = []
        detected_ranges = []

        # Priorities 1-3 share the same matching logic and differ only in their rule list
        for patterns in (self.borough_patterns, self.local_patterns, self.compass_patterns):
            detections.extend(self._detect_with_patterns(text, patterns, detected_ranges))

        # Priority 4: Place-bound directions
        detections.extend(self.detect_place_bound_directions(text, detected_ranges))

        return detections

    def detect_both_either(self, text):
        """Detect "both/either direction(s)" phrases; returns 'BOTH_DIRECTIONS' detections.

        NOTE(review): REJECTION_PATTERNS is not consulted here, although its
        only entry targets "in both directions of ..." -- confirm whether
        such phrases are meant to be rejected.
        """
        return [
            {
                'text': match.group(0),
                'start': match.start(),
                'end': match.end(),
                'label': 'BOTH_DIRECTIONS'
            }
            for match in self.both_either_pattern.finditer(text)
        ]

    def detect_all(self, text):
        """Run all detection rules on ``text``; returns ``{'detections': [...]}`` with X-bound detections first, then both/either."""
        all_detections = []
        all_detections.extend(self.detect_bound_directions(text))
        all_detections.extend(self.detect_both_either(text))
        return {'detections': all_detections}


## 3. Process Dataset

Load the input data and apply direction detection.


In [4]:
# Configuration: source CSV to read and destination CSV for the
# direction-labelled output.
INPUT_FILE = 'Optional Spans/MTA_Data_preprocessed_routespans.csv'
OUTPUT_FILE = 'Preprocessed/MTA_Data_silver_directions.csv'

# Read the alerts into a DataFrame and report how many rows came back
print(f"Reading {INPUT_FILE}...")
df_input = pd.read_csv(INPUT_FILE)
print(f"Loaded {len(df_input):,} records")

# One detector instance is reused for every header (its regexes are compiled once)
detector = DirectionDetector()

Reading Optional Spans/MTA_Data_preprocessed_routespans.csv...
Loaded 226,160 records
Loaded 226,160 records


In [5]:
# Process all headers
print("Processing headers...")

direction_spans_list = []  # one JSON string per row: list of DIRECTION span dicts
direction_list = []        # one JSON string per row: list of direction labels

total_records = len(df_input)

# Iterate over the header column directly instead of df_input.iterrows():
# this avoids building a Series per row and keeps the progress counter
# positional, so it does not depend on the DataFrame's index labels.
for record_number, header_text in enumerate(df_input['header'], start=1):
    if record_number % 10000 == 0:
        print(f"  Processed {record_number:,} / {total_records:,} records...")

    # Missing headers yield no detections; everything else is run through the detector
    if pd.isna(header_text):
        detections = []
    else:
        detections = detector.detect_all(str(header_text))['detections']

    direction_spans = [
        {
            'start': detection['start'],
            'end': detection['end'],
            'type': 'DIRECTION',
            'value': detection['label']
        }
        for detection in detections
    ]
    # Rows without any detection get the single label 'UNSPECIFIED' and an empty span list
    direction_labels = [detection['label'] for detection in detections] or ['UNSPECIFIED']

    direction_spans_list.append(json.dumps(direction_spans))
    direction_list.append(json.dumps(direction_labels))

df_input['direction'] = direction_list
df_input['direction_spans'] = direction_spans_list

print(f"Completed processing {len(df_input):,} records")


Processing headers...
  Processed 10,000 / 226,160 records...
  Processed 10,000 / 226,160 records...
  Processed 20,000 / 226,160 records...
  Processed 20,000 / 226,160 records...
  Processed 30,000 / 226,160 records...
  Processed 30,000 / 226,160 records...
  Processed 40,000 / 226,160 records...
  Processed 40,000 / 226,160 records...
  Processed 50,000 / 226,160 records...
  Processed 50,000 / 226,160 records...
  Processed 60,000 / 226,160 records...
  Processed 60,000 / 226,160 records...
  Processed 70,000 / 226,160 records...
  Processed 70,000 / 226,160 records...
  Processed 80,000 / 226,160 records...
  Processed 80,000 / 226,160 records...
  Processed 90,000 / 226,160 records...
  Processed 90,000 / 226,160 records...
  Processed 100,000 / 226,160 records...
  Processed 100,000 / 226,160 records...
  Processed 110,000 / 226,160 records...
  Processed 110,000 / 226,160 records...
  Processed 120,000 / 226,160 records...
  Processed 120,000 / 226,160 records...
  Processed 

## 4. Save Output

Write the labeled data to CSV.

In [6]:
# Save to CSV
# Make sure the destination folder exists so the write cannot fail on a
# missing directory after all headers have already been processed.
Path(OUTPUT_FILE).parent.mkdir(parents=True, exist_ok=True)

print(f"\nWriting output to {OUTPUT_FILE}...")
df_input.to_csv(OUTPUT_FILE, index=False)
print(f"Successfully wrote {len(df_input):,} records to {OUTPUT_FILE}")

# Display first few rows (last expression of the cell, so it renders as a table)
print("\nFirst 5 rows of output:")
df_input.head()


Writing output to Preprocessed/MTA_Data_silver_directions.csv...
Successfully wrote 226,160 records to Preprocessed/MTA_Data_silver_directions.csv

First 5 rows of output:
Successfully wrote 226,160 records to Preprocessed/MTA_Data_silver_directions.csv

First 5 rows of output:


Unnamed: 0,alert_id,date,agency,status_label,affected,affected_spans,header,direction,direction_spans
0,180128,11/05/2022 05:58:00 PM,NYCT Subway,delays,"[""A"", ""C""]","[{""start"": 0, ""end"": 1, ""type"": ""ROUTE"", ""valu...",A C trains are delayed while we conduct emerge...,"[""UNSPECIFIED""]",[]
1,189489,12/20/2022 07:09:00 PM,NYCT Subway,delays,"[""L""]","[{""start"": 0, ""end"": 1, ""type"": ""ROUTE"", ""valu...",L trains are running with delays in both direc...,"[""BOTH_DIRECTIONS""]","[{""start"": 36, ""end"": 51, ""type"": ""DIRECTION"",..."
2,189321,12/20/2022 12:31:00 AM,NYCT Subway,delays,"[""J""]","[{""start"": 14, ""end"": 15, ""type"": ""ROUTE"", ""va...",Jamaica-bound J trains are delayed while we re...,"[""PLACE_BOUND""]","[{""start"": 0, ""end"": 13, ""type"": ""DIRECTION"", ..."
3,188948,12/18/2022 06:12:00 AM,NYCT Subway,delays,"[""Q""]","[{""start"": 11, ""end"": 12, ""type"": ""ROUTE"", ""va...",Southbound Q trains are running with delays af...,"[""SOUTHBOUND""]","[{""start"": 0, ""end"": 10, ""type"": ""DIRECTION"", ..."
4,187749,12/12/2022 02:26:00 PM,NYCT Subway,delays,"[""B"", ""C""]","[{""start"": 11, ""end"": 12, ""type"": ""ROUTE"", ""va...",Southbound B C trains are running with delays ...,"[""SOUTHBOUND""]","[{""start"": 0, ""end"": 10, ""type"": ""DIRECTION"", ..."


In [7]:
# Save version without span columns
NO_SPANS_OUTPUT_FILE = 'Preprocessed/MTA_Data_silver_no_spans.csv'

# 'direction_spans' is always dropped; 'affected_spans' only when the input had it
columns_to_drop = ['direction_spans']
if 'affected_spans' in df_input.columns:
    columns_to_drop.append('affected_spans')

# drop() returns a new frame, so df_input itself is left untouched
df_no_spans = df_input.drop(columns=columns_to_drop)

print(f"\nWriting output without spans to {NO_SPANS_OUTPUT_FILE}...")
df_no_spans.to_csv(NO_SPANS_OUTPUT_FILE, index=False)
print(f"Successfully wrote {len(df_no_spans):,} records to {NO_SPANS_OUTPUT_FILE}")
print(f"Dropped columns: {columns_to_drop}")


Writing output without spans to Preprocessed/MTA_Data_silver_no_spans.csv...
Successfully wrote 226,160 records to Preprocessed/MTA_Data_silver_no_spans.csv
Dropped columns: ['direction_spans', 'affected_spans']
Successfully wrote 226,160 records to Preprocessed/MTA_Data_silver_no_spans.csv
Dropped columns: ['direction_spans', 'affected_spans']
