In [33]:
from dask.distributed import Client

# Connect with a blocking (synchronous) client. The cells below call client methods
# such as `upload_file` and `.compute()` without `await`, which only works with a
# synchronous client; with `asynchronous=True` the constructor and every client call
# would have to be awaited.
client = Client('10.65.18.58:8786')


+-------------+-----------+-----------+-----------+
| Package     | client    | scheduler | workers   |
+-------------+-----------+-----------+-----------+
| distributed | 2021.01.1 | 2021.02.0 | 2021.02.0 |
| lz4         | 3.1.3     | 3.1.1     | 3.1.1     |
| msgpack     | 1.0.2     | 1.0.0     | 1.0.0     |
| numpy       | 1.20.1    | 1.19.4    | 1.19.4    |
+-------------+-----------+-----------+-----------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6


In [16]:
# Show the client's plain-text summary first, then let the notebook render the
# client's rich representation as the cell output.
print(client)
client

<Client: 'tcp://172.18.0.4:8786' processes=3 threads=12, memory=49.97 GB>


0,1
Client  Scheduler: tcp://10.65.18.58:8786  Dashboard: http://10.65.18.58:8787/status,Cluster  Workers: 3  Cores: 12  Memory: 49.97 GB


In [17]:
import dask.dataframe as dd
import dask.bytes as db
import datetime
import pandas as pd
import math
import boto3
import json

In [18]:
import os
import sys

# Put the sibling `flows` directory on the import path (only once) so the modules
# that live there can be imported by the following cells.
module_path = os.path.abspath('../flows')
if sys.path.count(module_path) == 0:
    sys.path.append(module_path)

In [19]:
import tiles_pb2
from common import to_normalized_time, get_storage_options, extract_region_columns, join_region_columns, save_regional_aggregation

In [20]:
# Send the local helper modules to the workers so that tasks which reference them
# can import them remotely. Only the result of the last call is displayed.
client.upload_file('../flows/tiles_pb2.py')
client.upload_file('../flows/common.py')

{'tcp://10.0.35.27:36223': {'status': 'OK'},
 'tcp://10.0.35.28:32956': {'status': 'OK'},
 'tcp://10.0.35.5:40228': {'status': 'OK'}}

In [21]:
# Configs

# Connection settings for the bucket that holds the model run output.
# Credentials are read from the environment when set; the literal fallbacks are the
# test-instance values this notebook used before, so behaviour is unchanged when the
# variables are not defined. Do not put real credentials in the notebook.
source = {
    'endpoint_url': 'http://10.65.18.73:9000',
    'region_name': 'us-east-1',
    'key': os.environ.get('SOURCE_S3_KEY', 'foobar'),
    'secret': os.environ.get('SOURCE_S3_SECRET', 'foobarbaz'),
    'bucket': 'test'
}

# Connection settings for the bucket that receives the generated files.
dest = {
    'endpoint_url': 'http://10.65.18.73:9000',
    'region_name': 'us-east-1',
    'key': os.environ.get('DEST_S3_KEY', 'foobar'),
    'secret': os.environ.get('DEST_S3_SECRET', 'foobarbaz'),
    'bucket': 'experiments'
}

# This determines the number of bins (subtiles) per tile. E.g. each tile has 4^6 = 4096 grid cells (subtiles) when LEVEL_DIFF is 6.
# Tile (z, x, y) will have a subtile whose zoom level is z + LEVEL_DIFF,
# e.g. tile (9, 0, 0) will have (15, 0, 0) as a subtile with LEVEL_DIFF = 6.
LEVEL_DIFF = 6
MIN_SUBTILE_PRECISION = LEVEL_DIFF  # since the (0, 0, 0) main tile will have (LEVEL_DIFF, x, y) subtiles as its grid cells

# Note: We need to figure out the spatial resolution of a run output in advance. For some models, precision 15 is way too high.
# For example, the lpjml model covers the entire world at a very coarse resolution, and with precision 15 it takes 1 hour to
# process and upload the tiles, resulting in 397395 tile files (uploading takes most of the time),
# whereas it takes only a minute with precision 10. Having high-precision tiles doesn't make a
# significant visual difference since the underlying data itself is very coarse.
MAX_SUBTILE_PRECISION = 14

# Maximum zoom level for a main tile
MAX_ZOOM = MAX_SUBTILE_PRECISION - LEVEL_DIFF

s_bucket = source['bucket']
# TODO: provide these as input parameters
model_id = '2fe40c11-8862-4ab4-b528-c85dacdc615e'
run_id = '04f97328-2c73-48ce-8020-d74632336670'
#parquet_path = f's3://{s_bucket}/geo-test-data.parquet'
# Glob matching every parquet file written for this model run.
parquet_path = f's3://{s_bucket}/{model_id}/{run_id}/*.parquet'



In [22]:
# Display the resolved glob so the targeted model run can be checked before reading.
parquet_path

's3://test/2fe40c11-8862-4ab4-b528-c85dacdc615e/04f97328-2c73-48ce-8020-d74632336670/*.parquet'

In [23]:
# Load every parquet file matched by `parquet_path` into one dask dataframe, using the
# `source` connection settings, then rebalance it into 100 partitions.
df = (
    dd.read_parquet(
        parquet_path,
        storage_options=dict(
            anon=False,
            use_ssl=False,
            key=source['key'],
            secret=source['secret'],
            client_kwargs=dict(
                region_name=source['region_name'],
                endpoint_url=source['endpoint_url'],
            ),
        ),
    )
    .repartition(npartitions=100)
)
# Show the column types of the loaded frame.
df.dtypes

