# Query prototyping

This notebook computes interesting metrics from the raw wheel rotation data stored in S3. It's meant as a prototype for queries that'll run to produce summary data for use by a web frontend later on.

An earlier version of the notebook worked with the raw rotation CSV files locally using pandas. Ultimately, I want to keep the compute close to the data in S3 and avoid paying for extra compute elsewhere (e.g., Lambda, ECS). Therefore, this notebook uses Athena, under the assumption that the Raspberry Pi doing the data collection can also trigger Athena queries to produce the summary data in the future.

In [None]:
import datetime
import getpass
import math
import time

import boto3
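import matplotlib.pyplot as plt  # used by the plotting cells below
import pandas as pd  # used to read the published CSVs
import pytz  # used for the local timezone conversion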
import requests

In [None]:
%config InlineBackend.print_figure_kwargs={'facecolor' : "w"}

Constants to convert [wheel rotations](https://www.amazon.com/gp/product/B019RH7PPE/ref=ppx_yo_dt_b_asin_title_o04_s00?ie=UTF8&psc=1) into distances.

In [None]:
wheel_diameter = 8.5 # inches, not quite the 9" advertised, I measured
wheel_circumference = math.pi * wheel_diameter / 12 / 5280 # miles
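
As a quick sanity check on these constants, the sketch below converts a rotation count into miles and back. The `rotations_to_miles` helper and the upper-case constant names are hypothetical, introduced only for illustration.

```python
import math

WHEEL_DIAMETER_IN = 8.5  # inches, as measured above
# inches -> feet -> miles
WHEEL_CIRCUMFERENCE_MI = math.pi * WHEEL_DIAMETER_IN / 12 / 5280


def rotations_to_miles(rotations):
    """Convert a count of wheel rotations to miles traveled (hypothetical helper)."""
    return rotations * WHEEL_CIRCUMFERENCE_MI


# Number of rotations needed to cover one mile on an 8.5" wheel
rotations_per_mile = 1 / WHEEL_CIRCUMFERENCE_MI
print(round(rotations_per_mile))
```

At this diameter, a mile takes a little under 2,400 rotations, which is a handy figure for eyeballing query results.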

In [None]:
athena = boto3.client('athena')
s3 = boto3.client('s3')

## Utils

I'll execute Athena queries using boto3. To keep demands on the Raspberry Pi light, I'm not using `pyathena`.

In [None]:
def q(query, max_checks=30):
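    """Executes an Athena query, waits for success or failure, and returns a
    (query ID, response) tuple.

    On success, the response is the first page of query results. On failure, it is
    the query execution response, which includes the failure reason.
    
    Waits up to max_checks * 10 seconds for the query to complete before raising.
    """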
    resp = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': 'honey_data'
        },
        WorkGroup='honey-data'
    )
    qid = resp['QueryExecutionId']
    for i in range(max_checks):
        resp = athena.get_query_execution(
            QueryExecutionId=qid
        )
        state = resp['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            return qid, athena.get_query_results(QueryExecutionId=qid)
        elif state in ('FAILED', 'CANCELLED'):
            # Both are terminal states; return the execution details for inspection
            return qid, resp
        time.sleep(10)
    else:
        raise RuntimeError('Reached max_checks')
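
The polling loop above can be exercised without AWS by separating the waiting logic from the Athena client. The following is a minimal sketch under that assumption: `wait_for_terminal_state`, `get_state`, and the injected `sleep` are hypothetical names, and the simulated state sequence stands in for real `get_query_execution` responses.

```python
import time

# Athena query states after which polling can stop
TERMINAL_STATES = {'SUCCEEDED', 'FAILED', 'CANCELLED'}


def wait_for_terminal_state(get_state, max_checks=30, delay=10, sleep=time.sleep):
    """Poll get_state() until it returns a terminal state or max_checks is exhausted.

    get_state is any zero-argument callable returning the current state string,
    e.g., a lambda wrapping athena.get_query_execution.
    """
    for _ in range(max_checks):
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(delay)
    raise RuntimeError('Reached max_checks')


# Simulated sequence of states standing in for Athena responses
states = iter(['QUEUED', 'RUNNING', 'SUCCEEDED'])
final_state = wait_for_terminal_state(lambda: next(states), sleep=lambda seconds: None)
print(final_state)
```

Injecting `sleep` keeps the example fast to run; in the notebook, the default `time.sleep` would apply.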

In [None]:
def publish(qid, s3_key):
    """Copies Athena results to the public bucket for client use."""
    return s3.copy_object(
        CopySource=f'honey-data/athena-results/{qid}.csv',
        Bucket='honey-data-public',
        Key=s3_key
    )

In [None]:
def update_partitions():
    """Update daily partitions."""
    return q('msck repair table incoming_rotations')

## Interesting metrics

In [None]:
update_partitions()

### How far has Honey run since we started tracking?

This total can serve as the radius passed to the GeoDB Cities API to name a city she could have reached had she traveled this far in a straight line.

In [None]:
qid, resp = q('''
select sum(rotations) total
from incoming_rotations
''')

The web client will end up using published CSVs instead of results fetched using the API. Therefore, I'm not investing in data type parsing in this notebook.

In [None]:
total_miles = int(resp['ResultSet']['Rows'][1]['Data'][0]['VarCharValue']) * wheel_circumference
total_miles
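
For ad hoc inspection, the nested indexing above can be wrapped in a small helper. This is a sketch only: `rows_to_dicts` is a hypothetical name, the sample response is hand-written rather than real data, and values are deliberately left as strings in keeping with the decision not to parse data types here.

```python
def rows_to_dicts(result):
    """Turn a get_query_results-style response into a list of dicts keyed by column name.

    Assumes the first row holds the column headers, as in the response indexed above.
    Values stay strings; a cell without a VarCharValue (SQL NULL) becomes None.
    """
    rows = result['ResultSet']['Rows']
    header = [col.get('VarCharValue') for col in rows[0]['Data']]
    return [
        dict(zip(header, (col.get('VarCharValue') for col in row['Data'])))
        for row in rows[1:]
    ]


# Hand-written sample shaped like the response used above (not real data)
sample = {
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'total'}]},
            {'Data': [{'VarCharValue': '1000'}]},
        ]
    }
}
records = rows_to_dicts(sample)
print(records)
```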

### What's the most recent activity available?

Not sure this is useful.

In [None]:
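q('''
select max(from_iso8601_timestamp(datetime)) as most_recent_activity
from incoming_rotations 
where
    (year = year(current_date) and month >= month(current_date)-1) or
    -- In January, the previous month is December of the prior year
    (month(current_date) = 1 and year = year(current_date)-1 and month = 12)
''')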

### How has she progressed over the last 7 days? (cumulative plot, origin at sum before period)

This should be the total sum of rotations before the window of interest. The query scans every partition instead of trying to skip the 7 most recent daily partitions, which are a drop in the bucket.

In [None]:
qid, resp = q('''
select sum(rotations) as prior_rotations
from incoming_rotations
where from_iso8601_timestamp(datetime) < (current_date - interval '7' day)
''')

In [None]:
resp

In [None]:
publish(qid, 'prior-7-day-window.csv');

And this should be the sum of rotations by hour and the cumulative sum by hour within the window of interest. I'm trying to constrain the search space for the necessary data using partitions. I need a bit more data to make sure this is working properly.

In [None]:
qid, resp = q(f'''
select
    sum(rotations) as sum_rotations,
    to_iso8601(date_trunc('hour', from_iso8601_timestamp(datetime))) as datetime_hour,
    sum(sum(rotations)) over (
        order by date_trunc('hour', from_iso8601_timestamp(datetime)) asc 
        rows between unbounded preceding and current row
    ) as cumsum_rotations
from incoming_rotations
where 
    year >= year(current_date)-1 and
    from_iso8601_timestamp(datetime) >= (current_date - interval '7' day)
group by date_trunc('hour', from_iso8601_timestamp(datetime))
order by datetime_hour
''')

In [None]:
publish(qid, '7-day-window.csv');
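
To make the windowed aggregate concrete, here's a pure-Python sketch of what the query is meant to compute: a sum per hour plus a running total across hours. The sample rows are made up for illustration.

```python
from collections import defaultdict
from datetime import datetime
from itertools import accumulate

# Made-up (timestamp, rotations) samples, for illustration only
samples = [
    ('2020-09-05T01:10:00+00:00', 40),
    ('2020-09-05T01:50:00+00:00', 60),
    ('2020-09-05T03:05:00+00:00', 25),
]

# Equivalent of: group by date_trunc('hour', from_iso8601_timestamp(datetime))
hourly = defaultdict(int)
for ts, rotations in samples:
    hour = datetime.fromisoformat(ts).replace(minute=0, second=0, microsecond=0)
    hourly[hour] += rotations

# Equivalent of the window function: a running total over hours in ascending order
hours = sorted(hourly)
sums = [hourly[h] for h in hours]
cumsums = list(accumulate(sums))

for hour, total, running in zip(hours, sums, cumsums):
    print(hour.isoformat(), total, running)
```

Note that hours with no activity simply don't appear in the output, which is the gap the plot has to deal with.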

Let's work with the CSV forms of these metrics to create a plot.

In [None]:
tz = pytz.timezone('America/New_York')

In [None]:
resp = s3.get_object(Bucket='honey-data-public', Key='prior-7-day-window.csv')
prior_df = pd.read_csv(resp['Body'])

In [None]:
try:
    offset = prior_df.iloc[0].iloc[0]
except IndexError:
    # No rows before the window yet, so start the cumulative plot at zero
    offset = 0

In [None]:
resp = s3.get_object(Bucket='honey-data-public', Key='7-day-window.csv')
week_df = pd.read_csv(resp['Body'])

In [None]:
week_df['datetime_utc'] = pd.to_datetime(week_df.datetime_hour)
week_df['datetime'] = week_df.datetime_utc.dt.tz_convert(tz)
week_df.set_index('datetime', inplace=True)

If plot interpolation in the frontend web client doesn't look pretty, I might fill missing values in Athena instead of relying on the client to do it. Some techniques here: https://www.reddit.com/r/SQL/comments/80t1db/inserting_dates_between_a_start_date_and_enddate/

In [None]:
cumsum_df = week_df[['cumsum_rotations']] + offset
#cumsum_df = cumsum_df.reindex(pd.date_range(week_df.index.min(), week_df.index.max(), freq='1h'), method='ffill')
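
If the fill ends up happening outside Athena, a forward fill over an hourly range is one option, mirroring the commented-out `reindex` call above. The following is a standard-library sketch with made-up values; `ffill_hourly` is a hypothetical helper, and it assumes every key already sits on an hour boundary.

```python
from datetime import datetime, timedelta


def ffill_hourly(series):
    """Forward-fill a {datetime: value} mapping so every hour from min to max has a value.

    Assumes a non-empty mapping whose keys fall exactly on hour boundaries.
    """
    hours = sorted(series)
    filled = {}
    current = hours[0]
    last_value = series[current]
    while current <= hours[-1]:
        # Use the observed value when present, otherwise carry the last one forward
        last_value = series.get(current, last_value)
        filled[current] = last_value
        current += timedelta(hours=1)
    return filled


# Made-up cumulative rotation counts with a two-hour gap
sparse = {
    datetime(2020, 9, 5, 1): 100,
    datetime(2020, 9, 5, 4): 125,
}
dense = ffill_hourly(sparse)
for hour, value in dense.items():
    print(hour.isoformat(), value)
```

Forward filling suits a cumulative series because the total doesn't change during hours without activity.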

In [None]:
cumsum_df.index.max() - cumsum_df.index.min()

In [None]:
_, ax = plt.subplots(figsize=(15, 5))
(cumsum_df * wheel_circumference).rename(columns={'cumsum_rotations': 'miles'}).plot(ax=ax)
ax.set_facecolor('white')

In [None]:
week_df.loc['2020-09-05 12:00:00':'2020-09-06 12:00:00'].sum_rotations.sum() * wheel_circumference

### How far has she run each night for the past year?

Subtracting 12 hours from each timestamp before truncating to the day keeps a nocturnal session that spans midnight on a single date.

In [None]:
qid, resp = q(f'''
select
    sum(rotations) as sum_rotations,
    date(date_trunc('day', from_iso8601_timestamp(datetime) - interval '12' hour)) as date
from incoming_rotations
where 
    year >= year(current_date)-1 and
    date(date_trunc('day', from_iso8601_timestamp(datetime) - interval '12' hour)) >= current_date - interval '1' year
group by date_trunc('day', from_iso8601_timestamp(datetime) - interval '12' hour)
order by date
''')

In [None]:
publish(qid, '1-year-window.csv');
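
The 12-hour shift is easier to see on a few rows. The sketch below applies the same idea in plain Python to made-up samples from a night that spans midnight.

```python
from collections import defaultdict
from datetime import datetime, timedelta

# Made-up samples: two from one night spanning midnight, one from the next night
samples = [
    ('2020-09-05T23:30:00+00:00', 50),
    ('2020-09-06T02:15:00+00:00', 70),
    ('2020-09-06T22:45:00+00:00', 30),
]

nightly = defaultdict(int)
for ts, rotations in samples:
    # Shift back 12 hours so noon one day through 11:59 the next share a date
    session_date = (datetime.fromisoformat(ts) - timedelta(hours=12)).date()
    nightly[session_date] += rotations

for session_date, total in sorted(nightly.items()):
    print(session_date, total)
```

The first two samples land on the same session date even though they fall on different calendar days.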

### What city might she have reached by traveling this distance?

https://rapidapi.com/wirefreethought/api/geodb-cities?endpoint=5aadab87e4b00687d35767b4 allows 1000 requests per day. If the data upload / aggregation job runs every 10 minutes, that's 144 runs per day, or 288 requests at two requests per lookup, well under the limit.

In [None]:
rapid_key = getpass.getpass('Rapid API key:')

In [None]:
durham_lat = '35.994034'
durham_lon = '-78.898621'
rapid_url = "https://wft-geo-db.p.rapidapi.com"

In [None]:
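def furthest_poi(lat, lon, radius, api_key, base_url=rapid_url):
    # Location ID is latitude followed by signed longitude, e.g., 35.994034-78.898621.
    # Assumes lat and lon are strings and that lon carries its own sign.
    path = f'/v1/geo/locations/{lat}{lon}/nearbyCities'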

    # Results sort nearest to farthest
    resp = requests.get(
        f'{base_url}{path}',
        headers = {
            'x-rapidapi-host': "wft-geo-db.p.rapidapi.com",
            'x-rapidapi-key': api_key
        }, 
        params={"radius": radius}
    )
    resp.raise_for_status()

    # Navigate to the last page
    for link in resp.json()['links']:
        if link['rel'] == 'last':
            path = link['href']
            break
    else:
        raise ValueError('Link to last result page not found')
        
    resp = requests.get(
        f'{base_url}{path}',
        headers = {
            'x-rapidapi-host': "wft-geo-db.p.rapidapi.com",
            'x-rapidapi-key': api_key
        }, 
    )
    resp.raise_for_status()
    # Furthest point of interest within the given radius
    return resp.json()['data'][-1]

In [None]:
furthest_poi(durham_lat, durham_lon, total_miles, rapid_key)
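
The "find the last page" loop appears more than once in this notebook, so it could be pulled into a helper. A minimal sketch follows; `find_link` is a hypothetical name, and the `href` values are illustrative placeholders rather than real API paths.

```python
def find_link(links, rel):
    """Return the href of the first link with the given rel, or None if absent."""
    return next((link['href'] for link in links if link['rel'] == rel), None)


# Hand-written links shaped like the ones iterated over above (hrefs are placeholders)
links = [
    {'rel': 'first', 'href': '/example?offset=0'},
    {'rel': 'last', 'href': '/example?offset=10'},
]

last_path = find_link(links, 'last')
print(last_path)
```

Returning `None` instead of raising leaves it to the caller to decide whether a missing link is an error.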

In [None]:
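# Same request as in furthest_poi, broken out step by step
path = f'/v1/geo/locations/{durham_lat}{durham_lon}/nearbyCities'
resp = requests.get(
    f'{rapid_url}{path}',
    headers = {
        'x-rapidapi-host': "wft-geo-db.p.rapidapi.com",
        'x-rapidapi-key': rapid_key
    }, 
    params={"radius": total_miles}
)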

In [None]:
for link in resp.json()['links']:
    if link['rel'] == 'last':
        path = link['href']
        break
else:
    raise ValueError('Link to last result page not found')

In [None]:
path

In [None]:
resp = requests.get(
    f'{rapid_url}{path}',
    headers = {
        'x-rapidapi-host': "wft-geo-db.p.rapidapi.com",
        'x-rapidapi-key': rapid_key
    }
)

In [None]:
resp.json()['data'][-1]

I can turn the `wikiDataId` into a link to a Wikipedia page by querying the Wikidata API followed by the Wikipedia API.

* https://www.wikidata.org/w/api.php?action=wbgetentities&format=json&props=sitelinks&ids=Q1373463&sitefilter=enwiki
* https://en.wikipedia.org/w/api.php?action=query&titles=Fries,%20Virginia&format=json
* https://en.wikipedia.org/w/api.php?action=query&prop=info&pageids=137620&inprop=url&format=json
* https://en.wikipedia.org/wiki/Fries,_Virginia
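
The first step of that chain might look like the sketch below, which pulls the English Wikipedia title out of a `wbgetentities` response and builds a page URL from it. The `enwiki_url` helper is hypothetical, the sample response is a trimmed, hand-written approximation of the response shape rather than captured output, and building the URL directly from the title is an assumption about Wikipedia's URL scheme that skips the Wikipedia API calls.

```python
from urllib.parse import quote


def enwiki_url(wikidata_response, wikidata_id):
    """Build an English Wikipedia URL from a wbgetentities response, or return None.

    Assumes the request used props=sitelinks and sitefilter=enwiki, as in the
    first URL above.
    """
    entity = wikidata_response.get('entities', {}).get(wikidata_id, {})
    title = entity.get('sitelinks', {}).get('enwiki', {}).get('title')
    if title is None:
        return None
    # Wikipedia page URLs use underscores in place of spaces
    return 'https://en.wikipedia.org/wiki/' + quote(title.replace(' ', '_'), safe='/,()')


# Trimmed, hand-written sample of the response shape (an assumption, not captured output)
sample = {
    'entities': {
        'Q1373463': {
            'sitelinks': {'enwiki': {'site': 'enwiki', 'title': 'Fries, Virginia'}}
        }
    }
}
url = enwiki_url(sample, 'Q1373463')
print(url)
```

If the direct URL turns out to be unreliable for some titles, the two Wikipedia API queries listed above remain the fallback.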