# Setup

In [1]:
!pwd

/home/umni2/a/umnilab/users/verma99/mk/home_detection/code


In [2]:
from setup import *

In [3]:
import contextily as ctx
import haversine as hs
from haversine import haversine_vector as haversine
import scipy.cluster.hierarchy as sch
from shapely.geometry import box, Polygon
from sklearn.cluster import MeanShift, AgglomerativeClustering

In [4]:
# Start the project's Spark wrapper; the warnings printed below are emitted during this start-up
SP.start()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/01 10:50:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# Prepare data

## Load regions

In [5]:
# Indianapolis study region and its analysis dates
# (presumably `U.dates` expands the two endpoints into a sequence of dates — TODO confirm)
indy = Region.load('Indianapolis', 'Indiana')
indy.dates = U.dates('2021-03-01', '2021-03-07')

In [6]:
# Austin study region and its analysis dates
# (presumably `U.dates` expands the two endpoints into a sequence of dates — TODO confirm)
aus = Region.load('Austin', 'Texas')
aus.dates = U.dates('2021-07-01', '2021-07-07')

In [7]:
# Houston study region and its analysis dates (same endpoints as Austin)
# (presumably `U.dates` expands the two endpoints into a sequence of dates — TODO confirm)
hous = Region.load('Houston', 'Texas')
hous.dates = U.dates('2021-07-01', '2021-07-07')

## Parameters

In [8]:
# Spacing between consecutive grid lines along each axis (meters)
grid_interval = 50

# Keyword arguments forwarded to the clustering helpers below.
# The three distance settings are expressed in meters.
agglo_clust_params = {
    'min_dist': 100,
    'max_dist': 200,
    'step': 1,
    'min_ratio': 1.5,
    'affinity': 'euclidean',
    'linkage': 'average',
}

# Minimum dwell time for a virtual region to count as a stay region (seconds)
dwell_thresh = 60 * 10

## Helper functions

### Convert distance from meters to degrees

In [9]:
def dist_m2deg(dist, lat, a=6_371_001, b=6_356_752):
    """Convert a ground distance in meters to an angle in degrees at a latitude.

    The distance is divided by a latitude-dependent radius of curvature
    computed from the two semi-axes, and the resulting angle (radians) is
    converted to degrees.

    Parameters
    ----------
    dist : distance, in the same length unit as `a` and `b` (meters by default)
    lat : latitude in degrees
    a, b : semi-axes used in the radius-of-curvature formula

    Returns
    -------
    float : the angle, in degrees, subtended by `dist`

    NOTE(review): the default `a` is close to Earth's mean radius rather than
    the WGS-84 equatorial radius (6_378_137 m) — confirm this is intended.
    """
    phi = np.deg2rad(lat)
    numerator = (a * b) ** 2
    denominator = ((a * np.cos(phi)) ** 2 + (b * np.sin(phi)) ** 2) ** 1.5
    radius = numerator / denominator
    angle_rad = dist / radius
    return float(np.rad2deg(angle_rad))

## Create grid from a region's bounding box

In [10]:
def make_grid(rgn, step, delta=1e-16):
    """Build the grid-line coordinates covering a region's bounding box.

    The bounding box is projected to the metric CRS, split every `step`
    units along each axis (the upper bound is always appended as the last
    edge), and the resulting edges are projected back to degrees.

    Parameters
    ----------
    rgn : object exposing `bbox`, which is unpacked into `shapely.geometry.box`
    step : spacing between consecutive grid lines, in units of `CRS_M`
    delta : amount subtracted from the upper bound before calling `np.arange`

    Returns
    -------
    (x, y) : arrays of grid-line longitudes and latitudes
    """
    bbox_gdf = Gdf({'geometry': [box(*rgn.bbox)]}, crs=CRS_DEG)
    minx, miny, maxx, maxy = bbox_gdf.to_crs(CRS_M).bounds.iloc[0]
    xs_m = np.append(np.arange(minx, maxx - delta, step), maxx)
    ys_m = np.append(np.arange(miny, maxy - delta, step), maxy)
    # Only the bottom and left edges of the box need re-projecting:
    # the former carries every x value, the latter every y value.
    bottom_edge = [(x_m, ys_m[0]) for x_m in xs_m]
    left_edge = [(xs_m[0], y_m) for y_m in ys_m]
    edge_pts = Pdf(bottom_edge + left_edge, columns=[LON, LAT])
    edge_pts = mk.geo.pdf2gdf(edge_pts, crs=CRS_M)
    edge_pts = mk.geo.gdf2pdf(edge_pts.to_crs(CRS_DEG))
    x = edge_pts[LON].head(len(xs_m)).values
    y = edge_pts[LAT].tail(len(ys_m)).values
    return x, y
    
