In [1]:
import mlflow, prefect
import warnings, git, hashlib, os
from prefect import flow, task, get_run_logger, unmapped
from prefect.task_runners import SequentialTaskRunner

In [2]:
# Git config: capture the current branch name and commit SHA so they can be
# used later as the version string of the Prefect flow.

try:
    repo = git.Repo(search_parent_directories=True)
    branch = repo.active_branch.name
    sha = repo.head.commit.hexsha
except Exception as exc:
    # Best effort: the notebook must still run when Git metadata cannot be read.
    # `Exception` (instead of a bare `except`) lets KeyboardInterrupt/SystemExit through.
    warnings.warn(f'No github repository! Generating new SHA. ({exc!r})')
    # Both names must be strings because they are concatenated into the flow
    # version later on; the previous fallback left `branch` undefined and bound
    # `sha` to a set of algorithm names.
    branch = 'unknown'
    # Random 40-character hex digest, same shape as a Git commit SHA.
    sha = hashlib.sha1(os.urandom(32)).hexdigest()

In [3]:
# MLflow config: select the experiment that groups the runs logged by this
# notebook and prepare the shared run parameters.

name = 'Best Classifier'
# set_experiment creates the experiment when it does not exist yet, so a
# separate get/create step is not needed.
experiment = mlflow.set_experiment(name)
# Bound on every execution, not only on the one that creates the experiment.
experiment_id = experiment.experiment_id
run_params = {"experiment_id" : experiment_id,
              "description" : "Testing different classifiers for artificial data with prefect and mlflow.",
              "tags" : {'release.version':'0.0.1'}}

In [4]:
# Summarise the active MLflow experiment.
print(f"Experiment ID: {experiment.experiment_id}")
print(f"Artifact Location: {experiment.artifact_location}")
print(f"Tags: {experiment.tags}")
print(f"Lifecycle stage: {experiment.lifecycle_stage}")

Experiment ID: 1
Artifact Location: file:///home/lpfgarcia/Projects/MLOps/practical-prefect/mlruns/1
Tags: {}
Lifecycle stage: active


In [5]:
from sklearn.datasets import make_classification

@task
def generate_data(n_samples, n_features):
    """Create a synthetic classification dataset.

    Forwards ``n_samples`` and ``n_features`` to ``make_classification`` with
    two informative features, two redundant features and ``random_state=42``.

    Returns:
        The value returned by ``make_classification`` (the calling flow
        unpacks it as ``X, y``).
    """
    dataset = make_classification(
        n_samples=n_samples,
        n_features=n_features,
        n_informative=2,
        n_redundant=2,
        random_state=42,
    )
    return dataset

In [6]:
from sklearn.model_selection import train_test_split

@task
def split_data(X, y):
    """Split ``X`` and ``y`` into train and test parts without shuffling.

    The test part is sized as an absolute row count: ``int(len(X) * 0.1)``.

    Returns:
        The value returned by ``train_test_split`` for ``X`` and ``y``.
    """
    holdout_rows = int(len(X) * 0.1)
    return train_test_split(X, y, test_size=holdout_rows, shuffle=False)

In [7]:
@flow
def generate_split(n_samples, n_features):
    """Flow that generates a dataset and returns its train/test split.

    Runs ``generate_data`` with the given sizes, then hands the resulting
    features and labels to ``split_data`` and returns that task's result.
    """
    feature_matrix, labels = generate_data(n_samples, n_features)
    split = split_data(feature_matrix, labels)
    return split

In [8]:
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.naive_bayes import GaussianNB

def classifiers():
    """Return the candidate models as a list of ``(estimator, display_name)`` tuples.

    A fresh set of unfitted estimators is built on every call.
    """
    return [
        (KNeighborsClassifier(n_neighbors=3), "Nearest Neighbors"),
        (SVC(kernel="linear", C=0.025), "Linear SVM"),
        (DecisionTreeClassifier(max_depth=5), "Decision Tree"),
        (
            RandomForestClassifier(max_depth=5, n_estimators=10, max_features=1),
            "Random Forest",
        ),
        (AdaBoostClassifier(), "AdaBoost"),
        (GaussianNB(), "Naive Bayes"),
    ]

In [9]:
import uuid

@task
def teste2():
    """Scratch task: print the Prefect task-run context and identifiers built from it.

    Prints, in order: the run context, the task-run name, the task-run id,
    a dict holding both, and a single space-separated string holding both.
    """
    # Named `run_ctx` so the imported `task` decorator is not shadowed locally.
    run_ctx = prefect.context.get_run_context()
    print(run_ctx)
    print(run_ctx.task_run.name)
    print(run_ctx.task_run.id)
    aux = {'prefect_task_name': run_ctx.task_run.name, 'prefect_task_id': run_ctx.task_run.id}
    print(aux)
    # Joining a dict directly iterates over its keys only, which dropped the
    # name and id; flatten the key/value pairs so the values are included.
    aux = ' '.join(f'{key} {value}' for key, value in aux.items())
    print(aux)
    
    
