In [1]:
import pandas as pd
import re
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
import time
from IPython.display import display

In [26]:
INPUT_CSV = "../../data/processed/ezpass_transactions.csv"
OUTPT_CSV = "../../data/processed/flagged_transactions.csv"

In [None]:
df = pd.read_csv(INPUT_CSV)

In [3]:
for col in df.columns:
    print(col)


posting_date
transaction_date
post_lag_days
tag_plate_number
vehicle_type_code
vehicle_class
agency
agency_fullname
description
exit_time
exit_hour
exit_plaza
exit_plaza_fullname
exit_plaza_agency
exit_lane
amount
balance
prepaid
plan_rate
fare_type


## Flag

In [4]:
df_flag = df.copy()

# Ensure dtypes
for c in ["posting_date", "transaction_date", "exit_time"]:
    if c in df_flag.columns:
        df_flag[c] = pd.to_datetime(df_flag[c], errors="coerce")

In [5]:
# --- tune thresholds here ---
LAG_MAX = 3                 # days
TOPUP_MIN = 50.0            # $ threshold to consider a top-up
NEAR_ZERO_THRESHOLD = 1.00  # $ threshold for near-zero amounts
AMT_DEV_PCT = 0.50          # 50% above/below group median
MIN_EXIT_GAP_MIN = 1        # min gap between exits for same tag
QUIET_HOURS = {1,2,3,4}     # "unusual hour" window
# Flag impossible travel (unrealistic speeds between plazas)
SPEED_THRESHOLD = 120  # mph - max realistic highway speed

#### A. Posting/transaction timing


`flag_post_lag_high`
<br> **What**: The ledger entry was posted much later than the actual trip.
<br> **Why suspicious**: Holds, disputes, cross-agency reconciliation, or manual adjustment.
<br> **Tune**: Start with LAG_MAX = 3 days; loosen/tighten by agency SLA and weekends/holidays.

`flag_post_lag_negative`
* `post_lag_days` < 0 ‚Üí back-dated or bad timestamps
<br> **What**: Posting happened ‚Äúbefore‚Äù the transaction date.
<br> **Why suspicious**: Backdating, timezone issues, or column mapping errors.
<br> **Tune**: Keep strict (should be 0+). Investigate timezone/ETL parsing.


In [6]:
# --- A. Posting/transaction timing ---
df_flag["flag_post_lag_high"] = (df_flag["post_lag_days"] > LAG_MAX)
df_flag["flag_post_lag_negative"] = (df_flag["post_lag_days"] < 0)


#### B. Amount/Billing

`flag_sus_label_topup`
<br> **What**: Large positive amount that doesn‚Äôt look like a payment/top-up in description.
<br> **Why suspicious**: The transaction looks like a large top-up but isn‚Äôt labeled as one ‚Äî it may be a mistaken credit, a manual adjustment without supporting payment (because it means the balance increased, but no real money entered the system), or a misclassified transaction that inflates the customer‚Äôs balance.
<br> **Rule**: amount ‚â• TOPUP_MIN AND description not matching (replenish|top up|payment|credit|deposit|reload|fund).
<br> **Tune**: Set TOPUP_MIN to known reload sizes (e.g., 25/50/100). Extend regex with your program‚Äôs wording.
<br> **Check**: prepaid, plan_rate, and the customer‚Äôs reload history.

In [7]:
# --- B1. Detect likely top-ups (robust) ---

# 1) Normalize description
def _norm(s: str) -> str:
    if s is None:
        return ""
    s = s.lower()
    s = re.sub(r"[^\w\s-]", " ", s)   # remove punctuation
    s = re.sub(r"\s+", " ", s).strip()
    return s

desc_norm = df_flag["description"].fillna("").astype(str).map(_norm)

# 2) Strong allow-list (exact matches after normalization)
strong_allow = {
    "prepaid payment",
    "payment received",
    "auto replenish",
    "autopay payment",
    "ach payment",
    "credit card payment",
    "prepaid top up"
}
is_strong = desc_norm.isin(strong_allow)

# 3) Lenient allow via regex (covers common variants)
lenient_pattern = r"\b(replenish(?:ment)?|auto[-\s]*replenish|top\s*up|auto\s*pay|payment|deposit|reload|recharge|add(?:\s*value)?)\b"
is_lenient = desc_norm.str.contains(lenient_pattern, na=False, regex=True)

# 4) Guard against common false positives
false_pos_pattern = r"\brefund|chargeback|reversal|creditor|fundraising\b"
is_false_pos = desc_norm.str.contains(false_pos_pattern, na=False, regex=True)

# 5) Final allow decision
is_payment_allowed = is_strong | (is_lenient & ~is_false_pos)

# 6) Flag: large top-up-like amount with no payment-like text
df_flag["flag_sus_label_topup"] = (df_flag["amount"] >= TOPUP_MIN) & (~is_payment_allowed)

print("\nRows with suspicious top-up labels:")
df_flag[df_flag["flag_sus_label_topup"]]


Rows with suspicious top-up labels:


  is_lenient = desc_norm.str.contains(lenient_pattern, na=False, regex=True)


Unnamed: 0,posting_date,transaction_date,post_lag_days,tag_plate_number,vehicle_type_code,vehicle_class,agency,agency_fullname,description,exit_time,...,exit_plaza_agency,exit_lane,amount,balance,prepaid,plan_rate,fare_type,flag_post_lag_high,flag_post_lag_negative,flag_sus_label_topup


