In [None]:
import pandas as pd
import numpy as np
import json
import math
import urllib.request
import dateutil.parser
import dateutil.rrule
import dateutil.tz
import datetime
import re
import gc
import time
import sqlite3

In [None]:
# Timezones. All timestamps below are handled in UTC; tzLocal is not
# referenced elsewhere in the visible notebook.
tzUTC = dateutil.tz.gettz('UTC')
tzLocal = dateutil.tz.gettz('Europe/London')

# Earliest date fetched when no cached data exists.
earliestData = datetime.datetime(2020, 1, 1, 0, 0, 0, tzinfo=tzUTC)
# Midnight at the start of the system's current date, labelled as UTC.
dateToday = datetime.datetime.combine(datetime.date.today(), datetime.time.min, tzinfo=tzUTC)

In [None]:
# Fetch any previous data
#
# Each cached interval pickle is read on its own, only to learn:
#   (a) which cameras were previously stored at which sampling interval, and
#   (b) how far the cached data extends.
# Each frame is released before the next is loaded, so at most one cached
# frame is in memory at a time.
#
# A missing or unreadable file for one interval no longer prevents the
# remaining intervals from being registered.
# TODO: Switch this back to using a separate array for merging
previousDataEnd = None
pointTsByIntervalOld = {}
pointTsByInterval = {}
camerasByInterval = {}
pdSources = None
pdSourcesOld = None

for interval in [60, 120, 300, 600]:
    cachePath = '../cache/recent-feature-counts-pd-%usec.pkl' % interval
    print('Loading data with %u second interval...' % interval)
    try:
        # NOTE: read_pickle can execute arbitrary code; only load trusted cache files.
        cachedFrame = pd.read_pickle(cachePath)
    except FileNotFoundError:
        # A cache file only exists once some camera has been stored at this interval.
        print('  No cached data for this interval.')
        continue
    except Exception as error:
        # Best effort: one unreadable file should not stop the others being loaded.
        print('  Could not load %s: %s' % (cachePath, error))
        continue

    # Columns are named '<cameraId>: <feature>'; strip from the first colon onward.
    cachedCameraIds = cachedFrame.columns.to_series().apply(lambda column: re.sub(':(.*)$', '', column)).unique()
    for cameraId in cachedCameraIds:
        camerasByInterval[cameraId] = interval

    intervalDataEnd = cachedFrame.index.max()
    if not pd.isna(intervalDataEnd) and (previousDataEnd is None or intervalDataEnd > previousDataEnd):
        previousDataEnd = intervalDataEnd

    # Release this frame before the next interval is loaded.
    cachedFrame = None
    gc.collect()

if camerasByInterval:
    print('Loaded previous data.')
else:
    print('No existing data could be loaded.')

if previousDataEnd is None:
    previousDataEnd = earliestData
else:
    # Restart from midnight of the last cached day, so that a partially fetched
    # final day is fetched again.
    previousDataEnd = datetime.datetime.combine(previousDataEnd.date(), datetime.datetime.min.time()).replace(tzinfo=tzUTC)

print('  Start reading from %s' % previousDataEnd)

In [None]:
# Fetch the camera catalogue from the vision API. Its top-level keys become the
# camera ids used as the index of pdSources below.
visionApiBase = 'https://uo-vision.dev.urbanobservatory.ac.uk/stills/dict'
# Use the response as a context manager so the HTTP connection is closed promptly.
with urllib.request.urlopen(visionApiBase) as visionHttpResponse:
    visionResponse = json.loads(visionHttpResponse.read().decode('utf-8'))

In [None]:
# One row per camera. Transposing puts the catalogue's top-level keys (camera
# ids) on the index. The availability window is parsed into UTC datetimes.
pdSources = pd.DataFrame.from_records(visionResponse).T
pdSources['min_date'] = pdSources['min_date'].apply(
    lambda text: datetime.datetime.strptime(text, '%Y-%m-%d').replace(tzinfo=tzUTC)
)
pdSources['max_date'] = pdSources['max_date'].apply(
    lambda text: datetime.datetime.strptime(text, '%Y-%m-%d').replace(tzinfo=tzUTC)
)

