In [1]:
!pip install sklearn boto3 botocore mlflow pandas prefect==0.14.16 --user

# Restart kernel once this is done for the first time

Collecting sklearn
  Downloading sklearn-0.0.tar.gz (1.1 kB)
Collecting boto3
  Downloading boto3-1.19.9-py3-none-any.whl (131 kB)
[K     |████████████████████████████████| 131 kB 14.1 MB/s eta 0:00:01
[?25hCollecting botocore
  Downloading botocore-1.22.9-py3-none-any.whl (8.1 MB)
[K     |████████████████████████████████| 8.1 MB 18.2 MB/s eta 0:00:01
Collecting prefect==0.14.16
  Downloading prefect-0.14.16-py3-none-any.whl (508 kB)
[K     |████████████████████████████████| 508 kB 52.5 MB/s eta 0:00:01
Collecting s3transfer<0.6.0,>=0.5.0
  Downloading s3transfer-0.5.0-py3-none-any.whl (79 kB)
[K     |████████████████████████████████| 79 kB 7.3 MB/s  eta 0:00:01
[?25hCollecting jmespath<1.0.0,>=0.7.1
  Downloading jmespath-0.10.0-py2.py3-none-any.whl (24 kB)
Collecting scikit-learn
  Downloading scikit_learn-1.0.1-cp38-cp38-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (25.9 MB)
[K     |████████████████████████████████| 25.9 MB 47.5 MB/s eta 0:00:01
Collecting joblib>=0.11
  Do

In [1]:
from prefect import task, Flow, Parameter, Client
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule
from prefect.storage import S3, Local

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

from datetime import timedelta

import numpy as np
import pandas as pd

import mlflow
import requests

In [2]:
def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2

@task
def fetch_data():
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    data = pd.read_csv(csv_url, sep=";")
    return data
 
@task
def train_model(data, mlflow_experiment_id, alpha=0.5, l1_ratio=0.5):
    mlflow.set_tracking_uri("http://mlflow.mlflow:5000")
 
    train, test = train_test_split(data)
 
    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]
    
    with mlflow.start_run(experiment_id=mlflow_experiment_id):
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)
        predicted_qualities = lr.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)
 
        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)
 
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)
 
        mlflow.sklearn.log_model(lr, "model")

In [3]:
import os
os.environ['MLFLOW_S3_ENDPOINT_URL'] = 'http://minio.minio:9000'
os.environ['MLFLOW_TRACKING_URI'] = 'http://mlflow.mlflow:5000' # need for MLFlow model API
os.environ['ARTIFACT_ROOT'] = 's3://mlflow'
os.environ['AWS_ACCESS_KEY_ID'] = 'admin'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'admintest'

In [4]:
import sys
sys.version

'3.8.6 | packaged by conda-forge | (default, Dec 26 2020, 05:05:16) \n[GCC 9.3.0]'

In [8]:
domain = "mlops.ritza-route53.com"
username = "gareth@ritza.co" 
password = "DkguP5GsB9yiPk8"
s3_bucket = "another-mlops-bucket"

auth_url = f"https://{domain}/.ory/kratos/public/self-service/login/api"
prefect_url = f"http://10.111.209.84:4200/graphql" # TODO: change it

prefect_project_name = "wine-quality-project"
docker_image = "prefecthq/prefect:0.14.6-python3.8" # do not change


def create_prefect_flow():
    run_config = KubernetesRun(
        labels=["dev"],
        service_account_name="prefect-server-serviceaccount",
        image=docker_image,
        env={"EXTRA_PIP_PACKAGES": "sklearn boto3 botocore mlflow pandas",
             "MLFLOW_S3_ENDPOINT_URL": os.getenv("MLFLOW_S3_ENDPOINT_URL"),
             "AWS_ACCESS_KEY_ID": os.getenv("AWS_ACCESS_KEY_ID"),
             "AWS_SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY")
            }
    )
    storage = Local()
    storage = S3(bucket="mlflow", 
                 client_options=dict(endpoint_url=os.getenv("MLFLOW_S3_ENDPOINT_URL"), 
                                                     aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
                                                     aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")))

    #session_token = get_prefect_token()
    prefect_client = Client(api_server=prefect_url)
    print(prefect_client.graphql(
    {
        'query': {
            'flow': {
                'id'
            }
        }
    }
    ))
    schedule = IntervalSchedule(interval=timedelta(minutes=1))

    with Flow("train-wine-quality-model-v22", schedule=schedule, storage=storage, run_config=run_config) as flow:
        data = fetch_data()
        train_model(data=data, mlflow_experiment_id=1, alpha=0.3, l1_ratio=0.3)

    #flow.run()
    #prefect_client.create_project(project_name=prefect_project_name)
    training_flow_id = prefect_client.register(flow, project_name=prefect_project_name)
    
    flow_run_id = prefect_client.create_flow_run(flow_id=training_flow_id, run_name=f"run {prefect_project_name}")
    
create_prefect_flow()


{'data': {'flow': [{'id': 'a6b5c184-420b-487a-b4aa-2d349f1e0038'}, {'id': 'add41086-258a-4925-b655-a62451421848'}, {'id': '76794496-1570-411b-b00e-8f6195563c6c'}, {'id': '15fd6cc2-170a-442e-8279-be93d8e2faea'}, {'id': '4d1e91a0-d96d-478d-9d4d-17e3c2f6dcf5'}, {'id': '3764d95d-708e-42ff-8699-99d1e9741ab5'}, {'id': '06bcf44c-a804-415f-8819-92d225a66667'}, {'id': '95790356-3d5e-416f-9b32-ee96940e898e'}, {'id': '87201a3f-2742-448f-a2e9-c2071216c4c7'}, {'id': 'a59dde97-9a55-4d56-b625-df492d449591'}, {'id': '04cb3ccc-e2b0-4847-bf5a-929def7eee0d'}]}}
[2021-11-03 13:04:45+0000] INFO - prefect.S3 | Uploading train-wine-quality-model-v22/2021-11-03t13-04-45-020640-00-00 to mlflow


  training_flow_id = prefect_client.register(flow, project_name=prefect_project_name)


ClientError: 400 Client Error: Bad Request for url: http://10.111.209.84:4200/graphql

The following error messages were provided by the GraphQL server:

    GRAPHQL_VALIDATION_FAILED: Cannot query field "user" on type "Query".

The GraphQL query was:

    query {
            user {
                default_membership {
                    tenant {
                        slug
                }
            }
        }
    }

The passed variables were:

    null


In [7]:
import prefect 

prefect.__version__

'0.14.12'