In [None]:
import gzip
import json
import os
import time
from multiprocessing import Manager
from multiprocessing import Process, Value, Queue
from queue import Empty

from datasets import Dataset
from datasets import load_dataset_builder
from huggingface_hub import login

from src.wikidataDumpReader import WikidataDumpReader
from src.wikidataEntityDB import WikidataEntity

# Runtime configuration. Every value except SKIPLINES can be overridden
# through an environment variable of the same name.
FILEPATH = os.getenv("FILEPATH", '../data/Wikidata/latest-all.json.bz2')
# Number of rows handed to WikidataEntity.add_bulk_items per bulk insert.
PUSH_SIZE = int(os.getenv("PUSH_SIZE", 10000))
# Passed to WikidataDumpReader as queue_size / num_processes / skiplines.
QUEUE_SIZE = int(os.getenv("QUEUE_SIZE", 10000))
NUM_PROCESSES = int(os.getenv("NUM_PROCESSES", 4))
SKIPLINES = 0

# Read the Hugging Face token from a local JSON file. The file is opened
# read-only inside a context manager so the handle is closed right away and
# a token file without write permission can still be read.
with open("../API_tokens/huggingface_api.json", 'r') as token_file:
    api_key = json.load(token_file)['API_KEY']

In [None]:
def save_items_to_sqlite(item, data_batch, sqlitDBlock):
    """Queue one entity row and flush a full batch to the database.

    Args:
        item: Entity dict with at least 'id', 'labels' and 'descriptions',
            or None (in which case nothing happens).
        data_batch: Shared list of pending rows; the new row is appended to it.
        sqlitDBlock: Lock held while checking the batch size and flushing.

    When more than PUSH_SIZE rows are pending, the first PUSH_SIZE rows are
    passed to WikidataEntity.add_bulk_items and removed from `data_batch`
    only if that call reports success.
    """
    if item is None:
        return

    def compact_json(value):
        # Serialise without whitespace between separators.
        return json.dumps(value, separators=(',', ':'))

    cleaned_labels = WikidataEntity.clean_label_description(item['labels'])
    cleaned_descriptions = WikidataEntity.clean_label_description(item['descriptions'])
    labels_json = compact_json(cleaned_labels)
    descriptions_json = compact_json(cleaned_descriptions)
    has_wikipedia_page = WikidataEntity.is_in_wikipedia(item)

    row = {
        'id': item['id'],
        'labels': labels_json,
        'descriptions': descriptions_json,
        'in_wikipedia': has_wikipedia_page,
        # Flags are derived from the letters present in the entity id.
        'is_property': ('P' in item['id']),
        'is_item': ('Q' in item['id']),
    }
    data_batch.append(row)

    with sqlitDBlock:
        if len(data_batch) <= PUSH_SIZE:
            return
        pending_rows = list(data_batch[:PUSH_SIZE])
        if WikidataEntity.add_bulk_items(pending_rows):
            # Drop the rows only once the insert has succeeded.
            del data_batch[:PUSH_SIZE]

# Shared state for the reader's worker processes: a lock serialising the
# bulk inserts and a managed list holding the rows not yet written.
multiprocess_manager = Manager()
sqlitDBlock = multiprocess_manager.Lock()
data_batch = multiprocess_manager.list()

wikidata = WikidataDumpReader(FILEPATH, num_processes=NUM_PROCESSES, queue_size=QUEUE_SIZE, skiplines=SKIPLINES)
wikidata.run(lambda item: save_items_to_sqlite(item, data_batch, sqlitDBlock), max_iterations=None, verbose=True)

# Flush whatever is left after the dump has been read. Insert and delete
# must cover the SAME slice: inserting the whole list while deleting only
# the first PUSH_SIZE rows would re-submit the remaining rows on the next
# iteration whenever more than PUSH_SIZE rows are pending.
while len(data_batch) > 0:
    worked = WikidataEntity.add_bulk_items(list(data_batch[:PUSH_SIZE]))
    if worked:
        del data_batch[:PUSH_SIZE]
    else:
        # Insert failed; wait briefly before retrying the same slice.
        time.sleep(1)

In [None]:
# Settings for the chunk-export cell. FILEPATH, QUEUE_SIZE and NUM_PROCESSES
# may be overridden through environment variables of the same name.
FILEPATH = os.environ.get("FILEPATH", '../data/Wikidata/latest-all.json.bz2')
QUEUE_SIZE = int(os.environ.get("QUEUE_SIZE", 2000))
NUM_PROCESSES = int(os.environ.get("NUM_PROCESSES", 4))
SKIPLINES = 0