# Attach the interval each camera was previously cached at, if any were found.
if camerasByInterval:
    pdSources = pdSources.join(
        pd.DataFrame.from_dict(camerasByInterval, orient='index', columns=['interval'])
    )

# Start this run's bookkeeping afresh; the fetch loop repopulates these.
camerasSinceSave = 0
camerasByInterval = {}
#pointTsByInterval = {}

pdSources

In [None]:
# NC = done, GH = done, SL, NT, ST, CM, NB, METCCTV, VAISALA
#for cameraId in list(filter(lambda id: 'NC_' in id, list(pdSources.index))): # list(pdSources.index)[0:10]: # pdSources.index:
#
# Fetch new feature counts for every camera, one day at a time. Merge them into
# one wide frame per sampling interval in pointTsByInterval, with columns named
# '<cameraId>: <feature>'.

countsApiUrl = 'https://uo-vision.dev.urbanobservatory.ac.uk/stills/counts?location=%s&date=%s'


def _fetchDayCounts(cameraId, date):
    """Return the decoded JSON counts for one camera on one day.

    Retries indefinitely. After each failure it prints 'x' and sleeps
    min(attempts, 10) seconds. Only ``Exception`` is caught, so interrupting
    the kernel (KeyboardInterrupt) still stops the fetch.
    """
    url = countsApiUrl % (cameraId, date.isoformat()[0:10])
    attempts = 0
    while True:
        attempts += 1
        try:
            # The context manager closes the HTTP response even if decoding fails.
            with urllib.request.urlopen(url) as response:
                payload = json.loads(response.read().decode('utf-8'))
        except Exception:
            print('x', end='')
            time.sleep(min(attempts, 10))
            continue
        if payload is not None:
            return payload


def _countsToFrame(windowResponse):
    """Convert one day's API records into a frame indexed by UTC timestamp.

    Returns None when the day has no usable records. Mutates ``windowResponse``:
    nested counts of the form {'count': n} are flattened to n.
    """
    for record in windowResponse:
        counts = record['counts']
        for feature in counts:
            if not isinstance(counts[feature], int):
                counts[feature] = counts[feature]['count']

    dayFrame = pd.json_normalize(windowResponse)
    if dayFrame.empty or 'ts' not in dayFrame.columns:
        return None
    dayFrame = dayFrame.set_index('ts')
    if len(dayFrame.columns) == 0:
        return None
    dayFrame.index = dayFrame.index.to_series().apply(
        lambda t: datetime.datetime.strptime(t, '%Y-%m-%d %H:%M:%S').replace(tzinfo=tzUTC)
    )
    return dayFrame


def _estimateIntervalSeconds(sourceTs):
    """Estimate a camera's sampling interval in seconds, or NaN if there are < 2 samples.

    Each gap between samples is rounded to the nearest 30 s, with a 60 s floor.
    The 1st percentile is then capped at 600 s and snapped down to 120 s or
    300 s.
    """
    gaps = sourceTs.index.to_series().diff().dropna()
    if gaps.empty:
        # Checked up front: min(600.0, <NaN/NaT>) would otherwise hide (or choke on) the missing value.
        return np.nan
    # total_seconds(), not .seconds, so gaps longer than a day are not wrapped to their remainder.
    roundedGaps = gaps.apply(lambda gap: max(60.0, round(gap.total_seconds() / 30) * 30))
    tsInterval = min(600.0, roundedGaps.quantile(0.01))

    # No strange four or nine minute intervals please...
    if 120 < tsInterval < 300:
        tsInterval = 120
    elif 300 < tsInterval < 600:
        tsInterval = 300
    return tsInterval


dirtyIntervals = set()  # intervals modified since the last checkpoint save

