In [1]:
# Created by Frederic on 15.03.2025.
# 
# This script is used to create a HashMap out of the binary data of the traffic simulation.
#
# Steps:
# - Read the binary data from a duckdb file
# - Split the data into the different segments of the roads
# - Extract the density by averaging over the last dt time steps (dt*10 seconds)
# - Use this information to create random vehicles
#        - In this step handle the min distance issue to avoid disappearing vehicles
# - Create a HashMap with the information of the vehicles (parameters and position)

In [2]:
import duckdb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
import Hashmap_Creation as hc

In [3]:
def selectx(table_name, lane_count, time_from, time_to, detector_from, detector_to, conn) -> np.ndarray:
    """Load the per-lane sensor values of a time/sensor window as a dense array.

    Queries the columns ``lane0 .. lane{lane_count-1}`` of ``table_name`` for
    rows with ``time_from <= time < time_to`` and
    ``detector_from <= sensor < detector_to``, ordered by time and then sensor.

    Args:
        table_name: Name of the table to read from.
        lane_count: Number of ``lane<i>`` columns to read.
        time_from: First time step (inclusive).
        time_to: Last time step (exclusive).
        detector_from: First sensor index (inclusive).
        detector_to: Last sensor index (exclusive).
        conn: Connection object exposing ``sql(query).fetchnumpy()``.

    Returns:
        ``uint8`` array of shape ``(time_steps, max(3, lane_count), detectors)``.
        Lane slots beyond ``lane_count`` are zero-filled.

    Raises:
        ValueError: If a lane column does not contain exactly
            ``time_steps * detectors`` values.
    """
    time_steps = time_to - time_from
    detectors = detector_to - detector_from

    what = ",".join(f"lane{l1}" for l1 in range(lane_count))
    sql = (f"SELECT {what} FROM {table_name} "
           f"WHERE time >= {time_from} AND time < {time_to} "
           f"AND sensor >= {detector_from} AND sensor < {detector_to} "
           f"ORDER BY time ASC, sensor ASC")

    sql_res = conn.sql(sql).fetchnumpy()

    # The lane axis used to be fixed at 3 slots, which made lane_count > 3 fail
    # with an IndexError. Keep at least 3 slots so existing callers see the
    # same shape, and grow the axis when more lanes are requested.
    lane_slots = max(3, lane_count)
    res = np.zeros([lane_slots, time_steps, detectors], dtype=np.uint8)

    expected_values = time_steps * detectors
    for l1 in range(lane_count):
        lane_data = np.array(sql_res[f"lane{l1}"], dtype=np.uint8)
        # Without this check a result holding a single time step would be
        # silently broadcast over every time step of the window.
        if lane_data.size != expected_values:
            raise ValueError(
                f"lane{l1}: expected {expected_values} values "
                f"({time_steps} time steps x {detectors} sensors), got {lane_data.size}"
            )
        res[l1] = np.reshape(lane_data, (time_steps, detectors))

    # Reorder (lane, time, sensor) -> (time, lane, sensor).
    return np.transpose(res, (1, 0, 2))