# Build the Indianapolis grid and report how many grid lines lie along each axis
x, y = make_grid(indy, grid_interval)
print(f'x: {len(x)}, y: {len(y)}'); x

x: 2496, y: 3022


array([-86.69586   , -86.69541084, -86.69496168, ..., -85.57611   ,
       -85.57566084, -85.575541  ])

## Load pings

In [11]:
def load_pings(rgn, dates=None, concat=True):
    """Read a region's daily ping tables and stack them into one Spark table.

    Each day is read from `rgn.data / 'pings/<date>'`, its `ERR` column is
    dropped, and a `day_num` column holding the number of days since the
    earliest requested date is added before the tables are unioned.

    Parameters
    ----------
    rgn : object exposing `data` (base path) and `dates` (default date list)
    dates : dates to load; falls back to `rgn.dates` when falsy
    concat : currently unused

    Returns
    -------
    The union of all per-day tables.
    """
    dates = sorted(dates or rgn.dates)
    daily_tables = []
    for date in dates:
        pings = SP.read_parquet(rgn.data / f'pings/{date}').drop(ERR)
        days_elapsed = (date - dates[0]).days
        daily_tables.append(
            pings.withColumn('day_num', F.lit(days_elapsed).cast(T.int8)))
    return reduce(Sdf.union, daily_tables)

# %time x = load_pings(aus); x

In [12]:
# One day of Indianapolis pings read directly from parquet (unlike `load_pings`, the error column is kept)
sdf = SP.read_parquet(indy.data / 'pings/2021-03-01')

                                                                                

In [13]:
# Pull 10 rows into pandas for the examples below; this relies on `.disp()` returning
# the frame it displays (presumably a project display helper — TODO confirm)
pdf = sdf.limit(10).toPandas().disp()

[Stage 1:=====>                                                   (4 + 36) / 40]

10 rows x 5 cols; Memory: 0.0 MiB


                                                                                

Unnamed: 0,uid,lon,lat,ts,error
,<int64>,<object>,<object>,<object>,<object>
0.0,-9221220934257213376,"[-86.121124, -86.11503, -86.10469, -86.10503, ...","[39.585987, 39.58597, 39.58778, 39.58695, 39.5...","[32623.0, 32635.0, 36262.0, 47056.0, 47087.0, ...","[10.0, 5.0, 14.0, 45.0, 4.0, 29.0, 8.0, 18.0]"


## Compute the no. of hierarchical clusters