for cameraId in pdSources.index:
    sourceColumn = '%s: Source image' % cameraId
    # Skip cameras already merged during this session (e.g. when resuming).
    if any(frame is not None and sourceColumn in frame.columns for frame in pointTsByInterval.values()):
        continue

    if cameraId == 'test_cam':
        continue

    source = pdSources[pdSources.index == cameraId].to_dict(orient='records')[0]

    print(cameraId)
    print('  [', end='')

    # Only fetch days not already covered by the cache.
    fetchStart = source['min_date'] if source['min_date'] > previousDataEnd else previousDataEnd
    dayFrames = []
    for date in dateutil.rrule.rrule(dateutil.rrule.DAILY, interval=1, dtstart=fetchStart, until=source['max_date']):
        dayFrame = _countsToFrame(_fetchDayCounts(cameraId, date))
        if dayFrame is None:
            print('-', end='')
            continue
        dayFrames.append(dayFrame)
        print('#', end='')

    if not dayFrames:
        print('] No data')
        continue

    # A single concat instead of appending day by day. Repeated appending is
    # quadratic, and DataFrame.append was removed in pandas 2.0.
    sourceTs = pd.concat(dayFrames)
    dayFrames = None
    sourceTs = sourceTs.drop(columns=['camera'], errors='ignore').rename(errors='ignore', columns={
        'url': 'Source image',
        'counts.bus': 'Bus',
        'counts.car': 'Car',
        'counts.cyclist': 'Cyclist',
        'counts.motorcyclist': 'Motorcyclist',
        'counts.person': 'Person',
        'counts.truck': 'Truck',
        'counts.van': 'Van'
    })
    sourceTs = sourceTs.loc[~sourceTs.index.duplicated(keep='first')]

    if 'interval' in source and not pd.isna(source['interval']):
        # Keep the camera at the interval it is already cached at.
        sourceInterval = source['interval']
    else:
        tsInterval = _estimateIntervalSeconds(sourceTs)
        if np.isnan(tsInterval):
            print('] Interval could not be determined')
            continue
        sourceInterval = min(600, round(tsInterval / 60) * 60)
    camerasByInterval[cameraId] = sourceInterval

    resampleRule = '%us' % sourceInterval
    sourceTsNumeric = sourceTs.resample(resampleRule).nearest(limit=1).drop(columns=['Source image'], errors='ignore')
    sourceTs = sourceTsNumeric.join(sourceTs['Source image'].resample(resampleRule).nearest(limit=1))
    sourceTsNumeric = None

    sourceTs = sourceTs.add_prefix('%s: ' % cameraId)

    print('] Interval %u seconds' % sourceInterval)

    intervalTs = pointTsByInterval.get(sourceInterval)
    if intervalTs is None:
        pointTsByInterval[sourceInterval] = sourceTs
    elif sourceColumn in intervalTs.columns:
        # Same camera already present: stack the new rows beneath (pd.concat; there is no DataFrame.concat).
        pointTsByInterval[sourceInterval] = pd.concat([intervalTs, sourceTs], sort=True, axis='index')
    else:
        # Outer join, so that no camera's timestamps are dropped when cameras cover different date ranges.
        pointTsByInterval[sourceInterval] = intervalTs.join(sourceTs, how='outer')
    intervalTs = None
    dirtyIntervals.add(sourceInterval)

    # Periodic saves for sanity
    camerasSinceSave = camerasSinceSave + 1

    if camerasSinceSave > 5 or len(sourceTs) > 20000:
        # Intervals assigned this run take precedence; other cameras keep their previously cached interval.
        knownIntervals = pd.Series(camerasByInterval, dtype='float64', name='interval')
        if 'interval' in pdSources.columns:
            knownIntervals = knownIntervals.combine_first(pdSources['interval']).rename('interval')
        pdSourcesWithInterval = pdSources.drop(columns=['interval'], errors='ignore').join(knownIntervals)
        pdSourcesWithInterval.to_pickle('../cache/_IN_PROGRESS_recent-feature-counts-point-metadata.pkl')

        # Save every interval changed since the last checkpoint, not just the
        # current one, so the in-progress files stay mutually consistent.
        for dirtyInterval in dirtyIntervals:
            pointTsByInterval[dirtyInterval].to_pickle('../cache/_IN_PROGRESS_recent-feature-counts-pd-%usec.pkl' % dirtyInterval)
        dirtyIntervals = set()

        camerasSinceSave = 0

    gc.collect()

