In [1]:
import geopandas as gpd
import pandas as pd
import os
import json
from pathlib import Path
import datetime
from shapely import wkt
from tqdm import tqdm

from joblib import Parallel, delayed
import multiprocessing

import trackintel as ti
from trackintel.preprocessing.triplegs import generate_trips
from trackintel.analysis.tracking_quality import temporal_tracking_quality, _split_overlaps

In [2]:
# Read the JSON file listing where each dataset is stored into CONFIG.
Dataset_file = os.path.join(".", "paths.json")
with open(Dataset_file) as config_handle:
    CONFIG = json.loads(config_handle.read())

In [3]:
# Load the raw staypoint table from the directory configured under "raw_mobis".
sps_csv_path = os.path.join(CONFIG["raw_mobis"], "sps.csv")
sp = pd.read_csv(sps_csv_path)

In [4]:
# Parse both staypoint timestamp columns into timezone-aware (UTC) datetimes.
for column in ("started_at", "finished_at"):
    sp[column] = pd.to_datetime(sp[column], format="mixed", yearfirst=True, utc=True)

In [5]:
# Load the raw tripleg table, keeping only the columns at positions 0, 1 and 6.
# NOTE(review): later cells read "user_id", "started_at" and "finished_at" from
# this frame, so these positions presumably map to those columns - confirm.
legs_csv_path = os.path.join(CONFIG["raw_mobis"], "legs.csv")
tpls = pd.read_csv(legs_csv_path, usecols=[0, 1, 6])

In [6]:
# Parse both tripleg timestamp columns into timezone-aware (UTC) datetimes.
for column in ("started_at", "finished_at"):
    tpls[column] = pd.to_datetime(tpls[column], format="mixed", yearfirst=True, utc=True)

In [7]:
# Number of distinct users in the triplegs vs. in the staypoints.
tpls["user_id"].unique().shape, sp["user_id"].unique().shape

((5152,), (5130,))

In [8]:
# negative duration records have already been dropped

# Duration of every staypoint and tripleg in seconds.
for frame in (sp, tpls):
    frame["duration"] = (frame["finished_at"] - frame["started_at"]).dt.total_seconds()

In [9]:
# Order both tables chronologically and name the fresh positional index "id".
sp = sp.sort_values(by="started_at").reset_index(drop=True).rename_axis("id")
tpls = tpls.sort_values(by="started_at").reset_index(drop=True).rename_axis("id")

# Filter duplicates

In [10]:
def _alter_diff(df):
    df.sort_values(by="started_at", inplace=True)
    df["diff"] = pd.NA
    df["st_next"] = pd.NA

    diff = df["started_at"].iloc[1:].reset_index(drop=True) - df["finished_at"].iloc[:-1].reset_index(drop=True)
    df["diff"].iloc[:-1] = diff.dt.total_seconds()
    df["st_next"].iloc[:-1] = df["started_at"].iloc[1:].reset_index(drop=True)

    df.loc[df["diff"] < 0, "finished_at"] = df.loc[df["diff"] < 0, "st_next"]

    df["started_at"], df["finished_at"] = pd.to_datetime(df["started_at"]), pd.to_datetime(df["finished_at"])
    df["duration"] = (df["finished_at"] - df["started_at"]).dt.total_seconds()

    # print(df.loc[df["diff"] < 0])
    df.drop(columns=["diff", "st_next"], inplace=True)
    df.drop(index=df[df["duration"] <= 0].index, inplace=True)

    return df

def filter_duplicates(sp, tpls):
    """Resolve temporal overlaps between staypoints and triplegs per user.

    Both tables are tagged with a "type" column (added to the passed frames),
    combined with an outer merge, cleaned per "user_id" by `_alter_diff`, and
    split back into their two kinds.

    Parameters
    ----------
    sp : pd.DataFrame
        Staypoints with an "id" column plus the columns selected below.
    tpls : pd.DataFrame
        Triplegs with "id", "user_id", "started_at", "finished_at", "duration".

    Returns
    -------
    tuple of pd.DataFrame
        (staypoints, triplegs), each indexed by "id".
    """
    sp_columns = [
        "id",
        "user_id",
        "started_at",
        "finished_at",
        "geometry",
        "duration",
        "purpose",
        "detected_purpose",
        "overseas",
    ]
    tpl_columns = ["id", "user_id", "started_at", "finished_at", "duration"]

    # Tag each record with its origin so the tables can be separated again.
    sp["type"] = "sp"
    tpls["type"] = "tpl"
    combined = sp.merge(tpls, how="outer")

    # Trim overlapping records user by user.
    combined = combined.groupby("user_id", as_index=False).apply(_alter_diff)

    sp_clean = combined.loc[combined["type"] == "sp"].drop(columns=["type"])[sp_columns]
    tpls_clean = combined.loc[combined["type"] == "tpl"].drop(columns=["type"])[tpl_columns]

    return sp_clean.set_index("id"), tpls_clean.set_index("id")

