# HERE Traffic API Data Extraction

This notebook demonstrates how to fetch traffic data from the HERE Traffic API v7 and prepare it for merging with map data.

## Step 1: Install Required Libraries

In [None]:
#!pip install requests pandas geopandas matplotlib folium

Collecting folium
  Downloading folium-0.20.0-py2.py3-none-any.whl.metadata (4.2 kB)
Collecting branca>=0.6.0 (from folium)
  Downloading branca-0.8.2-py3-none-any.whl.metadata (1.7 kB)
Collecting xyzservices (from folium)
  Downloading xyzservices-2025.4.0-py3-none-any.whl.metadata (4.3 kB)
Downloading folium-0.20.0-py2.py3-none-any.whl (113 kB)
Downloading branca-0.8.2-py3-none-any.whl (26 kB)
Downloading xyzservices-2025.4.0-py3-none-any.whl (90 kB)
Installing collected packages: xyzservices, branca, folium

   ---------------------------------------- 0/3 [xyzservices]
   ------------- -------------------------- 1/3 [branca]
   -------------------------- ------------- 2/3 [folium]
   -------------------------- ------------- 2/3 [folium]
   -------------------------- ------------- 2/3 [folium]
   -------------------------- ------------- 2/3 [folium]
   -------------------------- ------------- 2/3 [folium]
   -------------------------- ------------- 2/3 [folium]
   -------------------


[notice] A new release of pip is available: 25.1.1 -> 25.2
[notice] To update, run: C:\Users\maxyj\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.11_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


## Step 2: Import Libraries

In [11]:
import requests
import pandas as pd
import geopandas as gpd
import json
import matplotlib.pyplot as plt
import folium
from datetime import datetime
import os
import hashlib

# Create cache directory if it doesn't exist
if not os.path.exists('cache'):
    os.makedirs('cache')

print("Libraries imported successfully!")

Libraries imported successfully!


## Step 3: Set Up HERE API Credentials