@flow
def teste1():
    """Scratch flow: print the current flow run's name and id, then run ``teste2``."""
    run_ctx = prefect.context.get_run_context().copy()
    flow_run = run_ctx.flow_run
    print(flow_run.name)
    print(flow_run.id)
    print()
    teste2()


teste1()

17:31:43.110 | INFO    | prefect.engine - Created flow run 'poised-wren' for flow 'teste1'
17:31:43.244 | INFO    | Flow run 'poised-wren' - Created task run 'teste2-828a7851-0' for task 'teste2'
17:31:43.245 | INFO    | Flow run 'poised-wren' - Executing 'teste2-828a7851-0' immediately...
17:31:43.362 | INFO    | Task run 'teste2-828a7851-0' - Finished in state Completed()
17:31:43.387 | INFO    | Flow run 'poised-wren' - Finished in state Completed('All states completed.')


poised-wren
266482ab-071c-49dc-9847-90ea1961bd98

start_time=DateTime(2022, 9, 30, 20, 31, 43, 277008, tzinfo=Timezone('UTC')) client=<prefect.client.orion.OrionClient object at 0x7f69c4a4deb0> task=<prefect.tasks.Task object at 0x7f69c4b56dc0> task_run=TaskRun(id=UUID('e7a0b14f-9637-4131-a1ed-3c8689574bfe'), name='teste2-828a7851-0', flow_run_id=UUID('266482ab-071c-49dc-9847-90ea1961bd98'), task_key='__main__.teste2', dynamic_key='0', cache_key=None, cache_expiration=None, task_version=None, empirical_policy=TaskRunPolicy(max_retries=0, retry_delay_seconds=0.0, retries=0, retry_delay=0), tags=[], state_id=UUID('2c70c729-093b-4107-884e-2102395683e5'), task_inputs={}, state_type=StateType.RUNNING, state_name='Running', run_count=1, expected_start_time=DateTime(2022, 9, 30, 20, 31, 43, 215360, tzinfo=Timezone('+00:00')), next_scheduled_start_time=None, start_time=DateTime(2022, 9, 30, 20, 31, 43, 247731, tzinfo=Timezone('+00:00')), end_time=None, total_run_time=datetime.timedelta(0), est

[Completed(message=None, type=COMPLETED, result=None)]

In [14]:
from sklearn.metrics import accuracy_score

@task
def evaluate_classifier(clf, X_train, X_test, y_train, y_test):
    """Fit one classifier, score it on the test split and log the run to MLflow.

    Args:
        clf: Pair whose first item is the estimator and whose second item is
            its display name (the shape produced by ``classifiers()``).
        X_train: Training features.
        X_test: Test features.
        y_train: Training labels.
        y_test: Test labels.

    Returns:
        Tuple ``(model, y_pred, acc)``: the fitted estimator, its predictions
        for ``X_test`` and the accuracy of those predictions against ``y_test``.
    """
    # Named `task_ctx` so the imported `task` decorator is not shadowed locally.
    task_ctx = prefect.context.get_run_context()
    estimator, label = clf[0], clf[1]

    # Tie the MLflow run to the Prefect task run that produced it. (An earlier
    # f-string description was assigned and immediately overwritten; only this
    # form was ever used.)
    description = ' '.join(['prefect_task_name', task_ctx.task_run.name,
                            'prefect_task_id', task_ctx.task_run.id.urn])

    with mlflow.start_run(description=description):

        mlflow.log_param('name', label)
        mlflow.log_params(estimator.get_params())

        model = estimator.fit(X_train, y_train)

        # The display name doubles as the artifact path of the stored model.
        mlflow.sklearn.log_model(
            sk_model=model,
            artifact_path=label
        )

        y_pred = model.predict(X_test)
        acc = accuracy_score(y_test, y_pred)

        mlflow.log_metric('accuracy', acc)

    return model, y_pred, acc

In [16]:
@flow(description='Evaluating different classifier for each dataset.',
      version=' '.join((branch, sha)))
def evaluate(clf, samples, features):
    """Flow that evaluates one classifier on one generated dataset.

    Builds the train/test split through the ``generate_split`` subflow, runs
    ``evaluate_classifier`` on it and logs the resulting accuracy.
    """
    run_logger = get_run_logger()
    splits = generate_split(samples, features)
    _model, _y_pred, accuracy = evaluate_classifier(clf, *splits)
    run_logger.info(f'Accuracy of {accuracy}')

In [17]:
# Run the evaluation flow once per (sample count, feature count) pair.
clf_list = classifiers()
samples = range(10000, 50000, 10000)
features = range(20, 30, 2)

# Distinct loop names keep `samples` / `features` bound to the ranges above
# instead of overwriting them with the integers of the last pair.
combination = [(n_samples, n_features) for n_samples in samples for n_features in features]
# NOTE(review): only the first classifier in clf_list is evaluated here;
# confirm whether the remaining classifiers should be swept as well.
for n_samples, n_features in combination:
    evaluate(clf_list[0], n_samples, n_features)

17:32:56.535 | INFO    | prefect.engine - Created flow run 'electric-terrier' for flow 'evaluate'
17:32:56.710 | INFO    | Flow run 'electric-terrier' - Created subflow run 'sepia-donkey' for flow 'generate-split'
17:32:56.765 | INFO    | Flow run 'sepia-donkey' - Created task run 'generate_data-eeb4694a-0' for task 'generate_data'
17:32:56.766 | INFO    | Flow run 'sepia-donkey' - Executing 'generate_data-eeb4694a-0' immediately...
17:32:56.836 | INFO    | Task run 'generate_data-eeb4694a-0' - Finished in state Completed()
17:32:56.863 | INFO    | Flow run 'sepia-donkey' - Created task run 'split_data-b2f518fa-0' for task 'split_data'
17:32:56.864 | INFO    | Flow run 'sepia-donkey' - Executing 'split_data-b2f518fa-0' immediately...
17:32:56.928 | INFO    | Task run 'split_data-b2f518fa-0' - Finished in state Completed()
17:32:56.968 | INFO    | Flow run 'sepia-donkey' - Finished in state Completed()
17:32:56.994 | INFO    | Flow run 'electric-terrier' - Created task run 'evaluate_cla

17:33:05.284 | INFO    | Flow run 'bulky-hog' - Created task run 'evaluate_classifier-b2b7f5e1-0' for task 'evaluate_classifier'
17:33:05.285 | INFO    | Flow run 'bulky-hog' - Executing 'evaluate_classifier-b2b7f5e1-0' immediately...
17:33:06.266 | INFO    | Task run 'evaluate_classifier-b2b7f5e1-0' - Finished in state Completed()
17:33:06.267 | INFO    | Flow run 'bulky-hog' - Accuracy of 0.8385
17:33:06.362 | INFO    | Flow run 'bulky-hog' - Finished in state Completed('All states completed.')
17:33:06.439 | INFO    | prefect.engine - Created flow run 'esoteric-cassowary' for flow 'evaluate'
17:33:06.620 | INFO    | Flow run 'esoteric-cassowary' - Created subflow run 'brawny-bandicoot' for flow 'generate-split'
17:33:06.676 | INFO    | Flow run 'brawny-bandicoot' - Created task run 'generate_data-eeb4694a-0' for task 'generate_data'
17:33:06.676 | INFO    | Flow run 'brawny-bandicoot' - Executing 'generate_data-eeb4694a-0' immediately...
17:33:06.769 | INFO    | Task run 'generate_d

17:33:15.484 | INFO    | Task run 'generate_data-eeb4694a-0' - Finished in state Completed()
17:33:15.509 | INFO    | Flow run 'adventurous-squid' - Created task run 'split_data-b2f518fa-0' for task 'split_data'
17:33:15.509 | INFO    | Flow run 'adventurous-squid' - Executing 'split_data-b2f518fa-0' immediately...
17:33:15.605 | INFO    | Task run 'split_data-b2f518fa-0' - Finished in state Completed()
17:33:15.676 | INFO    | Flow run 'adventurous-squid' - Finished in state Completed()
17:33:15.696 | INFO    | Flow run 'imported-pronghorn' - Created task run 'evaluate_classifier-b2b7f5e1-0' for task 'evaluate_classifier'
17:33:15.697 | INFO    | Flow run 'imported-pronghorn' - Executing 'evaluate_classifier-b2b7f5e1-0' immediately...
17:33:16.660 | INFO    | Task run 'evaluate_classifier-b2b7f5e1-0' - Finished in state Completed()
17:33:16.661 | INFO    | Flow run 'imported-pronghorn' - Accuracy of 0.817
17:33:16.772 | INFO    | Flow run 'imported-pronghorn' - Finished in state Compl

17:33:26.288 | INFO    | prefect.engine - Created flow run 'vengeful-wren' for flow 'evaluate'
17:33:26.428 | INFO    | Flow run 'vengeful-wren' - Created subflow run 'heavenly-locust' for flow 'generate-split'
17:33:26.483 | INFO    | Flow run 'heavenly-locust' - Created task run 'generate_data-eeb4694a-0' for task 'generate_data'
17:33:26.484 | INFO    | Flow run 'heavenly-locust' - Executing 'generate_data-eeb4694a-0' immediately...
17:33:26.637 | INFO    | Task run 'generate_data-eeb4694a-0' - Finished in state Completed()
17:33:26.665 | INFO    | Flow run 'heavenly-locust' - Created task run 'split_data-b2f518fa-0' for task 'split_data'
17:33:26.665 | INFO    | Flow run 'heavenly-locust' - Executing 'split_data-b2f518fa-0' immediately...
17:33:26.786 | INFO    | Task run 'split_data-b2f518fa-0' - Finished in state Completed()
17:33:26.875 | INFO    | Flow run 'heavenly-locust' - Finished in state Completed()
17:33:26.899 | INFO    | Flow run 'vengeful-wren' - Created task run 'eva

In [None]:
# from prefect import flow, task

# @task
# def add_together(x, y, z, w):
#     return x + y

# @flow
# def sum_it(numbers, *static_value):
#     futures = add_together.map(numbers, *static_value)
#     return futures

# sum_it([1, 2, 3], 5, 6, 7)