## 라이브러리 임포트

In [1]:
import os

import tiktoken
import json

import pprint as ppr

## 함수 및 전역 변수 정의

In [3]:
# Module-level tokenizer, created once and read by tiktoken_len() below.
tokenizer = tiktoken.get_encoding("cl100k_base")

def tiktoken_len(input):
    """Return the number of tokens the module-level ``tokenizer`` yields for *input*.

    Non-string values are converted with ``str()`` before encoding, so a
    dict or list is measured by the length of its Python ``repr``-style text.

    Args:
        input: The value to measure. (The name shadows the builtin but is
            kept so existing keyword callers keep working.)

    Returns:
        int: Number of tokens produced by ``tokenizer.encode``.
    """
    # Reading a module-level name needs no ``global`` declaration.
    text = input if isinstance(input, str) else str(input)
    return len(tokenizer.encode(text))

In [4]:
class ChunkGroup:
    """Plan and build token-bounded chunks from the dict-valued keys of a dict.

    After ``load`` the instance holds:

    * ``base``      - the loaded dict.
    * ``tklenGet``  - callable returning the token length of a value.
    * ``info``      - the chunk plan, a list whose entries are either
        - a list of keys of ``base`` that are emitted together, or
        - a one-item dict ``{key: [[sub_key, ...], ...]}`` written by
          ``chunkSplit`` for a value that was too large on its own.

    Keys of ``base`` whose values are not dicts ("common" keys) are copied
    into every chunk by ``chunkMake``.
    """

    def __init__(self):
        self.info = []

    def load(self, dict_loaded: dict, func_tokenLength):
        """Attach the source dict and token counter and reset the chunk plan.

        Args:
            dict_loaded: Source dict; each dict-valued key becomes its own
                single-key entry in ``info``.
            func_tokenLength: Callable ``value -> int``. ``chunkMake`` itself
                passes ``None`` here because it never measures tokens.
        """
        self.base = dict_loaded
        self.tklenGet = func_tokenLength
        self.info = [[key_subDt] for key_subDt in ChunkGroup.keyParser(self.base)[0]]

    def chunkMerge(self, tkSize_chunk, tkSize_overlap, opt_leftover="Retain"):
        """Merge the single-key entries of ``info`` into groups of about ``tkSize_chunk`` tokens.

        Keys are accumulated in order. A group is closed as soon as its token
        sum reaches ``tkSize_chunk``. Every key added while the sum lies
        strictly between ``tkSize_chunk - tkSize_overlap`` and ``tkSize_chunk``
        is also remembered and used to seed the next group (the overlap).

        Entries of ``info`` that are not single-key lists are kept unchanged
        and placed before the newly merged groups.

        Args:
            tkSize_chunk: Token sum at which a group is closed.
            tkSize_overlap: Width of the overlap window below ``tkSize_chunk``.
            opt_leftover: What to do with the group still open at the end.
                ``"Retain"`` appends it as it is, even when it is empty;
                ``chunkMake`` then emits a chunk holding only the common
                keys, which is how a sub-dict without dict-valued children
                is still emitted after ``chunkSplit``.
                ``"Forced"`` pads it with the keys of ``base`` that precede
                it (walking backwards) until the sum reaches
                ``tkSize_chunk``, then appends it.
                Any other value discards the leftover group.
        """
        info_chunkGroups = []
        group = ChunkGroup.__boardReset()
        overlap = ChunkGroup.__boardReset()

        # One pass each instead of rebuilding a comparison list per entry.
        keys_subDt_merged = [
            entry[0] for entry in self.info
            if isinstance(entry, list) and len(entry) == 1
        ]
        keys_subDt_notMerged = [
            entry for entry in self.info
            if not (isinstance(entry, list) and len(entry) == 1)
        ]

        for key_subDt_merge in keys_subDt_merged:
            tkLength = self.tklenGet(self.base[key_subDt_merge])

            # A fresh group starts with the overlap carried from the last one.
            if group["tklength_Sum"] == 0 and overlap["tklength_Sum"] != 0:
                ChunkGroup.__boardUpdate(group, overlap["tklength_Sum"], overlap["keys"])
                overlap = ChunkGroup.__boardReset()

            ChunkGroup.__boardUpdate(group, tkLength, [key_subDt_merge])

            if tkSize_chunk > group["tklength_Sum"] > tkSize_chunk - tkSize_overlap:
                ChunkGroup.__boardUpdate(overlap, tkLength, [key_subDt_merge])

            if group["tklength_Sum"] >= tkSize_chunk:
                info_chunkGroups.append(group["keys"])
                group = ChunkGroup.__boardReset()

        if opt_leftover == "Retain":
            info_chunkGroups.append(group["keys"])

        elif opt_leftover == "Forced":
            # Work on the reversed key list so preceding keys can be appended
            # while walking ``base`` backwards; restore the order afterwards.
            group["keys"].reverse()
            keys_notIn_Group = [key_subDt for key_subDt in self.base if key_subDt not in group["keys"]]
            for key_notIn_Group in reversed(keys_notIn_Group):
                ChunkGroup.__boardUpdate(group, self.tklenGet(self.base[key_notIn_Group]), [key_notIn_Group])
                if group["tklength_Sum"] >= tkSize_chunk:
                    break
            # Append even when the target size was never reached; previously
            # the leftover keys were silently dropped in that case.
            group["keys"].reverse()
            info_chunkGroups.append(group["keys"])

        self.info = keys_subDt_notMerged
        self.info.extend(info_chunkGroups)

    def chunkSplit(self, tkSize_chunk, tkSize_overlap, opt_leftover="Retain"):
        """Replace oversized single-key entries with a plan over their sub-keys.

        For every single-key entry whose value measures more than
        ``tkSize_chunk`` tokens, a nested ``ChunkGroup`` is loaded with that
        value and merged with the same parameters; the entry ``[key]`` in
        ``info`` is then replaced by ``{key: nested.info}``.
        """
        keys_subDt = [
            entry[0] for entry in self.info
            if isinstance(entry, list) and len(entry) == 1
        ]
        keys_subDt_splited = [
            key_subDt for key_subDt in keys_subDt
            if self.tklenGet(self.base[key_subDt]) > tkSize_chunk
        ]

        for key_subDt_splited in keys_subDt_splited:
            sub_ChunkGroup = ChunkGroup()
            sub_ChunkGroup.load(self.base[key_subDt_splited], self.tklenGet)
            sub_ChunkGroup.chunkMerge(tkSize_chunk, tkSize_overlap, opt_leftover=opt_leftover)

            self.info[self.info.index([key_subDt_splited])] = {key_subDt_splited: sub_ChunkGroup.info}

    def chunkMake(self):
        """Materialise ``info`` into a list of chunk dicts and return it.

        * A list entry yields one chunk with the common keys followed by the
          grouped keys, each mapped to its value in ``base``.
        * A dict entry ``{key: groups}`` yields one chunk per group. Each
          chunk maps ``key`` to the list returned by a nested ``chunkMake``
          over that group (a one-element list), plus the common keys.
          NOTE(review): the value is a list here but a dict for keys that
          were not split - confirm consumers expect that shape.

        The result is also stored in ``self.chunks``.
        """
        self.chunks = []
        keys_common = ChunkGroup.keyParser(self.base)[1]

        for chunkGroup_info in self.info:

            if isinstance(chunkGroup_info, list):
                keys_grouped = keys_common + chunkGroup_info
                self.chunks.append({key_grouped: self.base[key_grouped] for key_grouped in keys_grouped})

            elif isinstance(chunkGroup_info, dict):
                key_splited = list(chunkGroup_info)[0]

                # No token counter needed: the nested object only builds chunks.
                chunkGroup_inSplited = ChunkGroup()
                chunkGroup_inSplited.load(self.base[key_splited], None)

                for keys_grouped_inSplited in chunkGroup_info[key_splited]:
                    chunkGroup_inSplited.info = [keys_grouped_inSplited]

                    chunk_inSplited = {key_splited: chunkGroup_inSplited.chunkMake()}
                    for key_common in keys_common:
                        chunk_inSplited[key_common] = self.base[key_common]

                    self.chunks.append(chunk_inSplited)

        return self.chunks

    @staticmethod
    def keyParser(dict_parsed):
        """Split the keys of *dict_parsed* by value type, preserving order.

        Returns:
            tuple[list, list]: ``(keys whose value is a dict, all other keys)``.
        """
        keys_for_dict = [key for key in dict_parsed if isinstance(dict_parsed[key], dict)]
        keys_for_else = [key for key in dict_parsed if not isinstance(dict_parsed[key], dict)]

        return keys_for_dict, keys_for_else

    @staticmethod
    def __boardReset():
        """Return a fresh accumulator: running token sum plus collected keys."""
        return {"tklength_Sum": 0, "keys": []}

    @staticmethod
    def __boardUpdate(input_board: dict, input_tkLength, input_key):
        """Add *input_tkLength* and the keys in *input_key* to *input_board* in place."""
        input_board["tklength_Sum"] += input_tkLength
        input_board["keys"].extend(input_key)

