diff --git a/.typos.toml b/.typos.toml
index 6a44caa4..48ffcd36 100644
--- a/.typos.toml
+++ b/.typos.toml
@@ -3,6 +3,8 @@ extend-exclude = [
     "*.json",
     "*.js",
     "*.ipynb",
+    "llm-finetuning/*",
+    "end-to-end-computer-vision/*",
 ]
 
 [default.extend-identifiers]
@@ -31,6 +33,12 @@ arange = "arange"
 cachable = "cachable"
 OT = "OT"
 cll = "cll"
+Louvre = "Louvre"
+quantised = "quantised"
+colours = "colours"
+initialised = "initialised"
+visualisation = "visualisation"
+customise = "customise"
 
 [default]
 locale = "en-us"
diff --git a/flux-dreambooth/train_dreambooth_lora_flux.py b/flux-dreambooth/train_dreambooth_lora_flux.py
index 83c89df4..5558812b 100644
--- a/flux-dreambooth/train_dreambooth_lora_flux.py
+++ b/flux-dreambooth/train_dreambooth_lora_flux.py
@@ -1977,7 +1977,10 @@ def get_sigmas(timesteps, n_dim=4, dtype=torch.float32):
                 # Predict the noise residual
                 model_pred = transformer(
                     hidden_states=packed_noisy_model_input,
-                    # YiYi notes: divide it by 1000 for now because we scale it by 1000 in the transforme rmodel (we should not keep it but I want to keep the inputs same for the model for testing)
+                    # YiYi notes: divide it by 1000 for now because we scale it
+                    # by 1000 in the transformer model (we should not keep it
+                    # but I want to keep the inputs same for the model for
+                    # testing)
                     timestep=timesteps / 1000,
                     guidance=guidance,
                     pooled_projections=pooled_prompt_embeds,
diff --git a/huggingface-sagemaker/README.md b/huggingface-sagemaker/README.md
index 2d1a3acc..672ff985 100644
--- a/huggingface-sagemaker/README.md
+++ b/huggingface-sagemaker/README.md
@@ -214,7 +214,7 @@ This will train a model from Huggingface and register a new ZenML model on the M
 
 Please note the above screens are a cloud-only feature in [ZenML Pro](https://zenml.io/pro), and the CLI `zenml models list` should be used instead for OSS users.
 
-At the end of the pipeline, the model will also be pushed the Huggingface, and a link estabilished between the ZenML Control Plane and the Huggingface model repository.
+At the end of the pipeline, the model will also be pushed to Huggingface, and a link established between the ZenML Control Plane and the Huggingface model repository.
 
 Huggingface Repo
diff --git a/llm-complete-guide/README.md b/llm-complete-guide/README.md
index 60bdb910..932905fb 100644
--- a/llm-complete-guide/README.md
+++ b/llm-complete-guide/README.md
@@ -130,6 +130,11 @@ Once the pipeline has run successfully, you can query the assets in your vector
 store using the `--query` flag as well as passing in the model you'd like to use
 for the LLM.
 
+Note that you'll need to set the `LANGFUSE_API_KEY` environment variable for the
+tracing that is built into the inference implementation. This will trace all
+LLM calls and store them in the [Langfuse](https://langfuse.com/)
+platform.
+
 When you're ready to make the query, run the following command:
 
 ```shell
@@ -197,6 +202,21 @@ python run.py evaluation
 ```
 
 You'll need to have first run the RAG pipeline to have the necessary assets in
 the database to evaluate.
 
+## RAG evaluation with Langfuse
+
+You can run the Langfuse evaluation pipeline if you have marked some of your
+responses as good or bad in the deployed Hugging Face space.
+
+To run the evaluation pipeline, you can use the following command:
+
+```shell
+python run.py langfuse_evaluation
+```
+
+Note that this pipeline will only work if you have set the `LANGFUSE_API_KEY`
+environment variable. It will use this key to fetch the traces from Langfuse and
+evaluate the responses.
+
 ## Embeddings finetuning
 
 For embeddings finetuning we first generate synthetic data and then finetune the
@@ -292,7 +312,7 @@ The project loosely follows [the recommended ZenML project structure](https://do
 ├── most_basic_eval.py            # Basic evaluation script
 ├── most_basic_rag_pipeline.py    # Basic RAG pipeline script
 ├── notebooks
-│   └── visualise_embeddings.ipynb    # Notebook to visualize embeddings
+│   └── visualize_embeddings.ipynb    # Notebook to visualize embeddings
 ├── pipelines
 │   ├── __init__.py
 │   ├── generate_chunk_questions.py   # Pipeline to generate chunk questions
diff --git a/llm-complete-guide/configs/dev/rag.yaml b/llm-complete-guide/configs/dev/rag.yaml
index 3379a686..dc0eb255 100644
--- a/llm-complete-guide/configs/dev/rag.yaml
+++ b/llm-complete-guide/configs/dev/rag.yaml
@@ -28,3 +28,6 @@ steps:
     parameters:
       docs_url: https://docs.zenml.io/
       use_dev_set: true
+  index_generator:
+    parameters:
+      index_type: postgres
diff --git a/llm-complete-guide/pipelines/__init__.py b/llm-complete-guide/pipelines/__init__.py
index 3e9f4d62..c8b76e19 100644
--- a/llm-complete-guide/pipelines/__init__.py
+++ b/llm-complete-guide/pipelines/__init__.py
@@ -20,4 +20,5 @@
 from pipelines.llm_basic_rag import llm_basic_rag
 from pipelines.llm_eval import llm_eval
 from pipelines.llm_index_and_evaluate import llm_index_and_evaluate
+from pipelines.llm_langfuse_evals import llm_langfuse_evaluation
 from pipelines.rag_deployment import rag_deployment
diff --git a/llm-complete-guide/pipelines/llm_langfuse_evals.py b/llm-complete-guide/pipelines/llm_langfuse_evals.py
new file mode 100644
index 00000000..07955a90
--- /dev/null
+++ b/llm-complete-guide/pipelines/llm_langfuse_evals.py
@@ -0,0 +1,14 @@
+from typing import Optional
+
+from steps.eval_langfuse import fast_eval, visualize_fast_eval_results
+from zenml import pipeline
+
+
+@pipeline(enable_cache=False)
+def llm_langfuse_evaluation(after: Optional[str] = None) -> None:
+    results = fast_eval(after=after)
+    visualize_fast_eval_results(results)
+
+
+if __name__ == "__main__":
+    llm_langfuse_evaluation()
diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py
index b4f0f203..d9908c75 100644
--- a/llm-complete-guide/run.py
+++ b/llm-complete-guide/run.py
@@ -48,6 +48,7 @@
     llm_basic_rag,
     llm_eval,
     llm_index_and_evaluate,
+    llm_langfuse_evaluation,
     rag_deployment,
 )
 from structures import Document
@@ -76,6 +77,7 @@
             "embeddings",
             "chunks",
             "basic_rag",
+            "langfuse_evaluation",
         ]
     ),
     required=True,
@@ -268,6 +270,10 @@ def main(
         pipeline_args["enable_cache"] = False
         llm_eval.with_options(model=zenml_model, config_path=config_path)()
 
+    elif pipeline == "langfuse_evaluation":
+        pipeline_args["enable_cache"] = False
+        llm_langfuse_evaluation.with_options(model=zenml_model)()
+
     elif pipeline == "synthetic":
         generate_synthetic_data.with_options(
             model=zenml_model, config_path=config_path, **pipeline_args
diff --git a/llm-complete-guide/steps/eval_langfuse.py b/llm-complete-guide/steps/eval_langfuse.py
new file mode 100644
index 00000000..8fa454df
--- /dev/null
+++ b/llm-complete-guide/steps/eval_langfuse.py
@@ -0,0 +1,405 @@
+import io
+import json
+import logging
+import sys
+import traceback
+from collections import defaultdict
+from dataclasses import dataclass
+from typing import Annotated, Any, Dict, List, Optional, Tuple
+
+import matplotlib.pyplot as plt
+import numpy as np
+from langfuse import Langfuse
+from litellm import completion
+from PIL import Image
+from pydantic import BaseModel
+from rich import print
+from utils.llm_utils import process_input_with_retrieval
+from zenml import ArtifactConfig, get_step_context, log_metadata, step
+from zenml.logger import get_logger
+
+logger = get_logger(__name__)
+
+langfuse = Langfuse()
+
+sys.excepthook = sys.__excepthook__  # Revert to standard tracebacks
+
+SERVICE_CONNECTORS_EVAL_CRITERIA = """
+The RAG pipeline sometimes struggles to respond to questions about ZenML's service
+connectors feature. Generally speaking, it should respond in detail with code
+examples when asked about supported service connectors (and should explain how to
+use them together with associated stack components.) And if someone is asking
+about service connectors then generally speaking it shouldn't then go on to
+be all about an orchestrator etc instead of focusing on the service connectors.
+"""
+
+MISSING_CODE_SAMPLE_EVAL_CRITERIA = """
+The RAG pipeline sometimes doesn't include a code sample in the response.
+Of course, a code sample doesn't always need to be included, but generally when
+a user asks about how to use a feature, it's usually useful to include a code
+sample in the response. (Also note that there are detailed documents about
+Evidently and so it's not enough to just say that you can implement Evidently as
+a custom step.)
+"""
+
+EVALUATION_DATA_PAIRS = [
+    {
+        "langfuse_score_identifier": "service_connectors",
+        "eval_criteria": SERVICE_CONNECTORS_EVAL_CRITERIA,
+    },
+    {
+        "langfuse_score_identifier": "missing_code_sample",
+        "eval_criteria": MISSING_CODE_SAMPLE_EVAL_CRITERIA,
+    },
+]
+
+
+class EvalResult(BaseModel):
+    """Pydantic model to capture LLM evaluation outputs."""
+
+    is_good_response: bool
+    reasoning: str
+
+
+@dataclass
+class EvaluationInput:
+    """Data class to hold all inputs needed for a single evaluation."""
+
+    question: str
+    llm_response: str
+    eval_criteria: str
+    sample_good_response: str
+    sample_bad_response: str
+
+
+def construct_eval_metaprompt(
+    eval_criteria: str,
+    example_good_response: str = "None provided for this evaluation.",
+    example_bad_response: str = "None provided for this evaluation.",
+) -> str:
+    """Construct a metaprompt for evaluating the performance of an LLM.
+
+    Args:
+        eval_criteria (str): The criteria for evaluating the LLM.
+        example_good_response (str): An example of a good response.
+        example_bad_response (str): An example of a bad response.
+    """
+    return f"""
+# General Instructions
+
+You are an expert at evaluating the performance of a chatbot.
+
+You will be given:
+1. A user question
+2. An OLD RESPONSE from a previous version of the chatbot
+3. A NEW RESPONSE from the current version of the chatbot
+
+Your task is to compare these responses and determine if the NEW RESPONSE is better
+than the OLD RESPONSE based on the evaluation criteria below.
+
+# Evaluation Criteria
+
+You should judge whether the NEW RESPONSE is an improvement over the OLD RESPONSE,
+considering these specific criteria:
+
+{eval_criteria}
+
+## Example Responses
+
+Here is an example of a good response format:
+{example_good_response}
+
+Here is an example of a response that needs improvement:
+{example_bad_response}
+
+# Your response
+
+Your response should be a JSON object with the following fields:
+
+- `is_good_response`: (bool) Whether the NEW RESPONSE is an improvement over the OLD RESPONSE.
+- `reasoning`: (str) A brief explanation comparing the two responses and justifying your decision.
+  Focus on specific improvements or regressions in the NEW RESPONSE.
+""" + + +def build_evaluation_messages( + eval_input: EvaluationInput, +) -> List[Dict[str, str]]: + """Construct the messages payload for the LLM evaluation. + + Args: + eval_input: Container for all evaluation-related inputs + + Returns: + List of message dictionaries for the LLM API call + """ + prompt = construct_eval_metaprompt( + eval_input.eval_criteria, + eval_input.sample_good_response, + eval_input.sample_bad_response, + ) + + return [ + {"role": "system", "content": prompt}, + { + "role": "user", + "content": f"Question: {eval_input.question}\n\nLLM Response: {eval_input.llm_response}", + }, + ] + + +def evaluate_single_response( + eval_input: EvaluationInput, logger: logging.Logger +) -> Optional[Dict[str, Any]]: + """Evaluate a single LLM response using the evaluation criteria.""" + try: + messages = build_evaluation_messages(eval_input) + logger.debug( + "Constructed evaluation messages:\n%s", + json.dumps(messages, indent=2), + ) + + logger.debug( + "Sending evaluation prompt to LLM for question: %s", + eval_input.question, + ) + + # Use minimal OpenAI-compatible JSON response format + response = completion( + model="gpt-4o", + messages=messages, + response_format={"type": "json_object"}, + ) + + # Log full response structure for debugging + logger.debug( + "Raw LLM response:\n%s", json.dumps(response.dict(), indent=2) + ) + + # Validate and parse the response content + response_content = response.choices[0].message.content + evaluation = EvalResult.model_validate_json(response_content) + + result = { + "question": eval_input.question, + "evaluation": evaluation.model_dump(), + } + print(result) + + logger.debug("Successfully parsed evaluation result") + logger.debug( + "Full evaluation result:\n%s", json.dumps(result, indent=2) + ) + return result + + except Exception as e: + logger.error( + "Error during LLM evaluation for question '%s': %s\n%s", + eval_input.question, + str(e), + "Full traceback:\n" + traceback.format_exc(), + ) + return None + + +def get_langfuse_scores( + langfuse_score_identifier: str, +) -> Tuple[List[Dict[str, Any]], str, str]: + """Get the Langfuse scores for a given score identifier.""" + all_scores = langfuse.api.score.get(name=langfuse_score_identifier).data + if not all_scores: + logger.error( + f"Score with name '{langfuse_score_identifier}' not found" + ) + return [], "None provided", "None provided" + + logger.info( + f"Found {len(all_scores)} scores with name '{langfuse_score_identifier}'" + ) + + # Extract question-answer pairs (only from downvoted traces) + qa_pairs: List[Dict[str, Any]] = [] + good_response = "None provided" + bad_response = "None provided" + + for score in all_scores: + associated_trace = langfuse.get_trace(id=score.trace_id) + question = associated_trace.input["messages"][1]["content"] + old_response = associated_trace.output["content"] + qa_pairs.append({"question": question, "old_response": old_response}) + if score.value == 0: + bad_response = f"Question: {question}\n\nResponse: {old_response}" + else: + good_response = f"Question: {question}\n\nResponse: {old_response}" + + logger.info( + f"Found {len(qa_pairs)} question-answer pairs from downvoted traces" + ) + logger.debug(f"Good response found: {good_response != 'None provided'}") + logger.debug(f"Bad response found: {bad_response != 'None provided'}") + + if not qa_pairs: + raise ValueError( + "No valid question-answer pairs found in downvoted traces" + ) + + return qa_pairs, good_response, bad_response + + +@step(enable_cache=False) +def fast_eval() -> 
+    """Evaluate LLM responses by comparing old vs new responses.
+
+    Returns:
+        List of evaluation results comparing old and new responses
+    """
+    logger = logging.getLogger(__name__)
+    results: List[Dict[str, Any]] = []
+
+    for pair in EVALUATION_DATA_PAIRS:
+        langfuse_score_identifier = pair["langfuse_score_identifier"]
+        eval_criteria = pair["eval_criteria"]
+
+        langfuse_score_data, sample_good_response, sample_bad_response = (
+            get_langfuse_scores(langfuse_score_identifier)
+        )
+
+        for data in langfuse_score_data:
+            question = data["question"]
+            old_response = data["old_response"]
+
+            # Generate new response using current implementation
+            try:
+                new_response = process_input_with_retrieval(
+                    question,
+                    tracing_tags=["evaluation"],
+                )
+            except Exception as e:
+                logger.error(f"Error generating new response: {e}")
+                continue
+
+            eval_input = EvaluationInput(
+                question=question,
+                llm_response=f"OLD RESPONSE:\n{old_response}\n\nNEW RESPONSE:\n{new_response}",
+                eval_criteria=eval_criteria,
+                sample_good_response=sample_good_response,
+                sample_bad_response=sample_bad_response,
+            )
+
+            if result := evaluate_single_response(eval_input, logger):
+                result.update(
+                    {
+                        "experiment_name": langfuse_score_identifier,
+                        "old_response": old_response,
+                        "new_response": new_response,
+                    }
+                )
+                results.append(result)
+
+    logger.info("All evaluations completed with %d results", len(results))
+    return results
+
+
+@step
+def visualize_fast_eval_results(
+    results: List[Dict[str, Any]],
+) -> Annotated[Image.Image, ArtifactConfig(name="fast_eval_metrics")]:
+    """Visualize the results of the fast evaluation.
+
+    Args:
+        results: List of evaluation results from the fast_eval step, each containing
+            experiment_name, question, and evaluation data
+
+    Returns:
+        PIL Image showing the evaluation metrics visualization
+    """
+    step_context = get_step_context()
+    pipeline_run_name = step_context.pipeline_run.name
+
+    # Process results to get metrics per experiment
+    experiment_metrics = defaultdict(lambda: {"total": 0, "bad": 0})
+
+    for result in results:
+        experiment = result["experiment_name"]
+        experiment_metrics[experiment]["total"] += 1
+        if not result["evaluation"]["is_good_response"]:
+            experiment_metrics[experiment]["bad"] += 1
+
+    # Calculate percentages
+    percentages = {}
+    for exp, metrics in experiment_metrics.items():
+        if metrics["total"] > 0:
+            percentages[exp] = (metrics["bad"] / metrics["total"]) * 100
+
+            log_metadata(
+                metadata={
+                    f"{exp}.total_evaluations": metrics["total"],
+                    f"{exp}.bad_responses": metrics["bad"],
+                    f"{exp}.bad_response_percentage": percentages[exp],
+                }
+            )
+
+    # Create visualization
+    labels = list(percentages.keys())
+    scores = list(percentages.values())
+
+    # Create a new figure and axis
+    fig, ax = plt.subplots(
+        figsize=(12, max(6, len(labels) * 0.5))
+    )  # Adjust height based on number of experiments
+    fig.subplots_adjust(
+        left=0.4
+    )  # Adjust left margin for potentially longer experiment names
+
+    # Plot horizontal bar chart
+    y_pos = np.arange(len(labels))
+    bars = ax.barh(y_pos, scores, align="center", color="skyblue")
+
+    # Add value labels on the bars
+    for i, bar in enumerate(bars):
+        width = bar.get_width()
+        total = experiment_metrics[labels[i]]["total"]
+        bad = experiment_metrics[labels[i]]["bad"]
+        ax.text(
+            width + 1,
+            bar.get_y() + bar.get_height() / 2,
+            f"{width:.1f}% ({bad}/{total})",
+            ha="left",
+            va="center",
+        )
+
+    # Customize the plot
+    ax.set_yticks(y_pos)
+    ax.set_yticklabels([name.replace("_", " ").title() for name in labels])
+    ax.invert_yaxis()  # Labels read top-to-bottom
+    ax.set_xlabel("Percentage of Bad Responses")
+    ax.set_title(
+        f"Fast Evaluation Results - Bad Response Rate\n{pipeline_run_name}"
+    )
+    ax.set_xlim(0, 100)  # Set x-axis limits for percentage
+
+    # Add grid for better readability
+    ax.grid(True, axis="x", linestyle="--", alpha=0.7)
+
+    # Add a light gray background for better contrast
+    ax.set_facecolor("#f8f8f8")
+
+    # Add total evaluations count to the title
+    total_evals = sum(
+        metrics["total"] for metrics in experiment_metrics.values()
+    )
+    plt.suptitle(f"Total Evaluations: {total_evals}", y=0.95, fontsize=10)
+
+    # Adjust layout
+    plt.tight_layout()
+
+    # Convert plot to PIL Image
+    buf = io.BytesIO()
+    plt.savefig(
+        buf, format="png", bbox_inches="tight", dpi=300, facecolor="white"
+    )
+    buf.seek(0)
+    image = Image.open(buf)
+
+    logger.info("Generated visualization for fast evaluation results")
+    return image
diff --git a/llm-complete-guide/steps/finetune_embeddings_legacy.py b/llm-complete-guide/steps/finetune_embeddings_legacy.py
index 7136e784..3e2d25a7 100644
--- a/llm-complete-guide/steps/finetune_embeddings_legacy.py
+++ b/llm-complete-guide/steps/finetune_embeddings_legacy.py
@@ -247,7 +247,7 @@ def evaluate_model(
 
     Returns:
         A tuple containing the average cosine similarity for each model on the
-        test set as well as an image visualising the comparison.
+        test set as well as an image visualizing the comparison.
     """
     logger.info("Evaluating the finetuned model on the test set.")
    logger.info(f"Comparison model: {comparison_model}")
@@ -514,7 +514,7 @@ def calculate_batch_similarities(
 
 #     Returns:
 #         A tuple containing the average cosine similarity for each model on the
-#         test set as well as an image visualising the comparison.
+#         test set as well as an image visualizing the comparison.
 #     """
 
 #     pretrained_model = SentenceTransformer(comparison_model)
diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py
index 9e97e9e1..32c01b0b 100644
--- a/llm-complete-guide/utils/llm_utils.py
+++ b/llm-complete-guide/utils/llm_utils.py
@@ -297,7 +297,7 @@ def get_pinecone_client(model_version_name_or_id: str = "dev") -> pinecone.Index
     ]
 
     pc = Pinecone(api_key=pinecone_api_key)
-    # if the model versio is staging, we check if any index name is associated as metadata
+    # if the model version is staging, we check if any index name is associated as metadata
     # if not, create a new one with the name from the secret and attach it to the metadata
     # if the model version is production, we just use the index name from the metadata attached to it
     # raise error if there is no index name attached to the metadata
diff --git a/llm-finetuning/README.md b/llm-finetuning/README.md
index 8dc4841b..f1d6d10a 100644
--- a/llm-finetuning/README.md
+++ b/llm-finetuning/README.md
@@ -87,7 +87,9 @@ The `deployment` pipelines relies on the `training_pipeline` to have run before.
 
 ## :cloud: Deployment
 
-We have create a custom zenml model deployer for deploying models on the huggingface inference endpoint. The code for custom deployer is in [huggingface](./huggingface/) folder.
+We have created a custom ZenML model deployer for deploying models on the
+Huggingface inference endpoint. The code for the custom deployer is in
+the deployment pipeline, which can be found [here](./pipelines/deployment.py).
 
 For running deployment pipeline, we create a custom zenml stack. As we are using a custom model deployer, we will have to register the flavor and model deployer.
We update the stack to use this custom model deployer for running deployment pipeline.
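
The llm-finetuning README change above refers to registering the custom flavor and model deployer and updating the stack. A minimal sketch of what that could look like with the ZenML CLI is shown below; the flavor source path, component name, flavor name, and stack name are hypothetical placeholders rather than values taken from this repository, and the exact commands and flags may differ between ZenML versions.

```shell
# Hypothetical source path -- point this at the custom Huggingface
# model deployer flavor class actually defined in the project.
zenml model-deployer flavor register huggingface.hf_model_deployer_flavor.HFEndpointModelDeployerFlavor

# Register a model deployer component that uses the newly registered flavor
# (the flavor name "hfendpoint" is a placeholder).
zenml model-deployer register hf_endpoint_deployer --flavor=hfendpoint

# Update the active stack so the deployment pipeline runs with this deployer.
zenml stack update my_stack -d hf_endpoint_deployer
```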