# Nebius Managed Service for MLflow Demo

This demo shows you how key use cases of using the Nebius Managed Service for MLflow for AI development.

## Getting Started

1. Launch your instance of the Managed Service for MLflow with [the MLflow quickstart](https://docs.nebius.com/mlflow/quickstart).
2. Set up your API key to connect to Nebius AI Studio with [the AI Studio quickstart](https://docs.nebius.com/studio/inference/quickstart).

> **Note:** Launching an MLflow cluster may take 30-60 minutes to be fully provisioned and ready to use.

In [None]:
!pip install mlflow==2.20.2 python-dotenv openai

### Secrets and Environment Variables

You will need to set the following environment variables where this notebook is running, so that the code in the following cells can connect to both Nebius Managed Service for MLflow and Nebius AI Studio. 

MLflow:<br>
`MLFLOW_TRACKING_SERVER_CERT_PATH`<br>
`MLFLOW_TRACKING_URI`<br>
`MLFLOW_TRACKING_USERNAME`<br>
`MLFLOW_TRACKING_PASSWORD`<br>

AI Studio:<br>
`NEBIUS_API_KEY`

### Environment Setup

To set the environment variables, run the following cell. You may choose to set them interactively or by loading from a `.env` file.

In [None]:
%reload_ext autoreload
%autoreload 2

import os
import sys
from pathlib import Path
from dotenv import load_dotenv

# Add the parent directory to Python path
sys.path.append(str(Path.cwd().parent))

from env_setup import setup_env_from_file, setup_env_interactive, verify_env_setup

# Option 1: Interactive setup
# setup_env_interactive()

# Option 2: Load from .env file
setup_env_from_file('../.env')

# Verify the setup
verify_env_setup()

### Check connection to MLflow


In [None]:
import mlflow 

# List experiments in MLflow
mlflow.search_experiments()

### Set up Nebius AI Studio client

In [4]:
import openai

API_KEY = os.environ.get("NEBIUS_API_KEY")

# Instantiate the client instance
nebius_client = openai.OpenAI(api_key=API_KEY,
                              base_url="https://api.studio.nebius.ai/v1/")


# Example 1: MLflow Tracking Quickstart

https://mlflow.org/docs/latest/getting-started/intro-quickstart/

In [None]:
import mlflow
from mlflow.models import infer_signature

import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score


# Load the Iris dataset
X, y = datasets.load_iris(return_X_y=True)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Define the model hyperparameters
params = {
    "solver": "lbfgs",
    "max_iter": 1000,
    "multi_class": "auto",
    "random_state": 8888,
}

# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)

# Predict on the test set
y_pred = lr.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)

### Log the model and its metadata to MLflow

- Initiate an MLflow run context to start a new run that we will log the model and metadata to.
- Log model parameters and performance metrics.
- Tag the run for easy retrieval.
- Register the model in the MLflow Model Registry while logging (saving) the model.


In [None]:
# Create a new MLflow Experiment
mlflow.set_experiment("MLflow Quickstart")

# Start an MLflow run
with mlflow.start_run():
    # Log the hyperparameters
    mlflow.log_params(params)

    # Log the loss metric
    mlflow.log_metric("accuracy", accuracy)

    # Set a tag that we can use to remind ourselves what this run was for
    mlflow.set_tag("Training Info", "Basic LR model for iris data")

    # Infer the model signature
    signature = infer_signature(X_train, lr.predict(X_train))

    # Log the model
    model_info = mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="iris_model",
        signature=signature,
        input_example=X_train,
        registered_model_name="tracking-quickstart",
    )

In [None]:
model_info.model_uri

## Load the model as a Python Function (pyfunc) and use it for inference

- Loading the model using MLflow's pyfunc flavor.
- Running Predict on new data using the loaded model

In [None]:
# Load the model back for predictions as a generic Python Function model
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)

loaded_model

In [None]:
# Get predictions 

predictions = loaded_model.predict(X_test)