## 워킹 코드

### 데이터 로드

In [26]:
targetChapter = 1

data_folder = r'G:\내 드라이브\LAB_works\법률 LLM 프로젝트\data\데이터 전처리\3. JSON 컨버팅\JSON_byChapter_v2\R078r3e_Annex3_Appendix1'
data_fileName = fr'R078r3e_Annex3_Appendix1_chapter{targetChapter}_ver2.json'

# Load the chapter JSON. The encoding is given explicitly so the result does
# not depend on the machine's locale default; the writers in this file use
# UTF-8 as well.
with open(os.path.join(data_folder, data_fileName), 'r', encoding='utf-8') as source:
    dict_chapter = json.load(source)

### 지정한 청킹 및 오버랩 사이즈를 바탕으로 챕터 청킹

In [27]:
# Chunk size and overlap width, both in tokens.
tkSize_chunk_global, tkSize_overlap_global = 500, 100

# Token-length function handed to ChunkGroup.load().
tklen_func_global = tiktoken_len

In [28]:
# Build the chunk plan for the loaded chapter.
ChunkGroup_targetChapter = ChunkGroup()
ChunkGroup_targetChapter.load(dict_chapter, tklen_func_global)

# Split oversized sub-dicts first, then merge the remaining single keys.
ChunkGroup_targetChapter.chunkSplit(
    tkSize_chunk=tkSize_chunk_global,
    tkSize_overlap=tkSize_overlap_global,
)
ChunkGroup_targetChapter.chunkMerge(
    tkSize_chunk=tkSize_chunk_global,
    tkSize_overlap=tkSize_overlap_global,
)

