# Preprocessing pipeline

This is the first step of our workflow, the preprocessing pipeline. Here, I will wrangle the data and then load it into an S3 bucket to be used by our fine-tuning job.

I will be using the FlagEmbedding library to fine-tune the bge-base-en-v1.5 model. For this, each training record must follow a specific schema:

```

{"query": str, "pos": List[str], "neg":List[str]}

```
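As a minimal sketch of what one such record could look like once serialized, the snippet below builds a single training example and writes it as one JSON line. The texts are made up for illustration and are not taken from the dataset.

```python
import json

# Hypothetical record: the query and passages are illustrative only.
record = {
    "query": "What is Glaucoma ?",
    "pos": ["Glaucoma is a group of diseases that can damage the eye's optic nerve."],
    "neg": ["Hyperthyroidism is a disorder in which the thyroid gland is overactive."],
}

# Serialize as a single line, i.e. one JSON object per training example.
line = json.dumps(record)
print(line)
```

Both `pos` and `neg` are lists, so a query can carry several positive and several negative passages.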

Moreover, for performance validation, I will also need to save three JSON files:
1. Validation queries
2. Query-to-positive mapping
3. The entire answers corpus

In [1]:
!pip install -q -r data_processing/requirements.txt



In [1]:
import pandas as pd
pd.set_option('display.max_colwidth', None)  

In [34]:
from dotenv import load_dotenv
from general_utils import load_config
import torch

load_dotenv("data_processing/.env")
CONFIG = load_config()
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

## Read data

In [3]:
df = pd.read_csv('data_processing/data/intern_screening_dataset.csv')

In [4]:
# Drop the 5 rows whose answer is null; copy to avoid chained-assignment warnings later
df = df[df['answer'].notna()].copy()

## Text cleansing

In [7]:
# Remove the "(are)" filler, as in "What is (are) Hyperthyroidism ?", and collapse a doubled "? ?"
df.loc[:, 'question'] = df['question'].apply(lambda q: q.replace("(are)", "").replace('? ?','?').strip())
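Note that removing "(are)" with a plain `replace` leaves a double space behind ("What is  Hyperthyroidism ?"). A possible variant, shown here only as a sketch, also collapses repeated whitespace. The function name `clean_question` is hypothetical and not part of the project code.

```python
import re

def clean_question(q: str) -> str:
    """Drop the "(are)" filler, fix a doubled "? ?", and collapse extra whitespace."""
    q = q.replace("(are)", "").replace("? ?", "?")
    # Collapse the double space left behind by removing "(are)".
    return re.sub(r"\s+", " ", q).strip()

cleaned = clean_question("What is (are) Hyperthyroidism ?")
print(cleaned)
```

It could be applied with `df['question'].apply(clean_question)` in place of the lambda.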

## Chunking

While exploring the dataset, I found that some answers are too long to fit within the 512-token input limit of bge-base-en-v1.5. I therefore chunk them to avoid losing information at fine-tuning time (and later, when embedding them and saving them to the vector DB).
For this, I will:
* use llama-index's sentence splitter, which provides a quick way to chunk text while preserving its structure, such as paragraphs and sentences
* load the bge-base-en-v1.5 tokenizer so the chunker counts tokens the same way the model does

In [5]:
from llama_index.core.node_parser import SentenceSplitter

In [6]:
from transformers import AutoTokenizer
tokenizer = AutoTokenizer.from_pretrained(f"BAAI/{CONFIG['EMBEDDING_MODEL']}")



In [7]:
splitter = SentenceSplitter(
    chunk_size=CONFIG['CHUNK_MAX_LENGTH'],
    tokenizer=tokenizer.tokenize,
    chunk_overlap=CONFIG['CHUNK_OVERLAP'],
    separator=".",
    paragraph_separator="\n"
)
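To make the idea concrete, here is a simplified, dependency-free sketch of token-budgeted sentence chunking. It is not how `SentenceSplitter` is implemented: it ignores overlap and paragraph separators, and the whitespace token counter is a stand-in for `len(tokenizer.tokenize(text))`. The names `chunk_by_sentences` and `whitespace_count` are hypothetical.

```python
from typing import Callable, List

def chunk_by_sentences(
    text: str, max_tokens: int, count_tokens: Callable[[str], int]
) -> List[str]:
    """Greedily pack whole sentences into chunks of at most max_tokens tokens."""
    sentences = [s.strip() + "." for s in text.split(".") if s.strip()]
    chunks: List[str] = []
    current = ""
    for sentence in sentences:
        candidate = f"{current} {sentence}".strip()
        if current and count_tokens(candidate) > max_tokens:
            # Adding this sentence would exceed the budget: close the chunk.
            chunks.append(current)
            current = sentence
        else:
            current = candidate
    if current:
        chunks.append(current)
    return chunks

def whitespace_count(text: str) -> int:
    # Stand-in token counter (assumption); the notebook uses the model tokenizer.
    return len(text.split())

chunks = chunk_by_sentences(
    "One two three. Four five. Six seven eight nine.", 5, whitespace_count
)
print(chunks)
```

In this sketch a single sentence longer than the budget is kept whole rather than split further.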

In [8]:
#Here I apply the splitter and then explode the rows to have 1 row per chunk
df['answer'] = df['answer'].apply(splitter.split_text)
df = df.explode('answer').reset_index(drop=True)

Token indices sequence length is longer than the specified maximum sequence length for this model (1191 > 512). Running this sequence through the model will result in indexing errors


In [9]:
#remove some duplicates in question-answer pairs
print(f"previous: {df.shape}")
df = df.drop_duplicates(subset=['question', 'answer'])
print(f"after: {df.shape}")

previous: (20053, 2)
after: (19944, 2)


### Training Triplets
To fine-tune the embedding model, I will adopt a *contrastive learning approach*, where the training objective pulls each query's embedding closer to its positive chunks than to its negative ones.

For that purpose, I currently have a dataset of Q–Positive chunks, but I still need to create the *negative samples*. There are different ways to do this, with the most effective being hard negative mining (selecting chunks that are similar to the positive ones to make the task more challenging for the model). However, for the sake of time, I won’t be using hard negative mining and will instead select negative samples at random.

In [12]:
from data_processing.utils import TripletsMiner
tm = TripletsMiner(CONFIG['RANDOM_SEED'])
negatives = tm.get_negatives(df, 'soft', CONFIG['NEGATIVES_N'])
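The `TripletsMiner` helper is project code that is not shown here. As a rough illustration of random negative sampling in general, and not of that helper's actual implementation, the sketch below draws `n` chunks per question from all chunks that are not among that question's positives. The function name and the input layout are assumptions.

```python
import random
from typing import Dict, List

def sample_random_negatives(
    positives: Dict[str, List[str]], n: int, seed: int
) -> Dict[str, List[str]]:
    """For each question, sample up to n chunks that are not among its positives."""
    rng = random.Random(seed)  # Seeded for reproducibility.
    # The corpus is every distinct chunk; sorting keeps sampling deterministic.
    corpus = sorted({c for chunks in positives.values() for c in chunks})
    negatives: Dict[str, List[str]] = {}
    for question, pos in positives.items():
        candidates = [c for c in corpus if c not in pos]
        negatives[question] = rng.sample(candidates, min(n, len(candidates)))
    return negatives

positives = {"q1": ["a", "b"], "q2": ["c"], "q3": ["d", "e"]}
negatives_demo = sample_random_negatives(positives, n=2, seed=42)
print(negatives_demo)
```

A randomly drawn chunk can still happen to be relevant to the question, which is one of the trade-offs of skipping hard negative mining.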

### Handling repeated queries and ID tracking

In the original dataset, there were cases where the same query appeared in multiple rows with different answers. Since I further chunked the answers, this now happens even more often. Given the structure required by the FlagEmbedding library, I need to keep track of one ID for unique questions and another for unique chunks. To do this, I create two mapping tables that I then join with the dataframe.

In [14]:
unique_questions = pd.DataFrame(df.question.unique(), columns=['question']).reset_index(names='question_id')
unique_answers = pd.DataFrame(df.answer.unique(), columns=['answer']).reset_index(names='chunk_id')

df = df.merge(
    unique_answers,
    how='left',
    on='answer',
)

df = df.merge(
    unique_questions,
    how='left',
    on='question',
)
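The same ID assignment can be sketched without pandas, which may help when reasoning about the result: each distinct value gets the index of its first appearance, and every row is then tagged with both IDs. The rows and the helper name `assign_ids` are hypothetical.

```python
from typing import Dict, Iterable

def assign_ids(values: Iterable[str]) -> Dict[str, int]:
    """Map each distinct value to an integer ID in order of first appearance."""
    ids: Dict[str, int] = {}
    for value in values:
        ids.setdefault(value, len(ids))
    return ids

# Toy (question, chunk) rows: "q A" has two chunks, and "chunk 2" answers two questions.
rows = [("q A", "chunk 1"), ("q A", "chunk 2"), ("q B", "chunk 2")]

question_ids = assign_ids(q for q, _ in rows)
chunk_ids = assign_ids(c for _, c in rows)
tagged = [(question_ids[q], chunk_ids[c]) for q, c in rows]
print(tagged)
```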

Now, to continue shaping the dataframe, I will generate a list of positive chunks for each question.

In [15]:
gb_gen = df.groupby(['question_id','question'])
#create the lists separately as I need to keep track of the chunk ids
df = gb_gen['answer'].apply(list).reset_index()
chunks = gb_gen['chunk_id'].apply(list).reset_index()
df['neg'] = negatives
df = df.merge(chunks[['question_id','chunk_id']], on='question_id', how='left')
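The grouping step can also be pictured in plain Python. In this sketch, which uses made-up rows, each question collects its positive chunk texts and the matching chunk IDs in the same order, so the two lists stay aligned.

```python
from collections import defaultdict

# Toy rows: (question_id, question, chunk_id, chunk text).
rows = [
    (0, "q A", 0, "chunk 1"),
    (0, "q A", 1, "chunk 2"),
    (1, "q B", 1, "chunk 2"),
]

grouped = defaultdict(lambda: {"pos": [], "chunk_id": []})
for question_id, question, chunk_id, chunk in rows:
    entry = grouped[(question_id, question)]
    # Append text and ID together so position i in both lists refers to the same chunk.
    entry["pos"].append(chunk)
    entry["chunk_id"].append(chunk_id)

print(dict(grouped))
```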

In [16]:
#Renaming to match the column names FlagEmbedding expects
df = df.rename(columns={'answer': 'pos'})
df = df.rename(columns={'question': 'query'})

### Train dataset

In [17]:
#Here I create the train-test split and then save the training set
from datasets import Dataset
dataset = Dataset.from_pandas(df, preserve_index=False)
splits = dataset.train_test_split(test_size=CONFIG['TEST_DATA_FRAC'], seed=CONFIG['RANDOM_SEED'])
train_df = splits['train']
val_df = splits['test']

In [18]:
train_df.to_json("data/training.json")

Creating json from Arrow format: 100%|██████████| 12/12 [00:02<00:00,  4.40ba/s]


219285853

### Validation dataset

For training validation I will need to save:

1. The validation queries
2. The whole corpus (so at validation time I can embed it and index it for retrieval)
3. A dictionary mapping the query_id to the positive chunks ids
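To show how these three artifacts fit together, here is a sketch of one common retrieval metric, recall@k. The metric actually used in the validation step is not stated in this notebook, so treat this as an assumption. `retrieved` stands for the ranked chunk IDs an index would return per query, and the IDs below are toy values.

```python
from typing import Dict, List

def recall_at_k(
    retrieved: Dict[int, List[int]], qrels: Dict[int, List[int]], k: int
) -> float:
    """Mean fraction of each query's positive chunks found in its top-k results."""
    scores = []
    for qid, relevant in qrels.items():
        if not relevant:
            continue  # Skip queries without any positive chunk.
        top_k = retrieved.get(qid, [])[:k]
        hits = len(set(top_k) & set(relevant))
        scores.append(hits / len(relevant))
    return sum(scores) / len(scores) if scores else 0.0

# qrels: query id -> positive chunk ids; retrieved: query id -> ranked chunk ids.
qrels_demo = {1: [10], 2: [20, 21]}
retrieved_demo = {1: [10, 5, 7], 2: [8, 20, 3]}
score = recall_at_k(retrieved_demo, qrels_demo, k=2)
print(score)
```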


In [19]:
queries = val_df.select_columns(column_names=["question_id", "query"])
queries = queries.rename_columns({"query": "text", "question_id": "id"})
queries[0]

{'id': 12782,
 'text': 'How many people are affected by von Hippel-Lindau syndrome ?'}

In [22]:
corpus = Dataset.from_pandas(unique_answers, preserve_index=False)
corpus = corpus.select_columns(column_names=["chunk_id", "answer"])
corpus = corpus.rename_columns({"answer": "text", "chunk_id": "id"})
corpus[0]

{'id': 0,
 'text': "Glaucoma is a group of diseases that can damage the eye's optic nerve and result in vision loss and blindness. The most common form of the disease is open-angle glaucoma. With early treatment, you can often protect your eyes against serious vision loss. (Watch the video to learn more about glaucoma. To enlarge the video, click the brackets in the lower right-hand corner. To reduce the video, press the Escape (Esc) button on your keyboard.)  See this graphic for a quick overview of glaucoma, including how many people it affects, whos at risk, what to do if you have it, and how to learn more.  See a glossary of glaucoma terms."}

In [23]:
qrels = val_df.select_columns(["question_id"])
qrels = qrels.rename_column("question_id", "qid")
qrels = qrels.add_column("docid", list(val_df["chunk_id"]))
qrels = qrels.add_column("relevance", [1] * len(val_df))
qrels[0]

Flattening the indices: 100%|██████████| 2996/2996 [00:00<00:00, 167439.07 examples/s]


{'qid': 12782, 'docid': [15877], 'relevance': 1}

In [24]:
queries.to_json("data/test_queries.jsonl")
corpus.to_json("data/corpus.jsonl")
qrels.to_json("data/test_qrels.jsonl")

Creating json from Arrow format: 100%|██████████| 3/3 [00:00<00:00, 86.77ba/s]
Creating json from Arrow format: 100%|██████████| 19/19 [00:00<00:00, 90.15ba/s]
Creating json from Arrow format: 100%|██████████| 3/3 [00:00<00:00, 503.72ba/s]


133606

### Saving to S3

To complete the work in this notebook, I will save the artifacts to S3, simulating execution within a pipeline service. This ensures the data is ready to be ingested on a GPU-enabled machine for fine-tuning.

In [None]:
from general_utils import S3Manager
s3_client = S3Manager.get_client()

In [33]:
S3Manager.upload_bulk(s3_client, 'data/', ["training.json", "test_queries.jsonl", "corpus.jsonl", "test_qrels.jsonl"], CONFIG['S3_BUCKET'])
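`S3Manager` is a project helper whose code is not shown in this notebook. As a rough idea of what a bulk upload might look like, and not its actual implementation, the sketch below loops over the files and calls `upload_file(Filename, Bucket, Key)`, which is the method a boto3 S3 client exposes. The function name and the choice of using the filename as the object key are assumptions.

```python
import os
from typing import List

def upload_bulk(client, local_dir: str, filenames: List[str], bucket: str) -> List[str]:
    """Upload each file in local_dir to the bucket, using the filename as the object key."""
    uploaded = []
    for name in filenames:
        path = os.path.join(local_dir, name)
        # Positional arguments are Filename, Bucket, Key.
        client.upload_file(path, bucket, name)
        uploaded.append(name)
    return uploaded

# With boto3 installed and credentials configured, a client could be created with:
#   import boto3
#   client = boto3.client("s3")
#   upload_bulk(client, "data/", ["training.json"], "my-bucket")  # hypothetical bucket name
```

Passing the client in as an argument keeps the function easy to exercise with a stand-in object before pointing it at a real bucket.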