iris_feature_names = datasets.load_iris().feature_names

result = pd.DataFrame(X_test, columns=iris_feature_names)
result["actual_class"] = y_test
result["predicted_class"] = predictions

result[:4]

# Example 2: MLflow Tracing for LLM Observability

Traces enhances LLM observability in your Generative AI (GenAI) applications by capturing detailed information about the execution details. 

https://mlflow.org/docs/latest/tracing/

## Automatic Tracing of LLM calls

In [None]:
import mlflow
import openai

tracing_experiment =mlflow.set_experiment("MLflow Tracing")

# Enable MLflow automatic tracing for OpenAI with one line of code!
mlflow.openai.autolog()


# Time to call the LLM -- tracing is done automatically
nebius_client.chat.completions.create(
    model="meta-llama/Llama-3.3-70B-Instruct",
    temperature=0.95,
    messages=[
        {"role": "system", "content": "You are a chatbot."},
        {"role": "user", "content": "What is the weather like today?"},
    ],
)


## Manual Tracing

1. Instrument a function with @mlflow.trace decorator.
2. Instrument any block of code using mlflow.start_span context manager.
3. Grouping or annotating traces using a tag.
4. Disabling trace globally.

https://mlflow.org/docs/latest/tracing/#manual-tracing

In [33]:
import mlflow
from mlflow.entities import SpanType
import openai
import time

In [None]:
# Use MLflow tracing to trace the execution of a function

@mlflow.trace(span_type="func", attributes={"key": "value"})
def add_1(x):
    return x + 1


@mlflow.trace(span_type="func", attributes={"key": "value"})
def minus_1(x):
    return x - 1


@mlflow.trace(name="Trace Test")
def trace_test(x):
    step1 = add_1(x)
    return minus_1(step1)


trace_test(4)

In [None]:
# Integrate tracing into your LLM workflow

mlflow.openai.autolog()

@mlflow.trace(span_type=SpanType.CHAIN)
def run(question):
    messages = build_messages(question)
    # MLflow automatically generates a span for OpenAI invocation
    response = nebius_client.chat.completions.create(
        # model="gpt-4o-mini",
        model="meta-llama/Llama-3.3-70B-Instruct",
        max_tokens=100,
        messages=messages,
    )
    return parse_response(response)


@mlflow.trace
def build_messages(question):
    return [
        {"role": "system", "content": "You are a helpful chatbot."},
        {"role": "user", "content": question},
    ]

@mlflow.trace
def parse_response(response):
    return response.choices[0].message.content


run("What is MLflow?")

In [None]:
# Catch exceptions in your traces


# Define methods to be traced
@mlflow.trace(span_type=SpanType.TOOL, attributes={"time": "morning"})
def morning_greeting(name: str):
    time.sleep(1)
    mlflow.update_current_trace(tags={"person": name})
    return f"Good morning {name}."


@mlflow.trace(span_type=SpanType.TOOL, attributes={"time": "evening"})
def evening_greeting(name: str):
    time.sleep(1)
    mlflow.update_current_trace(tags={"person": name})
    return f"Good evening {name}."


@mlflow.trace(span_type=SpanType.TOOL)
def goodbye():
    raise Exception("Cannot say goodbye")


# Execute the methods within different experiments
morning_experiment = mlflow.set_experiment("Morning Experiment")
morning_greeting("Tom")
morning_greeting("Mike")

# Get the timestamp in milliseconds
morning_time = int(time.time() * 1000)

evening_experiment = mlflow.set_experiment("Evening Experiment")
experiment_ids = [morning_experiment.experiment_id, evening_experiment.experiment_id]
evening_greeting("Mary")
goodbye()

In [None]:
# Get the timestamp in milliseconds
one_hour_ago = int(time.time() - 3600) * 1000  # 3600 seconds = 1 hour in milliseconds

one_hour_ago

In [None]:
# Search and analyze traces

mlflow.search_traces(
    tracing_experiment.experiment_id, 
    filter_string=f"timestamp_ms < {one_hour_ago}",
)[:3]

