In [2]:
import pandas as pd
from tqdm import tqdm, tqdm_notebook

from tools import get_all_prefixes, get_object_list, FETCHES_BUCKET, filter_prefixes, get_jsons_from_object, get_object_list
from datetime import datetime

In [3]:
def generate_obj(start_date=datetime(2018, 1, 1), end_date=datetime(2019, 12, 31), prefix='realtime-gzipped/'):
    """Collect the object listings of every selected sub-prefix in FETCHES_BUCKET.

    The sub-prefixes found under `prefix` are narrowed down with
    `filter_prefixes` using `start_date` / `end_date`, then each remaining
    sub-prefix is listed and the resulting objects are gathered in one list.

    Args:
        start_date: passed to `filter_prefixes` as `start_date`.
        end_date: passed to `filter_prefixes` as `end_date`.
        prefix: root prefix whose sub-prefixes are listed.

    Returns:
        A flat list with the objects of all selected sub-prefixes, in listing
        order. (Callers in this notebook read the 'Key' entry of each item.)
    """
    # get_object_list returns a pair; the second element is used as the
    # sub-prefixes here, the first as the objects in the loop below.
    _, prefixes = get_object_list(bucket_name=FETCHES_BUCKET, prefix=prefix)
    filtered = filter_prefixes(prefixes=prefixes, start_date=start_date, end_date=end_date)
    ret = []
    # list(...) so the progress bar knows the total up front.
    for one_prefix in tqdm_notebook(list(filtered)):
        objects, _ = get_object_list(bucket_name=FETCHES_BUCKET, prefix=one_prefix)
        # extend in place: `ret = ret + objects` re-copied the whole list each round.
        ret.extend(objects)

    return ret

In [4]:
def get_relevant_measurements(key, is_relevant=lambda x: x['country']=='DE' and x['parameter']=='o3'):
    """Load the measurements stored under `key` and keep the relevant ones.

    Args:
        key: object name inside FETCHES_BUCKET.
        is_relevant: predicate called with each measurement; a measurement is
            kept when the predicate returns a truthy value. The default keeps
            records whose 'country' is 'DE' and whose 'parameter' is 'o3'.

    Returns:
        A list with the measurements accepted by `is_relevant`.
    """
    all_records = get_jsons_from_object(bucket=FETCHES_BUCKET, object_name=key)
    return list(filter(is_relevant, all_records))

In [16]:
# If you need a fresh list of objects, rebuild it from the bucket listing.
# Note: this rebinds the notebook-level `keys` to the 'Key' entry of every
# listed object.
obs = generate_obj()
keys = [ob['Key'] for ob in obs]

HBox(children=(IntProgress(value=0, max=668), HTML(value='')))




In [5]:
def store_keys(fname, keys):
    """Write `keys` to the text file `fname`, one key per line.

    The file is opened in 'w' mode, so existing content is replaced. Each key
    is formatted with an f-string and terminated by a newline.
    """
    with open(fname, 'w') as out:
        out.writelines(f'{key}\n' for key in keys)
            
#store_keys('./zippedkeys.dat', keys)

In [6]:
def read_keys(fname):
    """Read keys from the text file `fname`, one key per line.

    The line terminator is removed from every entry, so a file written by
    `store_keys` reads back as the same sequence of key strings. Other
    whitespace is left untouched.

    Args:
        fname: path of the file to read.

    Returns:
        A list of str, one entry per line of the file (empty list for an
        empty file).
    """
    with open(fname, 'r') as f:
        # Text mode normalises line endings to '\n'; drop only that terminator.
        return [line.rstrip('\n') for line in f]

In [8]:
# Load the stored key list from disk and report how many entries were read.
keys = read_keys('./zippedkeys.dat')
print(f'Read {len(keys)} keys')

Read 83763 keys


In [11]:
import csv
def write_down(prefix, data, out_dir='/tmp/ex'):
    """Write measurements to a CSV file named after `prefix`.

    The file is `<out_dir>/<prefix with '/' replaced by '-'>.csv`. One row is
    written per measurement that has a 'coordinates' entry, with the columns
    utc date, parameter, value, unit, latitude, longitude. Measurements
    without 'coordinates' are skipped. Nothing is written for empty `data`;
    'Empty...' is printed instead.

    Args:
        prefix: object key used to derive the file name. Surrounding
            whitespace (e.g. a trailing newline from a key file) is removed.
        data: sized collection of measurement dicts.
        out_dir: directory that receives the CSV file; created if missing.

    Returns:
        None.
    """
    import os

    if len(data) == 0:
        print('Empty...')
        return

    os.makedirs(out_dir, exist_ok=True)
    # A key carrying a stray newline must not leak into the file name.
    file_name = f"{prefix.strip().replace('/', '-')}.csv"
    # newline='' is what the csv module expects; otherwise line endings get
    # translated a second time on some platforms.
    with open(os.path.join(out_dir, file_name), 'w', encoding='utf-8', newline='') as f:
        writer = csv.writer(f, delimiter=',')
        for row in data:
            if 'coordinates' not in row:
                continue
            coordinates = row['coordinates']
            writer.writerow([
                row['date']['utc'],
                row['parameter'],
                row['value'],
                row['unit'],
                coordinates['latitude'],
                coordinates['longitude'],
            ])


