Data Processing
===
> Handles data processing: encoding of attributes, creation of sliding windows, adding start and end events, and generation of data loaders.

<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Column-renaming-and-adding-of-start-and-end-events" data-toc-modified-id="Column-renaming-and-adding-of-start-and-end-events-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Column renaming and adding of start and end events</a></span></li><li><span><a href="#Trace-Splitting" data-toc-modified-id="Trace-Splitting-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Trace Splitting</a></span></li><li><span><a href="#Encoding-Techniques" data-toc-modified-id="Encoding-Techniques-3"><span class="toc-item-num">3&nbsp;&nbsp;</span>Encoding Techniques</a></span><ul class="toc-item"><li><span><a href="#PPObj" data-toc-modified-id="PPObj-3.1"><span class="toc-item-num">3.1&nbsp;&nbsp;</span>PPObj</a></span></li><li><span><a href="#Categorization" data-toc-modified-id="Categorization-3.2"><span class="toc-item-num">3.2&nbsp;&nbsp;</span>Categorization</a></span></li><li><span><a href="#Fill-Missing" data-toc-modified-id="Fill-Missing-3.3"><span class="toc-item-num">3.3&nbsp;&nbsp;</span>Fill Missing</a></span></li><li><span><a href="#Z-score" data-toc-modified-id="Z-score-3.4"><span class="toc-item-num">3.4&nbsp;&nbsp;</span>Z-score</a></span></li><li><span><a href="#Date-conversion" data-toc-modified-id="Date-conversion-3.5"><span class="toc-item-num">3.5&nbsp;&nbsp;</span>Date conversion</a></span></li><li><span><a href="#MinMax-Scaling" data-toc-modified-id="MinMax-Scaling-3.6"><span class="toc-item-num">3.6&nbsp;&nbsp;</span>MinMax Scaling</a></span></li><li><span><a href="#One-HoT-Encoding" data-toc-modified-id="One-HoT-Encoding-3.7"><span class="toc-item-num">3.7&nbsp;&nbsp;</span>One HoT Encoding</a></span></li></ul></li><li><span><a href="#Window-Generation" data-toc-modified-id="Window-Generation-4"><span class="toc-item-num">4&nbsp;&nbsp;</span>Window Generation</a></span></li><li><span><a href="#Data-Loader" data-toc-modified-id="Data-Loader-5"><span class="toc-item-num">5&nbsp;&nbsp;</span>Data 
Loader</a></span><ul class="toc-item"><li><span><a href="#Integration-Samples" data-toc-modified-id="Integration-Samples-5.1"><span class="toc-item-num">5.1&nbsp;&nbsp;</span>Integration Samples</a></span></li></ul></li></ul></div>

In [None]:
#default_exp data_processing

In [None]:
#hide

%load_ext autoreload
%autoreload 2
%matplotlib inline

In [None]:
#export
from dapnn.imports import *


In [None]:
# Export the `#export` cells of this notebook to the library module (nbdev).
notebook2script(fname='02_data_processing.ipynb')

Converted 02_data_processing.ipynb.


## Column renaming and adding of start and end events



In [None]:
#export
def df_preproc (df,cols=None,start_marker='###start###',end_marker='###end###'):
    """Add an `event_id` column and wrap every trace in artificial start and end events.

    Events are numbered per trace (grouped by `trace_id`) starting at 1. A start row with
    `event_id` 0 is inserted before each trace and an end row with `event_id` = last id + 1
    after it. In the inserted rows the columns listed in `cols` hold `start_marker` /
    `end_marker`; all other columns are copied from the first event (start row) or the last
    event (end row) of the trace.

    Args:
        df: event log with a `trace_id` column. The rows of one trace are assumed to be
            contiguous, because an end row is placed right before the next trace's start row.
        cols: columns that receive the markers; defaults to ``['activity']``.
        start_marker: value written into `cols` for the inserted start rows.
        end_marker: value written into `cols` for the inserted end rows.

    Returns:
        A new DataFrame with a default RangeIndex. The input `df` is not modified.
        An empty `df` is returned (as a copy) with an empty `event_id` column added.
    """
    if cols is None: cols=['activity']
    # `assign` works on a copy, so the caller's frame does not gain an `event_id` column.
    df = df.assign(event_id=df.groupby('trace_id').cumcount()+1)
    if len(df)==0: return df  # nothing to wrap; `data[-1]` below would raise
    columns = df.columns.to_list()
    eid_col = columns.index('event_id')
    col_idx = [columns.index(c) for c in cols]

    # Work with numpy for performance
    data = df.values
    # Start events: copies of each trace's first event (event_id == 1) with markers and event_id 0.
    idx = np.where(data[:,eid_col]==1)[0]
    new = data[idx].copy()
    for c in col_idx: new[:,c]=start_marker
    new[:,eid_col]=0
    data = np.insert(data,idx,new, axis=0)

    # End events: inserted before every start event except the first, copying the row
    # right before it (the last event of the previous trace) with event_id + 1.
    idx = np.where(data[:,eid_col]==0)[0][1:]
    new = data[idx-1].copy()
    for c in col_idx: new[:,c]=end_marker
    new[:,eid_col]=new[:,eid_col]+1
    data = np.insert(data,idx,new, axis=0)

    # The final trace has no following start event, so its end event is appended separately.
    last = data[-1].copy()
    for c in col_idx: last[c]=end_marker
    last[eid_col]+=1
    data = np.insert(data,len(data),last, axis=0)
    return pd.DataFrame(data,columns=df.columns)

In [None]:
# Example log from `data/csv/binet_logs`, used in the cells below.
fn='data/csv/binet_logs/medium-0.3-4.csv.gz'

In [None]:
# Load the log manually and apply the same column renaming as `import_log`.
df=pd.read_csv(fn)
df.rename({'name':'activity','case:concept:name':'trace_id'},axis=1,inplace=True)
if not 'activity' in df.columns:
    df.rename({'concept:name':'activity'},axis=1,inplace=True)
df.head(2)

Unnamed: 0,activity,timestamp,timestamp_end,anomaly,trace_id,company,country,day,user
0,Activity A,,,Rework,1,Codehow,Comoros,Wednesday,Wilton
1,Activity Z,,,Rework,1,Plussunin,Chad,Tuesday,Iluminada


