In [9]:
import pandas as pd
import json
from datetime import datetime, timedelta, time
import os
import time

In [10]:
# --- Configuration ---
RAW_JSON_FILE = '/Users/adrianiraeguialvear/Desktop/OnSpotML_v2/data/raw/parking_predictions_corrected.json'
OUTPUT_DIR = '../data/processed/'
OUTPUT_FILENAME = 'parking_predictions_processed.parquet'
OUTPUT_FILE = os.path.join(OUTPUT_DIR, OUTPUT_FILENAME)

# Prediction constants based on metadata
PREDICTION_START_HOUR = 8
PREDICTION_END_HOUR = 20
MINUTES_PER_SLOT = 5
SLOTS_PER_HOUR = 60 // MINUTES_PER_SLOT
SLOTS_PER_DAY = 288 # Data provides 288 slots (24 hours)
# Ensure output directory exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [11]:
# --- 1. Load Raw JSON Data (Memory Efficient & Handles FeatureCollection) ---
print(f'Loading raw JSON from: {RAW_JSON_FILE}')
start_load_time = time.time()

# Using standard json.load()
raw_data_list = None # Initialize
df_raw = None        # Initialize final DataFrame
processed_data_frames = [] # List to hold small DataFrames

try:
    with open(RAW_JSON_FILE, 'r') as f:
        raw_data_full = json.load(f) # Load the entire JSON structure

    # Check if data is nested under a key like 'OPENDATA_PSIU_APPARKB'
    if isinstance(raw_data_full, dict) and len(raw_data_full) == 1:
        key = list(raw_data_full.keys())[0]
        print(f'Data appears nested under key: {key}')
        raw_data_list = raw_data_full[key] # Get the list of date records
    elif isinstance(raw_data_full, list):
        print('Data appears as a list of date records.')
        raw_data_list = raw_data_full
    else:
        raise ValueError("Loaded JSON is not a list or a single-key dictionary containing a list.")

    if not raw_data_list:
         raise ValueError("The primary list of data records is empty.")

    # --- Normalize the nested structure (Processing in chunks/individually) ---
    print("Normalizing nested 'TRAMOS' data...")
    records_to_process = raw_data_list # Process all records
    num_processed = 0

    for date_record in records_to_process:
        fh_inicio = date_record.get('FH_INICIO')
        tramos_outer_list = date_record.get('TRAMOS') # This list contains the FeatureCollection
        current_date_features = [] # List to hold the actual features for this date

        # Check if FH_INICIO is missing
        if not fh_inicio:
            print(f"WARN: Skipping date record due to missing 'FH_INICIO': {date_record}")
            continue

        # Ensure tramos_outer_list is a list, default to empty list if not
        if not isinstance(tramos_outer_list, list):
            print(f"WARN: 'TRAMOS' for FH_INICIO {fh_inicio} is not a list, treating as empty: {tramos_outer_list}")
            tramos_outer_list = []

        # Iterate through items in the outer TRAMOS list (expecting one FeatureCollection)
        for item in tramos_outer_list:
            if isinstance(item, dict) and item.get('type') == 'FeatureCollection':
                # Found the FeatureCollection, get the actual features list
                features_list = item.get('features')
                if isinstance(features_list, list):
                    # Process the actual features
                    for feature_record in features_list:
                        if isinstance(feature_record, dict):
                            # Add the date to each feature's record (can add to properties or top-level)
                            # Adding to top-level for simplicity here:
                            feature_record['FH_INICIO'] = fh_inicio
                            # Add the extracted feature to our list for this date
                            current_date_features.append(feature_record)
                        else:
                            print(f"WARN: Item inside 'features' list is not a dictionary: {feature_record}")
                else:
                     print(f"WARN: 'features' key inside FeatureCollection is not a list: {features_list}")
            else:
                print(f"WARN: Item in 'TRAMOS' list is not a FeatureCollection dictionary: {item}")


        # If features were found for this date, create a small DataFrame
        if current_date_features:
            # Extract properties and geometry if needed, or flatten later
            # For now, create DataFrame from the list of feature dicts
            df_temp = pd.DataFrame(current_date_features)
            processed_data_frames.append(df_temp)

        num_processed += 1

    # --- End of loop ---
    print(f"Finished processing {num_processed} date records.")

    # Concatenate all the small DataFrames
    if processed_data_frames:
        print(f"Concatenating {len(processed_data_frames)} temporary DataFrames...")
        # Need to handle potential differences in columns if structure varies
        # Flatten the properties dictionary for easier access later
        all_processed_rows = []
        for df_temp in processed_data_frames:
             for index, feature_row in df_temp.iterrows():
                  row_data = {}
                  # Copy top-level keys (like FH_INICIO, type, potentially geometry)
                  for col in ['FH_INICIO', 'type', 'geometry']: # Add other top-level keys if they exist
                      if col in feature_row:
                           row_data[col] = feature_row[col]
                  # Flatten the properties dictionary
                  properties = feature_row.get('properties')
                  if isinstance(properties, dict):
                       for prop_key, prop_value in properties.items():
                            row_data[prop_key] = prop_value # Add properties keys
                  all_processed_rows.append(row_data)

        if all_processed_rows:
             df_raw = pd.DataFrame(all_processed_rows)
             print(f'Successfully loaded and normalized raw data. Shape: {df_raw.shape}')
             print(f'Columns: {df_raw.columns.tolist()}')
        else:
             print("WARNING: No rows extracted after attempting to flatten features.")
             df_raw = pd.DataFrame() # Empty DataFrame

        del processed_data_frames, all_processed_rows # Free memory
    else:
        print("WARNING: No valid feature records found after normalization (processed_data_frames is empty).")
        df_raw = pd.DataFrame() # Empty DataFrame

