<div style="background-color: #04D7FD; padding: 20px; text-align: left;">
    <h1 style="color: #000000; font-size: 36px; margin: 0;">Data Processing for RAG with Data Prep Kit (RAY)</h1>
    
</div>


## Before Running the notebook

Please complete [setting up the Python dev environment](./setup-python-dev-env.md) first.

## Overview

This notebook processes PDF documents as part of a RAG pipeline.

![](media/rag-overview-2.png)

This notebook performs steps 1, 2 and 3 of the RAG pipeline.

Here are the processing steps:

- **pdf2parquet**: Extract text (in Markdown format) from the PDFs and store it as Parquet files
- **Exact Dedup**: Filter out documents whose content is identical to another document
- **Chunk documents**: Split the documents into 'meaningful sections' (paragraphs, sentences, etc.)
- **Text encoder**: Convert chunks into vectors using embedding models
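
The steps above form a chain in which each stage reads the folder written by the previous one. The following is a minimal sketch of that chaining only; `plan_pipeline` and `STAGES` are hypothetical names introduced for illustration, and the folder pattern simply mirrors the `01_parquet_out` ... `04_embeddings_out` names used later in this notebook.

```python
import os

# Hypothetical stage labels; the numbered "<NN>_<name>_out" pattern mirrors
# the output folder names used later in this notebook.
STAGES = ["parquet", "dedupe", "chunk", "embeddings"]

def plan_pipeline(input_dir, output_root):
    """Return (stage, input_dir, output_dir) triples.

    Each stage reads the folder that the previous stage wrote.
    """
    plan = []
    current_input = input_dir
    for number, name in enumerate(STAGES, start=1):
        stage_output = os.path.join(output_root, f"{number:02d}_{name}_out")
        plan.append((name, current_input, stage_output))
        current_input = stage_output  # the next stage consumes this output
    return plan

pipeline_plan = plan_pipeline("../data/papers", "output-papers")
for name, src, dst in pipeline_plan:
    print(f"{name}: {src} --> {dst}")
```

Numbering the folders keeps the intermediate results of every stage on disk, so each one can be inspected separately.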

## Step-1: Configuration

In [1]:
## setup path to utils folder
import sys
sys.path.append('../utils')

In [2]:
import os
from my_config import MY_CONFIG

## RAY CONFIGURATION
num_cpus_available =  os.cpu_count()
# print (num_cpus_available)
# MY_CONFIG.RAY_NUM_CPUS = num_cpus_available // 2  ## use half the available cores for processing
MY_CONFIG.RAY_NUM_CPUS =  0.5
MY_CONFIG.RAY_MEMORY_GB = 2  # GB
# MY_CONFIG.RAY_RUNTIME_WORKERS = num_cpus_available // 3
MY_CONFIG.RAY_RUNTIME_WORKERS = 2

print (f"Ray configuration: CPUs={MY_CONFIG.RAY_NUM_CPUS},   memory={MY_CONFIG.RAY_MEMORY_GB} GB,  workers={MY_CONFIG.RAY_RUNTIME_WORKERS}")

Ray configuration: CPUs=0.5,   memory=2 GB,  workers=2
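
The commented-out lines in the cell above hint at sizing Ray from the machine's core count instead of using fixed values. Here is a small sketch of that heuristic, assuming half the cores for `RAY_NUM_CPUS` and a third for `RAY_RUNTIME_WORKERS`. The function name `suggest_ray_settings` and the floor of 1 are assumptions added for illustration, not part of Data Prep Kit.

```python
import os

def suggest_ray_settings(total_cpus=None):
    """Derive Ray settings from the core count (hypothetical helper).

    Mirrors the commented-out heuristics: half the cores for CPUs,
    a third of the cores for workers. The floor of 1 is an assumption
    so that small machines never end up with zero.
    """
    if total_cpus is None:
        # os.cpu_count() may return None when the count cannot be determined
        total_cpus = os.cpu_count() or 1
    return {
        "RAY_NUM_CPUS": max(1, total_cpus // 2),
        "RAY_RUNTIME_WORKERS": max(1, total_cpus // 3),
    }

print(suggest_ray_settings())   # values depend on the machine
print(suggest_ray_settings(8))
```

The fixed values used in this notebook (0.5 CPUs, 2 workers) are more conservative and are probably the safer choice on a laptop.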


## Step-2:  Data

We will use white papers  about LLMs.  

- [Granite Code Models](https://arxiv.org/abs/2405.04324)
- [Attention is all you need](https://arxiv.org/abs/1706.03762)

You can of course substitute your own data below.

### 2.1 - Download data

In [3]:
import os, sys
import shutil
from file_utils import download_file

print ("Using input data:", MY_CONFIG.INPUT_DATA_DIR)

Using input data: ../data/papers


In [4]:
# import os, sys
# import shutil
# from file_utils import download_file

# shutil.rmtree(MY_CONFIG.INPUT_DATA_DIR, ignore_errors=True)
# os.makedirs(MY_CONFIG.INPUT_DATA_DIR, exist_ok=True)
# print ("✅ Cleared input directory")
 
# download_file (url = 'https://arxiv.org/pdf/1706.03762', local_file = os.path.join(MY_CONFIG.INPUT_DATA_DIR, 'attention.pdf' ))
# download_file (url = 'https://arxiv.org/pdf/2405.04324', local_file = os.path.join(MY_CONFIG.INPUT_DATA_DIR, 'granite.pdf' ))
# download_file (url = 'https://arxiv.org/pdf/2405.04324', local_file = os.path.join(MY_CONFIG.INPUT_DATA_DIR, 'granite2.pdf' )) # duplicate


### 2.2  - Set input/output path variables for the pipeline

In [5]:
import os, sys
import shutil

if not os.path.exists(MY_CONFIG.INPUT_DATA_DIR ):
    raise Exception (f"❌ Input folder MY_CONFIG.INPUT_DATA_DIR = '{MY_CONFIG.INPUT_DATA_DIR}' not found")

output_parquet_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '01_parquet_out')
output_exact_dedupe_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '02_dedupe_out')
output_chunk_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '03_chunk_out')
output_embeddings_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '04_embeddings_out')


## clear output folder
shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER, ignore_errors=True)
os.makedirs(MY_CONFIG.OUTPUT_FOLDER, exist_ok=True)

print ("✅ Cleared output directory")

✅ Cleared output directory


## Step-3: pdf2parquet -  Convert data from PDF to Parquet

This step reads all PDF files in the input folder and ingests them into a Parquet table using the [Docling package](https://github.com/DS4SD/docling).
The documents are converted into Markdown (see `docling2parquet_contents_type` in the cell below), which makes them easy to chunk in later steps.



### 3.1 -  Execute 

In [6]:
%%time 

from dpk_docling2parquet.ray import Docling2Parquet
from data_processing.utils import GB
from dpk_docling2parquet import docling2parquet_contents_types

STAGE = 1 
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{MY_CONFIG.INPUT_DATA_DIR}' --> output='{output_parquet_dir}'\n", flush=True)

result = Docling2Parquet(input_folder= MY_CONFIG.INPUT_DATA_DIR,
                    output_folder= output_parquet_dir, 
                    data_files_to_use=['.pdf'],
                    docling2parquet_contents_type=docling2parquet_contents_types.MARKDOWN,
                    
                    
                    ## runtime options
                    # run_locally= True,
                    # num_cpus= MY_CONFIG.RAY_NUM_CPUS,
                    # memory= MY_CONFIG.RAY_MEMORY_GB * GB,
                    # runtime_num_workers = MY_CONFIG.RAY_RUNTIME_WORKERS,

                    ## debug
                    run_locally= True,
                    num_cpus=  1, 
                    memory= MY_CONFIG.RAY_MEMORY_GB * GB, 
                    runtime_num_workers = 1, ## Note: has to be one for this particular job, to prevent race condition when downloading models!
               ).transform()

if result == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE}  failed")

2025-11-16 12:52:02,713	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.


🏃🏼 STAGE-1: Processing input='../data/papers' --> output='output-papers/01_parquet_out'



{"time": "12:52:03", "logger": "dpk", "logLevel": "INFO", "message": "docling2parquet parameters are : {'batch_size': -1, 'artifacts_path': None, 'contents_type': <docling2parquet_contents_types.MARKDOWN: 'text/markdown'>, 'do_table_structure': True, 'do_ocr': True, 'ocr_engine': <docling2parquet_ocr_engine.EASYOCR: 'easyocr'>, 'bitmap_area_threshold': 0.05, 'pdf_backend': <docling2parquet_pdf_backend.DLPARSE_V2: 'dlparse_v2'>, 'double_precision': 8, 'pipeline': <docling2parquet_pipeline.MULTI_STAGE: 'multi_stage'>, 'generate_picture_images': False, 'generate_page_images': False, 'images_scale': 2.0}"}
{"time": "12:52:03", "logger": "dpk", "logLevel": "INFO", "message": "pipeline id pipeline_id"}
{"time": "12:52:03", "logger": "dpk", "logLevel": "INFO", "message": "code location {'github': 'UNDEFINED', 'build-date': 'UNDEFINED', 'commit_hash': 'UNDEFINED', 'path': 'UNDEFINED'}"}
{"time": "12:52:03", "logger": "dpk", "logLevel": "INFO", "message": "number of workers 1 worker options {'n

✅ Stage:1 completed successfully
CPU times: user 6.33 s, sys: 1.27 s, total: 7.6 s
Wall time: 8min 52s


### 3.2 - Inspect Generated output

Here we should see one entry per input file processed.

In [7]:
from file_utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_parquet_dir)
# print ("Output dimensions (rows x columns)= ", output_df.shape)
output_df.head(5)
## To display certain columns
#parquet_df[['column1', 'column2', 'column3']].head(5)

Successfully read 3 parquet files with 3 total rows


Unnamed: 0,filename,contents,num_pages,num_tables,num_doc_elements,document_id,document_hash,ext,hash,size,date_acquired,document_convert_time,source_filename
0,attention.pdf,"Provided proper attribution is provided, Googl...",15,4,513,6d9b3dd6-ec55-4372-bd36-e9502ad03ceb,2949302674760005271,pdf,214960a61e817387f01087f0b0b323cf1ebd8035fffcab...,48981,2025-11-16T12:53:20.869360,60.467835,attention.pdf
1,granite2.pdf,## Granite Code Models: A Family of Open Found...,28,17,484,4c259929-e3a2-4ac2-9a38-3880a8ff26ca,3127757990743433032,pdf,58342470e7d666dca0be87a15fb0552f949a5632606fe1...,121131,2025-11-16T13:00:39.713942,217.35429,granite2.pdf
2,granite.pdf,## Granite Code Models: A Family of Open Found...,28,17,484,1f89c840-f6bc-4560-b8c1-43d5fe297783,3127757990743433032,pdf,58342470e7d666dca0be87a15fb0552f949a5632606fe1...,121131,2025-11-16T12:57:02.283422,221.354569,granite.pdf


## Step-4: Eliminate Duplicate Documents

We have 2 duplicate documents here: `granite.pdf` and `granite2.pdf`.

Note how the `hash` values for these documents are the same.

We are going to perform **de-dupe**.

A SHA256 hash is computed on the content of each document, and records with identical hashes are then de-duplicated.

[Dedupe transform documentation](https://github.com/data-prep-kit/data-prep-kit/blob/dev/transforms/universal/ededup/README.md)
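
The hash-based approach described above can be sketched in plain Python. This is only an illustration of the idea, not the `ededup` transform itself: `exact_dedupe` and the toy records are hypothetical, and the real transform runs distributed on Ray, so which copy of a duplicate survives may differ.

```python
import hashlib

def exact_dedupe(records, content_key="contents"):
    """Keep the first record for each distinct SHA256 content hash."""
    seen_hashes = set()
    unique_records = []
    for record in records:
        digest = hashlib.sha256(record[content_key].encode("utf-8")).hexdigest()
        if digest in seen_hashes:
            continue  # identical content was already kept
        seen_hashes.add(digest)
        unique_records.append(record)
    return unique_records

# Toy stand-ins for the three parsed documents (contents are made up).
docs = [
    {"filename": "attention.pdf", "contents": "Attention text"},
    {"filename": "granite.pdf", "contents": "Granite text"},
    {"filename": "granite2.pdf", "contents": "Granite text"},
]

kept = exact_dedupe(docs)
print([d["filename"] for d in kept])  # ['attention.pdf', 'granite.pdf']
```

Because only the hash of each document is stored, memory use grows with the number of distinct documents rather than with their size.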

### 4.1 - Execute 

In [8]:
%%time 

from dpk_ededup.ray.transform import Ededup
from data_processing.utils import GB

STAGE = 2
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{output_parquet_dir}' --> output='{output_exact_dedupe_dir}'\n", flush=True)

result = Ededup(input_folder=output_parquet_dir,
                output_folder=output_exact_dedupe_dir,
                ededup_hash_cpu= 0.5,
                ededup_num_hashes= 2,
                ededup_doc_column="contents",
                ededup_doc_id_column="document_id",
                
                ## runtime options
                run_locally= True,
                num_cpus= MY_CONFIG.RAY_NUM_CPUS,
                memory= MY_CONFIG.RAY_MEMORY_GB * GB,
                runtime_num_workers = MY_CONFIG.RAY_RUNTIME_WORKERS,
    ).transform()

if result == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE}  failed")

🏃🏼 STAGE-2: Processing input='output-papers/01_parquet_out' --> output='output-papers/02_dedupe_out'



{"time": "13:00:51", "logger": "dpk", "logLevel": "INFO", "message": "exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'document_id', 'use_snapshot': False, 'snapshot_directory': None, 'hash_cpu': 0.5, 'num_hashes': 2}"}
{"time": "13:00:51", "logger": "dpk", "logLevel": "INFO", "message": "pipeline id pipeline_id"}
{"time": "13:00:51", "logger": "dpk", "logLevel": "INFO", "message": "code location {'github': 'UNDEFINED', 'build-date': 'UNDEFINED', 'commit_hash': 'UNDEFINED', 'path': 'UNDEFINED'}"}
{"time": "13:00:51", "logger": "dpk", "logLevel": "INFO", "message": "number of workers 2 worker options {'num_cpus': 0.5, 'memory': 2147483648, 'max_restarts': -1}"}
{"time": "13:00:51", "logger": "dpk", "logLevel": "INFO", "message": "actor creation delay 0"}
{"time": "13:00:51", "logger": "dpk", "logLevel": "INFO", "message": "job details {'job category': 'preprocessing', 'job name': 'ededup', 'job type': 'ray', 'job id': 'job_id'}"}
{"time": "13:00:51", "logger": "dpk", 

✅ Stage:2 completed successfully
CPU times: user 148 ms, sys: 165 ms, total: 313 ms
Wall time: 21.7 s


### 4.2 - Inspect Generated output

We should see 2 documents: `attention.pdf` and `granite.pdf`. The duplicate `granite2.pdf` has been filtered out!

In [9]:
from file_utils import read_parquet_files_as_df

input_df = read_parquet_files_as_df(output_parquet_dir)
output_df = read_parquet_files_as_df(output_exact_dedupe_dir)

# print ("Input data dimensions (rows x columns)= ", input_df.shape)
# print ("Output data dimensions (rows x columns)= ", output_df.shape)
print (f"Input files before exact dedupe : {input_df.shape[0]:,}")
print (f"Output files after exact dedupe : {output_df.shape[0]:,}")
print ("Duplicate files removed :  ", (input_df.shape[0] - output_df.shape[0]))

output_df.sample(min(3, output_df.shape[0]))

Successfully read 3 parquet files with 3 total rows
Successfully read 2 parquet files with 2 total rows
Input files before exact dedupe : 3
Output files after exact dedupe : 2
Duplicate files removed :   1


Unnamed: 0,filename,contents,num_pages,num_tables,num_doc_elements,document_id,document_hash,ext,hash,size,date_acquired,document_convert_time,source_filename
1,granite.pdf,## Granite Code Models: A Family of Open Found...,28,17,484,1f89c840-f6bc-4560-b8c1-43d5fe297783,3127757990743433032,pdf,58342470e7d666dca0be87a15fb0552f949a5632606fe1...,121131,2025-11-16T12:57:02.283422,221.354569,granite.pdf
0,attention.pdf,"Provided proper attribution is provided, Googl...",15,4,513,6d9b3dd6-ec55-4372-bd36-e9502ad03ceb,2949302674760005271,pdf,214960a61e817387f01087f0b0b323cf1ebd8035fffcab...,48981,2025-11-16T12:53:20.869360,60.467835,attention.pdf


##  Step-5: Doc chunks

Split the documents into chunks.

[Chunking transform documentation](https://github.com/data-prep-kit/data-prep-kit/blob/dev/transforms/language/doc_chunk/README.md)

**Experiment with chunking size to find the setting that works best for your documents**
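
To make the idea of chunking concrete, here is a simplified sketch that splits Markdown text at heading lines. It is not the `li_markdown` implementation used by the transform; `chunk_markdown_by_heading` is a hypothetical helper, and it ignores size limits and overlap entirely.

```python
import re

# A Markdown heading: one to six '#' characters followed by whitespace.
HEADING = re.compile(r"^#{1,6}\s")

def chunk_markdown_by_heading(text):
    """Split Markdown into chunks, starting a new chunk at every heading."""
    chunks = []
    current_lines = []
    for line in text.splitlines():
        if HEADING.match(line) and current_lines:
            # A new section begins: close the chunk collected so far.
            chunks.append("\n".join(current_lines).strip())
            current_lines = []
        current_lines.append(line)
    if current_lines:
        chunks.append("\n".join(current_lines).strip())
    return [chunk for chunk in chunks if chunk]  # drop empty chunks

sample = "Intro line\n\n## 1 Background\n\nText A\n\n## 2 Method\n\nText B"
for chunk in chunk_markdown_by_heading(sample):
    print(repr(chunk))
```

Splitting at headings keeps each section's title together with its text, which matches the chunk previews in the output below, where each chunk starts with a `##` heading.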

### 5.1 -  Execute 

In [10]:
%%time

from dpk_doc_chunk.ray.transform import DocChunk
from data_processing.utils import GB

STAGE = 3
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{output_exact_dedupe_dir}' --> output='{output_chunk_dir}'\n", flush=True)

result = DocChunk(input_folder=output_exact_dedupe_dir,
                        output_folder=output_chunk_dir,
                        doc_chunk_chunking_type= "li_markdown",

                        ## runtime options
                        run_locally= True,
                        num_cpus= MY_CONFIG.RAY_NUM_CPUS,
                        memory= MY_CONFIG.RAY_MEMORY_GB * GB,
                        runtime_num_workers = MY_CONFIG.RAY_RUNTIME_WORKERS,
                        ).transform()

if result == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE}  failed")

🏃🏼 STAGE-3: Processing input='output-papers/02_dedupe_out' --> output='output-papers/03_chunk_out'



{"time": "13:01:13", "logger": "dpk", "logLevel": "INFO", "message": "doc_chunk parameters are : {'chunking_type': 'li_markdown', 'content_column_name': 'contents', 'doc_id_column_name': 'document_id', 'output_chunk_column_name': 'contents', 'output_source_doc_id_column_name': 'source_document_id', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox', 'chunk_size_tokens': 128, 'chunk_overlap_tokens': 30, 'dl_min_chunk_len': None}"}
{"time": "13:01:13", "logger": "dpk", "logLevel": "INFO", "message": "pipeline id pipeline_id"}
{"time": "13:01:13", "logger": "dpk", "logLevel": "INFO", "message": "code location {'github': 'UNDEFINED', 'build-date': 'UNDEFINED', 'commit_hash': 'UNDEFINED', 'path': 'UNDEFINED'}"}
{"time": "13:01:13", "logger": "dpk", "logLevel": "INFO", "message": "number of workers 2 worker options {'num_cpus': 0.5, 'memory': 2147483648, 'max_restarts': -1}"}
{"time": "13:01:13", "logger": "dpk", "logL

✅ Stage:3 completed successfully
CPU times: user 661 ms, sys: 262 ms, total: 923 ms
Wall time: 23.3 s


### 5.2 - Inspect Generated output

We should see that the documents are split into many chunks.

In [11]:
from file_utils import read_parquet_files_as_df

input_df = read_parquet_files_as_df(output_exact_dedupe_dir)  ## for debug purposes
output_df = read_parquet_files_as_df(output_chunk_dir)

print (f"Files processed : {input_df.shape[0]:,}")
print (f"Chunks created : {output_df.shape[0]:,}")

# print ("Input data dimensions (rows x columns)= ", input_df.shape)
# print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.sample(min(3, output_df.shape[0]))

Successfully read 2 parquet files with 2 total rows
Successfully read 2 parquet files with 61 total rows
Files processed : 2
Chunks created : 61


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_hash,ext,hash,size,date_acquired,document_convert_time,source_filename,source_document_id,contents,document_id
33,granite.pdf,28,17,484,3127757990743433032,pdf,58342470e7d666dca0be87a15fb0552f949a5632606fe1...,121131,2025-11-16T12:57:02.283422,221.354569,granite.pdf,1f89c840-f6bc-4560-b8c1-43d5fe297783,## 2.2 Exact and Fuzzy Deduplication\n\nWeadop...,ea0e99c1a018e0ec8dd5ea8aceb222c889d510acdd7313...
12,attention.pdf,15,4,513,2949302674760005271,pdf,214960a61e817387f01087f0b0b323cf1ebd8035fffcab...,48981,2025-11-16T12:53:20.869360,60.467835,attention.pdf,6d9b3dd6-ec55-4372-bd36-e9502ad03ceb,## 3.3 Position-wise Feed-Forward Networks\n\n...,1a97c9bc3e8cb8908b65d45262e35f4518dfec9ba91f7d...
24,attention.pdf,15,4,513,2949302674760005271,pdf,214960a61e817387f01087f0b0b323cf1ebd8035fffcab...,48981,2025-11-16T12:53:20.869360,60.467835,attention.pdf,6d9b3dd6-ec55-4372-bd36-e9502ad03ceb,## 6.3 English Constituency Parsing\n\nTo eval...,96797312c2020f4406609dc6d30585c50c669d5e4cc1d3...


## Step-6:   Calculate Embeddings for Chunks

We will calculate embeddings for each chunk using an open source embedding model.

[Embeddings / Text Encoder documentation](https://github.com/data-prep-kit/data-prep-kit/blob/dev/transforms/language/text_encoder/README.md)
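
As a rough illustration of why chunks are turned into vectors, the sketch below compares toy vectors with cosine similarity, a common way to find the chunk closest to a query. The 3-dimensional vectors and chunk labels are invented for the example; real embeddings from the model are much longer, and this code does not call the text encoder transform.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # similarity is undefined for zero vectors; treat as 0
    return dot / (norm_a * norm_b)

# Toy embeddings (made up for illustration only).
chunk_vectors = {
    "chunk about attention": [1.0, 0.0, 0.0],
    "chunk about deduplication": [0.0, 1.0, 0.0],
}
query_vector = [0.9, 0.1, 0.0]

best_chunk = max(
    chunk_vectors,
    key=lambda name: cosine_similarity(query_vector, chunk_vectors[name]),
)
print(best_chunk)
```

In a full RAG setup this comparison is usually delegated to a vector database rather than computed by hand.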

### 6.1 - Execute

In [12]:
%%time 

from dpk_text_encoder.ray.transform import TextEncoder
from data_processing.utils import GB

STAGE  = 4
print (f"🏃🏼 STAGE-{STAGE}: Processing input='{output_chunk_dir}' --> output='{output_embeddings_dir}'\n", flush=True)

result = TextEncoder(input_folder= output_chunk_dir, 
               output_folder= output_embeddings_dir, 
               text_encoder_model_name = MY_CONFIG.EMBEDDING_MODEL,
               
               ## runtime options
               run_locally= True,
               num_cpus= MY_CONFIG.RAY_NUM_CPUS,
               memory= MY_CONFIG.RAY_MEMORY_GB * GB,
               runtime_num_workers = MY_CONFIG.RAY_RUNTIME_WORKERS,
               ).transform()

if result == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE}  failed")

🏃🏼 STAGE-4: Processing input='output-papers/03_chunk_out' --> output='output-papers/04_embeddings_out'



{"time": "13:01:36", "logger": "dpk", "logLevel": "INFO", "message": "text_encoder parameters are : {'content_column_name': 'contents', 'output_embeddings_column_name': 'embeddings', 'model_name': 'sentence-transformers/all-MiniLM-L6-v2'}"}
{"time": "13:01:36", "logger": "dpk", "logLevel": "INFO", "message": "pipeline id pipeline_id"}
{"time": "13:01:36", "logger": "dpk", "logLevel": "INFO", "message": "code location {'github': 'UNDEFINED', 'build-date': 'UNDEFINED', 'commit_hash': 'UNDEFINED', 'path': 'UNDEFINED'}"}
{"time": "13:01:36", "logger": "dpk", "logLevel": "INFO", "message": "number of workers 2 worker options {'num_cpus': 0.5, 'memory': 2147483648, 'max_restarts': -1}"}
{"time": "13:01:36", "logger": "dpk", "logLevel": "INFO", "message": "actor creation delay 0"}
{"time": "13:01:36", "logger": "dpk", "logLevel": "INFO", "message": "job details {'job category': 'preprocessing', 'job name': 'text_encoder', 'job type': 'ray', 'job id': 'job_id'}"}
{"time": "13:01:36", "logger":

✅ Stage:4 completed successfully
CPU times: user 224 ms, sys: 182 ms, total: 407 ms
Wall time: 26.6 s


### 6.2 - Inspect Generated output

In [13]:
from file_utils import read_parquet_files_as_df

input_df = read_parquet_files_as_df(output_chunk_dir)
output_df = read_parquet_files_as_df(output_embeddings_dir)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.sample(min(3, output_df.shape[0]))

Successfully read 2 parquet files with 61 total rows
Successfully read 2 parquet files with 61 total rows
Input data dimensions (rows x columns)=  (61, 14)
Output data dimensions (rows x columns)=  (61, 15)


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_hash,ext,hash,size,date_acquired,document_convert_time,source_filename,source_document_id,contents,document_id,embeddings
4,attention.pdf,15,4,513,2949302674760005271,pdf,214960a61e817387f01087f0b0b323cf1ebd8035fffcab...,48981,2025-11-16T12:53:20.869360,60.467835,attention.pdf,6d9b3dd6-ec55-4372-bd36-e9502ad03ceb,## 2 Background\n\nThe goal of reducing sequen...,84e98ea1b539c14df1e2219a455074552f0339ede803b6...,"[-0.082636066, -0.037095945, 0.013174194, -0.0..."
22,attention.pdf,15,4,513,2949302674760005271,pdf,214960a61e817387f01087f0b0b323cf1ebd8035fffcab...,48981,2025-11-16T12:53:20.869360,60.467835,attention.pdf,6d9b3dd6-ec55-4372-bd36-e9502ad03ceb,## 6.1 Machine Translation\n\nOn the WMT 2014 ...,5747e89ea00329843a874f8a3f0c2055469ded849066d1...,"[-0.0403513, -0.071009785, 0.0003329234, 0.013..."
56,granite.pdf,28,17,484,3127757990743433032,pdf,58342470e7d666dca0be87a15fb0552f949a5632606fe1...,121131,2025-11-16T12:57:02.283422,221.354569,granite.pdf,1f89c840-f6bc-4560-b8c1-43d5fe297783,## 6.7 Model Robustness\n\nWhile the performan...,bac7e00148c7ab245583902ca33f6489228e82f4758791...,"[-0.08713458, -0.06469581, -0.020314984, 0.035..."


## Step-7: Copy output to final output dir

In [14]:
import shutil

shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER_FINAL, ignore_errors=True)
shutil.copytree(src=output_embeddings_dir, dst=MY_CONFIG.OUTPUT_FOLDER_FINAL)

print (f"✅ Copied output from '{output_embeddings_dir}' --> '{MY_CONFIG.OUTPUT_FOLDER_FINAL}'")

✅ Copied output from 'output-papers/04_embeddings_out' --> 'output-papers/output_final'
