In [1]:
import io
import os
import pathlib
import re
import sys

import hiplot as hip
import klib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from dask import dataframe as dd
from dask.diagnostics import ProgressBar
from scipy import interpolate

# NOTE(review): module-level buffer; `dfinfo` creates its own local buffer,
# so this one appears unused in the visible cells.
buffer = io.StringIO()
# pd.IndexSlice shorthand for MultiIndex slicing (only referenced in commented-out code below)
mix = pd.IndexSlice

# Make the directory two levels above the notebook's working directory
# importable (notebooks live in lab_utils) so that `autoscan` can be found.
labutilspath = str(pathlib.Path.cwd().parents[1])
# Guard so that re-running this cell does not append the same entry again.
if labutilspath not in sys.path:
    sys.path.append(labutilspath)

# import the autoscan routines
from autoscan import autoscan

pp = autoscan.basics()

def probeix(df, vmin, vmax):
    """Boolean mask of the entries of ``df`` lying in the closed range [vmin, vmax].

    For a Series the mask is element-wise.  For anything else (e.g. a
    DataFrame) a row is True only when every one of its columns is in range.
    NaN compares False and is therefore treated as out of range.
    """
    above_min = df >= vmin
    below_max = df <= vmax
    if isinstance(df, pd.Series):
        return np.logical_and(above_min, below_max)
    return np.logical_and(above_min.all(axis = 1), below_max.all(axis = 1))

def test(x, th = 0.5, vmin = 0, vmax = 1e6):
    """Return True when at least a fraction ``th`` of ``x`` lies in [vmin, vmax].

    ``x`` is a Series or DataFrame; for a DataFrame a row only counts as
    in range when all of its columns are (see ``probeix``).
    """
    in_range = probeix(x, vmin = vmin, vmax = vmax)
    fraction_ok = in_range.mean()
    return fraction_ok >= th

def get_wrong_measurements(df, probe = None, desc = None, th = 0.5, vmin = 0, vmax = 1e6):
    """Flag, per group, whether enough measurements fall inside [vmin, vmax].

    ``df`` is grouped by the index levels of ``desc`` (or by all of its own
    index levels when ``desc`` is None) and ``test`` is applied to each group:
    a group passes when at least a fraction ``th`` of its rows are in range.

    Parameters
    ----------
    df : pd.Series or pd.DataFrame
        Measurements indexed by (at least) the grouping levels.
    probe : str, optional
        Value of ``probe`` used to filter the descriptions of failing groups.
    desc : pd.DataFrame, optional
        Sample descriptions; its index names define the grouping.
    th : float
        Minimum fraction of in-range rows for a group to pass.
    vmin, vmax : float
        Accepted measurement range (inclusive).

    Returns
    -------
    pd.Series or tuple
        The per-group boolean Series ``ix``.  When both ``probe`` and ``desc``
        are given, returns ``(failed, ix)`` where ``failed`` holds the rows of
        ``desc`` for the failing groups whose ``probe`` equals ``probe``.
    """
    levels = df.index.names if desc is None else desc.index.names

    # `th` must be forwarded, otherwise `test` always uses its default of 0.5
    ix = df.groupby(level = levels).apply(test, th = th, vmin = vmin, vmax = vmax)

    if probe is None or desc is None:
        return ix

    # `@probe` references the local variable instead of splicing it into the
    # query string, so values containing quotes cannot break the expression
    failed = desc.loc[ix[ix == False].index].query('probe == @probe')
    return failed, ix