In [None]:
%%time
# Add event ids and start/end events; all listed columns receive the markers.
df1 =df_preproc(df,cols=['activity','company','country','day','user'])

CPU times: user 24.2 ms, sys: 14 µs, total: 24.3 ms
Wall time: 23.5 ms


Add start and end events, and rename columns.

In [None]:
#export
def import_log(log_path,cols=['activity']):
    """Read an event-log CSV, normalize its column names and add start/end events.

    `name` / `case:concept:name` become `activity` / `trace_id`. If there is still no
    `activity` column, `concept:name` is used for it. `case:pdc:isPos` becomes `normal`.
    The renamed log is passed through `df_preproc` with `cols`. The result is indexed by `trace_id`.
    """
    log = pd.read_csv(log_path)
    log = log.rename(columns={'name':'activity','case:concept:name':'trace_id'})
    if 'activity' not in log.columns:
        log = log.rename(columns={'concept:name':'activity'})
    log = log.rename(columns={'case:pdc:isPos':'normal'})
    log = df_preproc(log,cols)
    log.index = log['trace_id']
    return log

In [None]:
# Load a PDC 2020 ground-truth log and show its first 35 rows.
log = import_log('data/csv/PDC2020_ground_truth/pdc_2020_0000000.csv.gz')
log[:35]

Unnamed: 0_level_0,activity,trace_id,case:pdc:costs,normal,event_id
trace_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
trace 1,###start###,trace 1,2.0,False,0
trace 1,t11,trace 1,2.0,False,1
trace 1,t21,trace 1,2.0,False,2
trace 1,t26,trace 1,2.0,False,3
trace 1,t35,trace 1,2.0,False,4
trace 1,t41,trace 1,2.0,False,5
trace 1,t51,trace 1,2.0,False,6
trace 1,t61,trace 1,2.0,False,7
trace 1,t44,trace 1,2.0,False,8
trace 1,t71,trace 1,2.0,False,9


In [None]:
# All listed attribute columns receive the start/end markers.
import_log(fn,['activity','company','country','day','user'])

Unnamed: 0_level_0,activity,timestamp,timestamp_end,anomaly,trace_id,company,country,day,user,event_id
trace_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
1,###start###,,,Rework,1,###start###,###start###,###start###,###start###,0
1,Activity A,,,Rework,1,Codehow,Comoros,Wednesday,Wilton,1
1,Activity Z,,,Rework,1,Plussunin,Chad,Tuesday,Iluminada,2
1,Activity AA,,,Rework,1,year-job,Libyan Arab Jamahiriya,Wednesday,Sandra,3
1,Activity AF,,,Rework,1,Donquadtech,Cocos (Keeling) Islands,Friday,Ling,4
...,...,...,...,...,...,...,...,...,...,...
5000,Activity W,,,normal,5000,year-job,Indonesia,Monday,Jimmy,4
5000,Activity V,,,normal,5000,Openlane,Australia,Tuesday,Deloras,5
5000,Activity P,,,normal,5000,Lexiqvolax,Uzbekistan,Wednesday,Rossana,6
5000,Activity B,,,normal,5000,Y-corporation,Heard Island & Mcdonald Islands,Tuesday,Della,7


## Trace Splitting
i.e., splitting into training, validation, and test sets

The `split_traces` function splits an event log into training, validation, and test sets. Furthermore, it removes traces that are longer than a given threshold.

In [None]:
#export
def drop_long_traces(df,max_trace_len=64,event_id='event_id'):
    "Remove every trace (index label) having an event whose `event_id` exceeds `max_trace_len`."
    flagged = df.loc[df[event_id] > max_trace_len]
    return df.drop(np.unique(flagged.index))

In [None]:
#export
def RandomTraceSplitter(split_pct=0.2, seed=None):
    """Create a function that randomly splits unique trace ids into two groups.

    The returned function takes trace ids (duplicates allowed), shuffles the unique ids and
    returns ``(rest, split)`` as two `L`s, where `split` holds the first
    ``int(split_pct * n_unique)`` shuffled ids.

    A private ``np.random.RandomState(seed)`` is used, so the global NumPy RNG is not reseeded.
    For a given seed the permutation equals ``np.random.seed(seed); np.random.permutation(...)``.
    """
    def _inner(trace_ids):
        o=np.unique(trace_ids)
        rand_idx = np.random.RandomState(seed).permutation(o)
        cut = int(split_pct * len(o))
        return L(rand_idx[cut:].tolist()),L(rand_idx[:cut].tolist())
    return _inner

In [None]:
#export
def split_traces(df,df_name='tmp',test_seed=42,validation_seed=None):
    """Split the traces of `df` into ``(train, valid, test)`` lists of trace ids.

    Long traces are dropped first with `drop_long_traces` (default threshold).
    20% of the remaining trace ids form the test set (shuffled with `test_seed`).
    10% of the rest form the validation set (shuffled with `validation_seed`).
    `df_name` is not used.
    """
    kept = drop_long_traces(df)
    remaining, test = RandomTraceSplitter(seed=test_seed)(kept.index)
    train, valid = RandomTraceSplitter(seed=validation_seed, split_pct=0.1)(remaining)
    return train, valid, test

In [None]:
#hide
# The test split uses a fixed seed (42) and must be reproducible.
# The validation seed is None, so valid (and hence train) differ between calls.
a1,b1,c1=split_traces(log)
a2,b2,c2=split_traces(log)
test_ne(a1,a2),test_ne(b1,b2),test_eq(c1,c2);

## Encoding Techniques
Categorization, Normalization, One-Hot, etc.


### PPObj
An object that manages the pre-processing and keeps track of the date, categorical, and continuous columns,
with a few convenience functions.