`flag_zero_toll`
<br> What: Trip records where the toll amount is zero or near zero, even though a fare should normally be charged.
<br> Why suspicious: A zero toll on a non-exempt trip may indicate a missed billing, unauthorized exemption, or data issue. This can happen if the toll reader failed to capture the plate, an operator voided the charge without proper justification, or the transaction was misclassified during processing.
<br> Rule: abs(amount) < 0.01 AND fare_type in (‚ÄúN‚Äù, ‚ÄúM‚Äù) ‚Äî i.e., normal or manual fare, not exempt.
<br> Tune: Adjust threshold for ‚Äúnear zero‚Äù (e.g., 0.01 or 0.05) depending on rounding and agency billing precision. Exclude known exempt or promotional fare types.
<br> Check: trip location, vehicle class, exemption status, and recent toll reader or ETL error logs.

In [8]:
# Get lowest non-zero toll amount to validate zero threshold
min_toll = df_flag.loc[df_flag["amount"] > 0, "amount"].min()
print(f"\nLowest non-zero toll amount: ${min_toll:.2f}")

# Validate whehther NEAR_ZERO_THRESHOLD is appropriate
if min_toll < NEAR_ZERO_THRESHOLD:
    print("WARNING: NEAR_ZERO_THRESHOLD may be too high - found lower valid tolls")



Lowest non-zero toll amount: $1.68


In [9]:
# --- B2. Zero/near-zero when fare should bill ---
fare_str = df_flag["fare_type"].astype(str).str.lower()
should_bill = fare_str.str.contains(r"toll|cash|ezpass|charge|debit")
df_flag["flag_zero_toll"] = should_bill & (df_flag["amount"].abs() < NEAR_ZERO_THRESHOLD)

print("\nRows with near-zero amount when fare should have billed:")
df_flag[df_flag["flag_zero_toll"]]


Rows with near-zero amount when fare should have billed:


Unnamed: 0,posting_date,transaction_date,post_lag_days,tag_plate_number,vehicle_type_code,vehicle_class,agency,agency_fullname,description,exit_time,...,exit_lane,amount,balance,prepaid,plan_rate,fare_type,flag_post_lag_high,flag_post_lag_negative,flag_sus_label_topup,flag_zero_toll


`flag_amt_outlier`
<br> What: Fare amount that deviates materially from the typical amount for similar trips (same agency, exit_plaza, vehicle_type_code, and fare_type).
<br> Why suspicious: A large departure from the group‚Äôs norm can signal wrong vehicle class/fare type, plaza mispricing, manual overrides, sensor/reader errors, or ETL/mapping issues.
<br> Rule: Compute group_median = median(amount) per (agency, exit_plaza, vehicle_type_code, fare_type). For each row, amt_dev = |amount ‚àí group_median|. Flag when group_count ‚â• MIN_GROUP and group_median is not null and amt_dev > max(AMT_DEV_PCT * |group_median|, MIN_ABS_DEV).
<br> Tune: Set AMT_DEVIATION_PCT (e.g., 0.3‚Äì1.0) for sensitivity; choose MIN_GROUP (e.g., 10‚Äì20) to avoid unstable medians; use MIN_ABS_DEV (e.g., $0.10‚Äì$0.50) to ignore penny-level noise; optionally add/remove grouping keys (e.g., time_band, entrance_plaza) to reflect pricing rules; exclude exempt/zero-fare groups.
<br> Check: Vehicle class & fare type correctness, plaza rate table (incl. peak/off-peak), reader/ANPR status, linked images/payment records, neighboring trips on the same day, and recent configuration or ETL changes.

In [10]:
# Require a minimum group size, handle zero medians, and add a floor
grp_cols = ["agency", "exit_plaza", "vehicle_type_code", "fare_type"]
stats = (
    df_flag[df_flag["description"] != "Prepaid Payment"]
    .groupby(grp_cols, dropna=False)["amount"]
    .agg(_group_median="median", _group_count="size")
    .reset_index()
)

df_flag = df_flag.merge(stats, on=grp_cols, how="left")

# Absolute deviation from median
df_flag["_amt_deviation"] = (df_flag["amount"] - df_flag["_group_median"]).abs()

# Tuning params
AMT_DEVIATION_PCT = 0.5   # e.g., 50%
MIN_GROUP = 20            # Don't trust tiny groups. If a group (agency, exit_plaza, vehicle_type_code, fare_type) only has 1‚Äì2 records, its median is not statistically reliable.
MIN_ABS_DEV = 0.25        # Guarantee the deviation must be at least this number ($) before flagging. Because if group_median ‚âà 0, the thresh product is ‚âà 0.


# Avoid over-flagging when median is near zero
near_zero = df_flag["_group_median"].abs() < 0.01
thresh = AMT_DEV_PCT * df_flag["_group_median"].abs()
thresh = thresh.where(~near_zero, MIN_ABS_DEV)

df_flag["flag_amt_outlier"] = (
    df_flag["_group_count"].ge(MIN_GROUP) &
    df_flag["_group_median"].notna() &
    (df_flag["_amt_deviation"] > thresh)
)

In [11]:
df_flag[df_flag["flag_amt_outlier"]].loc[:, ["posting_date", "tag_plate_number", "agency", "exit_plaza", "vehicle_type_code", "fare_type", "amount", "_group_median", "_group_count", "_amt_deviation", 'flag_amt_outlier']]

Unnamed: 0,posting_date,tag_plate_number,agency,exit_plaza,vehicle_type_code,fare_type,amount,_group_median,_group_count,_amt_deviation,flag_amt_outlier
30,2025-09-02,96614774555,NJTP,7A,1.0,N,-9.36,-3.80,95,5.56,True
65,2025-09-02,96615569799,NJTP,14,1.0,N,-7.02,-2.46,34,4.56,True
112,2025-09-03,96699119551,NJTP,10,1.0,N,-2.85,-1.02,26,1.83,True
175,2025-09-03,96615569799,NJTP,7A,1.0,N,-7.05,-3.80,95,3.25,True
180,2025-09-03,96615555666,NJTP,7A,1.0,N,-1.36,-3.80,95,2.44,True
...,...,...,...,...,...,...,...,...,...,...,...
2093,2025-09-20,96643569799,NJTP,7A,1.0,N,-9.36,-3.80,95,5.56,True
2094,2025-09-21,96643569799,NJTP,14,1.0,N,-9.36,-2.46,34,6.90,True
2107,2025-09-21,96699399493,NJTP,10,1.0,N,-3.80,-1.02,26,2.78,True
2111,2025-09-21,96643569644,NJTP,10,1.0,N,-6.15,-1.02,26,5.13,True



