In [1]:
import os
from tasrif.processing_pipeline import SequenceOperator, ComposeOperator, MapIterableOperator
from tasrif.data_readers.fitbit_interday_dataset import FitbitInterdayDataset
from tasrif.processing_pipeline.pandas import ConvertToDatetimeOperator, SetIndexOperator, ConcatOperator, MergeOperator, \
AsTypeOperator, DropFeaturesOperator
from tasrif.processing_pipeline.custom import CreateFeatureOperator, AggregateOperator, FlattenOperator

In [2]:
# Location of the Fitbit interday export.
# A FITBIT_INTERDAY_PATH that is already set in the environment takes
# precedence; the hard-coded path below is only the fallback default, so the
# notebook can be pointed at another data directory without editing this cell.
os.environ.setdefault('FITBIT_INTERDAY_PATH', '/mnt/data/fitbit-data/')

interday_folder_path = os.environ['FITBIT_INTERDAY_PATH']

In [3]:
# Sleep pipeline: read the "Sleep" table, parse its start/end timestamps,
# derive a "Date" feature from 'End Time', sum every sleep metric per Date,
# then index the result by the Date rendered as a string.
pipeline = SequenceOperator([
    FitbitInterdayDataset(interday_folder_path, table_name="Sleep"),
    ConvertToDatetimeOperator(
        feature_names=['Start Time', 'End Time'],
        infer_datetime_format=True,
    ),
    CreateFeatureOperator(
        feature_name="Date",
        feature_creator=lambda df: df['End Time'].date(),
    ),
    AggregateOperator(
        groupby_feature_names="Date",
        # Every listed metric is aggregated with the same function.
        aggregation_definition=dict.fromkeys(
            [
                'Minutes Asleep',
                'Minutes Awake',
                'Number of Awakenings',
                'Time in Bed',
                'Minutes REM Sleep',
                'Minutes Light Sleep',
                'Minutes Deep Sleep',
            ],
            'sum',
        ),
    ),
    AsTypeOperator({'Date': 'str'}),
    SetIndexOperator('Date'),
])

In [4]:
# Run the sleep pipeline and display what it returns.
# NOTE(review): the rendered output starts with "[", so `df` looks like a
# list of frames rather than a single DataFrame -- confirm before indexing it.
df = pipeline.process()
df

