In [None]:
import numpy as np
import pandas as pd
import geopandas as gpd
from shapely import Point, LineString
import matplotlib.pyplot as plt
import sqlalchemy
from zoneinfo import ZoneInfo

## settings

In [None]:
# coordinate reference system passed as `crs=` to geopandas below
local_crs = 3006

month = 12 # 1-12
city_abbr = 'sthlm' # gbg or sthlm

write_to_table = True
if_exists = 'append' # append or replace

# Fail fast on typos in the settings: `month` and `city_abbr` are formatted into
# SQL text and table names further down, and `if_exists` decides what happens to
# tables that already exist.
if not (isinstance(month, int) and 1 <= month <= 12):
    raise ValueError('month must be an integer from 1 to 12, got {!r}'.format(month))
if not (isinstance(city_abbr, str) and city_abbr.isidentifier()):
    raise ValueError('city_abbr must be a plain identifier such as gbg or sthlm, got {!r}'.format(city_abbr))
if if_exists not in ('fail', 'replace', 'append'):
    raise ValueError("if_exists must be 'fail', 'replace' or 'append', got {!r}".format(if_exists))

## read data

In [None]:
# engine for the database the point types are read from
# NOTE(review): host, database and username look like placeholders -- confirm before running
url = sqlalchemy.URL.create(
    drivername="postgresql+psycopg",
    username="username",
    host="host",
    port=5432,
    database="database")
engine = sqlalchemy.create_engine(url)

In [None]:
# name of the per-city table holding the typed points
pointtypes_table_name = f'{city_abbr}_point_types_2024'

In [None]:
# Half-open date window: from the first day of `month` in 2024 up to (but not
# including) the first day of the following month. December rolls over into
# January 2025.
until_month = 1 if month == 12 else month + 1
until_year = 2025 if month == 12 else 2024

# The table name cannot be sent as a bind parameter, so it is formatted into the
# statement. Both months are zero-padded so the two date literals have the same
# YYYY-MM-DD shape.
sql_query = """
    SELECT device_uid, timestamp_se, x, y, type
    FROM extracts.{0}
    WHERE
        type NOT IN ('stay', 'isolated', 'singular', 'error')
        AND (timestamp_se::date >= '{1}-{2:02d}-01' AND timestamp_se::date < '{3}-{4:02d}-01');
""".format(pointtypes_table_name, 2024, month, until_year, until_month)

# pd.read_sql executes the statement itself; running it through conn.execute()
# first would only make the database do the same work twice.
with engine.connect() as conn_semobiledata:
    traj_points = pd.read_sql(sql_query, con=conn_semobiledata, parse_dates=['timestamp_se'])

In [None]:
# express the timestamps in Stockholm local time and
# order the points chronologically within each device
traj_points = (
    traj_points
    .assign(timestamp_se=lambda frame: frame['timestamp_se'].dt.tz_convert(ZoneInfo("Europe/Stockholm")))
    .sort_values(by=['device_uid', 'timestamp_se'], ascending=True)
)

In [None]:
# number of points loaded for the selected month
len(traj_points)

## combine points into trajectories

In [None]:
# thresholds in time, distance, and speed
# used below to link trajectories across short pauses and to detect jumps when cleaning

# also used when defining the point types
too_long = 120 # seconds; a longer gap between two points counts as a jump in time
too_close = 100 # meters; points closer than this count as "nearby"
too_fast = 250 # kilometers per hour (compared as too_fast/3.6 in meters per second)
too_slow = 3 # kilometers per hour, unused for now

# additional thresholds
too_far = 1000 # meters; a longer step between two points counts as a jump in distance

In [None]:
# flag the points that open a trajectory, i.e., those typed as a (slow) start
traj_points['trajectory_start'] = traj_points['type'].isin(['start', 'slow start'])

In [None]:
# assign a common id to each trajectory, i.e., a run of consecutive points of one
# device that begins with a (slow) start
# A new id opens at every trajectory start AND at the first point of every device.
# A plain cumulative sum over the starts never produces gaps to forward-fill, so
# points recorded before a device's first start would otherwise inherit the id of
# the previous device's last trajectory (the frame is sorted by device and time).
traj_points['traj_id'] = (
    (
        traj_points['trajectory_start'].astype(bool)
        | traj_points['device_uid'].ne(traj_points['device_uid'].shift())
    )
    .cumsum()
    .astype(int))

#### iteratively recalculate values after filtering

