In [1]:
import d6tflow, luigi
import pandas as pd
import os

Loading postgres module without psycopg2 installed. Will crash at runtime if postgres functionality is used.
Loading S3 module without the python package boto3. Will crash at runtime if S3 functionality is used.


Welcome to d6tflow!


In [2]:
from processing.api_preprocessors import preprocess_raw_dict
from processing.utils import add_Country_from_ISO
from processing.utils import add_ISO

In [3]:
os.listdir('data/indicator/SL2/raw')

['SL2_GGGI.M.csv']

In [4]:
def process_SL2():
    df = pd.read_csv('data/indicator/SL2/raw/SL2_GGGI.M.csv')
    return df[['ISO', 'Year', 'Value']]


In [5]:
def process_SP2():
    df = pd.read_csv('data/indicator/SP2/raw/SP2_GHD.M.csv')

    df = df[df['indicator_name'] == 'Healthcare Access and Quality'][[
        'location_name', 'year_id', 'val']]

    df = df.rename(columns={'location_name': 'Country', 'year_id': 'Year', 'val': 'Value'})
    a = {'lala': 0}
    print(a['l'])
    
    return df

In [6]:
def formatting_step(df):
    df = df.copy()
    df = df.drop(columns='Country', errors='ignore')
    df = add_Country_from_ISO(df)
    df = df.rename(columns={'Variable': 'Indicator'})
    return df

In [7]:

def add_information_pandas(df, information):
    df = df.copy()
    for key in information:
        df[key] = information[key]
    return df

In [8]:
def preprocess_raw_df_from_MANUAL(df, config):
    df = add_information_pandas(df, {k: v for k, v in config.items() if k != 'function'})

    if 'ISO' not in df.columns:
        df = add_ISO(df)
    if 'Country' in df.columns:
        df = df[~df.Country.isin(exceptions_countries)]
        df = df.drop(columns='Country')
    df['From'] = 'MANUAL'
    return df.dropna(subset=['ISO', 'Value', 'Year'])

In [20]:
SP2 = {
        'Variable': 'SP2',
        'function': process_SP2,
        'Description': 'Health care Access and Quality Index',
        'Source': 'Institute for Health Metrics and Evaluation, based on Global Burden of Disease Study 2015 (GBD 2015).',
        'URL': 'http://ghdx.healthdata.org/record/global-burden-disease-study-2015-gbd-2015-healthcare-access-and-quality-index-based-amenable'
    }

SL2 = {
        'Variable': 'SL2',
        'function': process_SL2,
        'Description': 'Share agriculture organic to total agriculture land area (Percent)',
        'Source': 'FAOSTAT',
        'URL': 'http://www.fao.org/faostat/en/#data/EL'
    }

configs = {'SL2': SL2, 'SP2': SP2}


exceptions_countries = ['Southern Africa', 'Southern Sub-Saharan Africa',
                        'Micronesia', 'Bassas da India', 'French Guyana',
                        'China, mainland', 'Czechoslovakia', 'Gilbert Islands (Kiribati)',
                        'Phoenix Islands (Kiribati)', 'Line Islands (Kiribati)']


In [23]:
SP2['function']()

KeyError: 'l'

In [15]:
class PreProcess(d6tflow.tasks.TaskCSVPandas):
    indicator = luigi.Parameter()
    def run(self):
        indicator_config = configs[self.indicator]
        df = indicator_config['function']()
        df = preprocess_raw_df_from_MANUAL(df, indicator_config)
        self.save(df)
            
@d6tflow.requires(PreProcess)
class Process(d6tflow.tasks.TaskCSVPandas):
    def run(self):
        df = self.input().load()
        df = formatting_step(df)
        self.save(df)

In [31]:
d6tflow.invalidate_upstream(Process('SP2'))

#d6tflow.invalidate_downstream(Process('SL2'))


Compeleted tasks to invalidate:
[Process(indicator=SP2), PreProcess(indicator=SP2)]
Confirm invalidating tasks (y/n)y


In [32]:
d6tflow.run(Process('SP2'))

ERROR: [pid 26066] Worker Worker(salt=553870189, workers=1, host=simon-NBLK-WAX9X, username=simon, pid=26066) failed    PreProcess(indicator=SP2)
Traceback (most recent call last):
  File "/home/simon/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 191, in run
    new_deps = self._run_get_new_deps()
  File "/home/simon/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 133, in _run_get_new_deps
    task_gen = self.task.run()
  File "<ipython-input-15-03078f8c4e3d>", line 5, in run
    df = indicator_config['function']()
  File "<ipython-input-5-c9366a267b46>", line 9, in process_SP2
    print(a['l'])
KeyError: 'l'


RuntimeError: Exception found running flow, check trace. For more details see https://d6tflow.readthedocs.io/en/latest/run.html#debugging-failures

In [30]:
Process('SP2').outputLoad()

Unnamed: 0,ISO,Year,Value,Indicator,Description,Source,URL,From,Country
0,CHN,1990,49.5,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
1,CHN,1995,53.7,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
2,CHN,2000,57.8,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
3,CHN,2005,63.7,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
4,CHN,2010,69.9,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
...,...,...,...,...,...,...,...,...,...
1171,SDN,1995,38.8,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan
1172,SDN,2000,42.0,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan
1173,SDN,2005,44.9,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan
1174,SDN,2010,47.4,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan


In [17]:
d6tflow.run([Process('SL2'),Process('SP2')])


===== Luigi Execution Summary =====

Scheduled 4 tasks of which:
* 2 complete ones were encountered:
    - 2 PreProcess(indicator=SL2,SP2)
* 2 ran successfully:
    - 2 Process(indicator=SL2,SP2)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====



LuigiRunResult(status=<LuigiStatusCode.SUCCESS: (':)', 'there were no failed tasks or missing dependencies')>,worker=<luigi.worker.Worker object at 0x7fab6629d490>,scheduling_succeeded=True)

Unnamed: 0,ISO,Year,Value,Indicator,Description,Source,URL,From,Country
0,CHN,1990,49.5,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
1,CHN,1995,53.7,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
2,CHN,2000,57.8,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
3,CHN,2005,63.7,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
4,CHN,2010,69.9,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,China
...,...,...,...,...,...,...,...,...,...
1171,SDN,1995,38.8,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan
1172,SDN,2000,42.0,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan
1173,SDN,2005,44.9,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan
1174,SDN,2010,47.4,SP2,Health care Access and Quality Index,"Institute for Health Metrics and Evaluation, b...",http://ghdx.healthdata.org/record/global-burde...,MANUAL,Sudan


In [None]:
formatting_step(PreProcess().outputLoad())