except FileNotFoundError:
    print(f'ERROR: Raw JSON file not found at {RAW_JSON_FILE}')
    df_raw = None
except MemoryError:
    print(f'ERROR: MemoryError loading/processing JSON. File might be too large.')
    df_raw = None
except json.JSONDecodeError as e:
    print(f'ERROR: Invalid JSON format - {e}')
    df_raw = None
except Exception as e:
    print(f'ERROR loading or normalizing JSON: {e}')
    print("Check JSON structure, key names ('FH_INICIO', 'TRAMOS'), and content.")
    df_raw = None

end_load_time = time.time()
if df_raw is not None and not df_raw.empty:
    print(f'JSON loading and normalization completed in {end_load_time - start_load_time:.2f} seconds.')
elif df_raw is not None and df_raw.empty:
     print(f'JSON loading completed, but resulted in an empty DataFrame. Time: {end_load_time - start_load_time:.2f} seconds.')
else:
    print('Failed to create df_raw from JSON.')


Loading raw JSON from: /Users/adrianiraeguialvear/Desktop/OnSpotML_v2/data/raw/parking_predictions_corrected.json
Data appears nested under key: OPENDATA_PSIU_APPARKB
Normalizing nested 'TRAMOS' data...
Finished processing 1 date records.
Concatenating 1 temporary DataFrames...
Successfully loaded and normalized raw data. Shape: (6381, 9)
Columns: ['FH_INICIO', 'type', 'geometry', 'ID_TRAMO', 'TRAMO', 'TIPO', 'TARIFA', 'HORARIO', 'PREDICCIONES']
JSON loading and normalization completed in 0.12 seconds.


In [16]:
# Step 2: Define the Prediction Parsing Function (Corrected, but NOT used by Optimized Step 3)

import pandas as pd
from datetime import datetime, timedelta
import time # Make sure time is imported
# import numpy as np # Uncomment if using np.nan for 'R' or unexpected values