In [None]:
# To resume a failed process only...
# Reloads the checkpoint frames written by the periodic save in the fetch loop
# (the '_IN_PROGRESS_' files). The fetch loop then skips every camera whose
# 'Source image' column is already present in pointTsByInterval.
# NOTE(review): this reads a checkpoint for every listed interval. An interval
# that was never checkpointed will raise FileNotFoundError, so confirm which
# files exist before uncommenting.
#for interval in [60, 120, 300, 600]:  # pdSources['interval'].unique():
#    print('Loading data with %u second interval...' % interval)
#    pointTsByInterval[interval] = pd.read_pickle('../cache/_IN_PROGRESS_recent-feature-counts-pd-%usec.pkl' % interval)

In [None]:
def _mergeIntoCache(oldData, newData):
    """Return cached frame ``oldData`` extended with ``newData``; either may be None.

    Rows before ``newData`` begins are kept verbatim. The last cached day is
    re-fetched on each run, so the periods overlap. In that overlap, cached
    values take precedence, but cells the cache left empty are filled from
    ``newData`` instead of the whole new row being discarded as a duplicate
    timestamp.
    """
    if newData is None or newData.empty:
        return oldData
    newData = newData.loc[~newData.index.duplicated(keep='first')]
    if oldData is None or oldData.empty:
        return newData
    oldData = oldData.loc[~oldData.index.duplicated(keep='first')]
    overlapStart = newData.index.min()
    overlap = oldData.loc[oldData.index >= overlapStart].combine_first(newData)
    return pd.concat([oldData.loc[oldData.index < overlapStart], overlap])


pointTsFinalByInterval = {}

for interval in pointTsByInterval:
    cachePath = '../cache/recent-feature-counts-pd-%usec.pkl' % interval
    try:
        oldData = pd.read_pickle(cachePath)
    except FileNotFoundError:
        # First run, or no camera was previously stored at this interval.
        oldData = None

    pointTsFinalByInterval[interval] = _mergeIntoCache(oldData, pointTsByInterval[interval])

    # Drop references to the inputs before writing, to reduce peak memory.
    oldData = None
    pointTsByInterval[interval] = None

    gc.collect()
    if pointTsFinalByInterval[interval] is None:
        # Nothing cached and nothing new (e.g. this cell was re-run).
        continue
    print('Writing output for interval %u...' % interval)
    pointTsFinalByInterval[interval].to_pickle(cachePath)
    pointTsFinalByInterval[interval] = None
    gc.collect()

print('Done.')

In [None]:
# Release the per-interval containers now that everything has been written.
pointTsByIntervalOld = pointTsByInterval = None
gc.collect()

In [None]:
# Optional inspection of a merged frame.
# NOTE: the merge cell sets each pointTsFinalByInterval entry to None after
# writing it to '../cache/', so the frame would need to be reloaded from
# there first.
#pointTsFinalByInterval[60].sort_index()

In [None]:
# Save per-camera metadata, together with the sampling interval each camera is
# stored at. Intervals assigned during this run take precedence. Cameras not
# processed this run (e.g. no new data) keep the interval they were previously
# cached at, instead of being written out with a missing interval.
knownIntervals = pd.Series(camerasByInterval, dtype='float64', name='interval')
if 'interval' in pdSources.columns:
    knownIntervals = knownIntervals.combine_first(pdSources['interval']).rename('interval')
# Build a new frame rather than dropping in place, so this cell can be re-run safely.
pdSourcesWithInterval = pdSources.drop(columns=['interval'], errors='ignore').join(knownIntervals)
pdSourcesWithInterval.to_pickle('../cache/recent-feature-counts-point-metadata.pkl')
pdSourcesWithInterval.to_csv('../output/recent-feature-counts-point-metadata.csv')
pdSourcesWithInterval