In [10]:
import os
import time
import logging
import re

import numpy as np
import pandas as pd
import yaml
from watchdog.observers import Observer

from dasly.master import Dasly
from dasly.utils import (
    assign_id_df,
    save_lines_csv,
    HDF5EventHandler,
    add_subtract_dt,
    get_date_time,
    gen_id,
    get_file_paths_deploy,
    create_connection_string,
    read_sql
)

from dasly.utils import (
    assign_id_df,
    save_lines_csv,
    add_subtract_dt,
    get_date_time,
    gen_id,
    create_connection_string,
    read_sql,
    write_sql,
    table_exists,
    extract_element,
    calculate_line_gap,
    drop_table,
)

from dasly import loader
from dasly.simpledas import simpleDASreader
from dasly.loss import timestamp_dist, loss_fn

# Load the parameters
yaml_path = '../config_aastfjordbrua.yml'
with open(yaml_path, 'r') as file:
    params = yaml.safe_load(file)

____

In [11]:
def dasly_core(
    file_path: str,
    lowpass_filter_freq: float = 0.5,
    decimate_t_rate: int = 10,
    gaussian_smooth_s1: float = 0.5,
    gaussian_smooth_s2: float = 1,
    gaussian_smooth_std_s: float = 0.1,
    binary_threshold: float = 1e-7,
    hough_transform_speed_res: int = 0.5,
    hough_transform_length_meters: int = 500,
    dbscan_eps_seconds: int = 1
) -> pd.DataFrame:
    """Run the Dasly core algorithms.

    Args:
        file_path (str): The path to the HDF5 file that triggers the event.

    Returns:
        pd.DataFrame: The DataFrame containing the detected lines.
    """
    lines = np.empty((0, 4))  # create an empty array to store the lines

    # Load the data
    ###########################################################################
    # file_paths = get_file_paths_deploy(
    #     end_file=file_path,
    #     num_files=int(params['dasly']['batch'] / params['hdf5_file_length']),
    # )

    date, time = get_date_time(file_path)
    start = add_subtract_dt(
        f'{date} {time}',
        - (params['dasly']['batch'] + params['hdf5_file_length'])
    )
    file_paths_add, _, _ = simpleDASreader.find_DAS_files(
        experiment_path=params['input_dir'],
        start=start,
        duration=params['dasly']['batch'] + 2 * params['hdf5_file_length'],
        show_header_info=False
    )
    file_paths = extract_element(
        lst=file_paths_add,
        num=int(params['dasly']['batch'] / params['hdf5_file_length']),
        last_value=file_path
    )
    file_paths_short = file_paths[:1] + file_paths[2:]

    das = Dasly()
    das.load_data(
        file_paths=file_paths_short,
        integrate=params['integrate']
    )
    # das.heatmap()

    # forward Gaussian smoothing
    ###########################################################################
    das.lowpass_filter(cutoff=lowpass_filter_freq)
    # das.heatmap()
    das.decimate(t_rate=decimate_t_rate)
    # das.heatmap()
    das.gaussian_smooth(
        s1=gaussian_smooth_s1,
        s2=gaussian_smooth_s2,
        std_s=gaussian_smooth_std_s)
    # das.heatmap()
    das.sobel_filter()
    das.binary_transform(threshold=binary_threshold)
    das.hough_transform(
        target_speed=(gaussian_smooth_s1 + gaussian_smooth_s2) / 2,
        speed_res=hough_transform_speed_res,
        length_meters=hough_transform_length_meters)
    if len(das.lines) > 0:
        # store forward lines
        mask = das.lines_df['speed_kmh'].to_numpy() >= 0
        lines = np.concatenate((lines, das.lines[mask]), axis=0)

    # backward Gaussian smoothing
    ###########################################################################
    das.reset()
    das.lowpass_filter(cutoff=lowpass_filter_freq)
    das.decimate(t_rate=decimate_t_rate)
    das.gaussian_smooth(
        s1=-gaussian_smooth_s2,
        s2=-gaussian_smooth_s1,
        std_s=gaussian_smooth_std_s)
    # das.heatmap()
    das.sobel_filter()
    das.binary_transform(threshold=binary_threshold)
    das.hough_transform(
        target_speed=(gaussian_smooth_s1 + gaussian_smooth_s2) / 2,
        speed_res=hough_transform_speed_res,
        length_meters=hough_transform_length_meters)
    if len(das.lines) > 0:
        # store forward lines
        mask = das.lines_df['speed_kmh'].to_numpy() < 0
        lines = np.concatenate((lines, das.lines[mask]), axis=0)

    # dbscan forward and backward lines
    ###########################################################################
    if len(lines) == 0:  # if there are no lines, exit the function from here
        return
    das.lines = lines  # assign the new lines to the das object
    das.dbscan(eps_seconds=dbscan_eps_seconds)  # group close lines
    return das.lines_df

