## 1. 필요 라이브러리

In [1]:
# install those if needed

# pip install datasketch
# pip install networkx
# pip install fuzzywuzzy
# pip install datasets

import os
from tqdm.notebook import tqdm
from datasketch import MinHash, MinHashLSH          # MinHash와 LSH를 utilize하기 위한 라이브러리 
import networkx as nx                               # documents pairs를 documents cluster로 만들어주기 위한 그래프 관련 라이브러리
from fuzzywuzzy import fuzz                         # levenshtein 거리를 구하기 위한 라이브러리
from datasets import Dataset, load_dataset, get_dataset_config_names

## 2. Deduplicate를 위한 MinHash와 LSH

In [2]:
class LSHSubSet():
    """
    주어진 문서의 MinHash 값을 계산하고 해쉬 백터를 b개의 버킷으로 나누어 각각을 문서의 key 값으로 사용.
    이 때, b는 MinHash의 Threshold Jaccard Sim.을 기준으로 정함.
    같은 버킷에 들어 있는 documents들에 대해 filtering threshold를 넘는지 계산한 후, 넘으면 dedup.

    :param doc_list: Dataset
    :param num_perm: MinHash의 차원
    :param seed: MinHash를 구할 때 사용할 랜덤 시드 값
    """

    def __init__(self, doc_list, num_perm = 128, seed = 42):
        self.doc_list = doc_list
        self.num_perm = num_perm
        self.seed = seed
        self.doc_list_inverse = {doc:idx for idx, doc in enumerate(doc_list)}

    def _preprocess(self):
        """
        input document를 shingles(n-gram)로 변환.
        shingles 기반의 Jaccard Sim.을 기준으로 signiture hash(MinHash)값이 정해짐.
        baseline은 간단히 split-unigram으로 구현
        """

        self.doc_dict = {document:document.split() for document in self.doc_list}

    def _create_cand_pairs(self, lsh, min_hashes):
        """
        계산된 MinHash를 참조, 각 document들에 대해 Threshold Jaccard Sim.을 넘는 유사한 문서들을 pair로 return.

        
        :param lsh: 전체 문서들에 대한 MinHash table
        :param min_hashes: Original문서를 key로, signiture hash값을 value로 가지고 있는 MinHash 객체

        :return: Dedup checking이 필요한 document pairs를 담고 있는 list
        """

        no_check = []
        need_check = []
        sanity_symmetric = set()
        for idx, min_hash in enumerate(min_hashes):
            bucket = lsh.query(min_hash)
            if len(bucket)==1:
                no_check.append(idx)
            elif len(bucket)>1:
                first_val = self.doc_list[idx]
                for val in bucket:
                    if val == self.doc_list[idx]:
                        continue
                    second_val = val
                    need_check.append([first_val,second_val])
                    sanity_symmetric.add(self.doc_list_inverse[second_val])
        no_check = [self.doc_list[idx] for idx in tqdm(no_check, desc='sanity symmetric...') if idx not in sanity_symmetric]
        return no_check, need_check
    
    def _picked_by_graph(self, big_list):
        """
        문서 pair를 graph로 변환 후, 비교할 set을 만들어 줌.
        비교가 필요없는 sole node는 바로 final_docs list에 넣어줌.

        
        :param big_list: Dedup checking이 필요한 documents pairs를 담고 있는 list

        :return: Dedup checking이 필요하지 않은 documents를 담고 있는 list, 서로서로 dedup checking을 해야하는 set을 담고 있는 list
        """

        graph = []
        for pair in tqdm(big_list, desc='Building graph...'):
            graph.append(tuple(self.doc_list_inverse[doc] for doc in pair))
        G = nx.Graph()
        G.add_edges_from(graph)
        return list(nx.connected_components(G))
    
    def _dedup_by_idx(self, doc_indices):
        """
        Dedup checking 대상이 되는 한 cluster에 대해 one-by-one으로 dedup checking.
        baseline은 reference에서 많이 사용하는 levenshtein ratio 0.8로 설정.
        만약 어떤 pair가 dup이라면, 둘 중 긴 document를 제거(보통 긴 document는 불필요한 spamming을 담고 있는 경우가 많음).

        :param big_list: Dedup checking이 필요한 documents set의 indices

        :return: Dedup checking 후 남은 documents들의 indices, Dedup checking 후 제거된 documents들의 indices, 
        """

        doc_indices = sorted(doc_indices, key=lambda x: len(self.doc_list[x]))
        final_indices = []
        removed_indices = []

        while doc_indices:
            current_val = doc_indices.pop()
            is_duplicate = False

            for other_val in reversed(doc_indices):
                if fuzz.ratio(self.doc_list[current_val], self.doc_list[other_val]) > 70:
                    is_duplicate = True
                    removed_indices.append((current_val, other_val))
                    break

            if not is_duplicate:
                final_indices.append(current_val)

        return final_indices, removed_indices     

    def process(self):
        self._preprocess()
        print('Tokenizing Done.')
        # Create LSH index 
        lsh = MinHashLSH(threshold=0.7, num_perm=self.num_perm)
        min_hashes = []
        for doc_ori, doc_prep in tqdm(self.doc_dict.items(), desc='Building Hash...'):
            # min hash 계산
            min_hash = MinHash(num_perm=self.num_perm, seed=self.seed)
            for d in doc_prep:
                min_hash.update(d.encode('utf8'))
            lsh.insert(doc_ori, min_hash)
            min_hashes.append(min_hash)
        print('Building Hash Done.')
        final_docs, pairs_list = self._create_cand_pairs(lsh, min_hashes)
        print('Creating Pair Done.')
        clusters = self._picked_by_graph(pairs_list)
        print('Graphify Done.')
        removed_docs = []
        for doc_indices in tqdm(clusters, desc='Deduplicating...'):
            final_indices, removed_indices = self._dedup_by_idx(doc_indices)
            final_docs.extend([self.doc_list[idx] for idx in final_indices])
            removed_docs.extend([(self.doc_list[removed_idx], self.doc_list[compared_idx]) for removed_idx, compared_idx in removed_indices])
        print('Jobs All Done.')

        return final_docs, removed_docs
    

## 3. korean_textbooks deduplication

In [None]:
get_dataset_config_names("maywell/korean_textbooks")

In [5]:
DATASET_NAME = [i for i in get_dataset_config_names("maywell/korean_textbooks")]
# DATASET_NAME = [i for i in get_dataset_config_names("maywell/korean_textbooks") if i =='helpsteer']
print(DATASET_NAME)
raw_corpus = [load_dataset("maywell/korean_textbooks", dn) for dn in DATASET_NAME]
deduped_corpus = []
removed_corpus = []

In [None]:
for dn, data in zip(DATASET_NAME, raw_corpus):
    print(f'{dn} dedup ongoing...')
    print(f"{len(data := data['train']['text'])} samples.")
    raw_len = len(data)

    lshsubset = LSHSubSet(data)
    final_docs, removed_docs = lshsubset.process()

    print(f'{dn} dedup done.')
    print(f"{raw_len} -> {len(final_docs)} samples.")
    print("-"*10)
    
    deduped_corpus.append(final_docs)
    removed_corpus.append(removed_docs)

In [8]:
removed_corpus

[[('"새로운 토론 주제"에 대한 토론 내용:\n\n토론', '"새로운 토론 주제"에 대한 토론 내용:\n\n')]]

deduped_corpus: deduplicated된 문서들 (List[List[str]])

removed_corpus: 제거의 기준이 된 문서와 제거된 문서의 쌍들 (List[List[Tuple[str, str]]])