# Debug eDedupe

In [1]:
import os

## Configuration
class MyConfig:
    pass

MY_CONFIG = MyConfig ()

MY_CONFIG.INPUT_DATA_DIR = 'input'
  
MY_CONFIG.OUTPUT_FOLDER = "output"
MY_CONFIG.OUTPUT_FOLDER_FINAL = os.path.join(MY_CONFIG.OUTPUT_FOLDER , "output_final")

In [2]:
## Add parent dir to path
import os,sys

this_dir = os.path.abspath('')
parent_dir = os.path.dirname(this_dir)
sys.path.append (os.path.abspath (parent_dir))

### 2.2 - Setup input/outpur directories

In [3]:
import os, sys
import shutil

if not os.path.exists(MY_CONFIG.INPUT_DATA_DIR ):
    raise Exception (f"❌ Input folder MY_CONFIG.INPUT_DATA_DIR = '{MY_CONFIG.INPUT_DATA_DIR}' not found")

output_parquet_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '01_parquet_out')
output_chunk_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '02_chunk_out')
output_exact_dedupe_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '03_exact_dedupe_out')

## clear output folder
shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER, ignore_errors=True)
shutil.os.makedirs(MY_CONFIG.OUTPUT_FOLDER, exist_ok=True)

print ("✅ Cleared output directory")

✅ Cleared output directory


## Step-3: pdf2parquet -  Convert data from PDF to Parquet

This step is reading the input folder containing all PDF files and ingest them in a parquet table using the [Docling package](https://github.com/DS4SD/docling).
The documents are converted into a JSON format which allows to easily chunk it in the later steps.



### 3.1 - Set Input/output Folder

In [4]:
STAGE = 1

input_folder = MY_CONFIG.INPUT_DATA_DIR
output_folder =  output_parquet_dir

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-1: Processing input='input' --> output='output/01_parquet_out'


### 3.2 - Execute

In [5]:
%%time

import ast
import os
import sys

from pdf2parquet_transform import (
    pdf2parquet_contents_type_cli_param,
    pdf2parquet_contents_types,
)
from data_processing.runtime.pure_python import PythonTransformLauncher
from pdf2parquet_transform_python import Pdf2ParquetPythonTransformConfiguration

from data_processing.utils import GB, ParamsUtils


# create parameters
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
ingest_config = {
    pdf2parquet_contents_type_cli_param: pdf2parquet_contents_types.JSON,
}

params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    "data_files_to_use": ast.literal_eval("['.pdf']"),
}