def selecty(table_name, lane_id, time_from, time_to, time_future, detector_from, detector_to, quantile, conn) -> np.ndarray:
    """Compute a per-sensor quantile over a sliding window of future time steps.

    Reads column ``lane{lane_id}`` of ``table_name`` for rows with
    ``time_from <= time < time_to + time_future`` and
    ``detector_from <= sensor < detector_to``. For every start offset ``i`` in
    ``range(time_to - time_from)`` it takes the next ``time_future`` blocks of
    ``detectors`` values and reduces them with ``np.quantile`` along the time
    axis.

    The windows are cut out of the flat result by position, so the query is
    ordered by time and then sensor (the same ordering ``selectx`` uses).
    Without an ORDER BY the row order of the result is not guaranteed.
    Assumes one row per (time, sensor) pair in the selected range - TODO confirm.

    Args:
        table_name: Name of the table to read from.
        lane_id: Index of the ``lane<i>`` column to read.
        time_from: First time step (inclusive).
        time_to: End of the range of window start times (exclusive).
        time_future: Number of consecutive time steps in each window.
        detector_from: First sensor index (inclusive).
        detector_to: Last sensor index (exclusive).
        quantile: Quantile(s) forwarded to ``np.quantile``.
        conn: Connection object exposing ``sql(query).fetchnumpy()``.

    Returns:
        ``uint8`` array of shape ``(time_intervals, detectors)`` with length-1
        axes removed by ``squeeze``. Quantile values are cast to ``uint8`` on
        assignment.
    """
    time_intervals = time_to - time_from
    detectors = detector_to - detector_from
    sql = (
        f"SELECT lane{lane_id} FROM {table_name} WHERE time < {time_to + time_future} AND time >= {time_from} "
        f"AND sensor < {detector_to} AND sensor >= {detector_from} "
        f"ORDER BY time ASC, sensor ASC")
    sql_res = conn.sql(sql).fetchnumpy()
    res = np.zeros([time_intervals, detectors], dtype=np.uint8)
    lane_data = sql_res[f"lane{lane_id}"]

    for i in range(time_intervals):
        # Rows [i, i + time_future) of the (time, sensor) grid, as a 2-D window.
        i_data = lane_data[i*detectors:(i+time_future)*detectors].reshape(time_future, detectors, order='C').astype(dtype=np.uint8)
        res[i] = np.quantile(i_data, quantile, axis=0)
    return res.squeeze()

In [5]:
# Collect every vehicle type declared in the .rou.xml route file into a
# DataFrame and store it as a CSV.
import xml.etree.ElementTree as ET

tree = ET.parse('../../../DemoCase/Routes2000_2_Krauss_represent_traffic_BBV_30.rou.xml')
root = tree.getroot()

# Only the <vType> children of the root describe vehicle-type parameters.
vehicle_types = root.findall('vType')

# Map each vType id to its XML attributes (lcStrategic, tau, id, accel, decel,
# sigma, length, width, maxSpeed, vClass, carFollowModel, color, guiShape).
vehicle_types_dict = {vtype.attrib['id']: vtype.attrib for vtype in vehicle_types}

# One row per vehicle type, indexed by its id.
vehicle_types_df = pd.DataFrame(vehicle_types_dict).T.set_index('id')

vehicle_types_df.to_csv('vehicle_types.csv')

In [7]:
# Connect to the DuckDB database that holds the simulation data.
# NOTE(review): absolute, machine-specific path - adjust when running elsewhere.
path = "D:/extern3/Traffic_KITracking/scenario.duckdb"
con = duckdb.connect(database=path)

In [8]:
# --- Configuration -----------------------------------------------------------
lanes = 2

time_steps_before_simulation = 100
sensors_per_segment = 20
length_per_sensor = 5

tstart = 0
tend = 16000
twin = 1
tstep = 1

xstart = 0
xend = 4000

# --- Load the sensor window ----------------------------------------------------
# NOTE(review): the upper sensor bound is xstart + xend, i.e. xend is used as a
# sensor count here rather than an end index - confirm this is intended.
x = selectx('rawdata', lanes, tstart, tstart+time_steps_before_simulation, xstart, xstart + xend, con)
# Keep only the lanes that were requested (this used to be a hard-coded 2,
# which is the current value of `lanes`).
x = x[:, :lanes, :]
print(x.shape)

# --- Split the sensor axis into road segments ------------------------------------
# (time, lane, sensor) -> (time, lane, segment, sensor within segment).
# A single reshape replaces the former per-(time, lane) np.split loop. It raises
# ValueError if the sensor count is not a multiple of sensors_per_segment.
# The float64 cast keeps the dtype the old np.zeros buffer had.
x_new = x.reshape(
    x.shape[0], x.shape[1], x.shape[2] // sensors_per_segment, sensors_per_segment
).astype(np.float64)
print(x_new.shape)

