In [1]:
from text_dedup.minhash import embed_func
from dedup_rs import EmbedFunc
from dedup_rs import UnionFind as uf_rs
import os
import numpy as np
import re
import multiprocessing as mp
from text_dedup.utils import *
INDEX_COLUMN = "__index__"
CLUSTER_COLUMN = "__cluster__"
NUM_PROC = os.cpu_count()
SIGNATURE_COLUMN = "__signatures__"

In [2]:
uf = uf_rs.load("/home/wayne/kioxia/github/text-dedup/temp_output_minhash_rust/uf.json")

In [2]:
from text_dedup.minhash_rust import *

In [3]:
SEED = 42
RNG = np.random.RandomState(SEED)
NON_ALPHA = re.compile(r"\W", re.UNICODE)
datasets.logging.set_verbosity_error()
# for is originally used to reduce memory usage in MacOS but also ensures that the Union Find data structure
# is not copied to child processes as long as it is not modified.
mp.set_start_method("fork", force=True)
uf = UnionFind()
SIGNATURE_COLUMN = "__signatures__"

In [4]:
Emb = EmbedFunc.from_b_r(50, 4, 200, SIGNATURE_COLUMN, INDEX_COLUMN)

In [5]:
meta_args = MetaArgs(column="text",batch_size=5000)
io_args = IOArgs(
        path="./temp_inp_ds",
        local=True,
        num_proc=NUM_PROC,
        cache_dir=".cache",
        output="./temp_output_minhash",
        debug=True,
        clean_cache=True,
    )
minhash_args = MinHashArgs(num_perm=200, ngram=2, threshold=0.5, b=50, r=4)

In [6]:
ds, id2id = load_hf_dataset(io_args=io_args, meta_args=meta_args)
ds = ds.filter(
    lambda x: len(NON_ALPHA.split(x[meta_args.column].lower())) >= minhash_args.min_length,
    num_proc=io_args.num_proc,
)
# ds = ds.select(range(100000))
ds.cleanup_cache_files()

0

In [8]:
ds.map(
                Emb.batch_embed_shard,
                input_columns=[meta_args.column, INDEX_COLUMN],
                remove_columns=[col for col in ds.column_names if col != INDEX_COLUMN],
                batched=True,
                batch_size=10000,
                with_indices=False,
                desc="Fingerprinting with rust...",
            )

Fingerprinting with rust...:   0%|          | 0/100000 [00:00<?, ? examples/s]

Dataset({
    features: ['core_id', 'doi', 'original_abstract', 'original_title', 'processed_title', 'processed_abstract', 'cat', 'labelled_duplicates', 'text', '__index__'],
    num_rows: 100000
})

def hash_func(byte_data):
    return sha1_hash(byte_data, d=min(minhash_args.hash_bits, 32))

dtype, max_hash, modulo_prime =  (np.uint64, np.uint32((1 << 32) - 1), np.uint64((1 << 61) - 1))
PERMUTATIONS: tuple[np.ndarray, np.ndarray] = (
        RNG.randint(
            1, modulo_prime, size=(200,), dtype=dtype
        ),  # a is a multiplier so should not be 0
        RNG.randint(0, modulo_prime, size=(200,), dtype=dtype),  # b
    )
B, R = optimal_param(
            minhash_args.threshold,
            minhash_args.num_perm,
            false_positive_weight=0.5,
            false_negative_weight=0.5,
        )
hashranges = [(i * R, (i + 1) * R) for i in range(B)]
embedded = ds.map(
        function=embed_func,
        fn_kwargs={
            "num_perm": minhash_args.num_perm,
            "hashranges": hashranges,
            "ngram_size": minhash_args.ngram,
            "min_length": minhash_args.min_length,
            "permutations": PERMUTATIONS,
            "hash_func": hash_func,
            "dtype": dtype,
            "max_hash": max_hash,
            "modulo_prime": modulo_prime
        },
        input_columns=[meta_args.column, INDEX_COLUMN],
        remove_columns=[col for col in ds.column_names if col != INDEX_COLUMN],
        num_proc=io_args.num_proc,
        with_indices=False,
        desc="Fingerprinting...",
    )
embedded.cleanup_cache_files()

In [7]:
uf = UnionFind.load("/home/wayne/kioxia/github/text-dedup/temp_output_minhash_rust/uf.json")

AttributeError: type object 'UnionFind' has no attribute 'load'

In [8]:
uf = Emb.cluster()

In [9]:
ds = ds.map(
                function=lambda record: {CLUSTER_COLUMN: uf.find(record[INDEX_COLUMN])},
                with_indices=False,
                num_proc=io_args.num_proc,
                new_fingerprint=str(random.getrandbits(128)),
                desc="Finding clusters...",
            )

Finding clusters... (num_proc=12):   0%|          | 0/100000 [00:00<?, ? examples/s]

In [10]:
uf.dump(os.path.join(io_args.output, "uf.json"))

In [13]:
import json

data = json.load(open("temp_output_minhash_rust/uf.json"))

In [8]:
cluster = ds.map(
        Emb.batch_embed_shard,
        input_columns=[meta_args.column, INDEX_COLUMN],
        remove_columns=[col for col in ds.column_names if col != INDEX_COLUMN],
        batched=True,
        batch_size=10000,
    )

Map:   0%|          | 0/10000 [00:00<?, ? examples/s]

Number of hash tables: 50
Clustering...
Number of edges: 4619Number of edges: 4619


In [10]:
LEN_EMBEDDED = len(embedded_rust)
NUM_SHARDS = np.ceil(LEN_EMBEDDED / meta_args.batch_size).astype(int)

In [11]:
HASH_TABLES: list[dict[int, set]] = [defaultdict(set) for _ in range(50)]

In [1]:
edges = []
for i in tqdm(
    range(0, NUM_SHARDS),
    dynamic_ncols=True,
    desc="Iterating MinHashes...",  # noqa: E501
):
    embedded_shard = embedded_rust.shard(
        num_shards=NUM_SHARDS,
        index=i,
        contiguous=True,
        writer_batch_size=meta_args.batch_size,
    )
    for key, Hs in zip(embedded_shard[INDEX_COLUMN], embedded_shard[SIGNATURE_COLUMN]):
        Emb.batch_add(Hs, key)

NameError: name 'tqdm' is not defined

In [13]:
len(edges)

0

In [11]:
Emb.hash_tables

In [11]:
ds = ds.map(
    function=lambda record: {CLUSTER_COLUMN: Emb.uf_batch_find(record[INDEX_COLUMN])},
    with_indices=False,
    batched=True,
    batch_size=meta_args.batch_size * 10,
    new_fingerprint=str(random.getrandbits(128)),
    desc="Finding clusters...",
)

Finding clusters...:   0%|          | 0/10000 [00:00<?, ? examples/s]