#### C. Usage Patterns

`flag_fast_repeat_exit`
<br> **What**: Same tag exits again too quickly.
<br> **Why suspicious**: Duplicate tag use, cloning, clock issues, mis-sequenced events.
<br> **Rule**: Time since previous exit for same tag < MIN_EXIT_GAP_MIN (e.g., 5 min).
<br> **Tune**: Base on minimal plausible loop time between nearby plazas.
<br> **Check**: Distances between exit_plaza, lane IDs, clock drift.

In [12]:
# --- C1. Back-to-back exits for same tag within too-short time ---
df_flag = df_flag.sort_values(["tag_plate_number", "exit_time"])
dt = df_flag.groupby("tag_plate_number")["exit_time"].diff().dt.total_seconds() / 60.0
df_flag["_mins_since_prev_exit"] = dt
df_flag["flag_fast_repeat_exit"] = dt.notna() & (dt >= 0) & (dt < MIN_EXIT_GAP_MIN)

In [13]:
print(f"Flag fast repeat exit under {MIN_EXIT_GAP_MIN} minute")
df_flag[df_flag["flag_fast_repeat_exit"]].loc[:, ["posting_date", "tag_plate_number", "agency", "exit_plaza", "vehicle_type_code", "fare_type", "amount", "_mins_since_prev_exit", "flag_fast_repeat_exit"]]

Flag fast repeat exit under 1 minute


Unnamed: 0,posting_date,tag_plate_number,agency,exit_plaza,vehicle_type_code,fare_type,amount,_mins_since_prev_exit,flag_fast_repeat_exit
279,2025-09-03,96611695669,GSP,PRS,1.0,N,-0.76,0.166667,True
178,2025-09-03,96615555666,NJTP,8,1.0,N,-1.36,0.566667,True
357,2025-09-05,96615569579,SJ,APL,2.0,N,-1.45,0.533333,True
6,2025-09-02,96615569665,GSP,TRV,1.0,N,-1.09,0.883333,True
498,2025-09-05,96615569686,GSP,UNI,1.0,N,-2.17,0.583333,True
160,2025-09-03,96615569695,GSP,KEY,1.0,N,-0.76,0.4,True
25,2025-09-02,96615569699,GSP,ESS,1.0,N,-2.17,0.533333,True
2105,2025-09-21,96643569644,GSP,RAS,1.0,N,-2.17,0.716667,True
1861,2025-09-18,96643569649,GSP,BER,1.0,N,-2.17,0.783333,True
1727,2025-09-17,96643569659,NJTP,7A,1.0,N,-14.65,0.983333,True


`flag_agency_hop_fast`
<br> **What**: Tag jumps between different agencies too quickly.
<br> **Why suspicious**: Tag sharing/cloning, misrouting of events.
<br> **Rule**: Previous agency ‚â† current agency AND gap < AGENCY_HOP_MIN (e.g., 30 min).
<br> **Tune**: Use real travel time between closest border plazas.
<br> **Check**: Border plazas list; cross-agency settlement records.

##### 1. Create Plaza Coordinate DataFrame