In [12]:
params_dict = {
    'file_path': '/media/kptruong/yellow02/Aastfjordbrua/Aastfjordbrua/20231005/dphi/095834.hdf5',
    'lowpass_filter_freq': 0.2,
    'decimate_t_rate': 4,
    'gaussian_smooth_s1': 76.1185289472238,
    'gaussian_smooth_s2': 81.39493860652041,
    'gaussian_smooth_std_s': 0.1814650918408482,
    'binary_threshold': 1.0743141474108062e-07,
    'hough_transform_speed_res': 0.5104629857953323,
    'hough_transform_length_meters': 635.5527884179041,
    'dbscan_eps_seconds': 1.798695128633439
}

In [13]:
lines_df = dasly_core(**params_dict)

5 files, from 095744 to 095834
Signal updated with low-pass filter.
Signal updated with new temporal sampling rate 4.
Signal updated with Gaussian smoothing.
Signal updated with Sobel filter.
Signal updated with binary transform with threshold 1.07e-07.
67 lines are detected.
Signal updated with low-pass filter.
Signal updated with new temporal sampling rate 4.
Signal updated with Gaussian smoothing.
Signal updated with Sobel filter.
Signal updated with binary transform with threshold 1.07e-07.
67 lines are detected.


In [14]:
lines_df

Unnamed: 0,speed_kmh,speed_ms,s,t,s1,t1,s2,t2,s1_edge,t1_edge,...,x1,y1,x2,y2,x1_edge,y1_edge,x2_edge,y2_edge,y1_edge_ext,y2_edge_ext
0,208.407273,57.890909,796.0,13.75,0.0,2023-10-05 09:58:05.250,796.0,2023-10-05 09:58:19.000,0.0,2023-10-05 09:58:05.250000000,...,0.0,85.0,796.0,140.0,0.0,85.0,799.0,140.207286,85.0,140.207286
1,598.628571,166.285714,582.0,3.5,95.0,2023-10-05 09:58:14.750,677.0,2023-10-05 09:58:18.250,0.0,2023-10-05 09:58:14.178694158,...,95.0,123.0,677.0,137.0,0.0,120.714777,799.0,139.934708,120.714777,139.934708
2,837.415385,232.615385,756.0,3.25,2.0,2023-10-05 09:58:26.500,758.0,2023-10-05 09:58:29.750,0.0,2023-10-05 09:58:26.491402116,...,2.0,170.0,758.0,183.0,0.0,169.965608,799.0,183.705026,169.965608,183.705026
3,85.283721,23.689922,764.0,32.25,2.0,2023-10-05 09:57:52.750,766.0,2023-10-05 09:58:25.000,0.0,2023-10-05 09:57:52.665575916,...,2.0,35.0,766.0,164.0,0.0,34.662304,799.0,169.57199,34.662304,169.57199
4,0.0,0.0,0.0,0.25,378.0,2023-10-05 09:58:32.750,378.0,2023-10-05 09:58:33.000,,NaT,...,378.0,195.0,378.0,196.0,,,,,,
5,249.709091,69.363636,763.0,11.0,1.0,2023-10-05 09:57:56.750,764.0,2023-10-05 09:58:07.750,0.0,2023-10-05 09:57:56.735583224,...,1.0,51.0,764.0,95.0,0.0,50.942333,799.0,97.018349,50.942333,97.018349
6,10828.8,3008.0,752.0,0.25,2.0,2023-10-05 09:57:55.500,754.0,2023-10-05 09:57:55.750,0.0,2023-10-05 09:57:55.499335106,...,2.0,46.0,754.0,47.0,0.0,45.99734,799.0,47.05984,45.99734,47.05984
7,-2732.4,-759.0,-759.0,1.0,760.0,2023-10-05 09:58:03.250,1.0,2023-10-05 09:58:04.250,799.0,2023-10-05 09:58:03.198616601,...,760.0,77.0,1.0,81.0,799.0,76.794466,0.0,81.00527,81.00527,76.794466