def pprint(msg, msg_title = '', msg_decorator = '#', len_decorator = 40):
    """Print ``msg`` between a titled header line and a footer line.

    The header is ``<deco> <msg_title> <deco>`` padded with ``msg_decorator``
    to about ``len_decorator`` characters; the footer is a line of
    ``msg_decorator`` followed by an empty line.  When the title is too long
    to fit, one decorator is kept on each side and the footer is widened to
    the header's width.

    Parameters
    ----------
    msg : object
        Anything printable (string, DataFrame, ...).
    msg_title : str
        Title shown in the header.
    msg_decorator : str
        Character used both for the header padding and for the footer line.
    len_decorator : int
        Target width of the header/footer.
    """
    # decorator characters available for both sides of " title "
    nhead = len_decorator - len(msg_title) - 2
    if nhead <= 0:
        # title does not fit: one decorator per side, footer as wide as the header
        nhead = 2
        nfoot = len(msg_title) + 4
    else:
        nfoot = len_decorator

    side = msg_decorator * (nhead // 2)
    print(side + ' ' + msg_title + ' ' + side,
          msg,
          msg_decorator * nfoot + '\n',
          sep = '\n')

def dfinfo(df, header = 'info'):
    """Print the output of ``df.info()`` framed by a ``pprint`` banner titled ``header``."""
    sink = io.StringIO()
    try:
        df.info(buf = sink)
        report = sink.getvalue()
    finally:
        sink.close()
    pprint(report, msg_title = header)

def interp_on_nans(d, debug = False, extrap = np.mean, coords = ('x', 'y')):
    """Fill the NaNs of the last column of ``d`` by interpolating over coordinates.

    ``d`` is expected to hold coordinate columns first and the measured value
    in its last column; only the first two columns are considered as
    coordinates (presumably one sample group handed over by
    ``groupby(...).apply``).  Coordinate columns that are constant within
    ``d`` are ignored, which selects the method:

    * one varying coordinate  -> 1-D linear interpolation with ``np.interp``;
    * two varying coordinates -> ``scipy.interpolate.griddata`` on the columns
      named in ``coords``;
    * no varying coordinate   -> every NaN is set to ``extrap`` of the valid values.

    Points outside the measured range, and failed 2-D interpolations, are
    filled with ``extrap`` of the valid values (the mean by default).

    Parameters
    ----------
    d : pd.DataFrame
        Coordinate columns followed by the value column.
    debug : bool
        Print the group name and the error when the 2-D interpolation fails.
    extrap : callable
        Reduction applied to the valid values to obtain the fill value.
    coords : sequence of str
        Column names used as coordinates for the 2-D interpolation.

    Returns
    -------
    pd.DataFrame
        A copy of ``d`` with the NaNs of the last column filled, or ``d``
        itself when it has no NaN or no valid value to interpolate from.
    """
    valid = d.iloc[:, -1].notna().to_numpy()
    if valid.all() or not valid.any():
        # nothing to fill, or nothing to interpolate from
        return d

    missing = ~valid
    # which of the first two (coordinate) columns actually vary within d
    varying = d.iloc[:, :2].ne(d.iloc[0, :2]).any().to_numpy()
    axes = list(d.columns[:2][varying])
    fill = extrap(d.iloc[valid, -1])

    if len(axes) == 1:
        x_known = d.loc[valid, axes[0]].to_numpy()
        y_known = d.iloc[valid, -1].to_numpy()
        # np.interp silently returns nonsense unless the sample points increase
        order = np.argsort(x_known, kind = 'stable')
        values = np.interp(d.loc[missing, axes[0]].to_numpy(), x_known[order], y_known[order],
                           left = fill, right = fill)
    elif len(axes) == 2:
        cols = list(coords)
        try:
            values = interpolate.griddata(d.loc[valid, cols].to_numpy(),
                                          d.iloc[valid, -1].to_numpy(),
                                          d.loc[missing, cols].to_numpy(),
                                          fill_value = fill)
        except Exception as err:  # e.g. too few or collinear points to triangulate
            if debug: print('interp failed for: ', getattr(d, 'name', None), err)
            values = fill
    else:
        # no coordinate varies: nothing to interpolate along
        values = fill

    # work on a copy: mutating the group passed in by groupby.apply is unsafe
    out = d.copy()
    out.iloc[missing, -1] = values
    return out


In [2]:
# from dask.distributed import Client, LocalCluster
# client = Client()
# cluster = LocalCluster()
# client = Client(cluster)
# cluster

In [7]:
# Register a dask progress bar for all later computations.  Guarded so that
# re-running this cell does not register a second bar.
if 'pbar' not in globals():
    pbar = ProgressBar()
    pbar.register()

In [8]:
# Location of the autoscan HDF5 store; set AUTOSCAN_DATA_DIR to override it
# instead of editing the notebook (previously used file: autoscan.h5).
DATA_DIR = pathlib.Path(os.environ.get('AUTOSCAN_DATA_DIR',
                                       '/home/urlab/sandbox/data/characterization/autoscan'))
datapath = str(DATA_DIR / 'autoscan_parallel.h5')
# NOTE(review): savepath is not used in the visible cells
savepath = '/home/urlab/Documents/'

# load the data: measurements lazily, the description table computed eagerly
da = dd.read_hdf(datapath, '/data')
desc = dd.read_hdf(datapath, '/description').compute()

[########################################] | 100% Completed |  0.1s


In [9]:
def ftir_row_stats(df: dd.DataFrame, start: int = 2, stop: int = 1754) -> dd.DataFrame:
    """Append row-wise summary statistics of the FTIR spectrum columns.

    The spectrum is the positional column slice ``start:stop`` (by default
    columns 2..1753, the range this notebook was written for).

    Parameters
    ----------
    df : dd.DataFrame
        Frame whose columns ``start:stop`` hold the spectrum (pandas works too).
    start, stop : int
        Positional bounds of the spectrum columns (``stop`` exclusive).

    Returns
    -------
    dd.DataFrame
        ``df`` with two extra columns, ``l_mean`` and ``l_std``: the row mean
        and row standard deviation of the spectrum.
    """
    # Select the spectrum from the input frame once, so that `l_std` can never
    # pick up the freshly assigned `l_mean` column (which happens with a
    # per-column lambda when df has fewer than `stop` columns).
    spectrum = df.iloc[:, start:stop]
    return df.assign(
        l_mean = spectrum.mean(axis = 1),
        l_std = spectrum.std(axis = 1),
    )

def clean_dataframe(df: dd.DataFrame) -> dd.DataFrame:
    """Replace every negative entry with NaN and cast all columns to float32."""
    is_valid = df >= 0
    without_negatives = df.where(is_valid, np.nan)
    return without_negatives.astype(np.float32)

def enforce_limits(df: dd.DataFrame) -> dd.DataFrame:
    """Set to NaN every probe measurement outside that probe's accepted limits.

    For each probe in ``pp.probe_settings`` the checked columns are
    ``settings['col'][2:]`` and the accepted inclusive range is
    ``settings['limits']``.

    Returns a new frame; the caller's ``df`` is left untouched.
    """
    # column assignment below would otherwise modify the caller's frame
    df = df.copy()
    for settings in pp.probe_settings.values():
        cols = settings['col'][2:]
        vmin, vmax = settings['limits']
        in_range = (df[cols] >= vmin) & (df[cols] <= vmax)
        df[cols] = df[cols].where(in_range, np.nan)
    return df

def compute_final_dataframe(df: dd.DataFrame, workers = 20) -> pd.DataFrame:
    """Execute the dask task graph of ``df`` and return the computed result.

    Parameters
    ----------
    df : dd.DataFrame
        Lazy frame to materialise.
    workers : int
        Passed to ``compute`` as ``num_workers`` (it was previously ignored
        and 20 was always used).
    """
    return df.compute(num_workers = workers)

def hip_visualize(df, pcols = None):
    """Show a HiPlot parallel-coordinates view of ``df``.

    The index is reset first, so ``family`` and ``code`` (presumably index
    levels of this notebook's data) are available as columns and shown first.

    Parameters
    ----------
    df : pd.DataFrame
    pcols : sequence of column labels, optional
        Columns to show after ``family`` and ``code``.  When None, every
        column of the reset frame is shown.

    Returns
    -------
    The value returned by ``hip.Experiment.display()``.
    """
    dp = df.reset_index()
    if pcols is not None:
        # a plain list keeps the labels' types (np.append coerced them to one
        # dtype, and with pcols=None produced a None label -> KeyError)
        dp = dp.loc[:, ['family', 'code'] + list(pcols)]
    return hip.Experiment.from_dataframe(dp).display()

In [10]:
# lazy pre-processing pipeline (nothing is computed yet):
# negatives -> NaN, out-of-limit probe values -> NaN, then FTIR row stats
db = ftir_row_stats(enforce_limits(clean_dataframe(da)))

In [11]:
%%time 
# materialise the lazy pipeline `db` into an in-memory DataFrame
df = db.compute()

[########################################] | 100% Completed |  7.2s
[########################################] | 100% Completed |  7.3s
CPU times: user 27.5 s, sys: 2.32 s, total: 29.8 s
Wall time: 29.8 s


In [None]:
# columns -9:-2 skip the last two, which should be l_mean/l_std from ftir_row_stats
hip_visualize(df, pcols = df.columns[-9:-2])

# pre-processing

1. pre-clean the dataset
 - remove duplicated rows
 - enforce correct dtypes 
 - reduce memory overhead
 - do not remove missing values
1. create a set of variables to summarize the ftir data
1. visualize
 - relations in the dataset using `hip.Experiment.from_dataframe(df).display()`
 - distribution of missing values  `klib.missingval_plot`

## pre-cleaning
First cleaning of the data, before imputation.
The cleaned frame overwrites `df`, since the raw version is not needed afterwards.

In [None]:
# record the structure of the frame before any cleaning
dfinfo(df, header = 'raw data')

In [None]:
# pre-clean without removing missing values: with both thresholds at 1.0 klib
# should only drop rows/columns that are entirely empty (its column default of
# 0.9 would also drop mostly-empty columns).
# NOTE(review): data_cleaning also de-duplicates rows, converts dtypes and by
# default cleans column names; confirm the columns still match pp.probe_settings.
df = klib.data_cleaning(df, drop_threshold_cols = 1.0, drop_threshold_rows = 1.0)

In [None]:
# structure of the frame after klib's pre-cleaning
dfinfo(df, header = 'raw data cleaned')

## ftir stats (basic)

## visualize
### feature flow with hip
In this step the FTIR features can be summarized by the single feature `l_mean` (their row-wise mean).

visualize the flow of values in the original dataset

In [None]:
# same view as before, now on the cleaned frame
hip_visualize(df, pcols = df.columns[-9:-2])

### missing values with klib
use `klib` to visualize the missing values in the dataset

In [None]:
# columns to inspect: the same ones shown in the hip plots above
# (`pcols` was not defined at notebook level, so this cell raised NameError)
pcols = df.columns[-9:-2]
klib.missingval_plot(df.loc[:, pcols]);

# missing values & outliers

In [None]:
# descriptions of the samples present in `df`: drop index level 6 and
# de-duplicate to get the keys used by `desc`
desc_ix = df.index.droplevel(6).unique()
ds = desc.loc[desc_ix].copy()

In [None]:
probe = 'vel'

# columns of the velocity probe (first 4 entries of its column list)
vcols = pp.probe_settings[probe]['col'][:4]

# min and max expected for a valid measurement
vmin, vmax = pp.probe_settings[probe]['limits']

# number of measurement columns (entries after the first two, which are
# presumably coordinates, as enforce_limits also skips them)
ncols = len(vcols[2:])

# coordinates + first measurement column; an explicit copy so that later
# NaN assignments never act on a slice of `df`
dv = df.loc[:, vcols[:-1]].copy()

In [None]:
## first check how the raw values are distributed
pprint(dv.describe().apply(np.round, decimals = 2), 'raw data')

## identify offending values and those to keep
vix = probeix(dv.iloc[:, -1], vmin = vmin, vmax = vmax)

## set nan for all incorrect values, on a copy so re-running this cell starts from the raw `dv`
dv_valid = dv.copy()
dv_valid.iloc[~vix.to_numpy(), -1] = np.nan
pprint(dv_valid.describe().apply(np.round, decimals = 2), 'correct measurements')

# per-sample verdict: `ix` is indexed by the sample levels (ds.index.names);
# `out` holds the descriptions of the failing samples for this probe
out, ix = get_wrong_measurements(dv_valid.iloc[:, -1], probe = probe, desc = ds, vmin = vmin, vmax = vmax)

# `ix` cannot index the rows of `dv` directly (their index has the extra level 6
# dropped when building `ds`), so broadcast the per-sample verdict to the rows.
# Assumes ds.index.names are df's index levels without level 6, in the same order.
row_ok = dv_valid.index.droplevel(6).isin(ix.index[ix.to_numpy(dtype = bool)])
ixd = dv_valid.index
ixdp = dv_valid.index[row_ok]
ixdn = dv_valid.index[~row_ok]

pprint(dv_valid.loc[row_ok, :].describe().apply(np.round, decimals = 2), 'only approved samples')

# `ix` keeps only samples where at least a fraction `th` of the values are correct;
# samples that don't meet this criterion are not interpolated below. This differs
# from `probeix`, which checks each value against the range independently of its sample.

# basic imputation: interpolate within each approved sample; group_keys=False keeps
# the original row index so the result aligns with `dp`
dp = dv_valid.copy()
dp.loc[row_ok, :] = (
    dv_valid.loc[row_ok, :]
    .groupby(level = ds.index.names, group_keys = False)
    .apply(interp_on_nans)
)
pprint(dp.describe().apply(np.round, decimals = 2), 'correct & interp data (all samples)')

# fill what is still missing (in every sample) with that sample's mean
dp = dp.groupby(level = ds.index.names, group_keys = False).apply(lambda x: x.fillna(x.mean()))
pprint(dp.describe().apply(np.round, decimals = 2), 'correct & interp data (all samples, fillna)')

pprint('index\t len\t +\t -\t \nvix\t %d \nix\t %d \t %d \t %d \nixd\t %d \t %d \t %d'
       % tuple(len(x) for x in [vix, ix, ix[ix == True], ix[ix == False], ixd, ixdp, ixdn]),
       'index lengths')

In [None]:
# KDE of the imputed velocity per `code`, for the samples approved in `ix`
plt.style.use('ggplot')
fig, ax = plt.subplots(figsize = (10, 5))

# the measurement column kept in `dp` (dp holds vcols[:-1], so vcols[-1] is not among them)
vel_col = dp.columns[-1]
# `ix` is indexed per sample; map it onto the rows of `dp`
keep = dp.index.droplevel(6).isin(ix.index[ix.to_numpy(dtype = bool)])

for code, values in dp.loc[keep, vel_col].groupby(level = 'code'):
    values = values.dropna()
    if values.nunique() > 1:  # a KDE needs at least two distinct values
        values.plot(kind = 'kde', grid = True, ax = ax, label = code)

ax.set(title = 'velocity distribution per code (approved samples)', xlabel = vel_col)
ax.legend(title = 'code');

In [None]:
# FIXME(review): `velcols`, `permcol` and `hammcol` are not defined in any cell
# above, so this cell raises NameError on a fresh kernel; define them first
# (e.g. from `pp.probe_settings`).
# NOTE(review): `ix` is indexed per sample (see the outlier cell), so
# `df.loc[ix, :]` likely fails with "Unalignable boolean Series"; map it to rows first.
# column list: the two labels followed by the selected probe columns
v = np.append(velcols, [permcol, hammcol])
v = np.append(['family','code'], v)
df2 = df.loc[ix, :].reset_index(drop = False)
# df2.loc[ixd.values, v.tolist()]

In [None]:
# https://seaborn.pydata.org/generated/seaborn.violinplot.html
# https://levelup.gitconnected.com/scikit-learn-python-6-useful-tricks-for-data-scientists-1a0a502a6aa3
# https://towardsdatascience.com/speed-up-your-data-cleaning-and-preprocessing-with-klib-97191d320f80
# https://pythondata.com/dask-large-csv-python/
# https://github.com/wiseio/paratext

In [None]:


# perm = df.loc[:, [permcol]]
# pprint(perm.describe())
# permidx = probeix(perm, vmin = 0, vmax = np.inf)
# ## check the data makes sense
# pprint(perm.loc[permidx, :].describe())

# ## print the labels that have problem
# out, _  = get_wrong_measurements(perm, 'perm', desc)
# out.sample(10)

In [None]:
# klib.dist_plot(df2.loc[:, v[[1,2,3]]])

In [None]:
# dp.iloc[:, -1].groupby('code').apply(klib.dist_plot)

In [None]:
# remove offending values and keep just good measurements
# FIXME(review): `velidx` and `permidx` are not defined in any active cell
# (`permidx` only appears in commented-out code), so this raises NameError.
idx = np.logical_and(velidx.values, permidx.values)
dc = df.loc[idx, :]

In [None]:
# alternative, fill the values with means or nans
# FIXME(review): `vels` and `velidx` are not defined in any cell above, so this
# cell raises NameError on a fresh kernel.
vels.loc[velidx == False, :] = np.nan
desc.loc[vels.index[velidx == False].droplevel(6).drop_duplicates()].query("probe == 'vel'")

In [None]:
# x = velidx[velidx == False].index.droplevel(6).drop_duplicates()
# for t in x:
#     vels.loc[mix[[*t], :], :]

In [None]:
# means = vels.groupby(level = velidx.index.names[:-1], sort = False).apply(np.mean)

# for dd in means.index:
#     pass
# dd

In [None]:
# mix = pd.IndexSlice
# mix[[*ix], :]
# vels.loc(axis = 0)[mix[[dd], :], :]

In [None]:
# def myquery(x, vels, velmin = 0.5e3, velmax = v):
#     v = vels.copy()
#     s = v.loc[x, :].shape[0]
#     idx = np.logical_and((v >= velmin).all(axis = 1), (v <= velmax).all(axis = 1))
#     skeep = np.sum(idx)
#     sdrop = np.sum(idx == False)
#     return s, skeep, sdrop

# for a, b in idx.loc[revise_idx, :].groupby(level = idx.index.names[2:-1]):
#     pass#.describe()

# x = revise_idx.droplevel(6).drop_duplicates()
# idx.loc[slice(x, :), :]

# idx.loc[revise_idx, :]
# idx.groupby(level = idx.index.names[2:-1]).describe()
# .loc[:, 'r'] = 'review'

# mix = pd.IndexSlice
# pd.concat((idx.loc[mix[[*t], :]] for t in x)) 

# mix = pd.IndexSlice
# pd.concat((idx.loc[mix[[*t], :]] for t in x)) 