<div style="background-color: #04D7FD; padding: 20px; text-align: left;">
    <h1 style="color: #000000; font-size: 36px; margin: 0;">Data Processing for RAG with Data Prep Kit (Python)</h1>
    
</div>


## Before Running the notebook

Please complete [setting up the Python dev environment](./setup-python-dev-env.md) first.

## Overview

This notebook processes PDF documents as part of a RAG pipeline.

![](media/rag-overview-2.png)

This notebook performs steps 1, 2 and 3 of the RAG pipeline.

Here are the processing steps:

- **pdf2parquet**: Extract text from PDFs and convert it into parquet files
- **Chunk documents**: Split the documents into 'meaningful sections' (paragraphs, sentences, etc.)
- **Doc_ID generation**: Each chunk is assigned a unique id, based on a hash of its content
- **Exact Dedup**: Chunks with exactly the same content are filtered out
- **Fuzzy Dedup**: Eliminate chunks with 'very similar' content
- **Doc quality**: Score the documents based on criteria such as the number of words, whether they contain bad words, etc.
- **Text encoder**: Convert chunks into vectors using embedding models

## Step-1: Configuration

### 1.1 - Common Config

In [1]:
from my_config import MY_CONFIG

### 1.2 - Inspect Input Data

We have a bunch of datasets in the [data](../data) folder. Examine them.

We will use one of them, or feel free to bring your own data.
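One quick way to examine a dataset folder is to list the PDF files it contains along with their sizes. The sketch below is only an illustration: `list_pdfs` is a hypothetical helper, not part of Data Prep Kit, and it assumes the PDFs sit directly inside the folder rather than in subfolders.

```python
from pathlib import Path


def list_pdfs(folder):
    """Return (file name, size in KB) for every PDF directly inside `folder`, sorted by name."""
    return sorted(
        (p.name, round(p.stat().st_size / 1024, 1))
        for p in Path(folder).iterdir()
        if p.is_file() and p.suffix.lower() == ".pdf"
    )


# Hypothetical usage inside this notebook:
# for name, size_kb in list_pdfs(MY_CONFIG.INPUT_DATA_DIR):
#     print(f"{name}: {size_kb} KB")
```

If the folder turns out to be empty or holds no PDFs, the pdf2parquet step would have nothing to process, so this is a cheap check to run first.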

### 1.3 - Set input/output path variables for the pipeline

In [2]:
import os, sys
import shutil

if not os.path.exists(MY_CONFIG.INPUT_DATA_DIR ):
    raise Exception (f"❌ Input folder MY_CONFIG.INPUT_DATA_DIR = '{MY_CONFIG.INPUT_DATA_DIR}' not found")

output_parquet_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '01_parquet_out')
output_chunk_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '02_chunk_out')
output_docid_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '03_docid_out')
output_exact_dedupe_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '04_exact_dedupe_out')
output_fuzzy_dedupe_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '05_fuzzy_dedupe_out')
output_embeddings_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '06_embeddings_out')

## clear output folder
shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER, ignore_errors=True)
os.makedirs(MY_CONFIG.OUTPUT_FOLDER, exist_ok=True)

print ("✅ Cleared output directory")

✅ Cleared output directory


### 1.4 -  Import Common python modules

In [3]:
import os
import sys


from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import ParamsUtils

<a id="pdf2parquet"></a>

## Step-2: pdf2parquet -  Convert data from PDF to Parquet

This step reads the input folder containing all PDF files and ingests them into a parquet table using the [Docling package](https://github.com/DS4SD/docling).
The documents are converted into a JSON format, which makes them easy to chunk in the later steps.



### 2.1 -  Set Input/output Folder

In [4]:
STAGE = 1 

input_folder = MY_CONFIG.INPUT_DATA_DIR
output_folder =  output_parquet_dir

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-1: Processing input='../data/fomc' --> output='output-fomc/01_parquet_out'


### 2.2 - Execute 

In [5]:
%%time 

import ast
import os
import sys

from pdf2parquet_transform import (
    pdf2parquet_contents_type_cli_param,
    pdf2parquet_contents_types,
)
from pdf2parquet_transform_python import Pdf2ParquetPythonTransformConfiguration

from data_processing.utils import GB, ParamsUtils


# create parameters
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
ingest_config = {
    pdf2parquet_contents_type_cli_param: pdf2parquet_contents_types.JSON,
}

params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    "data_files_to_use": ast.literal_eval("['.pdf']"),
}


sys.argv = ParamsUtils.dict_to_req(d=(params | ingest_config))
# create launcher
launcher = PythonTransformLauncher(Pdf2ParquetPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")


23:55:35 INFO - pdf2parquet parameters are : {'artifacts_path': None, 'contents_type': <pdf2parquet_contents_types.JSON: 'application/json'>, 'do_table_structure': True, 'do_ocr': True, 'double_precision': 8}
23:55:35 INFO - pipeline id pipeline_id
23:55:35 INFO - code location None
23:55:35 INFO - data factory data_ is using local data access: input_folder - ../data/fomc output_folder - output-fomc/01_parquet_out
23:55:35 INFO - data factory data_ max_files -1, n_sample -1
23:55:35 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.pdf'], files to checkpoint ['.parquet']
23:55:35 INFO - orchestrator pdf2parquet started at 2024-09-18 23:55:35
23:55:35 INFO - Number of files is 2, source profile {'max_file_size': 0.1193857192993164, 'min_file_size': 0.11060428619384766, 'total_file_size': 0.22999000549316406}
23:55:35 INFO - Initializing models


Fetching 7 files:   0%|          | 0/7 [00:00<?, ?it/s]

23:55:42 INFO - Completed 1 files (50.0%) in 0.06 min
23:55:46 INFO - Completed 2 files (100.0%) in 0.12 min
23:55:46 INFO - Done processing 2 files, waiting for flush() completion.
23:55:46 INFO - done flushing in 0.0 sec
23:55:46 INFO - Completed execution in 0.177 min, execution result 0


✅ Stage:1 completed successfully
CPU times: user 26.5 s, sys: 1.13 s, total: 27.6 s
Wall time: 13.3 s


### 2.3 -  Inspect Generated output

Here we should see one entry per input file processed

In [6]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Output dimensions (rows x columns)= ", output_df.shape)

output_df.head(5)

## To display certain columns
#output_df[['column1', 'column2', 'column3']].head(5)

Output dimensions (rows x columns)=  (2, 12)


Unnamed: 0,filename,contents,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename
0,monetary20240731a1.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf
1,monetary20240918a1.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf


<a id="chunking"></a>

##  Step-3: Doc chunks

Split the documents into chunks, according to their layout segmentation.

### 3.1 - Set Input/output Folder

In [7]:
STAGE = 2

input_folder = output_parquet_dir # previous output folder is the input folder for the current stage
output_folder =  output_chunk_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-2: Processing input='output-fomc/01_parquet_out' --> output='output-fomc/02_chunk_out'


### 3.2 - Execute 

In [8]:
%%time 

# Import doc_json_chunk transform configuration
from doc_chunk_transform_python import DocChunkPythonTransformConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # doc_chunk arguments
    # ...
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = PythonTransformLauncher(DocChunkPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

23:55:47 INFO - doc_chunk parameters are : {'chunking_type': <chunking_types.DL_JSON: 'dl_json'>, 'content_column_name': 'contents', 'output_chunk_column_name': 'contents', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox'}
23:55:47 INFO - pipeline id pipeline_id
23:55:47 INFO - code location None
23:55:47 INFO - data factory data_ is using local data access: input_folder - output-fomc/01_parquet_out output_folder - output-fomc/02_chunk_out
23:55:47 INFO - data factory data_ max_files -1, n_sample -1
23:55:47 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
23:55:47 INFO - orchestrator doc_chunk started at 2024-09-18 23:55:47
23:55:47 INFO - Number of files is 2, source profile {'max_file_size': 0.009243011474609375, 'min_file_size': 0.009152412414550781, 'total_file_size': 0.018395423889160156}
23:55:

✅ Stage:2 completed successfully
CPU times: user 849 ms, sys: 97.2 ms, total: 947 ms
Wall time: 941 ms


### 3.3 - Inspect Generated output

We should see that the documents are split into many chunks.

In [9]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print (f"Files processed : {input_df.shape[0]:,}")
print (f"Chunks created : {output_df.shape[0]:,}")

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.sample(min(3, output_df.shape[0]))

Files processed : 2
Chunks created : 21
Input data dimensions (rows x columns)=  (2, 12)
Output data dimensions (rows x columns)=  (21, 15)


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename,contents,doc_jsonpath,page_number,bbox
15,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\ncould impede th...,$.main-text[10],2,"[71.55211639, 652.95452881, 529.98071289, 719...."
12,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\nThe Committee s...,$.main-text[4],1,"[71.34158325, 406.78799438, 534.84796143, 526...."
10,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,Decisions Regarding Monetary Policy Implementa...,$.main-text[33],4,"[71.32019043, 619.66723633, 516.85302734, 648...."


## Step-4:  DOC ID generation

This transform annotates documents with document "ids". It supports the following transformations of the original data:

 - Adding document hash: this adds a hash-based document id to the data. The hash is calculated with `hashlib.sha256(doc.encode("utf-8")).hexdigest()`. To enable this annotation, set `hash_column` (passed in this notebook as `doc_id_hash_column`) to the name of the column where you want to store it.
 - Adding integer document id: this adds an integer document id that is unique across all rows in all tables provided to the `transform()` method. To enable this annotation, set `int_column` (passed in this notebook as `doc_id_int_column`) to the name of the column where you want to store it. **This is a prerequisite for fuzzy dedup** in the pipeline.
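To make the two annotations concrete, here is a minimal sketch that applies them to a plain list of strings. It is not the transform's implementation: `annotate_chunks` is a hypothetical helper, and only the hash expression is taken from the description above. The column names mirror the ones configured in this notebook.

```python
import hashlib


def annotate_chunks(chunks, start_id=0):
    """Attach a content hash and a running integer id to each chunk text."""
    annotated = []
    for offset, text in enumerate(chunks):
        annotated.append({
            "contents": text,
            # Same hash expression as described for the doc_id transform
            "chunk_hash": hashlib.sha256(text.encode("utf-8")).hexdigest(),
            # Integer ids simply count upward from start_id
            "chunk_id": start_id + offset,
        })
    return annotated


rows = annotate_chunks(["first chunk", "second chunk", "first chunk"])
for row in rows:
    print(row["chunk_id"], row["chunk_hash"][:12], row["contents"])
```

Chunks with identical text receive identical hashes but different integer ids, which is what lets the later dedup steps tell "same content" apart from "same row".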

### 4.1 - Set Input/output Folder

In [10]:

STAGE  = 3

input_folder = output_chunk_dir # previous output folder is the input folder for the current stage
output_folder =  output_docid_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-3: Processing input='output-fomc/02_chunk_out' --> output='output-fomc/03_docid_out'


### 4.2 - Execute 

In [11]:
%%time 

from doc_id_transform_python import DocIDPythonTransformRuntimeConfiguration
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # doc id configuration
    "doc_id_doc_column": "contents",
    "doc_id_hash_column": "chunk_hash",
    "doc_id_int_column": "chunk_id",
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = PythonTransformLauncher(DocIDPythonTransformRuntimeConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

23:55:47 INFO - Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'chunk_hash', 'int_column': 'chunk_id', 'start_id': 0}
23:55:47 INFO - pipeline id pipeline_id
23:55:47 INFO - code location None
23:55:47 INFO - data factory data_ is using local data access: input_folder - output-fomc/02_chunk_out output_folder - output-fomc/03_docid_out
23:55:47 INFO - data factory data_ max_files -1, n_sample -1
23:55:47 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
23:55:47 INFO - orchestrator doc_id started at 2024-09-18 23:55:47
23:55:47 INFO - Number of files is 2, source profile {'max_file_size': 0.01009368896484375, 'min_file_size': 0.010049819946289062, 'total_file_size': 0.020143508911132812}
23:55:47 INFO - Completed 1 files (50.0%) in 0.0 min
23:55:47 INFO - Completed 2 files (100.0%) in 0.0 min
23:55:47 INFO - Done processing 2 files, waiting for flush() comple

✅ Stage:3 completed successfully
CPU times: user 11.9 ms, sys: 2.93 ms, total: 14.9 ms
Wall time: 11.7 ms


### 4.3 - Inspect Generated output

In [12]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.sample(min(3, output_df.shape[0]))

Input data dimensions (rows x columns)=  (21, 15)
Output data dimensions (rows x columns)=  (21, 17)


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename,contents,doc_jsonpath,page_number,bbox,chunk_hash,chunk_id
13,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\nIn light of the...,$.main-text[5],1,"[71.19850159, 220.41990662, 533.89196777, 393....",443ad2771a4e7eabd27abc3098c76be456927a5af3df3b...,13
15,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\ncould impede th...,$.main-text[10],2,"[71.55211639, 652.95452881, 529.98071289, 719....",bbe51c32ce57c2b7bc3e8f0e42dcf7abd0cacdbca42959...,15
8,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,Decisions Regarding Monetary Policy Implementa...,$.main-text[21],3,"[108.01999664, 500.24020386, 515.20001221, 514...",0db576e8428e011663940a32b19db0dbb5fcd3725c4a30...,8


## Step-5: Exact Dedup

Remove chunks with identical content so that duplicates do not bias the data. A SHA256 hash is computed on the content of each chunk,
and records with identical hashes are then de-duplicated.
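The idea can be sketched in a few lines of plain Python. This is an illustration of hash-based exact dedup in general, not the ededup transform's actual code; `exact_dedup` is a hypothetical helper, and keeping the first occurrence is an assumption made here.

```python
import hashlib


def exact_dedup(chunks):
    """Keep the first occurrence of each distinct chunk text, in original order."""
    seen = set()
    unique = []
    for text in chunks:
        digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(text)
    return unique


chunks = [
    "FEDERAL RESERVE press release",
    "Recent indicators suggest that economic activity has continued to expand.",
    "FEDERAL RESERVE press release",
]
deduped = exact_dedup(chunks)
print(f"{len(chunks) - len(deduped)} duplicate chunk(s) removed")
```

Comparing fixed-length digests instead of full texts keeps the set of seen items small even when chunks are long.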

### 5.1 - Set Input/output Folder

In [13]:
STAGE  = 4

input_folder = output_docid_dir # previous output folder is the input folder for the current stage
output_folder =  output_exact_dedupe_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-4: Processing input='output-fomc/03_docid_out' --> output='output-fomc/04_exact_dedupe_out'


### 5.2 - Execute 

In [14]:
%%time

# Import ededup transform configuration
from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # ededup parameters
    "ededup_doc_column": "contents",
    "ededup_doc_id_column": "chunk_hash",
    
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = PythonTransformLauncher(EdedupPythonTransformRuntimeConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

23:55:47 INFO - exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'chunk_hash', 'use_snapshot': False, 'snapshot_directory': None}
23:55:47 INFO - pipeline id pipeline_id
23:55:47 INFO - code location None
23:55:47 INFO - data factory data_ is using local data access: input_folder - output-fomc/03_docid_out output_folder - output-fomc/04_exact_dedupe_out
23:55:47 INFO - data factory data_ max_files -1, n_sample -1
23:55:47 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
23:55:47 INFO - orchestrator ededup started at 2024-09-18 23:55:47
23:55:47 INFO - Number of files is 2, source profile {'max_file_size': 0.01155853271484375, 'min_file_size': 0.011479377746582031, 'total_file_size': 0.02303791046142578}
23:55:47 INFO - Starting from the beginning
23:55:47 INFO - Completed 1 files (50.0%) in 0.0 min
23:55:47 INFO - Completed 2 files (100.0%) in 0.0 min
23:55:

✅ Stage:4 completed successfully
CPU times: user 17.5 ms, sys: 2.24 ms, total: 19.7 ms
Wall time: 16.2 ms


### 5.3 - Inspect Generated output

In [15]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)
print (f"Input chunks before exact dedupe : {input_df.shape[0]:,}")
print (f"Output chunks after exact dedupe : {output_df.shape[0]:,}")
print ("Duplicate chunks removed :  ", (input_df.shape[0] - output_df.shape[0]))

output_df.sample(min(3, output_df.shape[0]))

Input data dimensions (rows x columns)=  (21, 17)
Output data dimensions (rows x columns)=  (16, 18)
Input chunks before exact dedupe : 21
Output chunks after exact dedupe : 16
Duplicate chunks removed :   5


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename,contents,doc_jsonpath,page_number,bbox,chunk_hash,chunk_id,removed
7,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,Decisions Regarding Monetary Policy Implementa...,$.main-text[18],3,"[71.24868011, 620.85113525, 516.75402832, 649....",d971c9a22c5d2f4455150e54a26443294cbc1a7bea3d4f...,7,[]
3,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,FEDERAL RESERVE press release\nIn assessing th...,$.main-text[6],1,"[71.39627075, 120.29415131, 526.43603516, 189....",882ace8bc12af9eaec998452abe7e3bc38e22a94b09d8c...,3,[]
0,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,FEDERAL RESERVE press release\nRecent indicato...,$.main-text[3],1,"[71.41007233, 534.21520996, 536.76806641, 630....",d1f181a0a653e27f7c761b7bf6fdc910b9edf8bf8c3a7f...,0,[]


## Step-6: Fuzzy Dedup

**Fuzzy dedup is currently available in the Ray version, so this step uses the Ray launcher.**

After exact deduplication, fuzzy deduplication is applied to find and remove near-duplicates,
that is, items that differ only by slight variations, and thereby unbias the data further.
In code data, for example, such small variations are quite common: changed variable values,
added logging statements, and so on.
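As a rough intuition for what "near-duplicate" means, the sketch below compares two texts by the Jaccard similarity of their word shingles. Note the assumptions: this computes exact Jaccard similarity on a single pair, whereas the fuzzy dedup transform configured below exposes MinHash-style parameters (such as a number of permutations) and operates at scale; all function names here are hypothetical. The defaults (5-word shingles, space delimiter, threshold 0.7) mirror the values used in this notebook.

```python
def word_shingles(text, size=5, delimiter=" "):
    """Return the set of `size`-word shingles (as tuples) of `text`."""
    words = [w for w in text.split(delimiter) if w]
    if len(words) < size:
        # Short texts yield a single shingle made of all their words
        return {tuple(words)} if words else set()
    return {tuple(words[i:i + size]) for i in range(len(words) - size + 1)}


def jaccard(a, b):
    """Jaccard similarity of two sets: size of intersection over size of union."""
    if not a and not b:
        return 1.0
    return len(a & b) / len(a | b)


def is_near_duplicate(text_a, text_b, threshold=0.7, size=5):
    """True when the shingle sets of the two texts overlap at least `threshold`."""
    return jaccard(word_shingles(text_a, size), word_shingles(text_b, size)) >= threshold


a = "the committee decided to maintain the target range for the federal funds rate"
b = "the committee decided to maintain the target range for the federal funds rates"
c = "voting for the monetary policy action were the chair and the vice chair"

print(is_near_duplicate(a, b), is_near_duplicate(a, c))
```

Lowering the threshold makes the check more lenient, so more pairs count as near-duplicates; raising it toward 1.0 approaches exact matching.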

### 6.1 - Set Input/output Folder

In [16]:
STAGE  = 5

input_folder = output_exact_dedupe_dir # previous output folder is the input folder for the current stage
output_folder =  output_fuzzy_dedupe_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-5: Processing input='output-fomc/04_exact_dedupe_out' --> output='output-fomc/05_fuzzy_dedupe_out'


### 6.2 - Execute 

In [17]:
%%time 

import os
import sys

from data_processing.utils import ParamsUtils
from fdedup_transform_ray import FdedupRayTransformConfiguration
from data_processing_ray.runtime.ray import RayTransformLauncher


# create parameters

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus" : MY_CONFIG.RAY_NUM_CPUS}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # Orchestration parameters
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": MY_CONFIG.RAY_RUNTIME_WORKERS,
    # columns used
    "fdedup_doc_column": "contents",
    "fdedup_id_column": "chunk_id",
    "fdedup_cluster_column": "chunk_hash",
    # infrastructure
    "fdedup_bucket_cpu": 0.3,
    "fdedup_doc_cpu": 0.3,
    "fdedup_mhash_cpu": 0.3,
    "fdedup_num_doc_actors": 1,
    "fdedup_num_bucket_actors": 1,
    "fdedup_num_minhash_actors": 1,
    "fdedup_num_preprocessors": 1,
    # fuzzy parameters
    "fdedup_num_permutations": 64,
    "fdedup_threshold": 0.7, # between 0.0 to 1.0 ; smaller values tend to be more lenient in finding near dupes; close to 1.0 is more strict
    "fdedup_shingles_size": 5,
    "fdedup_delimiters": " "
}

# Pass commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = RayTransformLauncher(FdedupRayTransformConfiguration())

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Ray job failed")

23:55:48 INFO - Running locally
23:55:48 INFO - fuzzy dedup params are {'doc_column': 'contents', 'id_column': 'chunk_id', 'cluster_column': 'chunk_hash', 'bucket_cpu': 0.3, 'mhash_cpu': 0.3, 'doc_cpu': 0.3, 'num_doc_actors': 1, 'num_minhash_actors': 1, 'num_bucket_actors': 1, 'num_preprocessors': 1, 'num_permutations': 64, 'threshold': 0.7, 'shingles_size': 5, 'delimiters': ' ', 'snapshot_delay': 1, 'use_bucket_snapshot': False, 'use_doc_snapshot': False, 'random_delay_limit': 10, 'worker_options': {'num_cpus': 1}}
23:55:48 INFO - data factory data_ is using local data access: input_folder - output-fomc/04_exact_dedupe_out output_folder - output-fomc/05_fuzzy_dedupe_out
23:55:48 INFO - data factory data_ max_files -1, n_sample -1
23:55:48 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
23:55:48 INFO - pipeline id pipeline_id
23:55:48 INFO - code location None
23:55:48 INFO 

✅ Stage:5 completed successfully
CPU times: user 565 ms, sys: 329 ms, total: 893 ms
Wall time: 37.1 s


### 6.3 - Inspect Generated output

In [18]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)
print ("Duplicate chunks removed  by fuzzy-dedupe:  ", (input_df.shape[0] - output_df.shape[0]))

output_df.sample(min(3, output_df.shape[0]))

Input data dimensions (rows x columns)=  (16, 18)
Output data dimensions (rows x columns)=  (15, 18)
Duplicate chunks removed  by fuzzy-dedupe:   1


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename,contents,doc_jsonpath,page_number,bbox,chunk_id,removed,chunk_hash
7,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,Decisions Regarding Monetary Policy Implementa...,$.main-text[18],3,"[71.24868011, 620.85113525, 516.75402832, 649....",7,[],-1
13,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\nVoting for the ...,$.main-text[11],2,"[71.21774292, 520.26879883, 539.80987549, 640....",16,[],-1
12,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\nIn light of the...,$.main-text[5],1,"[71.19850159, 220.41990662, 533.89196777, 393....",13,[],-1


## Step-7:   Text encoding

Encode text for the vector storage.
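To see why chunks are stored as vectors, consider how a vector store typically ranks them: by a similarity measure such as cosine similarity between a query vector and each chunk vector. The sketch below is illustrative only. The 3-dimensional vectors are made-up toy values (real embeddings have many more dimensions), and `cosine_similarity` and `rank_chunks` are hypothetical helpers, not part of Data Prep Kit.

```python
import math


def cosine_similarity(u, v):
    """Cosine of the angle between two non-zero vectors of equal length."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v)


def rank_chunks(query_vec, chunk_vecs):
    """Return chunk indices ordered from most to least similar to the query."""
    scores = [cosine_similarity(query_vec, v) for v in chunk_vecs]
    return sorted(range(len(chunk_vecs)), key=lambda i: scores[i], reverse=True)


# Toy vectors standing in for real embeddings
query = [1.0, 0.0, 0.0]
chunk_vectors = [
    [0.0, 1.0, 0.0],
    [0.9, 0.1, 0.0],
    [0.5, 0.5, 0.0],
]
ranking = rank_chunks(query, chunk_vectors)
print(ranking)
```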

### 7.1 - Set Input/output Folder

In [19]:
STAGE  = 6

input_folder = output_fuzzy_dedupe_dir
output_folder =  output_embeddings_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-6: Processing input='output-fomc/05_fuzzy_dedupe_out' --> output='output-fomc/06_embeddings_out'


### 7.2 - Execute

In [20]:
%%time 

from text_encoder_transform_python import TextEncoderPythonTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # text_encoder
    "text_encoder_model_name": MY_CONFIG.EMBEDDING_MODEL,
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = PythonTransformLauncher(TextEncoderPythonTransformConfiguration())
# launch

return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

23:56:25 INFO - text_encoder parameters are : {'content_column_name': 'contents', 'output_embeddings_column_name': 'embeddings', 'model_name': 'sentence-transformers/all-MiniLM-L6-v2'}
23:56:25 INFO - pipeline id pipeline_id
23:56:25 INFO - code location None
23:56:25 INFO - data factory data_ is using local data access: input_folder - output-fomc/05_fuzzy_dedupe_out output_folder - output-fomc/06_embeddings_out
23:56:25 INFO - data factory data_ max_files -1, n_sample -1
23:56:25 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
23:56:25 INFO - orchestrator text_encoder started at 2024-09-18 23:56:25
23:56:25 INFO - Number of files is 2, source profile {'max_file_size': 0.018599510192871094, 'min_file_size': 0.01068878173828125, 'total_file_size': 0.029288291931152344}
23:56:28 INFO - Completed 1 files (50.0%) in 0.002 min
23:56:28 INFO - Completed 2 files (100.0%) in 0.003 m

✅ Stage:6 completed successfully
CPU times: user 760 ms, sys: 173 ms, total: 932 ms
Wall time: 3.15 s


### 7.3 - Inspect Generated output

In [21]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.sample(min(3, output_df.shape[0]))

Input data dimensions (rows x columns)=  (15, 18)
Output data dimensions (rows x columns)=  (15, 19)


Unnamed: 0,filename,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename,contents,doc_jsonpath,page_number,bbox,chunk_id,removed,chunk_hash,embeddings
12,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,FEDERAL RESERVE press release\nIn light of the...,$.main-text[5],1,"[71.19850159, 220.41990662, 533.89196777, 393....",13,[],-1,"[0.08220579, -0.02423102, -0.09095508, 0.06659..."
14,monetary20240918a1.pdf,4,0,33,2967427b-e3cd-45af-b8f2-f5f8c38c495a,pdf,bff09e8a8597537ad482be953a47aa417bd2e693e45e9f...,11056,2024-09-18T23:55:46.442882,3.634502,monetary20240918a1.pdf,Decisions Regarding Monetary Policy Implementa...,$.main-text[18],3,"[71.23934174, 623.85803223, 535.30566406, 651....",18,[],8,"[0.0029574682, -0.009945164, -0.032299336, 0.0..."
7,monetary20240731a1.pdf,4,0,34,a0e52580-4912-4aac-93d4-4fc015b6aa12,pdf,ae2f7f6b109cf8e6388b507c9f866d7946ca907cab6b5a...,11018,2024-09-18T23:55:42.804866,3.573463,monetary20240731a1.pdf,Decisions Regarding Monetary Policy Implementa...,$.main-text[18],3,"[71.24868011, 620.85113525, 516.75402832, 649....",7,[],-1,"[0.035019822, -0.02151688, -0.04607042, 0.0283..."


## Step-8: Copy output to final output dir

In [22]:
import shutil

shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER_FINAL, ignore_errors=True)
shutil.copytree(src=output_folder, dst=MY_CONFIG.OUTPUT_FOLDER_FINAL)

print (f"✅ Copied output from '{output_folder}' --> '{MY_CONFIG.OUTPUT_FOLDER_FINAL}'")

✅ Copied output from 'output-fomc/06_embeddings_out' --> 'output-fomc/output_final'