# Turn the plan into the actual list of chunk dicts.
Chunks_targetChapter = ChunkGroup_targetChapter.chunkMake()

### 메타 데이터를 기록하여 JSONL 형식으로 저장

In [34]:
# Write the chunks to the JSONL output folder under a "_chunked" name.
# The previous values were identical to the input JSON path, so saving
# overwrote the source chapter file with JSONL; the chained assignment also
# rebound ``data_fileName``. Folder and name now match the automated cells.
file_path = r'G:\내 드라이브\LAB_works\법률 LLM 프로젝트\data\데이터 전처리\3. JSON 컨버팅\JSONL_byChunk_v2\R078r3e_Annex3_Appendix1'
file_name = fr'R078r3e_Annex3_Appendix1_chapter{targetChapter}_chunked_ver2.jsonl'

In [35]:
# One JSON document per line (JSONL).
with open(f"{file_path}\\{file_name}", mode="w", encoding="utf-8") as file:
    for chunk_targetChapter in Chunks_targetChapter:
        serialized = json.dumps(chunk_targetChapter)
        file.write(serialized + "\n")

In [36]:
# Read the file back to eyeball the result. It was written as UTF-8, so it is
# read as UTF-8 rather than with the locale default encoding.
with open(os.path.join(file_path, file_name), encoding="utf-8") as f:
    for line in f:
        # ``line`` keeps its trailing newline, so records print blank-line separated.
        print(line)