In [9]:
# Example: filter the relevant measurements of a single S3 object
# (here: every record whose 'parameter' is 'o3', regardless of country).
get_relevant_measurements(keys[0].strip(), lambda x: x['parameter']=='o3')

[{'date': {'utc': '2017-12-31T23:00:00.000Z',
   'local': '2018-01-01T00:00:00+01:00'},
  'parameter': 'o3',
  'location': 'AD0942A',
  'value': 57,
  'unit': 'µg/m³',
  'city': 'Escaldes-Engordany',
  'attribution': [{'name': 'EEA',
    'url': 'http://www.eea.europa.eu/themes/air/air-quality'}],
  'averagingPeriod': {'unit': 'hours', 'value': 1},
  'coordinates': {'latitude': 42.5096939994651, 'longitude': 1.539138},
  'country': 'AD',
  'sourceName': 'EEA Andorra',
  'sourceType': 'government',
  'mobile': False},
 {'date': {'utc': '2017-12-31T23:00:00.000Z',
   'local': '2018-01-01T00:00:00+01:00'},
  'parameter': 'o3',
  'location': 'AD0944A',
  'value': 93,
  'unit': 'µg/m³',
  'city': 'Escaldes-Engordany',
  'attribution': [{'name': 'EEA',
    'url': 'http://www.eea.europa.eu/themes/air/air-quality'}],
  'averagingPeriod': {'unit': 'hours', 'value': 1},
  'coordinates': {'latitude': 42.516943999465, 'longitude': 1.56525},
  'country': 'AD',
  'sourceName': 'EEA Andorra',
  'sourc

In [12]:
# Mind the slice: only the first 20 keys are processed here, for testing.
for key in map(str.strip, tqdm_notebook(keys[:20])):
    print(f'Processing: {key}')
    # Keep every record whose 'country' is 'DE', whatever the parameter.
    mes = get_relevant_measurements(key, lambda x: x['country']=='DE')
    if mes:
        print(f"Nr of relevant mes: {len(mes)}")
        write_down(key, mes)
    

HBox(children=(IntProgress(value=0, max=20), HTML(value='')))

Processing: realtime-gzipped/2018-01-01/1514765764.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514766350.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514767062.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514767560.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514768252.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514768946.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514771223.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514771782.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514772927.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514773597.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514774820.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514776113.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514776534.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514777898.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514778404.ndjson.gz
Processing: realtime-gzipped/2018-01-01/1514781516.ndjson.gz
Processing: realtime-gzi

In [13]:
# In case you want to speed things up with a thread pool -- but check the
# results first. This uses the default filter of get_relevant_measurements,
# i.e. country 'DE' AND parameter 'o3'.
import concurrent.futures
import logging  # needed by the error branch below

with concurrent.futures.ThreadPoolExecutor(max_workers=None) as executor:
    # Strip each key once, so the lookup and the output file name both use
    # the clean key (keys read from a file may carry a trailing newline).
    future_measurements = {
        executor.submit(get_relevant_measurements, prefix): prefix
        for prefix in (raw_key.strip() for raw_key in keys)
    }

    for future in concurrent.futures.as_completed(future_measurements):
        url = future_measurements[future]
        try:
            data = future.result()
        except Exception as exc:
            # Log and carry on: one failing object must not abort the run.
            logging.error('%r generated an exception: %s', url, exc)
        else:
            write_down(url, data)

Empty...
Empty...
Empty...
Empty...
Empty...


In [15]:
# List the output directory (oldest first) to inspect the written CSV files.
! ls -altr /tmp/ex

total 660
-rw-rw-r--  1 jj   jj   138327 Oct 30 11:05 tes.csv
drwxrwxr-x  2 jj   jj    20480 Oct 30 11:57 .
drwxrwxrwt 22 root root  40960 Oct 30 12:42 ..
-rw-rw-r--  1 jj   jj   459243 Oct 30 12:47 realtime-gzipped-2018-01-01-1514790882.ndjson.gz.csv
-rw-rw-r--  1 jj   jj     1702 Oct 30 12:47 realtime-gzipped-2018-01-01-1514791644.ndjson.gz.csv