# Example 3: Automatic Tracking of AI Agent Behavior

- Tracing LangGraph with MLflow https://mlflow.org/docs/latest/tracing/integrations/langgraph 
- Example: Code generation with RAG and self-correction with https://langchain-ai.github.io/langgraph/tutorials/code_assistant/langgraph_code_assistant/ 

In [None]:
!pip install langchain_openai langchain langgraph langchain_core

## Tracing LangGraph 

In [None]:
from typing import Literal

import mlflow

from langchain_core.messages import AIMessage, ToolCall
from langchain_core.outputs import ChatGeneration, ChatResult
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import create_react_agent

# Enabling tracing for LangGraph (LangChain)
mlflow.langchain.autolog()

# Optional: Set a tracking URI and an experiment
# mlflow.set_tracking_uri("http://localhost:5000")
mlflow.set_experiment("LangGraph")


@tool
def get_weather(city: Literal["nyc", "sf"]):
    """Use this to get weather information."""
    if city == "nyc":
        return "It might be cloudy in nyc"
    elif city == "sf":
        return "It's always sunny in sf"


llm = ChatOpenAI(model="gpt-4o-mini")
tools = [get_weather]
graph = create_react_agent(llm, tools)

# Invoke the graph
result = graph.invoke(
    {"messages": [{"role": "user", "content": "what is the weather in tokyo?"}]}
)

## Track Code Generation with RAG and self-correction

- Original example: https://langchain-ai.github.io/langgraph/tutorials/code_assistant/langgraph_code_assistant/ 
- MLflow integration: https://mlflow.org/docs/latest/tracing/integrations/langgraph#adding-spans-within-a-node-or-a-tool 


In [None]:
! pip install -U langchain_community langchain-openai langchain-anthropic langchain langgraph bs4

### Load LangChain Expression Language (LCEL) docs as an example.

In [69]:
from bs4 import BeautifulSoup as Soup
from langchain_community.document_loaders.recursive_url_loader import RecursiveUrlLoader

# LCEL docs
url = "https://python.langchain.com/docs/concepts/lcel/"
loader = RecursiveUrlLoader(
    url=url, max_depth=20, extractor=lambda x: Soup(x, "html.parser").text
)
docs = loader.load()

# Sort the list based on the URLs and get the text
d_sorted = sorted(docs, key=lambda x: x.metadata["source"])
d_reversed = list(reversed(d_sorted))
concatenated_content = "\n\n\n --- \n\n\n".join(
    [doc.page_content for doc in d_reversed]
)

### Create a code_gen_chain

### 1. Create a code_gen_chain with OpenAI


In [None]:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field

### OpenAI

# Grader prompt
code_gen_prompt = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """You are a coding assistant with expertise in LCEL, LangChain expression language. \n 
    Here is a full set of LCEL documentation:  \n ------- \n  {context} \n ------- \n Answer the user 
    question based on the above provided documentation. Ensure any code you provide can be executed \n 
    with all required imports and variables defined. Structure your answer with a description of the code solution. \n
    Then list the imports. And finally list the functioning code block. Here is the user question:""",
        ),
        ("placeholder", "{messages}"),
    ]
)


# Data model
class code(BaseModel):
    """Schema for code solutions to questions about LCEL."""

    prefix: str = Field(description="Description of the problem and approach")
    imports: str = Field(description="Code block import statements")
    code: str = Field(description="Code block not including import statements")


expt_llm = "gpt-4o-mini"
llm = ChatOpenAI(temperature=0, model=expt_llm)
code_gen_chain_oai = code_gen_prompt | llm.with_structured_output(code)
question = "How do I build a RAG chain in LCEL?"
solution = code_gen_chain_oai.invoke(
    {"context": concatenated_content, "messages": [("user", question)]}
)
solution

### 2. Create a code_gen_chain with Anthropic

In [73]:
from langchain_anthropic import ChatAnthropic
from langchain_core.prompts import ChatPromptTemplate

