In [1]:
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
import pycountry_convert as pc

from pypely import pipeline, fork, merge, identity
from pypely.memory import memorizable
from pypely.helpers import side_effect

@memorizable
def load_covid_cases(date=None):
    """Download one JHU CSSE COVID-19 daily report into a DataFrame.

    Args:
        date: Report date as an ``MM-DD-YYYY`` string, the format used for the
            daily-report file names (see ``_johns_hopkins_date_format``).
            Defaults to ``_yesterday()``, which keeps the previous behaviour of
            calling this with no arguments as the first pipeline step.

    Returns:
        The raw daily report exactly as parsed by ``pd.read_csv``.

    Passing an explicit date makes a run reproducible. The default depends on
    the day the notebook is executed, and if no report exists for that date
    the download fails.
    """
    if date is None:
        date = _yesterday()

    source = (
        "https://raw.githubusercontent.com/CSSEGISandData/COVID-19/master/"
        f"csse_covid_19_data/csse_covid_19_daily_reports/{date}.csv"
    )
    return pd.read_csv(source)


def aggregate_to_country_level(df):
    """Sum the case counts of all rows that share a country and update timestamp.

    Args:
        df: Daily report frame with at least the columns ``Country_Region``,
            ``Last_Update``, ``Confirmed``, ``Deaths``, ``Recovered``, ``Active``.

    Returns:
        A flat frame (default RangeIndex) with columns ``Country_Region``,
        ``Last_Update``, ``Confirmed``, ``Deaths``, ``Recovered``, ``Active``.

    NOTE(review): because ``Last_Update`` is part of the grouping key,
    provinces of one country whose rows carry different ``Last_Update``
    values end up in separate rows, each holding only a partial total.
    ``combine_with_raw_data`` joins on the same two keys, so change both
    together if that matters.
    """
    metrics = ["Confirmed", "Deaths", "Recovered", "Active"]

    # Pass "sum" by name: handing np.sum to groupby.agg is deprecated in pandas 2.x.
    return (
        df.groupby(["Country_Region", "Last_Update"])
        .agg({metric: "sum" for metric in metrics})
        .reset_index()
    )


def add_continent_name(df):
    """Return ``df`` plus a ``continent`` column derived from ``Country_Region``.

    Each country name is resolved through ``_continent_from_country_name``;
    names it cannot resolve come back as "Unknown". ``DataFrame.assign``
    returns a new frame, so the input is left untouched.
    """
    continents = df["Country_Region"].apply(_continent_from_country_name)
    return df.assign(continent=continents)


def filter(*, column, value):
    """Build a pipeline step that keeps only rows where ``column == value``.

    NOTE: this module-level name shadows Python's built-in ``filter`` for the
    rest of the notebook.
    """
    def _filter(df):
        keep_row = df[column] == value
        return df.loc[keep_row]

    return _filter


@memorizable
def combine_with_raw_data(df, raw_data):
    """Join country-level totals with the per-province rows of the raw report.

    Args:
        df: Country-level frame keyed by ``Country_Region`` and ``Last_Update``.
            Every other column is suffixed with ``_country``.
        raw_data: Raw daily report. It is prepared by ``_pre_process_raw``
            (metadata columns dropped, value columns suffixed with ``_state``).

    Returns:
        The result of ``DataFrame.merge`` on the two keys (pandas' default
        inner join), i.e. one row per matching province row.
    """
    join_keys = ["Country_Region", "Last_Update"]

    join = lambda left, right: left.merge(right, left_on=join_keys, right_on=join_keys)
    country_branch = lambda: _add_postfix_to_column_name(df, keep=join_keys, postfix="country")
    state_branch = lambda: _pre_process_raw(raw_data)

    # Both branches take no input; the pipeline is started with no arguments.
    return pipeline(fork(country_branch, state_branch), merge(join))()


def calculate_state_ratio(df):
    """Replace the state/country count pairs with state-to-country ratios.

    For each metric in Confirmed, Deaths, Recovered and Active, a column
    ``<metric>_ratio = <metric>_state / <metric>_country`` is added via
    ``_divide``. Afterwards, the ``<metric>_state`` and ``<metric>_country``
    source columns are dropped. All other columns pass through unchanged.
    """
    metrics = ["Confirmed", "Deaths", "Recovered", "Active"]
    postfixes = ["state", "country"]

    # Build the list directly rather than via a comprehension run only for its side effects.
    columns_to_drop = [f"{metric}_{postfix}" for postfix in postfixes for metric in metrics]

    def drop_columns(frame):
        return frame.drop(columns_to_drop, axis=1)

    pipe = pipeline(
        *(_divide(f"{metric}_state", f"{metric}_country", f"{metric}_ratio") for metric in metrics),
        drop_columns,
    )

    return pipe(df)


def select(*, columns):
    """Build a pipeline step that projects a frame onto ``columns`` (in that order)."""
    def _select(df):
        projected = df[columns]
        return projected

    return _select


