In [12]:
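# Disabled experiment: parse the source of `train` and print every `layer.get_dataset(...)` call found in it, plus the call's first argument.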
# source = inspect.getsource(train)
# parsed = ast.parse(source)

# for node in ast.walk(parsed):
#     if isinstance(node,ast.Call):
#         if isinstance(node.func, ast.Attribute):
#             if (node.func.value.id == 'layer'):
#                     if(node.func.attr == 'get_dataset'):
#                         print(ast.dump(node))
#                         print(node.args[0].value)


# ast.dump(parsed)

import ast
import functools
import inspect
import os
import sys

import pandas as pd


class Layer:
    """Minimal local stand-in for the Layer infrastructure used in this notebook.

    An instance stores a project name and the path of an environment file,
    wraps decorated builder functions into ``Dataset`` / ``Model`` entities,
    runs them in the order given, and offers logging helpers that prefix each
    message with the name of the entity currently executing.
    """

    # Class-level fallback; every instance gets its own list in ``__init__``.
    entities = []
    # Name of the entity currently executing. It is written on the class (not
    # on an instance) by the ``dataset`` / ``model`` decorators and read by the
    # logging helpers below.
    entity_context = None

    def __init__(self, project_name, environment):
        """Remember the project name and the path of the environment file."""
        self.project_name = project_name
        self.environment = environment
        # Per-instance list so nothing is ever appended to the shared class list.
        self.entities = []

    def setup(self):
        """Print one "Installing" line per requirement in the environment file.

        Nothing is actually installed. Blank lines and ``#`` comment lines are
        skipped. If the path does not point to a regular file, a
        "not found" message is printed instead.
        """
        # isfile (rather than exists) so a directory path is reported as
        # missing instead of failing inside open().
        if not os.path.isfile(self.environment):
            print(f"Environment file not found: {self.environment}")
            return

        # The context manager closes the handle even if printing fails.
        with open(self.environment, 'r') as requirements:
            for line in requirements:
                lib = line.strip()
                if not lib or lib.startswith('#'):
                    continue
                print(f"Layer Infra: Installing {lib}...")

    def log_parameter(self, metric, value):
        """Print a parameter name/value pair under the current entity context."""
        print(f"\t{Layer.entity_context} > Parameter > {metric}:{value}")

    def log_metric(self, metric, value):
        """Print a metric name/value pair under the current entity context."""
        print(f"\t{Layer.entity_context} > Metric >{metric}:{value}")

    def log(self, message):
        """Print a free-form message under the current entity context."""
        print(f"\t{Layer.entity_context} > {message}")

    def run(self, entities):
        """Wrap the given decorated functions as entities and run them in order.

        Parameters
        ----------
        entities : iterable
            Functions carrying a ``_type`` attribute ("dataset" or "model"),
            as set by the ``dataset`` / ``model`` decorators. Items with any
            other ``_type`` are skipped.
        """
        # Start from a clean slate: only the entities passed to this call are
        # visible to get_dataset() afterwards.
        self.entities = []
        for entity in entities:
            if entity._type == "dataset":
                self.entities.append(Dataset(entity))
            elif entity._type == "model":
                self.entities.append(Model(entity))

        print(f"--- Layer Infra: Running Project: {self.project_name} ---")

        self.setup()

        for entity in self.entities:
            entity.run()
        print("\n--- Layer Infra: Run Complete! ---")

    def get_dataset(self, name):
        """Return the first registered entity whose ``name`` equals ``name``.

        The lookup compares names only, so it returns whichever entity kind
        matches first.

        Raises
        ------
        Exception
            If no registered entity has that name.
        """
        for entity in self.entities:
            if entity.name == name:
                return entity
        raise Exception(f"Entity '{name}' not found!")


class Model:
    """Entity wrapper around a decorated training function.

    ``run`` calls the wrapped function with no arguments and keeps whatever it
    returns in ``result``.
    """

    # Stays None until run() has been called on the instance.
    result = None

    def __init__(self, func):
        """Store ``func`` and its ``_name``; a falsy ``func`` stores nothing."""
        if not func:
            return
        entity_name = func._name
        self.name = entity_name
        self.func = func

    def run(self):
        """Call the wrapped function and remember its return value."""
        trained = self.func()
        self.result = trained


class Dataset:
    """Entity wrapper around a decorated dataset-building function.

    ``run`` calls the wrapped function with no arguments and keeps whatever it
    returns in ``result``; ``to_pandas`` hands that object back unchanged.
    """

    # Stays None until run() has been called on the instance.
    result = None

    def __init__(self, func):
        """Store ``func`` and its ``_name``; a falsy ``func`` stores nothing."""
        if not func:
            return
        entity_name = func._name
        self.name = entity_name
        self.func = func

    def run(self):
        """Call the wrapped function and remember its return value."""
        built = self.func()
        self.result = built

    def to_pandas(self):
        """Return the stored result as-is (no conversion is performed)."""
        return self.result


def dataset(name):
    """Decorator factory that marks a builder function as a dataset entity.

    Parameters
    ----------
    name : str
        Entity name; stored on the returned wrapper as ``_name`` and used as
        the logging context while the builder runs.

    Returns
    -------
    callable
        A decorator. The wrapper it produces carries ``_type == "dataset"``
        and ``_name == name``, sets ``Layer.entity_context`` before calling
        the builder, prints a "Building ..." banner, and returns the builder's
        result.
    """
    def inner(func):
        # wraps() copies __name__/__doc__ onto the wrapper and exposes the
        # undecorated builder as __wrapped__ for introspection.
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            Layer.entity_context = name
            print(f'\nBuilding {Layer.entity_context}...')
            # Forward the call arguments instead of silently discarding them.
            res = func(*args, **kwargs)
            # TODO save returning entity to catalog
            return res

        # Set after wraps() so these markers win over any copied attributes.
        wrapped._type = "dataset"
        wrapped._name = name

        return wrapped

    return inner


def model(name):
    """Decorator factory that marks a training function as a model entity.

    Parameters
    ----------
    name : str
        Entity name; stored on the returned wrapper as ``_name`` and used as
        the logging context while the training function runs.

    Returns
    -------
    callable
        A decorator. The wrapper it produces carries ``_type == "model"`` and
        ``_name == name``, sets ``Layer.entity_context`` before calling the
        training function, prints a "Training ..." banner, and returns the
        function's result.
    """
    def inner(func):
        # wraps() copies __name__/__doc__ onto the wrapper and exposes the
        # undecorated function as __wrapped__ for introspection.
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            Layer.entity_context = name
            print(f'\nTraining {Layer.entity_context}...')
            # Forward the call arguments instead of silently discarding them.
            res = func(*args, **kwargs)
            # TODO save returning entity to catalog
            return res

        # Set after wraps() so these markers win over any copied attributes.
        wrapped._type = "model"
        wrapped._name = name

        return wrapped

    return inner

In [14]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
import pandas as pd


@dataset("dummy_dataset")
def build_dummy():
    """Build a small in-memory DataFrame with ``id``, ``name`` and ``birth`` columns."""
    records = [
        [1, 'tom', '01011999'],
        [2, 'nick', '01011983'],
        [3, 'juli', '01012002'],
    ]
    return pd.DataFrame(records, columns=['id', 'name', 'birth'])


@dataset('raw_passengers')
def read_and_clean_dataset():
    """Load ``titanic.csv`` from the working directory and log its row count.

    Despite the name, no cleaning happens here: the frame is returned exactly
    as ``pd.read_csv`` produced it.
    """
    passengers = pd.read_csv("titanic.csv")
    row_count = len(passengers)
    layer.log(f"Total passengers: {row_count}")
    return passengers


def clean_sex(sex):
    """Encode a sex label as an integer.

    Returns 1 for ``"male"`` and 0 for ``"female"``; any other value
    (including missing ones) also maps to 0.
    """
    # Checked in the same order as the labels are listed; first match wins.
    for label, code in (("female", 0), ("male", 1)):
        if sex == label:
            return code
    return 0


def clean_age(data):
    """Return the age from a two-item row, filling in a default when it is null.

    Parameters
    ----------
    data : sequence or pandas.Series
        Row whose first item is the age and whose second item is the
        passenger class (this is how ``extract_features`` calls it, with the
        ``Age`` and ``Pclass`` columns).

    Returns
    -------
    The age itself when it is not null; otherwise 37 for class 1, 29 for
    class 2 and 24 for any other class.
    """
    # A row Series from DataFrame.apply(axis=1) is labelled by column name, so
    # integer keys must go through .iloc to be treated as positions. Lists,
    # tuples and arrays have no .iloc and are indexed directly.
    row = getattr(data, "iloc", data)
    age, pclass = row[0], row[1]

    if not pd.isnull(age):
        return age

    # Fallback ages per passenger class (presumably typical ages for each
    # class in the training data; TODO confirm where these values come from).
    if pclass == 1:
        return 37
    if pclass == 2:
        return 29
    return 24


@dataset('features')
def extract_features():
    """Derive the model feature table from the ``raw_passengers`` dataset.

    ``Sex`` is encoded with ``clean_sex``, null ``Age`` values are filled by
    ``clean_age`` using ``Pclass``, and the ``PassengerId``, ``Name``,
    ``Cabin``, ``Ticket`` and ``Embarked`` columns are dropped. The column
    list and row count are logged.

    Returns
    -------
    pandas.DataFrame
        A new frame; the stored ``raw_passengers`` result is left untouched.
    """
    raw = layer.get_dataset("raw_passengers").to_pandas()

    # assign() builds a new frame. Writing the columns onto ``raw`` would
    # overwrite the upstream dataset's stored result, and a second run would
    # then re-encode already-encoded values.
    features = raw.assign(
        Sex=raw['Sex'].apply(clean_sex),
        Age=raw[['Age', 'Pclass']].apply(clean_age, axis=1),
    )
    features = features.drop(
        ["PassengerId", "Name", "Cabin", "Ticket", "Embarked"], axis=1)

    layer.log(f'Features: {list(features.columns)}')
    layer.log(f'Total Count: {len(features)}')
    return features


@model(name='survival_model')
def train():
    """Train a random-forest classifier for ``Survived`` on the ``features`` dataset.

    The data is split 80/20 into train and test parts, a 100-tree forest is
    fitted on the training part, and accuracy on the test part is logged as
    the ``accuracy`` metric.

    Returns
    -------
    RandomForestClassifier
        The fitted model.
    """
    # One seed for both the split and the forest, so repeated runs produce
    # the same model and the same logged accuracy.
    seed = 42

    df = layer.get_dataset("features").to_pandas()
    layer.log(f"Training data count: {len(df)}")

    X = df.drop(["Survived"], axis=1)
    y = df["Survived"]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=seed)

    random_forest = RandomForestClassifier(n_estimators=100, random_state=seed)
    random_forest.fit(X_train, y_train)
    y_pred = random_forest.predict(X_test)
    acc = accuracy_score(y_test, y_pred)
    layer.log_metric("accuracy", f'{acc:.4f}')
    return random_forest


# ++ init Layer
layer = Layer(
    project_name="ltv_project",
    environment='requirements.txt',
)

# ++ To run the whole project on Layer Infra
# Order matters: an entity's result stays None until it has run, and each
# step reads its upstream result through layer.get_dataset(...).
layer.run([
    read_and_clean_dataset,
    extract_features,
    train,
])

# ++ To train model on Layer infra
# NOTE: run() registers only the entities it is given, so with this stand-in
# running `train` alone raises "Entity 'features' not found!".
# layer.run([train])

# ++ To debug the code locally, just call the function:
# read_and_clean_dataset()
# extract_features()
# train()

--- Layer Infra: Running Project: ltv_project ---
Layer Infra: Installing scikit-learn>=0.18...

Building raw_passengers...
	raw_passengers > Total passengers: 891

Building features...
	features > Features: ['Survived', 'Pclass', 'Sex', 'Age', 'SibSp', 'Parch', 'Fare']
	features > Total Count: 891

Training survival_model...
	survival_model > Training data count: 891
	survival_model > Metric >accuracy:0.8045

--- Layer Infra: Run Complete! ---