In [None]:
#export
class _TraceIloc:
    """Positional, trace-level indexer used by `PPObj.iloc`.

    ``o[i]`` and ``o[i:j]`` select whole traces by their position among the sorted unique index
    labels (trace ids). ``o[rows, cols]`` selects traces the same way and restricts the result to
    the columns `cols` (by name). The selection is wrapped with ``self.o.new``.
    """
    def __init__(self,o): self.o = o

    def _trace_labels(self, df, rows):
        # Map trace positions to trace ids. `np.atleast_1d` turns a scalar position into a
        # one-element array, so `df.loc` returns a DataFrame even for a single-event trace.
        return np.atleast_1d(np.unique(df.index)[rows])

    def __getitem__(self, idxs):
        df = self.o.items
        if isinstance(idxs,tuple):
            rows,cols = idxs
            # Rows are trace positions here as well. Mapping event positions through
            # `df.index[rows]` would repeat a trace id once per event, and `df.loc` would then
            # return that trace's events multiple times.
            return self.o.new(df.loc[self._trace_labels(df, rows),cols])
        return self.o.new(df.loc[self._trace_labels(df, idxs)])

In [None]:
#export
class PPObj(CollBase, GetAttr, FilteredBase):
    """Main Class for Process Prediction.

    Wraps an event-log DataFrame (`items`, indexed by trace id) together with:
    - the names of its categorical (`cat_names`), continuous (`cont_names`), date
      (`date_names`) and target (`ycat_names` / `ycont_names`) columns,
    - a `Pipeline` of pre-processing `procs`,
    - optional `splits` (lists of trace ids, e.g. train/valid/test).

    Attributes not found on the object are looked up on `procs` (`GetAttr`, `_default`).
    """
    _default,with_cont='procs',True
    def __init__(self,df,procs=None,cat_names=None,cont_names=None,date_names=None,y_names=None,splits=None,
                 ycat_names=None,ycont_names=None,inplace=False,do_setup=True):
        if not inplace: df=df.copy()
        # Keep only the traces listed in the splits (selected by trace-id label, in split order).
        if splits is not None: df = df.loc[sum(splits, [])] # Can drop traces
        self.event_ids=df['event_id'].values if hasattr(df,'event_id') else None

        super().__init__(df)

        self.cat_names,self.cont_names,self.date_names=(L(cat_names),L(cont_names),L(date_names))
        self.set_y_names(y_names,ycat_names,ycont_names)

        self.procs = Pipeline(procs)
        self.splits=splits
        if do_setup: self.setup()


    @property
    def y_names(self): return self.ycat_names+self.ycont_names

    def set_y_names(self,y_names,ycat_names=None,ycont_names=None):
        "Use explicit `ycat_names`/`ycont_names`, else split `y_names` by membership in `cat_names`."
        if ycat_names or ycont_names:
            # Wrap in `L` so a missing side (None) becomes empty and `y_names` can concatenate.
            self.ycat_names,self.ycont_names=L(ycat_names),L(ycont_names)
        else:
            self.ycat_names,self.ycont_names=(L([i for i in L(y_names) if i in self.cat_names]),
                                                L([i for i in L(y_names) if i not in self.cat_names]))
    def setup(self): self.procs.setup(self)
    def subset(self, i): return self.new(self.loc[self.splits[i]]) if self.splits else self
    def __len__(self): return len(np.unique(self.items.index))
    def show(self, max_n=3, **kwargs):
        "Print trace/event counts and display the first `max_n` rows of the x and y columns."
        print('#traces:',len(self),'#events:',len(self.items))
        display_df(self.new(self.all_cols[:max_n]).items)
    def new(self, df):
        "Create a PPObj for `df` with the same procs and column names, without running setup."
        return type(self)(df, do_setup=False,
                          **attrdict(self, 'procs','cat_names','cont_names','ycat_names','ycont_names',
                                     'date_names'))
    def process(self): self.procs(self)
    def loc(self): return self.items.loc
    def iloc(self): return _TraceIloc(self)
    def x_names (self): return self.cat_names + self.cont_names
    def all_col_names(self): return ((self.x_names+self.y_names)).unique()
    def transform(self, cols, f, all_col=True):
        "Apply `f` to the columns `cols` (only the existing ones if `all_col` is False)."
        if not all_col: cols = [c for c in cols if c in self.items.columns]
        if len(cols) > 0: self[cols] = self[cols].transform(f)
    def new_empty(self): return self.new(pd.DataFrame({}, columns=self.items.columns))
    def subsets(self):
        "One PPObj per split, or a one-element list holding `self` when there are no splits."
        # `L([self])`, not `L(self)`: `L` would iterate the object, i.e. the DataFrame's column names.
        return [self.subset(i) for i in range(len(self.splits))] if self.splits else L([self])
properties(PPObj,'loc','iloc','x_names','all_col_names')

def _add_prop(cls, nm):
    "Add a read/write property `<nm>s` to `cls` that gets/sets the columns named in `<nm>_names`."
    names_attr = nm + '_names'

    def _get(o): return o[list(getattr(o, names_attr))]

    def _set(o, v): o[getattr(o, names_attr)] = v

    setattr(cls, nm + 's', property(_get, _set))

# Column-group accessors: PPObj.cats, .conts, .ycats, .yconts, .ys, .xs and .all_cols.
for _nm in ('cat', 'cont', 'ycat', 'ycont', 'y', 'x', 'all_col'):
    _add_prop(PPObj, _nm)
del _nm

In [None]:
# 'activity' is both a categorical input and the target.
ppObj=PPObj(log,cat_names=['activity'],y_names=['activity'])

In [None]:
# 'activity' is in cat_names, so it is classified as a categorical target.
ppObj.ycat_names

