In [22]:
import sqlite3
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from pathlib import Path
import pytz
import glob

# --- Configuration ---
# Paths as specified in your prompt
DB_PATH = Path('../intersections/franklin & smeed/data/datz/spm_data.db')
PICKLE_DIR = Path('../intersections/franklin & smeed/data/DataFrames')

# Timezone settings
LOCAL_TZ = pytz.timezone('US/Mountain')

def get_db_data(start_dt, end_dt):
    """
    Reads from SQLite, converts Epoch -> Datetime, and localizes to MST.
    """
    if not DB_PATH.exists():
        print(f"❌ Database not found: {DB_PATH}")
        return pd.DataFrame()

    # 1. Convert Input (Local Naive) -> UTC Epoch for querying
    # Assume input string is "Wall Clock Time" in Garden City (MST)
    start_loc = LOCAL_TZ.localize(start_dt)
    end_loc = LOCAL_TZ.localize(end_dt)
    
    start_epoch = start_loc.timestamp()
    end_epoch = end_loc.timestamp()

    print(f"Fetching DB data from {start_loc} to {end_loc}...")

    # 2. Query DB
    with sqlite3.connect(DB_PATH) as conn:
        query = """
        SELECT timestamp, event_type, parameter 
        FROM logs 
        WHERE timestamp >= ? AND timestamp < ?
        ORDER BY timestamp
        """
        df = pd.read_sql_query(query, conn, params=(start_epoch, end_epoch))

    if df.empty:
        return df

    # 3. Process Columns
    # Convert Epoch Float -> Datetime (UTC)
    df['TS_start'] = pd.to_datetime(df['timestamp'], unit='s', utc=True)
    
    # Convert UTC -> US/Mountain
    df['TS_start'] = df['TS_start'].dt.tz_convert(LOCAL_TZ)
    
    # Remove timezone info to match naive Pickles (if pickles are naive)
    df['TS_start'] = df['TS_start'].dt.tz_localize(None)
    
    # Rename to match Old Format
    df = df.rename(columns={'event_type': 'Code', 'parameter': 'ID'})
    
    # Drop the raw epoch column
    return df[['TS_start', 'Code', 'ID']]

def get_pickle_data(start_dt, end_dt):
    """
    Finds relevant pickles by date and filters to exact time range.
    """
    if not PICKLE_DIR.exists():
        print(f"❌ Pickle directory not found: {PICKLE_DIR}")
        return pd.DataFrame()

    # 1. Find relevant files based on Date
    # Pickles are named: df_raw_YYYY_MM_DD_HHMM-YYYY_MM_DD_HHMM.pkl
    # We'll just grab files matching the dates in our range
    
    target_dates = pd.date_range(start_dt.date(), end_dt.date())
    files_to_read = []
    
    for date in target_dates:
        date_str = date.strftime('%Y_%m_%d')
        # Glob for any file starting with this date
        pattern = f"df_raw_{date_str}_*.pkl"
        found = list(PICKLE_DIR.glob(pattern))
        files_to_read.extend(found)
    
    if not files_to_read:
        print("⚠️ No pickle files found for these dates.")
        return pd.DataFrame()

    print(f"Reading {len(files_to_read)} pickle file(s)...")
    
    # 2. Load and Concatenate
    dfs = []
    for f in sorted(files_to_read):
        try:
            # Using bz2 as per your read_data.py
            dfs.append(pd.read_pickle(f, compression='bz2'))
        except Exception as e:
            print(f"  Error reading {f.name}: {e}")

    if not dfs:
        return pd.DataFrame()
        
    df_all = pd.concat(dfs)

    # 3. Filter specific columns and time range
    # Ensure columns exist (Pickles might have Cycle_start, Coord_plan, etc.)
    cols = ['TS_start', 'Code', 'ID']
    df_all = df_all[cols].copy()
    
    # Filter by time
    mask = (df_all['TS_start'] >= start_dt) & (df_all['TS_start'] < end_dt)
    return df_all[mask].sort_values(['TS_start', 'Code', 'ID']).reset_index(drop=True)