### Anthropic

# Prompt to enforce tool use
code_gen_prompt_claude = ChatPromptTemplate.from_messages(
    [
        (
            "system",
            """<instructions> You are a coding assistant with expertise in LCEL, LangChain expression language. \n 
    Here is the LCEL documentation:  \n ------- \n  {context} \n ------- \n Answer the user  question based on the \n 
    above provided documentation. Ensure any code you provide can be executed with all required imports and variables \n
    defined. Structure your answer: 1) a prefix describing the code solution, 2) the imports, 3) the functioning code block. \n
    Invoke the code tool to structure the output correctly. </instructions> \n Here is the user question:""",
        ),
        ("placeholder", "{messages}"),
    ]
)


# LLM
expt_llm = "claude-3-opus-20240229"
llm = ChatAnthropic(
    model=expt_llm,
    default_headers={"anthropic-beta": "tools-2024-04-04"},
)

structured_llm_claude = llm.with_structured_output(code, include_raw=True)


# Optional: Check for errors in case tool use is flaky
def check_claude_output(tool_output):
    """Check for parse error or failure to call the tool"""

    # Error with parsing
    if tool_output["parsing_error"]:
        # Report back output and parsing errors
        print("Parsing error!")
        raw_output = str(tool_output["raw"].content)
        error = tool_output["parsing_error"]
        raise ValueError(
            f"Error parsing your output! Be sure to invoke the tool. Output: {raw_output}. \n Parse error: {error}"
        )

    # Tool was not invoked
    elif not tool_output["parsed"]:
        print("Failed to invoke tool!")
        raise ValueError(
            "You did not use the provided tool! Be sure to invoke the tool to structure the output."
        )
    return tool_output


# Chain with output check
code_chain_claude_raw = (
    code_gen_prompt_claude | structured_llm_claude | check_claude_output
)


def insert_errors(inputs):
    """Insert errors for tool parsing in the messages"""

    # Get errors
    error = inputs["error"]
    messages = inputs["messages"]
    messages += [
        (
            "assistant",
            f"Retry. You are required to fix the parsing errors: {error} \n\n You must invoke the provided tool.",
        )
    ]
    return {
        "messages": messages,
        "context": inputs["context"],
    }


# This will be run as a fallback chain
fallback_chain = insert_errors | code_chain_claude_raw
N = 3  # Max re-tries
code_gen_chain_re_try = code_chain_claude_raw.with_fallbacks(
    fallbacks=[fallback_chain] * N, exception_key="error"
)


def parse_output(solution):
    """When we add 'include_raw=True' to structured output,
    it will return a dict w 'raw', 'parsed', 'parsing_error'."""

    return solution["parsed"]


# Optional: With re-try to correct for failure to invoke tool
code_gen_chain = code_gen_chain_re_try | parse_output

# No re-try
code_gen_chain = code_gen_prompt_claude | structured_llm_claude | parse_output

In [None]:
# Test
question = "How do I build a RAG chain in LCEL?"
solution = code_gen_chain.invoke(
    {"context": concatenated_content, "messages": [("user", question)]}
)
solution

### Define the Graph

In [None]:
from typing import List
from typing_extensions import TypedDict


class GraphState(TypedDict):
    """
    Represents the state of our graph.

    Attributes:
        error : Binary flag for control flow to indicate whether test error was tripped
        messages : With user question, error messages, reasoning
        generation : Code solution
        iterations : Number of tries
    """

    error: str
    messages: List
    generation: str
    iterations: int

In [76]:
### Parameter

# Max tries
max_iterations = 3
# Reflect
# flag = 'reflect'
flag = "do not reflect"

### Nodes


