In [1]:
import dask
import dask.dataframe as dd
import glob
import pandas as pd
from pathlib import Path 

import numpy as np
import scipy.stats as sps
from sklearn import linear_model

from distributed import Client
client = Client()

In [2]:
PATH_MAVEN = Path("/home/marek")
PATH_NGI = Path(PATH_MAVEN) / "maven" / "data" / "sci" / "ngi"
PATH_NGI_L2 = Path(PATH_NGI) / "l2"

In [6]:
test_dir = Path(PATH_NGI_L2, "2019/04/*0401*.csv")

In [7]:
meta_cols = {
    "orbit": int,
    "alt": float,
    "species": str,
    "abundance": float,
    "t_unix": float
}

In [8]:
ddf = dd.read_csv(
    test_dir, 
    assume_missing=True, 
    usecols=["orbit", "alt", "species", "abundance", "t_unix"],
    dtype=meta_cols,
    na_values = [" ", "-999"]
)

In [13]:
ddf.npartitions

8



In [12]:
ddf.compute()

Unnamed: 0,t_unix,orbit,alt,species,abundance
0,1.554084e+09,8821,438.7639,Ar,0.0
1,1.554084e+09,8821,437.1751,Ar,0.0
2,1.554084e+09,8821,435.5895,Ar,0.0
3,1.554084e+09,8821,434.0071,Ar,0.0
4,1.554084e+09,8821,432.4278,Ar,0.0
...,...,...,...,...,...
4303,1.555393e+09,8920,474.6245,O,
4304,1.555393e+09,8920,476.2508,O,
4305,1.555393e+09,8920,477.8802,O,
4306,1.555393e+09,8920,479.5128,O,


In [203]:
def cull_empty_partitions(df):
    ll = list(df.map_partitions(len).compute())
    df_delayed = df.to_delayed()
    df_delayed_new = list()
    pempty = None
    for ix, n in enumerate(ll):
        if 0 == n:
            pempty = df.get_partition(ix)
        else:
            df_delayed_new.append(df_delayed[ix])
    if pempty is not None:
        df = dd.from_delayed(df_delayed_new, meta=pempty)
    return df

In [204]:
ddf = ddf[(ddf["abundance"] > 0.) & (ddf["alt"] < 250) & (ddf["species"].isin(["Ar", "N2"]))]
ddf = ddf.set_index('orbit', sorted=True, drop=False)
#ddf = cull_empty_partitions(ddf)

In [205]:
ddf_persisted = ddf.persist()

In [206]:
ddf_culled = cull_empty_partitions(ddf_persisted)

In [207]:
ddf_culled_persisted = ddf_culled.persist()

In [208]:
@dask.delayed()
def IO_orb(orbdata,io='I'):
    minalt = orbdata['alt'].min()
    peri_t = orbdata[orbdata['alt']==minalt]['t_unix'].unique()
    #if len(peri_t)>1:
    #    sys.exit('Non-unique time found at periapse '+str(orbdata['orbit'].unique()))
    #else:
    if io == 'I':
        return orbdata[orbdata['t_unix']<=peri_t[0]]
    elif io =='O':
        return orbdata[orbdata['t_unix']>peri_t[0]]
    else:
        return orbdata

@dask.delayed()
def df_from_orb_range(daskdf, orbit, orb_step=5):
    look_for_orbs = list(range(orbit - orb_step, orbit + orb_step + 1))
    temp_ddf = daskdf[daskdf.index.isin(look_for_orbs)]
    #temp_df = temp_ddf.compute()
    return temp_ddf

@dask.delayed
def pivot_df(df):
    temp_df = df.reset_index(drop=True)
    temp_df = temp_df.pivot_table(values=["abundance"], index=["orbit","alt", "species"]).unstack()
    return temp_df

@dask.delayed()
def make_ratio_col(df):
    df["N2/Ar"] = df["abundance"]["N2"] / df["abundance"]["Ar"]
    df = df["N2/Ar"].reset_index().dropna(subset=["alt", "N2/Ar"])
    return df

@dask.delayed()
def fit_ratio_alt(df):
    x = df[["alt"]]
    y = np.log(df["N2/Ar"])
    lr = linear_model.LinearRegression()
    lr.fit(x, y)
    return lr

@dask.delayed()
def hp_from_fit(ratio, slope, intercept):
    return (np.log(ratio)-intercept)/slope

In [213]:
def print_orb(data):
    data = IO_orb(data)
    orb = data.index.unique()[0]
    df = df_from_orb_range(ddf, orb, orb_step=5)
    norbs = df.index.unique().shape[0]
    df = pivot_df(df)
    df = make_ratio_col(df)
    fit = fit_ratio_alt(df)
    hp = hp_from_fit(1.25, fit.coef_[0],fit.intercept_)
    return [orb, hp, fit.coef_[0], fit.intercept_, norbs]

In [214]:
ddf_hp_map = ddf_culled_persisted.map_partitions(print_orb)
#ddf_hp_map = ddf.map_partitions(print_orb)

In [215]:
%%time 
ddf_hp_map = ddf_hp_map.compute()

CPU times: user 28.6 s, sys: 1.21 s, total: 29.8 s
Wall time: 40 s


In [216]:
%%time
newdf = pd.DataFrame(
    [dask.compute(x)[0] for x in ddf_hp_map],
    columns=["orb", "hp_alt", "fit_slope", "fit_intercept", "norbs"]
)
newdf

CPU times: user 1min 23s, sys: 5.92 s, total: 1min 29s
Wall time: 1min 42s


Unnamed: 0,orb,hp_alt,fit_slope,fit_intercept,norbs
0,8821,111.968972,0.027313,-2.835060,6
1,8822,112.210630,0.027523,-2.865185,7
2,8823,111.717642,0.027276,-2.824015,8
3,8824,112.062220,0.027367,-2.843663,9
4,8825,112.195111,0.027253,-2.834563,10
...,...,...,...,...,...
171,9013,97.414886,0.028407,-2.544153,9
172,9014,96.516582,0.028174,-2.496143,8
173,9015,96.516582,0.028174,-2.496143,8
174,9016,96.581030,0.028208,-2.501182,7


In [111]:
months = len([2015, 2016, 2017, 2018, 2019]) * 12
months

60



In [217]:
60*1.5 / 60

1.5