timestamp    datetime64[ns]
lat                 float64
lng                 float64
feature              object
value               float64
country              object
admin1               object
admin2               object
admin3               object
dtype: object

In [24]:
# Temporal aggregation: normalize each timestamp with `to_normalized_time` at the
# chosen resolution, then compute both the sum and the mean of `value` per group.
time_res = 'month'

# Group on every column except the measured value itself.
columns = list(df.columns)
columns.remove('value')

t = dd.to_datetime(df['timestamp'], unit='s').apply(
    lambda ts: to_normalized_time(ts, time_res),
    meta=(None, 'int'),
)
temporal_df = (
    df.assign(timestamp=t)
      .groupby(columns)['value']
      .agg(['sum', 'mean'])
)
# Prefix the aggregate columns with `t_` to mark them as temporal aggregates, and
# turn the group keys back into ordinary columns.
temporal_df = temporal_df.rename(columns={'sum': 't_sum', 'mean': 't_mean'}).reset_index()

In [25]:
# Peek at the last rows of the temporally aggregated frame as a sanity check.
temporal_df.tail()

Unnamed: 0,timestamp,lat,lng,feature,country,admin1,admin2,admin3,t_sum,t_mean
77949,1583020800,9.375,38.542,production,Ethiopia,Oromia,Mirab Shewa,Adda Berga,999.0,999.0
77950,1583020800,9.375,38.625,production,Ethiopia,Oromia,North Shewa,Mulo,89.0,89.0
77951,1583020800,9.375,38.708,production,Ethiopia,Oromia,North Shewa,Sululta,115.0,115.0
77952,1583020800,9.458,38.542,production,Ethiopia,Oromia,Mirab Shewa,Adda Berga,313.0,313.0
77953,1583020800,9.458,38.625,production,Ethiopia,Oromia,North Shewa,Mulo,32.0,32.0


In [30]:
# save timeseries as a json file
def save_timeseries(df, dest, model_id, run_id, time_res, timeseries_agg_columns):
    """Write one JSON timeseries file per aggregation column for a single group.

    Args:
        df: frame with `timestamp`, `feature`, `region_id` and the aggregation columns.
            `feature` and `region_id` are read from the first row, so all rows are
            expected to share the same values for them.
        dest: destination settings, passed through to `timeseries_to_json`.
        model_id: model identifier, passed through to `timeseries_to_json`.
        run_id: run identifier, passed through to `timeseries_to_json`.
        time_res: temporal resolution label, passed through to `timeseries_to_json`.
        timeseries_agg_columns: names of the columns in `df` to write, one file each.

    Returns:
        None. The function is called for its side effect of writing files.
    """
    # Groupby-apply machinery may call this function with an empty frame (no first
    # row to read the identifiers from); in that case there is nothing to write.
    if len(df) == 0 or not timeseries_agg_columns:
        return

    # These do not change between columns, so read them once instead of per iteration.
    feature = df['feature'].values[0]
    region_id = df['region_id'].values[0]
    for col in timeseries_agg_columns:
        timeseries_to_json(df[['timestamp', col]], dest, model_id, run_id, feature, time_res, region_id, col)

# write timeseries to json
def timeseries_to_json(df, dest, model_id, run_id, feature, time_res, region_id, column):
    """Write `df` as a JSON list of records, with `column` renamed to `value`.

    The output location is built from `dest['bucket']`, `model_id`, `run_id`,
    `time_res`, `feature`, `region_id` and `column`; the storage options are obtained
    from `get_storage_options(dest)`. The input frame is not modified.
    """
    bucket = dest['bucket']
    # `rename` returns a new frame, leaving the caller's frame untouched.
    renamed = df.rename(columns={column: 'value'})
    output_path = f's3://{bucket}/{model_id}/{run_id}/{time_res}/{feature}/regional/country/timeseries/{region_id}/{column}.json'
    renamed.to_json(output_path, orient='records', storage_options=get_storage_options(dest))

In [32]:
%%time
regions_cols = extract_region_columns(df)
level = 3
# do for all level

# Spatial aggregations applied to each temporal aggregate column.
timeseries_aggs = ['min', 'max', 'sum', 'mean']
# Maps every (temporal column, spatial aggregation) pair produced by `.agg` to a flat
# column name, e.g. ('t_sum', 'min') -> 's_min_t_sum'.
timeseries_lookup = {
    (t_col, s_agg): f's_{s_agg}_{t_col}'
    for t_col in ('t_sum', 't_mean')
    for s_agg in timeseries_aggs
}
timeseries_agg_columns = list(timeseries_lookup.values())

# Tag every row with the region id for the chosen level, then aggregate spatially
# per (feature, region, timestamp).
timeseries_df = temporal_df.assign(region_id=join_region_columns(temporal_df, level))
timeseries_df = (
    timeseries_df
    .groupby(['feature', 'region_id', 'timestamp'])
    .agg({'t_sum': timeseries_aggs, 't_mean': timeseries_aggs})
)
# Flatten the two-level column index into tuples so the lookup above can rename them.
timeseries_df.columns = timeseries_df.columns.to_flat_index()

# Write the files: one `save_timeseries` call per (feature, region) group. The apply is
# run only for its side effect, so the computed result carries no data.
timeseries_df = (
    timeseries_df
    .rename(columns=timeseries_lookup)
    .reset_index()
    .repartition(npartitions=12)
    .groupby(['feature', 'region_id'])
    .apply(
        lambda group: save_timeseries(group, dest, model_id, run_id, time_res, timeseries_agg_columns),
        meta=(None, 'object'),
    )
)
timeseries_df.compute()

CPU times: user 223 ms, sys: 10.3 ms, total: 233 ms
Wall time: 24.7 s


Series([], dtype: object)