In [0]:
-- Peek at up to 10 distinct raw DATA_AS_OF strings
SELECT src.DATA_AS_OF
FROM ambulance_project.ambulance_data.t_dot AS src
GROUP BY src.DATA_AS_OF
LIMIT 10;

-- Trial-parse the raw strings with the slash format next to the original text
SELECT
  src.DATA_AS_OF,
  TRY_TO_TIMESTAMP(src.DATA_AS_OF, 'MM/dd/yyyy hh:mm:ss a') AS parsed
FROM ambulance_project.ambulance_data.t_dot AS src
LIMIT 10;


# 1) TYPE CAST + TIMESTAMP PARSE 

 Reason: enforce stable types; handle multiple time formats; keep raw fields for audit/debug.

In [0]:
-- Stage 1: cast the raw t_dot columns to stable types, parse DATA_AS_OF into a
-- timestamp, and apply basic text trimming/casing. No rows are filtered here;
-- the raw timestamp string is kept alongside the parsed value.
CREATE OR REPLACE TEMP VIEW dot_stage_type AS
SELECT
  -- Numeric identifiers
  -- NOTE(review): if these source columns are strings, plain CAST can raise on
  -- malformed values when ANSI mode is on — confirm the source schema.
  CAST(ID          AS BIGINT)  AS id,
  CAST(LINK_ID     AS BIGINT)  AS link_id,
  CAST(TRANSCOM_ID AS BIGINT)  AS transcom_id,

  -- Metrics (keep values as-is in silver; no filtering here)
  CAST(SPEED       AS DOUBLE)  AS speed,
  CAST(TRAVEL_TIME AS BIGINT)  AS travel_time,
  CAST(STATUS      AS INT)     AS status,

  -- Timestamp parsing: prefer slash format; fallback to wordy if present.
  -- try_to_timestamp returns NULL on a non-matching string instead of raising,
  -- so the second format is actually attempted and rows that match neither
  -- format survive with a NULL data_as_of.
  COALESCE(
    try_to_timestamp(DATA_AS_OF, 'MM/dd/yyyy hh:mm:ss a'),
    try_to_timestamp(DATA_AS_OF, 'yyyy MMM dd hh:mm:ss a')
  ) AS data_as_of,

  -- Text normalization
  INITCAP(TRIM(BOROUGH))  AS borough,
  TRIM(LINK_NAME)         AS link_name,
  UPPER(TRIM(OWNER))      AS owner,

  -- Geometry text preserved for later parsing in the clean layer
  TRIM(LINK_POINTS)            AS link_points,
  TRIM(ENCODED_POLY_LINE)      AS encoded_polyline,
  TRIM(ENCODED_POLY_LINE_LVLS) AS encoded_polyline_lvls,

  -- Keep raw time for audit/debug
  DATA_AS_OF AS data_as_of_raw
FROM ambulance_project.ambulance_data.t_dot;


In [0]:
-- Parse-rate summary: total rows vs. rows whose timestamp came back NULL
SELECT
  COUNT(*) AS total_rows,
  SUM(
    CASE
      WHEN st.data_as_of IS NOT NULL THEN 0
      ELSE 1
    END
  ) AS null_parsed_timestamps
FROM dot_stage_type AS st;

-- Side-by-side sample: raw string next to its parsed timestamp
SELECT
  st.data_as_of_raw,
  st.data_as_of
FROM dot_stage_type AS st
LIMIT 10;


# 2) TEXT NORMALIZATION + GEOMETRY STRING HOUSEKEEPING

Reason: collapse whitespace; trim trailing commas/spaces; keep encoded polylines intact for future use if needed.

In [0]:

-- Stage 2: re-apply trim/casing to the text columns and tidy the link_points
-- string; all other columns pass through unchanged from dot_stage_type.