In [15]:
    # if the table does not exist yet (first time running dasly)
connection_string = create_connection_string(
    endpoint=params['database']['endpoint'],
    database=params['database']['database'],
    db_username=os.getenv('POSTGRESQL_USERNAME'),
    db_password=os.getenv('POSTGRESQL_PASSWORD'),
    type=params['database']['type'],
    dbapi=params['database']['dbapi'],
    port=params['database']['port']
)

In [17]:
date_str, time_str = get_date_time(params_dict['file_path'])
if table_exists(
    table_name=params['database']['table'],
    connection_string=connection_string
):
    previous_dt = add_subtract_dt(
        f'{date_str} {time_str}', - params['dasly']['batch_gap'])
    query = (
        f'SELECT * FROM {params["database"]["table"]}' +
        f" WHERE batch_id = '{previous_dt}';"
    )
    previous_lines_df = read_sql(query, connection_string)
else:
    previous_lines_df = pd.DataFrame()

In [18]:
lines_df

Unnamed: 0,speed_kmh,speed_ms,s,t,s1,t1,s2,t2,s1_edge,t1_edge,...,x1,y1,x2,y2,x1_edge,y1_edge,x2_edge,y2_edge,y1_edge_ext,y2_edge_ext
0,208.407273,57.890909,796.0,13.75,0.0,2023-10-05 09:58:05.250,796.0,2023-10-05 09:58:19.000,0.0,2023-10-05 09:58:05.250000000,...,0.0,85.0,796.0,140.0,0.0,85.0,799.0,140.207286,85.0,140.207286
1,598.628571,166.285714,582.0,3.5,95.0,2023-10-05 09:58:14.750,677.0,2023-10-05 09:58:18.250,0.0,2023-10-05 09:58:14.178694158,...,95.0,123.0,677.0,137.0,0.0,120.714777,799.0,139.934708,120.714777,139.934708
2,837.415385,232.615385,756.0,3.25,2.0,2023-10-05 09:58:26.500,758.0,2023-10-05 09:58:29.750,0.0,2023-10-05 09:58:26.491402116,...,2.0,170.0,758.0,183.0,0.0,169.965608,799.0,183.705026,169.965608,183.705026
3,85.283721,23.689922,764.0,32.25,2.0,2023-10-05 09:57:52.750,766.0,2023-10-05 09:58:25.000,0.0,2023-10-05 09:57:52.665575916,...,2.0,35.0,766.0,164.0,0.0,34.662304,799.0,169.57199,34.662304,169.57199
4,0.0,0.0,0.0,0.25,378.0,2023-10-05 09:58:32.750,378.0,2023-10-05 09:58:33.000,,NaT,...,378.0,195.0,378.0,196.0,,,,,,
5,249.709091,69.363636,763.0,11.0,1.0,2023-10-05 09:57:56.750,764.0,2023-10-05 09:58:07.750,0.0,2023-10-05 09:57:56.735583224,...,1.0,51.0,764.0,95.0,0.0,50.942333,799.0,97.018349,50.942333,97.018349
6,10828.8,3008.0,752.0,0.25,2.0,2023-10-05 09:57:55.500,754.0,2023-10-05 09:57:55.750,0.0,2023-10-05 09:57:55.499335106,...,2.0,46.0,754.0,47.0,0.0,45.99734,799.0,47.05984,45.99734,47.05984
7,-2732.4,-759.0,-759.0,1.0,760.0,2023-10-05 09:58:03.250,1.0,2023-10-05 09:58:04.250,799.0,2023-10-05 09:58:03.198616601,...,760.0,77.0,1.0,81.0,799.0,76.794466,0.0,81.00527,81.00527,76.794466


In [19]:
# Assign IDs to the lines
if len(previous_lines_df) > 0:
    print('Assigning IDs to the lines...')
    # Match the IDs between the previous and current lines
    lines_gaps = assign_id_df(lines_df, previous_lines_df)
else:
    print('Generating IDs for the lines...')
    # Assign new IDs to the lines
    lines_df.insert(0, 'id', gen_id(len(lines_df)))
    lines_df.insert(0, 'line_id', lines_df['id'])

Assigning IDs to the lines...
Calculating line gap...
Done!