def generate(state: GraphState):
    """
    Generate a code solution

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation
    """

    print("---GENERATING CODE SOLUTION---")

    # State
    messages = state["messages"]
    iterations = state["iterations"]
    error = state["error"]

    # We have been routed back to generation with an error
    if error == "yes":
        messages += [
            (
                "user",
                "Now, try again. Invoke the code tool to structure the output with a prefix, imports, and code block:",
            )
        ]

    # Solution
    code_solution = code_gen_chain.invoke(
        {"context": concatenated_content, "messages": messages}
    )
    messages += [
        (
            "assistant",
            f"{code_solution.prefix} \n Imports: {code_solution.imports} \n Code: {code_solution.code}",
        )
    ]

    # Increment
    iterations = iterations + 1
    return {"generation": code_solution, "messages": messages, "iterations": iterations}



def reflect(state: GraphState):
    """
    Reflect on errors

    Args:
        state (dict): The current graph state

    Returns:
        state (dict): New key added to state, generation
    """

    print("---GENERATING CODE SOLUTION---")

    # State
    messages = state["messages"]
    iterations = state["iterations"]
    code_solution = state["generation"]

    # Prompt reflection

    # Add reflection
    reflections = code_gen_chain.invoke(
        {"context": concatenated_content, "messages": messages}
    )
    messages += [("assistant", f"Here are reflections on the error: {reflections}")]
    return {"generation": code_solution, "messages": messages, "iterations": iterations}


### Edges


def decide_to_finish(state: GraphState):
    """
    Determines whether to finish.

    Args:
        state (dict): The current graph state

    Returns:
        str: Next node to call
    """
    error = state["error"]
    iterations = state["iterations"]

    if error == "no" or iterations == max_iterations:
        print("---DECISION: FINISH---")
        return "end"
    else:
        print("---DECISION: RE-TRY SOLUTION---")
        if flag == "reflect":
            return "reflect"
        else:
            return "generate"

In [79]:
# Code check with integrated tracing with MLflow

def code_check(state: GraphState):
    # State
    messages = state["messages"]
    code_solution = state["generation"]
    iterations = state["iterations"]

    # Get solution components
    imports = code_solution.imports
    code = code_solution.code

    # Check imports
    try:
        # Create a child span manually with mlflow.start_span() API
        with mlflow.start_span(name="import_check", span_type=SpanType.TOOL) as span:
            span.set_inputs(imports)
            exec(imports)
            span.set_outputs("ok")
    except Exception as e:
        error_message = [("user", f"Your solution failed the import test: {e}")]
        messages += error_message
        return {
            "generation": code_solution,
            "messages": messages,
            "iterations": iterations,
            "error": "yes",
        }

    # Check execution
    try:
        code = imports + "\n" + code
        with mlflow.start_span(name="execution_check", span_type=SpanType.TOOL) as span:
            span.set_inputs(code)
            exec(code)
            span.set_outputs("ok")
    except Exception as e:
        error_message = [("user", f"Your solution failed the code execution test: {e}")]
        messages += error_message
        return {
            "generation": code_solution,
            "messages": messages,
            "iterations": iterations,
            "error": "yes",
        }

    # No errors
    return {
        "generation": code_solution,
        "messages": messages,
        "iterations": iterations,
        "error": "no",
    }

### Create workflow 

In [80]:
from langgraph.graph import END, StateGraph, START

workflow = StateGraph(GraphState)

# Define the nodes
workflow.add_node("generate", generate)  # generation solution
workflow.add_node("check_code", code_check)  # check code
workflow.add_node("reflect", reflect)  # reflect

# Build graph
workflow.add_edge(START, "generate")
workflow.add_edge("generate", "check_code")
workflow.add_conditional_edges(
    "check_code",
    decide_to_finish,
    {
        "end": END,
        "reflect": "reflect",
        "generate": "generate",
    },
)
workflow.add_edge("reflect", "generate")
app = workflow.compile()

### Run the workflow

In [None]:
question = "How can I directly pass a string to a runnable and use it to construct the input needed for my prompt?"
solution = app.invoke({"messages": [("user", question)], "iterations": 0, "error": ""})

# Tracing CrewAI Agents

- Example https://mlflow.org/docs/latest/tracing/integrations/crewai 