CREATE OR REPLACE TEMP VIEW dot_stage_text AS
WITH collapsed AS (
  -- Step 1 of the geometry tidy-up: squeeze every whitespace run to one space
  SELECT
    src.*,
    regexp_replace(TRIM(src.link_points), '\\s+', ' ') AS link_points_ws
  FROM dot_stage_type AS src
)
SELECT
  c.id,
  c.link_id,
  c.transcom_id,
  c.speed,
  c.travel_time,
  c.status,
  c.data_as_of_raw,
  c.data_as_of,

  -- Text columns: trimmed, with borough in Init Caps and owner upper-cased
  INITCAP(TRIM(c.borough)) AS borough,
  TRIM(c.link_name)        AS link_name,
  UPPER(TRIM(c.owner))     AS owner,

  -- Step 2 of the geometry tidy-up: drop trailing commas/whitespace.
  -- Coordinate parsing itself is deferred to a later layer.
  regexp_replace(c.link_points_ws, '[,\\s]+$', '') AS link_points,

  -- Encoded polylines are only trimmed, never rewritten
  TRIM(c.encoded_polyline)      AS encoded_polyline,
  TRIM(c.encoded_polyline_lvls) AS encoded_polyline_lvls
FROM collapsed AS c;


In [0]:
-- Row count per normalized borough value, most frequent first
SELECT
  txt.borough,
  COUNT(*) AS n
FROM dot_stage_text AS txt
GROUP BY txt.borough
ORDER BY n DESC;

-- Verify whitespace cleanup took effect: show the geometry string from
-- dot_stage_type (before) next to the one from dot_stage_text (after).
-- Distinct aliases are needed because both views name the column link_points.
SELECT
  s.link_points AS link_points_after,
  t.link_points AS link_points_before
FROM dot_stage_type AS t
INNER JOIN dot_stage_text AS s
  ON  t.id = s.id
  AND t.link_id = s.link_id
LIMIT 10;

# 3) QA FLAGS (SOFT SANITY)

Reason: keep all rows (no drops). Flags help dashboards/alerts.

In [0]:
-- Stage 3: append 0/1 QA flag columns to every row of dot_stage_text.
-- Nothing is filtered; the flags only mark rows for later inspection.

CREATE OR REPLACE TEMP VIEW dot_stage_qaflags AS
SELECT
  txt.*,
  -- 1 when speed is NULL or negative
  CASE WHEN txt.speed >= 0 THEN 0 ELSE 1 END          AS flag_speed_suspect,
  -- 1 when travel_time is NULL or negative
  CASE WHEN txt.travel_time >= 0 THEN 0 ELSE 1 END    AS flag_travel_time_suspect,
  -- 1 only for a present, negative status (NULL status is not flagged)
  CASE WHEN txt.status < 0 THEN 1 ELSE 0 END          AS flag_status_negative,
  -- 1 when the timestamp could not be parsed
  CASE WHEN txt.data_as_of IS NOT NULL THEN 0 ELSE 1 END AS flag_ts_unparsed
FROM dot_stage_text AS txt;


In [0]:


-- Total number of rows raised by each QA flag in the staging view
SELECT
  SUM(q.flag_speed_suspect)       AS n_speed_suspect,
  SUM(q.flag_travel_time_suspect) AS n_travel_time_suspect,
  SUM(q.flag_status_negative)     AS n_status_negative,
  SUM(q.flag_ts_unparsed)         AS n_ts_unparsed
FROM dot_stage_qaflags AS q;


In [0]:


-- Per-flag totals over dot_stage_qaflags (one output row, four counters)
SELECT
  SUM(flags.flag_speed_suspect)       AS n_speed_suspect,
  SUM(flags.flag_travel_time_suspect) AS n_travel_time_suspect,
  SUM(flags.flag_status_negative)     AS n_status_negative,
  SUM(flags.flag_ts_unparsed)         AS n_ts_unparsed
FROM dot_stage_qaflags AS flags;


# 4) MATERIALIZE LOSSLESS SILVER

Reason: audit-friendly, no destructive filtering.

In [0]:

-- Materialize dot_silver from the flagged staging view.
-- There is no WHERE clause: every staged row is written.