def compare_datasets(start_time_str, end_time_str):
    """
    Main Comparison Function.
    Args:
        start_time_str: 'YYYY-MM-DD HH:MM:SS' (24hr)
        end_time_str:   'YYYY-MM-DD HH:MM:SS' (24hr)
    """
    # Parse inputs
    s_dt = datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S')
    e_dt = datetime.strptime(end_time_str, '%Y-%m-%d %H:%M:%S')
    
    # --- GET DATA ---
    print("--- Loading Data ---")
    df_new = get_db_data(s_dt, e_dt)
    df_old = get_pickle_data(s_dt, e_dt)
    
    # --- ANALYSIS ---
    print("\n--- Comparison Results ---")
    print(f"Time Window: {s_dt} to {e_dt}")
    print(f"New DB Rows:   {len(df_new):,}")
    print(f"Old Pickle Rows: {len(df_old):,}")
    
    if df_new.empty and df_old.empty:
        print("Both datasets are empty for this range.")
        return

    # 1. Check for "Break Records" (Code 0)
    # The new DB has these, the old pickles likely do not.
    breaks_new = df_new[df_new['Code'] == 0]
    real_data_new = df_new[df_new['Code'] != 0]
    
    print(f"\nNew DB Analysis:")
    print(f"  - Real Data Rows: {len(real_data_new):,}")
    print(f"  - Generated Breaks (Code 0): {len(breaks_new)}")
    
    # 2. Direct Discrepancy (Count Mismatch)
    diff = len(real_data_new) - len(df_old)
    if diff == 0:
        print("\n✅ SUCCESS: 'Real' record counts match perfectly!")
    else:
        print(f"\n⚠️ MISMATCH: Count difference of {diff} rows.")

    # 3. Visual Sampling
    print("\n--- Visual Sample (First 5 Rows) ---")
    print("NEW (DB):")
    display(df_new.head()) if 'display' in globals() else print(df_new.head())
    print("OLD (Pickle):")
    display(df_old.head()) if 'display' in globals() else print(df_old.head())

    print("\n--- Visual Sample (Last 5 Rows) ---")
    print("NEW (DB):")
    display(df_new.tail()) if 'display' in globals() else print(df_new.tail())
    print("OLD (Pickle):")
    display(df_old.tail()) if 'display' in globals() else print(df_old.tail())

    return df_new, df_old

# --- EXAMPLE USAGE ---
# Run this cell, then uncomment the line below to test
df1, df2 = compare_datasets('2025-11-25 17:00:00', '2025-11-28 22:00:00')

--- Loading Data ---
Fetching DB data from 2025-11-25 17:00:00-07:00 to 2025-11-28 22:00:00-07:00...
Reading 4 pickle file(s)...

--- Comparison Results ---
Time Window: 2025-11-25 17:00:00 to 2025-11-28 22:00:00
New DB Rows:   1,097,789
Old Pickle Rows: 1,097,789

New DB Analysis:
  - Real Data Rows: 1,080,902
  - Generated Breaks (Code 0): 16887

⚠️ MISMATCH: Count difference of -16887 rows.

--- Visual Sample (First 5 Rows) ---
NEW (DB):
             TS_start  Code  ID
0 2025-11-25 17:00:00     3   7
1 2025-11-25 17:00:00    44   4
2 2025-11-25 17:00:00    81  37
3 2025-11-25 17:00:00    81  38
4 2025-11-25 17:00:00    81  42
OLD (Pickle):
                 TS_start  Code  ID
0 2025-11-25 17:00:00.200     3   7
1 2025-11-25 17:00:00.200    44   4
2 2025-11-25 17:00:00.200    81  37
3 2025-11-25 17:00:00.200    81  38
4 2025-11-25 17:00:00.200    81  42

--- Visual Sample (Last 5 Rows) ---
NEW (DB):
                             TS_start  Code  ID
1097784 2025-11-28 21:59:57.200000048 

In [12]:
import os

os.getcwd()

'c:\\Users\\rhansen\\Documents\\Python\\SPMs\\notebooks'

In [None]:
os.chdir('./notebooks')