# Expose the "id" index as a column for the helper; it hands back frames indexed by "id".
sp, tpls = filter_duplicates(sp=sp.reset_index(), tpls=tpls.reset_index())

In [11]:
# Location of the stored user-quality table. When it is already on disk the
# valid user ids are read from it; otherwise only the output folder is prepared.
quality_path = os.path.join("data", "quality")
quality_file = os.path.join(quality_path, "mobis_filtered.csv")
if os.path.isfile(quality_file):
    valid_users = pd.read_csv(quality_file)["user_id"].values
elif not os.path.exists(quality_path):
    os.makedirs(quality_path)

In [12]:
# Every staypoint counts as an activity unless its purpose is "wait".
sp["is_activity"] = ~sp["purpose"].eq("wait")

In [13]:
# Parse the WKT strings into geometry objects (with a progress bar) and
# promote the table to a GeoDataFrame in EPSG:4326.
tqdm.pandas(desc="Load geometry")
parsed_geometry = sp["geometry"].progress_apply(wkt.loads)
sp["geometry"] = parsed_geometry
sp = gpd.GeoDataFrame(sp, geometry="geometry", crs="EPSG:4326")

Load geometry: 100%|██████████████████████████████████████████████████████| 4804194/4804194 [00:55<00:00, 86942.31it/s]


In [14]:
# the trackintel trip generation: build trips from the staypoints and triplegs.
# sp and tpls are rebound to the frames returned by the call; add_geometry=False
# is passed through to generate_trips.
sp, tpls, trips = generate_trips(sp, tpls, add_geometry=False)

# Generate user filter

In [50]:

def _filter_user(df, min_thres, mean_thres):
    consider = df.loc[df["quality"] != 0]
    if (consider["quality"].min() > min_thres) and (consider["quality"].mean() > mean_thres):
        return df


def _get_tracking_quality(df, window_size):

    weeks = (df["finished_at"].max() - df["started_at"].min()).days // 7
    start_date = df["started_at"].min().date()

    quality_list = []
    # construct the sliding week gdf
    for i in range(0, weeks - window_size + 1):
        curr_start = datetime.datetime.combine(start_date + datetime.timedelta(weeks=i), datetime.time())
        curr_end = datetime.datetime.combine(curr_start + datetime.timedelta(weeks=window_size), datetime.time())

        # the total df for this time window
        cAll_gdf = df.loc[(df["started_at"] >= curr_start) & (df["finished_at"] < curr_end)]
        if cAll_gdf.shape[0] == 0:
            continue
        total_sec = (curr_end - curr_start).total_seconds()

        quality_list.append([i, cAll_gdf["duration"].sum() / total_sec])
    ret = pd.DataFrame(quality_list, columns=["timestep", "quality"])
    ret["user_id"] = df["user_id"].unique()[0]
    return ret