In [None]:
def recalculate_in_out_stats(traj_points, to_group_by):
    """Recompute the step statistics towards the previous and the next point.

    Meant to be re-run after points were removed (errors/jumps/duplicates).
    Differences are taken within each group of `to_group_by`, so the first
    ("in") and last ("out") point of a group get NaN.

    Adds, for both the `_in` (previous point) and `_out` (next point) direction,
    the columns dx, dy and dist (meters), dt (seconds) and speed (kilometers per
    hour). The frame is modified in place and also returned.
    """

    def step(column, periods):
        # difference to the neighbouring row within the same group
        return traj_points.groupby(to_group_by)[column].diff(periods=periods)

    # periods=1 looks back at the previous point, periods=-1 ahead at the next one
    for suffix, periods in (('in', 1), ('out', -1)):
        dx = step('x', periods).abs()
        dy = step('y', periods).abs()
        dist = np.sqrt(dy**2 + dx**2).abs()
        dt = step('timestamp_se', periods).dt.total_seconds().abs()

        traj_points['dx_' + suffix] = dx
        traj_points['dy_' + suffix] = dy
        traj_points['dist_' + suffix] = dist
        traj_points['dt_' + suffix] = dt
        # meters per second expressed as kilometers per hour
        traj_points['speed_' + suffix] = (dist/1000) / (dt/3600)

    return traj_points

In [None]:
def recalculate_traj_ids(traj_points, to_group_by):
    """Reassign a common integer id to the points of each trajectory.

    Takes into account, for example, continuation after short stops and fresh
    starts after jumping, as recorded in the `trajectory_start` column.

    A new id opens at every row where `trajectory_start` is true and at every
    row where the value of `to_group_by` differs from the row before. The second
    rule keeps an id from running across two groups: a plain cumulative sum over
    the starts has no gaps to forward-fill per group, so points that precede the
    first start of a group would otherwise inherit the id of the previous
    group's last trajectory.

    Parameters
    ----------
    traj_points : DataFrame with a `trajectory_start` column and the
        `to_group_by` column(s); rows of one group are expected to be
        contiguous (e.g. sorted by device and time).
    to_group_by : column name, or list of column names, identifying the group
        (device) a point belongs to.

    Returns
    -------
    The same frame, modified in place, with an integer `traj_id` column.
    """
    keys = traj_points[to_group_by]
    group_changed = keys.ne(keys.shift())
    if group_changed.ndim > 1:
        # several grouping columns: a change in any of them opens a new group
        group_changed = group_changed.any(axis=1)

    opens_trajectory = traj_points['trajectory_start'].astype(bool) | group_changed
    traj_points['traj_id'] = opens_trajectory.cumsum().astype(int)

    return traj_points

#### link trajectories with only brief pause between

In [None]:
# a trajectory that starts within a short distance and a short time of the
# previous point of the same device is merged with the trajectory before it

# gap between each point and the row before it
gap_meters = np.sqrt(
    abs(traj_points.x - traj_points.x.shift(1))**2
    + abs(traj_points.y - traj_points.y.shift(1))**2)
gap_seconds = (traj_points.timestamp_se - traj_points.timestamp_se.shift(1)).dt.total_seconds()

condition_pause = (
    traj_points.trajectory_start
    & (traj_points.device_uid == traj_points.device_uid.shift(1))
    & (traj_points.traj_id != traj_points.traj_id.shift(1))
    & (gap_meters < too_close)
    & (gap_seconds < too_long))

# the point right before the pause becomes a short stop,
# the point right after it continues the trajectory instead of starting one
traj_points.loc[condition_pause.shift(-1, fill_value=False), 'type'] = 'short stop'
traj_points.loc[condition_pause, ['type', 'trajectory_start']] = ['continuation', False]

In [None]:
# rebuild the trajectory ids from the updated trajectory_start flags
traj_points = recalculate_traj_ids(traj_points, to_group_by='device_uid')

#### cleaning trajectories

In [None]:
# sometimes a trajectory contains big jumps:
# two consecutive points are far apart, imply a very high speed, or are separated by a long time gap
# we cut the trajectory into parts where such jumps occur
# and re-label the related stop and start points

In [None]:
# 1. the trajectory jumps after starting
#       -> remove the jumpstart and make the next point a starting point

