# Text deduplication

References:
- https://gist.github.com/conceptofmind/c5804428ea1bd89767815f9cd5f02d9a
- https://github.com/Sripaad/MinHash-LSH-Deduplicate/tree/main

In [1]:
import json
import multiprocessing as mp
import re
from collections import defaultdict
from functools import partial
from typing import Dict, List, Optional, Set, Tuple, Type

from datasets import Dataset
from tqdm import tqdm

from datasketch import MinHash, MinHashLSH
from dpu_utils.utils.iterators import ThreadedIterator


# NON_ALPHA = re.compile("[^A-Za-z_0-9]")
NON_ALPHA = re.compile(r"\W", re.UNICODE)
# parameters used in DuplicationIndex
MIN_NUM_TOKENS = 10
NUM_PERM = 256


def get_min_hash(tokens: List[str]) -> Optional[MinHash]:
    """Compute the MinHash of a code snippet."""
    if len(tokens) < MIN_NUM_TOKENS:
        return None
    min_hash = MinHash(num_perm=NUM_PERM)
    for token in set(tokens):
        min_hash.update(token.encode())
    return min_hash


def get_tokens(code: str) -> Set[str]:
    """Tokenize a code snippet."""
    return set([t for t in NON_ALPHA.split(code) if len(t.strip()) > 0])


class DuplicationIndex:
    def __init__(
        self,
        *,
        duplication_jaccard_threshold: float = 0.95,
    ):
        self._duplication_jaccard_threshold = duplication_jaccard_threshold
        self._num_perm = NUM_PERM
        self._index = MinHashLSH(threshold=self._duplication_jaccard_threshold, num_perm=self._num_perm)

        self._duplicate_clusters = defaultdict(set)

    def add(self, code_key: Tuple, min_hash: MinHash) -> None:
        """Add a key to _index (MinHashLSH)
        the min_hash is used to query closest matches based on the jaccard_threshold.
        The new key is either added to a existing cluster of one close match,
        or a new cluster is created. The clusters created in this way, depend on the order of add.
        Args:
            code_key (Tuple of (index, repo_name, path)):
                Theoritically any hasbale key. Here we use a tuple to retrieve the information later.
            min_hash: MinHash of the code_key.
        """
        close_duplicates = self._index.query(min_hash)
        if code_key in self._index.keys:
            print(f"Duplicate key {code_key}")
            return

        self._index.insert(code_key, min_hash)
        if len(close_duplicates) > 0:

            for base_duplicate in close_duplicates:
                if base_duplicate in self._duplicate_clusters:
                    self._duplicate_clusters[base_duplicate].add(code_key)
                    break
            else:
                self._duplicate_clusters[close_duplicates[0]].add(code_key)

    def get_duplicate_clusters(self) -> List[List[Dict]]:
        """Export the duplicate clusters.
        For each cluster, the first element is the base element of the cluster.
        The base element has an estimation jaccard similarity higher than the threshold with all the other elements.
        Returns:
            duplicate_clusters (List[List[Dict]]):
                List of duplicate clusters.
        """
        duplicate_clusters = []
        for base, duplicates in self._duplicate_clusters.items():
            cluster = [base] + list(duplicates)
            # reformat the cluster to be a list of dict
            cluster = [{"base_index": el} for el in cluster]
            duplicate_clusters.append(cluster)
        return duplicate_clusters

    def save(self, filepath) -> None:
        duplicate_clusters = self.get_duplicate_clusters()
        with open(filepath, "w") as f:
            json.dump(duplicate_clusters, f)


def _compute_min_hash(element, column='content'):
    index, data = element
    min_hash = get_min_hash([t for t in NON_ALPHA.split(data[column]) if len(t.strip()) > 0])
    if min_hash is not None:
        return index, min_hash


def minhash_iter(dataset_iterator: Type[Dataset]):
    with mp.Pool() as pool:
        for data in pool.imap_unordered(
            _compute_min_hash,
            ThreadedIterator(dataset_iterator, max_queue_size=10000),
            chunksize=100,
        ):
            if data is not None:
                yield data


def make_duplicate_clusters(dataset_iterator: Type[Dataset], jaccard_threshold: float):
    """Find duplicate clusters in the dataset in two steps:
    1. Compute MinHash for each code snippet. MinHash is a tool for fast jaccard similarity estimation.
    This step is computed using an asynchronous multiprocessing pool, minhash_iter
    2. Find duplicate clusters. The computed MinHash is added sequentially to the DuplicationIndex.
    This step cannot be parallelized. So using asynchronous thread in the previous step helps to speed up the process.
    """
    di = DuplicationIndex(duplication_jaccard_threshold=jaccard_threshold)

    for filename, min_hash in tqdm(ThreadedIterator(minhash_iter(enumerate(dataset_iterator)), max_queue_size=100)):
        di.add(filename, min_hash)

    # Returns a List[Cluster] where Cluster is List[str] with the filenames.
    return di.get_duplicate_clusters()