# Function to parse the prediction string for a single row
def parse_predictions(row):
    """
    Parses the 'PREDICCIONES' string for a row and generates multiple records,
    one for each 5-minute interval prediction covering a full 24-hour period.

    Args:
        row (pd.Series): A row from the predictions DataFrame.
                         Expected columns: 'ID_TRAMO', 'FH_INICIO', 'PREDICCIONES'.

    Returns:
        list: A list of dictionaries, where each dictionary represents a
              single prediction interval record. Returns an empty list if
              input validation fails or an error occurs during parsing.
    """
    # --- Input Validation ---
    required_cols = ['ID_TRAMO', 'FH_INICIO', 'PREDICCIONES']
    if not all(col in row.index for col in required_cols):
        # if row.name < 5: # Print only for first 5 rows to avoid clutter
        #     print(f"DBG: Missing required columns for row {row.name}. Available: {row.index.tolist()}")
        return []

    # Check for NaN values in the required columns
    if pd.isna(row['ID_TRAMO']) or pd.isna(row['FH_INICIO']) or pd.isna(row['PREDICCIONES']):
        # if row.name < 5: # Print only for first 5 rows to avoid clutter
        #      print(f"DBG: NaN check failed for row {row.name}. Values: ID={row['ID_TRAMO']}, FH={row['FH_INICIO']}, PRED={row['PREDICCIONES']}")
        return []

    # Validate FH_INICIO format and convert to datetime
    try:
        start_dt = pd.to_datetime(row['FH_INICIO'], dayfirst=True) # Handle DD/MM/YYYY
    except ValueError:
        # if row.name < 5: # Print only for first 5 rows
        #      print(f"DBG: Date parsing failed for row {row.name}. Value: '{row['FH_INICIO']}'")
        return []

    # Validate prediction string type and length
    predictions_str = row['PREDICCIONES']
    if not isinstance(predictions_str, str):
        # if row.name < 5: # Print only for first 5 rows
        #      print(f"DBG: PREDICCIONES not a string for row {row.name}. Type: {type(predictions_str)}")
        return []

    # Use the constant defined in the config cell for expected length
    expected_length = SLOTS_PER_DAY # Should be 288 now
    if len(predictions_str) != expected_length:
        # if row.name < 5: # Print only for first 5 rows
        #      print(f"DBG: PREDICCIONES length mismatch for row {row.name}. Expected: {expected_length}, Got: {len(predictions_str)}")
        return []
    # --- End Input Validation ---

    parsed_data = []
    # --- Main Parsing Logic ---
    for i in range(expected_length): # Loop 288 times
        timestamp = start_dt + timedelta(minutes=i * MINUTES_PER_SLOT)
        pred_val = predictions_str[i]

        # Map prediction value ('0', '1', 'R') - Updated Mapping
        if pred_val == '0':
            mapped_pred_val = 0 # Assuming '0' means Libre/Free
        elif pred_val == '1':
            mapped_pred_val = 1 # Assuming '1' means Ocupado/Occupied
        elif pred_val == 'R':
            mapped_pred_val = -1 # Handling 'R' (Reserved/Restricted) as -1
        else:
            mapped_pred_val = -99 # Assigning a distinct code for unexpected values
            # print(f"WARN: Unexpected prediction character '{pred_val}' in row {row.name} at index {i}. Assigning -99.")

        parsed_data.append({
            'ID_TRAMO': row['ID_TRAMO'],
            'timestamp': timestamp,
            'prediction': mapped_pred_val
        })
    # --- End Main Parsing Logic ---

    return parsed_data

print("parse_predictions function defined (Corrected, but not used by Optimized Step 3).")

parse_predictions function defined (Corrected, but not used by Optimized Step 3).


In [19]:
# --- 3. Apply Parsing Function & Process Results (Optimized Version) ---
import numpy as np # Need numpy