def calculate_user_quality(sp, trips, file_path, quality_filter):
    """Select users by tracking length and sliding-window quality, and store the result.

    Staypoints and trips are stacked, split at day boundaries and scored.
    Users pass when they were tracked for more than `day_filter` days and,
    if "min_thres" is given, when their sliding-window qualities pass
    `_filter_user`. The mean sliding-window quality per remaining user is
    written to `file_path`.

    Parameters
    ----------
    sp : pd.DataFrame
        Staypoints; the timestamp columns are overwritten and a "type" column
        is added in place, so pass a copy.
    trips : pd.DataFrame
        Trips; modified in place like `sp`.
    file_path : str
        Destination of the CSV with "user_id" and mean "quality".
    quality_filter : dict
        Needs "day_filter" and "window_size"; "min_thres" is optional and,
        when present, requires "mean_thres" as well.

    Returns
    -------
    numpy.ndarray
        The ids of the selected users.
    """
    # Strip the timezone from every timestamp column (trips first, then staypoints).
    for frame in (trips, sp):
        for column in ("started_at", "finished_at"):
            frame[column] = pd.to_datetime(frame[column]).dt.tz_localize(None)

    # Stack staypoints and trips into one table.
    print("starting merge", sp.shape, trips.shape)
    sp["type"] = "sp"
    trips["type"] = "tpl"
    all_df = pd.concat([sp, trips])
    print("finished merge", all_df.shape)
    print("*" * 50)
    all_df = _split_overlaps(all_df, granularity="day")
    all_df["duration"] = (all_df["finished_at"] - all_df["started_at"]).dt.total_seconds()

    print(len(all_df["user_id"].unique()))

    def _tracked_days(records):
        # Whole days between the first start and the last finish of one user.
        return (records["finished_at"].max() - records["started_at"].min()).days

    # Overall quality per user, extended by the number of tracked days.
    total_quality = temporal_tracking_quality(all_df, granularity="all")
    total_quality["days"] = all_df.groupby("user_id").apply(_tracked_days).values

    # Users tracked for more than the required number of days.
    tracked_long_enough = total_quality["days"] > quality_filter["day_filter"]
    user_filter_day = total_quality.loc[tracked_long_enough, "user_id"].unique()

    # Sliding-window quality of every user.
    sliding_quality = (
        all_df.groupby("user_id")
        .apply(_get_tracking_quality, window_size=quality_filter["window_size"])
        .reset_index(drop=True)
    )

    filter_after_day = sliding_quality[sliding_quality["user_id"].isin(user_filter_day)]

    if "min_thres" in quality_filter:
        # Drop users whose sliding-window quality fails the thresholds.
        thresholds = {
            "min_thres": quality_filter["min_thres"],
            "mean_thres": quality_filter["mean_thres"],
        }
        filter_after_day = (
            filter_after_day.groupby("user_id").apply(_filter_user, **thresholds).reset_index(drop=True).dropna()
        )

    filter_after_user_quality = filter_after_day.groupby("user_id", as_index=False)["quality"].mean()

    print("final selected user", filter_after_user_quality.shape[0])
    filter_after_user_quality.to_csv(file_path, index=False)
    return filter_after_user_quality["user_id"].values

# Thresholds handed to calculate_user_quality; the frames are copied because
# the function modifies its inputs.
quality_filter = dict(day_filter=50, window_size=5, min_thres=0.5, mean_thres=0.6)
valid_users = calculate_user_quality(
    sp.copy(),
    trips.copy(),
    quality_file,
    quality_filter,
)

starting merge (4804194, 12) (4363607, 5)
finished merge (9167801, 15)
**************************************************
<class 'geopandas.geodataframe.GeoDataFrame'>
5168
final selected user 2095


# Filter valid users

In [15]:
# Keep only the staypoints of users that passed the quality filter.
sp = sp[sp["user_id"].isin(valid_users)]

In [16]:
# How many of the remaining staypoints are flagged as activities.
sp["is_activity"].value_counts()

is_activity
True     1483849
False      73295
Name: count, dtype: int64

In [17]:
def _filter_within_swiss(stps, swissBound):
    """Keep only the staypoints whose geometry lies inside the boundary.

    Parameters
    ----------
    stps : gpd.GeoDataFrame
        Staypoints with a "geometry" column.
    swissBound : gpd.GeoDataFrame
        Boundary; the first entry of its "geometry" column is used for the test.

    Returns
    -------
    gpd.GeoDataFrame
        The staypoints inside the boundary, in the CRS `stps` came in with.
    """
    # Remember the incoming CRS so the result can be projected back to it.
    init_crs = stps.crs
    # The containment test runs in the CRS of the boundary.
    projected = stps.to_crs(swissBound.crs)

    # Evaluate the containment test in parallel chunks.
    projected["within"] = _apply_parallel(projected["geometry"], _apply_extract, swissBound)
    inside = projected[projected["within"] == True].drop(columns=["within"])

    return inside.to_crs(init_crs)
    