In [14]:
def get_num_clusters(x, y, min_dist, max_dist, step, min_ratio, **kwargs):
    """Estimate the number of hierarchical clusters formed by a set of points.

    Naming changed from Shagun's implementation in `Shagun_M3.ipynb`.

    An average-linkage dendrogram is built on the raw (x, y) coordinates and
    cut at a distance threshold that starts at `min_dist`. While links remain
    above the threshold, the number of dendrogram branches crossing it is
    counted; the threshold is then raised just past the next link height
    until the gap around it is wide enough (`min_ratio`), no links remain
    above it, or the links below it exceed `max_dist`.

    Parameters
    ----------
    x, y : equal-length sequences of coordinates. The distance parameters are
        converted from meters to degrees before use, so these are expected in
        degrees (callers in this notebook pass longitudes and latitudes).
    min_dist : initial cut distance (meters)
    max_dist : upper limit on the largest link below the cut (meters)
    step : increment added when the cut is raised past a link (meters)
    min_ratio : minimum ratio between the smallest link above the cut and
        the larger of the cut itself / the largest link below it
    **kwargs : ignored; lets callers pass a shared parameter dict that also
        contains keys meant for other functions

    Returns
    -------
    int : 0 for empty input, 1 for a single point, otherwise the estimate.

    Raises
    ------
    AssertionError : if `x` and `y` differ in length.
    """
    assert len(x) == len(y)
    if len(x) == 0: return 0
    if len(x) == 1: return 1
    # convert the distance parameters from meters to degrees at the central
    # latitude of the points (midpoint of the latitude range)
    lat = (min(y) + max(y)) / 2
    dt = dist_m2deg(min_dist, lat)
    dt_max = dist_m2deg(max_dist, lat)
    d_eps = dist_m2deg(step, lat)
    # create the dendrogram
    linkage = sch.linkage(np.vstack([x, y]).T, method='average')
    dendrogram = sch.dendrogram(linkage, no_plot=True)
    # y-values of the nodes of the links on the dendrogram
    # (i.e., inter-cluster distance); columns 1 and 2 hold the height of the
    # link, columns 0 and 3 the heights at which its two legs start
    Y = Pdf(dendrogram['dcoord'])
    # initialize
    n_clusters = 1
    done = False
    while not done:
        n_clusters_pre = n_clusters
        d_df = Y[Y[1] < dt] # links below the current distance threshold
        d = d_df[1].max() if len(d_df) > 0 else d_eps
        D_df = Y[Y[1] > dt] # links above the current distance threshold
        if len(D_df) == 0: # if there are no clusters farther than `dt`
            done = True # finish since all points are in the same cluster
            n_clusters = 1
        else: # if there is at least one cluster pair far away from each other
            D = D_df[1].min() # min inter-cluster distance greater than `dt`
            # count the dendrogram legs that cross the threshold: each one
            # is a separate cluster at cut height `dt`
            n_clusters = (len(Y[(Y[0] < dt) & (Y[1] > dt)]) +
                          len(Y[(Y[2] > dt) & (Y[3] < dt)]))
            # NOTE(review): `d` is always below `dt`, so this cap can only
            # trigger after `dt` has grown beyond `dt_max` — confirm that
            # `d` (rather than `dt` or `D`) is the intended quantity.
            if d > dt_max:
                n_clusters = n_clusters_pre
                done = True
            else:
                if d == 0:
                    ratio_min = D / dt
                else:
                    ratio_min = min(D/d, D/dt)
                if ratio_min > min_ratio:
                    done = True
                else:
                    dt = D + d_eps # increase the distance threshold
    return n_clusters

# Smoke test on one user: row 1 of the 10-row pandas sample loaded above
r = pdf.iloc[1]; print('Total pings:', r.lon.size)
print('No. of clusters:', get_num_clusters(r.lon, r.lat, **agglo_clust_params))

Total pings: 66
No. of clusters: 54


## Hierarchical clustering for one user

In [15]:
def hierarchical_clustering(x, y, **params):
    """Assign an agglomerative-cluster label to each (x, y) point.

    The number of clusters comes from `get_num_clusters`; the labels come
    from scikit-learn's `AgglomerativeClustering` fitted on the stacked
    coordinates.

    Parameters
    ----------
    x, y : equal-length sequences of coordinates
    **params : forwarded to `get_num_clusters`; the optional keys
        `affinity` (default 'euclidean') and `linkage` (default 'average')
        also configure the scikit-learn model

    Returns
    -------
    list : one label per point. `[]` for empty input, `[0]` for a single
        point, and `-1` for every point if clustering fails (the error is
        printed rather than raised, as a deliberate best-effort fallback).
    """
    n_pts = len(x)
    if n_pts == 0:
        # nothing to cluster; avoids a guaranteed (and noisy) fit failure
        return []
    if n_pts == 1:
        return [0]
    try:
        n_clust = get_num_clusters(x, y, **params)
        metric = params.get('affinity', 'euclidean')
        linkage = params.get('linkage', 'average')
        try:
            # scikit-learn >= 1.2 names the distance argument `metric`;
            # `affinity` was removed in 1.4, and using it there would raise a
            # TypeError that the outer handler turns into all -1 labels
            model = AgglomerativeClustering(
                n_clusters=n_clust, metric=metric, linkage=linkage)
        except TypeError:
            # older scikit-learn releases only accept `affinity`
            model = AgglomerativeClustering(
                n_clusters=n_clust, affinity=metric, linkage=linkage)
        model.fit(np.vstack([x, y]).T)
        return model.labels_.tolist()
    except Exception as e:
        print(e)
        return [-1] * n_pts