(#1) ['activity']

In [None]:
# `iloc` selects whole traces by position (traces ordered by sorted trace id).
ppObj.iloc[0].show(max_n=20) # shows first trace

#traces: 1 #events: 17


Unnamed: 0_level_0,activity
trace_id,Unnamed: 1_level_1
trace 1,start
trace 1,t11
trace 1,t21
trace 1,t26
trace 1,t35
trace 1,t41
trace 1,t51
trace 1,t61
trace 1,t44
trace 1,t71


We can define various pre-processing functions that are executed when `PPObj` is instantiated. `PPProc` is the base class for a pre-processing function. It ensures that a pre-processing function is set up on the training set and then applied, with the same parameters, to the whole data, including the validation and test sets.

In [None]:
#export
class PPProc(InplaceTransform):
    """Base class to write a non-lazy tabular processor for dataframes.

    `setup` fits the proc (through `setups`) on ``items.train`` when that attribute exists,
    otherwise on `items`. It then immediately applies the proc to the whole of `items`.
    """
    def setup(self, items=None, train_setup=False): #TODO: properly deal with train_setup
        # `train_setup` is currently ignored; fitting always uses `items.train` when available.
        fit_on = getattr(items, 'train', items)
        super().setup(fit_on, train_setup=False)
        # Procs are called as soon as data is available
        target = items.items if isinstance(items, Datasets) else items
        return self(target)

    @property
    def name(self):
        stored = getattr(self, '__stored_args__', {})
        return f"{super().name} -- {stored}"

### Categorization
i.e., ordinal encoding

Implementation of ordinal (integer) encoding. Unknown values are mapped to an NA category with code 0. The implementation is largely taken from fastai.

In [None]:
#export
def _apply_cats (voc, add, c):
    """Encode column `c` as integer category codes shifted by `add`.

    A non-categorical column is coded against the vocabulary ``voc[c.name]``, skipping its
    first `add` entries (e.g. an NA placeholder). Values missing from the vocabulary therefore
    get code ``-1 + add``. A column that already has a categorical dtype uses its own codes.
    """
    # `is_categorical_dtype` is deprecated since pandas 2.1; check the dtype directly.
    if not isinstance(c.dtype, pd.CategoricalDtype):
        return pd.Categorical(c, categories=voc[c.name][add:]).codes+add
    return c.cat.codes+add

In [None]:
#export
class Categorify(PPProc):
    """Transform the categorical variables to something similar to `pd.Categorical`.

    `setups` builds one `CategoryMap` (with an NA entry) per column in `cat_names`.
    `encodes` replaces each such column by its codes shifted by one, so values outside the
    vocabulary map to 0. ``proc[name]`` returns the vocabulary of column `name`.
    """
    order = 2
    def setups(self, to):
        vocabs = {name: CategoryMap(to.items.loc[:, name], add_na=True) for name in to.cat_names}
        store_attr(classes=vocabs, but='to')
    def encodes(self, to):
        encoder = partial(_apply_cats, self.classes, 1)
        to.transform(to.cat_names, encoder)
    def __getitem__(self,k): return self.classes[k]

In [None]:
# Small 60/20/20-trace split built from the first 100 training traces; no procs are applied.
log=import_log('data/csv/PDC2021_ground_truth/pdc2021_000000.csv.gz')
traces=split_traces(log)[0][:100]
splits=traces[:60],traces[60:80],traces[80:100]
o=PPObj(log,None,cat_names='activity',splits=splits)

In [None]:
# Vocabulary of 'activity' over all rows, without an NA entry.
m=CategoryMap(o.items.loc[:,'activity'])
len(m)

47

In [None]:
# Categorify fits on the training subset and adds an NA entry to the vocabulary.
cat=Categorify()
cat.setup(o)
len(cat['activity'])

48

In [None]:
# Toy example: the values are coded from 1 upwards; code 0 is reserved for unknown values.
df = pd.DataFrame({'a':[0,1,2,0,2]})
to = PPObj(df, Categorify, 'a')
to.show()

#traces: 5 #events: 5


Unnamed: 0,a
0,1
1,2
2,3


In [None]:
# Categorify the activities of the BPIC12 log.
log=import_log('data/csv/binet_logs/bpic12-0.3-1.csv.gz')
o=PPObj(log,Categorify,'activity')
o.show()

#traces: 13087 #events: 289892


Unnamed: 0_level_0,activity
trace_id,Unnamed: 1_level_1
173688,73
173688,10
173688,7


### Fill Missing
for continuous values

A pre-processing function that deals with missing data in continuous attributes. Missing data can be replaced using a fill strategy, e.g. the median, the mode, or a constant value. Additionally, we can create another boolean column that indicates which rows were missing. The implementation is largely taken from fastai.

In [None]:
#export
class FillStrategy:
    "Namespace containing the various filling strategies."
    # Each strategy takes the column `c` and a user fill value `fill`, and returns the value
    # that replaces the NaNs of `c`.
    def median  (c,fill): return c.median()
    def mean    (c,fill): return c.mean()
    def constant(c,fill): return fill
    def mode    (c,fill): return c.dropna().value_counts().idxmax()

In [None]:
#export
class FillMissing(PPProc):
    """Fill the missing values in continuous columns.

    `setups` considers every continuous column that contains NaNs in the data it is fitted on.
    For each such column `n` it records the fill value returned by
    ``fill_strategy(column, fill_vals[n])``, where `fill_strategy` is a `FillStrategy` function.

    `encodes` fills those columns. If `add_col` is set, it also adds a boolean `<n>_na` column
    marking the originally missing rows and registers that column in `cat_names`.
    """
    order=1
    def __init__(self, fill_strategy=FillStrategy.median, add_col=True, fill_vals=None):
        if fill_vals is None: fill_vals = defaultdict(int)
        store_attr()

    def setups(self, dsets):
        # After a setup `fill_strategy` is stored by name; resolve it so setup can be repeated.
        strategy = self.fill_strategy
        if isinstance(strategy, str): strategy = getattr(FillStrategy, strategy)
        missing = pd.isnull(dsets.conts).any()
        # `but='dsets'` keeps the data argument itself out of the stored attributes.
        store_attr(but='dsets', na_dict={n:strategy(dsets[n], self.fill_vals[n])
                            for n in missing[missing].keys()})
        self.fill_strategy = strategy.__name__

    def encodes(self, to):
        missing = pd.isnull(to.conts)
        for n in missing.any()[missing.any()].keys():
            assert n in self.na_dict, f"nan values in `{n}` but not in setup training set"
        for n in self.na_dict.keys():
            # Assign the filled column back: `fillna(inplace=True)` on a column view is not
            # guaranteed to update the underlying frame (pandas copy-on-write).
            to[n] = to[n].fillna(self.na_dict[n])
            if self.add_col:
                to.loc[:,n+'_na'] = missing[n]
                if n+'_na' not in to.cat_names: to.cat_names.append(n+'_na')

In [None]:
# Column 'a' contains a NaN, which is filled with the median; 'a_na' flags the filled row.
fill = FillMissing() 
df = pd.DataFrame({'a':[0,1,np.nan,1,2,3,4], 'b': [0,1,2,3,4,5,6]})
to = PPObj(df, fill, cont_names=['a', 'b'])
to.show()

#traces: 7 #events: 7


Unnamed: 0,a_na,a,b
0,False,0.0,0
1,False,1.0,1
2,True,1.5,2


### Z-score

Standardizes the continuous columns using the z-score. Copied from fastai.

In [None]:
#export
class Normalize(PPProc):
    """Normalize the continuous columns with the z-score.

    The means and population (``ddof=0``) standard deviations come from ``to.train`` when
    available, otherwise from `to`. 1e-7 is added to the stds to avoid division by zero.
    """
    order = 3
    def setups(self, to):
        fit = getattr(to, 'train', to)
        store_attr(but='to', means=dict(fit.conts.mean()),
                   stds=dict(fit.conts.std(ddof=0)+1e-7))
        # Do not encode here: `PPProc.setup` applies the proc right after `setups`, so encoding
        # here as well would normalize the data twice whenever `to` is the object itself.

    def encodes(self, to): to.conts = (to.conts-self.means) / self.stds
    def decodes(self, to): to.conts = (to.conts*self.stds ) + self.means

In [None]:
# Z-score normalization of a single continuous column.
df = pd.DataFrame({'a':[0,1,9,3,4]})
to = PPObj(df, Normalize(), cont_names='a')
to.show()

#traces: 5 #events: 5


Unnamed: 0,a
0,-1.429409
1,-1.327783
2,-0.514775


### Date conversion

Encodes a date column into several features using pandas date functions. This implementation is also based on fastai, but it additionally supports the elapsed time relative to the earliest event of a case.

In [None]:
#export
def _make_date(df, date_field):
    """Make sure `df[date_field]` is of the right date type.

    A column that already holds datetimes (tz-naive or tz-aware) is left unchanged. Any other
    column is parsed in place with ``pd.to_datetime(..., utc=True)``, giving a UTC-aware column.
    """
    field_dtype = df[date_field].dtype
    if isinstance(field_dtype, pd.DatetimeTZDtype):
        field_dtype = np.datetime64
    if not np.issubdtype(field_dtype, np.datetime64):
        # `infer_datetime_format` is deprecated since pandas 2.0; format inference is the default.
        df[date_field] = pd.to_datetime(df[date_field], utc=True)

In [None]:
# String dates are parsed into a UTC-aware datetime column.
df = pd.DataFrame({'fu': ['2019-12-04', '2019-11-29', '2019-11-15', '2019-10-24']})
_make_date(df, 'fu')
df.dtypes

fu    datetime64[ns, UTC]
dtype: object

In [None]:
#export
def _secSinceSunNoon(datTimStr):
    """Seconds elapsed since the start of the week, taken as Sunday 00:00.

    Sunday 00:00 maps to 0 and Saturday 23:59:59 maps to 604799. As in `_secSinceNoon`,
    'Noon' in the name means midnight. pandas numbers Monday as 0 and Sunday as 6, hence
    ``(dayofweek + 1) % 7``. A plain ``dayofweek - 1`` would measure from Tuesday and turn
    Mondays negative.
    """
    dt = pd.to_datetime(datTimStr).dt
    return ((dt.dayofweek + 1) % 7)*24*3600 + dt.hour * 3600 + dt.minute * 60 + dt.second

In [None]:
#export
def _secSinceNoon(datTimStr):
    "Seconds elapsed since 00:00 of the same day, for each timestamp of the Series `datTimStr`."
    parts = pd.to_datetime(datTimStr).dt
    hours, minutes, seconds = parts.hour, parts.minute, parts.second
    return (hours * 60 + minutes) * 60 + seconds

In [None]:
#export
# Default features produced by `encode_date`. 'Elapsed' is also what `decode_date` reads back.
Base_Date_Encodings=[
    'Year', 'Month', 'Day',
    'Dayofweek', 'Dayofyear',
    'Elapsed',
]

In [None]:
#export
def encode_date(df, field_name,unit=1e9,date_encodings=Base_Date_Encodings):
    """Helper function that adds columns relevant to a date in the column `field_name` of `df`.

    `df` is modified in place: every requested feature is added as a `<field_name>_<encoding>`
    column, and the original column is dropped.

    Supported encodings:
    - the pandas ``.dt`` attributes 'Year', 'Month', 'Day', 'Dayofweek', 'Dayofyear',
      'Is_month_end', 'Is_month_start', 'Is_quarter_end', 'Is_quarter_start', 'Is_year_end',
      'Is_year_start', 'Hour', 'Minute', 'Second';
    - 'Week' (ISO week), 'secSinceSunNoon', 'secSinceNoon';
    - 'Elapsed': the nanosecond timestamp // `unit`, i.e. seconds since the epoch for the
      default `unit`;
    - 'Relative_elapsed': Elapsed minus the smallest Elapsed among the rows sharing the same
      index label (per trace).

    Missing timestamps give NaN in the elapsed features.

    Returns:
        ``(cat_names, cont_names)``: an empty list and the new column name for every entry
        of `date_encodings`.
    """
    _make_date(df, field_name)
    field = df[field_name]
    # Plain `<field_name>_` prefix (a trailing 'date' is not stripped); `decode_date` builds
    # the column names the same way.
    prefix = field_name + '_'
    # Only the attributes listed in `date_encodings` are added. Hour/Minute/Second are always
    # available instead of depending on an unrelated global `time` name.
    attr = ['Year', 'Month', 'Day', 'Dayofweek', 'Dayofyear', 'Is_month_end', 'Is_month_start',
            'Is_quarter_end', 'Is_quarter_start', 'Is_year_end', 'Is_year_start',
            'Hour', 'Minute', 'Second']
    for n in attr:
        if n in date_encodings: df[prefix + n] = getattr(field.dt, n.lower())

    if 'secSinceSunNoon' in date_encodings:
        df[prefix+'secSinceSunNoon']=_secSinceSunNoon(field)
    if 'secSinceNoon' in date_encodings:
        df[prefix+'secSinceNoon']=_secSinceNoon(field)
    if 'Week' in date_encodings:
        # Pandas removed `dt.week`; use `isocalendar()` when available.
        week = field.dt.isocalendar().week if hasattr(field.dt, 'isocalendar') else field.dt.week
        # Column position 3, clamped so that frames with fewer columns do not raise.
        df.insert(min(3, len(df.columns)), prefix+'Week', week)
    mask = ~field.isna()
    # Convert to nanosecond resolution first, so `// unit` is correct whatever unit is stored.
    ns = field.values.astype('datetime64[ns]').astype(np.int64)
    elapsed = pd.Series(np.where(mask, ns // unit, None).astype(float), index=field.index)

    if 'Relative_elapsed' in date_encodings:
        # Index labels identify traces, so this is the time since the trace's earliest event.
        df[prefix+'Relative_elapsed']=elapsed-elapsed.groupby(elapsed.index).transform('min')

    # required to decode!
    if 'Elapsed' in date_encodings: df[prefix+'Elapsed']=elapsed

    df.drop(field_name, axis=1, inplace=True)
    return [],[prefix+i for i in date_encodings]

In [None]:
# Default encodings: Year, Month, Day, Dayofweek, Dayofyear and Elapsed.
df = pd.DataFrame({'fu': ['2019-12-04', '2019-11-29', '2019-11-15', '2019-10-24']})
encode_date(df,'fu')
df

Unnamed: 0,fu_Year,fu_Month,fu_Day,fu_Dayofweek,fu_Dayofyear,fu_Elapsed
0,2019,12,4,2,338,1575418000.0
1,2019,11,29,4,333,1574986000.0
2,2019,11,15,4,319,1573776000.0
3,2019,10,24,3,297,1571875000.0


In [None]:
#export
def decode_date(df, field_name,unit=1e9,date_encodings=Base_Date_Encodings):
    """Inverse of `encode_date`: rebuild `df[field_name]` as UTC datetimes from `<field_name>_Elapsed`.

    `df` is modified in place and must contain the `<field_name>_Elapsed` column. Afterwards the
    `<field_name>_<encoding>` column of every entry of `date_encodings` is deleted.
    NaN elapsed values become NaT.
    """
    # `Elapsed * unit` is nanoseconds since the epoch. `pd.to_datetime(unit='ns', utc=True)`
    # is the documented conversion, unlike a float -> tz-aware `astype`.
    df[field_name] = pd.to_datetime(df[field_name+'_'+'Elapsed'] * unit, unit='ns', utc=True)
    for c in date_encodings: del df[field_name+'_'+c]

In [None]:
# Rebuild 'fu' from fu_Elapsed and drop the encoded columns.
decode_date(df,'fu')
df

Unnamed: 0,fu
0,2019-12-04 00:00:00+00:00
1,2019-11-29 00:00:00+00:00
2,2019-11-15 00:00:00+00:00
3,2019-10-24 00:00:00+00:00


In [None]:
#export
class Datetify(PPProc):
    """Encode the date columns (`date_names`) with `encode_date`.

    The original date columns are dropped from the data. The generated columns are appended to
    `cont_names`. `date_encodings` selects the features (default: 'Relative_elapsed').
    """
    order = 0

    # Tuple default: `listify` turns it into a fresh list, so instances never share a mutable default.
    def __init__(self, date_encodings=('Relative_elapsed',)): self.date_encodings=listify(date_encodings)

    def encodes(self, o):
        for i in o.date_names:
            cat,cont=encode_date(o.items,i,date_encodings=self.date_encodings)
            o.cont_names+=cont
            o.cat_names+=cat
# Todo: Add decoding

In [None]:
# All rows share index 1 (a single trace), so Relative_elapsed is measured from its earliest event.
df = pd.DataFrame({'fu': ['2019-10-04', '2019-10-09', '2019-10-15', '2019-10-24']},index=[1,1,1,1])
o = PPObj(df,Datetify(date_encodings=['secSinceSunNoon','secSinceNoon','Relative_elapsed']),date_names='fu')
o.xs

Unnamed: 0,fu_secSinceSunNoon,fu_secSinceNoon,fu_Relative_elapsed
1,259200,0,0.0
1,86400,0,432000.0
1,0,0,950400.0
1,172800,0,1728000.0


### MinMax Scaling

Applies min-max scaling to the input columns.

In [None]:
#export
class MinMax(PPProc):
    """Min-max scale all input columns (`x_names`) with the minima/maxima seen in setup.

    Values inside the setup range map into [0, 1]. The scaled values are written to new
    `<name>_minmax` columns, which become the only continuous inputs (`cont_names`);
    `cat_names` is cleared.
    """
    order=3

    def setups(self, o):
        # `but='o'` keeps the data object itself out of the stored attributes.
        store_attr(but='o', mins=o.xs.min(), maxs=o.xs.max())

    def encodes(self, o):
        cols=[i+'_minmax' for i in o.x_names]
        # A constant column has max == min; a range of 1 maps it to 0 instead of NaN.
        rng = (self.maxs-self.mins).replace(0, 1)
        o[cols] = (o.xs.astype(float)-self.mins) / rng
        o.cont_names=L(cols)
        o.cat_names=L()
        o.cat_names=L()

### One HoT Encoding

Computes the one-hot encoding of the categorical columns. `Categorify` must be applied to the same columns first, so that unknown and missing values are handled.

In [None]:
#export
from sklearn.preprocessing import OneHotEncoder

In [None]:
# Categorify the activities of the log loaded above (BPIC12).
o=PPObj(log,[Categorify],cat_names=['activity'])

In [None]:
# Number of events vs. number of activity classes (including the NA entry).
len(o.xs),len(o.procs.categorify['activity'])

(289892, 74)

In [None]:
# The activity codes as a numpy array.
o.xs.values

array([[73],
       [10],
       [ 7],
       ...,
       [ 5],
       [53],
       [72]], dtype=int8)

In [None]:
# Scratch: category ranges for OneHotEncoder (both variables are overwritten by the next cell).
x=o.xs.to_numpy()
categories=[range(len(o.procs.categorify['activity'])),range(len(o.procs.categorify['activity']))]

In [None]:
# Toy input for OneHotEncoder: 2 samples, 3 possible categories.
x=np.array(['a1','a2'])
categories=[['a1','a2','a3']]

In [None]:
# One-hot encode the toy input: 2 rows x 3 categories.
ohe = OneHotEncoder(categories=categories)
a=ohe.fit_transform(x.reshape(-1, 1)).toarray()
a.shape

(2, 3)

In [None]:
# Scratch variable; not used by the cells below.
categories=['a1','a2','a3']

In [None]:
#export
class OneHot(PPProc):
    """Transform the categorical variables to one-hot. Requires Categorify to deal with unseen data.

    Each categorical column `c`, already integer-coded by `Categorify`, is expanded into one
    float column `c_<k>` per vocabulary entry `k`. These new columns replace `cat_names`.
    """
    order = 3

    def encodes(self, o):
        one_hot_names=[]
        for col in o.cat_names:
            n_classes = len(o.procs.categorify[col])
            encoder = OneHotEncoder(categories=[range(n_classes)])
            codes = o[col].to_numpy().reshape(-1, 1)
            dense = encoder.fit_transform(codes).toarray()
            for k, column in enumerate(dense.T):
                name = f'{col}_{k}'
                o.items.loc[:,name] = column
                one_hot_names.append(name)
        o.cat_names=L(one_hot_names)

In [None]:
# PDC 2021 training log used for the OneHot example.
event_df=import_log('data/csv/PDC2021_training/pdc2021_0000000.csv.gz')

In [None]:
%%time
# Procs run by `order`: Categorify (2) first, then OneHot (3).
o=PPObj(event_df,[Categorify(),OneHot()],cat_names=['activity'])

CPU times: user 25.8 ms, sys: 6.56 ms, total: 32.3 ms
Wall time: 31.7 ms


## Window Generation

This section covers sliding-window generation.

In [None]:
#export
def _shift_columns (a,ws=3):
    "For a 1-D `a`, row j of the result is [a[j-ws+1], ..., a[j-1], a[j]] (indices wrap around)."
    shifted = [np.roll(a, k) for k in range(ws - 1, -1, -1)]
    return np.dstack(shifted)[0]


In [None]:
#export
def windows_fast(df,event_ids,ws=5,pad=None):
    """Build sliding windows of size `ws` over every trace of an event log.

    Args:
        df: DataFrame of numerically encoded input columns, one row per event, with the
            traces stored contiguously.
        event_ids: per-row event ids, where 0 marks the start event of a trace
            (as added by `df_preproc`).
        ws: window size; positions before the start of a trace are filled with 0.
        pad: if given, every window is left-padded with zeros to length `pad` (needs pad >= ws).

    Returns:
        ``(windows, target_idx)``.
        - `windows` has shape (n_windows, n_columns, ws or pad), with one window ending at each
          event except the last event of every trace.
        - `target_idx` holds the row positions of the events that follow each window, i.e. all
          rows with event_id != 0.

    Raises:
        ValueError: if there are events but no start events (event_id 0).
    """
    event_ids = np.asarray(event_ids)
    trace_start = np.where(event_ids == 0)[0]
    targets = np.where(event_ids != 0)[0]
    if len(trace_start) == 0:
        if len(targets): raise ValueError('`event_ids` contains events but no start events (event_id 0)')
        # No traces (e.g. an empty subset): return empty windows with the expected shape.
        return np.zeros((0, len(list(df)), pad if pad else ws), dtype=df.to_numpy().dtype), targets
    trace_len = np.diff(np.append(trace_start, len(df)))
    # In the zero-padded array, trace i is preceded by (i+1)*(ws-1) inserted zeros in total.
    first = trace_start + np.arange(1, len(trace_start)+1)*(ws-1)
    # A window ends at every event of a trace except its last one.
    idx = np.concatenate([np.arange(s, s+n-1) for s, n in zip(first, trace_len)])
    pad_pos = np.repeat(trace_start, ws-1)
    tmp = np.stack([_shift_columns(np.insert(np.array(df[c]), pad_pos, 0, axis=0), ws=ws) for c in list(df)])
    tmp = np.rollaxis(tmp, 1)  # -> (rows, n_columns, ws)
    res = tmp[idx]
    if pad: res = np.pad(res, ((0,0),(0,0),(pad-ws,0)))
    return res, targets


In [None]:
# PDC 2020 ground-truth log used for the window examples.
event_df=import_log('data/csv/PDC2020_ground_truth/pdc_2020_0000000.csv.gz')

In [None]:
# `len(o)` counts traces (unique trace ids), not events.
o=PPObj(event_df,Categorify(),cat_names=['activity'],y_names='activity')
#o=o.iloc[0]
len(o)

1000

In [None]:
# Events minus one per trace: the number of windows (and targets) produced below.
len(o.items)-len(o)

17029

In [None]:
# Windows of size 5. Note: the variable `ws` holds the windows here, not the window size.
ws,idx=windows_fast(o.xs,o.event_ids,ws=5)
ws,ws.shape

(array([[[ 0,  0,  0,  0,  2]],
 
        [[ 0,  0,  0,  2,  3]],
 
        [[ 0,  0,  2,  3,  4]],
 
        ...,
 
        [[12, 15, 18, 19, 21]],
 
        [[15, 18, 19, 21, 20]],
 
        [[18, 19, 21, 20, 22]]], dtype=int8),
 (17029, 1, 5))

## Data Loader

The prefixes are converted to a `torch.utils.data.Dataset` and then to a `DataLoader`.
A sample is then represented as a tuple of the form `(x_cat, x_cont, ys)`, where `ys` is a tuple holding one tensor per categorical and continuous target attribute. Categorical attributes are converted to long tensors and continuous attributes to float tensors.

If a dimension of the batch is empty — e.g., the model does not use categorical input attributes — it is removed from the tuple.

In [None]:
# Same as above with window size 10.
o=PPObj(event_df,Categorify(),cat_names=['activity'],y_names='activity')
ws,idx=windows_fast(o.xs,o.event_ids,ws=10)
ws,ws.shape

(array([[[ 0,  0,  0, ...,  0,  0,  2]],
 
        [[ 0,  0,  0, ...,  0,  2,  3]],
 
        [[ 0,  0,  0, ...,  2,  3,  4]],
 
        ...,
 
        [[11, 13,  6, ..., 18, 19, 21]],
 
        [[13,  6, 10, ..., 19, 21, 20]],
 
        [[ 6, 10, 16, ..., 21, 20, 22]]], dtype=int8),
 (17029, 1, 10))

In [None]:
# Next-event target of window 16765.
o.ys.iloc[idx].values[16765]

array([11], dtype=int8)

In [None]:
# Outcome-style targets: the last activity of each window's trace.
o.ys.groupby(o.items.index).transform('last').iloc[idx].values

array([[1],
       [1],
       [1],
       ...,
       [1],
       [1],
       [1]], dtype=int8)

In [None]:
# False: next-event targets; True: outcome targets (last event of the trace).
outcome=False

In [None]:
if not outcome: y=o.ys.iloc[idx]
else: y=o.ys.groupby(o.items.index).transform('last').iloc[idx]
ycats=tensor(y[o.ycat_names].values).long()
yconts=tensor(y[o.ycont_names].values).float()
# Categorical inputs occupy the first len(cat_names) columns of each window (long tensors);
# the remaining continuous columns become float tensors, as in `get_dls`.
xcats=tensor(ws[:,:len(o.cat_names)]).long()
xconts=tensor(ws[:,len(o.cat_names):]).float()
xs=tuple([i for i in [xcats,xconts] if i.shape[1]>0])
ys=tuple([ycats[:,i] for i in range(ycats.shape[1])])+tuple([yconts[:,i] for i in range(yconts.shape[1])])
res=(*xs,ys)

In [None]:
# The targets tuple of the assembled sample.
res[-1]

(tensor([ 3,  4,  5,  ..., 20, 22,  1]),)

In [None]:
#export
class PPDset(torch.utils.data.Dataset):
    """Map-style dataset over pre-computed tensors.

    `inp` is ``(*xs, ys)``: every entry of `xs` is indexed directly, and `ys` is a tuple of
    target tensors. Item `idx` is ``(*[x[idx] for x in xs], tuple(y[idx] for y in ys))``.
    The length is that of ``inp[0]``.
    """
    def __init__(self, inp):
        store_attr('inp')

    def __len__(self): return len(self.inp[0])

    def __getitem__(self, idx):
        inputs, targets = self.inp[:-1], self.inp[-1]
        return (*(x[idx] for x in inputs), tuple(y[idx] for y in targets))

In [None]:
# DataLoaders built from the single dataset assembled above.
dls=DataLoaders.from_dsets(PPDset(res))

In [None]:
# Take one batch and show the shapes of inputs and targets.
xcat,y=dls.one_batch()
xcat.shape,y.shape

(torch.Size([64, 1, 10]), torch.Size([64]))

In [None]:
# PPObj with train/valid/test splits of the same log.
o=PPObj(event_df,Categorify(),cat_names=['activity'],y_names='activity',splits=split_traces(event_df))

In [None]:
# Categorical input columns.
o.cat_names

(#1) ['activity']

In [None]:
#export
@delegates(TfmdDL)
def get_dls(ppo:PPObj,windows=windows_fast,outcome=False,event_id='event_id',bs=64,device=None,**kwargs):
    """Create `DataLoaders` of sliding-window samples, one dataset per subset of `ppo`.

    For every subset `s`, ``windows(s.xs, s.event_ids)`` yields the input windows and the row
    positions of the target events. The targets are the `ys` values at those rows or, if
    `outcome`, the last `ys` value of each trace (grouped by index label).

    Categorical inputs/targets become long tensors and continuous ones float tensors.
    Empty input groups are dropped.

    `device` defaults to CUDA when available, otherwise CPU. `event_id` is currently unused.
    """
    # Previously CUDA was hard-coded, which fails on CPU-only machines.
    if device is None: device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    ds=[]
    for s in ppo.subsets():
        wds,idx=windows(s.xs,s.event_ids)

        if not outcome: y=s.ys.iloc[idx]
        else: y=s.ys.groupby(s.items.index).transform('last').iloc[idx]
        ycats=tensor(y[s.ycat_names].values).long()
        yconts=tensor(y[s.ycont_names].values).float()
        # Categorical columns come first in each window, continuous columns after them.
        xconts=tensor(wds[:,len(s.cat_names):]).float()
        xcats=tensor(wds[:,:len(s.cat_names)]).long()
        xs=tuple([i for i in [xcats,xconts] if i.shape[1]>0])
        ys=tuple([ycats[:,i] for i in range(ycats.shape[1])])+tuple([yconts[:,i] for i in range(yconts.shape[1])])
        ds.append(PPDset((*xs,ys)))
    return DataLoaders.from_dsets(*ds,bs=bs,device=device,**kwargs)
PPObj.get_dls= get_dls

In [None]:
# Default windows: `windows_fast` with its default window size (ws=5).
dls=o.get_dls()
xb,yb=dls.one_batch()
xb.shape,yb.shape

(torch.Size([64, 5]), torch.Size([64]))

### Integration Samples

This section shows how the PPObj can be used to create a DataLoader for predictive process analytics:

Next event prediction:  
X: 'activity'   
Y: 'activity'

In [None]:
log=import_log('data/csv/PDC2020_ground_truth/pdc_2020_0000001.csv.gz')
# Split the traces of this log itself, not `event_df` (a different log).
o=PPObj(log,Categorify(),cat_names=['activity'],y_names='activity',splits=split_traces(log))
dls=o.get_dls(windows=partial(windows_fast,ws=2))
o.show(max_n=2)
xb,y=dls.one_batch()
xb.shape,y.shape


#traces: 1000 #events: 18029


Unnamed: 0_level_0,activity
trace_id,Unnamed: 1_level_1
trace 835,2
trace 835,3


(torch.Size([64, 2]), torch.Size([64]))

## Summary
All of this code builds the `PPObj` pipeline, which takes an event log, splits it into training, validation, and test sets, and preprocesses the data in an organized manner.