def save_to_queue(item, data_batch):
    """Strip bookkeeping keys from an entity and enqueue the result.

    Args:
        item: Entity dict with 'id', 'labels', 'descriptions' and 'aliases';
            'claims' and 'sitelinks' are optional and default to {}.
            None is ignored.
        data_batch: Queue-like object; the cleaned record is passed to its
            `put` method.
    """
    if item is None:
        return

    dropped_claim_keys = ['hash', 'snaktype', 'type', 'entity-type', 'numeric-id', 'qualifiers-order', 'snaks-order']
    claims = WikidataEntity._remove_keys(item.get('claims', {}), dropped_claim_keys)
    claims = WikidataEntity._clean_datavalue(claims)
    # 'id' is removed only after _clean_datavalue has run.
    claims = WikidataEntity._remove_keys(claims, ['id'])

    sitelinks = WikidataEntity._remove_keys(item.get('sitelinks', {}), ['badges'])

    record = {'id': item['id']}
    for field in ('labels', 'descriptions', 'aliases'):
        record[field] = item[field]
    record['sitelinks'] = sitelinks
    record['claims'] = claims

    data_batch.put(record)

def writer_loop(data_batch, finished, file_size=1_000_000):
    """Drain a queue into gzip-compressed JSON array files.

    Items are written to `chunk_<n>.json.gz` in the current working
    directory, one JSON array per file, starting a new file after
    `file_size` items.

    Args:
        data_batch: Queue-like object providing `get(timeout=...)` and
            `empty()`. Falsy items are skipped.
        finished: Object whose `.value` becomes 1 once the producer is done;
            the loop exits when that is set and the queue is empty.
        file_size: Maximum number of items per chunk file.

    The element separator is written *before* every item except the first
    of a file, so a partially filled last chunk is still valid JSON (no
    trailing comma before the closing bracket).
    """
    file_handle = None
    num_items = 0
    file_id = 0

    try:
        while True:
            if finished.value == 1 and data_batch.empty():
                break

            try:
                next_item = data_batch.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; re-check `finished`.
                continue

            if not next_item:
                continue

            if file_handle is None:
                file_handle = gzip.open(f'chunk_{file_id}.json.gz', mode='wt')
                file_handle.write('[\n')
                num_items = 0
            else:
                # An open file always holds at least one item already.
                file_handle.write(",\n")

            file_handle.write(json.dumps(next_item, separators=(',', ':')))
            num_items += 1

            if num_items >= file_size:
                file_handle.write('\n]')
                file_handle.close()
                file_handle = None
                file_id += 1
                num_items = 0
    finally:
        # Terminate and close the current chunk on normal exit and on error,
        # so the handle never leaks.
        if file_handle is not None:
            try:
                file_handle.write('\n]')
            finally:
                file_handle.close()

# Bounded queue between the dump reader (producer) and the writer process.
data_batch = Queue(maxsize=QUEUE_SIZE)
# Shared flag: 0 while the reader is running, 1 once it is done.
finished = Value('i', 0)

wikidata = WikidataDumpReader(FILEPATH, num_processes=NUM_PROCESSES, queue_size=QUEUE_SIZE, skiplines=SKIPLINES)

writer_process = Process(
    target=writer_loop,
    args=(data_batch, finished)
)
writer_process.start()

try:
    wikidata.run(lambda item: save_to_queue(item, data_batch), max_iterations=None, verbose=True)
finally:
    # Always signal completion, even when the reader raises; otherwise the
    # writer would poll the queue forever and never exit.
    with finished.get_lock():
        finished.value = 1

    writer_process.join()

In [None]:
# Settings for the upload cell. QUEUE_SIZE and NUM_PROCESSES may be
# overridden through environment variables of the same name.
QUEUE_SIZE = int(os.environ.get("QUEUE_SIZE", 5000))
NUM_PROCESSES = int(os.environ.get("NUM_PROCESSES", 4))
SKIPLINES = 0

def save_to_queue(item, data_queue):
    """Enqueue a JSON-serialised record for items that are in Wikipedia.

    Items that are None, or for which WikidataEntity.is_in_wikipedia returns
    a falsy value, are skipped. Every field except 'id' is stored as a
    compact JSON string; claims are first passed through
    WikidataEntity.add_labels_batched.
    """
    if item is None or not WikidataEntity.is_in_wikipedia(item):
        return

    labelled_claims = WikidataEntity.add_labels_batched(item['claims'], query_batch=100)

    record = {'id': item['id']}
    for field in ('labels', 'descriptions', 'aliases', 'sitelinks'):
        record[field] = json.dumps(item[field], separators=(',', ':'))
    record['claims'] = json.dumps(labelled_claims, separators=(',', ':'))

    data_queue.put(record)