## Get clusters

In [16]:
def get_clusters(x, y, t, cells):
    """Assemble per-ping coordinates, timestamps and cell ids into one table.

    Returns a frame with the columns `LON`, `LAT`, `TS` and 'cell', in that
    order, built from the four equal-length inputs.
    """
    columns = {LON: x, LAT: y, TS: t, 'cell': cells}
    return Pdf(columns)
    
# Time the table construction for one user, using a dummy cell id of 1 for every ping.
# NOTE: this rebinds the notebook-global `x` that previously held the grid longitudes.
r = pdf.iloc[1]
%time x = get_clusters(r.lon, r.lat, r.ts, np.ones_like(r.lon)); x

CPU times: user 969 µs, sys: 0 ns, total: 969 µs
Wall time: 722 µs


Unnamed: 0,lon,lat,ts,cell
0,-86.687897,39.555084,20122.0,1.0
1,-86.639671,39.565880,20276.0,1.0
2,-86.631584,39.570583,20302.0,1.0
3,-86.626495,39.572033,20318.0,1.0
4,-86.612244,39.575401,20367.0,1.0
...,...,...,...,...
61,-86.392906,39.656883,73432.0,1.0
62,-86.400734,39.651516,73462.0,1.0
63,-86.295021,39.699810,73477.0,1.0
64,-86.412918,39.646381,73503.0,1.0


## Get stay regions

In [17]:
def get_stay_regions(df, dwell_thresh):
    """Group time-ordered pings into virtual regions and flag the stay regions.

    Pings are sorted by `TS`; each run of consecutive pings sharing the same
    'cluster' value becomes one virtual region. For every region the
    longitudes, latitudes, timestamps and cluster labels are collected into
    lists, and the dwell time is the last timestamp minus the first.

    Parameters
    ----------
    df : frame with the columns `LON`, `LAT`, `TS` and 'cluster'
    dwell_thresh : minimum dwell time (same unit as `TS`) for a stay region

    Returns
    -------
    One row per virtual region with the columns 'virt_rgn', `LON`, `LAT`,
    `TS`, 'cluster', 'dwell_time' and the boolean 'stay_rgn'.
    """
    pings = df.sort_values(TS)
    # a new virtual region starts wherever the cluster label differs from
    # the previous ping's (the first ping always starts one)
    starts_new_rgn = pings['cluster'].diff() != 0
    pings['virt_rgn'] = starts_new_rgn.astype(np.int32).cumsum()
    listed_cols = [LON, LAT, TS, 'cluster']
    grouped = pings.groupby('virt_rgn').agg(dict.fromkeys(listed_cols, list))
    rgns = grouped.reset_index()
    first_ts = rgns[TS].str[0]
    last_ts = rgns[TS].str[-1]
    rgns['dwell_time'] = last_ts - first_ts
    rgns['stay_rgn'] = rgns['dwell_time'] >= dwell_thresh
    return rgns

## Main