def store(*, path):
    """Build a pipeline step that writes a frame to ``path`` as CSV without its index.

    The step returns whatever ``DataFrame.to_csv`` returns, which is ``None``
    when a path is given.
    """
    def _store(df):
        return df.to_csv(path_or_buf=path, index=False)

    return _store


'''
*******
HELPERS
*******
'''


def _divide(col1, col2, result_col):
    def __inner(df):
        try:
            df[result_col] = df[col1] / df[col2]
        except ZeroDivisionError as e:
            print(f"ZeroDivisionError for column {col2}: {e}")
            df[result_col] = -1
        return df
    
    return __inner


def _add_postfix_to_column_name(df, keep, postfix):
    rename = lambda name: f"{name}_{postfix}" if name not in keep else name
    return df.rename(columns=rename)



def _pre_process_raw(df):
    """Prepare the raw daily report for joining with the country-level totals.

    Drops the FIPS, Admin2, Lat, Long_ and Combined_Key columns. Every
    remaining column except ``Country_Region``, ``Last_Update`` and
    ``Province_State`` is then suffixed with ``_state``.
    """
    untouched_columns = ["Country_Region", "Last_Update", "Province_State"]
    metadata_columns = ["FIPS", "Admin2", "Lat", "Long_", "Combined_Key"]

    def _clean_columns(frame):
        return frame.drop(metadata_columns, axis=1)

    def _rename_columns(frame):
        def _state_name(name):
            return name if name in untouched_columns else f"{name}_state"
        return frame.rename(columns=_state_name)

    return pipeline(_clean_columns, _rename_columns)(df)


def _continent_from_country_name(country_name):
    """Map a country name to its continent name via pycountry_convert.

    Lookup goes country name -> ISO alpha-2 code -> continent code ->
    continent name. Any failure along the way yields "Unknown".
    """
    try:
        alpha2 = pc.country_name_to_country_alpha2(country_name)
        continent_code = pc.country_alpha2_to_continent_code(alpha2)
        return pc.convert_continent_code_to_continent_name(continent_code)
    except Exception:
        # Deliberate best-effort: unrecognised names are labelled rather than raised.
        return "Unknown"


def _yesterday():
    """Return yesterday's local date, formatted by ``_johns_hopkins_date_format`` (MM-DD-YYYY)."""
    one_day = timedelta(days=1)
    return _johns_hopkins_date_format(datetime.today() - one_day)


def _johns_hopkins_date_format(date):
    return date.strftime("%m-%d-%Y")

In [2]:
# Columns kept in the final CSV.
result_columns = [
    "Country_Region",
    "Province_State",
    "Confirmed_ratio",
    "Deaths_ratio",
]

# Steps run top to bottom, each receiving the previous step's output.
# `>> "raw_data"` memorizes the loaded report under that name and
# `<< "raw_data"` feeds it back in as combine_with_raw_data's `raw_data`
# argument (pypely memory).
example = pipeline(
    load_covid_cases >> "raw_data",
    aggregate_to_country_level,
    add_continent_name,
    filter(column="continent", value="Europe"),
    combine_with_raw_data << "raw_data",
    calculate_state_ratio,
    select(columns=result_columns),
    store(path="complex.csv"),
)

In [3]:
from pypely.visual import draw
from pathlib import Path

example_path = Path("example.html")
# Presumably renders the pipeline graph and opens it in a browser (browser=True).
# NOTE(review): example_path is never passed to draw, so it is unused here;
# confirm whether draw should be told to write to it.
draw(example, browser=True)

In [7]:
# Run the whole pipeline. The last step, store(path="complex.csv"), returns the
# value of DataFrame.to_csv called with a path (None), so the cell shows no result.
example()

PipelineStepError: The step with name 'load_covid_cases' failed
  The last step before the failed step was 'load_covid_cases'
  'load_covid_cases' from File "/var/folders/pf/vq3kw_lj11d5yrk397nvvwc0svfwfb/T/ipykernel_38690/2608020524.py", line 10
  'load_covid_cases' from File "/var/folders/pf/vq3kw_lj11d5yrk397nvvwc0svfwfb/T/ipykernel_38690/2608020524.py", line 10

In [6]:
# Scratch check, presumably for turning step names into labels: str.strip only
# removes the listed characters from the two ends, so the inner ">" of
# "<lambda>" survives (result: 'lambda> test').
test = "<lambda>_test"
test.strip('_<>').replace('_', ' ')

'lambda> test'

In [None]:
%%html
<!-- A <script> that has a src attribute ignores its inline content, so
     mermaid.init() needs its own <script>. It is placed after the diagram
     container so the element exists when init runs.
     NOTE(review): cdn.rawgit.com has been retired and this pinned mermaid
     build may predate the `flowchart` keyword (older builds use `graph`);
     confirm the URL still serves and the syntax is supported. -->
<script src="https://cdn.rawgit.com/knsv/mermaid/bc5f73daa2769b666ccf442fa4f6d7fd4995feb0/dist/mermaid.full.js"></script>
<div class="mermaid">
    flowchart LR
        step [LALLA]
</div>
<script>
    mermaid.init();
</script>