sys.argv = ParamsUtils.dict_to_req(d=(params | ingest_config))
# create launcher
launcher = PythonTransformLauncher(Pdf2ParquetPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")


23:00:40 INFO - pdf2parquet parameters are : {'artifacts_path': None, 'contents_type': <pdf2parquet_contents_types.JSON: 'application/json'>, 'do_table_structure': True, 'do_ocr': True, 'double_precision': 8}
23:00:40 INFO - pipeline id pipeline_id
23:00:40 INFO - code location None
23:00:40 INFO - data factory data_ is using local data access: input_folder - input output_folder - output/01_parquet_out
23:00:40 INFO - data factory data_ max_files -1, n_sample -1
23:00:40 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.pdf'], files to checkpoint ['.parquet']
23:00:40 INFO - orchestrator pdf2parquet started at 2024-09-13 23:00:40
23:00:40 INFO - Number of files is 6, source profile {'max_file_size': 0.02610492706298828, 'min_file_size': 0.0250701904296875, 'total_file_size': 0.15283870697021484}
23:00:40 INFO - Initializing models


Fetching 7 files:   0%|          | 0/7 [00:00<?, ?it/s]

Neither CUDA nor MPS are available - defaulting to CPU. Note: This module is much faster with a GPU.
23:00:43 INFO - Completed 1 files (16.67%) in 0.013 min
23:00:44 INFO - Completed 2 files (33.33%) in 0.026 min
23:00:45 INFO - Completed 3 files (50.0%) in 0.038 min
23:00:46 INFO - Completed 4 files (66.67%) in 0.052 min
23:00:46 INFO - Completed 5 files (83.33%) in 0.065 min
23:00:47 INFO - Completed 6 files (100.0%) in 0.078 min
23:00:47 INFO - Done processing 6 files, waiting for flush() completion.
23:00:47 INFO - done flushing in 0.0 sec
23:00:47 INFO - Completed execution in 0.12 min, execution result 0


✅ Stage:1 completed successfully
CPU times: user 22.3 s, sys: 1.62 s, total: 24 s
Wall time: 9.64 s


### 3.3 - Inspect Generated output

Here we should see one entry per input file processed.

In [6]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Output dimensions (rows x columns)= ", output_df.shape)

output_df.head(5)

## To display certain columns
#parquet_df[['column1', 'column2', 'column3']].head(5)

Output dimensions (rows x columns)=  (6, 12)


Unnamed: 0,filename,contents,num_pages,num_tables,num_doc_elements,document_id,ext,hash,size,date_acquired,pdf_convert_time,source_filename
0,a1.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",1,0,2,5d766fe6-67ae-4620-9431-d695b66e6a89,pdf,41122e7a59cbacf219f10cdbd22c24d6ffa4a6d31533ec...,789,2024-09-13T23:00:43.733981,0.780716,a1.pdf
1,b1.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",1,0,2,b529d38c-cf70-4878-9089-7c8449bed7b5,pdf,6f4a8f8e44f2d5938093f66afdcaa2a8426a2a05e00c23...,783,2024-09-13T23:00:45.258812,0.74894,b1.pdf
2,b2.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",1,0,2,6b904b0c-26f1-4fd5-830f-47d91a08b8a6,pdf,eeefd9456dbef796f630a382cf0c0d4766978203b9cda7...,783,2024-09-13T23:00:46.075172,0.814075,b2.pdf
3,c1.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",1,0,2,38488a60-bb3c-438a-8fee-ec45fc68f22f,pdf,56cda15204f7c8f8bb99a799e36fba2108d2afe466c6e7...,787,2024-09-13T23:00:46.837473,0.760193,c1.pdf
4,d1.pdf,"{""_name"":"""",""type"":""pdf-document"",""description...",1,0,2,655dd99b-9c3c-4227-832e-75a2525480ff,pdf,9a7f163d75ce6335d0ef75bf49a669153d04f2f5da5c9a...,784,2024-09-13T23:00:47.604329,0.764891,d1.pdf



### 3.4 - Understand the output

Here are some interesting attributes to note:

- **filename** : original filename
- **contents** : text
- **document_id**: unique id (UUID) assignd to this document
- **hash** : hash of document
- **pdf_convert_time** : time to convert this pdf in seconds

Let's inspect the **contents** column.  See how the text is being divided up!

In [7]:
import pprint
import json

pprint.pprint (json.loads(output_df.iloc[0, ]['contents']))
# json.loads(output_df.iloc[0, ]['contents'])

{'_name': '',
 'description': {'logs': []},
 'equations': [],
 'figures': [],
 'file-info': {'#-pages': 1,
               'document-hash': '4512df83786d672e062f144a718290982e3a8952c20ddb11014cbb3dcb9b507d',
               'filename': 'a1.pdf',
               'page-hashes': [{'hash': '1a75ddf16ddb235368915aed32ab00ccd753838488ec3bb785be9bb84c0d9259',
                                'model': 'default',
                                'page': 1}]},
 'footnotes': [],
 'main-text': [{'name': 'Text',
                'prov': [{'bbox': [132.78564453,
                                   655.18377686,
                                   251.93409729,
                                   665.57006836],
                          'page': 1,
                          'span': [0, 29]}],
                'text': 'Twinkle, twinkle, little star',
                'type': 'paragraph'},
               {'name': 'Page-footer',
                'prov': [{'bbox': [303.13299561,
                                   87.

In [8]:
pprint.pprint (json.loads(output_df.iloc[1, ]['contents']))

{'_name': '',
 'description': {'logs': []},
 'equations': [],
 'figures': [],
 'file-info': {'#-pages': 1,
               'document-hash': 'd4eac2a62256525112db6fcc23666b6f7156582460d3df7557a0dfecbaf10820',
               'filename': 'b1.pdf',
               'page-hashes': [{'hash': '9f6ceee3445d8de9a99fee27551ba26b4b434b315776b8e905a99ed2398f9587',
                                'model': 'default',
                                'page': 1}]},
 'footnotes': [],
 'main-text': [{'name': 'Text',
                'prov': [{'bbox': [133.23675537,
                                   654.84521484,
                                   257.8024292,
                                   665.2824707],
                          'page': 1,
                          'span': [0, 26]}],
                'text': 'How I wonder what you are!',
                'type': 'paragraph'},
               {'name': 'Page-footer',
                'prov': [{'bbox': [303.13299561,
                                   87.43224

##  Step-4: Doc chunks

In the previous step, we have extracted text from oru PDFs.  But we have the content of entire file as 'one row' in our parquet output.

In this step, we are going to split the documents in chunks, according to their layout segmentation.

This transform uses [Quackling](https://github.com/DS4SD/quackling) `HierarchicalChunker`
to chunk according to the document layout segmentation, i.e. respecting the original document components as paragraphs, tables, enumerations, etc.
It relies on documents converted with the Docling library in the [pdf2parquet transform](https://github.com/IBM/data-prep-kit/blob/dev/transforms/language/pdf2parquet/python/README.md) using the option `contents_type: "application/json"`,
which provides the required JSON structure.

### 4.1 - Set Input/output Folder


In [9]:
STAGE = 2

input_folder = output_parquet_dir # previous output folder is the input folder for the current stage
output_folder =  output_chunk_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

🏃🏼 STAGE-2: Processing input='output/01_parquet_out' --> output='output/02_chunk_out'


### 4.2 - Execute


In [10]:
%%time

from data_processing.runtime.pure_python import PythonTransformLauncher
from doc_chunk_transform_python import DocChunkPythonTransformConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # doc_chunk arguments
    # ...
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = PythonTransformLauncher(DocChunkPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

23:00:48 INFO - doc_chunk parameters are : {'chunking_type': <chunking_types.DL_JSON: 'dl_json'>, 'content_column_name': 'contents', 'output_chunk_column_name': 'contents', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox'}
23:00:48 INFO - pipeline id pipeline_id
23:00:48 INFO - code location None
23:00:48 INFO - data factory data_ is using local data access: input_folder - output/01_parquet_out output_folder - output/02_chunk_out
23:00:48 INFO - data factory data_ max_files -1, n_sample -1
23:00:48 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
23:00:48 INFO - orchestrator doc_chunk started at 2024-09-13 23:00:48
23:00:48 INFO - Number of files is 6, source profile {'max_file_size': 0.010021209716796875, 'min_file_size': 0.009985923767089844, 'total_file_size': 0.060021400451660156}
23:00:48 INFO - 

✅ Stage:2 completed successfully
CPU times: user 736 ms, sys: 87.9 ms, total: 824 ms
Wall time: 820 ms


### 4.3 - Inspect Generated output

We would see documents are split into many chunks

In [11]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print (f"Files processed : {input_df.shape[0]:,}")
print (f"Chunks created : {output_df.shape[0]:,}")

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.head(10)

Files processed : 6
Chunks created : 0
Input data dimensions (rows x columns)=  (6, 12)
Output data dimensions (rows x columns)=  (0, 0)


### 4.4 - Understanding the Output

Here we see 2 PDF files are split into 6 chunks.  Basically we see the documents are being split along 'natural boundaris' - paragraphs and bullet points

See how **document_id** is carried throughout.  This helps us identify original documents.

Also note **contents** is now plain text (not JSON as before)

In [12]:
output_df[['filename', 'contents']]


KeyError: "None of [Index(['filename', 'contents'], dtype='object')] are in the [columns]"

In [None]:
for f in output_df['filename'].unique():
    print ('==========' , f, '===========')
    chunks = output_df[output_df['filename'] == f]['contents']
    for idx , chunk in enumerate(chunks):
        print (f'-------Chunk {idx}------\n{chunk}\n-------')

## Step-5: Exact Dedup



### 5.1 - Set Input/output Folder

In [None]:
STAGE  = 3

input_folder = output_chunk_dir # previous output folder is the input folder for the current stage
output_folder =  output_exact_dedupe_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### 5.2 - Execute

In [None]:
%%time

from data_processing.runtime.pure_python import PythonTransformLauncher
from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # ededup parameters
    "ededup_doc_column": "contents",
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = PythonTransformLauncher(EdedupPythonTransformRuntimeConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception ("❌ Job failed")

### 5.3 - Inspect Generated output

In [None]:
from utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)
print (f"Input chunks before exact dedupe : {input_df.shape[0]:,}")
print (f"Output chunks after exact dedupe : {output_df.shape[0]:,}")
print ("Duplicate chunks removed :  ", (input_df.shape[0] - output_df.shape[0]))

output_df.head(10)

In [None]:
output_df[['filename', 'contents']]

In [None]:
for f in output_df['filename'].unique():
    print ('==========' , f, '===========')
    chunks = output_df[output_df['filename'] == f]['contents']
    for idx , chunk in enumerate(chunks):
        print (f'-------Chunk {idx}------\n{chunk}\n-------')