# Finetune an embeddings model using ZenML Data

In this notebook, we generate a synthetic dataset of (query, relevant documents) pairs from a corpus of documents *without labelers* by leveraging LLM.

In [1]:
from llama_index.readers import SimpleWebPageReader
from llama_index.node_parser import SimpleNodeParser
from llama_index.schema import MetadataMode
from zenml import step
from typing import Any, Annotated, List, Tuple, Dict

In [11]:
!rm -rf .zen

In [3]:
%pip install -e ~/apps/zenml[server]

Obtaining file:///home/wjayesh/apps/zenml
  Installing build dependencies ... [?25ldone
[?25h  Checking if build backend supports build_editable ... [?25ldone
[?25h  Getting requirements to build editable ... [?25ldone
[?25h  Preparing editable metadata (pyproject.toml) ... [?25ldone
Checking if build backend supports build_editable ... [?25ldone
[?25hBuilding wheels for collected packages: zenml
  Building editable for zenml (pyproject.toml) ... [?25ldone
[?25h  Created wheel for zenml: filename=zenml-0.47.0-py3-none-any.whl size=11177 sha256=90e9992db942e0e1c596173204b5e8f568de1d8dba3dc1baa718e65c72167698
  Stored in directory: /tmp/pip-ephem-wheel-cache-4pjuf3a7/wheels/d1/f1/54/c0a0c45507a3f8878c132f0834a0fe67bf43a9f40063dae9d4
Successfully built zenml
Installing collected packages: zenml
  Attempting uninstall: zenml
    Found existing installation: zenml 0.47.0
    Uninstalling zenml-0.47.0:
      Successfully uninstalled zenml-0.47.0
Successfully installed zenml-0.47.0

In [12]:
!zenml init

[1;35mNote: NumExpr detected 16 cores but "NUMEXPR_MAX_THREADS" not set, so enforcing safe limit of 8.[0m
[1;35mNumExpr defaulting to 8 threads.[0m
[?25l[32m⠋[0m Initializing ZenML repository at /home/wjayesh/apps/zenml-rag-llm-finetune.
[2K[1A[2K[32m⠙[0m Initializing ZenML repository at /home/wjayesh/apps/zenml-rag-llm-finetune.
[2K[1A[2K[32m⠹[0m Initializing ZenML repository at /home/wjayesh/apps/zenml-rag-llm-finetune.
[1;35mSetting the repo active workspace to 'default'.[0m
[33mSetting the repo active stack to default.[0m
[2K[1A[2K[2;36mZenML repository initialized at [0m[2;35m/home/wjayesh/apps/[0m[2;95mzenml-rag-llm-finetune.[0m
[2;32m⠹[0m[2;36m Initializing ZenML repository at /home/wjayesh/apps/zenml-rag-llm-finetune.[0m
[2K[1A[2K[32m⠹[0m Initializing ZenML repository at /home/wjayesh/apps/zenml-rag-llm-finetune.

[1A[2K[1A[2K[2;36mThe local active stack was initialized to [0m[2;32m'default'[0m[2;36m. This local configuration [0m

#### Scrape all URLs

In [18]:
from steps.finetune_pipeline.url_scraper.url_scraping_utils import get_all_pages, get_nested_readme_urls


@step(enable_cache=True)
def url_scraper(
    docs_url: str = "",
    repo_url: str = "",
    release_notes_url: str = "",
    website_url: str = "",
) -> Tuple[Annotated[List, "train_urls"], Annotated[List, "val_urls"]]:
    """Generates a list of relevant URLs to scrape.

    Args:
        docs_url: URL to the documentation.
        repo_url: URL to the repository.
        release_notes_url: URL to the release notes.
        website_url: URL to the website.

    Returns:
        List of URLs to scrape.
    """
    # examples_readme_urls = get_nested_readme_urls(repo_url)
    # docs_urls = get_all_pages(docs_url, finetuning=True)
    # website_urls = get_all_pages(website_url, finetuning=True)
    # all_urls = docs_urls + website_urls + [release_notes_url]

    # # split into train and val sets
    # train_urls = all_urls[: int(0.8 * len(all_urls))]
    # val_urls = all_urls[int(0.8 * len(all_urls)) :]

    return [website_url], [website_url]

    return train_urls, val_urls

#### Load the contents of the URLs

In [21]:
@step()
def load_corpus(urls: List[str], verbose=False) -> Dict[str, str]:
    if verbose:
        print(f"Loading URLs {urls}")

    reader = SimpleWebPageReader(html_to_text=True)
    docs = reader.load_data(urls)
    if verbose:
        print(f"Loaded {len(docs)} docs")

    parser = SimpleNodeParser.from_defaults()
    nodes = parser.get_nodes_from_documents(docs, show_progress=verbose)

    if verbose:
        print(f"Parsed {len(nodes)} nodes")

    corpus = {
        node.node_id: node.get_content(metadata_mode=MetadataMode.NONE)
        for node in nodes
    }
    return corpus

#### Generate Queries

In [2]:
import re
import uuid

from llama_index.llms import OpenAI
from tqdm.notebook import tqdm


@step()
def generate_queries(
    corpus: Dict[str, str],
    num_questions_per_chunk: int = 2,
    prompt_template: str = "",
    verbose=False,
) -> Tuple[Dict[str, str], Dict[str, List[str]]]:
    """
    Automatically generate hypothetical questions that could be answered with
    doc in the corpus.
    """
    llm = OpenAI(model="gpt-3.5-turbo", api_key="API_KEY")

    prompt_template = (
        prompt_template
        or """\
    Context information is below.
    
    ---------------------
    {context_str}
    ---------------------
    
    Given the context information and not prior knowledge.
    generate only questions based on the below query.
    
    You are a Teacher/ Professor. Your task is to setup \
    {num_questions_per_chunk} questions for an upcoming \
    quiz/examination. The questions should be diverse in nature \
    across the document. Restrict the questions to the \
    context information provided."
    """
    )

    queries = {}
    relevant_docs = {}
    for node_id, text in tqdm(corpus.items()):
        query = prompt_template.format(
            context_str=text, num_questions_per_chunk=num_questions_per_chunk
        )
        response = llm.complete(query)

        result = str(response).strip().split("\n")
        questions = [
            re.sub(r"^\d+[\).\s]", "", question).strip() for question in result
        ]
        questions = [question for question in questions if len(question) > 0]

        for question in questions:
            question_id = str(uuid.uuid4())
            queries[question_id] = question
            relevant_docs[question_id] = [node_id]
    return queries, relevant_docs

#### Merge Data

In [22]:
@step()
def merge_data(
    train_corpus: Dict[str, str],
    train_queries: Dict[str, str],
    train_relevant_docs: Dict[str, List[str]],
    val_corpus: Dict[str, str],
    val_queries: Dict[str, str],
    val_relevant_docs: Dict[str, List[str]],
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    train_dataset = {
        "queries": train_queries,
        "corpus": train_corpus,
        "relevant_docs": train_relevant_docs,
    }

    val_dataset = {
        "queries": val_queries,
        "corpus": val_corpus,
        "relevant_docs": val_relevant_docs,
    }

    return train_dataset, val_dataset


#### Generate training examples

In [23]:
from torch.utils.data import DataLoader
from sentence_transformers import InputExample


@step()
def generate_training_examples(
    dataset: Dict[str, Any], batch_size: int = 10
) -> DataLoader:
    """Generate training examples from the dataset.
    
    Args:
        dataset: Dataset containing the corpus, queries and relevant docs.
        batch_size: Batch size for the dataloader.
        
    Returns:
        DataLoader containing the training examples.
    """
    corpus = dataset['corpus']
    queries = dataset['queries']
    relevant_docs = dataset['relevant_docs']

    examples = []
    for query_id, query in queries.items():
        node_id = relevant_docs[query_id][0]
        text = corpus[node_id]
        example = InputExample(texts=[query, text])
        examples.append(example)

    return DataLoader(examples, batch_size=batch_size)

#### Create an evaluator

In [24]:
from sentence_transformers.evaluation import InformationRetrievalEvaluator


@step()
def create_evaluator(dataset: Dict[str, Any]) -> InformationRetrievalEvaluator:
    """Generate training examples from the dataset.

    Args:
        dataset: Dataset containing the corpus, queries and relevant docs.

    Returns:
        InformationRetrievalEvaluator for the dataset.
    """
    corpus = dataset["corpus"]
    queries = dataset["queries"]
    relevant_docs = dataset["relevant_docs"]

    return InformationRetrievalEvaluator(queries, corpus, relevant_docs)

#### Fine tune an embeddings model

In [25]:
from typing import Optional
from sentence_transformers import SentenceTransformer, losses
from zenml.artifacts.artifact_config import ArtifactConfig


@step()
def finetune_sentencetransformer_model(
    loader: DataLoader,
    evaluator: InformationRetrievalEvaluator,
    EPOCHS: int = 2,
    model_id: Optional[str] = "BAAI/bge-small-en",
) -> Annotated[SentenceTransformer, ArtifactConfig(name="finetuned-sentence-transformer", is_model_artifact=True)]:
    model = SentenceTransformer(model_id)
    loss = losses.MultipleNegativesRankingLoss(model=model)

    warmup_steps = int(len(loader) * EPOCHS * 0.1)

    model.fit(
        train_objectives=[(loader, loss)],
        epochs=EPOCHS,
        warmup_steps=warmup_steps,
        show_progress_bar=True,
        evaluator=evaluator, 
        evaluation_steps=50,
    )

    return model

#### Define a pipeline

In [26]:
from zenml import pipeline
from zenml.model.model_version import ModelVersion

# from steps.model_log_register import register_model


@pipeline(
    name="finetuning_pipeline",
    enable_cache=True,
    model_version=ModelVersion(
        name="finetuned-sentence-transformer",
        license="Apache",
        description="Custom Embeddings model",
        create_new_model_version=True,
        delete_new_version_on_failure=True,
    ),
)
def finetuning_pipeline(
    docs_url: str = "",
    repo_url: str = "",
    release_notes_url: str = "",
    website_url: str = "",
):
    train_urls, val_urls = url_scraper(
        docs_url, repo_url, release_notes_url, website_url
    )
    train_corpus = load_corpus(train_urls, id="train_loader")
    val_corpus = load_corpus(val_urls, id="val_loader")
    train_queries, train_relevant_docs = generate_queries(
        train_corpus, id="train_queries_generator"
    )
    val_queries, val_relevant_docs = generate_queries(
        val_corpus, id="val_queries_generator"
    )
    train_dataset, val_dataset = merge_data(
        train_corpus,
        train_queries,
        train_relevant_docs,
        val_corpus,
        val_queries,
        val_relevant_docs,
    )
    training_examples = generate_training_examples(train_dataset)
    evaluator = create_evaluator(val_dataset)
    model = finetune_sentencetransformer_model(loader=training_examples, evaluator=evaluator, model_id="paraphrase-albert-small-v2")
    # register_model(model, "finetuned_model")

#### Call the pipeline

In [27]:
import os
os.environ["OPENAI_API_KEY"]="API_KEY"

In [15]:
version = "0.47.0"
docs_url = f"https://docs.zenml.io/v/{version}/"
website_url = "https://zenml.io"
repo_url = f"https://github.com/zenml-io/zenml/tree/{version}/examples"
release_notes_url = (
    f"https://github.com/zenml-io/zenml/blob/{version}/RELEASE_NOTES.md"
)

finetuning_pipeline(
    website_url=website_url,
    docs_url=docs_url,
    repo_url=repo_url,
    release_notes_url=release_notes_url,
)

[1;35mInitiating a new run for the pipeline: [0m[1;36mfinetuning_pipeline[1;35m.[0m


[1;35mRegistered new version: [0m[1;36m(version 6)[1;35m.[0m
[1;35mNew model version [0m[1;36m3[1;35m was created.[0m
[1;35mExecuting a new run.[0m
[1;35mUsing user: [0m[1;36mdefault[1;35m[0m
[1;35mUsing stack: [0m[1;36mdefault[1;35m[0m
[1;35m  artifact_store: [0m[1;36mdefault[1;35m[0m
[1;35m  orchestrator: [0m[1;36mdefault[1;35m[0m
[1;35mCaching [0m[1;36menabled[1;35m explicitly for [0m[1;36murl_scraper[1;35m.[0m
[1;35mUsing cached version of [0m[1;36murl_scraper[1;35m.[0m
[1;35mStep [0m[1;36murl_scraper[1;35m has started.[0m
[1;35mUsing cached version of [0m[1;36mtrain_loader[1;35m.[0m
[1;35mLinking artifact [0m[1;36moutput[1;35m to model [0m[1;36mNone[1;35m version [0m[1;36mNone[1;35m implicitly.[0m
[1;35mStep [0m[1;36mtrain_loader[1;35m has started.[0m
[1;35mUsing cached version of [0m[1;36mval_loader[1;35m.[0m
[1;35mLinking artifact [0m[1;36moutput[1;35m to model [0m[1;36mNone[1;35m version [0m[1;

.gitattributes:   0%|          | 0.00/690 [00:00<?, ?B/s]

1_Pooling/config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

README.md:   0%|          | 0.00/4.03k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/827 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/122 [00:00<?, ?B/s]

pytorch_model.bin:   0%|          | 0.00/46.7M [00:00<?, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/53.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/245 [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/760k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.31M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/465 [00:00<?, ?B/s]

modules.json:   0%|          | 0.00/229 [00:00<?, ?B/s]

[1;35mUse pytorch device: cpu[0m


Epoch:   0%|          | 0/1 [00:00<?, ?it/s]

Iteration:   0%|          | 0/2 [00:00<?, ?it/s]

[1;35mInformation Retrieval Evaluation on  dataset in epoch 0 after 1 steps:[0m
[1;35mQueries: 14[0m
[1;35mCorpus: 7
[0m
[1;35mScore-Function: cos_sim[0m
[1;35mAccuracy@1: 21.43%[0m
[1;35mAccuracy@3: 64.29%[0m
[1;35mAccuracy@5: 85.71%[0m
[1;35mAccuracy@10: 100.00%[0m
[1;35mPrecision@1: 21.43%[0m
[1;35mPrecision@3: 21.43%[0m
[1;35mPrecision@5: 17.14%[0m
[1;35mPrecision@10: 10.00%[0m
[1;35mRecall@1: 21.43%[0m
[1;35mRecall@3: 64.29%[0m
[1;35mRecall@5: 85.71%[0m
[1;35mRecall@10: 100.00%[0m
[1;35mMRR@10: 0.4804[0m
[1;35mNDCG@10: 0.6075[0m
[1;35mMAP@100: 0.4804[0m
[1;35mScore-Function: dot_score[0m
[1;35mAccuracy@1: 35.71%[0m
[1;35mAccuracy@3: 71.43%[0m
[1;35mAccuracy@5: 85.71%[0m
[1;35mAccuracy@10: 100.00%[0m
[1;35mPrecision@1: 35.71%[0m
[1;35mPrecision@3: 23.81%[0m
[1;35mPrecision@5: 17.14%[0m
[1;35mPrecision@10: 10.00%[0m
[1;35mRecall@1: 35.71%[0m
[1;35mRecall@3: 71.43%[0m
[1;35mRecall@5: 85.71%[0m
[1;35mRecall@10: 100.00%[0m

In [16]:
from zenml.client import Client

pipeline_model = Client().get_pipeline(
    name_id_or_prefix="finetuning_pipeline"
)

# you can additionally pass in the version if you want
# to move between different pipeline implementations.
# pipeline_model = Client().get_pipeline(
#     name_id_or_prefix=PIPELINE_NAME, version="9"
# )

if pipeline_model.runs is not None:
    # get the last run
    last_run = pipeline_model.runs[0]
    # get the agent_creator step
    queries_steps = last_run.steps["train_queries_generator"]

    try:
        queries = queries_steps.outputs["output_0"].load()
    except ValueError:
        pass

    print(queries)

{'d8cc78b8-5c48-478e-919b-36ed845f82ff': 'What are the key features of ZenML that make it stand out from other ML orchestrators?', '1701e994-2c99-4c50-a4f3-76cd94d2db23': 'Which top companies have trusted ZenML for their MLOps workflows?', '2c09e019-573e-4d41-82ef-e980330cdfad': 'Which companies are featured in the context information? Provide the names of at least three companies.', '74bc5858-16e5-4f35-9c3f-84bc18a79010': 'What are the different types of images included in the context information? List at least two types.', 'eab371f3-9fde-4647-b24a-94d0aec28800': 'What is the main problem that companies face when it comes to creating their own ChatGPT?', '252d71a9-ee2b-462a-a70e-ee00530bd659': 'How does ZenML simplify the ML workflow for everyone on the team?', '0c4c704e-1a50-4a3d-88f0-3e43c2874d92': 'How does ZenML allow data scientists to focus on modeling and experimentation while ensuring their code is production-ready from the beginning?', 'c2e4df14-f945-4550-bfe2-a93c9c552535': 

In [17]:
import torch

torch.cuda.is_available()


False