In [None]:
import zlib
import struct
import sys
from datetime import datetime, timedelta

def convert_datz_to_csv(input_file):
    if not os.path.exists(input_file):
        print(f"Error: File {input_file} not found.")
        return

    # 1. Decompress the file
    try:
        with open(input_file, 'rb') as f:
            compressed_data = f.read()
            # Decompress raw zlib data
            full_content = zlib.decompress(compressed_data)
    except Exception as e:
        print(f"Error reading/decompressing {input_file}: {e}")
        return

    # 2. Find the Text/Binary Boundary
    # The header ends with the line "Phases in use:..." followed by a newline.
    # We look for the newline character '\n' (0x0A) after that specific marker.
    marker = b"Phases in use:"
    marker_pos = full_content.find(marker)
    
    if marker_pos == -1:
        print("Error: Could not find header end marker.")
        return
        
    # Find the end of that line
    newline_pos = full_content.find(b'\n', marker_pos)
    if newline_pos == -1:
        print("Error: Header line not terminated.")
        return

    # Split the data
    header_bytes = full_content[:newline_pos+1]
    binary_bytes = full_content[newline_pos+1:]

    # 3. Parse the Header (to get the Base Timestamp)
    header_text = header_bytes.decode('utf-8')
    header_lines = header_text.splitlines()
    
    # We need the start time from the first line: "11-25-2025 21:36:00.0,..."
    # Format: MM-DD-YYYY HH:MM:SS.f
    first_line = header_lines[0]
    date_part = first_line.split(',')[0] # Extract "11-25-2025 21:36:00.0"
    
    # Clean up format for Python parsing (replace - with / if needed, though strptime handles it)
    # The binary header uses dashes, the CSV uses slashes.
    try:
        # Parse "11-25-2025 21:36:00.0"
        # Note: Python < 3.7 might struggle with .0, so we handle microseconds manually if needed
        time_str, micro_str = date_part.split('.')
        base_time = datetime.strptime(time_str, "%m-%d-%Y %H:%M:%S")
        base_time = base_time + timedelta(microseconds=int(micro_str) * 100000)
    except ValueError:
        print(f"Error parsing date: {date_part}")
        return

    # 4. Parse the Binary Data
    # Schema found: 4 Bytes per row
    # Byte 0: Event Type (Integer)
    # Byte 1: Parameter (Integer)
    # Byte 2-3: Time Offset (Unsigned Short, Big Endian) -> tenths of a second
    
    row_size = 4
    num_rows = len(binary_bytes) // row_size
    
    csv_rows = []
    
    # Add Header (Matches your CSV format)
    csv_rows.append("Timestamp,Event Type,Parameter")
    
    # (Optional) Reconstruct the Metadata rows from the header_lines if you need them in the CSV
    # Your example CSV includes metadata rows at the top. 
    # For now, let's process the binary data which is the critical part.

    for i in range(num_rows):
        chunk = binary_bytes[i*row_size : (i+1)*row_size]
        
        # Unpack: > = Big Endian, B = uchar (1 byte), H = ushort (2 bytes)
        event_type, parameter, time_offset = struct.unpack('>BBH', chunk)
        
        # Calculate actual time
        # Offset is in 10ths of a second
        row_time = base_time + timedelta(seconds=time_offset/10.0)
        
        # Format Timestamp: MM/DD/YYYY HH:MM:SS.f (1 digit precision)
        ts_str = row_time.strftime("%m/%d/%Y %H:%M:%S")
        ms_digit = row_time.microsecond // 100000
        final_ts = f"{ts_str}.{ms_digit}"
        
        csv_rows.append(f"{final_ts},{event_type},{parameter}")

    # 5. Save to CSV
    output_filename = input_file.replace(".datZ", ".csv")
    with open(output_filename, 'w', newline='') as f:
        f.write('\n'.join(csv_rows))

    print(f"Success! Converted {input_file} -> {output_filename}")
    print(f"Processed {num_rows} binary records.")

Error reading/decompressing FAILED_ECON_10.70.10.51_2025_01_23_0730.datZ: Error -3 while decompressing data: invalid stored block lengths