You need to:
1. Sign up at [HERE Developer Portal](https://developer.here.com/)
2. Create a project and generate an API key
3. Replace `YOUR_API_KEY` below with your actual API key

In [None]:
# HERE API Configuration
HERE_API_KEY = HERE_API_KEY  # Replace with your actual API key

# HERE Traffic API v7 Endpoints
TRAFFIC_FLOW_URL = "https://data.traffic.hereapi.com/v7/flow"
TRAFFIC_INCIDENTS_URL = "https://data.traffic.hereapi.com/v7/incidents"

print("API configuration set!")

API configuration set!


## Step 4: Helper Functions for API Requests

In [13]:
def get_cache_filename(url, params):
    """Generate a unique cache filename based on request parameters"""
    cache_key = f"{url}_{json.dumps(params, sort_keys=True)}"
    hash_key = hashlib.sha1(cache_key.encode()).hexdigest()
    return f"cache/{hash_key}.json"

def fetch_traffic_flow(bbox, api_key, use_cache=True):
    """
    Fetch traffic flow data for a bounding box
    
    Parameters:
    - bbox: tuple (west, south, east, north) in WGS84 coordinates
    - api_key: HERE API key
    - use_cache: whether to use cached data
    
    Returns:
    - JSON response with traffic flow data
    """
    params = {
        'apiKey': api_key,
        'in': f'bbox:{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}',
        'locationReferencing': 'shape'  # Include geometry
    }
    
    cache_file = get_cache_filename(TRAFFIC_FLOW_URL, params)
    
    # Check cache first
    if use_cache and os.path.exists(cache_file):
        print(f"Loading from cache: {cache_file}")
        with open(cache_file, 'r') as f:
            return json.load(f)
    
    # Make API request
    print(f"Fetching traffic flow data from API...")
    response = requests.get(TRAFFIC_FLOW_URL, params=params)
    
    if response.status_code == 200:
        data = response.json()
        
        # Cache the response
        with open(cache_file, 'w') as f:
            json.dump(data, f)
        print(f"Data cached to: {cache_file}")
        
        return data
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

def fetch_traffic_incidents(bbox, api_key, use_cache=True):
    """
    Fetch traffic incidents for a bounding box
    
    Parameters:
    - bbox: tuple (west, south, east, north) in WGS84 coordinates
    - api_key: HERE API key
    - use_cache: whether to use cached data
    
    Returns:
    - JSON response with traffic incident data
    """
    params = {
        'apiKey': api_key,
        'in': f'bbox:{bbox[0]},{bbox[1]},{bbox[2]},{bbox[3]}',
        'locationReferencing': 'shape'
    }
    
    cache_file = get_cache_filename(TRAFFIC_INCIDENTS_URL, params)
    
    # Check cache first
    if use_cache and os.path.exists(cache_file):
        print(f"Loading from cache: {cache_file}")
        with open(cache_file, 'r') as f:
            return json.load(f)
    
    # Make API request
    print(f"Fetching traffic incidents from API...")
    response = requests.get(TRAFFIC_INCIDENTS_URL, params=params)
    
    if response.status_code == 200:
        data = response.json()
        
        # Cache the response
        with open(cache_file, 'w') as f:
            json.dump(data, f)
        print(f"Data cached to: {cache_file}")
        
        return data
    else:
        print(f"Error: {response.status_code}")
        print(response.text)
        return None

print("Helper functions defined!")

Helper functions defined!


## Step 5: Define Area of Interest

Define the bounding box for the area you want to fetch traffic data. Example coordinates for central Kuala Lumpur.

In [14]:
# Define bounding box: (west, south, east, north)
# Example: Central Kuala Lumpur
bbox_kl = (101.67, 3.13, 101.70, 3.16)

# Example: Singapore
bbox_singapore = (103.80, 1.28, 103.86, 1.32)

# Choose which area to query
selected_bbox = bbox_kl
location_name = "Kuala Lumpur"

print(f"Selected location: {location_name}")
print(f"Bounding box: West={selected_bbox[0]}, South={selected_bbox[1]}, East={selected_bbox[2]}, North={selected_bbox[3]}")

Selected location: Kuala Lumpur
Bounding box: West=101.67, South=3.13, East=101.7, North=3.16


## Step 6: Fetch Traffic Flow Data

In [18]:
# Fetch traffic flow data
traffic_flow_data = fetch_traffic_flow(selected_bbox, HERE_API_KEY, use_cache=True)

if traffic_flow_data:
    print(f"\nTraffic Flow Data Retrieved!")
    print(f"Number of results: {len(traffic_flow_data.get('results', []))}")
    
    # Display first result as example
    if traffic_flow_data.get('results'):
        print("\nExample result structure:")
        print(json.dumps(traffic_flow_data['results'][0], indent=2)[:500])
else:
    print("Failed to fetch traffic flow data. Check your API key and internet connection.")

Loading from cache: cache/c490e0e07c8c89cd20defd442bde397c878fbe6c.json

Traffic Flow Data Retrieved!
Number of results: 337

Example result structure:
{
  "location": {
    "description": "Jalan Tugu",
    "length": 408.0,
    "shape": {
      "links": [
        {
          "points": [
            {
              "lat": 3.14643,
              "lng": 101.68846
            },
            {
              "lat": 3.14628,
              "lng": 101.68863
            }
          ],
          "length": 25.0,
          "functionalClass": 4
        },
        {
          "points": [
            {
              "lat": 3.14628,
              "lng": 101.688


## Step 7: Parse Traffic Flow Data into DataFrame

In [19]:
def parse_traffic_flow_to_dataframe(traffic_data):
    """Convert traffic flow JSON to pandas DataFrame"""
    
    if not traffic_data or 'results' not in traffic_data:
        return None
    
    records = []
    for result in traffic_data['results']:
        location = result.get('location', {})
        current_flow = result.get('currentFlow', {})
        
        record = {
            'location_description': location.get('description', ''),
            'speed': current_flow.get('speed', None),
            'speed_limit': current_flow.get('speedLimit', None),
            'jam_factor': current_flow.get('jamFactor', None),
            'confidence': current_flow.get('confidence', None),
            'free_flow_speed': current_flow.get('freeFlowSpeed', None),
            'traversability': current_flow.get('traversability', ''),
        }
        
        # Extract geometry if available
        if 'shape' in location:
            shape = location['shape']
            if 'links' in shape:
                # Extract coordinates from links
                coords = []
                for link in shape['links']:
                    if 'points' in link:
                        for point in link['points']:
                            coords.append((point.get('lng'), point.get('lat')))
                record['geometry'] = coords
        
        records.append(record)
    
    df = pd.DataFrame(records)
    return df

# Parse the data
if traffic_flow_data:
    flow_df = parse_traffic_flow_to_dataframe(traffic_flow_data)
    
    if flow_df is not None:
        print(f"Traffic Flow DataFrame created with {len(flow_df)} records")
        print("\nDataFrame Info:")
        print(flow_df.info())
        print("\nFirst few records:")
        print(flow_df.head())
        
        # Display statistics
        print("\nTraffic Statistics:")
        print(f"Average Speed: {flow_df['speed'].mean():.2f} km/h")
        print(f"Average Jam Factor: {flow_df['jam_factor'].mean():.2f}")
        print(f"Average Free Flow Speed: {flow_df['free_flow_speed'].mean():.2f} km/h")

Traffic Flow DataFrame created with 337 records

DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 337 entries, 0 to 336
Data columns (total 8 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   location_description  337 non-null    object 
 1   speed                 336 non-null    float64
 2   speed_limit           0 non-null      object 
 3   jam_factor            337 non-null    float64
 4   confidence            336 non-null    float64
 5   free_flow_speed       0 non-null      object 
 6   traversability        337 non-null    object 
 7   geometry              337 non-null    object 
dtypes: float64(3), object(5)
memory usage: 21.2+ KB
None

First few records:
      location_description      speed speed_limit  jam_factor  confidence  \
0               Jalan Tugu   8.333334        None         0.0        0.70   
1           Jalan Kinabalu   8.333334        None         3.3        0.90   
2        Jal

## Step 8: Fetch Traffic Incidents

In [20]:
# Fetch traffic incidents
traffic_incidents_data = fetch_traffic_incidents(selected_bbox, HERE_API_KEY, use_cache=True)

if traffic_incidents_data:
    print(f"\nTraffic Incidents Data Retrieved!")
    print(f"Number of incidents: {len(traffic_incidents_data.get('results', []))}")
    
    # Display first incident as example
    if traffic_incidents_data.get('results'):
        print("\nExample incident structure:")
        print(json.dumps(traffic_incidents_data['results'][0], indent=2)[:500])
else:
    print("No incidents found or failed to fetch data.")

Loading from cache: cache/ba32628a8125a8b66479bd721678955754eae4b0.json

Traffic Incidents Data Retrieved!
Number of incidents: 1

Example incident structure:
{
  "location": {
    "length": 105.0,
    "shape": {
      "links": [
        {
          "points": [
            {
              "lat": 3.15014,
              "lng": 101.69454
            },
            {
              "lat": 3.15064,
              "lng": 101.69492
            }
          ],
          "length": 70.0,
          "functionalClass": 3
        },
        {
          "points": [
            {
              "lat": 3.15064,
              "lng": 101.69492
            },
            {
 


## Step 9: Parse Traffic Incidents into DataFrame

In [21]:
def parse_incidents_to_dataframe(incidents_data):
    """Convert traffic incidents JSON to pandas DataFrame"""
    
    if not incidents_data or 'results' not in incidents_data:
        return None
    
    records = []
    for incident in incidents_data['results']:
        location = incident.get('location', {})
        incident_details = incident.get('incidentDetails', {})
        
        record = {
            'incident_id': incident.get('incidentId', ''),
            'original_id': incident.get('originalId', ''),
            'type': incident_details.get('type', ''),
            'description': incident_details.get('description', {}).get('value', ''),
            'criticality': incident_details.get('criticality', ''),
            'start_time': incident_details.get('startTime', ''),
            'end_time': incident_details.get('endTime', ''),
            'entry_time': incident_details.get('entryTime', ''),
        }
        
        # Extract location coordinates
        if 'shape' in location:
            shape = location['shape']
            if 'links' in shape:
                coords = []
                for link in shape['links']:
                    if 'points' in link:
                        for point in link['points']:
                            coords.append((point.get('lng'), point.get('lat')))
                record['geometry'] = coords
        
        records.append(record)
    
    df = pd.DataFrame(records)
    return df

# Parse incidents data
if traffic_incidents_data:
    incidents_df = parse_incidents_to_dataframe(traffic_incidents_data)
    
    if incidents_df is not None and len(incidents_df) > 0:
        print(f"Traffic Incidents DataFrame created with {len(incidents_df)} records")
        print("\nDataFrame Info:")
        print(incidents_df.info())
        print("\nFirst few incidents:")
        print(incidents_df.head())
        
        # Display incident type counts
        print("\nIncident Types:")
        print(incidents_df['type'].value_counts())
    else:
        print("No incidents to display.")

Traffic Incidents DataFrame created with 1 records

DataFrame Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 9 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   incident_id  1 non-null      object
 1   original_id  1 non-null      object
 2   type         1 non-null      object
 3   description  1 non-null      object
 4   criticality  1 non-null      object
 5   start_time   1 non-null      object
 6   end_time     1 non-null      object
 7   entry_time   1 non-null      object
 8   geometry     1 non-null      object
dtypes: object(9)
memory usage: 204.0+ bytes
None

First few incidents:
  incident_id original_id         type                          description  \
0                          roadClosure  Closed at Jalan Tun Perak - Closed.   

  criticality            start_time              end_time  \
0    critical  2025-11-12T15:32:35Z  2025-11-14T03:32:35Z   

             entry_time    

## Step 10: Visualize Traffic Data on a Map

In [11]:
def visualize_traffic_on_map(flow_df, incidents_df, bbox, location_name):
    """Create an interactive map with traffic flow and incidents"""
    
    # Calculate center of bounding box
    center_lat = (bbox[1] + bbox[3]) / 2
    center_lon = (bbox[0] + bbox[2]) / 2
    
    # Create map
    m = folium.Map(location=[center_lat, center_lon], zoom_start=13)
    
    # Add traffic flow lines
    if flow_df is not None and 'geometry' in flow_df.columns:
        for idx, row in flow_df.iterrows():
            if row['geometry'] and len(row['geometry']) > 0:
                # Determine color based on jam factor
                jam_factor = row['jam_factor'] if pd.notna(row['jam_factor']) else 0
                if jam_factor < 2:
                    color = 'green'  # Free flow
                elif jam_factor < 4:
                    color = 'yellow'  # Moderate
                elif jam_factor < 8:
                    color = 'orange'  # Slow
                else:
                    color = 'red'  # Congested
                
                # Convert geometry to lat/lon format for folium
                coords = [(lat, lon) for lon, lat in row['geometry']]
                
                # Create popup with traffic info (handle None values)
                speed_text = f"{row['speed']:.1f}" if pd.notna(row['speed']) else "N/A"
                speed_limit_text = f"{row['speed_limit']}" if pd.notna(row['speed_limit']) else "N/A"
                jam_factor_text = f"{row['jam_factor']:.1f}" if pd.notna(row['jam_factor']) else "N/A"
                free_flow_text = f"{row['free_flow_speed']:.1f}" if pd.notna(row['free_flow_speed']) else "N/A"
                
                popup_text = f"""
                <b>Location:</b> {row['location_description']}<br>
                <b>Speed:</b> {speed_text} km/h<br>
                <b>Speed Limit:</b> {speed_limit_text} km/h<br>
                <b>Jam Factor:</b> {jam_factor_text}<br>
                <b>Free Flow Speed:</b> {free_flow_text} km/h
                """
                
                folium.PolyLine(
                    coords,
                    color=color,
                    weight=3,
                    opacity=0.7,
                    popup=folium.Popup(popup_text, max_width=300)
                ).add_to(m)
    
    # Add traffic incidents
    if incidents_df is not None and len(incidents_df) > 0 and 'geometry' in incidents_df.columns:
        for idx, row in incidents_df.iterrows():
            if row['geometry'] and len(row['geometry']) > 0:
                # Use first coordinate as marker location
                lat, lon = row['geometry'][0][1], row['geometry'][0][0]
                
                # Create popup with incident info
                popup_text = f"""
                <b>Type:</b> {row['type']}<br>
                <b>Description:</b> {row['description']}<br>
                <b>Criticality:</b> {row['criticality']}<br>
                <b>Start Time:</b> {row['start_time']}<br>
                """
                
                folium.Marker(
                    [lat, lon],
                    popup=folium.Popup(popup_text, max_width=300),
                    icon=folium.Icon(color='red', icon='exclamation-triangle', prefix='fa')
                ).add_to(m)
    
    # Add legend
    legend_html = '''
    <div style="position: fixed; 
                bottom: 50px; right: 50px; width: 180px; height: 150px; 
                background-color: white; border:2px solid grey; z-index:9999; 
                font-size:14px; padding: 10px">
    <p><strong>Traffic Flow</strong></p>
    <p><span style="color:green;">&#9632;</span> Free Flow (JF < 2)</p>
    <p><span style="color:yellow;">&#9632;</span> Moderate (JF 2-4)</p>
    <p><span style="color:orange;">&#9632;</span> Slow (JF 4-8)</p>
    <p><span style="color:red;">&#9632;</span> Congested (JF > 8)</p>
    </div>
    '''
    m.get_root().html.add_child(folium.Element(legend_html))
    
    return m

# Create the map
if 'flow_df' in locals():
    traffic_map = visualize_traffic_on_map(
        flow_df if 'flow_df' in locals() else None,
        incidents_df if 'incidents_df' in locals() else None,
        selected_bbox,
        location_name
    )
    
    # Save map
    map_filename = f"traffic_map_{location_name.replace(' ', '_')}.html"
    traffic_map.save(map_filename)
    print(f"Map saved to: {map_filename}")
    
    # Display map
    traffic_map
else:
    print("No traffic data available to visualize.")

Map saved to: traffic_map_Kuala_Lumpur.html


## Step 11: Save Data for Merging with Map Data

In [None]:
# Save traffic flow data
if 'flow_df' in locals() and flow_df is not None:
    flow_filename = f"traffic_flow_{location_name.replace(' ', '_')}.csv"
    flow_df.to_csv(flow_filename, index=False)
    print(f"Traffic flow data saved to: {flow_filename}")

# Save traffic incidents data
if 'incidents_df' in locals() and incidents_df is not None and len(incidents_df) > 0:
    incidents_filename = f"traffic_incidents_{location_name.replace(' ', '_')}.csv"
    incidents_df.to_csv(incidents_filename, index=False)
    print(f"Traffic incidents data saved to: {incidents_filename}")

# Also save as GeoJSON for easier merging with map data
if 'flow_df' in locals() and flow_df is not None and 'geometry' in flow_df.columns:
    # Convert to GeoDataFrame
    from shapely.geometry import LineString
    
    geometries = []
    valid_indices = []
    
    for idx, row in flow_df.iterrows():
        if row['geometry'] and len(row['geometry']) > 1:
            try:
                line = LineString(row['geometry'])
                geometries.append(line)
                valid_indices.append(idx)
            except:
                pass
    
    if geometries:
        gdf = gpd.GeoDataFrame(
            flow_df.loc[valid_indices].drop(columns=['geometry']),
            geometry=geometries,
            crs='EPSG:4326'
        )
        
        geojson_filename = f"traffic_flow_{location_name.replace(' ', '_')}.geojson"
        gdf.to_file(geojson_filename, driver='GeoJSON')
        print(f"Traffic flow GeoJSON saved to: {geojson_filename}")

print("\nData saved and ready to merge with map data!")

## Step 12: Summary Statistics and Visualization

In [None]:
# Create visualizations
if 'flow_df' in locals() and flow_df is not None:
    fig, axes = plt.subplots(2, 2, figsize=(15, 10))
    
    # Speed distribution
    axes[0, 0].hist(flow_df['speed'].dropna(), bins=30, color='blue', alpha=0.7, edgecolor='black')
    axes[0, 0].set_xlabel('Speed (km/h)')
    axes[0, 0].set_ylabel('Frequency')
    axes[0, 0].set_title('Distribution of Current Speeds')
    axes[0, 0].axvline(flow_df['speed'].mean(), color='red', linestyle='--', label=f'Mean: {flow_df["speed"].mean():.1f}')
    axes[0, 0].legend()
    
    # Jam factor distribution
    axes[0, 1].hist(flow_df['jam_factor'].dropna(), bins=20, color='orange', alpha=0.7, edgecolor='black')
    axes[0, 1].set_xlabel('Jam Factor')
    axes[0, 1].set_ylabel('Frequency')
    axes[0, 1].set_title('Distribution of Jam Factors')
    axes[0, 1].axvline(flow_df['jam_factor'].mean(), color='red', linestyle='--', label=f'Mean: {flow_df["jam_factor"].mean():.2f}')
    axes[0, 1].legend()
    
    # Speed vs Speed Limit
    axes[1, 0].scatter(flow_df['speed_limit'], flow_df['speed'], alpha=0.5)
    axes[1, 0].plot([0, flow_df['speed_limit'].max()], [0, flow_df['speed_limit'].max()], 'r--', label='Equal line')
    axes[1, 0].set_xlabel('Speed Limit (km/h)')
    axes[1, 0].set_ylabel('Current Speed (km/h)')
    axes[1, 0].set_title('Current Speed vs Speed Limit')
    axes[1, 0].legend()
    
    # Traffic congestion categories
    flow_df['congestion'] = pd.cut(flow_df['jam_factor'], 
                                    bins=[0, 2, 4, 8, 10], 
                                    labels=['Free Flow', 'Moderate', 'Slow', 'Congested'])
    congestion_counts = flow_df['congestion'].value_counts()
    axes[1, 1].bar(congestion_counts.index.astype(str), congestion_counts.values, 
                   color=['green', 'yellow', 'orange', 'red'])
    axes[1, 1].set_xlabel('Traffic Condition')
    axes[1, 1].set_ylabel('Number of Road Segments')
    axes[1, 1].set_title('Traffic Congestion Categories')
    axes[1, 1].tick_params(axis='x', rotation=45)
    
    plt.tight_layout()
    plt.savefig(f'traffic_analysis_{location_name.replace(" ", "_")}.png', dpi=300, bbox_inches='tight')
    plt.show()
    
    print(f"\nVisualization saved to: traffic_analysis_{location_name.replace(' ', '_')}.png")

## Next Steps: Merging with Map Data

To merge this traffic data with your OSM map data:

1. **Spatial Join**: Use GeoPandas to perform a spatial join between the traffic flow GeoJSON and your road network from OSM
2. **Match by Coordinates**: The traffic flow geometry (LineStrings) can be matched with OSM road segments
3. **Attribute Transfer**: Transfer traffic attributes (speed, jam_factor) to the corresponding road segments

Example code for merging:
```python
import osmnx as ox
import geopandas as gpd

# Load your OSM road network
G = ox.graph_from_place("Kuala Lumpur, Malaysia", network_type='drive')
edges = ox.graph_to_gdfs(G, nodes=False)

# Load traffic data
traffic_gdf = gpd.read_file("traffic_flow_Kuala_Lumpur.geojson")

# Perform spatial join (nearest neighbor)
merged = gpd.sjoin_nearest(edges, traffic_gdf, how='left', max_distance=0.001)

# Now you have road segments with traffic data!
```

## Step 13: Hourly Batch Data Collection for Historical Analysis

This section sets up automated hourly data collection to build a historical traffic database.

In [7]:
import time
import sqlite3
from datetime import datetime, timedelta
import schedule

def collect_traffic_data_with_timestamp(bbox, api_key, location_name):
    """
    Collect traffic data and add timestamp for historical tracking
    
    Returns:
    - Tuple of (flow_df, incidents_df) with timestamp column added
    """
    timestamp = datetime.now()
    
    # Fetch traffic flow data (use_cache=False to get fresh data)
    print(f"\n{'='*60}")
    print(f"Collecting data at: {timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*60}")
    
    flow_data = fetch_traffic_flow(bbox, api_key, use_cache=False)
    incidents_data = fetch_traffic_incidents(bbox, api_key, use_cache=False)
    
    # Parse to dataframes
    flow_df = None
    incidents_df = None
    
    if flow_data:
        flow_df = parse_traffic_flow_to_dataframe(flow_data)
        if flow_df is not None:
            flow_df['timestamp'] = timestamp
            flow_df['hour'] = timestamp.hour
            flow_df['day_of_week'] = timestamp.strftime('%A')
            flow_df['date'] = timestamp.date()
            print(f"✓ Collected {len(flow_df)} traffic flow records")
    
    if incidents_data:
        incidents_df = parse_incidents_to_dataframe(incidents_data)
        if incidents_df is not None and len(incidents_df) > 0:
            incidents_df['timestamp'] = timestamp
            incidents_df['hour'] = timestamp.hour
            incidents_df['day_of_week'] = timestamp.strftime('%A')
            incidents_df['date'] = timestamp.date()
            print(f"✓ Collected {len(incidents_df)} traffic incident records")
        else:
            print("✓ No incidents at this time")
    
    return flow_df, incidents_df

print("Batch collection function defined!")

Batch collection function defined!


### Option 1: Save to SQLite Database (Recommended for Historical Data)

In [8]:
def save_to_sqlite(flow_df, incidents_df, db_name='traffic_historical.db'):
    """
    Save traffic data to SQLite database for historical analysis
    
    Parameters:
    - flow_df: DataFrame with traffic flow data
    - incidents_df: DataFrame with traffic incidents data
    - db_name: Name of SQLite database file
    """
    conn = sqlite3.connect(db_name)
    
    try:
        # Save traffic flow data
        if flow_df is not None and len(flow_df) > 0:
            # Convert geometry to string for storage
            flow_df_copy = flow_df.copy()
            if 'geometry' in flow_df_copy.columns:
                flow_df_copy['geometry'] = flow_df_copy['geometry'].astype(str)
            
            # Check if table exists and has correct schema
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='traffic_flow'")
            table_exists = cursor.fetchone() is not None
            
            if table_exists:
                # Check if timestamp column exists
                cursor.execute("PRAGMA table_info(traffic_flow)")
                columns = [row[1] for row in cursor.fetchall()]
                if 'timestamp' not in columns:
                    print("⚠ Warning: Existing table schema is outdated. Dropping and recreating table...")
                    cursor.execute("DROP TABLE traffic_flow")
                    conn.commit()
                    table_exists = False
            
            # Use 'replace' for first insert to create table, then 'append'
            if_exists = 'replace' if not table_exists else 'append'
            flow_df_copy.to_sql('traffic_flow', conn, if_exists=if_exists, index=False)
            print(f"✓ Saved {len(flow_df)} flow records to database")
        
        # Save traffic incidents data
        if incidents_df is not None and len(incidents_df) > 0:
            incidents_df_copy = incidents_df.copy()
            if 'geometry' in incidents_df_copy.columns:
                incidents_df_copy['geometry'] = incidents_df_copy['geometry'].astype(str)
            
            # Check if table exists and has correct schema
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='traffic_incidents'")
            table_exists = cursor.fetchone() is not None
            
            if table_exists:
                # Check if timestamp column exists
                cursor.execute("PRAGMA table_info(traffic_incidents)")
                columns = [row[1] for row in cursor.fetchall()]
                if 'timestamp' not in columns:
                    print("⚠ Warning: Existing table schema is outdated. Dropping and recreating table...")
                    cursor.execute("DROP TABLE traffic_incidents")
                    conn.commit()
                    table_exists = False
            
            if_exists = 'replace' if not table_exists else 'append'
            incidents_df_copy.to_sql('traffic_incidents', conn, if_exists=if_exists, index=False)
            print(f"✓ Saved {len(incidents_df)} incident records to database")
        
        print(f"✓ Data saved to {db_name}")
        
    finally:
        conn.close()

# Test the save function with current data
if 'flow_df' in locals():
    # Add timestamp columns to existing flow_df for testing
    if 'timestamp' not in flow_df.columns:
        timestamp = datetime.now()
        flow_df['timestamp'] = timestamp
        flow_df['hour'] = timestamp.hour
        flow_df['day_of_week'] = timestamp.strftime('%A')
        flow_df['date'] = timestamp.date()
    
    # Add timestamp columns to incidents_df if it exists
    if 'incidents_df' in locals() and incidents_df is not None and 'timestamp' not in incidents_df.columns:
        timestamp = datetime.now()
        incidents_df['timestamp'] = timestamp
        incidents_df['hour'] = timestamp.hour
        incidents_df['day_of_week'] = timestamp.strftime('%A')
        incidents_df['date'] = timestamp.date()
    
    save_to_sqlite(flow_df, incidents_df if 'incidents_df' in locals() else None)
    print("\nDatabase created successfully!")

#### Troubleshooting: Reset Database if Schema Changes

In [3]:
# If you encounter schema errors, you can manually delete the old database file
# Uncomment and run this cell to start fresh:

import os
if os.path.exists('traffic_historical.db'):
    os.remove('traffic_historical.db')
    print("✓ Old database deleted. You can now save data with the new schema.")
else:
    print("No database file found.")

print("Database reset function ready (uncomment to use)")

✓ Old database deleted. You can now save data with the new schema.
Database reset function ready (uncomment to use)


### Option 2: Save to Parquet Files (Efficient for Large Datasets)

In [None]:
# Install pyarrow for parquet support
# !pip install pyarrow

def save_to_parquet(flow_df, incidents_df, data_dir='historical_data'):
    """
    Save traffic data to Parquet files partitioned by date and hour
    
    Parameters:
    - flow_df: DataFrame with traffic flow data
    - incidents_df: DataFrame with traffic incidents data
    - data_dir: Directory to store parquet files
    """
    if not os.path.exists(data_dir):
        os.makedirs(data_dir)
    
    timestamp = datetime.now()
    date_str = timestamp.strftime('%Y-%m-%d')
    hour_str = timestamp.strftime('%H')
    
    # Save traffic flow data
    if flow_df is not None and len(flow_df) > 0:
        flow_dir = f"{data_dir}/flow"
        if not os.path.exists(flow_dir):
            os.makedirs(flow_dir)
        
        flow_df_copy = flow_df.copy()
        if 'geometry' in flow_df_copy.columns:
            flow_df_copy['geometry'] = flow_df_copy['geometry'].astype(str)
        
        filename = f"{flow_dir}/flow_{date_str}_{hour_str}.parquet"
        flow_df_copy.to_parquet(filename, index=False)
        print(f"✓ Saved flow data to {filename}")
    
    # Save traffic incidents data
    if incidents_df is not None and len(incidents_df) > 0:
        incidents_dir = f"{data_dir}/incidents"
        if not os.path.exists(incidents_dir):
            os.makedirs(incidents_dir)
        
        incidents_df_copy = incidents_df.copy()
        if 'geometry' in incidents_df_copy.columns:
            incidents_df_copy['geometry'] = incidents_df_copy['geometry'].astype(str)
        
        filename = f"{incidents_dir}/incidents_{date_str}_{hour_str}.parquet"
        incidents_df_copy.to_parquet(filename, index=False)
        print(f"✓ Saved incidents data to {filename}")

print("Parquet save function defined!")

### Automated Hourly Collection Loop

In [9]:
def hourly_data_collection(bbox, api_key, location_name, storage_method='sqlite', duration_hours=None):
    """
    Collect traffic data every hour
    
    Parameters:
    - bbox: Bounding box coordinates
    - api_key: HERE API key
    - location_name: Name of location
    - storage_method: 'sqlite' or 'parquet'
    - duration_hours: How many hours to collect (None for indefinite)
    """
    
    collection_count = 0
    start_time = datetime.now()
    
    print(f"\n{'='*60}")
    print(f"Starting hourly data collection at {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Location: {location_name}")
    print(f"Storage method: {storage_method}")
    if duration_hours:
        print(f"Duration: {duration_hours} hours")
    else:
        print("Duration: Indefinite (press interrupt to stop)")
    print(f"{'='*60}\n")
    
    try:
        while True:
            # Collect data
            flow_df, incidents_df = collect_traffic_data_with_timestamp(bbox, api_key, location_name)
            
            # Save data
            if storage_method == 'sqlite':
                save_to_sqlite(flow_df, incidents_df)
            elif storage_method == 'parquet':
                save_to_parquet(flow_df, incidents_df)
            else:
                print(f"Warning: Unknown storage method '{storage_method}'")
            
            collection_count += 1
            
            # Check if duration limit reached
            if duration_hours and collection_count >= duration_hours:
                print(f"\n✓ Completed {collection_count} collections over {duration_hours} hours")
                break
            
            # Wait until next hour
            now = datetime.now()
            next_hour = (now + timedelta(hours=1)).replace(minute=0, second=0, microsecond=0)
            wait_seconds = (next_hour - now).total_seconds()
            
            print(f"\nNext collection at: {next_hour.strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"Waiting {wait_seconds/60:.1f} minutes...")
            
            time.sleep(wait_seconds)
            
    except KeyboardInterrupt:
        print(f"\n\n{'='*60}")
        print(f"Collection stopped by user")
        print(f"Total collections: {collection_count}")
        print(f"{'='*60}")

print("Hourly collection function ready!")
print("\nTo start collection, run:")
print("hourly_data_collection(selected_bbox, HERE_API_KEY, location_name, storage_method='sqlite')")

Hourly collection function ready!

To start collection, run:
hourly_data_collection(selected_bbox, HERE_API_KEY, location_name, storage_method='sqlite')


### Example: Collect Data for 24 Hours

In [None]:
# Example: Collect data every hour for 24 hours
# Uncomment to run:

hourly_data_collection(
     bbox=selected_bbox,
     api_key=HERE_API_KEY,
     location_name=location_name,
     storage_method='sqlite',  # or 'parquet'
     duration_hours=24
)

print("Ready to start hourly collection!")
print("\nUncomment the code above and run to start collecting data every hour.")


Starting hourly data collection at 2025-11-13 15:56:33
Location: Kuala Lumpur
Storage method: sqlite
Duration: 24 hours


Collecting data at: 2025-11-13 15:56:33
Fetching traffic flow data from API...
Data cached to: cache/c490e0e07c8c89cd20defd442bde397c878fbe6c.json
Fetching traffic incidents from API...
Data cached to: cache/ba32628a8125a8b66479bd721678955754eae4b0.json
✓ Collected 337 traffic flow records
✓ Collected 1 traffic incident records
✓ Saved 337 flow records to database
✓ Saved 1 incident records to database
✓ Data saved to traffic_historical.db

Next collection at: 2025-11-13 16:00:00
Waiting 3.4 minutes...


### Querying Historical Data from SQLite

In [1]:
def query_historical_data(db_name='traffic_historical.db', table='traffic_flow', 
                         start_date=None, end_date=None, hours=None):
    """
    Query historical traffic data from SQLite database
    
    Parameters:
    - db_name: Name of the database file
    - table: 'traffic_flow' or 'traffic_incidents'
    - start_date: Start date (YYYY-MM-DD) or None for all
    - end_date: End date (YYYY-MM-DD) or None for all
    - hours: List of hours to filter (e.g., [7, 8, 9] for morning rush)
    
    Returns:
    - DataFrame with historical data
    """
    conn = sqlite3.connect(db_name)
    
    query = f"SELECT * FROM {table}"
    conditions = []
    
    if start_date:
        conditions.append(f"date >= '{start_date}'")
    if end_date:
        conditions.append(f"date <= '{end_date}'")
    if hours:
        hour_list = ','.join(map(str, hours))
        conditions.append(f"hour IN ({hour_list})")
    
    if conditions:
        query += " WHERE " + " AND ".join(conditions)
    
    df = pd.read_sql_query(query, conn)
    conn.close()
    
    # Convert timestamp back to datetime
    if 'timestamp' in df.columns:
        df['timestamp'] = pd.to_datetime(df['timestamp'])
    
    return df

# Example usage (after collecting some data):
# Get all morning rush hour data (7-9 AM)
# morning_data = query_historical_data(hours=[7, 8, 9])
# print(f"Retrieved {len(morning_data)} records from morning rush hours")

# Get data for specific date range
# week_data = query_historical_data(start_date='2024-01-01', end_date='2024-01-07')

print("Query function ready!")
print("\nExample queries:")
print("1. morning_data = query_historical_data(hours=[7, 8, 9])")
print("2. rush_hour = query_historical_data(hours=[17, 18, 19])")
print("3. week_data = query_historical_data(start_date='2024-01-01', end_date='2024-01-07')")

Query function ready!

Example queries:
1. morning_data = query_historical_data(hours=[7, 8, 9])
2. rush_hour = query_historical_data(hours=[17, 18, 19])
3. week_data = query_historical_data(start_date='2024-01-01', end_date='2024-01-07')


## Step 14: Data Storage Recommendations

Below are recommendations for storing your historical traffic data for analysis.

### Storage Options Comparison

| Storage Method | Pros | Cons | Best For |
|---------------|------|------|----------|
| **SQLite Database** | ✓ Easy querying with SQL<br>✓ Good for time-based queries<br>✓ Single file<br>✓ Built-in Python support | ✗ Slower for very large datasets<br>✗ Limited concurrent writes | Small to medium datasets (<10GB)<br>Quick prototyping<br>Ad-hoc queries |
| **Parquet Files** | ✓ Highly compressed<br>✓ Very fast reads<br>✓ Column-oriented (efficient for analytics)<br>✓ Works with big data tools | ✗ Immutable (can't update)<br>✗ Needs explicit partitioning | Large datasets (>10GB)<br>Long-term storage<br>Analytics workloads |
| **PostgreSQL + PostGIS** | ✓ Advanced spatial queries<br>✓ Scalable<br>✓ Concurrent access<br>✓ ACID compliance | ✗ Requires separate server<br>✗ More setup complexity | Production systems<br>Multi-user access<br>Complex spatial analysis |
| **InfluxDB / TimescaleDB** | ✓ Optimized for time-series<br>✓ Fast aggregations<br>✓ Built-in downsampling | ✗ Requires installation<br>✗ Learning curve | Time-series analysis<br>Real-time monitoring<br>High-frequency data |

### Recommended Approach

For your traffic analysis project, I recommend a **hybrid approach**:

1. **SQLite for initial collection** (1-7 days)
   - Easy to set up and query
   - Good for exploring patterns
   - Quick prototyping of analysis

2. **Parquet files for long-term storage** (weekly/monthly archives)
   - Partition by date: `historical_data/flow/2024-01-15/`
   - Compress older data
   - Easy to process with Pandas/Dask

3. **Upgrade to PostgreSQL+PostGIS** if:
   - Dataset grows beyond 50GB
   - Need real-time dashboards
   - Multiple people accessing data
   - Complex spatial joins with OSM data

### Storage Structure Example

```
traffic_data/
├── traffic_historical.db          # SQLite for recent data
├── historical_data/                # Parquet archive
│   ├── flow/
│   │   ├── 2024-01-15/
│   │   │   ├── flow_00.parquet
│   │   │   ├── flow_01.parquet
│   │   │   └── ...
│   │   └── 2024-01-16/
│   └── incidents/
│       ├── 2024-01-15/
│       └── 2024-01-16/
└── analysis/                       # Processed/aggregated data
    ├── hourly_averages.parquet
    ├── daily_patterns.parquet
    └── peak_hours.csv
```

### Data Volume Estimation

Calculate expected data size for planning storage:

In [None]:
def estimate_data_size(num_flow_records, num_incident_records, duration_days):
    """
    Estimate storage requirements for traffic data collection
    
    Parameters:
    - num_flow_records: Average number of flow records per hour
    - num_incident_records: Average number of incident records per hour
    - duration_days: Number of days to collect data
    """
    
    # Approximate sizes per record (in KB)
    flow_record_size_kb = 0.5  # ~500 bytes per flow record
    incident_record_size_kb = 0.8  # ~800 bytes per incident record
    
    hours_per_day = 24
    total_hours = duration_days * hours_per_day
    
    # Calculate total records
    total_flow_records = num_flow_records * total_hours
    total_incident_records = num_incident_records * total_hours
    
    # Calculate sizes
    flow_size_mb = (total_flow_records * flow_record_size_kb) / 1024
    incident_size_mb = (total_incident_records * incident_record_size_kb) / 1024
    total_size_mb = flow_size_mb + incident_size_mb
    
    # With compression (Parquet typically achieves 3-5x compression)
    compressed_size_mb = total_size_mb / 4
    
    print(f"\n{'='*60}")
    print(f"Storage Estimation for {duration_days} days of data collection")
    print(f"{'='*60}")
    print(f"\nCollection Parameters:")
    print(f"  - Flow records per hour: {num_flow_records:,}")
    print(f"  - Incident records per hour: {num_incident_records:,}")
    print(f"  - Total hours: {total_hours:,}")
    print(f"\nStorage Requirements:")
    print(f"  - Traffic Flow: {flow_size_mb:.1f} MB ({total_flow_records:,} records)")
    print(f"  - Traffic Incidents: {incident_size_mb:.1f} MB ({total_incident_records:,} records)")
    print(f"  - Total (SQLite): ~{total_size_mb:.1f} MB")
    print(f"  - Total (Parquet compressed): ~{compressed_size_mb:.1f} MB")
    print(f"\nRecommendations:")
    if total_size_mb < 1000:
        print("  ✓ SQLite is perfect for this data volume")
    elif total_size_mb < 10000:
        print("  ✓ SQLite or Parquet both work well")
        print("  ✓ Consider Parquet for better compression")
    else:
        print("  ✓ Use Parquet for optimal storage")
        print("  ✓ Consider PostgreSQL for frequent queries")
    print(f"{'='*60}\n")

# Example: Estimate for your current data
if 'flow_df' in locals():
    current_flow_count = len(flow_df) if flow_df is not None else 0
    current_incident_count = len(incidents_df) if 'incidents_df' in locals() and incidents_df is not None else 0
    
    print(f"Current collection has:")
    print(f"  - {current_flow_count} flow records")
    print(f"  - {current_incident_count} incident records")
    
    # Estimate for different durations
    for days in [7, 30, 90, 365]:
        estimate_data_size(current_flow_count, current_incident_count, days)

### Advanced: Using a Task Scheduler (Alternative to Notebook Loop)

For production-level data collection, consider using a task scheduler instead of running the notebook continuously.

#### Option A: Windows Task Scheduler (Windows)

1. Create a Python script (`collect_traffic.py`) with the collection logic
2. Open Task Scheduler → Create Basic Task
3. Set trigger: Daily, repeat every 1 hour
4. Set action: Start a program → Python executable → Script path

#### Option B: Cron Job (Linux/Mac)

```bash
# Edit crontab
crontab -e

# Add this line to run every hour
0 * * * * /path/to/python /path/to/collect_traffic.py >> /path/to/logs/traffic_collection.log 2>&1
```

#### Option C: Python Script with Schedule Library

Create a standalone Python script that can run in the background:

In [None]:
# Sample standalone script content for collect_traffic.py
# Save this as a .py file and run separately

standalone_script = '''
import requests
import pandas as pd
import json
import sqlite3
from datetime import datetime
import os

# Configuration
HERE_API_KEY = "YOUR_API_KEY_HERE"
BBOX = (101.67, 3.13, 101.70, 3.16)  # Kuala Lumpur
LOCATION_NAME = "Kuala_Lumpur"
DB_NAME = "traffic_historical.db"

# ... (copy the fetch_traffic_flow, fetch_traffic_incidents, 
#      parse functions, and save_to_sqlite from above)

def main():
    print(f"Starting collection at {datetime.now()}")
    
    # Fetch data
    flow_data = fetch_traffic_flow(BBOX, HERE_API_KEY, use_cache=False)
    incidents_data = fetch_traffic_incidents(BBOX, HERE_API_KEY, use_cache=False)
    
    # Parse data
    flow_df = parse_traffic_flow_to_dataframe(flow_data)
    incidents_df = parse_incidents_to_dataframe(incidents_data)
    
    # Add timestamps
    timestamp = datetime.now()
    if flow_df is not None:
        flow_df["timestamp"] = timestamp
        flow_df["hour"] = timestamp.hour
        flow_df["day_of_week"] = timestamp.strftime("%A")
        flow_df["date"] = timestamp.date()
    
    if incidents_df is not None:
        incidents_df["timestamp"] = timestamp
        incidents_df["hour"] = timestamp.hour
        incidents_df["day_of_week"] = timestamp.strftime("%A")
        incidents_df["date"] = timestamp.date()
    
    # Save to database
    save_to_sqlite(flow_df, incidents_df, DB_NAME)
    print(f"Collection completed at {datetime.now()}")

if __name__ == "__main__":
    main()
'''

print("Standalone script template ready!")
print("\nTo use:")
print("1. Copy the code above to a file named 'collect_traffic.py'")
print("2. Include all necessary functions from this notebook")
print("3. Set up a task scheduler to run it hourly")
print("4. Or run: python collect_traffic.py (manually for testing)")

## Summary: Quick Start Guide for Historical Data Collection

### Step-by-Step Instructions:

1. **Run cells 1-9** to set up your environment and test a single data collection

2. **Choose your storage method:**
   - **SQLite** (recommended for starting): Good for 1-30 days of data
   - **Parquet**: Better for long-term archives (>30 days)

3. **Start hourly collection:**
   ```python
   # In a new cell or uncomment in Step 13:
   hourly_data_collection(
       bbox=selected_bbox,
       api_key=HERE_API_KEY,
       location_name=location_name,
       storage_method='sqlite',  # or 'parquet'
       duration_hours=24  # or None for continuous
   )
   ```

4. **Query your historical data:**
   ```python
   # Get morning rush hour data
   morning_data = query_historical_data(hours=[7, 8, 9])
   
   # Get evening rush hour data
   evening_data = query_historical_data(hours=[17, 18, 19])
   
   # Analyze patterns
   avg_speed_by_hour = morning_data.groupby('hour')['speed'].mean()
   ```

5. **For long-term collection**, use a task scheduler instead of keeping the notebook running

### Analysis Ideas:

- **Peak hours identification**: Find when traffic is worst
- **Day-of-week patterns**: Compare weekdays vs weekends
- **Speed predictions**: Build ML models to predict congestion
- **Incident correlation**: See how incidents affect traffic flow
- **Route optimization**: Find fastest routes at different times

### Next Steps:

- Collect data for at least 1 week to see daily patterns
- Collect for 1+ month to see weekly patterns
- Use the OSM map merging code to visualize on actual roads