In [1]:
import json
import os
import re

import pandas as pd
import pyarrow.dataset as ds
import spacy
from spacy.tokens import DocBin
from stqdm import stqdm as tqdm

In [2]:
def stream_parquet_telegram_text_column(db, batch_size=1000, columns=None):
    """Stream ``(text, record_id)`` pairs from a pyarrow dataset in lists of ``batch_size``.

    Parameters
    ----------
    db : pyarrow.dataset.Dataset
        Dataset to scan; only its ``to_batches`` method is used.
    batch_size : int
        Number of pairs per yielded list (the last list may be shorter). Also passed
        to pyarrow as the maximum row count of each scanned record batch.
    columns : sequence of str, optional
        Exactly two column names: first the text column, then the id column.
        Defaults to ``["message_text", "record_id"]``.

    Yields
    ------
    list of tuple
        ``(text, record_id)`` tuples as plain Python objects; null values come
        through as ``None`` and are not filtered here.

    Raises
    ------
    ValueError
        If ``batch_size`` is smaller than 1 (raised on first iteration).
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    if columns is None:
        columns = ["message_text", "record_id"]
    else:
        # pyarrow's scanner accepts a list (or dict) for `columns`, not arbitrary sequences.
        columns = list(columns)

    batch = []
    for record_batch in db.to_batches(columns=columns, batch_size=batch_size):
        # Columns come back in the projected order: 0 = text, 1 = id.
        # Bulk conversion is far cheaper than calling .as_py() on every scalar.
        texts = record_batch.column(0).to_pylist()
        record_ids = record_batch.column(1).to_pylist()

        for pair in zip(texts, record_ids):
            batch.append(pair)
            if len(batch) == batch_size:
                yield batch
                batch = []

    # Flush the trailing partial batch, if any.
    if batch:
        yield batch

In [3]:
def _docbin_path(docbin_folder, file_prefix, batch_index):
    """Return the file path used for docbin number ``batch_index``."""
    return f"{docbin_folder}/{file_prefix}_docbin_{batch_index}.db"


def _tag_non_null_texts(batches, seen_batches):
    """Flatten ``batches`` into ``(text, (batch_index, record_id))`` tuples, skipping None texts.

    Each batch index is appended to ``seen_batches`` as that batch is consumed, so the
    caller can still write a (possibly empty) docbin for batches whose texts were all None.
    """
    for batch_index, batch in enumerate(batches):
        seen_batches.append(batch_index)
        kept = [(text, (batch_index, rid)) for text, rid in batch if text is not None]
        if len(kept) != len(batch):
            print(f"Dropped {len(batch) - len(kept)} None records in batch {batch_index}")
        yield from kept


def process_telegram_text_main(nlp, parquet_file, docbin_folder = "./myfolder", file_prefix = "myname", workers = 1, batch_size = 1000):
    """Run ``nlp`` over a parquet file's ``message_text`` column and save the docs as DocBins.

    Writes one ``{docbin_folder}/{file_prefix}_docbin_{i}.db`` file per input batch of
    ``batch_size`` rows. Rows whose text is None are dropped; a batch of only None rows
    still gets an empty docbin. Also writes ``{file_prefix}_doc2docbin.json``, which
    maps each ``record_id`` to ``{"docbin": i, "line": j}``. Here ``j`` is the doc's
    position inside docbin ``i``. JSON turns the record ids into string keys, and a
    duplicate record id keeps only its last location.

    Parameters
    ----------
    nlp : spacy.Language
        Loaded pipeline used to process the texts.
    parquet_file : str
        Path to the parquet file (read with ``pyarrow.dataset``).
    docbin_folder : str
        Output folder; created (including parents) if missing.
    file_prefix : str
        Prefix for every output file name.
    workers : int
        Passed to ``nlp.pipe`` as ``n_process``.
    batch_size : int
        Rows per docbin, also used as ``nlp.pipe``'s batch size.
    """
    # makedirs creates missing parents; exist_ok avoids a check-then-create race.
    os.makedirs(docbin_folder, exist_ok = True)

    db = ds.dataset(parquet_file, format = "parquet")
    batches = stream_parquet_telegram_text_column(db, batch_size = batch_size, columns = ["message_text", "record_id"])

    doc2docbin = dict()
    seen_batches = []
    written = set()

    def _save(batch_index, doc_bin):
        doc_bin.to_disk(_docbin_path(docbin_folder, file_prefix, batch_index))
        written.add(batch_index)

    print("Processing docbins...")
    # A single pipe over the whole stream keeps one worker pool alive and spreads
    # pipe-batches across workers. Calling nlp.pipe once per docbin re-spawned the pool
    # each time and, because pipe batch_size equalled the batch length, gave all the
    # work to a single worker.
    docs = nlp.pipe(
        _tag_non_null_texts(batches, seen_batches),
        n_process = workers,
        batch_size = batch_size,
        as_tuples = True,
    )

    current_index, doc_bin, line = None, None, 0
    for doc, (batch_index, record_id) in docs:
        if batch_index != current_index:
            # Docs come out in input order, so a new batch index means the previous docbin is complete.
            if doc_bin is not None:
                _save(current_index, doc_bin)
            current_index, doc_bin, line = batch_index, DocBin(), 0

        doc2docbin[record_id] = {
            "docbin": batch_index,
            "line": line
        }
        doc_bin.add(doc)
        line += 1

    if doc_bin is not None:
        _save(current_index, doc_bin)

    # Batches made up entirely of None texts produced no docs; keep their (empty) files
    # so docbin numbering stays contiguous, as before.
    for batch_index in seen_batches:
        if batch_index not in written:
            _save(batch_index, DocBin())

    print("Saving index...")

    json_path = f"{docbin_folder}/{file_prefix}_doc2docbin.json"

    with open(json_path, "w", encoding = "utf-8") as d2db:
        json.dump(doc2docbin, d2db, indent = 4)

    print("Complete")



In [4]:
# Input locations (machine-specific absolute paths; adjust for your environment).
# NOTE(review): filepath_gen is not used in this cell — confirm whether docbin output should live under it.
filepath_gen = 'S:\\ERP Raw Data\\pien\\THESIS'
filepath_parq = 'S:\\ERP Raw Data\\pien\\THESIS\\parquets'
PARQUET_SUFFIX = '.pqt'
batch_size = 1000  # rows per DocBin file

nlp = spacy.load('nl_core_news_sm')
# os.cpu_count() may return None; spaCy's n_process needs an int.
num_workers = os.cpu_count() or 1

# Only process the channel parquet files, in a deterministic order; other files in the folder are ignored.
parquet_files = sorted(f for f in os.listdir(filepath_parq) if f.endswith(PARQUET_SUFFIX))

for file in parquet_files:
    name_base = file.removesuffix(PARQUET_SUFFIX)
    print(f'processing {name_base} docbins...')

    process_telegram_text_main(
        nlp = nlp,
        parquet_file = os.path.join(filepath_parq, file),
        docbin_folder = f'{name_base}_docbins',
        file_prefix = f'{name_base}',
        workers = num_workers,
        batch_size = batch_size
    )


processing bataafsemossel docbins...
Processing docbins...
Saving index...
Complete
processing berichten_uit_donbass docbins...
Processing docbins...
Saving index...
Complete
processing blckbxtv docbins...
Processing docbins...
Saving index...
Complete
processing carelvdf docbins...
Processing docbins...
Saving index...
Complete
processing deguldenmiddenweg docbins...
Processing docbins...
Saving index...
Complete
processing derdekamer docbins...
Processing docbins...
Saving index...
Complete
processing ditishetnieuws docbins...
Processing docbins...
Saving index...
Complete
processing european_dissident docbins...
Processing docbins...
Saving index...
Complete
processing gek_genoeg docbins...
Processing docbins...
Saving index...
Complete
processing gezond_verstand docbins...
Processing docbins...
Saving index...
Complete
processing hetkantelpunt docbins...
Processing docbins...
Saving index...
Complete
processing hetkloptniet docbins...
Processing docbins...
Saving index...
Complete