(100, 2, 4000)
(100, 2, 200, 20)


In [9]:
# Mean sensor value of each segment, per time step and lane:
# (time, lane, segment, sensor) -> (time, lane, segment).
density = np.mean(x_new, axis=3)
print(density.shape)

# Average over all loaded time steps -> (lane, segment).
density = np.mean(density, axis=0)
print(density.shape)

(100, 2, 200)
(2, 200)


In [10]:
# Inspect the time-averaged values: one row per lane, one column per segment.
density

array([[0.1845, 0.176 , 0.157 , 0.1855, 0.159 , 0.1205, 0.133 , 0.14  ,
        0.15  , 0.107 , 0.1045, 0.216 , 0.1265, 0.107 , 0.1315, 0.0925,
        0.128 , 0.1   , 0.179 , 0.161 , 0.2295, 0.168 , 0.12  , 0.137 ,
        0.207 , 0.138 , 0.1125, 0.207 , 0.154 , 0.1525, 0.0925, 0.1655,
        0.164 , 0.1295, 0.1695, 0.111 , 0.244 , 0.2095, 0.157 , 0.1055,
        0.21  , 0.1255, 0.1195, 0.1   , 0.1505, 0.183 , 0.2575, 0.1505,
        0.131 , 0.1385, 0.2   , 0.1725, 0.1865, 0.22  , 0.156 , 0.183 ,
        0.197 , 0.183 , 0.1655, 0.2595, 0.1   , 0.0855, 0.236 , 0.2365,
        0.151 , 0.2035, 0.228 , 0.1805, 0.178 , 0.1635, 0.162 , 0.1645,
        0.113 , 0.0865, 0.086 , 0.2535, 0.209 , 0.14  , 0.1035, 0.208 ,
        0.1145, 0.135 , 0.1115, 0.197 , 0.162 , 0.2615, 0.1605, 0.1465,
        0.1395, 0.205 , 0.1605, 0.193 , 0.1285, 0.1235, 0.1255, 0.1935,
        0.208 , 0.166 , 0.149 , 0.1375, 0.1385, 0.219 , 0.2315, 0.172 ,
        0.1645, 0.214 , 0.089 , 0.1315, 0.0845, 0.1055, 0.1315, 

In [11]:
# Average across the lane axis so that one value per segment remains.
density_per_segment = density.mean(axis=0)
density_per_segment.shape

(200,)

In [12]:
# Number of consecutive element indices that belong to one segment.
lane_elements_per_segment = 2

# Segment i owns the indices
# [i * lane_elements_per_segment, (i + 1) * lane_elements_per_segment).
# The constant was previously defined but unused, with 2 hard-coded in the loop.
possible_elements = [
    list(range(i * lane_elements_per_segment, (i + 1) * lane_elements_per_segment))
    for i in range(density_per_segment.shape[0])
]

len(possible_elements)

200

In [13]:
# Place random vehicles on a single segment, as a trial run for segment 10.
# NOTE(review): the meaning of the third argument (100) is defined by
# hc.randomly_place_vehicle and is not visible here - confirm against the helper.
segment_index = 10
vehicle_dict, route_dict = hc.randomly_place_vehicle(
    vehicle_types_df,
    density_per_segment[segment_index],
    100,
    possible_elements[segment_index],
)

vehicle_dict

No vehicles fit in the available space
No more vehicles can be placed


{'ORV4_0': 'ORV4', 'SUV5_0': 'SUV5'}

In [15]:
# Hand the placed vehicles and their routes to the project helper together with
# the template file name. Per the recorded cell output this created
# routes/test.rou.xml - presumably the output path is fixed inside the helper;
# verify in Hashmap_Creation.
hc.create_rou_xml(vehicle_dict, route_dict, 'Template.rou.xml')

Created file routes/test.rou.xml