In [14]:
# Get distance between plazas
# PLAZA MASTER MAP: code -> (description, agency)
PLAZA_MAP = {
    # GSP (Garden State Parkway)
    "PVK": ("Pascack Valley", "GSP"),
    "PRS": ("Paramus South", "GSP"),
    "PRN": ("Paramus North", "GSP"),
    "BER": ("Bergen", "GSP"),
    "SAB": ("Saddle Brook", "GSP"),
    "CLS": ("Clifton South", "GSP"),
    "CLN": ("Clifton North", "GSP"),
    "PSS": ("Passaic South", "GSP"),
    "PSN": ("Passaic North", "GSP"),
    "WAS": ("Watchung South", "GSP"),
    "WAN": ("Watchung North", "GSP"),
    "ESS": ("Essex", "GSP"),
    "BLS": ("Bloomfield South", "GSP"),
    "BLN": ("Bloomfield North", "GSP"),
    "EOR": ("East Orange", "GSP"),
    "IRS": ("Irvington South", "GSP"),
    "IRN": ("Irvington North", "GSP"),
    "UNR": ("Union Ramp", "GSP"),
    "UNI": ("Union", "GSP"),
    "RAS": ("Raritan South", "GSP"),
    "MAT": ("Matawan", "GSP"),
    "KEY": ("Keyport", "GSP"),
    "HOS": ("Holmdel South", "GSP"),
    "HON": ("Holmdel North", "GSP"),
    "RBS": ("Red Bank South", "GSP"),
    "RBN": ("Red Bank North", "GSP"),
    "EAT": ("Eatontown", "GSP"),
    "ASP": ("Asbury Park", "GSP"),
    "BES": ("Belmar South", "GSP"),
    "BEN": ("Belmar North", "GSP"),
    "BRS": ("Brick South", "GSP"),
    "BRN": ("Brick North", "GSP"),
    "LWS": ("Lakewood South", "GSP"),
    "LWN": ("Lakewood North", "GSP"),
    "TRV": ("Toms River", "GSP"),
    "LRS": ("Lacey Rd South", "GSP"),
    "LRN": ("Lacey Rd North", "GSP"),
    "BAR": ("Barnegat", "GSP"),
    "BKS": ("Berkeley Ramp South", "GSP"),
    "BKN": ("Berkeley Ramp North", "GSP"),
    "NGR": ("New Gretna", "GSP"),
    "WRS": ("Waretown South", "GSP"),
    "WRN": ("Waretown North", "GSP"),
    "SPT": ("Somers Point", "GSP"),
    "GEG": ("Great Egg", "GSP"),
    "CMY": ("Cape May", "GSP"),
    "WWS": ("Wildwood South", "GSP"),
    "WWN": ("Wildwood North", "GSP"),
    "SAY": ("Sayreville", "GSP"),

    # NJTP (New Jersey Turnpike) ‚Äî keep numeric codes as strings
    "1":   ("Delaware Memorial Bridge", "NJTP"),
    "2":   ("Swedesboro/Chester", "NJTP"),
    "3":   ("Woodbury/S. Camden/NJ Aquarium", "NJTP"),
    "4":   ("Camden/Philadelphia/NJ Aquarium", "NJTP"),
    "5":   ("Burlington/Mt. Holly", "NJTP"),
    "6":   ("PA Turnpike/Florence", "NJTP"),
    "6A":  ("PA Turnpike/Florence", "NJTP"),
    "6B":  ("Rte. 130 Credit Ramp", "NJTP"),
    "7":   ("Bordentown/Trenton", "NJTP"),
    "7A":  ("I-195/Trenton/Shore Points", "NJTP"),
    "8":   ("Hightstown/Freehold", "NJTP"),
    "8A":  ("Jamesburg/Cranbury", "NJTP"),
    "9":   ("New Brunswick/Admin Bldg", "NJTP"),
    "10":  ("I-287/Metuchen/Edison Twsp", "NJTP"),
    "11":  ("GSP/Woodbridge/The Amboys", "NJTP"),
    "12":  ("Carteret/Rahway", "NJTP"),
    "13":  ("I-278/Eliz/Goethals/Verrazano", "NJTP"),
    "13A": ("Newark Aprt/Elizabeth Seaport", "NJTP"),
    "14":  ("I-78/Newark Airport", "NJTP"),
    "14A": ("Bayonne", "NJTP"),
    "14B": ("Jersey City/Liberty State Park", "NJTP"),
    "14C": ("Holland Tunnel", "NJTP"),
    "15E": ("Newark/Jersey City", "NJTP"),
    "15W": ("I-280/Newark/The Oranges", "NJTP"),
    "15X": ("Secaucus Transfer Station", "NJTP"),
    "16E": ("Lincoln Tunnel/NJ 3/Secaucus", "NJTP"),
    "16W": ("Sprtsplx/NJ 3/Secaucus/Ruthrfrd", "NJTP"),
    "17":  ("Secaucus/US 46", "NJTP"),
    "18E": ("Lincoln Tunnel/NJ 3/Secaucus", "NJTP"),
    "18W": ("Geo Washington Br/US 46/I-80", "NJTP"),
    "19W": ("Carlstadt", "NJTP"),

    # SJ (South Jersey Transportation Authority / AC Expressway + local bridges)
    "APL": ("Pleasantville Mainline Barrier", "SJ"),
    "AR9": ("Route 9", "SJ"),
    "APO": ("Pomona", "SJ"),
    "ACY": ("AC Airport", "SJ"),
    "AML": ("Mays Landing", "SJ"),
    "A50": ("Route 50", "SJ"),
    "AEH": ("Egg Harbor Mainline Barrier", "SJ"),
    "AH":  ("Hammonton Ramp", "SJ"),
    "AWN": ("Winslow Ramp", "SJ"),
    "AWL": ("Williamstown Ramp", "SJ"),
    "ACK": ("Cross Keys", "SJ"),
    "OCL": ("Ocean City-Longport Bridge", "SJ"),
    "CIB": ("Corsons Inlet Bridge", "SJ"),
    "TIB": ("Townsend's Inlet Bridge", "SJ"),
    "GSB": ("Grassy Sound Bridge", "SJ"),
    "MTB": ("Middle Thorofare Bridge", "SJ"),

    # DRPA (Delaware River Port Authority)
    "BRB": ("Betsy Ross Br", "DRPA"),
    "BFB": ("Ben Franklin Br", "DRPA"),
    "WWB": ("Walt Whitman Br", "DRPA"),
    "CBB": ("Commodore Barry Br", "DRPA"),

    # DRBA (Delaware River & Bay Authority)
    "DMB": ("Delaware Memorial Br", "DRBA"),

    # BCBC (Burlington County Bridge Commission)
    "TPB": ("Tacony Palmyra Br", "BCBC"),
    "BBB": ("Burlington Bristol Br", "BCBC"),

    # DRJTBC (Delaware River Joint Toll Bridge Commission)
    "T-M": ("Trenton-Morrisville Br", "DRJTBC"),
    "NHL": ("New Hope-Lambertville Br", "DRJTBC"),
    "I78": ("I-78 Br", "DRJTBC"),
    "E-P": ("Easton-Phillipsburg Br", "DRJTBC"),
    "P-C": ("Portland-Columbia Br", "DRJTBC"),
    "DWG": ("Delaware Water Gap Br", "DRJTBC"),
    "M-M": ("Milford-Montague Br", "DRJTBC"),
    "O78": ("Interstate 78-ORT", "DRJTBC"),
    "ODW": ("Delaware Water Gap-ORT", "DRJTBC"),
    "OSF": ("Scudder Falls Br", "DRJTBC"),
}


In [15]:
geolocator = Nominatim(user_agent="nj_toll_plaza_locator")

