# RAG

This notebook shows how to use ML cube Platform with a Retrieval Augmented Generation (RAG) task.

We will use a dataset available in the Hugging Face hub as an example.

Each sample in the dataset is composed of three elements:
- user_input: the question asked by the user
- context: the context retrieved by the retrieval system
- answer: the output of the model that tries to answer the question

The presence of these 3 elements allows us to simulate a full RAG system without actually setting up the system.
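
As an illustration, one sample can be pictured as a plain dictionary. This is only a sketch: the text values below are invented and are not taken from the dataset, and the dataset loaded later names the user input column `question` rather than `user_input`.

```python
# A hypothetical RAG sample; the text values are made up for illustration.
sample = {
    "user_input": "What is the capital of France?",
    "context": "Paris is the capital and most populous city of France.",
    "answer": "The capital of France is Paris.",
}

# The three fields that together simulate one pass through a RAG system.
required_fields = ("user_input", "context", "answer")
missing = [field for field in required_fields if field not in sample]
print(missing)  # []
```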


**In this notebook you will learn:**
- how to create a RAG task
- how to define a data schema
- how to create a model
- how to upload historical data
- how to set the reference for the model
- how to upload production data

**Requirements**

Your Python environment needs the following dependencies to run this notebook:
```
ml3-platform-sdk==0.0.17
torch==2.2.0
datasets==2.15.0
sentence-transformers==3.0.1
polars==0.20.31
numpy==1.26.4
tqdm==4.66.4
```

Imports

In [None]:
from ml3_platform_sdk import enums as ml3_enums
from ml3_platform_sdk import models as ml3_models
from ml3_platform_sdk.client import ML3PlatformClient

from datasets import load_dataset, DatasetDict
from sentence_transformers import SentenceTransformer
import polars as pl
import json
import datetime
import numpy as np

User Inputs

In [None]:
URL = 'https://pre.api.platform.mlcube.com'
API_KEY = 'VbDyZ4U7Gbe60nK4UlL4aqOOpZrBLHl7flm0uVzuCj9iwyatWcYcezaOBAzckCOh'
PROJECT_ID = '669fbec80b64f51d12f1d12d'
model_name = 'mymodel'
model_version = 'v0.0.1'
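
As an alternative to hard-coding credentials in the notebook, they can be read from environment variables. The following is a minimal sketch: the helper `read_setting` and the variable names `ML3_PLATFORM_URL`, `ML3_PLATFORM_API_KEY` and `ML3_PLATFORM_PROJECT_ID` are hypothetical and are not defined by the SDK.

```python
import os


def read_setting(name: str, default: str = "") -> str:
    """Return the environment variable `name`, or `default` if it is unset."""
    return os.environ.get(name, default)


# Hypothetical variable names; pick whatever fits your environment.
URL = read_setting("ML3_PLATFORM_URL", "https://pre.api.platform.mlcube.com")
API_KEY = read_setting("ML3_PLATFORM_API_KEY")
PROJECT_ID = read_setting("ML3_PLATFORM_PROJECT_ID")
```

If used, this would replace the assignments of `URL`, `API_KEY` and `PROJECT_ID` in the cell above rather than follow them.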

## Dataset, model and predictions
Download the dataset and the embedding model using the Hugging Face API.
The dataset already contains the model answers, which we use as predictions; the embedding model encodes the text into vectors.

Load dataset

In [None]:
complete_dataset = load_dataset("neural-bridge/rag-dataset-12000")

USER_INPUT_COL_NAME = 'question'
CONTEXT_COL_NAME = 'context'
ANSWER_COL_NAME = 'answer'

In [None]:
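def sample_dataset(dataset, fraction=0.1, seed=42):
    """Return a DatasetDict holding a `fraction` sample of each split.

    The original train split is first halved into train and validation,
    then `fraction` of each half and of the test split is kept.
    """
    sampled_dataset = DatasetDict()

    # Split the original train data into two halves: train and validation
    train_split = dataset['train'].train_test_split(test_size=0.5, seed=seed)

    # Keep only `fraction` of each half
    sampled_dataset['train'] = train_split['train'].train_test_split(test_size=fraction, seed=seed)['test']
    sampled_dataset['validation'] = train_split['test'].train_test_split(test_size=fraction, seed=seed)['test']

    # Keep only `fraction` of the test data
    sampled_dataset['test'] = dataset['test'].train_test_split(test_size=fraction, seed=seed)['test']
    return sampled_dataset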

# Perform the sampling
dataset = sample_dataset(complete_dataset)

In [None]:
len(dataset['train']['context']), len(dataset['validation']['context']), len(dataset['test']['context'])

In [None]:
embedder = SentenceTransformer('distilroberta-base')

## Create data objects for ML cube Platform
We upload data through local data sources, so we first need to create the local files that will be shared with ML cube Platform.

Uploading data coming from a RAG system is equivalent to uploading text data (refer to this [notebook](https://colab.research.google.com/github/ml-cube/ml3-platform-docs/blob/main/notebooks/text_classification.ipynb) for further information). 

Data needs to be stored in a JSON file as a list of objects. Each object must contain two mandatory fields, namely the timestamp and the sample-id, along with the other fields that represent the data (e.g. question and context for input data, answer for prediction data).
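
The layout described above can be sketched with the standard library alone. The records below are invented for illustration; only the `sample-id` and `timestamp` keys and the list-of-objects structure come from the description.

```python
import json

# Two hypothetical input records; the text values are invented for illustration.
input_records = [
    {
        "sample-id": "sample_0",
        "timestamp": 1700000000.0,
        "question": "What is the capital of France?",
        "context": "Paris is the capital and most populous city of France.",
    },
    {
        "sample-id": "sample_1",
        "timestamp": 1700000120.0,
        "question": "Who wrote Hamlet?",
        "context": "Hamlet is a tragedy written by William Shakespeare.",
    },
]

# The file content is a JSON list of objects.
serialized = json.dumps(input_records, indent=2)
restored = json.loads(serialized)

# Every object carries the two mandatory fields.
mandatory = {"sample-id", "timestamp"}
print(all(mandatory.issubset(record) for record in restored))  # True
```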

When dealing with unstructured data like text, there are three ways to send it:
1. Send only embeddings, i.e. a numerical representation of each text sample as a vector, using `EmbeddingData`.
2. Send only unstructured text, using `TextData`. In this case ML cube Platform creates the numerical representation using internal encoders.
3. Send unstructured text along with embeddings, using `TextData` with the `embedding_source` attribute. This more complete option has two benefits. First, you can use your own embedder, which is usually focused on your domain rather than a general-purpose one. Second, providing the text allows the platform to extract additional metrics, enabling more insights and more comprehensive views in the web application.

Data are uploaded separately for each category:
- **inputs:** TextData object in json format
- **predictions:** TextData object in json format

In RAG tasks, we treat the model's answer as its prediction.


In [None]:
def build_data_objects(
    dataset,
    embedder,
    model_name: str,
    model_version: str,
    starting_id: int,
    starting_timestamp: float,
    prefix: str,
    with_prediction: bool = True
) -> tuple[
    ml3_models.TextData,
    ml3_models.TextData | None,
    int,
    float
]:
    """Builds data objects for inputs and predictions.

    Each sample needs an id and a timestamp, which we generate
    as incremental counters.
    Therefore, we need a starting point for both.

    Parameters
    ----------

    dataset: huggingface dataset
    embedder: sentence transformer model used to encode text
    model_name: name of the model
    model_version: version of the model
    starting_id: sample id to start from
    starting_timestamp: timestamp to start from
    prefix: prefix for the files
    with_prediction: whether to include the prediction or not

    Returns
    -------
    input text_data, prediction text_data (None when with_prediction is False),
    next starting sample id, next starting timestamp
    """

    n_samples = len(dataset[USER_INPUT_COL_NAME])
    next_starting_sample_id = starting_id + n_samples
    sample_ids = [f'sample_{i}' for i in range(starting_id, next_starting_sample_id)]
    # Consecutive samples are 2 minutes (120 seconds) apart
    next_starting_timestamp = starting_timestamp + n_samples * 120
    # Build timestamps from an integer range so that exactly n_samples values
    # are produced; np.arange with a float step can return one element too many
    timestamps = (starting_timestamp + 120 * np.arange(n_samples)).tolist()
    
    input_samples_filename = f'{prefix}_input_samples.json'
    prediction_samples_filename = f'{prefix}_prediction_samples.json'
    input_embeddings_filename = f'{prefix}_input_embeddings.parquet'
    prediction_embeddings_filename = f'{prefix}_prediction_embeddings.parquet'

    # Create the JSON files with the raw text and save them
    print('Creating text files')
    input_text_samples = []
    prediction_text_samples = []
    for (i, sample) in enumerate(dataset):
        input_text_samples.append({
            USER_INPUT_COL_NAME: sample[USER_INPUT_COL_NAME],
            CONTEXT_COL_NAME: sample[CONTEXT_COL_NAME],
            'sample-id': sample_ids[i],
            'timestamp': timestamps[i]
        })
        
        if with_prediction:
            prediction_text_samples.append({
                'sample-id': sample_ids[i],
                'timestamp': timestamps[i],
                f'{model_name}@{model_version}': sample[ANSWER_COL_NAME]
            })

    with open(input_samples_filename, 'w') as f:
        json.dump(input_text_samples, f)
    
    if with_prediction:
        with open(prediction_samples_filename, 'w') as f:
            json.dump(prediction_text_samples, f)
            
    # Create embedding dataframe
    print('Creating embedding file')
    embeddings_input = pl.DataFrame({
        'timestamp': timestamps,
        'sample-id': sample_ids,
        f'{USER_INPUT_COL_NAME}_embeddings': embedder.encode(dataset[USER_INPUT_COL_NAME]).tolist(),
        f'{CONTEXT_COL_NAME}_embeddings': embedder.encode(dataset[CONTEXT_COL_NAME]).tolist(),        
    })
    embeddings_input.write_parquet(input_embeddings_filename)
    
    if with_prediction:
        embeddings_prediction = pl.DataFrame({
            'timestamp': timestamps,
            'sample-id': sample_ids,
            f'{model_name}_embeddings@{model_version}': embedder.encode(dataset[ANSWER_COL_NAME]).tolist(),
        })
        
        embeddings_prediction.write_parquet(prediction_embeddings_filename)

    print('Creating inputs data')
    inputs_data = ml3_models.TextData(
        source=ml3_models.LocalDataSource(
            file_type=ml3_enums.FileType.JSON,
            is_folder=False,
            folder_type=None,
            file_path=input_samples_filename
        ),
        embedding_source=ml3_models.LocalDataSource(
            file_type=ml3_enums.FileType.PARQUET,
            is_folder=False,
            folder_type=None,
            file_path=input_embeddings_filename
        )
    )
    
    predictions_data = None
    if with_prediction:
        print('Creating predictions data')
        predictions_data = ml3_models.TextData(
            source=ml3_models.LocalDataSource(
                file_type=ml3_enums.FileType.JSON,
                is_folder=False,
                folder_type=None,
                file_path=prediction_samples_filename
            ),
            embedding_source=ml3_models.LocalDataSource(
                file_type=ml3_enums.FileType.PARQUET,
                is_folder=False,
                folder_type=None,
                file_path=prediction_embeddings_filename,
            )
        )
    
    return (
        inputs_data,
        predictions_data,
        next_starting_sample_id,
        next_starting_timestamp
    )
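
The id and timestamp bookkeeping in `build_data_objects` can be sketched in isolation. The helper name `make_ids_and_timestamps` is hypothetical; it only mirrors the counters described in the docstring, and shows how the returned starting points let one batch continue where the previous one stopped.

```python
def make_ids_and_timestamps(start_id, start_timestamp, n_samples, step=120.0):
    """Return sample ids, timestamps, and the starting points for the next batch."""
    sample_ids = [f"sample_{i}" for i in range(start_id, start_id + n_samples)]
    timestamps = [start_timestamp + i * step for i in range(n_samples)]
    next_id = start_id + n_samples
    next_timestamp = start_timestamp + n_samples * step
    return sample_ids, timestamps, next_id, next_timestamp


# First batch: three samples starting from id 0 at an arbitrary timestamp.
ids_a, ts_a, next_id, next_ts = make_ids_and_timestamps(0, 1000.0, 3)
# Second batch continues from where the first one stopped.
ids_b, ts_b, _, _ = make_ids_and_timestamps(next_id, next_ts, 2)

print(ids_a, ts_a)  # ['sample_0', 'sample_1', 'sample_2'] [1000.0, 1120.0, 1240.0]
print(ids_b, ts_b)  # ['sample_3', 'sample_4'] [1360.0, 1480.0]
```

Because each call returns the next free id and timestamp, the historical and production batches built below never overlap.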


In [None]:
historical_initial_sample_id = 0
historical_initial_timestamp = datetime.datetime.now().timestamp()

(
    historical_inputs_data,
    _,  # Predictions are not needed for historical data
    starting_id,
    starting_timestamp,
) = build_data_objects(
    dataset['train'],
    embedder,
    model_name,
    model_version,
    historical_initial_sample_id,
    historical_initial_timestamp,
    prefix='train',
    with_prediction=False
)
historical_end_timestamp = starting_timestamp - 120

In [None]:
(
    production_0_inputs_data,
    production_0_prediction_data,
    starting_id,
    starting_timestamp,
) = build_data_objects(
    dataset['validation'],
    embedder,
    model_name,
    model_version,
    starting_id,
    starting_timestamp,
    'prod_0'
)

In [None]:
(
    production_1_inputs_data,
    production_1_prediction_data,
    starting_id,
    starting_timestamp,
) = build_data_objects(
    dataset['test'],
    embedder,
    model_name,
    model_version,
    starting_id,
    starting_timestamp,
    'prod_1'
)

## Create data schema

The data schema lists the columns present in the task, with their names, roles and data types.
A data schema must contain:
- a *sample id* column, used to uniquely identify each sample
- a *timestamp* column, used to order samples
- *input* columns, which specify the nature of the input. In this case we have two input columns, one for the user input and one for the context. The distinction is specified through the subrole attribute.
- optional *input additional embedding* columns, which hold additional embeddings of the text data. Mirroring the input columns, two additional embedding columns need to be specified.

The prediction column must not be specified, because it is added automatically when the model is created, with a name of the form `MODEL_NAME@MODEL_VERSION`. The same applies to its embedding column, which is added automatically with the name `MODEL_NAME_embeddings@MODEL_VERSION`.
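
The naming convention for the automatically added columns can be written down as a small helper. The function `prediction_column_names` is hypothetical and not part of the SDK; it only reproduces the two name patterns stated above, which are the same keys used when writing the prediction files.

```python
def prediction_column_names(model_name: str, model_version: str) -> tuple[str, str]:
    """Return the prediction column name and its embedding column name."""
    prediction_column = f"{model_name}@{model_version}"
    embedding_column = f"{model_name}_embeddings@{model_version}"
    return prediction_column, embedding_column


print(prediction_column_names("mymodel", "v0.0.1"))
# ('mymodel@v0.0.1', 'mymodel_embeddings@v0.0.1')
```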

In [None]:
data_schema = ml3_models.DataSchema(
    columns=[
        ml3_models.ColumnInfo(
            name='timestamp',
            role=ml3_enums.ColumnRole.TIME_ID,
            is_nullable=False,
            data_type=ml3_enums.DataType.FLOAT,
        ),
        ml3_models.ColumnInfo(
            name='sample-id',
            role=ml3_enums.ColumnRole.ID,
            is_nullable=False,
            data_type=ml3_enums.DataType.STRING,
        ),
        ml3_models.ColumnInfo(
            name=USER_INPUT_COL_NAME,
            role=ml3_enums.ColumnRole.INPUT,
            is_nullable=False,
            data_type=ml3_enums.DataType.STRING,
            subrole=ml3_enums.ColumnSubRole.RAG_USER_INPUT
        ),
        ml3_models.ColumnInfo(
            name=CONTEXT_COL_NAME,
            role=ml3_enums.ColumnRole.INPUT,
            is_nullable=False,
            data_type=ml3_enums.DataType.STRING,
            subrole=ml3_enums.ColumnSubRole.RAG_RETRIEVED_CONTEXT
        ),
        ml3_models.ColumnInfo(
            name=f'{USER_INPUT_COL_NAME}_embeddings',
            role=ml3_enums.ColumnRole.INPUT_ADDITIONAL_EMBEDDING,
            is_nullable=False,
            data_type=ml3_enums.DataType.ARRAY_1,
            dims=(768,),
            subrole=ml3_enums.ColumnSubRole.RAG_USER_INPUT
        ),
        ml3_models.ColumnInfo(
            name=f'{CONTEXT_COL_NAME}_embeddings',
            role=ml3_enums.ColumnRole.INPUT_ADDITIONAL_EMBEDDING,
            is_nullable=False,
            data_type=ml3_enums.DataType.ARRAY_1,
            dims=(768,),
            subrole=ml3_enums.ColumnSubRole.RAG_RETRIEVED_CONTEXT
        )
    ]
)
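
The schema declares `dims=(768,)` for both embedding columns, so the vectors written to the embedding files should have that length. A quick sanity check can be sketched as follows, assuming the embeddings are available as lists of floats; the helper `check_embedding_dims` is hypothetical, and the tiny vectors stand in for real ones.

```python
def check_embedding_dims(embeddings, expected_dim):
    """Return the indices of the vectors whose length differs from expected_dim."""
    return [i for i, vector in enumerate(embeddings) if len(vector) != expected_dim]


# Tiny made-up vectors standing in for real 768-dimensional embeddings.
vectors = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8]]
print(check_embedding_dims(vectors, expected_dim=3))  # [2]
```

With the real data, `expected_dim` would be 768, and an empty list means every vector matches the schema.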

## Interaction with ML cube Platform
To start, we create an instance of the ML cube Platform client using the provided API key.
Then, we create a task, a data schema and a model, and finally we upload data.

In [None]:
client = ML3PlatformClient(URL, API_KEY)

When creating a task, we need to specify some information:
- **task_type:** the artificial intelligence task type. In this case it is a RAG task.
- **data_structure:** the type of input data. Since we are dealing with text data, we set it to TEXT.
- **optional_target:** RAG tasks don't have a target, hence it must be set to True.
- **text_language:** the language used in the task; specifying it is mandatory.

In [None]:
task_id = client.create_task(
    project_id=PROJECT_ID,
    name='RAG_task',
    tags=["tag_1", "tag_2"],
    task_type=ml3_enums.TaskType.RAG,
    data_structure=ml3_enums.DataStructure.TEXT,
    optional_target=True, # Must be True in RAG tasks
    text_language=ml3_enums.TextLanguage.ENGLISH
)

In [None]:
client.add_data_schema(task_id=task_id, data_schema=data_schema)

Once the data schema has been added, we can create our model.

A model is uniquely identified by its `name` and `version`.

In [None]:
model_id: str = client.create_model(
    task_id=task_id,
    name=model_name,
    version=model_version,
    metric_name=None,  # Must be None in RAG tasks
    preferred_suggestion_type=None,  # Must be None in RAG tasks
    with_probabilistic_output=False,
)

Historical data are data that are already available and do not come from the production environment. We need to add them in order to set up the reference data of our model, which is needed by internal algorithms.

Note that it is possible to add more historical data at any time.
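
The reference is selected as a time window over the historical data. The sketch below shows how the window bounds relate to evenly spaced samples. The helper `reference_window` is hypothetical, the numbers are made up, and treating both bounds as inclusive is an assumption rather than documented platform behaviour.

```python
def reference_window(start_timestamp: float, n_samples: int, step: float = 120.0):
    """Return (from_timestamp, to_timestamp) covering n_samples evenly spaced samples."""
    if n_samples < 1:
        raise ValueError("n_samples must be at least 1")
    last_timestamp = start_timestamp + (n_samples - 1) * step
    return start_timestamp, last_timestamp


# Hypothetical values: 4 historical samples starting at timestamp 1000.0.
from_ts, to_ts = reference_window(1000.0, 4)
timestamps = [1000.0 + i * 120.0 for i in range(4)]
print(from_ts, to_ts)  # 1000.0 1360.0
print(all(from_ts <= t <= to_ts for t in timestamps))  # True
```

This is the same quantity as `historical_end_timestamp`, which is computed above as the next starting timestamp minus one 120-second step.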

In [None]:
job_id = client.add_historical_data(
    task_id=task_id,
    inputs=historical_inputs_data,
)
print(f'Waiting for job {job_id}')
client.wait_job_completion(job_id=job_id)
print(f'Job {job_id} completed')

In [None]:
job_id = client.set_model_reference(
    model_id,
    from_timestamp=historical_initial_timestamp,
    to_timestamp=historical_end_timestamp,
)
print(f'Waiting for job {job_id}')
client.wait_job_completion(job_id=job_id)
print(f'Job {job_id} completed')

Now we are ready to upload production data.
Production data can be uploaded asynchronously, which means that we can upload each data category as soon as it is available, without waiting for the others.

In [None]:
job_id = client.add_production_data(
    task_id=task_id,
    inputs=production_0_inputs_data,
    predictions=[(model_id, production_0_prediction_data)]
)
print(f'Waiting for job {job_id}')
client.wait_job_completion(job_id=job_id)
print(f'Job {job_id} completed')

Production data can also be sent asynchronously: first the *inputs* and then the *predictions*.

In [None]:
job_id = client.add_production_data(
    task_id=task_id,
    inputs=production_1_inputs_data,
)
print(f'Waiting for job {job_id}')
client.wait_job_completion(job_id=job_id)
print(f'Job {job_id} completed')

In [None]:
job_id = client.add_production_data(
    task_id=task_id,
    predictions=[(model_id, production_1_prediction_data)]
)
print(f'Waiting for job {job_id}')
client.wait_job_completion(job_id=job_id)
print(f'Job {job_id} completed')
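
When inputs and predictions are uploaded separately, it can help to check beforehand that the two files refer to the same samples. The sketch below assumes that records are matched on `sample-id`, which is an assumption based on both files carrying that field; the helper `unmatched_sample_ids` is hypothetical and the records are made up.

```python
def unmatched_sample_ids(input_records, prediction_records, id_key="sample-id"):
    """Return ids present in only one of the two record lists, as two sorted lists."""
    input_ids = {record[id_key] for record in input_records}
    prediction_ids = {record[id_key] for record in prediction_records}
    return sorted(input_ids - prediction_ids), sorted(prediction_ids - input_ids)


# Made-up records for illustration.
inputs = [{"sample-id": "sample_0"}, {"sample-id": "sample_1"}]
predictions = [{"sample-id": "sample_1"}, {"sample-id": "sample_2"}]
print(unmatched_sample_ids(inputs, predictions))  # (['sample_0'], ['sample_2'])
```

Two empty lists mean that every input has a prediction and vice versa.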