In [18]:
def get_ping_clusters(rgn, dates=None, grid_interval=grid_interval,
                      clust_params=agglo_clust_params,
                      dwell_thresh=dwell_thresh, seed=None, save=False):
    """Cluster every row of pings after thinning them to one ping per grid cell.

    For each row returned by `load_pings`, the pings are binned into grid
    cells, one ping is drawn at random from each occupied cell, the drawn
    pings are clustered with `hierarchical_clustering`, and every ping
    inherits the cluster label of its cell's representative.

    Parameters
    ----------
    rgn : region passed to `load_pings` and `make_grid`
    dates : dates to load (defaults to the region's own dates)
    grid_interval : grid spacing passed to `make_grid`
    clust_params : keyword arguments for `hierarchical_clustering`
    dwell_thresh : currently unused (the stay-region step is disabled)
    seed : random state for the per-cell sampling; `None` keeps the draw
        non-deterministic
    save : when true, write the result to `rgn.data / 'location_clusters'`

    Returns
    -------
    Spark table with `UID` plus the array columns `LON`, `LAT`, `TS` and
    'cluster' (the latter cast to an int32 array).
    """
    df = load_pings(rgn, dates)
    # create the grid based on the region's bounding box
    gridX, gridY = make_grid(rgn, grid_interval)
    # sample one ping per cell using pandas
    def sample_cell_pings(x, y, t):
        df = Pdf({LON: x, LAT: y, TS: t})
        df['cx'] = pd.cut(df[LON], gridX).cat.codes
        df['cy'] = pd.cut(df[LAT], gridY).cat.codes
        # flatten the (column, row) bin indices into a single cell id
        df['cell'] = df.pop('cx') + df.pop('cy') * len(gridX)
        sample = (df.groupby('cell').sample(random_state=seed)
                  .reset_index(drop=True))
        x, y = sample[LON].values, sample[LAT].values
        labels = hierarchical_clustering(x, y, **clust_params)
        # float labels so they fit the UDF's array<array<float>> return type
        sample['cluster'] = Arr(labels).astype(float)
        df = df.merge(sample[['cell','cluster']],
                      on='cell').drop(columns='cell')
        # compute virtual regions
        # df = get_stay_regions(df, dwell_thresh)
        return [df[x].tolist() for x in [LON, LAT, TS, 'cluster']]
    df = df.select(UID, F.udf(
        sample_cell_pings, T.array(T.array(T.float))
    )(LON, LAT, TS).alias('_'))
    # unpack the nested array into one column per field
    df = df.select(UID, *[F.col('_')[i].alias(x) for i, x in
                          enumerate([LON, LAT, TS, 'cluster'])])
    df = df.withColumn('cluster', F.col('cluster').cast(T.array(T.int32)))
    if save:
        mk.spark.write(df, rgn.data / 'location_clusters')
    return df
    
# %time x = get_ping_clusters(indy, [dt.date(2021,3,1), dt.date(2021,3,2)]); x
# %time x.disp(5)
# %time d = x.limit(20).toPandas().disp(None)

In [19]:
def main_clustering(rgn, dates=None, grid_interval=grid_interval,
                    clust_params=agglo_clust_params,
                    dwell_thresh=dwell_thresh, seed=None, save=False):
    """Collect each user's pings over all days and assign them to grid cells.

    Timestamps are first shifted by 86400 seconds per `day_num` (the column
    added by `load_pings`) so they increase across days. Then each user's
    longitudes, latitudes and timestamps are flattened into single arrays and
    binned on the grid from `make_grid`.

    Parameters
    ----------
    rgn : region passed to `make_grid` and `load_pings`
    dates : dates to load (defaults to the region's own dates)
    grid_interval : grid spacing passed to `make_grid`
    clust_params, dwell_thresh, seed, save : currently unused

    Returns
    -------
    Spark table with `UID`, the array columns `LON`, `LAT`, `TS`, the bin
    indices 'cx' and 'cy', and the flattened cell id 'cell'.
    """
    # create the grid based on the region's bounding box
    gridX, gridY = make_grid(rgn, grid_interval)

    def add_day(t, n_days):
        # shift within-day timestamps by the number of elapsed days
        return [ts + n_days * 86400 for ts in t]

    # `add_day` needs both the timestamps and the row's day offset
    user_day_df = (load_pings(rgn, dates)
                   .withColumn(TS, F.udf(add_day, T.array(T.float))(
                       TS, 'day_num')))
    user_df = (
        user_day_df.groupby(UID)
        .agg(*[F.flatten(F.collect_list(c)).alias(c) for c in [LON, LAT, TS]])
        .withColumn('cx', F.udf(
            lambda lon: pd.cut(lon, gridX).codes.tolist(),
            T.array(T.int))(LON))
        # bin the UDF's own latitude argument, not a notebook-level variable
        .withColumn('cy', F.udf(
            lambda lat: pd.cut(lat, gridY).codes.tolist(),
            T.array(T.int))(LAT))
        .withColumn('cell', F.udf(
            lambda cx, cy: [i + j * len(gridX) for i, j in zip(cx, cy)],
            T.array(T.int))('cx', 'cy'))
    )
    return user_df

# The displayed result is only the DataFrame schema, so this timing presumably covers
# building the Spark plan rather than running the per-user computation
%time x = main_clustering(indy); x

CPU times: user 129 ms, sys: 25.1 ms, total: 154 ms
Wall time: 1.36 s


DataFrame[uid: bigint, lon: array<float>, lat: array<float>, ts: array<float>, cx: array<int>, cy: array<int>, cell: array<int>]