# **Demo on building data prep pipeline for model fine tuning**

<a href="https://colab.research.google.com/github/IBM/data-prep-kit/blob/dev/examples/notebooks/fine tuning/code/sample-notebook.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

This demo notebook shows how to use [data-prep-kit](https://github.com/IBM/data-prep-kit) to build a data preparation pipeline for fine tuning or extended pre-training. We walk through the data preparation steps that process raw data (code repositories) and tokenise it so that it can be used to fine tune any popular code model. We also discuss a novel recipe for semantic ordering of the files in a repository, which has been shown to enhance model training; please see our [paper](https://arxiv.org/abs/2407.13739) for more details. For this demo, we use the [codeparrot/github-code](https://huggingface.co/datasets/codeparrot/github-code) dataset hosted on Hugging Face.



## Setup

Install the data-prep-toolkit and datasets libraries. This notebook requires at least 8 CPUs.
To run on Google Colab, it is recommended to change the runtime to TPU to get the required number of CPUs.


In [1]:
%%capture logpip --no-stderr
!pip3 install 'data-prep-toolkit[ray]==0.2.2.dev1'
!pip3 install 'data-prep-toolkit-transforms[ray,all]==0.2.2.dev1'
!pip install datasets
!pip install pandas

We use Ray for parallel processing, so that beyond this demo the same code can be used, with minor changes, for production runs on larger datasets. Please read [here](https://github.com/IBM/data-prep-kit?tab=readme-ov-file#-about-) about the features of data-prep-kit, including the flexibility to run on anything from a laptop to a cluster. There are three parameters that the user can change as per the use case:

`runtime_num_workers`: number of parallel workers to be used

`num_cpus`: number of CPUs to be used per worker

`run_locally`: when `True`, starts a local Ray cluster for parallel computation
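
As a rough sanity check before changing these values, the CPU budget requested by the workers can be estimated as the number of workers times the CPUs per worker. The sketch below is only illustrative: `requested_cpus` and `fits_on_machine` are hypothetical helpers, not part of data-prep-kit, and the estimate ignores any extra actors that individual transforms may start.

```python
import os


def requested_cpus(num_workers: int, cpus_per_worker: float) -> float:
    """CPUs requested by the transform workers alone (hypothetical helper)."""
    return num_workers * cpus_per_worker


def fits_on_machine(num_workers: int, cpus_per_worker: float, available=None) -> bool:
    """Check the request against the CPUs visible to this process."""
    if available is None:
        # os.cpu_count() can return None, so fall back to a single CPU
        available = os.cpu_count() or 1
    return requested_cpus(num_workers, cpus_per_worker) <= available


# Values used in this notebook: 2 workers with 0.8 CPUs each
print(requested_cpus(2, 0.8))
print(fits_on_machine(2, 0.8, available=8))
```

Fractional values such as 0.8 leave some headroom on each core for the rest of the runtime.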


In [3]:
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing.utils import ParamsUtils
import sys
import json
import pandas as pd
#Default parameters for computation
worker_options = {"num_cpus": 0.8}
common_config_params = {
        "run_locally": True,
        "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
        "runtime_num_workers": 2,
    }




We will do all the processing in the `sample_data` folder. This concludes our setup section.

In [4]:
!rm -rf sample_data
!mkdir -p sample_data
!mkdir -p sample_data/hf_2_parquet

## Data Preparation Steps

We now discuss the data preparation steps that clean and transform the raw data into a tokenised format. We use the [parquet data format](https://parquet.apache.org/) for all our operations, which helps the pipeline scale efficiently to production runs beyond the demo.

1. HuggingFace2Parquet: Read the dataset from HF and convert into parquet format.
2. Exact Deduplication: Remove exact duplicates.
3. Fuzzy Deduplication: Remove near duplicates.
4. Programming Language Selection: Annotate the files written in the programming languages of interest.
5. Code Quality Annotations: Annotate whether a given code file is of high quality or not using various rules.
6. Filtering: Filter the dataset to retain only the programming languages of interest.
7. Semantic Ordering: Organise code files by their semantic dependencies.  
8. Tokenization: Tokenise the data for model fine tuning.

The data processing pipeline is organised so that the output of each transform is used as the input to the next one. Refer to the papers [here](https://arxiv.org/pdf/2405.04324) and [here](https://arxiv.org/abs/2407.13739) for complete details of each of the above steps.
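
The chaining described above can be sketched as a list of (input folder, output folder) pairs, where each stage reads what the previous one wrote. This is a minimal illustration: `chain_stages` is a hypothetical helper, and the folder names are the ones used for the stages shown in this notebook.

```python
def chain_stages(root, stage_folders):
    """Pair each stage's input folder with the previous stage's output folder."""
    paths = [f"{root}/{name}" for name in stage_folders]
    # zip pairs element i with element i + 1: (input, output) for each transform
    return list(zip(paths[:-1], paths[1:]))


stage_folders = [
    "hf_2_parquet", "ededup_out", "docid_out",
    "fdedup_out", "ps_out", "cq_out", "filter_out",
]
plan = chain_stages("sample_data", stage_folders)
for input_folder, output_folder in plan:
    print(f"{input_folder} -> {output_folder}")
```

Keeping every intermediate folder makes it easy to inspect the output of any single stage, at the cost of extra disk space.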

## 1. Huggingface datasets to Parquet

This is the first component of the pipeline. It ingests the `codeparrot/github-code` dataset from Hugging Face and converts it into
parquet files for consumption by the next steps in the data processing pipeline.

For this demo we process only a few records. The following fields can be updated in case you want to use more data. <br/>
_total_files_ = 20 <br/>
_rows_per_file_ = 20

The output of this stage of the pipeline would be written to `sample_data/hf_2_parquet`.
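
The conversion reads the dataset as a stream and cuts it into windows with a skip/take pattern. The sketch below mimics that pattern on a plain Python iterable with `itertools.islice`; `take_window` and `window_offsets` are hypothetical helpers, and advancing the skip offset with the file index is one possible way to give each file a distinct slice of the stream.

```python
from itertools import islice


def take_window(stream, skip, nrows):
    """Return `nrows` items of `stream` after skipping the first `skip` items."""
    return list(islice(stream, skip, skip + nrows))


def window_offsets(total_files, rows_per_file):
    """Skip offset per file so that consecutive files cover consecutive rows."""
    return [num * rows_per_file for num in range(total_files)]


rows = range(100)  # stand-in for a streamed dataset
for num, skip in enumerate(window_offsets(3, 4)):
    print(num, take_window(rows, skip, 4))
```

With a constant skip offset, every window would contain the same rows, which is useful only when duplicates are wanted on purpose.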

In [5]:
import os
import pyarrow as pa
import pyarrow.parquet as pq

from datasets import load_dataset

import uuid
from data_processing.utils import TransformUtils
from collections import defaultdict

DATASET_NAME='codeparrot/github-code'

ds = load_dataset(DATASET_NAME,
                  streaming=True,
                  split="train",
                  trust_remote_code=True)

def row_mapper(row):
    return {
            'ext': TransformUtils.get_file_extension(row['path'])[1],
            'document_id': str(uuid.uuid4())
            }

parquet_data_output = "sample_data/hf_2_parquet"

# Convert a subset of a Hugging Face dataset to a Parquet file, optionally mapping and renaming columns.
def hf_dataset_to_parquet(ds, skip, nrows, file_name, mapper=None, renamed_columns=()):
    dst_ = ds.skip(skip).take(nrows)

    data_dict = defaultdict(list)

    dst = dst_.map(mapper)

    for data in dst:
        for k, v in data.items():
            data_dict[k].append(v)

    for old, new in renamed_columns:
        data_dict[new] = data_dict[old]
        del data_dict[old]

    table = pa.Table.from_pydict(data_dict)
    pq.write_table(table, file_name)


## Create parquet files

total_files = 20
rows_per_file = 20
for num in range(total_files):
    file_name = os.path.join(
        f"{parquet_data_output}",
        f"data_{num}.parquet"
    )
    print (f"Writing {file_name}")
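    # NOTE: the skip offset below is constant, so every file receives the same
    # rows; pass num * rows_per_file instead to write distinct rows per file.
    hf_dataset_to_parquet(ds,
                          1 * rows_per_file,
                          rows_per_file,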
                          file_name=file_name,
                          mapper=row_mapper,
                          renamed_columns=[("code", "contents"),
                                           ("path", "title")])

Writing sample_data/hf_2_parquet/data_0.parquet
Writing sample_data/hf_2_parquet/data_1.parquet
Writing sample_data/hf_2_parquet/data_2.parquet
Writing sample_data/hf_2_parquet/data_3.parquet
Writing sample_data/hf_2_parquet/data_4.parquet
Writing sample_data/hf_2_parquet/data_5.parquet
Writing sample_data/hf_2_parquet/data_6.parquet
Writing sample_data/hf_2_parquet/data_7.parquet
Writing sample_data/hf_2_parquet/data_8.parquet
Writing sample_data/hf_2_parquet/data_9.parquet
Writing sample_data/hf_2_parquet/data_10.parquet
Writing sample_data/hf_2_parquet/data_11.parquet
Writing sample_data/hf_2_parquet/data_12.parquet
Writing sample_data/hf_2_parquet/data_13.parquet
Writing sample_data/hf_2_parquet/data_14.parquet
Writing sample_data/hf_2_parquet/data_15.parquet
Writing sample_data/hf_2_parquet/data_16.parquet
Writing sample_data/hf_2_parquet/data_17.parquet
Writing sample_data/hf_2_parquet/data_18.parquet
Writing sample_data/hf_2_parquet/data_19.parquet


In [6]:
#Function to read parquet files in a directory as pandas dataframe
from pathlib import Path
def read_parquet_bulk(dir_path):
    data_dir = Path(dir_path)
    # Get the list of all Parquet files in the directory
    parquet_files = list(data_dir.glob('*.parquet'))
    # Check if the directory contains any Parquet files
    if not parquet_files:
        raise ValueError(f"No Parquet files found in directory: {dir_path}")
    # Concatenate all Parquet files into a single DataFrame
    full_df = pd.concat(
        pd.read_parquet(parquet_file)
        for parquet_file in parquet_files
    ).reset_index(drop=True)

    return full_df


input_df=read_parquet_bulk(parquet_data_output)

print("No of rows, No of columns",input_df.shape)
print("Sample data \n ")
input_df.head(1)

No of rows, No of columns (400, 8)
Sample data 
 


Unnamed: 0,repo_name,language,license,size,ext,document_id,contents,title
0,decred/dcrd,GO,isc,10267,.go,75a34a46-cc2e-4744-be4e-26ec33d87baa,// Copyright (c) 2021 The Decred developers\n/...,blockchain/indexers/indexsubscriber.go


## 2. Exact deduplication

This step finds exact duplicates in the `contents` column and removes them. It computes a SHA256 hash of each code file and removes records with identical hashes.

The transform-specific params for exact deduplication are: <br/>
 _ededup_hash_cpu_ - Number of CPUs per worker <br/>
 _ededup_num_hashes_ - Number of workers used to store hashes <br/>
 _ededup_doc_column_ - Name of the column to be checked for duplicates <br/>
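
The idea behind hash-based exact deduplication can be sketched in a few lines of plain Python. This is a single-process illustration only, not the transform's implementation (which distributes the hash store across workers); `exact_dedup` is a hypothetical helper.

```python
import hashlib


def exact_dedup(docs):
    """Keep the first occurrence of each document, comparing SHA256 digests."""
    seen = set()
    kept = []
    for doc in docs:
        digest = hashlib.sha256(doc.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            kept.append(doc)
    return kept


docs = ["print('a')", "print('b')", "print('a')"]
print(exact_dedup(docs))
```

Storing digests rather than full documents keeps the memory needed for the "seen" set small and independent of file size.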


In [28]:
import os
import sys
from ededup_transform_ray import EdedupRayTransformRuntimeConfiguration

input_folder = parquet_data_output # Output of previous stage is used as input.
output_folder = "sample_data/ededup_out"

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

ededup_params = {
    # ededup parameters
    "ededup_hash_cpu": 0.5,
    "ededup_num_hashes": 2,
    "ededup_doc_column": "contents",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params | ededup_params
sys.argv = ParamsUtils.dict_to_req(d=params)
ededup_launcher = RayTransformLauncher(EdedupRayTransformRuntimeConfiguration())
ededup_launcher.launch()

01:44:14 INFO - exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'document_id', 'use_snapshot': False, 'snapshot_directory': None, 'hash_cpu': 0.5, 'num_hashes': 2}
INFO:ededup_transform_base:exact dedup params are {'doc_column': 'contents', 'doc_id_column': 'document_id', 'use_snapshot': False, 'snapshot_directory': None, 'hash_cpu': 0.5, 'num_hashes': 2}
01:44:14 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
01:44:14 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
01:44:14 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
01:44:14 INFO - actor creation delay 0
INFO:data_processing_ray.runtime.ray.execution_configuration:actor creation delay 0
01:44:14 INFO - job details {'job category': 'preproc

0

In [29]:
import json
import pprint
def read_metadata(path):
    with open(path, 'r') as file:
        metadata = json.load(file)
        pprint.pp(metadata)
read_metadata(f"{output_folder}/metadata.json")

{'pipeline': 'pipeline_id',
 'job details': {'job category': 'preprocessing',
                 'job name': 'ededup',
                 'job type': 'ray',
                 'job id': 'job_id',
                 'start_time': '2024-10-29 01:44:18',
                 'end_time': '2024-10-29 01:44:20',
                 'status': 'success'},
 'code': None,
 'job_input_params': {'doc_column': 'contents',
                      'doc_id_column': 'document_id',
                      'use_snapshot': False,
                      'snapshot_directory': None,
                      'hash_cpu': 0.5,
                      'num_hashes': 2,
                      'checkpointing': False,
                      'max_files': -1,
                      'random_samples': -1,
                      'files_to_use': ['.parquet'],
                      'number of workers': 2,
                      'worker options': {'num_cpus': 0.8, 'max_restarts': -1},
                      'actor creation delay': 0},
 'execution_stats':

In [30]:
input_df=read_parquet_bulk(output_folder)
print("No of rows, No of columns",input_df.shape)
print("Sample data \n ")
input_df.head(1)

No of rows, No of columns (20, 9)
Sample data 
 


Unnamed: 0,repo_name,language,license,size,ext,document_id,contents,title,removed
0,decred/dcrd,GO,isc,10267,.go,a523a161-d2e6-40ec-b9f7-2a826bef2e04,// Copyright (c) 2021 The Decred developers\n/...,blockchain/indexers/indexsubscriber.go,[]


## 3. Fuzzy Deduplication

This step finds near duplicates and removes them. The code is split into two cells: the first adds document ids to the parquet files and the second runs fuzzy dedup. Adding document ids is a prerequisite for fuzzy dedup.

We first add the document ids as additional columns to the parquet files. <br/>
_doc_id_doc_column_ - name of the column containing the document (required for ID generation) <br/>
_doc_id_hash_column_ - name of the column created to hold the string document id; if None, this id is not generated <br/>
_doc_id_int_column_ - name of the column created to hold the integer document id; if None, this id is not generated <br/>
At least one of the hash or integer id columns must be specified.
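
To make the two kinds of id concrete, the sketch below adds a content-derived string id and a running integer id to a list of rows. It is an illustration under assumptions: `add_doc_ids` is a hypothetical helper, and using a SHA256 hex digest for the string id is a choice made here, not necessarily what the transform does.

```python
import hashlib


def add_doc_ids(rows, doc_column="contents", hash_column="hash_column",
                int_column="int_id_column", start_id=0):
    """Return copies of `rows` with a string id and an integer id added."""
    out = []
    for offset, row in enumerate(rows):
        new_row = dict(row)  # copy so the input rows are left untouched
        digest = hashlib.sha256(row[doc_column].encode("utf-8")).hexdigest()
        new_row[hash_column] = digest            # content-derived string id
        new_row[int_column] = start_id + offset  # running integer id
        out.append(new_row)
    return out


rows = [{"contents": "package main"}, {"contents": "int main(void) {}"}]
for row in add_doc_ids(rows):
    print(row["int_id_column"], row["hash_column"][:12])
```

The integer id is compact and unique within a run, while the string id depends only on the content, so identical documents always receive the same value.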



In [9]:
input_folder = "sample_data/ededup_out"
output_folder = "sample_data/docid_out"


from doc_id_transform_ray import DocIDRayTransformRuntimeConfiguration
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

doc_id_params = {
    # doc id configuration
    "doc_id_doc_column": "contents",
    "doc_id_hash_column": "hash_column",
    "doc_id_int_column": "int_id_column",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = doc_id_params | common_config_params
sys.argv = ParamsUtils.dict_to_req(d=params)
launcher = RayTransformLauncher(DocIDRayTransformRuntimeConfiguration())
launcher.launch()

01:19:16 INFO - Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'hash_column', 'int_column': 'int_id_column', 'start_id': 0}
INFO:doc_id_transform_base:Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'hash_column', 'int_column': 'int_id_column', 'start_id': 0}
01:19:16 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
01:19:16 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
01:19:16 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
01:19:16 INFO - actor creation delay 0
INFO:data_processing_ray.runtime.ray.execution_configuration:actor creation delay 0
01:19:16 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_id', 'job type': 'ray', 'job id': 'job_id'}
INFO:data_

0

In [31]:

input_df=read_parquet_bulk(output_folder)
print("No of rows, No of columns",input_df.shape)
print("Sample data \n ")
input_df.head(1)


In [32]:
read_metadata(f"{output_folder}/metadata.json")

After adding the document ids, the next step is to run fuzzy deduplication. We apply a two-step method: (1) compute MinHashes of all the documents and then use Locality Sensitive Hashing (LSH) to group documents based on their MinHash fingerprints; (2) measure the Jaccard similarity between each pair of documents in the same bucket and, based on a similarity threshold, annotate all but one of them as duplicates.

Some important transform-specific params are: <br/>
_fdedup_doc_column_ - Column to be used for deduplication <br/>
_fdedup_threshold_ - Jaccard similarity threshold (this run leaves it unset, and the run log reports 0.8)
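
The two ingredients of this method, Jaccard similarity over shingles and its MinHash estimate, can be sketched as follows. This is a minimal single-machine illustration, not the transform's code: the helper names are hypothetical, seeded SHA256 hashes stand in for random permutations, and the LSH bucketing step is omitted. The shingle size (5), delimiter (space) and number of permutations (64) mirror the values printed in the run log.

```python
import hashlib


def shingles(text, size=5, delimiter=" "):
    """Set of word shingles (runs of `size` consecutive words)."""
    words = text.split(delimiter)
    if len(words) <= size:
        return {delimiter.join(words)}
    return {delimiter.join(words[i:i + size]) for i in range(len(words) - size + 1)}


def jaccard(a, b):
    """Exact Jaccard similarity of two sets."""
    union = a | b
    return len(a & b) / len(union) if union else 1.0


def minhash_signature(shingle_set, num_permutations=64):
    """One minimum per seeded hash function (stand-in for random permutations)."""
    return [
        min(
            int.from_bytes(hashlib.sha256(f"{seed}:{s}".encode("utf-8")).digest()[:8], "big")
            for s in shingle_set
        )
        for seed in range(num_permutations)
    ]


def estimated_similarity(sig_a, sig_b):
    """Fraction of matching signature positions approximates the Jaccard similarity."""
    return sum(x == y for x, y in zip(sig_a, sig_b)) / len(sig_a)


doc_a = "the quick brown fox jumps over the lazy dog near the river bank"
doc_b = "the quick brown fox jumps over the lazy dog near the river side"
set_a, set_b = shingles(doc_a), shingles(doc_b)
print("exact Jaccard:", jaccard(set_a, set_b))
print("MinHash estimate:", estimated_similarity(minhash_signature(set_a), minhash_signature(set_b)))
```

The estimate gets closer to the exact value as the number of permutations grows, which is the usual trade-off between accuracy and the cost of computing signatures.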

In [33]:
input_folder = "sample_data/docid_out"
output_folder = "sample_data/fdedup_out"

import os
import sys

from data_processing.utils import ParamsUtils
from fdedup_transform_ray import FdedupRayTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
fdedup_params = {
    # columns used
    "fdedup_doc_column": "contents",
    "fdedup_id_column": "int_id_column",
    "fdedup_cluster_column": "hash_column",
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params| fdedup_params

# Pass commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch
fdedup_launcher = RayTransformLauncher(FdedupRayTransformConfiguration())
fdedup_launcher.launch()

01:45:39 INFO - fuzzy dedup params are {'doc_column': 'contents', 'id_column': 'int_id_column', 'cluster_column': 'hash_column', 'bucket_cpu': 0.5, 'mhash_cpu': 0.5, 'doc_cpu': 0.5, 'num_doc_actors': 1, 'num_minhash_actors': 1, 'num_bucket_actors': 1, 'num_preprocessors': 1, 'num_permutations': 64, 'threshold': 0.8, 'shingles_size': 5, 'delimiters': ' ', 'snapshot_delay': 1, 'use_bucket_snapshot': False, 'use_doc_snapshot': False, 'random_delay_limit': 10, 'worker_options': {'num_cpus': 0.8}}
INFO:fdedup_transform_ray:fuzzy dedup params are {'doc_column': 'contents', 'id_column': 'int_id_column', 'cluster_column': 'hash_column', 'bucket_cpu': 0.5, 'mhash_cpu': 0.5, 'doc_cpu': 0.5, 'num_doc_actors': 1, 'num_minhash_actors': 1, 'num_bucket_actors': 1, 'num_preprocessors': 1, 'num_permutations': 64, 'threshold': 0.8, 'shingles_size': 5, 'delimiters': ' ', 'snapshot_delay': 1, 'use_bucket_snapshot': False, 'use_doc_snapshot': False, 'random_delay_limit': 10, 'worker_options': {'num_cpus': 

0

In [34]:
read_metadata(f"{output_folder}/metadata.json")

{'pipeline': 'pipeline_id',
 'job details': {'job category': 'preprocessing',
                 'job name': 'fdedup',
                 'job type': 'ray',
                 'job id': 'job_id',
                 'start_time': '2024-10-29 01:45:42',
                 'end_time': '2024-10-29 01:46:05',
                 'status': 'success'},
 'code': None,
 'job_input_params': {'doc_column': 'contents',
                      'id_column': 'int_id_column',
                      'cluster_column': 'hash_column',
                      'bucket_cpu': 0.5,
                      'mhash_cpu': 0.5,
                      'doc_cpu': 0.5,
                      'num_doc_actors': 1,
                      'num_minhash_actors': 1,
                      'num_bucket_actors': 1,
                      'num_preprocessors': 1,
                      'num_permutations': 64,
                      'threshold': 0.8,
                      'shingles_size': 5,
                      'delimiters': ' ',
                      'sn

In [35]:
input_df=read_parquet_bulk(output_folder)
print("No of rows, No of columns",input_df.shape)
print("Sample data \n ")
input_df.head(1)


## 4. Programming Language Selection

This module identifies the code files written in the languages of interest, which are specified using `selected_languages_file`. After this step, a new column is added that flags whether each file's programming language is in that list. The Filtering step can then use this column to analyse how many files were found for each language and to filter selectively.

The important parameters used by this transform are: <br/>
_lang_allowed_langs_file_key_ - A file with a list of allowed languages. <br/>
_lang_lang_column_key_ - The name of column which has programming language. <br/>
_lang_output_column_key_ - The name of annotation column. <br/>

For this demo, we use this [file](https://github.com/IBM/data-prep-kit/blob/dev/transforms/code/proglang_select/python/test-data/languages/allowed-code-languages.txt) to specify the languages of interest, and the module adds a new column called `language_of_interest` that holds a boolean value: it is True for all rows whose code file belongs to a programming language in the list, and False otherwise.
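
The annotation itself amounts to a membership test per row. The sketch below illustrates it on plain dictionaries; `annotate_languages` is a hypothetical helper, and the exact, case-sensitive comparison is an assumption made here (under it, a value such as `GO` would not match `Go`), so check how the transform compares names before relying on it.

```python
def annotate_languages(rows, allowed, lang_column="language",
                       output_column="language_of_interest"):
    """Add a boolean column that is True when the row's language is allowed."""
    allowed_set = set(allowed)  # exact, case-sensitive match (assumption)
    return [{**row, output_column: row[lang_column] in allowed_set} for row in rows]


allowed = ["JavaScript", "C++", "C", "Go", "Java"]
rows = [
    {"title": "main.go", "language": "Go"},
    {"title": "README.md", "language": "Markdown"},
]
for row in annotate_languages(rows, allowed):
    print(row["title"], row["language_of_interest"])
```

Annotating instead of dropping rows at this point keeps the decision reversible: the filtering step can later combine this flag with other conditions.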

In [42]:
input_folder = "sample_data/fdedup_out"
output_folder = "sample_data/ps_out"

# Uncomment one of the two commands below to create allowed-code-languages.txt.
# Option 1: download the sample file from the data-prep-kit repository
# !wget https://raw.githubusercontent.com/IBM/data-prep-kit/dev/transforms/code/proglang_select/python/test-data/languages/allowed-code-languages.txt

# Option 2: create a file with the languages of interest
#! echo "JavaScript\nC++\nC\nGo\nJava" >> allowed-code-languages.txt

selected_languages_file = "./allowed-code-languages.txt"

from proglang_select_transform_ray import ProgLangSelectRayConfiguration
from proglang_select_transform import (
    lang_allowed_langs_file_key,
    lang_lang_column_key,
    lang_output_column_key,
)

# create parameters
language_column_name = "language"
annotated_column_name = "language_of_interest"

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

langselect_config = {
    lang_allowed_langs_file_key: selected_languages_file,
    lang_lang_column_key: language_column_name,
    lang_output_column_key: annotated_column_name,
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params| langselect_config

sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(ProgLangSelectRayConfiguration())
launcher.launch()

01:49:17 INFO - data factory proglang_select_ is using local configuration without input/output path
INFO:data_processing.data_access.data_access_factory_base5879a0af-8cb9-4ebf-9289-136abddcface:data factory proglang_select_ is using local configuration without input/output path
01:49:17 INFO - data factory proglang_select_ max_files -1, n_sample -1
INFO:data_processing.data_access.data_access_factory_base5879a0af-8cb9-4ebf-9289-136abddcface:data factory proglang_select_ max_files -1, n_sample -1
01:49:17 INFO - data factory proglang_select_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
INFO:data_processing.data_access.data_access_factory_base5879a0af-8cb9-4ebf-9289-136abddcface:data factory proglang_select_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
01:49:17 INFO - pipeline id pipeline_id
INFO:data_processi

0

In [37]:
read_metadata(f"{output_folder}/metadata.json")

{'pipeline': 'pipeline_id',
 'job details': {'job category': 'preprocessing',
                 'job name': 'proglang_select',
                 'job type': 'ray',
                 'job id': 'job_id',
                 'start_time': '2024-10-29 01:46:36',
                 'end_time': '2024-10-29 01:46:38',
                 'status': 'success'},
 'code': None,
 'job_input_params': {'proglang_select_language_column': 'language',
                      'proglang_select_allowed_langs_file': './allowed-code-languages.txt',
                      'proglang_select_output_column': 'language_of_interest',
                      'checkpointing': False,
                      'max_files': -1,
                      'random_samples': -1,
                      'files_to_use': ['.parquet'],
                      'number of workers': 2,
                      'worker options': {'num_cpus': 0.8, 'max_restarts': -1},
                      'actor creation delay': 0},
 'execution_stats': {'cpus': 96,
            

In [40]:

input_df=read_parquet_bulk(output_folder)
print("No of rows, No of columns",input_df.shape)

No of rows, No of columns (20, 12)


In [41]:
read_parquet_bulk(output_folder).head(10)


Unnamed: 0,repo_name,language,license,size,ext,document_id,contents,title,removed,int_id_column,hash_column,language_of_interest
0,decred/dcrd,GO,isc,10267,.go,a523a161-d2e6-40ec-b9f7-2a826bef2e04,// Copyright (c) 2021 The Decred developers\n/...,blockchain/indexers/indexsubscriber.go,[],0,-1,False
1,Brawl345/WiiDataDownloader-Module,Batchfile,isc,521,.bat,e655f46b-b0d4-4b2e-9cb9-1ee7eaef5bd6,@echo off\nCLS\n%header%\necho.\nif not exist ...,HackMii Installer herunterladen.bat,[],1,-1,False
2,decred/decrediton,Markdown,isc,1305,.md,f2a2c77c-e5e8-48f0-bf43-9677f0991667,# Limit rozbieżności\n\n**Ostrzeżenie! Ustawie...,app/i18n/docs/pl/InfoModals/GapLimit.md,[],2,-1,False
3,k0gaMSX/scc,C,isc,1127,.h,176691ce-9a96-43bf-b9e9-fb5e5d3c8179,"enum asmop {\n\tASNOP = 0,\n\tASSTB,\n\tASSTH,...",cc2/target/qbe/arch.h,[],3,-1,False
4,Dirbaio/btcd,GO,isc,28011,.go,5841247d-a671-40b1-9a51-af3046ffa993,// Copyright (c) 2013-2015 The btcsuite develo...,wire/msgblock_test.go,[],4,-1,False
5,bsander/dJSON,JavaScript,isc,3391,.js,dbdb3232-a484-485f-9ddb-e155258d3146,"describe('dJSON', function () {\n 'use strict...",test/dJSON.spec.js,[],5,-1,False
6,ibara/chargend,C,isc,7754,.h,7da203fd-265c-4cf0-a248-9a4d72e4fc8e,/*\n * 94 shifted lines of 72 ASCII characters...,chargend.h,[],6,-1,False
7,kdhp/play,C,isc,2590,.c,293bf25f-8f4d-4a94-bddd-7dddd3645af4,\n#include <stdarg.h>\n#include <stddef.h>\n#i...,mp3.c,[],7,-1,False
8,sec51/clamav-yara,GO,isc,436,.go,5dd28622-053f-4c5c-81c7-50733cfea161,"package main\n\nimport (\n\t""testing""\n)\n\nfu...",hdb_signatures_test.go,[],8,-1,False
9,ibc/MediaSoup,C,isc,34797,.c,7b17efb2-2735-43c8-be72-61cda07f6dd4,/*\n * Copyright 2005-2019 The OpenSSL Project...,worker/deps/openssl/openssl/crypto/whrlpool/wp...,[],9,-1,False


## 5. Code Quality

We experimented with various code quality metrics and finally retained the four code quality metrics used by (Li et al., 2023) to balance the tradeoff between code quality and data volume.

The transform annotates each file with the following quality metrics:

- `line_mean`: Average line length.
- `line_max`: Maximum line length.
- `total_num_lines`: Total number of lines.
- `avg_longest_lines`: Average length of the n longest lines, where n can be any number you choose.
- `alphanum_frac`: Fraction of alphanumeric characters in the file.
- `char_token_ratio`: Character-to-token ratio of the file, computed with the tokenizer.
- `autogenerated`: Whether the file is autogenerated, detected by looking for keywords in the first few lines of the file.
- `config_or_test`: Whether the file is a configuration file or a unit test.
- `has_no_keywords`: Whether a Python file has none of the keywords for function, class, for loop, or while loop.
- `has_few_assignments`: Whether the file uses the symbol '=' fewer than 'minimum' times.
- `is_xml`: Whether the input data is XML content.
- `is_html`: Whether the input data is an HTML file, based on the ratio of displayed text to code.
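
The line-based metrics are simple to compute by hand, which helps when interpreting the annotated columns. The sketch below follows the descriptions above; it is not the transform's implementation, `line_stats` is a hypothetical helper, and the default `n_longest=3` is an arbitrary choice for illustration.

```python
def line_stats(contents, n_longest=3):
    """Compute a few line-based quality metrics for one file's contents."""
    lines = contents.splitlines()
    lengths = [len(line) for line in lines]
    longest = sorted(lengths, reverse=True)[:n_longest]
    alnum = sum(ch.isalnum() for ch in contents)
    return {
        "line_mean": sum(lengths) / len(lengths) if lengths else 0.0,
        "line_max": max(lengths, default=0),
        "total_num_lines": len(lines),
        "avg_longest_lines": sum(longest) / len(longest) if longest else 0.0,
        "alphanum_frac": alnum / len(contents) if contents else 0.0,
    }


sample = "x = 1\ny = 22\nprint(x + y)\n"
for name, value in line_stats(sample, n_longest=2).items():
    print(f"{name}: {value}")
```

Very long lines and a low alphanumeric fraction are typical signs of minified, generated or data-like files, which is why such metrics are useful filter inputs.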

In [21]:
input_folder = "sample_data/ps_out"
output_folder = "sample_data/cq_out"

from code_quality_transform_ray import CodeQualityRayTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}


language_column_name = "language"
params = {
    "cq_contents_column_name": "contents",
    "cq_language_column_name": language_column_name,
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}

params = common_config_params| params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(CodeQualityRayTransformConfiguration())
# launch
launcher.launch()

01:37:46 INFO - pipeline id pipeline_id
INFO:data_processing.runtime.execution_configuration:pipeline id pipeline_id
01:37:46 INFO - code location None
INFO:data_processing.runtime.execution_configuration:code location None
01:37:46 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
INFO:data_processing_ray.runtime.ray.execution_configuration:number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
01:37:46 INFO - actor creation delay 0
INFO:data_processing_ray.runtime.ray.execution_configuration:actor creation delay 0
01:37:46 INFO - job details {'job category': 'preprocessing', 'job name': 'code_quality', 'job type': 'ray', 'job id': 'job_id'}
INFO:data_processing_ray.runtime.ray.execution_configuration:job details {'job category': 'preprocessing', 'job name': 'code_quality', 'job type': 'ray', 'job id': 'job_id'}
01:37:46 INFO - data factory data_ is using local data access: input_folder - sample_data/ps_out output_folder - sample_data/cq

0

In [23]:
read_metadata(f"{output_folder}/metadata.json")

{'pipeline': 'pipeline_id',
 'job details': {'job category': 'preprocessing',
                 'job name': 'code_quality',
                 'job type': 'ray',
                 'job id': 'job_id',
                 'start_time': '2024-10-29 01:37:52',
                 'end_time': '2024-10-29 01:37:58',
                 'status': 'success'},
 'code': None,
 'job_input_params': {'code_quality_params': {'contents_column_name': 'contents',
                                              'language_column_name': 'language',
                                              'tokenizer': 'codeparrot/codeparrot',
                                              'hf_token': None},
                      'checkpointing': False,
                      'max_files': -1,
                      'random_samples': -1,
                      'files_to_use': ['.parquet'],
                      'number of workers': 2,
                      'worker options': {'num_cpus': 0.8, 'max_restarts': -1},
                      'ac

In [24]:
read_parquet_bulk(output_folder).head(10)


|  | repo_name | language | license | size | ext | document_id | contents | title | removed | int_id_column | ... | total_num_lines | avg_longest_lines | alphanum_frac | char_token_ratio | autogenerated | config_or_test | has_no_keywords | has_few_assignments | is_xml | is_html |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | decred/dcrd | GO | isc | 10267 | .go | a523a161-d2e6-40ec-b9f7-2a826bef2e04 | `// Copyright (c) 2021 The Decred developers\n/...` | blockchain/indexers/indexsubscriber.go | [] | 0 | ... | 390 | 90.571429 | 0.698549 | 3.408699 | False | False | False | False | False | False |
| 1 | Brawl345/WiiDataDownloader-Module | Batchfile | isc | 521 | .bat | e655f46b-b0d4-4b2e-9cb9-1ee7eaef5bd6 | `@echo off\nCLS\n%header%\necho.\nif not exist ...` | HackMii Installer herunterladen.bat | [] | 1 | ... | 11 | 69.428571 | 0.738964 | 2.492823 | False | False | False | False | False | False |
| 2 | decred/decrediton | Markdown | isc | 1305 | .md | f2a2c77c-e5e8-48f0-bf43-9677f0991667 | `# Limit rozbieżności\n\n**Ostrzeżenie! Ustawie...` | app/i18n/docs/pl/InfoModals/GapLimit.md | [] | 2 | ... | 15 | 170.142857 | 0.822186 | 1.743954 | False | False | False | False | False | False |
| 3 | k0gaMSX/scc | C | isc | 1127 | .h | 176691ce-9a96-43bf-b9e9-fb5e5d3c8179 | `enum asmop {\n\tASNOP = 0,\n\tASSTB,\n\tASSTH,...` | cc2/target/qbe/arch.h | [] | 3 | ... | 135 | 10.428571 | 0.658385 | 1.506684 | False | False | False | True | False | False |
| 4 | Dirbaio/btcd | GO | isc | 28011 | .go | 5841247d-a671-40b1-9a51-af3046ffa993 | `// Copyright (c) 2013-2015 The btcsuite develo...` | wire/msgblock_test.go | [] | 4 | ... | 791 | 85.142857 | 0.642105 | 2.457968 | False | True | False | False | False | False |
| 5 | bsander/dJSON | JavaScript | isc | 3391 | .js | dbdb3232-a484-485f-9ddb-e155258d3146 | `describe('dJSON', function () {\n 'use strict...` | test/dJSON.spec.js | [] | 5 | ... | 147 | 94.571429 | 0.487762 | 3.276329 | False | False | False | False | False | False |
| 6 | ibara/chargend | C | isc | 7754 | .h | 7da203fd-265c-4cf0-a248-9a4d72e4fc8e | `/*\n * 94 shifted lines of 72 ASCII characters...` | chargend.h | [] | 6 | ... | 100 | 81.0 | 0.583183 | 3.102841 | False | False | False | False | False | False |
| 7 | kdhp/play | C | isc | 2590 | .c | 293bf25f-8f4d-4a94-bddd-7dddd3645af4 | `\n#include <stdarg.h>\n#include <stddef.h>\n#i...` | mp3.c | [] | 7 | ... | 130 | 56.714286 | 0.57529 | 2.154742 | False | False | False | False | False | False |
| 8 | sec51/clamav-yara | GO | isc | 436 | .go | 5dd28622-053f-4c5c-81c7-50733cfea161 | `package main\n\nimport (\n\t"testing"\n)\n\nfu...` | hdb_signatures_test.go | [] | 8 | ... | 19 | 51.285714 | 0.740826 | 2.520231 | False | False | False | False | False | False |
| 9 | ibc/MediaSoup | C | isc | 34797 | .c | 7b17efb2-2735-43c8-be72-61cda07f6dd4 | `/*\n * Copyright 2005-2019 The OpenSSL Project...` | worker/deps/openssl/openssl/crypto/whrlpool/wp... | [] | 9 | ... | 785 | 79.714286 | 0.465816 | 2.030993 | False | False | False | False | False | False |




## 6. Filtering

This step filters the code files based on conditions of our choosing. In this demo, we have used only one annotation, which adds the programming language name for each code file. To demonstrate the utility, we use this module to retain only the code files of interest.

In [None]:
input_folder = "sample_data/cq_out"
output_folder = "sample_data/filter_out"


from filter_transform import (
    filter_columns_to_drop_cli_param,
    filter_criteria_cli_param,
    filter_logical_operator_cli_param,
)
from filter_transform_ray import FilterRayTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

# These are just example filter criteria; adjust them to your use case
filter_criteria = [
    "language_of_interest = 1",
    "total_num_lines > 10 AND total_num_lines < 90"
]
filter_logical_operator = "AND"
filter_columns_to_drop = ["language_of_interest", "hash_column"]

filter_params = {
    filter_criteria_cli_param: filter_criteria,
    filter_columns_to_drop_cli_param: filter_columns_to_drop,
    filter_logical_operator_cli_param: filter_logical_operator,
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}


sys.argv = ParamsUtils.dict_to_req(common_config_params | filter_params)
launcher = RayTransformLauncher(FilterRayTransformConfiguration())
launcher.launch()
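
To make the semantics of the criteria above concrete, here is a minimal, standalone sketch of how a list of conditions can be combined with a logical operator and applied to a table. It is not the transform's implementation: it uses Python's built-in `sqlite3` module as a stand-in engine, and the table `files`, the sample rows, and the helpers `build_where_clause` and `filter_rows` are hypothetical names introduced only for illustration.

```python
import sqlite3

# Hypothetical rows standing in for the annotated table:
# (title, language_of_interest, total_num_lines)
rows = [
    ("a.go", 1, 50),
    ("b.md", 0, 15),
    ("c.c", 1, 130),
    ("d.js", 1, 11),
]

# Same criteria and operator as in the demo cell.
filter_criteria = [
    "language_of_interest = 1",
    "total_num_lines > 10 AND total_num_lines < 90",
]
filter_logical_operator = "AND"


def build_where_clause(criteria, operator):
    # Parenthesise each criterion so that an AND/OR inside one criterion
    # cannot change how the criteria combine with each other.
    return f" {operator} ".join(f"({c})" for c in criteria)


def filter_rows(rows, criteria, operator):
    con = sqlite3.connect(":memory:")
    con.execute(
        "CREATE TABLE files "
        "(title TEXT, language_of_interest INTEGER, total_num_lines INTEGER)"
    )
    con.executemany("INSERT INTO files VALUES (?, ?, ?)", rows)
    # Selecting only some columns mimics dropping 'language_of_interest'.
    query = (
        "SELECT title, total_num_lines FROM files "
        f"WHERE {build_where_clause(criteria, operator)} ORDER BY title"
    )
    result = con.execute(query).fetchall()
    con.close()
    return result


kept = filter_rows(rows, filter_criteria, filter_logical_operator)
print(kept)  # [('a.go', 50), ('d.js', 11)]
```

Only the rows that satisfy every criterion survive, and the helper column used for selection no longer appears in the result.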


In [None]:
read_metadata(f"{output_folder}/metadata.json")

## 7. Semantic Ordering of Code Files

In this step, we order the code files so that files from the same repository are packed together and arranged to prioritize semantic dependencies. We identify these dependencies by analyzing file imports and build a directed graph in which each file is a node and each edge represents an API import between files. After breaking any cycles to make the graph acyclic, we perform a topological sort to establish an ordering of files based on their semantic dependencies. We then organize the files in a repository by placing documentation and build files first, followed by the ordered set of files with semantic dependencies, and finally the remaining non-connected files. These non-connected files are arranged according to their folder structure, using a depth-first search to traverse the repository. Finally, we determine the dominant programming language of a repository based on file extensions and the presence of build files, and organise the repo-ordered files by programming language.
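
The ordering described above can be sketched in a few lines of plain Python. This is an illustrative approximation, not the transform's code: the helpers `break_cycles` and `order_repo`, the sample import map, and the list of documentation and build file names are all hypothetical. It also assumes that a file should appear after the files it imports, and it uses a simple sort by path as a stand-in for the depth-first folder traversal. It requires Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter


def break_cycles(imports):
    """Return a copy of the import graph with DFS back edges removed."""
    acyclic = {f: [] for f in imports}
    state = {}  # file -> "visiting" or "done"

    def visit(f):
        state[f] = "visiting"
        for dep in imports[f]:
            if dep not in imports:
                continue  # import points outside the file set: ignore
            if state.get(dep) == "visiting":
                continue  # back edge closes a cycle: drop it
            acyclic[f].append(dep)
            if dep not in state:
                visit(dep)
        state[f] = "done"

    for f in sorted(imports):
        if f not in state:
            visit(f)
    return acyclic


def order_repo(imports, doc_and_build_names=("README.md", "Makefile")):
    # 1. Documentation and build files go first.
    first = sorted(
        f for f in imports if f.rsplit("/", 1)[-1] in doc_and_build_names
    )
    code = {f: deps for f, deps in imports.items() if f not in first}

    # 2. Files that take part in an import edge, dependencies first.
    acyclic = break_cycles(code)
    connected = set()
    for f, deps in acyclic.items():
        if deps:
            connected.add(f)
            connected.update(deps)
    graph = {f: acyclic[f] for f in sorted(connected)}
    ordered = list(TopologicalSorter(graph).static_order())

    # 3. Remaining non-connected files, sorted by path.
    rest = sorted(f for f in code if f not in connected)
    return first + ordered + rest


# Hypothetical repository: file -> files it imports.
imports = {
    "README.md": [],
    "Makefile": [],
    "src/app.py": ["src/utils.py", "src/models.py"],
    "src/models.py": ["src/utils.py", "src/app.py"],  # cycle with app.py
    "src/utils.py": [],
    "scripts/clean.sh": [],
    "assets/notes.txt": [],
}

print(order_repo(imports))
```

In this toy repository the cycle between `src/app.py` and `src/models.py` is broken by dropping the edge that closes it, so `src/utils.py` comes before `src/models.py`, which comes before `src/app.py`.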


This transform has the following parameters:  <br/>
 _repo_lvl_sorting_enabled_ - If True, the repo level output is sorted using _repo_lvl_sorting_algo_. <br/>
 _repo_lvl_sorting_algo_ - Selects the sorting algorithm used for repo level sorting. Use SORT_SEMANTIC_NORMALISED to order files by semantic dependencies, or SORT_BY_PATH to order files by the folder structure of the repository.  <br/>
 _repo_lvl_store_backend_dir_ - Directory to use for the local store. Needed only when repo_lvl_store_type=local. <br/>
 _repo_lvl_output_by_langs_ - If True, the output is organised into one folder per programming language. <br/>
 _repo_lvl_combine_rows_ - If True, the contents of each repo are combined into a single row. <br/>
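
As a rough illustration of what the last two parameters imply, the sketch below groups file rows by repository, picks a dominant language, and joins the file contents into one row per repository. It is a simplified assumption of the behaviour, not the transform's code: `EXT_TO_LANG`, `dominant_language`, `combine_repo_rows`, and the sample rows are hypothetical, and the language is chosen from file extensions only, ignoring build files.

```python
from collections import Counter, defaultdict

# Hypothetical extension-to-language map, for illustration only.
EXT_TO_LANG = {".go": "Go", ".c": "C", ".h": "C", ".py": "Python"}


def dominant_language(exts):
    """Pick the most frequent language; ties are broken alphabetically."""
    counts = Counter(EXT_TO_LANG[e] for e in exts if e in EXT_TO_LANG)
    if not counts:
        return "unknown"
    return min(counts, key=lambda lang: (-counts[lang], lang))


def combine_repo_rows(rows):
    """Merge already-ordered file rows into one row per repository."""
    by_repo = defaultdict(list)
    for row in rows:
        by_repo[row["repo_name"]].append(row)
    combined = []
    for repo, files in by_repo.items():
        combined.append(
            {
                "repo_name": repo,
                "language": dominant_language(f["ext"] for f in files),
                # Keep the incoming file order when joining contents.
                "contents": "\n".join(f["contents"] for f in files),
            }
        )
    return combined


# Hypothetical, already-ordered file rows.
rows = [
    {"repo_name": "org/repo_a", "ext": ".h", "contents": "/* a.h */"},
    {"repo_name": "org/repo_a", "ext": ".c", "contents": "/* a.c */"},
    {"repo_name": "org/repo_b", "ext": ".go", "contents": "// main.go"},
]

for row in combine_repo_rows(rows):
    print(row["language"], row["repo_name"], repr(row["contents"]))
```

With per-language output enabled, each combined row would then be written under the folder of its repository's dominant language.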



In [None]:
input_folder = "sample_data/filter_out"
output_folder = "sample_data/rlo_out"

import tempfile
from repo_level_order_transform import RepoLevelOrderRayTransformConfiguration
with tempfile.TemporaryDirectory() as tmpdirname:

    # create parameters
    local_conf = {
        "input_folder": input_folder,
        "output_folder": output_folder,
    }

    repo_level_params = {
        "repo_lvl_sorting_algo": "SORT_SEMANTIC_NORMALISED",
        "repo_lvl_store_type": "local",
        "repo_lvl_store_backend_dir": tmpdirname,
        "repo_lvl_output_by_langs": True,
        "repo_lvl_combine_rows": True,
        "repo_lvl_sorting_enabled": True,
        "data_local_config": ParamsUtils.convert_to_ast(local_conf)
    }


    sys.argv = ParamsUtils.dict_to_req(d=common_config_params | repo_level_params)
    launcher = RayTransformLauncher(RepoLevelOrderRayTransformConfiguration())
    launcher.launch()

In [None]:
read_metadata(f"{output_folder}/metadata.json")

## 8. Tokenization

Next, we tokenize the data to be used for fine tuning.



In [None]:
input_folder = "sample_data/rlo_out"
output_folder = "sample_data/tokenize_out"

from tokenization_transform_ray import TokenizationRayConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

tf_params = {
    "data_local_config": ParamsUtils.convert_to_ast(local_conf)
}
sys.argv = ParamsUtils.dict_to_req(d=common_config_params | tf_params)
# create launcher
launcher = RayTransformLauncher(TokenizationRayConfiguration())
# Launch the ray actor(s) to process the input
launcher.launch()

In [None]:
read_metadata(f"{output_folder}/metadata.json")

In [None]:
read_parquet_bulk(f"{output_folder}/C").head(5)


**The data is now ready for extended pretraining or fine tuning using any open source code models.**