def get_coordinates(plaza_desc, agency):
    """
    Attempt to geocode plaza location
    Returns (latitude, longitude) tuple or (None, None)
    """
    try:
        # Try different search strategies based on agency
        search_terms = []
        
        if agency == "GSP":
            search_terms.append(f"{plaza_desc} Garden State Parkway, New Jersey")
            search_terms.append(f"{plaza_desc}, New Jersey")
        elif agency == "NJTP":
            search_terms.append(f"{plaza_desc} New Jersey Turnpike")
            search_terms.append(f"{plaza_desc}, New Jersey")
        else:
            # For bridges and other facilities
            search_terms.append(f"{plaza_desc}, New Jersey")
            search_terms.append(plaza_desc)
        
        # Try each search term
        for search_term in search_terms:
            location = geolocator.geocode(search_term, timeout=10)
            if location:
                return (location.latitude, location.longitude)
            time.sleep(1.1)  # Respect rate limits (1 request per second)
        
        return (None, None)
    
    except Exception as e:
        print(f"Error geocoding {plaza_desc}: {e}")
        return (None, None)
        
# Create DataFrame with coordinates
def create_plaza_dataframe(plaza_map):
    """
    Create a DataFrame with plaza information including coordinates
    """
    data = []
    
    for code, (desc, agency) in plaza_map.items():
        print(f"Geocoding {code}: {desc}...")
        lat, lon = get_coordinates(desc, agency)
        
        data.append({
            'plaza_code': code,
            'plaza_name': desc,
            'agency': agency,
            'latitude': lat,
            'longitude': lon
        })
        
        # Small delay to respect API rate limits
        time.sleep(1.1)
    
    df = pd.DataFrame(data)
    return df

In [16]:
# # Create the plaza long lat dataframe
# plaza_df = create_plaza_dataframe(PLAZA_MAP)

# print("\nPlaza DataFrame with Coordinates:")
# print(plaza_df.head(10))
# print(f"\nTotal plazas: {len(plaza_df)}")
# print(f"Successfully geocoded: {plaza_df['latitude'].notna().sum()}")
# print(f"Failed to geocode: {plaza_df['latitude'].isna().sum()}")
# plaza_df.to_csv('../../data/processed/plaza_coordinates.csv', index=False)

In [17]:
plaza_df = pd.read_csv('../../data/processed/plaza_coordinates.csv')
print(f"\nTotal plazas: {len(plaza_df)}")
print(f"Successfully geocoded: {plaza_df['latitude'].notna().sum()}")
print(f"Failed to geocode: {plaza_df['latitude'].isna().sum()}")


Total plazas: 113
Successfully geocoded: 113
Failed to geocode: 0


##### 2. Calculate distance between two plazas and Flag impossible travel

`flag_impossible_travel`
<br> **What**: Transactions from same tag at different plazas with impossible travel time
<br> **Why suspicious**: Indicates tag cloning/sharing when travel speed exceeds physical limits
<br> **Rule**: Calculate time & distance between consecutive transactions, flag if speed > 100mph
<br> **Tune**: Adjust speed threshold based on local speed limits and buffer
<br> **Check**: Verify plaza coordinates are accurate, account for clock sync issues

In [18]:
def calculate_distance(plaza_code1, plaza_code2, df):
    """
    Calculate distance in miles between two plazas
    """
    # Check if plaza codes exist in the dataframe
    p1_match = df[df['plaza_code'] == plaza_code1]
    p2_match = df[df['plaza_code'] == plaza_code2]
    
    # Return None if either plaza code is not found
    if len(p1_match) == 0 or len(p2_match) == 0:
        return None
    
    p1 = p1_match.iloc[0]
    p2 = p2_match.iloc[0]
    
    if pd.isna(p1['latitude']) or pd.isna(p2['latitude']):
        return None
    
    coord1 = (p1['latitude'], p1['longitude'])
    coord2 = (p2['latitude'], p2['longitude'])
    
    distance_miles = geodesic(coord1, coord2).miles
    return distance_miles

# Test
plaza_df = pd.read_csv('../../data/processed/plaza_coordinates.csv')
if len(plaza_df) > 1:
    dist = calculate_distance("PVK", "PRS", plaza_df)
    if dist:
        print(f"\nExample calculated distance between PVK and PRS: {dist:.2f} miles")


Example calculated distance between PVK and PRS: 4.02 miles


In [19]:
# Convert exit_time to datetime
df_flag['exit_time'] = pd.to_datetime(df_flag['exit_time'], errors='coerce')

# Sort by tag and time
df_flag = df_flag.sort_values(['tag_plate_number', 'exit_time'])

# Calculate next plaza and time differences
# Get next plaza for each tag
df_flag['_next_plaza'] = df_flag.groupby('tag_plate_number')['exit_plaza'].shift(-1).fillna('')

# Check if plazas is in-state (exist in plaza_df)
instate_plazas = set(plaza_df['plaza_code'])
df_flag['flag_out_state_travel'] = (~df_flag['exit_plaza'].isin(instate_plazas)) | (~df_flag['_next_plaza'].isin(instate_plazas))

# Calculate time difference for in-state consecutive plazas
df_flag['_exit_time_p1'] = df_flag['exit_time']
df_flag['_exit_time_p2'] = df_flag.groupby('tag_plate_number')['exit_time'].shift(-1)
df_flag['_time_diff'] = (df_flag['_exit_time_p1'] - df_flag['_exit_time_p2']).abs()


# Calculate distances using the fixed function
#  Calculate ONLY for rows where:
#   ‚Ä¢ flag_out_state_travel = FALSE, AND
#   ‚Ä¢ _next_plaza is not empty
df_flag['_distance'] = df_flag.apply(lambda x: calculate_distance(x['exit_plaza'],
                                                        x['_next_plaza'],
                                                        plaza_df) if pd.notna(x['_next_plaza']) 
                                                                   and x['_next_plaza'] != '' 
                                                                   and not x['flag_out_state_travel'] else None,
                        axis=1)