def condition_jumpstarts(traj_points):
    """Return a boolean mask of first points of a trajectory that are followed by a jump.

    A point qualifies when its traj_id differs from the row before and equals
    the row after, and the step to the next row is longer than `too_far`,
    faster than `too_fast` or takes more than `too_long`.
    """
    traj = traj_points.traj_id
    is_startpoint = (traj != traj.shift(1)) & (traj == traj.shift(-1))

    # step from each point to the next row
    dist_next = np.sqrt(
        abs(traj_points.x - traj_points.x.shift(-1))**2
        + abs(traj_points.y - traj_points.y.shift(-1))**2)
    dt_next = abs(traj_points.timestamp_se - traj_points.timestamp_se.shift(-1)).dt.total_seconds()

    jumps_far = dist_next > too_far
    # too_fast is given in km/h, the step speed here is in m/s
    jumps_fast = dist_next / dt_next > too_fast/3.6
    jumps_long = dt_next > too_long

    return is_startpoint & (jumps_far | jumps_fast | jumps_long)

def remove_jumpstarts(traj_points):
    """Drop jumpstart points and turn the point after each into a trajectory start.

    The row following a jumpstart is relabelled 'start after jump' in the
    passed frame itself; the returned frame is a copy without the jumpstarts.
    """
    is_jumpstart = condition_jumpstarts(traj_points)
    follows_jumpstart = is_jumpstart.shift(1, fill_value=False)
    traj_points.loc[follows_jumpstart, ['type', 'trajectory_start']] = ['start after jump', True]
    return traj_points.loc[~is_jumpstart].copy()

In [None]:
# 2. the trajectory jumps back to a point smaller than too_far from the previous
#       -> remove jumping point, but keep trajectory as one
#       -> (note that if it jumps back to the EXACT same point, we will solve that later with removing subsequent duplicates)

def condition_jumpbacks(traj_points):
    """Return a boolean mask of mid-trajectory points that jump out and come straight back.

    A point qualifies when it shares its traj_id with both neighbouring rows,
    the step from the previous row is longer than `too_far` or faster than
    `too_fast`, and the next row lies within `too_close` of the previous row
    less than `too_long` after it.
    """
    traj = traj_points.traj_id
    x, y, ts = traj_points.x, traj_points.y, traj_points.timestamp_se

    is_midpoint = (traj == traj.shift(1)) & (traj == traj.shift(-1))

    # step from the previous row to this point
    dist_prev = np.sqrt(abs(x - x.shift(1))**2 + abs(y - y.shift(1))**2)
    dt_prev = abs(ts - ts.shift(1)).dt.total_seconds()

    # step from the previous row straight to the next row, skipping this point
    dist_around = np.sqrt(abs(x.shift(-1) - x.shift(1))**2 + abs(y.shift(-1) - y.shift(1))**2)
    dt_around = (ts.shift(-1) - ts.shift(1)).dt.total_seconds()

    jumps_far = dist_prev > too_far
    jumps_fast = dist_prev / dt_prev > too_fast/3.6
    # ... and then back to a point nearby within reasonable time
    returns_nearby = dist_around <= too_close
    returns_quickly = dt_around < too_long

    return is_midpoint & (jumps_far | jumps_fast) & returns_nearby & returns_quickly

def remove_jumpbacks(traj_points):
    """Return a copy of the frame without the points flagged by condition_jumpbacks."""
    is_jumpback = condition_jumpbacks(traj_points)
    return traj_points[~is_jumpback].copy()

In [None]:
# 3. the trajectory jumps back to a point further than too_far from the previous
#       -> keep both points, but split the traj into two by defining a fresh trajectory-start after the jump

def condition_jumpaways(traj_points):
    """Return a boolean mask of mid-trajectory points that are reached by a jump and stay away.

    A point qualifies when it shares its traj_id with both neighbouring rows and
    either the step from the previous row takes more than `too_long`, or that
    step is longer than `too_far` or faster than `too_fast` while the next row
    is more than `too_close` from the previous row or at least `too_long` after it.
    """
    traj = traj_points.traj_id
    x, y, ts = traj_points.x, traj_points.y, traj_points.timestamp_se

    is_midpoint = (traj == traj.shift(1)) & (traj == traj.shift(-1))

    # step from the previous row to this point
    dist_prev = np.sqrt(abs(x - x.shift(1))**2 + abs(y - y.shift(1))**2)
    dt_prev = abs(ts - ts.shift(1)).dt.total_seconds()

    # step from the previous row straight to the next row, skipping this point
    dist_around = np.sqrt(abs(x.shift(-1) - x.shift(1))**2 + abs(y.shift(-1) - y.shift(1))**2)
    dt_around = (ts.shift(-1) - ts.shift(1)).dt.total_seconds()

    jumps_far = dist_prev > too_far
    jumps_fast = dist_prev / dt_prev > too_fast/3.6
    jumps_long = dt_prev > too_long
    # the trajectory does not come back: the next point is not nearby, or not soon enough
    stays_away = (dist_around > too_close) | (dt_around >= too_long)

    return is_midpoint & (((jumps_far | jumps_fast) & stays_away) | jumps_long)

