In [1]:
import pandas as pd
import geopandas as gpd
import os
import requests
import time
import io

# Output location for intermediate artefacts; created up front so later
# cells can write into it without checking.
INTERIM_DIR = '../data/interim/'
os.makedirs(INTERIM_DIR, exist_ok=True)

# Input location and the CTA 'L' stops file read from it.
DATA_DIR = '../data/raw/'
CTA_FILE_PATH = os.path.join(DATA_DIR, 'cta_l_stops.geojson')

In [None]:
# Load the CTA 'L' stops layer from the GeoJSON file configured above.
gdf_cta = gpd.read_file(CTA_FILE_PATH)
# Preview only the first rows instead of dumping the whole frame into the output.
gdf_cta.head()

In [None]:
# List the available columns before choosing which ones to work with.
print("Initial CTA Station Columns:", gdf_cta.columns.tolist())
# A short preview is enough here; avoid re-dumping the entire frame.
gdf_cta.head()

In [None]:
# Columns of the stops layer used from here on.
name_column, lines_column = 'longname', 'legend'

# Build a compact line list: drop every ' Line' substring first,
# then remove the space that follows each comma.
legend_without_suffix = gdf_cta[lines_column].str.replace(' Line', '', regex=False)
gdf_cta['Line_Colors'] = legend_without_suffix.str.replace(', ', ',', regex=False)

preview_columns = [name_column, lines_column, 'Line_Colors', 'geometry']
gdf_cta[preview_columns].head()

In [None]:
# Quick sanity figures: distinct station names vs. number of rows.
print(f"Total unique station names: {gdf_cta[name_column].nunique()}")
print(f"Total rows in DataFrame: {len(gdf_cta)}")
# The label now matches the number of rows actually shown (it used to say "Top 5"
# while displaying ten).
print("Top 10 unique Line_Colors combinations:")
gdf_cta['Line_Colors'].value_counts().head(10)

In [None]:
# Read the raw sales extract and preview its first rows.
sales_csv_path = os.path.join(DATA_DIR, 'sales_data.csv')
df_sales = pd.read_csv(sales_csv_path)
df_sales.head()

In [3]:
# Local text file that accumulates the "pin10,lon,lat" lookup results.
TEXT_FILE = os.path.join(INTERIM_DIR, 'universe_pin.txt')

# Cook County Data Catalog resource queried for pin coordinates (CSV endpoint).
UNIVERSE_DATA_ID = 'nj4t-kc8j'
UNIVERSE_API_URL = 'https://datacatalog.cookcountyil.gov/resource/{}.csv'.format(UNIVERSE_DATA_ID)

In [None]:
# Rewrite TEXT_FILE in place, keeping only non-blank lines that do not contain
# "None,None"; surrounding whitespace is stripped from every kept line.
if not os.path.exists(TEXT_FILE):
    print(f"File not found: {TEXT_FILE}")