In [20]:
lines_gaps

(                                line_id                                    id  \
 0  821bcb03-2f93-44b2-9c8f-0b0db6f9ca6a  821bcb03-2f93-44b2-9c8f-0b0db6f9ca6a   
 1  f3c61b80-a653-4c43-8489-b19b864c7122  f3c61b80-a653-4c43-8489-b19b864c7122   
 2  f7ca3449-8d62-45ed-a457-72b655cbc139  f7ca3449-8d62-45ed-a457-72b655cbc139   
 3  c71f30cb-d52a-4f2f-9560-31fee64805f1  c71f30cb-d52a-4f2f-9560-31fee64805f1   
 4  e391bd06-8409-4f20-a746-5d202c56a004  e391bd06-8409-4f20-a746-5d202c56a004   
 5  58424703-e62b-447f-89e3-b49819a88936  10477c15-534d-46b7-b696-2b4be53e45bb   
 6  8e9fc6c6-62ec-44eb-bb7f-9fddde3088d4  90ff9826-809f-45ae-a78d-98b6070ab32c   
 7  820deeec-4715-46e8-b82b-0e887cb08650  820deeec-4715-46e8-b82b-0e887cb08650   
 
       speed_kmh     speed_ms      s      t     s1                      t1  \
 0    208.407273    57.890909  796.0  13.75    0.0 2023-10-05 09:58:05.250   
 1    598.628571   166.285714  582.0   3.50   95.0 2023-10-05 09:58:14.750   
 2    837.415385   232.615

In [21]:
lines_df[['speed_kmh', 'speed_ms', 's1', 't1', 's2', 't2', 'x1', 'y1', 'x2', 'y2']]

Unnamed: 0,speed_kmh,speed_ms,s1,t1,s2,t2,x1,y1,x2,y2
0,208.407273,57.890909,0.0,2023-10-05 09:58:05.250,796.0,2023-10-05 09:58:19.000,0.0,85.0,796.0,140.0
1,598.628571,166.285714,95.0,2023-10-05 09:58:14.750,677.0,2023-10-05 09:58:18.250,95.0,123.0,677.0,137.0
2,837.415385,232.615385,2.0,2023-10-05 09:58:26.500,758.0,2023-10-05 09:58:29.750,2.0,170.0,758.0,183.0
3,85.283721,23.689922,2.0,2023-10-05 09:57:52.750,766.0,2023-10-05 09:58:25.000,2.0,35.0,766.0,164.0
4,0.0,0.0,378.0,2023-10-05 09:58:32.750,378.0,2023-10-05 09:58:33.000,378.0,195.0,378.0,196.0
5,249.709091,69.363636,1.0,2023-10-05 09:57:56.750,764.0,2023-10-05 09:58:07.750,1.0,51.0,764.0,95.0
6,10828.8,3008.0,2.0,2023-10-05 09:57:55.500,754.0,2023-10-05 09:57:55.750,2.0,46.0,754.0,47.0
7,-2732.4,-759.0,760.0,2023-10-05 09:58:03.250,1.0,2023-10-05 09:58:04.250,760.0,77.0,1.0,81.0


In [22]:
previous_lines_df

Unnamed: 0,created_at,batch_id,line_id,id,speed_kmh,speed_ms,s,t,s1,t1,...,x1,y1,x2,y2,x1_edge,y1_edge,x2_edge,y2_edge,y1_edge_ext,y2_edge_ext
0,2024-09-02 22:23:33.803722+00:00,20231005 095814,e1ad0bd2-42e2-424a-80d3-736a346c28d7,e1ad0bd2-42e2-424a-80d3-736a346c28d7,-462.857143,-128.571429,-225.0,1.75,493.0,2023-10-05 09:58:06.750,...,493.0,131.0,268.0,138.0,799.0,121.48,0.0,146.337778,146.337778,121.48
1,2024-09-02 22:23:33.803722+00:00,20231005 095814,58424703-e62b-447f-89e3-b49819a88936,58424703-e62b-447f-89e3-b49819a88936,204.588679,56.830189,753.0,13.25,0.0,2023-10-05 09:57:57.000,...,0.0,92.0,753.0,145.0,0.0,92.0,799.0,148.237716,92.0,148.237716
2,2024-09-02 22:23:33.803722+00:00,20231005 095814,8e9fc6c6-62ec-44eb-bb7f-9fddde3088d4,8e9fc6c6-62ec-44eb-bb7f-9fddde3088d4,-306.0,-85.0,-85.0,1.0,420.0,2023-10-05 09:57:57.250,...,420.0,93.0,335.0,97.0,799.0,75.164706,0.0,112.764706,112.764706,75.164706


In [23]:
from dasly.loss import timestamp_dist, loss_fn

In [29]:
loss_fn(
    y_pred=lines_df['t2'].to_numpy(),
    y_true=previous_lines_df['t2'].to_numpy(),
    a=0.5,
    b=0.5,
    dist=timestamp_dist
)

AttributeError: 'numpy.timedelta64' object has no attribute 'total_seconds'

In [27]:
lines_df['t2'].to_numpy()

array(['2023-10-05T09:58:19.000000000', '2023-10-05T09:58:18.250000000',
       '2023-10-05T09:58:29.750000000', '2023-10-05T09:58:25.000000000',
       '2023-10-05T09:58:33.000000000', '2023-10-05T09:58:07.750000000',
       '2023-10-05T09:57:55.750000000', '2023-10-05T09:58:04.250000000'],
      dtype='datetime64[ns]')

In [28]:
previous_lines_df['t2'].to_numpy()

array(['2023-10-05T09:58:08.500000000', '2023-10-05T09:58:10.250000000',
       '2023-10-05T09:57:58.250000000'], dtype='datetime64[ns]')

In [38]:
abs(lines_df['t2'].to_numpy()[0] - previous_lines_df['t2'].to_numpy()[0])#.astype('timedelta64[s]')#.astype(float)

numpy.timedelta64(10500000000,'ns')

In [39]:
threshold = 3
dist_mat = np.nan_to_num(lines_gaps, nan=np.inf)

# Find the indices of the minimum distances
min_indices = np.nanargmin(dist_mat, axis=1)

# Find the minimum distances for each point
min_distances = np.nanmin(dist_mat, axis=1)

# Create a mask for distances below the threshold
mask = min_distances <= threshold

# Assign the previous line IDs to the new lines, where the distance is
# below the threshold, otherwise assign a new unique ID
new_lines_id = np.where(
    mask, previous_lines_df['line_id'][min_indices], gen_id(len(lines_df)))

In [40]:
dist_mat

array([[ 4.19770706,  8.1286118 , 14.02088571],
       [ 8.84192431, 12.70783544, 18.69888341],
       [20.50214946, 24.47165668, 30.36425292],
       [ 5.62385452,  9.24321795, 10.85062206],
       [        inf,         inf,         inf],
       [ 5.40383339,  1.46309006,  4.42791665],
       [11.99916887,  8.01776218,  2.12516582],
       [ 3.87500036,  7.11159039,  6.0039525 ]])

In [41]:
min_indices

array([0, 0, 0, 0, 0, 1, 2, 0])

In [42]:
min_distances

array([ 4.19770706,  8.84192431, 20.50214946,  5.62385452,         inf,
        1.46309006,  2.12516582,  3.87500036])

In [43]:
mask

array([False, False, False, False, False,  True,  True, False])

In [49]:
import optuna
from optuna.samplers import TPESampler
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool

from dasly.simpledas import simpleDASreader
from dasly.loss import timestamp_dist, loss_fn

In [57]:
drop_query = f'DROP TABLE test_table1;'

# Create a SQLAlchemy engine
engine = create_engine(connection_string, poolclass=NullPool)

# Define the raw SQL command
sql_command = text(drop_query)
print(sql_command)

# Execute the command
with engine.connect() as connection:
    connection.execute(sql_command)


DROP TABLE test_table1;


In [56]:
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool

# Define your connection string
# connection_string = "your_connection_string_here"

# Create a SQLAlchemy engine
engine = create_engine(connection_string, poolclass=NullPool)

# Define the raw SQL command
drop_query = "DROP TABLE IF EXISTS test_table;"

# Execute the command
with engine.connect() as connection:
    try:
        connection.execute(text(drop_query))
        print("Table dropped successfully.")
    except Exception as e:
        print(f"Failed to drop table: {e}")

Table dropped successfully.


In [None]:
f

In [17]:
lines_df.loc[4, ['x1', 'y1', 'x2', 'y2']].values

array([378.0, 195.0, 378.0, 196.0], dtype=object)