def remove_jumpaways(traj_points):
    """Split trajectories at jumpaway points.

    No rows are dropped: each flagged point is relabelled 'start after jump'
    and marked as a trajectory start in the passed frame, after which the
    trajectory ids are recalculated per device.
    """
    is_jumpaway = condition_jumpaways(traj_points)
    traj_points.loc[is_jumpaway, ['type', 'trajectory_start']] = ['start after jump', True]
    return recalculate_traj_ids(traj_points, 'device_uid')

In [None]:
# 4. the trajectory jumps to a stop
#       -> remove the jumpstop and make the previous point a stop before jumping

def condition_jumpstops(traj_points):
    """Return a boolean mask of last points of a trajectory that are reached by a jump.

    A point qualifies when its traj_id equals the row before and differs from
    the row after, and the step from the previous row is longer than `too_far`,
    faster than `too_fast` or takes more than `too_long`.
    """
    traj = traj_points.traj_id
    is_endpoint = (traj != traj.shift(-1)) & (traj == traj.shift(1))

    # step from the previous row to this point
    dist_prev = np.sqrt(
        abs(traj_points.x - traj_points.x.shift(1))**2
        + abs(traj_points.y - traj_points.y.shift(1))**2)
    dt_prev = abs(traj_points.timestamp_se - traj_points.timestamp_se.shift(1)).dt.total_seconds()

    jumps_far = dist_prev > too_far
    # too_fast is given in km/h, the step speed here is in m/s
    jumps_fast = dist_prev / dt_prev > too_fast/3.6
    jumps_long = dt_prev > too_long

    return is_endpoint & (jumps_far | jumps_fast | jumps_long)

def remove_jumpstops(traj_points):
    """Drop jumpstop points and relabel the point before each as 'stop before jump'.

    The relabelling is applied to the passed frame itself; the returned frame
    is a copy without the jumpstops.
    """
    is_jumpstop = condition_jumpstops(traj_points)
    precedes_jumpstop = is_jumpstop.shift(-1, fill_value=False)
    traj_points.loc[precedes_jumpstop, 'type'] = 'stop before jump'
    return traj_points.loc[~is_jumpstop].copy()

In [None]:
# sometimes two sequential points are identical
# for example when we removed an error-point in between
# we only want to keep the first point of these duplicates

def condition_duplicates(traj_points):
    """Return a boolean mask of points that repeat the x, y and traj_id of the row before."""
    same_x = traj_points['x'].shift().eq(traj_points['x'])
    same_y = traj_points['y'].shift().eq(traj_points['y'])
    same_trajectory = traj_points['traj_id'].shift().eq(traj_points['traj_id'])
    return same_x & same_y & same_trajectory

def remove_duplicates(traj_points):
    """Return a copy of the frame that keeps only the first point of each run of duplicates."""
    is_duplicate = condition_duplicates(traj_points)
    return traj_points[~is_duplicate].copy()

In [None]:
def clean_trajectories(traj_points):
    """Apply the cleaning steps repeatedly until none of the conditions matches any point.

    Each pass splits at jumpaways, then removes jumpbacks, jumpstarts, jumpstops
    and duplicates, in that order. The pass number is printed as progress output.
    Returns the cleaned frame.
    """
    cleaning_steps = (
        remove_jumpaways,
        remove_jumpbacks,
        remove_jumpstarts,
        remove_jumpstops,
        remove_duplicates,
    )
    remaining_checks = (
        condition_jumpstarts,
        condition_jumpbacks,
        condition_jumpaways,
        condition_jumpstops,
        condition_duplicates,
    )

    print('iteratively cleaning trajectories')
    iteration = 1
    while True:
        print(iteration)
        iteration += 1

        for cleaning_step in cleaning_steps:
            traj_points = cleaning_step(traj_points)

        # removing points can create new jumps or duplicates, so check again
        if not any(check(traj_points).any() for check in remaining_checks):
            return traj_points

In [None]:
# number of points before cleaning
len(traj_points)

In [None]:
# clean the trajectories, then compute the step statistics within each trajectory
traj_points = recalculate_in_out_stats(
    clean_trajectories(traj_points),
    to_group_by='traj_id')

#### remove one-point trajectories

In [None]:
# cleaning out duplicates and jumps can leave 'trajectories' with a single point,
# which are no longer trajectories; keep only ids that occur more than once
traj_length = traj_points['traj_id'].value_counts()
ids_with_multiple_points = traj_length.index[traj_length > 1]
traj_points = traj_points.loc[traj_points['traj_id'].isin(ids_with_multiple_points)]

