# **Demo: Building a data prep pipeline for model fine-tuning**

This notebook demonstrates data preparation techniques for fine-tuning language models using the Data Prep Kit.
Here is the workflow:


![](https://raw.githubusercontent.com/sapthasurendran/data-prep-lab/nb/examples/notebooks/intro/images/code-processing-flowdiagram.png)


![](https://raw.githubusercontent.com/sapthasurendran/data-prep-kit/nb/examples/notebooks/intro/images/code-processing-flowdiagram.png)


## How to run this notebook

Two options:

- **Option 1 - Google Colab:** The easiest option; no setup required. Click this badge to open the notebook in Google Colab: [![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/sapthasurendran/data-prep-lab/blob/nb/examples/notebooks/intro/dpk_intro_code_python.ipynb)
- **Option 2 - Local Python dev environment:** Set up using this [guide](../../../README.md#-getting-started)


## Step-1: SET UP

### 1.1 - Determine runtime

Determine whether we are running on Google Colab or in a local Python environment.


In [None]:
import os

if os.getenv("COLAB_RELEASE_TAG"):
   print("Running in Colab")
   RUNNING_IN_COLAB = True
else:
   print("NOT in Colab")
   RUNNING_IN_COLAB = False

### 1.2 - Download data if running on Google Colab

In [None]:
if RUNNING_IN_COLAB:
    !mkdir -p 'input/source-code-data'

    !wget -O 'input/source-code-data/application-java.zip'  'https://raw.githubusercontent.com/IBM/data-prep-kit/dev/transforms/code/code2parquet/python/test-data/input/application-java.zip'
    !wget -O 'input/source-code-data/data-processing-lib.zip' 'https://raw.githubusercontent.com/IBM/data-prep-kit/dev/transforms/code/code2parquet/python/test-data/input/data-processing-lib.zip'
    !wget -O 'input/source-code-data/https___github.com_00000o1_environments_archive_refs_heads_master.zip' 'https://raw.githubusercontent.com/IBM/data-prep-kit/dev/transforms/code/code2parquet/python/test-data/input/https___github.com_00000o1_environments_archive_refs_heads_master.zip'
    !wget -O 'my_utils.py'  'https://raw.githubusercontent.com/IBM/data-prep-kit/dev/examples/notebooks/intro/my_utils.py'
    !wget -O 'language.json'  'https://raw.githubusercontent.com/IBM/data-prep-kit/dev/transforms/code/code2parquet/python/test-data/languages/lang_extensions.json'


### 1.3 - Install dependencies if running on Google Colab

In [None]:
if RUNNING_IN_COLAB:
    !pip install "data-prep-toolkit-transforms[all]==0.2.2"
    !pip install pandas
    !pip install humanfriendly

### 1.4 - Restart Runtime

After installing dependencies, be sure to <font color="red">restart the runtime</font> so the libraries are loaded.

You do this by going to **`Runtime --> Restart Session`**

Then you can continue to the next step (no need to re-run the notebook)
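After the restart, a quick way to confirm that the libraries were picked up is to query their installed versions. The following is a minimal sketch using only the standard library; `installed_version` is a hypothetical helper written for this illustration, not part of Data Prep Kit.

```python
from importlib import metadata


def installed_version(package_name):
    """Return the installed version string, or None if the package is missing."""
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None


# Package names taken from the install cell in step 1.3.
for name in ("data-prep-toolkit-transforms", "pandas", "humanfriendly"):
    version = installed_version(name)
    print(f"{name}: {version if version else 'NOT INSTALLED'}")
```

If any package shows as not installed, re-run step 1.3 and restart the runtime again.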

## Step-2: Configure


### 2.1 - Basic configuration

In [None]:
import os

if os.getenv("COLAB_RELEASE_TAG"):
   print("Running in Colab")
   RUNNING_IN_COLAB = True
else:
   print("NOT in Colab")
   RUNNING_IN_COLAB = False

In [None]:
import os

## Configuration
class MyConfig:
    pass

MY_CONFIG = MyConfig ()

MY_CONFIG.INPUT_DATA_DIR = 'input/source-code-data/'

MY_CONFIG.OUTPUT_FOLDER = "output"
MY_CONFIG.OUTPUT_FOLDER_FINAL = os.path.join(MY_CONFIG.OUTPUT_FOLDER , "output_final")



In [None]:
## Add parent dir to path
import os,sys

this_dir = os.path.abspath('')
parent_dir = os.path.dirname(this_dir)
sys.path.append (os.path.abspath (parent_dir))

### 2.2 - Set up input/output directories

In [None]:
import os
import shutil

if not os.path.exists(MY_CONFIG.INPUT_DATA_DIR ):
    raise Exception (f"❌ Input folder MY_CONFIG.INPUT_DATA_DIR = '{MY_CONFIG.INPUT_DATA_DIR}' not found")

output_parquet_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '01_parquet_out')

output_exact_dedupe_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '02_exact_dedupe_out')
output_code_quality_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '03_code_quality_out')
output_filter_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '04_filter_out')
output_tokenisation_dir = os.path.join (MY_CONFIG.OUTPUT_FOLDER, '05_tokenisation_out')

## clear output folder
shutil.rmtree(MY_CONFIG.OUTPUT_FOLDER, ignore_errors=True)
os.makedirs(MY_CONFIG.OUTPUT_FOLDER, exist_ok=True)

print ("✅ Cleared output directory")

## Step-3: Data ingestion -  Convert source data to Parquet


This is the first component of the pipeline. It ingests a few ZIP files and converts them into
Parquet files for consumption by the next steps in this data processing pipeline.


### 3.1 - Set Input/output Folder

In [None]:
STAGE = 1

input_folder = MY_CONFIG.INPUT_DATA_DIR
output_folder =  output_parquet_dir

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### 3.2 - Execute

In [None]:

%%time

import sys
import ast
from data_processing.utils import ParamsUtils
from data_processing.runtime.pure_python import PythonTransformLauncher
from code2parquet_transform import (
    detect_programming_lang_cli_key,
    supported_langs_file_cli_key,
)
from code2parquet_transform_python import CodeToParquetPythonConfiguration
# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

supported_languages_file = "language.json"

params = {
    # code2parquet parameters
    supported_langs_file_cli_key: supported_languages_file,
    detect_programming_lang_cli_key: True,
    "data_files_to_use": ast.literal_eval("['.zip']"),

    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = PythonTransformLauncher(CodeToParquetPythonConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE} failed")


### 3.3 - Inspect Generated output


In [None]:
from my_utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Output dimensions (rows x columns)= ", output_df.shape)
print (output_df.columns)

## To display only certain columns:
# output_df[['column1', 'column2', 'column3']].head(5)

## Keep this as the last expression so the notebook renders the table
output_df.head(5)

### 3.4 - Understand the output

Each file in a ZIP archive becomes one row in the Parquet dataset, with the following schema:

- **title**: Path to the file within the ZIP archive.
- **document**: Name of the ZIP file containing the current file.
- **repo_name**: Name of the repository the code belongs to. This should match the name of the ZIP file containing the repository.
- **contents**: Content of the file, converted to a string.
- **document_id**: Unique identifier computed as a UUID.
- **ext**: File extension extracted from the file path.
- **hash**: SHA256 hash value computed from the file content string.
- **size**: Size of the file content in bytes.
- **date_acquired**: Timestamp indicating when the file was processed.
- **programming_language**: Programming language detected using the file extension.
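To make the schema above concrete, here is a minimal sketch of how one ZIP archive could map to rows. It is an illustration only, not the code2parquet implementation: `zip_to_rows` is a hypothetical helper, the archive is built in memory, and `repo_name` and `programming_language` are left out because their derivation depends on the transform's configuration.

```python
import hashlib
import io
import os
import uuid
import zipfile
from datetime import datetime, timezone


def zip_to_rows(zip_bytes, zip_name):
    """Turn each file inside a ZIP archive into one dict shaped like the schema above."""
    rows = []
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for info in archive.infolist():
            if info.is_dir():
                continue  # directory entries carry no content
            contents = archive.read(info).decode("utf-8", errors="replace")
            encoded = contents.encode("utf-8")
            rows.append({
                "title": info.filename,
                "document": zip_name,
                "contents": contents,
                "document_id": str(uuid.uuid4()),
                "ext": os.path.splitext(info.filename)[1],
                "hash": hashlib.sha256(encoded).hexdigest(),
                "size": len(encoded),
                "date_acquired": datetime.now(timezone.utc).isoformat(),
            })
    return rows


# Build a tiny in-memory archive so the example needs no files on disk.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as archive:
    archive.writestr("src/hello.py", "print('hello')\n")
    archive.writestr("README.md", "# demo\n")

for row in zip_to_rows(buffer.getvalue(), "demo.zip"):
    print(row["title"], row["ext"], row["size"], row["hash"][:12])
```

Each file yields exactly one row, so a ZIP with two files produces two rows regardless of how large the files are.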




In [None]:
import pprint
import json

## Print one full row
pprint.pprint (output_df.iloc[5])
# json.loads(output_df.iloc[0]['contents'])

##  Step-4: Exact Deduplication

This step finds exact duplicates in the `contents` column and removes them. It computes a SHA256 hash of each code file and removes records that have identical hashes.
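The idea can be sketched in a few lines of plain Python. This is a simplified illustration under the assumption that the first occurrence of each hash is kept; `exact_dedupe` is a hypothetical helper, not the ededup transform itself.

```python
import hashlib


def exact_dedupe(rows, column="contents"):
    """Keep the first row for each distinct SHA-256 hash of `column`."""
    seen = set()
    unique_rows = []
    for row in rows:
        digest = hashlib.sha256(row[column].encode("utf-8")).hexdigest()
        if digest in seen:
            continue  # identical content was already kept
        seen.add(digest)
        unique_rows.append(row)
    return unique_rows


sample = [
    {"document_id": "a", "contents": "x = 1\n"},
    {"document_id": "b", "contents": "x = 1\n"},  # exact duplicate of "a"
    {"document_id": "c", "contents": "x = 1"},    # differs by a trailing newline
]
kept = exact_dedupe(sample)
print([row["document_id"] for row in kept])  # ['a', 'c']
```

Note that exact deduplication is byte-sensitive: row "c" survives because it differs from "a" by a single newline.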

### 4.1 - Set Input/output Folder

In [None]:
STAGE = 2

input_folder = output_parquet_dir # previous output folder is the input folder for the current stage
output_folder =  output_exact_dedupe_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### 4.2 - Execute

In [None]:
%%time

import sys

from ededup_transform_python import EdedupPythonTransformRuntimeConfiguration
from ededup_transform_base import doc_column_name_cli_param, int_column_name_cli_param

from data_processing.utils import ParamsUtils

# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

params = {
    # ededup parameters
    doc_column_name_cli_param: "contents",
    int_column_name_cli_param: "document_id",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = PythonTransformLauncher(EdedupPythonTransformRuntimeConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE} failed")


### 4.3 - Inspect Generated output



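You will notice that the output may have fewer rows than the input, because rows with duplicate contents have been removed.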

In [None]:
from my_utils import read_parquet_files_as_df
import pandas as pd

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.head(10)


##  Step-5: Code Quality

The code quality step evaluates each file in the dataset and adds metrics that describe its structural properties, flag anomalies, and classify files by their characteristics.

### 5.1 - Set Input/output Folder

In [None]:
STAGE = 3

input_folder = output_exact_dedupe_dir # previous output folder is the input folder for the current stage
output_folder =  output_code_quality_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### 5.2 - Execute

In [None]:

%%time

import sys
from code_quality_transform_python import CodeQualityPythonTransformConfiguration
from data_processing.utils import ParamsUtils


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

params = {
    # code quality parameters
    "cq_contents_column_name": "contents",
    "cq_language_column_name": "programming_language",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = PythonTransformLauncher(CodeQualityPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE} failed")

### 5.3 - Inspect Generated output



You will notice several extra columns:



- **line_mean**: Average line length.
- **line_max**: Longest line length.
- **total_num_lines**: Number of lines.
- **avg_longest_lines**: Average length of the top n longest lines.
- **alphanum_frac**: Alphanumeric fraction.
- **char_token_ratio**: Character-to-token ratio.
- **autogenerated**: Detects autogenerated files.
- **config_or_test**: Identifies config/test files.
- **has_no_keywords**: No Python keywords (e.g., class, def).
- **has_few_assignments**: Fewer than a minimum number of `=` signs.
- **is_xml/is_html**: Detects XML or HTML content.


The number of rows, however, stays the same as before.
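To get a feel for the simpler metrics, here is a rough sketch of how a few of them could be computed for a single file. The definitions below are assumptions made for illustration (for example, line lengths exclude the newline character), and `line_metrics` is a hypothetical helper; the transform's exact formulas may differ.

```python
def line_metrics(contents):
    """Compute a few line-based statistics for one file's text."""
    lines = contents.splitlines()
    lengths = [len(line) for line in lines]
    alnum_chars = sum(ch.isalnum() for ch in contents)
    return {
        "total_num_lines": len(lines),
        "line_max": max(lengths) if lengths else 0,
        "line_mean": sum(lengths) / len(lengths) if lengths else 0.0,
        "alphanum_frac": alnum_chars / len(contents) if contents else 0.0,
    }


sample = "import os\n\nprint(os.getcwd())\n"
print(line_metrics(sample))
```

Metrics like these are added as new columns next to the existing ones, which is why the row count does not change in this step.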


In [None]:
from my_utils import read_parquet_files_as_df
import pprint

output_df = read_parquet_files_as_df(output_folder)

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

pprint.pprint(output_df.columns)

## Show the contents of a few short files (fewer than 10 lines)
total_num_lines_rows = output_df[output_df["total_num_lines"] < 10].head(3)

for _, row in total_num_lines_rows.iterrows():  # iterrows() yields (index, row) tuples
    print(f'-------Total Num Lines {row["total_num_lines"]}------\n{row["contents"]}\n-------')

## Keep this as the last expression so the notebook renders the table
output_df.head(10)



##  Step-6: Filtering

This step can be used to filter the code files based on our chosen conditions.
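In this notebook the condition is `total_num_lines > 10 AND total_num_lines < 90`. As a plain-Python sketch of what that condition keeps (the sample rows and the `keep_row` helper are made up for illustration and are not part of the filter transform):

```python
def keep_row(row, low=10, high=90):
    """Return True when the file has more than `low` and fewer than `high` lines."""
    return low < row["total_num_lines"] < high


rows = [
    {"title": "tiny.py", "total_num_lines": 4},
    {"title": "medium.py", "total_num_lines": 42},
    {"title": "edge.py", "total_num_lines": 90},    # excluded: bound is strict
    {"title": "huge.py", "total_num_lines": 1200},
]
kept = [row["title"] for row in rows if keep_row(row)]
print(kept)  # ['medium.py']
```

Both bounds are strict, so files with exactly 10 or exactly 90 lines are dropped.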

### 6.1 - Set Input/output Folder

In [None]:
STAGE = 4

input_folder = output_code_quality_dir # previous output folder is the input folder for the current stage
output_folder =  output_filter_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### 6.2 - Execute

In [None]:
%%time

import sys
from filter_transform import (
    filter_columns_to_drop_cli_param,
    filter_criteria_cli_param,
    filter_logical_operator_cli_param,
)
from filter_transform_python import FilterPythonTransformConfiguration
from data_processing.utils import ParamsUtils



# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

filter_criteria = [
    "total_num_lines > 10 AND total_num_lines < 90"
]
filter_logical_operator = "AND"
filter_columns_to_drop = []

params = {

    # filter parameters
    filter_criteria_cli_param: filter_criteria,
    filter_columns_to_drop_cli_param: filter_columns_to_drop,
    filter_logical_operator_cli_param: filter_logical_operator,
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = PythonTransformLauncher(FilterPythonTransformConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE} failed")

### 6.3 - Inspect Generated output


In [None]:
from my_utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print (f"Input rows : {input_df.shape[0]:,}")
print (f"Rows kept after filtering : {output_df.shape[0]:,}")

print ("Input data dimensions (rows x columns)= ", input_df.shape)
print ("Output data dimensions (rows x columns)= ", output_df.shape)

output_df.head(10)

##  Step-7: Tokenization

Next, we tokenize the data so it can be used for fine-tuning.
The tokenization module can use any Hugging Face compatible tokenizer.
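If tokenization is new to you, the toy example below shows the basic idea: text goes in, a list of integer token ids comes out. It is deliberately not a Hugging Face tokenizer. It splits on whitespace and builds its vocabulary on the fly, whereas real tokenizers use pretrained subword vocabularies. `build_vocab` and `encode` are hypothetical helpers for this illustration.

```python
def build_vocab(texts):
    """Assign an integer id to every distinct whitespace-separated piece."""
    vocab = {}
    for text in texts:
        for piece in text.split():
            vocab.setdefault(piece, len(vocab))
    return vocab


def encode(text, vocab):
    """Map a text to the list of ids of its pieces."""
    return [vocab[piece] for piece in text.split()]


docs = ["x = 1", "y = x + 1"]
vocab = build_vocab(docs)
for doc in docs:
    ids = encode(doc, vocab)
    print(f"{doc!r} -> {ids} ({len(ids)} tokens)")
```

The token count per document is what determines how much of a model's context window a file will occupy during training.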



### 7.1 - Set Input/output Folder

In [None]:
STAGE = 5

input_folder = output_filter_dir # previous output folder is the input folder for the current stage
output_folder =  output_tokenisation_dir

input_df = read_parquet_files_as_df(input_folder)  ## for debug purposes

print (f"🏃🏼 STAGE-{STAGE}: Processing input='{input_folder}' --> output='{output_folder}'")

### 7.2 - Execute

In [None]:

%%time

import sys

from data_processing.utils import ParamsUtils
from tokenization_transform_python import TokenizationPythonConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),

}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = PythonTransformLauncher(TokenizationPythonConfiguration())
# launch
return_code = launcher.launch()

if return_code == 0:
    print (f"✅ Stage:{STAGE} completed successfully")
else:
    raise Exception (f"❌ Stage:{STAGE} failed")

### 7.3 - Inspect Generated output

Here we should see the `contents` column tokenized.

In [None]:
from my_utils import read_parquet_files_as_df

output_df = read_parquet_files_as_df(output_folder)

print ("Output dimensions (rows x columns)= ", output_df.shape)

output_df.head(5)



**The data is now ready for extended pretraining or fine-tuning with any open-source code model.**