In [None]:
!pip install crewai crewai_tools

In [None]:
import mlflow

# Turn on auto tracing by calling mlflow.crewai.autolog()
mlflow.crewai.autolog()

mlflow.set_experiment("CrewAI")

In [87]:
from crewai import Agent, Crew, Task
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai_tools import SerperDevTool, WebsiteSearchTool

from textwrap import dedent

content = "Users name is John. He is 30 years old and lives in San Francisco."
string_source = StringKnowledgeSource(
    content=content, metadata={"preference": "personal"}
)

search_tool = WebsiteSearchTool()

class TripAgents:
    def city_selection_agent(self):
        return Agent(
            role="City Selection Expert",
            goal="Select the best city based on weather, season, and prices",
            backstory="An expert in analyzing travel data to pick ideal destinations",
            tools=[
                search_tool,
            ],
            verbose=True,
        )

    def local_expert(self):
        return Agent(
            role="Local Expert at this city",
            goal="Provide the BEST insights about the selected city",
            backstory="""A knowledgeable local guide with extensive information
        about the city, it's attractions and customs""",
            tools=[search_tool],
            verbose=True,
        )

class TripTasks:
    def identify_task(self, agent, origin, cities, interests, range):
        return Task(
            description=dedent(
                f"""
                Analyze and select the best city for the trip based
                on specific criteria such as weather patterns, seasonal
                events, and travel costs. This task involves comparing
                multiple cities, considering factors like current weather
                conditions, upcoming cultural or seasonal events, and
                overall travel expenses.
                Your final answer must be a detailed
                report on the chosen city, and everything you found out
                about it, including the actual flight costs, weather
                forecast and attractions.

                Traveling from: {origin}
                City Options: {cities}
                Trip Date: {range}
                Traveler Interests: {interests}
            """
            ),
            agent=agent,
            expected_output="Detailed report on the chosen city including flight costs, weather forecast, and attractions",
        )

    def gather_task(self, agent, origin, interests, range):
        return Task(
            description=dedent(
                f"""
                As a local expert on this city you must compile an
                in-depth guide for someone traveling there and wanting
                to have THE BEST trip ever!
                Gather information about key attractions, local customs,
                special events, and daily activity recommendations.
                Find the best spots to go to, the kind of place only a
                local would know.
                This guide should provide a thorough overview of what
                the city has to offer, including hidden gems, cultural
                hotspots, must-visit landmarks, weather forecasts, and
                high level costs.
                The final answer must be a comprehensive city guide,
                rich in cultural insights and practical tips,
                tailored to enhance the travel experience.

                Trip Date: {range}
                Traveling from: {origin}
                Traveler Interests: {interests}
            """
            ),
            agent=agent,
            expected_output="Comprehensive city guide including hidden gems, cultural hotspots, and practical travel tips",
        )


class TripCrew:
    def __init__(self, origin, cities, date_range, interests):
        self.cities = cities
        self.origin = origin
        self.interests = interests
        self.date_range = date_range

    def run(self):
        agents = TripAgents()
        tasks = TripTasks()

        city_selector_agent = agents.city_selection_agent()
        local_expert_agent = agents.local_expert()

        identify_task = tasks.identify_task(
            city_selector_agent,
            self.origin,
            self.cities,
            self.interests,
            self.date_range,
        )
        gather_task = tasks.gather_task(
            local_expert_agent, self.origin, self.interests, self.date_range
        )

        crew = Crew(
            agents=[city_selector_agent, local_expert_agent],
            tasks=[identify_task, gather_task],
            verbose=True,
            memory=True,
            knowledge={
                "sources": [string_source],
                "metadata": {"preference": "personal"},
                "collection_name":"knowledge",
            },
        )

        result = crew.kickoff()
        return result


trip_crew = TripCrew("California", "Tokyo", "Dec 12 - Dec 20", "sports")


In [None]:
# Run the crew

result = trip_crew.run()
result.dict()