CREATE OR REPLACE TABLE ambulance_project.ambulance_data.dot_silver AS
SELECT
  -- identifiers
  q.id,
  q.link_id,
  q.transcom_id,

  -- metrics
  q.speed,
  q.travel_time,
  q.status,

  -- parsed timestamp, followed by the raw string it came from
  q.data_as_of,
  q.data_as_of_raw,

  -- descriptive text
  q.borough,
  q.link_name,
  q.owner,

  -- geometry strings
  q.link_points,
  q.encoded_polyline,
  q.encoded_polyline_lvls,

  -- QA flags carried forward
  q.flag_speed_suspect,
  q.flag_travel_time_suspect,
  q.flag_status_negative,
  q.flag_ts_unparsed
FROM dot_stage_qaflags AS q;

In [0]:
-- Row-count comparison: raw source table vs. materialized silver table
SELECT
  r.raw_rows,
  s.silver_rows
FROM (
  SELECT COUNT(*) AS raw_rows
  FROM ambulance_project.ambulance_data.t_dot
) AS r
CROSS JOIN (
  SELECT COUNT(*) AS silver_rows
  FROM ambulance_project.ambulance_data.dot_silver
) AS s;

-- Sample of the final table
SELECT *
FROM ambulance_project.ambulance_data.dot_silver
LIMIT 10;

-- NULL counts and QA flag totals in silver
SELECT
  SUM(CASE WHEN sv.speed       IS NOT NULL THEN 0 ELSE 1 END) AS null_speed,
  SUM(CASE WHEN sv.travel_time IS NOT NULL THEN 0 ELSE 1 END) AS null_travel_time,
  SUM(CASE WHEN sv.data_as_of  IS NOT NULL THEN 0 ELSE 1 END) AS null_parsed_ts,
  SUM(sv.flag_speed_suspect)       AS n_speed_suspect,
  SUM(sv.flag_travel_time_suspect) AS n_travel_time_suspect,
  SUM(sv.flag_status_negative)     AS n_status_negative,
  SUM(sv.flag_ts_unparsed)         AS n_ts_unparsed
FROM ambulance_project.ambulance_data.dot_silver AS sv;

# 5) DEDUPLICATE (SAME link_id + data_as_of) → KEEP LATEST id

Reason: DOT feeds often resend the same timestamp; deduplication prevents double counting and improves training integrity.

In [0]:

-- Deduplicate exact timestamp per link: for each (link_id, data_as_of) keep the
-- row with the highest id. The view exposes every dot_silver column plus rn
-- (always 1 in the output).

CREATE OR REPLACE TEMP VIEW dot_silver_dedup AS
WITH ranked AS (
  SELECT
    s.*,
    ROW_NUMBER() OVER (
      PARTITION BY
        s.link_id,
        s.data_as_of,
        -- Window partitions group NULL keys together, so without this extra key
        -- every row of a link with an unparsed (NULL) timestamp would collapse
        -- into a single survivor. Fall back to the raw string for those rows so
        -- only genuinely repeated raw timestamps are treated as duplicates.
        CASE WHEN s.data_as_of IS NULL THEN s.data_as_of_raw END
      ORDER BY s.id DESC
    ) AS rn
  FROM ambulance_project.ambulance_data.dot_silver AS s
)
SELECT * FROM ranked WHERE rn = 1;


In [0]:
-- How many (link_id, data_as_of) combinations occur more than once in silver
WITH dup_groups AS (
  SELECT
    sv.link_id,
    sv.data_as_of,
    COUNT(*) AS c
  FROM ambulance_project.ambulance_data.dot_silver AS sv
  GROUP BY sv.link_id, sv.data_as_of
  HAVING COUNT(*) > 1
)
SELECT COUNT(*) AS dup_pairs
FROM dup_groups;


# 6) ROBUST OUTLIER TRIM (per link_id using Median ± 5*MAD)

Reason: remove sensor spikes without killing true congestion.

