In [37]:
import hashlib
import json
import time

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from math import atan2, fabs, pi, pow, sqrt
from multiprocessing import cpu_count, Pool

import geopandas as gpd
import numpy as np
import pandas as pd
import requests

from pytz import timezone
from requests import Session
from shapely.geometry import Point

with open('../.config/connections.json') as json_file:  
    connections = json.load(json_file)

In [2]:
c = connections['Lime']
version = '0.3'

In [3]:
session = Session()
if 'extra' in c:
    if 'headers' in c['extra']:
        session.headers.update(c['extra']['headers'])
if 'token_url' in c:
    res = session.post(c['token_url'], data=c['auth_payload'])
    session.headers.update({'Authorization': f'Bearer {res.json()[c["token_key"]]}'})
                            
session.headers.update({"Accept": f"application/vnd.mds.provider+json;version={version}"})

In [4]:
end_time = datetime(2019, 9, 1)
start_time = end_time - timedelta(hours=1)
trip_params = {
    'min_end_time': int(start_time.timestamp()) * 1000,
    'max_end_time': int(end_time.timestamp()) * 1000
}
event_params = {
    'start_time': int(start_time.timestamp()) * 1000,
    'end_time': int(end_time.timestamp()) * 1000
}

In [5]:
print(trip_params)
print(event_params)

{'min_end_time': 1567292400000, 'max_end_time': 1567296000000}
{'start_time': 1567292400000, 'end_time': 1567296000000}


In [6]:
def _request(url, payload_key, params=None, results=[]):
        """
        Internal helper for sending requests.

        Returns payload(s).
        """
        retries = 0
        res = None

        while res is None:
            try:
                res = session.get(url, params=params)
                res.raise_for_status()
            except Exception as err:
                res = None
                retries = retries + 1
                if retries > 3:
                    raise Exception(
                        f"Unable to retrieve response from {url} after {3}.  Aborting...")

                print(
                    f"{err}. Retrying in 10 seconds... (retry {retries}/3)")
                time.sleep(10)

        if "Content-Type" in res.headers:
            cts = res.headers["Content-Type"].split(";")
            if "application/vnd.mds.provider+json" not in cts:
                print(
                    f"Incorrect content-type returned: {res.headers['Content-Type']}")
            cts = cts[1:]
            for ct in cts:
                if ct.strip().startswith("charset"):
                    pass
                if not ct.strip().startswith(f"version=0.3"):
                    print(
                        f"Incorrect content-type returned: {res.headers['Content-Type']}")
        else:
            print(f"Missing {self.version} content-type header.")

        page = res.json()

        if page["data"] is not None:
            results.extend(page["data"][payload_key])

        if "links" in page:
            next_page = page["links"].get("next")
            if next_page is not None:
                results = _request(url=next_page, payload_key=payload_key,
                                        results=results)

        return results

In [7]:
trips = _request(c['api_url'].replace(':endpoint', 'trips').strip(), 'trips', params=trip_params)

In [8]:
df = pd.DataFrame.from_records(trips)
len(df)

332

In [9]:
def _request(session, url, data=None):
    """
    Internal helper for sending requests.

    Returns payload(s).
    """
    retries = 0
    res = None

    while res is None:
        try:
            res = session.post(url, data=data)
            res.raise_for_status()
        except Exception as err:
            res = None
            retries = retries + 1
            if retries > 3:
                print(
                    f"Unable to retrieve response from {url} after 3 tries.  Aborting...")

            print(
                f"Error while retrieving: {err}. Retrying in 10 seconds... (retry {retries}/3)")
            time.sleep(10)

    return res

In [10]:
session.headers.update({"Content-Type": "application/json" })
session.headers.update({"Accept": "application/json"})

cores = cpu_count() #Number of CPU cores on your system
executor = ThreadPoolExecutor(max_workers=cores*4)
shst = df.route.map(lambda x: executor.submit(
    _request, session, 'http://sharedstreets:3000/api/v1/match/point/bike', data=json.dumps(x)))

In [22]:
shst.map(lambda x: 1 if x.done() else 0).sum()

332

In [23]:
def safe_result(x):
    try:
        return x.result().json()
    except:
        return None

In [24]:
df['shst'] = shst.map(safe_result)

In [27]:
# Convert the route to a DataFrame now to make mapping easier
df['route'] = df.apply(
    lambda x: gpd.GeoDataFrame.from_features(x.shst['features']) if x.shst is not None else gpd.GeoDataFrame.from_features(x.route['features']), axis=1)

def parse_route(trip):
    route = trip.route
    route['trip_id'] = trip.trip_id
    route['provider_id'] = trip.provider_id
    route['vehicle_id'] = trip.vehicle_id
    route['device_id'] = trip.device_id
    route['vehicle_type'] = trip.vehicle_type
    route['propulsion_type'] = trip.propulsion_type
    return route

