# Building your first ML Pipeline

In [5]:
import os  # For handling file operations
import pickle  # For model serialization
import tempfile  # For scratch directories when logging artifacts

import mlflow
import mlflow.pyfunc
from river import compose, preprocessing, linear_model
from river.datasets import synth
from river.utils import VectorDict

To run MLflow, start the server with:
```bash
mlflow server --backend-store-uri sqlite:///mlflow.db --default-artifact-root ./mlruns
```
Make sure the server is listening on the same port as the tracking URI configured in the next cell (in this case, port 5000). You can then access the MLflow interface at [http://127.0.0.1:5000](http://127.0.0.1:5000).

In [6]:
# Point the MLflow client at the tracking server and select the experiment
# that every run in this notebook is recorded under.
experiment_name = 'MLBook_Experiment'
mlflow.set_tracking_uri('http://127.0.0.1:5000')  # Adjust as necessary
mlflow.set_experiment(experiment_name)

# Online-learning pipeline: standardize the features, then feed them to a
# logistic regression classifier.
feature_scaler = preprocessing.StandardScaler()
classifier = linear_model.LogisticRegression()
model = compose.Pipeline(feature_scaler, classifier)

In [7]:
# Function to serialize and log the model to MLFlow
def log_model_to_mlflow(model, run_id):
    """Pickle `model` and upload it as the artifact ``model/online_model.pkl``.

    Args:
        model: Any picklable object (here, the online-learning pipeline).
        run_id: ID of the MLflow run that should receive the artifact. When it
            is empty or None, the artifact is logged to the currently active
            run instead.

    The pickle is written to a private temporary directory, which is removed
    even if the upload fails, so nothing is left behind in the working
    directory and concurrent callers cannot clobber each other's file.
    """
    # MLflow stores the artifact under the local file's basename, so the
    # scratch file has to carry the final artifact name.
    with tempfile.TemporaryDirectory() as scratch_dir:
        model_path = os.path.join(scratch_dir, "online_model.pkl")
        with open(model_path, "wb") as f:
            pickle.dump(model, f)

        if run_id:
            # Target the requested run explicitly rather than whichever run
            # happens to be active.
            mlflow.tracking.MlflowClient().log_artifact(
                run_id, model_path, artifact_path='model'
            )
        else:
            mlflow.log_artifact(model_path, artifact_path='model')


In [8]:
def _retrieve_mlflow_experiment_id(name, create=False):
    """Look up the ID of the MLflow experiment called `name`.

    Args:
        name: Experiment name. A falsy value (None, "") short-circuits the
            lookup and yields None.
        create: When True, an experiment that does not exist yet is created
            and its new ID is returned.

    Returns:
        The experiment ID, or None when no name was supplied.

    Raises:
        Exception: If the experiment does not exist and `create` is False.
    """
    if not name:
        return None

    found = mlflow.tracking.MlflowClient().get_experiment_by_name(name)
    if found:
        return found.experiment_id

    if create:
        return mlflow.create_experiment(name)

    raise Exception(
        'Experiment "{}" not found in {}'.format(
            name, mlflow.get_tracking_uri()
        )
    )

In [9]:
# Stand-in for a real-time source: a seeded synthetic Agrawal stream,
# truncated to its first 1000 (x, y) samples.
agrawal_generator = synth.Agrawal(classification_function=0, seed=42)
data_stream = agrawal_generator.take(1000)

import mlflow
import pickle

def load_latest_model():
    """Load the pickled model stored by the most recently started run.

    Looks up the experiment named by the module-level `experiment_name`,
    picks the run with the greatest start time, and unpickles its
    ``model/online_model.pkl`` artifact.

    Returns:
        The unpickled model, or None when the experiment does not exist, has
        no runs, or the artifact cannot be downloaded or unpickled (the
        error is printed in that last case).
    """
    client = mlflow.tracking.MlflowClient()

    experiment = client.get_experiment_by_name(experiment_name)
    if experiment is None:
        # Unknown experiment: there is nothing to load, so report "no model"
        # like every other failure path instead of raising AttributeError.
        return None

    runs = client.search_runs([experiment.experiment_id])
    if not runs:
        return None

    # The search results already carry run_id and start_time, so the newest
    # run can be chosen without fetching it again from the server.
    latest_run = max(runs, key=lambda run: run.info.start_time)

    try:
        local_path = client.download_artifacts(latest_run.info.run_id, "model/online_model.pkl")
        # SECURITY: pickle.load can execute arbitrary code. Only point this at
        # a tracking server whose artifacts you trust.
        with open(local_path, "rb") as f:
            return pickle.load(f)
    except Exception as e:
        print(f"Failed to download model artifact: {e}")
        return None

In [10]:
# How often (in samples) the model is re-synchronised with MLflow.
CHECKPOINT_EVERY = 100

# Run the whole online-learning session inside a single MLflow run
with mlflow.start_run() as run:
    run_id = run.info.run_id

    # Online learning loop. This must be ONE pass: `take()` hands back a
    # one-shot iterator, so a separate pre-processing loop over `data_stream`
    # would exhaust it and leave nothing to train on.
    for i, (x, y) in enumerate(data_stream):
        # Convert x to a regular dict if it's a VectorDict
        if isinstance(x, VectorDict):
            x = dict(x)

        # Periodically load the latest model from MLFlow.
        # NOTE(review): load_latest_model() picks the most recently started
        # run, which is presumably this active run; confirm that reloading
        # our own checkpoint (rather than a previous run's) is intended.
        if i % CHECKPOINT_EVERY == 0:
            latest_model = load_latest_model()
            if latest_model is not None:
                model = latest_model

        # Inference (test-then-train): predict before the label is revealed.
        # The prediction is not consumed yet; it is kept for inspection.
        y_pred = model.predict_one(x)

        # Update the model with the new data point
        model.learn_one(x, y)

        # Log the updated model to MLFlow at the end of every checkpoint window
        if i % CHECKPOINT_EVERY == CHECKPOINT_EVERY - 1:
            log_model_to_mlflow(model, run_id)

    # Log the final model state
    log_model_to_mlflow(model, run_id)

print("Online learning and model updating completed.")

Online learning and model updating completed.