In [0]:
-- Per-link approximate median speed, computed only from speeds in [0, 120]
-- (NULL speeds fail the range test and are excluded as well)
CREATE OR REPLACE TEMP VIEW dot_link_median AS
SELECT
  dd.link_id,
  percentile_approx(dd.speed, 0.5) AS med_speed
FROM dot_silver_dedup AS dd
WHERE dd.speed BETWEEN 0 AND 120
GROUP BY dd.link_id;

-- Per-link MAD: approximate median of |speed - med_speed|, using the per-link
-- median from dot_link_median and the same [0, 120] speed filter
CREATE OR REPLACE TEMP VIEW dot_link_stats AS
SELECT
  obs.link_id,
  med.med_speed,
  percentile_approx(ABS(obs.speed - med.med_speed), 0.5) AS mad_speed
FROM dot_silver_dedup AS obs
INNER JOIN dot_link_median AS med
  ON obs.link_id = med.link_id
WHERE obs.speed BETWEEN 0 AND 120
GROUP BY
  obs.link_id,
  med.med_speed;

-- Add speed_trim: the original speed when it is accepted, NULL otherwise.
-- A speed is accepted when it lies in [0, 120] and either the link has no usable
-- MAD (NULL or 0) or the speed falls inside med_speed ± 5 * mad_speed.
CREATE OR REPLACE TEMP VIEW dot_trim AS
SELECT
  d.*,
  s.med_speed,
  s.mad_speed,
  CASE
    WHEN d.speed >= 0
     AND d.speed <= 120
     AND (
           s.mad_speed IS NULL
        OR s.mad_speed = 0
        OR (    d.speed >= s.med_speed - 5 * s.mad_speed
            AND d.speed <= s.med_speed + 5 * s.mad_speed)
         )
      THEN d.speed
    ELSE NULL
  END AS speed_trim
FROM dot_silver_dedup AS d
LEFT JOIN dot_link_stats AS s
  ON d.link_id = s.link_id;

In [0]:
-- Total rows, and rows that had a speed but lost it in speed_trim
SELECT
  COUNT(*) AS n_all,
  SUM(
    CASE
      WHEN tr.speed_trim IS NULL AND tr.speed IS NOT NULL THEN 1
      ELSE 0
    END
  ) AS n_trimmed_as_outliers
FROM dot_trim AS tr;


# 7) LOCAL IMPUTATION (±30 min rolling median per link)

Reason: preserve continuity for time-series models; short gaps often due to brief sensor dropouts.

In [0]:
-- Fill missing/trimmed speeds per link.
-- speed_filled takes the first non-NULL of: the trimmed speed, the approximate
-- median of trimmed speeds within 30 minutes either side of the row, and the
-- per-link median (med_speed).

CREATE OR REPLACE TEMP VIEW dot_impute AS
WITH windowed AS (
  SELECT
    tr.*,
    percentile_approx(tr.speed_trim, 0.5) OVER w AS med_60m
  FROM dot_trim AS tr
  WINDOW w AS (
    PARTITION BY tr.link_id
    ORDER BY tr.data_as_of
    RANGE BETWEEN INTERVAL 30 MINUTES PRECEDING AND INTERVAL 30 MINUTES FOLLOWING
  )
)
SELECT
  wd.id,
  wd.link_id,
  wd.transcom_id,
  COALESCE(wd.speed_trim, wd.med_60m, wd.med_speed) AS speed_filled,
  wd.travel_time,
  wd.status,
  wd.data_as_of,
  wd.data_as_of_raw,
  wd.borough,
  wd.link_name,
  wd.owner,
  wd.link_points,
  wd.encoded_polyline,
  wd.encoded_polyline_lvls,
  wd.flag_speed_suspect,
  wd.flag_travel_time_suspect,
  wd.flag_status_negative,
  wd.flag_ts_unparsed
FROM windowed AS wd;