df_processed = None # Initialize
if df_raw is not None and not df_raw.empty:
    print(f'\\nStarting optimized processing for {len(df_raw)} raw records...')
    start_parse_time = time.time()

    # --- 1. Prepare data for expansion ---
    # Ensure correct data types for columns we need
    df_raw['ID_TRAMO'] = pd.to_numeric(df_raw['ID_TRAMO'], errors='coerce')
    df_raw['FH_INICIO'] = pd.to_datetime(df_raw['FH_INICIO'], dayfirst=True, errors='coerce')
    # Keep only rows with valid ID and Start Date, and correct prediction length
    expected_length = SLOTS_PER_DAY # Should be 288
    df_valid = df_raw.dropna(subset=['ID_TRAMO', 'FH_INICIO'])
    df_valid = df_valid[df_valid['PREDICCIONES'].str.len() == expected_length].copy()
    print(f'Processing {len(df_valid)} valid rows after initial filtering.')

    if not df_valid.empty:
        # --- 2. Create Timestamps ---
        # Create a time delta range for one day (0 min, 5 min, ..., 1435 min for 288 slots)
        time_deltas = pd.to_timedelta(np.arange(expected_length) * MINUTES_PER_SLOT, unit='m')

        # --- 3. Expand DataFrame ---
        # Repeat each row 'expected_length' (288) times
        df_expanded = df_valid.loc[np.repeat(df_valid.index.values, expected_length)].copy()

        # --- 4. Calculate Correct Timestamps ---
        # Use the repeated FH_INICIO and add the corresponding time delta for each slot
        # We need to tile the time_deltas to match the expanded DataFrame length
        df_expanded['timestamp'] = df_expanded['FH_INICIO'] + np.tile(time_deltas, len(df_valid))

        # --- 5. Expand and Map Predictions ---
        # Flatten the prediction strings into a single series of characters
        all_pred_chars = np.concatenate(df_valid['PREDICCIONES'].apply(list).values)
        df_expanded['prediction_char'] = all_pred_chars

        # ----- Updated Mapping -----
        mapping = {'0': 0, '1': 1, '2': 2, '3': 3, 'R': -1}
        # -------------------------

        # Use .map, fill unknown with -99
        df_expanded['prediction'] = df_expanded['prediction_char'].map(mapping).fillna(-99)

        # --- 6. Final DataFrame Assembly ---
        # Select and reorder columns
        df_processed = df_expanded[['ID_TRAMO', 'timestamp', 'prediction']].copy()

        # --- 7. Data Type Conversion and Cleanup ---
        print('\\nConverting data types...')
        df_processed['ID_TRAMO'] = df_processed['ID_TRAMO'].astype('Int64')
        # Int8 is still suitable for range -1, 0, 1, 2, 3 (and -99)
        df_processed['prediction'] = df_processed['prediction'].astype('Int8')

        # Check if any unexpected characters resulted in -99
        unknown_chars_count = (df_processed['prediction'] == -99).sum()
        if unknown_chars_count > 0:
            print(f"WARN: Found {unknown_chars_count} instances of unexpected prediction characters (mapped to -99).")
            # You can uncomment this again if needed after this run
            # print("Unique unexpected characters:", df_expanded.loc[df_processed['prediction'] == -99, 'prediction_char'].unique())
        else:
             print("All prediction characters successfully mapped.")


        # Sort by ID and Timestamp
        print('Sorting data...')
        df_processed.sort_values(by=['ID_TRAMO', 'timestamp'], inplace=True)
        df_processed.reset_index(drop=True, inplace=True)

        print(f'\\nCreated processed DataFrame. Shape: {df_processed.shape}')
        print('\\nProcessed data info:')
        df_processed.info()
        print('\\nProcessed data head:')
        print(df_processed.head())
        print('\\nProcessed data tail:')
        print(df_processed.tail())
        print('\\nPrediction code distribution:')
        # DropNA before value_counts if using Int64/Int8 which are nullable
        print(df_processed['prediction'].value_counts(normalize=True, dropna=False).sort_index())

    else:
        print('WARNING: No valid data found after initial filtering. Check input file structure, column names, date formats, and prediction string lengths.')

    end_parse_time = time.time()
    print(f'Optimized data parsing and processing completed in {end_parse_time - start_parse_time:.2f} seconds.')

else:
    print('\\nSkipping parsing due to JSON loading failure or empty raw dataframe.')

\nStarting optimized processing for 6381 raw records...
Processing 6381 valid rows after initial filtering.
\nConverting data types...
All prediction characters successfully mapped.
Sorting data...
\nCreated processed DataFrame. Shape: (1837728, 3)
\nProcessed data info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1837728 entries, 0 to 1837727
Data columns (total 3 columns):
 #   Column      Dtype         
---  ------      -----         
 0   ID_TRAMO    Int64         
 1   timestamp   datetime64[ns]
 2   prediction  Int8          
dtypes: Int64(1), Int8(1), datetime64[ns](1)
memory usage: 33.3 MB
\nProcessed data head:
   ID_TRAMO           timestamp  prediction
0         7 2024-09-17 08:00:00           0
1         7 2024-09-17 08:05:00           0
2         7 2024-09-17 08:10:00           0
3         7 2024-09-17 08:15:00           0
4         7 2024-09-17 08:20:00           0
\nProcessed data tail:
         ID_TRAMO           timestamp  prediction
1837723     17056 2024-09-18 

In [20]:
# --- 4. Save Processed Data ---

if df_processed is not None and not df_processed.empty:
    print(f'\\nSaving processed data ({len(df_processed)} rows) to: {OUTPUT_FILE}')
    start_save_time = time.time()
    try:
        df_processed.to_parquet(OUTPUT_FILE, index=False, engine='pyarrow', compression='snappy')
        end_save_time = time.time()
        print(f'Processed data saved successfully in {end_save_time - start_save_time:.2f} seconds.')
    except Exception as e:
        print(f'ERROR saving processed data to Parquet: {e}')
else:
    print('\\nNo processed data available to save.')


\nSaving processed data (1837728 rows) to: ../data/processed/parking_predictions_processed.parquet
Processed data saved successfully in 0.12 seconds.


In [21]:
# --- 5. Load and Verify Saved Parquet File ---
import pandas as pd
import os

# Use the OUTPUT_FILE variable defined in the configuration cell
parquet_file_path = OUTPUT_FILE