# Calculate ONLY for rows where:
#       ‚Ä¢ _distance exists (not null), AND
#       ‚Ä¢ _time_diff_hours exists (not null)

# Convert time_diff to hours for speed calculation
df_flag['_time_diff_hours'] = df_flag['_time_diff'].dt.total_seconds() / 3600

MIN_TIME_HOURS = 1/3600

valid_mask = (
    df_flag['_distance'].notna() & 
    df_flag['_time_diff_hours'].notna() &
    (df_flag['_time_diff_hours'] >= MIN_TIME_HOURS)
)
df_flag['_speed'] = None
df_flag.loc[valid_mask, '_speed'] = (
    df_flag.loc[valid_mask, '_distance'] / 
    df_flag.loc[valid_mask, '_time_diff_hours']
)

sample_data = df_flag[df_flag['_speed'].notna()][['tag_plate_number', 'exit_plaza', '_next_plaza', '_distance', '_exit_time_p1', '_exit_time_p2', '_time_diff_hours', '_speed']].head()
display(sample_data)


Unnamed: 0,tag_plate_number,exit_plaza,_next_plaza,_distance,_exit_time_p1,_exit_time_p2,_time_diff_hours,_speed
192,96611566699,7A,8A,8.400708,2025-10-09 08:39:38,2025-10-09 18:20:57,9.688611,0.86707
4,96611594761,WAN,EOR,14.158602,2025-10-09 08:35:54,2025-10-09 11:47:23,3.191389,4.436502
16,96611594761,EOR,15E,6.922647,2025-10-09 11:47:23,2025-10-09 16:14:56,4.459167,1.552453
72,96611594761,15E,EOR,6.922647,2025-10-09 16:14:56,2025-10-09 20:07:39,3.878611,1.784826
590,96611594765,ACK,AWL,10300.044748,2025-10-09 15:26:49,2025-10-09 20:46:08,5.321944,1935.391257


In [20]:
# Flag for unrealistic speeds
df_flag['flag_impossible_travel'] = (df_flag['_speed'] > SPEED_THRESHOLD)
df_flag['flag_impossible_travel'] = df_flag['flag_impossible_travel'].fillna(False)

# Results
impossible_travel = df_flag[df_flag['flag_impossible_travel']]
print(f"Flagged {len(impossible_travel)} transactions with speeds > {SPEED_THRESHOLD} mph")
if len(impossible_travel) > 0:
    display(impossible_travel[['tag_plate_number','exit_time', 'exit_plaza',
                             '_next_plaza', '_distance', '_time_diff_hours', '_speed', 'flag_impossible_travel']].head())

Flagged 265 transactions with speeds > 120 mph


Unnamed: 0,tag_plate_number,exit_time,exit_plaza,_next_plaza,_distance,_time_diff_hours,_speed,flag_impossible_travel
590,96611594765,2025-10-09 15:26:49,ACK,AWL,10300.044748,5.321944,1935.391257,True
15,96611594769,2025-10-09 11:49:21,TRV,BRS,33.480768,0.26,128.772184,True
7,96611695669,2025-10-09 08:59:33,PRS,ESS,9.578388,0.079722,120.147024,True
172,96611695669,2025-10-09 09:04:20,ESS,PRS,9.578388,0.028056,341.407881,True
83,96611695669,2025-10-09 19:15:52,ESS,PRS,9.578388,0.002778,3448.219599,True


In [21]:
# Check reason for missing time difference
if '_time_diff_hours' in df_flag.columns:
    too_small_mask = (
        df_flag['_distance'].notna() &
        df_flag['_time_diff_hours'].isna()
        # (df_flag['_time_diff_hours'] < MIN_TIME_HOURS)
    )
too_small_rows = df_flag[too_small_mask]
display(too_small_rows[['tag_plate_number', 'exit_plaza', '_next_plaza',
                        '_exit_time_p1', '_exit_time_p2', '_distance', '_time_diff_hours']])



Unnamed: 0,tag_plate_number,exit_plaza,_next_plaza,_exit_time_p1,_exit_time_p2,_distance,_time_diff_hours


In [22]:
# DATA FILTERING & LOSS ANALYSIS REPORT
total = len(df_flag)

print("\n" + "="*80)
print("DATA FILTERING & LOSS ANALYSIS REPORT")
print("="*80)

# ========================================================================
# STAGE 0: INITIAL DATA
# ========================================================================
print(f"\nüìä STAGE 0: INITIAL DATA")
print(f"{'‚îÄ'*80}")
print(f"  Total transactions:                           {total:>8,}")

# ========================================================================
# STAGE 1: NEXT PLAZA AVAILABILITY
# ========================================================================
print(f"\nüìä STAGE 1: NEXT PLAZA CALCULATION")
print(f"{'‚îÄ'*80}")

has_next_plaza = df_flag['_next_plaza'].ne('').sum()
missing_next_plaza = df_flag['_next_plaza'].eq('').sum()

print(f"  ‚úì Has next plaza:                             {has_next_plaza:>8,}  ({has_next_plaza/total*100:>5.1f}%)")
print(f"  ‚úó Missing next plaza (end of journey):       {missing_next_plaza:>8,}  ({missing_next_plaza/total*100:>5.1f}%)")
print(f"\n  ‚Üí Proceeding with:                            {has_next_plaza:>8,}  pairs")

# ========================================================================
# STAGE 2: PLAZA LOCATION (IN-STATE vs OUT-OF-STATE)
# ========================================================================
print(f"\nüìä STAGE 2: PLAZA LOCATION FILTER")
print(f"{'‚îÄ'*80}")