{"Chapter": "1", "Title": "Alternative method for the determination of peak braking coefficient (PBC)", "1.1.": {"Description": ["General:"], "Item": ["(a) The test is to establish a PBC for the vehicle type when being brakedon the test surfaces described in Annex 3, paragraphs 1. 1.1. and 1.1.2.", "(b) The test comprises a number of stops with varying brake control forces. Both wheels shall be braked simultaneously up to the point reached before the wheel lock, in order to achieve the maximum vehicle deceleration rate on the given test surface.", "(c) The maximum vehicle deceleration rate is the highest value recorded during all the test stops.", "(d) The Peak Braking Coefficient (PBC) is calculated from the test stop that generates the maximum vehicle deceleration rate, as follows: [Equation 1]", "(e) The value of PBC shall be rounded to two decimal places."], "Equation 1": ["_PBC_ =0.566/ _t_", "", "where:", "", "t = time taken for the vehicle speed to reduce from 40 km/h to 20 km/h

## 워킹 코드 (자동화)

In [31]:
## Save each chapter to its own file
for targetChapter in range(1, 2):

    # Input / output locations.
    # The input names are the ``*_in`` variables that open() below uses;
    # previously they were assigned as ``data_folder`` / ``data_fileName``,
    # which raised NameError on a fresh kernel, and the output assignment
    # also overwrote ``data_fileName``.
    data_folder_in = r'G:\내 드라이브\LAB_works\법률 LLM 프로젝트\data\데이터 전처리\3. JSON 컨버팅\JSON_byChapter_v2\R078r3e_Annex3_Appendix1'
    data_fileName_in = fr'R078r3e_Annex3_Appendix1_chapter{targetChapter}_ver2.json'

    data_folder_out = r'G:\내 드라이브\LAB_works\법률 LLM 프로젝트\data\데이터 전처리\3. JSON 컨버팅\JSONL_byChunk_v2\R078r3e_Annex3_Appendix1'
    data_fileName_out = fr'R078r3e_Annex3_Appendix1_chapter{targetChapter}_chunked_ver2.jsonl'

    # Chunk size & tokenizer
    tkSize_chunk_global = 500
    tkSize_overlap_global = 100
    tklen_func_global = tiktoken_len

    # Load the source JSON (explicit encoding, matching the UTF-8 writer below)
    with open(os.path.join(data_folder_in, data_fileName_in), 'r', encoding='utf-8') as source:
        dict_chapter = json.load(source)

    # Chunking: split oversized sub-dicts, merge the rest, then build chunks
    ChunkGroup_targetChapter = ChunkGroup()

    ChunkGroup_targetChapter.load(dict_chapter, tklen_func_global)

    ChunkGroup_targetChapter.chunkSplit(tkSize_chunk_global, tkSize_overlap_global)
    ChunkGroup_targetChapter.chunkMerge(tkSize_chunk_global, tkSize_overlap_global)

    Chunks_targetChapter = ChunkGroup_targetChapter.chunkMake()

    # Save the chunks, one JSON document per line
    with open(os.path.join(data_folder_out, data_fileName_out), encoding="utf-8", mode="w") as file:
        for chunk_targetChapter in Chunks_targetChapter:
            file.write(json.dumps(chunk_targetChapter) + "\n")

In [33]:
## Save all chapters into one file
Chunks_fullChapter = []

# Chunk size & tokenizer (the same for every chapter, so set once)
tkSize_chunk_global = 500
tkSize_overlap_global = 100
tklen_func_global = tiktoken_len

for targetChapter in range(1, 2):

    # Input location
    data_folder_in = r'G:\내 드라이브\LAB_works\법률 LLM 프로젝트\data\데이터 전처리\3. JSON 컨버팅\JSON_byChapter_v2\R078r3e_Annex3_Appendix1'
    data_fileName_in = fr'R078r3e_Annex3_Appendix1_chapter{targetChapter}_ver2.json'

    # Load the source JSON (explicit encoding, matching the UTF-8 writer below)
    with open(os.path.join(data_folder_in, data_fileName_in), 'r', encoding='utf-8') as source:
        dict_chapter = json.load(source)

    # Chunking: split oversized sub-dicts, merge the rest, then build chunks
    ChunkGroup_targetChapter = ChunkGroup()

    ChunkGroup_targetChapter.load(dict_chapter, tklen_func_global)

    ChunkGroup_targetChapter.chunkSplit(tkSize_chunk_global, tkSize_overlap_global)
    ChunkGroup_targetChapter.chunkMerge(tkSize_chunk_global, tkSize_overlap_global)

    Chunks_targetChapter = ChunkGroup_targetChapter.chunkMake()

    Chunks_fullChapter.extend(Chunks_targetChapter)

# Output location, set once after the loop. It no longer rebinds
# ``data_fileName`` as a side effect.
# NOTE(review): the file name carries only the last chapter number from the
# loop and is the same path the per-chapter cell writes for that chapter -
# confirm this is the intended name for the merged file.
data_folder_out = r'G:\내 드라이브\LAB_works\법률 LLM 프로젝트\data\데이터 전처리\3. JSON 컨버팅\JSONL_byChunk_v2\R078r3e_Annex3_Appendix1'
data_fileName_out = fr'R078r3e_Annex3_Appendix1_chapter{targetChapter}_chunked_ver2.jsonl'

# Save the merged chunks, one JSON document per line
with open(os.path.join(data_folder_out, data_fileName_out), encoding="utf-8", mode="w") as file:
    for Chunk_fullChapter in Chunks_fullChapter:
        file.write(json.dumps(Chunk_fullChapter) + "\n")