print(f"Attempting to load Parquet file: {parquet_file_path}")

if os.path.exists(parquet_file_path):
    try:
        df_loaded = pd.read_parquet(parquet_file_path)
        print("\nFile loaded successfully.")
        print(f"Shape: {df_loaded.shape}")

        print("\nInfo:")
        df_loaded.info()

        print("\nData Types:")
        print(df_loaded.dtypes)

        print("\nHead:")
        print(df_loaded.head())

        print("\nTail:")
        print(df_loaded.tail())

        print("\nPrediction code distribution:")
        print(df_loaded['prediction'].value_counts(normalize=True, dropna=False).sort_index())

    except Exception as e:
        print(f"\nERROR loading or verifying Parquet file: {e}")
else:
    print(f"\nERROR: File not found at {parquet_file_path}")


Attempting to load Parquet file: ../data/processed/parking_predictions_processed.parquet

File loaded successfully.
Shape: (1837728, 3)

Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1837728 entries, 0 to 1837727
Data columns (total 3 columns):
 #   Column      Dtype         
---  ------      -----         
 0   ID_TRAMO    Int64         
 1   timestamp   datetime64[ns]
 2   prediction  Int8          
dtypes: Int64(1), Int8(1), datetime64[ns](1)
memory usage: 33.3 MB

Data Types:
ID_TRAMO               Int64
timestamp     datetime64[ns]
prediction              Int8
dtype: object

Head:
   ID_TRAMO           timestamp  prediction
0         7 2024-09-17 08:00:00           0
1         7 2024-09-17 08:05:00           0
2         7 2024-09-17 08:10:00           0
3         7 2024-09-17 08:15:00           0
4         7 2024-09-17 08:20:00           0

Tail:
         ID_TRAMO           timestamp  prediction
1837723     17056 2024-09-18 07:35:00           0
1837724     17056 2024-09-

In [22]:
# --- 6. Confirm Data Scope and Completeness ---
import pandas as pd

print("--- Analyzing Data Scope ---")

# 1. Verify Date Range
min_ts = df_loaded['timestamp'].min()
max_ts = df_loaded['timestamp'].max()
print(f"Timestamp range: {min_ts} to {max_ts}")

# Calculate duration and expected slots
time_span = max_ts - min_ts
# Add one slot duration to max_ts to get the end of the last interval for calculation
total_duration_minutes = (max_ts - min_ts + pd.Timedelta(minutes=MINUTES_PER_SLOT)).total_seconds() / 60
expected_slots_total = total_duration_minutes / MINUTES_PER_SLOT
print(f"Total time span covered: {time_span}")
# Note: Expected slots might be slightly complex if start/end times aren't perfectly aligned with day boundaries.
# Let's calculate expected slots per day more directly if possible
# Assuming 288 slots per full day were processed based on previous steps
num_days_rough = (max_ts.normalize() - min_ts.normalize()).days + 1
print(f"Rough number of days covered: {num_days_rough}")
# The exact expectation depends on whether the start/end times cross midnight
# Given the head/tail, it looks like 1 full day (Sept 17 08:00 to Sept 18 07:55)
# This means exactly 288 slots were generated per ID_TRAMO during processing.
expected_slots_per_tramo = SLOTS_PER_DAY # Should be 288 from config

# 2. Count Unique ID_TRAMOs
unique_tramos = df_loaded['ID_TRAMO'].nunique()
print(f"\nTotal unique ID_TRAMOs: {unique_tramos}")

# 3. Check Completeness per ID_TRAMO
print(f"\nChecking record count per ID_TRAMO (expected: {expected_slots_per_tramo})...")
record_counts = df_loaded.groupby('ID_TRAMO').size()

# Find ID_TRAMOs with counts different from the expected value
incomplete_tramos = record_counts[record_counts != expected_slots_per_tramo]

if incomplete_tramos.empty:
    print("All ID_TRAMOs have the expected number of records.")
else:
    print(f"Found {len(incomplete_tramos)} ID_TRAMOs with incomplete/unexpected record counts:")
    print(incomplete_tramos)

print("\n--- Analysis Complete ---")

--- Analyzing Data Scope ---
Timestamp range: 2024-09-17 08:00:00 to 2024-09-18 07:55:00
Total time span covered: 0 days 23:55:00
Rough number of days covered: 2

Total unique ID_TRAMOs: 6381

Checking record count per ID_TRAMO (expected: 288)...
All ID_TRAMOs have the expected number of records.

--- Analysis Complete ---