# In-state plazas
instate_plazas = set(plaza_df['plaza_code'].dropna())
print(f"  Reference: {len(instate_plazas)} in-state plazas in plaza_df")

# Break down out-of-state by reason
current_out = ~df_flag['exit_plaza'].isin(instate_plazas)
next_out = ~df_flag['_next_plaza'].isin(instate_plazas) & df_flag['_next_plaza'].ne('')
both_in = df_flag['exit_plaza'].isin(instate_plazas) & df_flag['_next_plaza'].isin(instate_plazas)

current_out_count = current_out.sum()
next_out_count = next_out.sum()
both_in_count = both_in.sum()
out_of_state_total = df_flag['flag_out_state_travel'].sum()

print(f"\n  Out-of-state breakdown:")
print(f"    ‚Ä¢ Current plaza out-of-state:               {current_out_count:>8,}  ({current_out_count/total*100:>5.1f}%)")
print(f"    ‚Ä¢ Next plaza out-of-state:                  {next_out_count:>8,}  ({next_out_count/total*100:>5.1f}%)")
print(f"    ‚Ä¢ Missing next plaza:                       {missing_next_plaza:>8,}  ({missing_next_plaza/total*100:>5.1f}%)")
print(f"  ‚úó Total flagged as out-of-state:             {out_of_state_total:>8,}  ({out_of_state_total/total*100:>5.1f}%)")
print(f"\n  ‚úì Both plazas in-state:                      {both_in_count:>8,}  ({both_in_count/total*100:>5.1f}%)")
print(f"\n  ‚Üí Proceeding with:                            {both_in_count:>8,}  in-state pairs")

# ========================================================================
# STAGE 3: COORDINATE AVAILABILITY
# ========================================================================
print(f"\nüìä STAGE 3: COORDINATE AVAILABILITY")
print(f"{'‚îÄ'*80}")

# Check plaza_df completeness
plazas_with_coords = plaza_df['latitude'].notna().sum()
plazas_missing_coords = plaza_df['latitude'].isna().sum()

print(f"  Plaza coordinate coverage:")
print(f"    ‚Ä¢ Plazas with coordinates in plaza_df:      {plazas_with_coords:>8,}  ({plazas_with_coords/len(plaza_df)*100:>5.1f}%)")
print(f"    ‚Ä¢ Plazas missing coordinates:               {plazas_missing_coords:>8,}  ({plazas_missing_coords/len(plaza_df)*100:>5.1f}%)")

# Distance calculation results
has_distance = df_flag['_distance'].notna().sum()
missing_distance = both_in_count - has_distance

print(f"\n  Distance calculation results:")
print(f"    ‚úì Successfully calculated distance:         {has_distance:>8,}  ({has_distance/both_in_count*100:>5.1f}% of in-state)")
print(f"    ‚úó Failed (missing coordinates):             {missing_distance:>8,}  ({missing_distance/both_in_count*100:>5.1f}% of in-state)")

print(f"\n  ‚Üí Proceeding with:                            {has_distance:>8,}  with distances")

# ========================================================================
# STAGE 4: TIME DIFFERENCE VALIDATION
# ========================================================================
print(f"\nüìä STAGE 4: TIME DIFFERENCE VALIDATION")
print(f"{'‚îÄ'*80}")

# Time difference statistics
has_time = df_flag['_time_diff_hours'].notna().sum()
missing_time = df_flag['_time_diff_hours'].isna().sum()

print(f"  Time difference availability:")
print(f"    ‚úì Valid time differences:                   {has_time:>8,}  ({has_time/total*100:>5.1f}%)")
print(f"    ‚úó Missing time data:                        {missing_time:>8,}  ({missing_time/total*100:>5.1f}%)")

# Among those with distance, check time validity
has_dist_and_time = (df_flag['_distance'].notna() & df_flag['_time_diff_hours'].notna()).sum()
has_dist_no_time = (df_flag['_distance'].notna() & df_flag['_time_diff_hours'].isna()).sum()

print(f"\n  For transactions with distance:")
print(f"    ‚úì Also has valid time:                      {has_dist_and_time:>8,}  ({has_dist_and_time/has_distance*100:>5.1f}%)")
print(f"    ‚úó Missing time:                             {has_dist_no_time:>8,}  ({has_dist_no_time/has_distance*100:>5.1f}%)")

# Check time threshold
if '_time_diff_seconds' in df_flag.columns:
    time_too_small = (
        df_flag['_distance'].notna() & 
        df_flag['_time_diff_seconds'].notna() &
        (df_flag['_time_diff_seconds'] < min_time_seconds)
    ).sum()
    
    print(f"    ‚úó Time diff < {min_time_seconds}s (too small):              {time_too_small:>8,}  ({time_too_small/has_distance*100:>5.1f}%)")

# ========================================================================
# STAGE 5: SPEED CALCULATION
# ========================================================================
print(f"\nüìä STAGE 5: SPEED CALCULATION (FINAL)")
print(f"{'‚îÄ'*80}")

has_speed = df_flag['_speed'].notna().sum()
missing_speed = has_distance - has_speed

print(f"  Speed calculation results:")
print(f"    ‚úì Successfully calculated speed:            {has_speed:>8,}  ({has_speed/has_distance*100:>5.1f}% of distances)")
print(f"    ‚úó Failed validation:                        {missing_speed:>8,}  ({missing_speed/has_distance*100:>5.1f}% of distances)")