def jaccard_similarity(code1: str, code2: str) -> float:
    """Compute the Jaccard similarity of two code snippets."""
    tokens1 = get_tokens(code1)
    tokens2 = get_tokens(code2)
    return len(tokens1 & tokens2) / len(tokens1 | tokens2)


_shared_dataset = None


def _find_cluster_extremes_shared(cluster, jaccard_threshold):
    """Find a reduced cluster such that each code in the origin cluster is similar to at least one code in the reduced cluster.
    Two codes are similar if their Jaccard similarity is above the threshold.
    Args:
        cluster (List[dict]):
           cluster is a list of dict, each dict contains the following keys:
                - base_index
                - repo_name
                - path
            This is a typical output of DuplicationIndex.get_duplicate_clusters()
        jaccard_threshold (float):
            threshold for Jaccard similarity.
            Two codes are similar if their Jaccard similarity is above the threshold.
    Returns:
        extremes (List[dict]):
            A reduced representation of the cluster. The field copies is added to each dict.
            The copies field indicates the number of similar codes in the cluster for a extreme.
    """
    extremes = []
    for element1 in cluster:
        code1 = _shared_dataset[element1["base_index"]]["content"]
        for element2 in extremes:
            code2 = _shared_dataset[element2["base_index"]]["content"]
            if jaccard_similarity(code1, code2) >= jaccard_threshold:
                element2["copies"] += 1
                break
        else:
            element1["copies"] = 1
            extremes.append(element1)
    return extremes


def find_extremes(cluster_list, dataset, jaccard_threshold):
    """Call the _find_cluster_extremes_shared function in a parallel fashion.
    Args:
        cluster_list (List[List[Dict]]):
            each cluster is a list of dicts with the key base_index,
            referring to the index of the base code in the dataset.
        dataset (Type[Dataset]):
            dataset is used to access the content of the code snippets,
            using the base_index from the cluster_list.
            dataset is shared between all the processes using a glabal variable (any other way to share the dataset?),
            otherwise the multi processing is not speeded up.
        jaccard_threshold (float):
            the threshold for the jaccard similarity. The default value is 0.85
    Returns:
        extremes_list (List[Dict]):
            Each cluster is reduced to extremes.
            See _find_cluster_extremes_shared for the definition of extremes.
    """
    global _shared_dataset
    _shared_dataset = dataset
    extremes_list = []
    f = partial(_find_cluster_extremes_shared, jaccard_threshold=jaccard_threshold)
    with mp.Pool() as pool:
        for extremes in tqdm(
            pool.imap_unordered(
                f,
                cluster_list,
            ),
            total=len(cluster_list),
        ):
            extremes_list.append(extremes)
    return extremes_list


def deduplicate_dataset(
    dataset: Type[Dataset], jaccard_threshold: float = 0.85
) -> Tuple[Type[Dataset], List[List[Dict]]]:
    """Deduplicate the dataset using minhash and jaccard similarity.
    This function first generate duplicate clusters, then each cluster
    is reduced to the extremes that are similar to the other elements in the cluster.
    Codes are called similar if their Jaccard similarity is greater than jaccard_threshold (0.85 default).
    Args:
        dataset (Type[Dataset]):
            The dataset to deduplicate.
        jaccard_threshold (float, default=0.95):
            jaccard threshold to determine if two codes are similar
    Returns:
        ds_dedup (Type[Dataset]):
            The deduplicated dataset.
        duplicate_clusters (List[List[Dict]]):
            The list of duplicate clusters.
            Each cluster is a list of dicts with the following keys:
            - base_index : int
                The index of the code in the original dataset.
            - repo_name : str
            - path : str
            - copies : int
                The number of copies of the code in the cluster. (find_cluster_extremes)
            - is_extreme : bool
                Whether the code is an extreme in the cluster.
            All the codes in the cluster are removed from the dataset except the extremes.
    Example:
        >>> from datasets import load_dataset
        >>> from minhash_deduplication import deduplicate_dataset
        >>> ds = load_dataset("lvwerra/codeparrot-clean", split="train")
        >>> ds_dedup, duplicate_clusters = deduplicate_dataset(ds, jaccard_threshold=0.85)
    """
    duplicate_clusters = make_duplicate_clusters(dataset, jaccard_threshold)
    duplicate_indices = set(x["base_index"] for cluster in duplicate_clusters for x in cluster)
    extreme_dict = {}
    extremes_clusters = find_extremes(duplicate_clusters, dataset, jaccard_threshold)
    for extremes in extremes_clusters:
        for element in extremes:
            extreme_dict[element["base_index"]] = element
    remove_indices = duplicate_indices - set(extreme_dict.keys())
    ds_filter = dataset.filter(lambda x, idx: idx not in remove_indices, with_indices=True)

    # update duplicate_clusters
    for cluster in duplicate_clusters:
        for element in cluster:
            element["is_extreme"] = element["base_index"] in extreme_dict
            if element["is_extreme"]:
                element["copies"] = extreme_dict[element["base_index"]]["copies"]

    print(f"Original dataset size: {len(dataset)}")
    print(f"Number of duplicate clusters: {len(duplicate_clusters)}")
    print(f"Files in duplicate cluster: {len(duplicate_indices)}")
    print(f"Unique files in duplicate cluster: {len(extreme_dict)}")
    print(f"Filtered dataset size: {len(ds_filter)}")

    return ds_filter, duplicate_clusters