In [0]:
-- Rows that still have no speed after imputation (all rows minus rows with a value)
SELECT
  COUNT(*) - COUNT(imp.speed_filled) AS still_null_speed
FROM dot_impute AS imp;


# 8) TIME FEATURES

 Reason: weekday/peak patterns are high-signal covariates.

In [0]:
-- Derive calendar/clock features from data_as_of; every dot_impute column is kept.

CREATE OR REPLACE TEMP VIEW dot_time AS
SELECT
  imp.*,
  CAST(imp.data_as_of AS DATE) AS dte,
  HOUR(imp.data_as_of)         AS hr,
  MINUTE(imp.data_as_of)       AS min,
  DAYOFWEEK(imp.data_as_of)    AS dow,        -- 1=Sun..7=Sat (Spark)
  -- Monday (2) through Friday (6)
  CASE
    WHEN DAYOFWEEK(imp.data_as_of) IN (2, 3, 4, 5, 6) THEN 1
    ELSE 0
  END AS is_weekday,
  -- Hours 7-10 and 16-19, inclusive on both ends
  CASE
    WHEN HOUR(imp.data_as_of) IN (7, 8, 9, 10, 16, 17, 18, 19) THEN 1
    ELSE 0
  END AS is_peak
FROM dot_impute AS imp;


# 9) GEOMETRY CLEAN (robust; final regex filter)

Reason: handle glued tokens, stray quotes, orphan lats, and out-of-bounds coordinates. Output start_lat/lon for joins.

In [0]:
-- Clean the link_points geometry text for every row of dot_time.
-- Steps: strip quotes / collapse whitespace -> un-glue tokens -> parse "lat,lon"
-- tokens -> keep in-bounds points -> rebuild a canonical string and validate it.
-- Output: link_id, data_as_of, link_points_clean, start_lat, start_lon,
--         n_valid_points, flag_no_valid_points, flag_has_quotes.
CREATE OR REPLACE TEMP VIEW dot_geom_clean AS
WITH norm AS (
  SELECT
    link_id, data_as_of, link_points,
    -- strip surrounding quotes; collapse whitespace
    regexp_replace(regexp_replace(link_points, '^"|"$', ''), '\\s+', ' ') AS lp0
  FROM dot_time
),
patched AS (
  SELECT
    link_id, data_as_of,
    -- if a lon is glued to next lat "40.x/41.x", inject a space.
    -- The replacement uses $1 to re-emit the captured digit: regexp_replace
    -- follows Java regex rules, where a backslash-1 in the replacement is a
    -- literal "1" and would overwrite the last digit of the longitude.
    regexp_replace(lp0, '(\\d)(?=4[01]\\.\\d)', '$1 ') AS lp1
  FROM norm
),
parsed AS (
  SELECT
    link_id, data_as_of,
    -- regex-extract lat,lon per token; non-matching tokens → NULLs
    transform(
      split(lp1, ' '),
      p -> struct(
        TRY_CAST(regexp_extract(p, '(-?\\d+\\.?\\d*),(-?\\d+\\.?\\d*)', 1) AS DOUBLE) AS lat,
        TRY_CAST(regexp_extract(p, '(-?\\d+\\.?\\d*),(-?\\d+\\.?\\d*)', 2) AS DOUBLE) AS lon
      )
    ) AS pts_all
  FROM patched
),
filtered AS (
  SELECT
    link_id, data_as_of,
    -- keep only valid, in-bounds coords (NYC-ish bbox)
    filter(
      pts_all,
      x -> x.lat IS NOT NULL AND x.lon IS NOT NULL
           AND x.lat BETWEEN 40.4 AND 41.1
           AND x.lon BETWEEN -74.5 AND -73.5
    ) AS pts
  FROM parsed
),
rebuilt AS (
  SELECT
    link_id,
    data_as_of,
    size(pts) AS n_valid_points,
    -- try_element_at yields NULL when pts is empty; plain element_at raises an
    -- out-of-bounds error in that case when ANSI mode is enabled.
    try_element_at(pts, 1).lat AS start_lat,
    try_element_at(pts, 1).lon AS start_lon,
    transform(pts, x -> concat(CAST(x.lat AS STRING), ',', CAST(x.lon AS STRING))) AS str_pts
  FROM filtered
),
joined AS (
  SELECT
    link_id,
    data_as_of,
    n_valid_points,
    start_lat, start_lon,
    trim(array_join(str_pts, ' ')) AS link_points_joined
  FROM rebuilt
)
SELECT
  link_id,
  data_as_of,
  -- Final regex: sequence of "lat,lon" tokens separated by single spaces
  CASE
    WHEN link_points_joined RLIKE '^(-?\\d+\\.?\\d*,-?\\d+\\.?\\d*)( (-?\\d+\\.?\\d*,-?\\d+\\.?\\d*))*$'
      THEN link_points_joined
    ELSE ''
  END AS link_points_clean,
  start_lat, start_lon,
  n_valid_points,
  -- NULL-safe: when link_points is NULL the joined string is NULL and size()
  -- yields -1 or NULL (depending on session settings), so a plain "= 0" / "= ''"
  -- test would leave such rows unflagged.
  CASE
    WHEN link_points_joined IS NULL OR link_points_joined = '' THEN 1
    WHEN n_valid_points IS NULL OR n_valid_points <= 0 THEN 1
    ELSE 0
  END AS flag_no_valid_points,
  -- NOTE(review): constant placeholder; quote detection is not implemented here.
  0 AS flag_has_quotes