# Diagnose speed failures
if missing_speed > 0:
    has_dist_no_speed = df_flag[df_flag['_distance'].notna() & df_flag['_speed'].isna()]
    
    print(f"\n  Reasons for speed calculation failure:")
    
    if '_time_diff_seconds' in df_flag.columns:
        zero_time = (has_dist_no_speed['_time_diff_seconds'] < min_time_seconds).sum()
        null_time = has_dist_no_speed['_time_diff_seconds'].isna().sum()
        inf_speed = np.isinf(has_dist_no_speed['_speed']).sum() if '_speed' in has_dist_no_speed else 0
        
        print(f"    ‚Ä¢ Time difference < {min_time_seconds}s:                    {zero_time:>8,}  ({zero_time/missing_speed*100:>5.1f}%)")
        print(f"    ‚Ä¢ Null/missing timestamps:                  {null_time:>8,}  ({null_time/missing_speed*100:>5.1f}%)")
        if inf_speed > 0:
            print(f"    ‚Ä¢ Division by zero (infinite speed):        {inf_speed:>8,}  ({inf_speed/missing_speed*100:>5.1f}%)")

# ========================================================================
# SUMMARY CASCADE
# ========================================================================
print(f"\n{'='*80}")
print("üìà CUMULATIVE LOSS CASCADE")
print(f"{'='*80}")

print(f"\n  Stage 0: Initial data                         {total:>8,}  (100.0%)")
print(f"           ‚Üì Lost {missing_next_plaza:>5,} to missing next plaza")
print(f"  Stage 1: Has next plaza                       {has_next_plaza:>8,}  ({has_next_plaza/total*100:>5.1f}%)")
print(f"           ‚Üì Lost {out_of_state_total - missing_next_plaza:>5,} to out-of-state travel")
print(f"  Stage 2: In-state pairs                       {both_in_count:>8,}  ({both_in_count/total*100:>5.1f}%)")
print(f"           ‚Üì Lost {missing_distance:>5,} to missing coordinates")
print(f"  Stage 3: Has distance                         {has_distance:>8,}  ({has_distance/total*100:>5.1f}%)")
print(f"           ‚Üì Lost {missing_speed:>5,} to invalid time")
print(f"  Stage 5: Has speed (FINAL)                    {has_speed:>8,}  ({has_speed/total*100:>5.1f}%)")

print(f"\n  Total data loss:                              {total - has_speed:>8,}  ({(total-has_speed)/total*100:>5.1f}%)")
print(f"  Data retention rate:                                      {has_speed/total*100:>5.1f}%")

print(f"\n{'='*80}\n")


DATA FILTERING & LOSS ANALYSIS REPORT

üìä STAGE 0: INITIAL DATA
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
  Total transactions:                              2,143

üìä STAGE 1: NEXT PLAZA CALCULATION
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
  ‚úì Has next plaza:                                1,794  ( 83.7%)
  ‚úó Missing next plaza (end of journey):            349  ( 16.3%)

  ‚Üí Proceeding with:                               1,794  pairs

üìä STAGE 2: PLAZA LOCATION FILTER
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚

## Save df

In [23]:
for col in df_flag.columns:
    print(col)
    

posting_date
transaction_date
post_lag_days
tag_plate_number
vehicle_type_code
vehicle_class
agency
agency_fullname
description
exit_time
exit_hour
exit_plaza
exit_plaza_fullname
exit_plaza_agency
exit_lane
amount
balance
prepaid
plan_rate
fare_type
flag_post_lag_high
flag_post_lag_negative
flag_sus_label_topup
flag_zero_toll
_group_median
_group_count
_amt_deviation
flag_amt_outlier
_mins_since_prev_exit
flag_fast_repeat_exit
_next_plaza
flag_out_state_travel
_exit_time_p1
_exit_time_p2
_time_diff
_distance
_time_diff_hours
_speed
flag_impossible_travel


In [27]:
# Sort by tag_plate_number
df_flag = df_flag.sort_values('tag_plate_number')

# Save to CSV
df_flag.to_csv(OUTPT_CSV, index=False)

#### High-severity combos:
* `flag_fast_repeat_exit` or `flag_agency_hop_fast` AND `flag_zero_amount` or `flag_amt_outlier`.

In [25]:
# --- D. Balance sanity ---
df_flag["flag_negative_balance"] = df_flag["balance"] < 0

# Optional rolling: prepaid card being used repeatedly with low balance and no top-up
# Mark recent top-up in last N minutes (e.g., 60)
N_MIN = 60
recent_topup = (df_flag["amount"] >= TOPUP_MIN)
recent_topup_rolled = (
    recent_topup.groupby(df_flag["tag_plate_number"]).rolling(window=10, min_periods=1).max().reset_index(level=0, drop=True)
)
df_flag["flag_low_bal_no_topup"] = (df_flag.get("prepaid", False) == True) & (df_flag["balance"] < 5) & (recent_topup_rolled == 0)

# --- Collect reasons & final flag ---
flag_cols = [
    "flag_post_lag_high","flag_post_lag_negative",
    "flag_topup_large_no_text","flag_zero_amount","flag_amt_outlier",
    "flag_fast_repeat_exit","flag_agency_hop_fast",
    "flag_negative_balance","flag_low_bal_no_topup","flag_quiet_hours"
]
df_flag["any_rule_flag"] = df_flag[flag_cols].any(axis=1)

def reasons(row):
    return [c for c in flag_cols if bool(row[c])]

df_flag["flag_reasons"] = df_flag.apply(reasons, axis=1)

# Preview suspicious rows with key context
view_cols = ["posting_date","transaction_date","exit_time","tag_plate_number","agency",
             "exit_plaza","vehicle_type_code","fare_type","amount","balance",
             "post_lag_days","mins_since_prev_exit","any_rule_flag","flag_reasons"]
suspicious = df_flag[df_flag["any_rule_flag"]].sort_values(["tag_plate_number","exit_time"])
suspicious[view_cols].head(20)

KeyError: "['flag_topup_large_no_text', 'flag_zero_amount', 'flag_agency_hop_fast', 'flag_quiet_hours'] not in index"