else:
    with open(TEXT_FILE, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    before = len(lines)

    # Strip first, then filter: blank lines and "None,None" rows are dropped.
    stripped_lines = (raw.strip() for raw in lines)
    cleaned = [text for text in stripped_lines if text and 'None,None' not in text]

    with open(TEXT_FILE, 'w', encoding='utf-8') as f:
        f.writelines(f"{ln}\n" for ln in cleaned)

    print(f"Cleaned {TEXT_FILE}: {before} -> {len(cleaned)} lines")

In [None]:
# Build the set of pin10 values already stored in TEXT_FILE.
#
# The file is always read directly rather than reusing a `cleaned` list that may
# be left in the kernel: that list goes stale as soon as new rows are appended to
# TEXT_FILE, and relying on it makes the result depend on which cells were run.
if os.path.exists(TEXT_FILE):
    with open(TEXT_FILE, 'r', encoding='utf-8') as f:
        src_lines = [ln.strip() for ln in f if ln.strip()]
else:
    # No results stored yet: start from an empty set instead of failing.
    src_lines = []

unique_pin10 = set()
for ln in src_lines:
    # skip a header row if one is present
    if ln.lower().startswith('pin10'):
        continue
    # the pin is the first comma-separated field
    pin_raw = ln.split(',')[0].strip()
    if not pin_raw or pin_raw.lower() == 'none':
        continue
    # normalize trailing .0 (e.g. "2503106015.0" -> "2503106015")
    pin = pin_raw[:-2] if pin_raw.endswith('.0') else pin_raw
    unique_pin10.add(pin)

# result available as `unique_pin10`
print("Unique pin10 count:", len(unique_pin10))

In [None]:
# Take the first 10 characters of each pin as its pin10 key.
# astype(str) would turn a missing pin into the literal text 'nan', which slips
# past dropna() and ends up being queried; mask those rows back to NaN first.
pin_values = df_sales['pin']
df_sales['pin10'] = pin_values.astype(str).str[:10].where(pin_values.notna())
all_pins = df_sales['pin10'].dropna().unique().tolist()

In [None]:
# Keep, in their original order, the pins from all_pins that have no entry
# in unique_pin10 yet.
pins_to_check = []
for candidate in all_pins:
    if candidate in unique_pin10:
        continue
    pins_to_check.append(candidate)

for summary_line in (
    f"Total all_pins: {len(all_pins)}",
    f"Pins already found (unique_pin10): {len(unique_pin10)}",
    f"Missing pins_to_check count: {len(pins_to_check)}",
):
    print(summary_line)

In [None]:
# Path of the text file that holds the pins still waiting to be looked up
# (one pin per line); the fetch loop rewrites it from `pins_to_check`.
PINS_TO_CHECK_FILE = os.path.join(INTERIM_DIR, 'pins_to_check.txt')
# Disabled: initial dump of `pins_to_check` to that file, one pin per line.
# with open(PINS_TO_CHECK_FILE, 'w', encoding='utf-8') as fh:
#     for pin in pins_to_check:
#         fh.write(f"{pin}\n")

In [None]:
def fetch_pin(pin10):
    """Look up the coordinates of one pin via the resource at ``UNIVERSE_API_URL``.

    Sends a single-row query (``$limit`` 1) selecting ``pin10``, ``lon`` and
    ``lat`` for the row whose ``pin10`` equals the given value, and parses the
    CSV response.

    Args:
        pin10: Pin key to look up; it is converted with ``str`` before use.

    Returns:
        A dict ``{'pin10': str, 'lon': ..., 'lat': ...}`` built from the first
        returned row, or ``None`` when ``pin10`` is empty, no row matches, or
        the request or parsing fails (the failure is logged as a warning).
    """
    import logging

    if pin10 is None or not str(pin10).strip():
        return None

    # SoQL string literals escape an embedded single quote by doubling it.
    pin_literal = str(pin10).replace("'", "''")
    params = {
        '$limit': 1,
        '$select': 'pin10, lon, lat',
        '$where': f"pin10 = '{pin_literal}'"
    }
    try:
        r = requests.get(UNIVERSE_API_URL, params=params, timeout=10)
        r.raise_for_status()
        # Keep pin10 as text: letting pandas infer a numeric type drops leading
        # zeros and can render the value with a trailing ".0".
        df = pd.read_csv(io.StringIO(r.text), dtype={'pin10': str})
        if not df.empty:
            row = df.iloc[0]
            return {'pin10': str(row.get('pin10')), 'lon': row.get('lon'), 'lat': row.get('lat')}
    except Exception as exc:
        # Best-effort lookup: report the failure instead of hiding it, then
        # fall through to the same "no result" return value as before.
        logging.getLogger(__name__).warning("fetch_pin(%r) failed: %s", pin10, exc)
        return None
    return None

In [None]:
import random


def _write_pins_checkpoint():
    """Overwrite PINS_TO_CHECK_FILE with the pins still left in pins_to_check."""
    # Mode 'w' truncates or creates the file, so no os.remove() is needed
    # (os.remove raised FileNotFoundError when the file did not exist yet).
    with open(PINS_TO_CHECK_FILE, 'w', encoding='utf-8') as fh:
        for pin in pins_to_check:
            fh.write(f"{pin}\n")


# Iterate over a snapshot of the work list. `pins_to_check` itself is still
# shrunk as pins are processed (so re-running the cell continues with what is
# left), but removing items from the very list being iterated makes the loop
# skip every other pin.
for each_pin in list(pins_to_check):
    result = fetch_pin(each_pin)
    if result:
        # Append immediately so results obtained so far survive an interruption.
        with open(TEXT_FILE, 'a', encoding='utf-8') as fh:
            fh.write(f"{result['pin10']},{result['lon']},{result['lat']}\n")
    # NOTE: fetch_pin returns None both for "no match" and for a failed request,
    # so the pin leaves the work list in either case.
    pins_to_check.remove(each_pin)
    # Checkpoint the remaining pins on a random subset of iterations
    # (1 chance in 31) to limit how often the file is rewritten.
    if random.randint(0, 30) == 0:
        _write_pins_checkpoint()

# Final checkpoint so the file reflects the state at the end of the run.
_write_pins_checkpoint()

In [4]:
import shutil

# Copy the accumulated lookup results to a file with a .csv extension;
# the destination path returned by shutil.copy is the cell's output.
CSV_FILE = os.path.join(INTERIM_DIR, 'universe_pin.csv')
shutil.copy(src=TEXT_FILE, dst=CSV_FILE)

'../data/interim/universe_pin.csv'