[            Minutes Asleep_sum  Minutes Awake_sum  Number of Awakenings_sum  \
 Date                                                                          
 2019-06-01                 329                 40                        24   
 2019-06-02                 227                 16                         2   
 2019-06-03                 198                 18                         1   
 2019-06-04                 220                 26                        10   
 2019-06-05                 655                 79                        39   
 2019-06-06                 456                 58                        29   
 2019-06-07                 439                 70                        27   
 2019-06-08                 503                 62                        31   
 2019-06-09                 394                 55                        17   
 2019-06-10                 399                 70                        18   
 2019-06-11                 380         

In [5]:
import json

import yaml

# Sanity check: load example.yaml with the safe loader and print the parsed
# Python structure, so the pipeline description can be inspected as-is.
with open("example.yaml", "r") as stream:
    try:
        # Alternative pretty-printed view, kept disabled:
        #print(json.dumps(yaml.safe_load(stream), indent=4, sort_keys=True))
        print(yaml.safe_load(stream))
    except yaml.YAMLError as exc:
        # A malformed document is reported instead of raising.
        print(exc)

{'modules': [{'tasrif.processing_pipeline': ['sequence', 'compose', 'map_iterable', 'print']}, {'tasrif.data_readers.fitbit_interday_dataset': ['fitbit_interday_dataset']}, {'tasrif.processing_pipeline.pandas': ['convert_to_datetime', 'set_index', 'merge', 'as_type', 'drop_features']}, {'tasrif.processing_pipeline.custom': ['create_feature', 'aggregate', 'flatten', 'normalize']}], 'pipeline': [{'$sequence': [{'$compose': [{'$sequence': [{'$fitbit_interday_dataset': {'folder_path': '/mnt/data/fitbit-data/', 'table_name': 'Activities'}}, {'$convert_to_datetime': {'feature_names': 'Date', 'infer_datetime_format': True}}, {'$as_type': {'dtype': {'Date': 'str'}}}, {'$drop_features': {'feature_names': ['Floors', 'Activity Calories', 'Minutes Lightly Active', 'Minutes Fairly Active', 'Minutes Very Active']}}, {'$set_index': {'keys': 'Date'}}]}, {'$sequence': [{'$fitbit_interday_dataset': {'folder_path': '/mnt/data/fitbit-data/', 'table_name': 'Sleep'}}, {'$convert_to_datetime': {'feature_name

In [6]:
import json

import yaml
import importlib

def from_yaml(stream):
    """Build a pipeline from a YAML description.

    Args:
        stream: YAML text or an open file object, handed to ``yaml.safe_load``.
            The document must have a ``modules`` entry (consumed by
            ``load_modules``) and a ``pipeline`` entry (consumed by ``parse``).

    Returns:
        Whatever ``parse`` produces for the ``pipeline`` entry.
    """
    spec = yaml.safe_load(stream)
    # Resolve the operator classes first; parse() looks them up by short name.
    operators = load_modules(spec['modules'])
    pipeline_spec = spec['pipeline']
    return parse(pipeline_spec, operators)

def from_json(stream):
    """Build a pipeline from a JSON description.

    Mirrors ``from_yaml``: the document must have a ``modules`` entry
    (consumed by ``load_modules``) and a ``pipeline`` entry (consumed by
    ``parse``).

    Args:
        stream: JSON text (``str``/``bytes``) or a file-like object exposing
            ``read()``.

    Returns:
        Whatever ``parse`` produces for the ``pipeline`` entry.

    Raises:
        json.JSONDecodeError: if the input is not valid JSON.
        KeyError: if ``modules`` or ``pipeline`` is missing.
    """
    # json.loads only takes text, so drain file-like inputs first.
    raw = stream.read() if hasattr(stream, 'read') else stream
    document = json.loads(raw)
    # parse() needs the operator lookup table; without it the call fails.
    context = load_modules(document['modules'])
    return parse(document['pipeline'], context)
    
def parse(obj, context):
    """Recursively turn a loaded pipeline description into live objects.

    Rules, applied depth-first:
      * dict with exactly one key: if the key is a string starting with
        ``$`` the dict is replaced by the operator built from it; otherwise
        a one-entry dict with the parsed value is returned.
      * dict with several keys: a dict is returned; values under ``$`` keys
        are replaced by the operator built from them (the key is kept).
      * list: each element is parsed.
      * str starting with ``lambda`` (ignoring surrounding whitespace): the
        string is evaluated as Python source.
      * str starting with ``$`` (ignoring surrounding whitespace): an
        operator is built with no arguments.
      * anything else is returned unchanged.

    Args:
        obj: the (sub)structure to parse.
        context: mapping from operator short name to its class/factory, as
            returned by ``load_modules``.

    Returns:
        The parsed counterpart of ``obj``.
    """
    if isinstance(obj, dict):
        if len(obj) == 1:
            key, value = next(iter(obj.items()))
            parsed_value = parse(value, context)
            # Keys are not necessarily strings (YAML allows ints etc.), so
            # guard before calling str methods, as the multi-key branch does.
            if isinstance(key, str) and key.startswith('$'):
                return create_operator(key, parsed_value, context)
            return {key: parsed_value}

        parsed = {}
        for key, value in obj.items():
            parsed_value = parse(value, context)
            if isinstance(key, str) and key.startswith('$'):
                parsed[key] = create_operator(key, parsed_value, context)
            else:
                parsed[key] = parsed_value
        return parsed

    if isinstance(obj, list):
        return [parse(item, context) for item in obj]

    if isinstance(obj, str):
        stripped = obj.strip()
        if stripped.startswith('lambda'):
            # SECURITY: eval() executes arbitrary code embedded in the
            # pipeline file. Only load descriptions from trusted sources.
            return eval(obj)
        if stripped.startswith('$'):
            # Pass the stripped name: create_operator drops the first
            # character, which must be the '$' and not leading whitespace.
            return create_operator(stripped, None, context)
        return obj

    return obj
        
def create_operator(key, value, context):
    """Instantiate the operator (or operator chain) named by ``key``.

    ``key`` looks like ``$name`` or ``$name.wrapper1.wrapper2``. The first
    component is built from ``value``; each following component is called
    with the operator built so far as its single argument.

    How ``value`` is passed to the first component:
      * list, for ``sequence``/``compose``: the list as one argument;
      * list, otherwise: tuple items ``(name, val)`` become keyword
        arguments, every other item a positional argument;
      * dict: keyword arguments;
      * any other truthy value: a single positional argument;
      * falsy non-list/non-dict value (e.g. ``None``): no arguments.

    Args:
        key: operator reference starting with a one-character prefix (``$``).
        value: already-parsed arguments for the first component.
        context: mapping from operator short name to its class/factory.

    Returns:
        The constructed operator.

    Raises:
        KeyError: if a component is not present in ``context``.
    """
    # 'map' is shorthand for 'map_iterable'. Alias whole components only: a
    # substring replace would turn 'map_iterable' into
    # 'map_iterable_iterable' and mangle any name that contains 'map'.
    operator_specs = [
        'map_iterable' if spec == 'map' else spec
        for spec in key[1:].split('.')
    ]
    first_spec, *wrapper_specs = operator_specs

    factory = context[first_spec]
    if isinstance(value, list):
        if first_spec in ('sequence', 'compose'):
            operator = factory(value)
        else:
            args = []
            kwargs = {}
            for item in value:
                if isinstance(item, tuple):
                    kwargs[item[0]] = item[1]
                else:
                    args.append(item)
            operator = factory(*args, **kwargs)
    elif isinstance(value, dict):
        operator = factory(**value)
    elif value:
        operator = factory(value)
    else:
        operator = factory()

    # Remaining components wrap the operator built so far.
    for spec in wrapper_specs:
        operator = context[spec](operator)

    return operator
    
def get_operator_name(spec):
    """Map a snake_case short name to the class name to look up.

    Each underscore-separated part is title-cased and the parts are joined.
    Names ending in ``dataset`` are returned as-is; everything else gets an
    ``Operator`` suffix.
    """
    class_name = ''.join(map(str.title, spec.split('_')))
    return class_name if spec.endswith('dataset') else class_name + 'Operator'
        

        
def load_modules(modules):
    """Import the listed modules and collect the requested classes.

    Args:
        modules: iterable of dicts, each mapping a dotted module path to a
            list of operator short names to fetch from that module.

    Returns:
        dict mapping each short name to the attribute of the imported module
        whose name is given by ``get_operator_name``. Later entries overwrite
        earlier ones with the same short name.
    """
    registry = {}
    for module_spec in modules:
        for module_path, short_names in module_spec.items():
            module = importlib.import_module(module_path)
            registry.update(
                (short_name, getattr(module, get_operator_name(short_name)))
                for short_name in short_names
            )
    return registry
            
    

In [9]:
# Build the pipeline objects described in example.yaml.
with open("example.yaml", "r") as stream:
    try:
        p = from_yaml(stream)
    except yaml.YAMLError as exc:
        # Report the parse problem, then re-raise: swallowing it would leave
        # `p` undefined (or stale from an earlier run) for the cells below.
        print(exc)
        raise

In [13]:
# `p` is the parsed 'pipeline' entry of example.yaml; run its first element.
df = p[0].process()
# Disabled scatter plot of sedentary minutes against minutes asleep:
#df[0].plot(x='Minutes Sedentary', y='Minutes Asleep_sum', style = 'o')
# Correlation matrix of the first item of the result.
df[0].corr()

Unnamed: 0,Calories Burned,Steps,Distance,Minutes Sedentary,Minutes Asleep_sum,Minutes Non Sedentary
Calories Burned,1.0,0.896269,0.896551,0.143744,-0.434131,0.059574
Steps,0.896269,1.0,0.998784,0.217899,-0.399624,-0.061247
Distance,0.896551,0.998784,1.0,0.199556,-0.386699,-0.043944
Minutes Sedentary,0.143744,0.217899,0.199556,1.0,-0.744164,-0.9201
Minutes Asleep_sum,-0.434131,-0.399624,-0.386699,-0.744164,1.0,0.423062
Minutes Non Sedentary,0.059574,-0.061247,-0.043944,-0.9201,0.423062,1.0


In [71]:
# Display the current value of `df`.
# NOTE(review): this cell's execution count (71) is out of sequence with its
# neighbours, so which run produced `df` depends on kernel state -- confirm
# the notebook reproduces under Restart & Run All.
df

[            Calories Burned  Steps  Distance  Minutes Sedentary  \
 Date                                                              
 2019-06-01             3406  13007      9.12                288   
 2019-06-02             3541  13854      9.66                803   
 2019-06-03             3866  15032     10.48                830   
 2019-06-04             2896   7136      4.97                449   
 2019-06-05             2838   7669      5.35                  0   
 2019-06-06             3121   9451      6.59                109   
 2019-06-07             3269   9271      6.46                 47   
 2019-06-08             3960  14766     10.29                  0   
 2019-06-09             3407  11197      7.80                359   
 2019-06-10             3192   9395      6.55                161   
 2019-06-11             3444  10538      7.34                204   
 2019-06-12             4176  15100     10.52                472   
 2019-06-13             4048  15731     10.96   

In [None]:
# Combined pipeline: build the "Activities" and "Sleep" branches side by side
# inside a ComposeOperator, flatten the composed result, then inner-merge the
# branches on 'Date'.
pipeline = SequenceOperator([
    ComposeOperator([
        # Branch 1: activities, indexed by Date (as string).
        SequenceOperator([
            FitbitInterdayDataset(interday_folder_path, table_name="Activities"),
            ConvertToDatetimeOperator(
                feature_names=['Date'],
                infer_datetime_format=True,
            ),
            AsTypeOperator({'Date': 'str'}),
            SetIndexOperator('Date'),
        ]),
        # Branch 2: sleep metrics summed per Date derived from 'End Time'.
        SequenceOperator([
            FitbitInterdayDataset(interday_folder_path, table_name="Sleep"),
            ConvertToDatetimeOperator(
                feature_names=['Start Time', 'End Time'],
                infer_datetime_format=True,
            ),
            CreateFeatureOperator(
                feature_name="Date",
                feature_creator=lambda df: df['End Time'].date(),
            ),
            AggregateOperator(
                groupby_feature_names="Date",
                # Every listed metric is aggregated with the same function.
                aggregation_definition=dict.fromkeys(
                    [
                        'Minutes Asleep',
                        'Minutes Awake',
                        'Number of Awakenings',
                        'Time in Bed',
                        'Minutes REM Sleep',
                        'Minutes Light Sleep',
                        'Minutes Deep Sleep',
                    ],
                    'sum',
                ),
            ),
            AsTypeOperator({'Date': 'str'}),
            SetIndexOperator('Date'),
        ]),
    ]),
    FlattenOperator(),
    MergeOperator(on='Date', how='inner'),
]);

In [None]:
# Run the combined activities + sleep pipeline defined in the previous cell.
# This rebinds `df`, replacing the result of any earlier run.
df = pipeline.process()

In [None]:

# Stand-alone operator configured to drop the 'Floors' feature.
# NOTE(review): `do` is not used anywhere in the visible notebook -- looks
# like leftover scratch work; confirm whether it can be removed.
do = DropFeaturesOperator(feature_names=['Floors'])