def _apply_extract(df, swissBound):
    """Worker for `_apply_parallel`: test each geometry for containment in the boundary.

    Parameters
    ----------
    df : pd.Series
        Geometries to test.
    swissBound : gpd.GeoDataFrame
        Boundary; only the first entry of its "geometry" column is used.

    Returns
    -------
    pd.Series
        Result of `boundary.contains(geometry)` for every element of `df`.
    """
    tqdm.pandas(desc="pandas bar")
    boundary = swissBound["geometry"].to_numpy()[0]
    return df.progress_apply(boundary.contains)


def _apply_parallel(df, func, other, n=-1):
    """parallel apply for spending up."""
    if n is None:
        n = -1
    dflength = len(df)
    cpunum = multiprocessing.cpu_count()
    if dflength < cpunum:
        spnum = dflength
    if n < 0:
        spnum = cpunum + n + 1
    else:
        spnum = n or 1

    sp = list(range(dflength)[:: int(dflength / spnum + 0.5)])
    sp.append(dflength)
    slice_gen = (slice(*idx) for idx in zip(sp[:-1], sp[1:]))
    results = Parallel(n_jobs=n, verbose=0)(delayed(func)(df.iloc[slc], other) for slc in slice_gen)
    return pd.concat(results)

# Load the boundary shapefile and keep only the staypoints located inside it.
swiss_shp_path = os.path.join(".", "data", "swiss", "swiss_1903+.shp")
swissBoundary = gpd.read_file(swiss_shp_path)
print("Before spatial filtering: ", len(sp))
sp_swiss = _filter_within_swiss(sp, swissBoundary)
print("After spatial filtering: ", len(sp_swiss))

Before spatial filtering:  1557144
After spatial filtering:  1440822


In [18]:
# Keep only the staypoints flagged as activities.
sp_swiss = sp_swiss[sp_swiss["is_activity"].eq(True)]

In [19]:
# Generate locations from the staypoints (clustering settings collected in one place).
location_settings = dict(
    epsilon=20,
    num_samples=2,
    distance_metric="haversine",
    agg_level="dataset",
    n_jobs=-1,
)
sp_swiss, locs = sp_swiss.as_staypoints.generate_locations(**location_settings)

In [20]:
# Drop the staypoints that were not assigned to any location.
sp_swiss = sp_swiss[sp_swiss["location_id"].notna()].copy()
print("After filter non-location staypoints: ", len(sp_swiss))

After filter non-location staypoints:  1217863


In [21]:
# Save locations: keep the first row per location id, then only the locations
# that are still referenced by a remaining staypoint.
locs = locs[~locs.index.duplicated(keep="first")]
used_location_ids = sp_swiss["location_id"].unique()
filtered_locs = locs[locs.index.isin(used_location_ids)]

# Locations are de-duplicated, so their user_id carries no meaning.
filtered_locs.as_locations.to_csv(os.path.join(".", "data", "locations.csv"))
print("Location size: ", used_location_ids.shape[0], filtered_locs.shape[0])

Location size:  62673 62673


In [22]:
# Reduce the staypoints to the columns needed downstream.
kept_columns = ["user_id", "started_at", "finished_at", "geometry", "location_id"]
sp_swiss = sp_swiss[kept_columns]

# Merge staypoints with max_time_gap="1min"; an empty frame is passed as
# triplegs and the merged staypoint keeps the first location_id.
sp_merged = sp_swiss.as_staypoints.merge_staypoints(
    triplegs=pd.DataFrame([]),
    max_time_gap="1min",
    agg={"location_id": "first"},
)
print("After staypoints merging: ", len(sp_merged))

After staypoints merging:  1197153


In [23]:
# Staypoint duration in whole minutes (seconds floor-divided by 60).
stay_seconds = (sp_merged["finished_at"] - sp_merged["started_at"]).dt.total_seconds()
sp_merged["duration"] = stay_seconds.floordiv(60)

In [24]:
# Number of distinct users left after all filtering steps.
print("User size: ", len(sp_merged["user_id"].unique()))

User size:  2094


In [25]:
# Store the filtered and merged staypoints.
sp_filtered_path = os.path.join(".", "data", "sp_filtered.csv")
sp_merged.to_csv(sp_filtered_path)