def chunk_generator(filepath, num_processes=2, queue_size=5000, skip_lines=0):
    """
    Read a chunk file with WikidataDumpReader in a separate process and
    yield the records that `save_to_queue` places on a bounded queue, so
    the whole chunk never has to be held in memory.

    Args:
        filepath: Path of the chunk file handed to WikidataDumpReader.
        num_processes: Forwarded to WikidataDumpReader.
        queue_size: Capacity of the hand-off queue; also forwarded to
            WikidataDumpReader.
        skip_lines: Forwarded to WikidataDumpReader as `skiplines`.

    Yields:
        Each truthy record taken from the queue.
    """
    data_queue = Queue(maxsize=queue_size)
    # Shared flag: 0 while the reader is running, 1 once it has stopped.
    finished = Value('i', 0)

    # Initialize the dump reader
    wikidata = WikidataDumpReader(
        filepath,
        num_processes=num_processes,
        queue_size=queue_size,
        skiplines=skip_lines
    )

    # Define a function to feed items into the queue
    def run_reader():
        try:
            wikidata.run(lambda item: save_to_queue(item, data_queue),
                         max_iterations=None, verbose=True)
        finally:
            # Signal completion even if the reader fails; otherwise the
            # consumer loop below would wait forever.
            with finished.get_lock():
                finished.value = 1

    # Start reader in a separate process.
    # NOTE(review): run_reader is a closure, so this presumably relies on the
    # 'fork' start method - confirm before running under 'spawn'.
    reader_proc = Process(target=run_reader)
    reader_proc.start()

    drained = False
    try:
        # Continuously yield items from the queue to the Dataset generator
        while True:
            # If reader is done AND queue is empty => stop
            if finished.value == 1 and data_queue.empty():
                break
            try:
                item = data_queue.get(timeout=1)
            except Empty:
                # Nothing arrived within the timeout; re-check `finished`.
                continue
            if item:
                yield item
        drained = True
    finally:
        # If the consumer stopped early (generator closed or an error was
        # raised), the reader may be blocked on a full queue: stop it so
        # the join below cannot hang.
        # NOTE(review): any worker processes WikidataDumpReader starts itself
        # are not visible here - confirm they exit with the reader.
        if not drained and reader_proc.is_alive():
            reader_proc.terminate()
        # Wait for the reader process to exit
        reader_proc.join()

# Now process each chunk file and push to the same Hugging Face repo
# NOTE(review): the existing splits are looked up on "philippesaade/wikidata"
# while pushes go to HF_REPO_ID - confirm both resolve to the same repo.
HF_REPO_ID = "wikidata"  # Change to your actual repo on Hugging Face

for i in [43, 44]:
    split_name = f"chunk_{i}"

    login(token=api_key)
    builder = load_dataset_builder("philippesaade/wikidata")
    # `info.splits` may be None when the repo carries no split metadata yet;
    # treat that as "no chunks uploaded".
    existing_splits = builder.info.splits or {}
    print(existing_splits.keys())
    if split_name in existing_splits:
        # Chunk was already pushed; skip it.
        continue

    filepath = f"../data/Wikidata/latest-all-chunks/chunk_{i}.json.gz"

    print(f"Processing {filepath} -> split={split_name}")

    # Create a Dataset from the generator
    ds_chunk = Dataset.from_generator(lambda: chunk_generator(
        filepath,
        num_processes=NUM_PROCESSES,
        queue_size=QUEUE_SIZE,
        skip_lines=SKIPLINES
    ))

    # Push each chunk as a separate "split" under the same dataset repo
    login(token=api_key)
    ds_chunk.push_to_hub(HF_REPO_ID, split=split_name)
    print(f"Chunk {i} pushed to {HF_REPO_ID} as {split_name}.")

In [None]:
# Manual re-push of the most recently built chunk. This cell defines nothing
# itself: `api_key`, `ds_chunk`, `HF_REPO_ID`, `split_name` and `i` must
# already exist in the kernel from the cells above, so it only works after
# the upload loop has built at least one chunk.
login(token=api_key)
ds_chunk.push_to_hub(HF_REPO_ID, split=split_name)
print(f"Chunk {i} pushed to {HF_REPO_ID} as {split_name}.")