In [None]:
# number of points left after cleaning and dropping one-point trajectories
len(traj_points)

## create trajectory linestrings

#### turn points into linestrings

In [None]:
def _summarise_trajectory(points):
    """Collapse the points of one trajectory into a line geometry plus summary values."""
    first_seen = points.timestamp_se.min()
    last_seen = points.timestamp_se.max()
    return pd.Series({
        'geometry': LineString(list(zip(points.x, points.y))),
        'n_points': len(points),
        'total_dt': (last_seen - first_seen).total_seconds(),
        'start_timestamp_se': first_seen,
        'end_timestamp_se': last_seen,
        'max_speed': points.speed_in.max(),
        'min_speed': points.speed_in.min(),
        'stdev_speed': points.speed_in.std()
    })


# one row per trajectory, indexed by traj_id
traj_lines = gpd.GeoDataFrame(
    traj_points.groupby(['traj_id'])[
        ['device_uid', 'traj_id', 'x', 'y', 'timestamp_se', 'dist_in', 'dist_out', 'speed_in', 'speed_out']
    ].apply(_summarise_trajectory),
    geometry='geometry', crs=local_crs)

# NOTE(review): avg_speed is line length divided by seconds, whereas max/min/stdev
# speed come from speed_in (kilometers per hour) -- confirm the intended unit
traj_lines['total_dist'] = traj_lines.geometry.length
traj_lines['avg_speed'] = traj_lines['total_dist'] / traj_lines['total_dt']
traj_lines['avg_dist'] = traj_lines['total_dist'] / (traj_lines['n_points'] - 1)

In [None]:
# expose the trajectory id held in the index as a regular column
traj_lines['traj_id'] = traj_lines.index

In [None]:
# number of trajectories
len(traj_lines)

#### select and plot fine-grained trajectories

In [None]:
# keep trajectories with enough points whose average step between points is short
min_n = 2
max_dist = 1000
has_enough_points = traj_lines['n_points'] >= min_n
is_fine_grained = traj_lines['avg_dist'] < max_dist
potential250 = traj_lines[has_enough_points & is_fine_grained]
print(
    'trajectories with potential:\n{} trajectories have {}+ points and the average interval between them is less than {} meters'
    .format(len(potential250), min_n, max_dist))

In [None]:
# overlay all selected trajectories; the very low alpha makes frequently used routes stand out
fig, axs = plt.subplots(nrows=1, ncols=1, figsize=(12, 12))

# NOTE(review): figsize is passed together with an explicit ax here -- confirm it has any effect
potential250.plot(ax=axs, color='red', figsize=(20, 20), alpha=0.01)

axs.set_axis_off()

plt.show()

## write trajectories to table

In [None]:
# columns shared by both output tables
cols_general = [
    'total_dt', 'total_dist', 'avg_dist',
    'min_speed', 'max_speed', 'avg_speed', 'stdev_speed',
    'geometry']
# columns of the lines table and of the points table
cols_lines = ['traj_id', 'start_timestamp_se', 'end_timestamp_se', 'n_points', *cols_general]
cols_points = ['traj_id', 'timestamp_se', 'type', 'n_points', *cols_general]

In [None]:
# points to write to table: every point joined with the attributes of its trajectory line
output_points = traj_points.merge(
    traj_lines.reset_index(drop=True),
    how='right', on='traj_id')

# use the point location as geometry, replacing the line geometry brought in by the merge
point_geometries = gpd.points_from_xy(output_points['x'], output_points['y'])
output_points = gpd.GeoDataFrame(
    output_points, geometry=point_geometries, crs=local_crs
).drop(columns=['x', 'y'])

In [None]:
# write the trajectory lines and points to their per-city tables
if write_to_table:

    # NOTE(review): host, database and username look like placeholders -- confirm before running
    url_flowsense = sqlalchemy.URL.create(
        drivername="postgresql+psycopg",
        username="username",
        host="host",
        port=5432,
        database="database")
    engine_flowsense = sqlalchemy.create_engine(url_flowsense)

    # options shared by both writes
    postgis_options = dict(
        con=engine_flowsense,
        schema='trajectories',
        if_exists=if_exists,
        index=False)

    lines_out = traj_lines.reset_index(drop=True)[cols_lines]
    lines_out.to_postgis(name='{}_trajectory_lines_2024'.format(city_abbr), **postgis_options)

    points_out = output_points[cols_points]
    points_out.to_postgis(name='{}_trajectory_points_2024'.format(city_abbr), **postgis_options)