In [28]:
route_df = gpd.GeoDataFrame(
    pd.concat(df.apply(parse_route, axis=1).values, sort=False).sort_values(
        by=['trip_id', 'timestamp'], ascending=True
    )
).reset_index(drop=True)
route_df.crs = {'init': 'epsg:4326'}
route_df['datetime'] = route_df.timestamp.map(
    lambda x: datetime.fromtimestamp(x / 1000).astimezone(timezone("US/Pacific")))
route_df['datetime'] = route_df.datetime.dt.round("L")
route_df['datetime'] = route_df.datetime.map(
    lambda x: datetime.replace(x, tzinfo=None))
route_df['date_key'] = route_df.datetime.map(
    lambda x: int(x.strftime('%Y%m%d')))
# Generate a hash to aid in merge operations
route_df['hash'] = route_df.apply(lambda x: hashlib.md5((
    x.trip_id + x.device_id + x.provider_id +
    x.datetime.strftime('%d%m%Y%H%M%S%f')
).encode('utf-8')).hexdigest(), axis=1)
route_df['datetime'] = route_df.datetime.map(
    lambda x: x.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])


In [29]:
route_df = route_df.to_crs(epsg=3857)

route_df['x'] = route_df.geometry.map(lambda g: g.x)
route_df['y'] = route_df.geometry.map(lambda g: g.y)

route_by_trip = route_df.groupby(['trip_id'])

route_df['nt'] = route_by_trip.timestamp.shift(-1)
route_df['nx'] = route_by_trip.x.shift(-1)
route_df['ny'] = route_by_trip.y.shift(-1)

# drop destination
route_df = route_df.dropna()

route_df['dx'] = route_df.apply(
    lambda x: x.nx - x.x, axis=1)
route_df['dy'] = route_df.apply(
    lambda x: x.ny - x.y, axis=1)
route_df['dt'] = route_df.apply(
    lambda x: (x.nt - x.timestamp) / 1000, axis=1)

def find_bearing(hit):
    deg = atan2(hit.dx, hit.dy) / pi * 180
    if deg < 0:
        deg = deg + 360
    return deg

def find_speed(hit):
    if hit['dt'] <= 0:
        return 0

    d = sqrt(pow((hit.dx), 2) + pow((hit.dy), 2))

    return d / hit['dt']

route_df['bearing'] = route_df.apply(find_bearing, axis=1)
route_df['speed'] = route_df.apply(find_speed, axis=1)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  app.launch_new_instance()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org

In [31]:
route_df['shstCandidates'] = route_df.shstCandidates.map(lambda x: pd.DataFrame(x))

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  """Entry point for launching an IPython kernel.


In [32]:
def expand_candidates(p):
    df = p.shstCandidates.rename(index=str, columns={'bearing':'shstBearing'})
    df['hash'] = p.hash
    df['trip_id'] = p.trip_id
    df['timestamp'] = p.timestamp
    df['vehicle_type'] = p.vehicle_type
    df['bearing'] = p.bearing
    df['speed'] = p.speed
    
    return df

shst_df = pd.concat(route_df.apply(expand_candidates, axis=1).values, sort=False).sort_values(
    by=['trip_id', 'timestamp'], ascending=True
)

In [50]:
def normalizeAngle(angle):
    if angle < 0:
        angle = angle + 360
    return angle
shst_df['bearing_diff'] = shst_df.apply(lambda x: fabs(normalizeAngle(x.bearing) - normalizeAngle(x.shstBearing)), axis=1)

In [55]:
shst_df.sort_values(
    by=['hash', 'bearing_diff', 'score']
).drop_duplicates(
    subset=['hash']
).drop_duplicates(
    subset=['trip_id', 'geometryId'],
    keep='last'
).rename(
    index=str,
    columns={
        'geometryId': 'shst_geometry_id',
        'referenceId': 'shst_reference_id'
    }
)[[
    'provider_id',
    'date_key',
    'shst_geometry_id',
    'shst_reference_id',
    'hash',
    'datetime',
    'vehicle_type',
    'propulsion_type',
    'bearing',
    'speed',
    'seen',
]]

KeyError: "['batch', 'date_key', 'seen', 'datetime', 'provider_id', 'propulsion_type'] not in index"

In [133]:
gpd.GeoDataFrame(route_df)[['geometry', 'hash']].to_file('../.data/shst_matches.geojson', driver='GeoJSON')

In [45]:
route_df.shstCandidates.map(lambda x: 1 if x.empty else 0).sum()

6541

In [47]:
route_df[route_df.shstCandidates.map(lambda x: x.empty)][['geometry', 'hash']].to_file('../.data/unmatched_shst.geojson', driver='GeoJSON')