In [2]:
from datasets import load_dataset

dataset = load_dataset("tmnam20/vnexpress_20231013", "all", cache_dir='./cache', download_mode='FORCE_REDOWNLOAD'.lower(), split='train')

Downloading builder script:   0%|          | 0.00/8.30k [00:00<?, ?B/s]

Downloading and preparing dataset vnexpress_20231013/all (download: 10.71 MiB, generated: 64.32 MiB, post-processed: Unknown size, total: 75.03 MiB) to /home/minhnam/Desktop/rust/text-dedup/cache/tmnam20___vnexpress_20231013/all/1.0.0/288af6ee255ded210e6310cb9f3ec1493f0ccc958b424b4a16bede68fa79aedb...


Downloading data files:   0%|          | 0/1 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/11.2M [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/1 [00:00<?, ?it/s]

Generating train split:   0%|          | 0/9124 [00:00<?, ? examples/s]

Loading file from topics/all.tar.gz...
Dataset vnexpress_20231013 downloaded and prepared to /home/minhnam/Desktop/rust/text-dedup/cache/tmnam20___vnexpress_20231013/all/1.0.0/288af6ee255ded210e6310cb9f3ec1493f0ccc958b424b4a16bede68fa79aedb. Subsequent calls will reuse this data.


In [3]:
dataset

Dataset({
    features: ['title', 'description', 'content', 'raw', 'topic', 'id'],
    num_rows: 9124
})

In [4]:
# def deduplicate_dataset(
#     dataset: Type[Dataset], jaccard_threshold: float = 0.85
# ) -> Tuple[Type[Dataset], List[List[Dict]]]:

dedup_dataset, extras = deduplicate_dataset(dataset, jaccard_threshold=0.9)

8612it [00:03, 2469.74it/s]
100%|██████████| 67/67 [00:00<00:00, 4622.47it/s]


Filter:   0%|          | 0/9124 [00:00<?, ? examples/s]

Original dataset size: 9124
Number of duplicate clusters: 67
Files in duplicate cluster: 314
Unique files in duplicate cluster: 79
Filtered dataset size: 8889


In [None]:
def print_sth(sth, idx):
    print(sth)
    return sth

dataset = dataset.map(
    print_sth,
    batched=True,
    num_proc=2,
    batch_size=10,
    with_indices=True,
    # desc="Fingerprinting...",
)

In [7]:
import re

NON_ALPHA = re.compile(r"\W", re.UNICODE)
NON_ALPHA.split('Anh ta là một người đàn ông tốt')

['Anh', 'ta', 'là', 'một', 'người', 'đàn', 'ông', 'tốt']

In [15]:
from datasets import load_from_disk
loaded_dataset = load_from_disk("/home/minhnam/Desktop/rust/text-dedup/output/test/text_dataset/train")
loaded_dataset

Dataset({
    features: ['text'],
    num_rows: 4
})

In [20]:
for i in range(len(loaded_dataset)):
    print(loaded_dataset[i])

{'text': ''}
{'text': 'Mark 6:1-13 (NIV)'}
{'text': 'This is the text of the message I preached at Lakes Entrance Uniting Church on Sunday 29th July 2018. It was the fifth in a series on the Gospel of Mark, and the second of two on Mark 6:1-13.'}
{'text': '1 Jesus left there and went to his hometown, accompanied by his disciples. 2 When the Sabbath came, he began to teach in the synagogue, and many who heard him were amazed.'}


In [12]:
loaded_dataset.push_to_hub("tmnam20/ahihi")

Pushing dataset shards to the dataset hub:   0%|          | 0/1 [00:00<?, ?it/s]

Creating parquet from Arrow format:   0%|          | 0/9 [00:00<?, ?ba/s]

In [1]:
from datasets import load_dataset

In [7]:
dataset = load_dataset(
    "text", 
    data_files="text_dataset/*.txt", split="train")

Generating train split: 0 examples [00:00, ? examples/s]

In [8]:
dataset

Dataset({
    features: ['text'],
    num_rows: 11710791
})