FROM joined;


# 10) PUBLISH FEATURE‑READY SILVER (with geometry)

Reason: one table ready for modeling + spatial joins.

In [0]:
-- Publish the feature table: one row per dot_time row, enriched with the cleaned
-- geometry that matches on (link_id, data_as_of). Rows with no geometry match
-- keep NULLs in the geometry columns because the join is a LEFT JOIN.
CREATE OR REPLACE TABLE ambulance_project.ambulance_data.dot_silver_features AS
SELECT
  t.link_id,
  t.data_as_of,

  -- canonical speed (imputed/trimmed)
  t.speed_filled AS speed_mph,
  t.travel_time,
  t.status,

  -- dimensions
  t.borough,
  t.link_name,
  t.owner,

  -- time features
  t.dte,
  t.hr,
  t.min,
  t.dow,
  t.is_weekday,
  t.is_peak,

  -- categorical speed bucket; upper bounds 5 / 15 / 30 / 45 are exclusive
  CASE
    WHEN t.speed_filled IS NULL THEN NULL
    WHEN t.speed_filled < 5     THEN 'vslow'
    WHEN t.speed_filled < 15    THEN 'slow'
    WHEN t.speed_filled < 30    THEN 'moderate'
    WHEN t.speed_filled < 45    THEN 'fast'
    ELSE 'vfast'
  END AS speed_bin,

  -- geometry (clean)
  g.link_points_clean AS link_points,
  g.start_lat,
  g.start_lon,
  g.n_valid_points,
  g.flag_no_valid_points,
  g.flag_has_quotes,

  -- original QA flags (for diagnostics)
  t.flag_speed_suspect,
  t.flag_travel_time_suspect,
  t.flag_status_negative,
  t.flag_ts_unparsed
FROM dot_time AS t
LEFT JOIN dot_geom_clean AS g
  ON  t.link_id = g.link_id
  AND t.data_as_of = g.data_as_of;

-- Compact the table and co-locate data on the columns used above as the join key
OPTIMIZE ambulance_project.ambulance_data.dot_silver_features
ZORDER BY (link_id, data_as_of);

# FINAL CHECK: Display cleaned DOT Silver Features

In [0]:
-- Show the 20 most recent feature rows (NULL timestamps sort last, as they do by default for DESC)
SELECT *
FROM ambulance_project.ambulance_data.dot_silver_features AS f
ORDER BY f.data_as_of DESC NULLS LAST
LIMIT 20;