In [None]:
import re
import pandas as pd
from rdflib import RDF, RDFS, XSD, Graph, Literal, Namespace, URIRef
from rdflib.namespace import OWL
from sentence_transformers import SentenceTransformer, util
import ast
import gc 
import os
import time


def sanitize_for_uri(value) -> str:
    """
    Generic sanitization function for URIs

    :param value: value to sanitize

    :return: sanitized value
    """
    return re.sub(r"[^a-zA-Z0-9_]", "", str(value))

UNICA = Namespace("https://github.com/tail-unica/kgeats/")
SCHEMA = Namespace("https://schema.org/")

dizionario_hum = {}
dizionario_off = {}

hum_file = "../csv_file/pp_recipes_normalized_by_pipeline.csv"
off_file = "../csv_file/off_normalized_final.csv"
hum_off_file = "../csv_file/file_off_hummus.csv"
file_output_nt =  "../csv_file/ontology_merge.nt"

chunksize = 100000
cont_chunk = 0

for df_off_chunk in pd.read_csv(off_file, sep="\t", on_bad_lines="skip", chunksize=chunksize, low_memory=False, usecols=["product_name_normalized", "code"]):
    print(f"Processing rows off from {chunksize * cont_chunk} to {chunksize * (cont_chunk+1)}")
    
    for idx, row in df_off_chunk.iterrows():
        if(row["product_name_normalized"] != None and row["product_name_normalized"] != ""):
            id = URIRef(value=UNICA[f"Recipe_off_{row["code"]}"])
            if id != None:
                if row["product_name_normalized"] not in dizionario_off:
                    dizionario_off[row["product_name_normalized"]] = [id]
                else: 
                    dizionario_off[row["product_name_normalized"]].append(id)
    cont_chunk += 1

cont_chunk = 0
for df_hum_chunk in pd.read_csv(hum_file, sep=";", on_bad_lines="skip", chunksize=chunksize, low_memory=False, usecols=["title_normalized", "recipe_id"]):
    print(f"Processing rows hummus from {chunksize * cont_chunk} to {chunksize * (cont_chunk+1)}")
    
    for idx, row in df_hum_chunk.iterrows():
        if(row["title_normalized"] != None and row["title_normalized"] != ""):
            id = URIRef(UNICA[f"Recipe_hummus{sanitize_for_uri(row['recipe_id'])}"])
            if id != None:
                if row["title_normalized"] not in dizionario_hum:
                    dizionario_hum[row["title_normalized"]] = [id]
                else: 
                    dizionario_hum[row["title_normalized"]].append(id)
    cont_chunk += 1


Processing rows off from 0 to 100000
Processing rows off from 100000 to 200000
Processing rows off from 200000 to 300000
Processing rows off from 300000 to 400000
Processing rows off from 400000 to 500000
Processing rows off from 500000 to 600000
Processing rows off from 600000 to 700000
Processing rows off from 700000 to 800000
Processing rows off from 800000 to 900000
Processing rows off from 900000 to 1000000
Processing rows off from 1000000 to 1100000
Processing rows off from 1100000 to 1200000
Processing rows off from 1200000 to 1300000
Processing rows off from 1300000 to 1400000
Processing rows off from 1400000 to 1500000
Processing rows off from 1500000 to 1600000
Processing rows off from 1600000 to 1700000
Processing rows off from 1700000 to 1800000
Processing rows off from 1800000 to 1900000
Processing rows off from 1900000 to 2000000
Processing rows off from 2000000 to 2100000
Processing rows off from 2100000 to 2200000
Processing rows off from 2200000 to 2300000
Processing r

In [None]:
numchunk = 0
chunksize = 10
contatore = 0

total_lines = sum(1 for _ in open(hum_off_file, encoding="utf-8")) - 1
total_chunks = (total_lines // chunksize) + 1
start_total = time.time()


for df_merge_chunk in pd.read_csv(hum_off_file, sep=",", on_bad_lines="skip", chunksize=chunksize, low_memory=False, usecols=["title_normalized", "product_name_normalized"]):
    chunk_start = time.time()
    if numchunk % 10 == 0:
        print(f"\nProcessing chunk {numchunk+1}/{total_chunks}")

    for row in df_merge_chunk.itertuples(index=False):
        title = row.title_normalized
        product = row.product_name_normalized

        if title in dizionario_hum and product in dizionario_off:
            for hum_ricetta in dizionario_hum[title]:
                for off_ricetta in dizionario_off[product]: 
                    contatore += 1

    del df_merge_chunk
    gc.collect() 

    chunk_time = time.time() - chunk_start
    avg_time_per_chunk = (time.time() - start_total) / (numchunk + 1)
    remaining_chunks = total_chunks - (numchunk + 1)
    est_remaining = avg_time_per_chunk * remaining_chunks
    if numchunk % 10 == 0:
        print(f"Chunk time: {chunk_time:.2f}s — Estimated remaining: {est_remaining/60:.1f} min")
        print(contatore)
    numchunk += 1

total_time = time.time() - start_total
print(f"\nTotal processing time: {total_time/60:.2f} minutes")



Processing chunk 1/3779657
Chunk time: 1.32s — Estimated remaining: 83530.6 min
564333

Processing chunk 11/3779657
Chunk time: 1.29s — Estimated remaining: 81831.1 min
1076368

Processing chunk 21/3779657
Chunk time: 1.30s — Estimated remaining: 184634.7 min
565903354

Processing chunk 31/3779657
Chunk time: 1.29s — Estimated remaining: 151398.6 min
566181524

Processing chunk 41/3779657
Chunk time: 1.31s — Estimated remaining: 134532.7 min
568785803

Processing chunk 51/3779657
Chunk time: 1.30s — Estimated remaining: 124213.4 min
570103708

Processing chunk 61/3779657
Chunk time: 1.29s — Estimated remaining: 117217.8 min
570408113

Processing chunk 71/3779657
Chunk time: 1.29s — Estimated remaining: 112194.9 min
570825646

Processing chunk 81/3779657
Chunk time: 1.29s — Estimated remaining: 108406.0 min
570869486

Processing chunk 91/3779657
Chunk time: 1.29s — Estimated remaining: 105453.3 min
570871354

Processing chunk 101/3779657
Chunk time: 1.30s — Estimated remaining: 103091.

Exception ignored in: <bound method IPythonKernel._clean_thread_parent_frames of <ipykernel.ipkernel.IPythonKernel object at 0x7fe0566ef230>>
Traceback (most recent call last):
  File "/home/gzedda/miniconda3/envs/ambientez/lib/python3.13/site-packages/ipykernel/ipkernel.py", line 775, in _clean_thread_parent_frames
    def _clean_thread_parent_frames(
KeyboardInterrupt: 



Processing chunk 401/3779657
Chunk time: 1.31s — Estimated remaining: 83988.5 min
618372961


In [6]:
numchunk = 0
chunksize = 1

total_lines = 5 #sum(1 for _ in open(hum_off_file, encoding="utf-8")) - 1
total_chunks = (total_lines // chunksize) + 1
start_total = time.time()

with open(file_output_nt, "w", encoding="utf-8") as f_out:

    for df_merge_chunk in pd.read_csv(hum_off_file, sep=",", on_bad_lines="skip", chunksize=chunksize, low_memory=False, usecols=["title_normalized", "product_name_normalized"], nrows=5):
        chunk_start = time.time()
        if numchunk % 100 == 0:
            print(f"\nProcessing chunk {numchunk+1}/{total_chunks}")

        for row in df_merge_chunk.itertuples(index=False):
            title = row.title_normalized
            product = row.product_name_normalized

            if title in dizionario_hum and product in dizionario_off:
                for hum_ricetta in dizionario_hum[title]:
                    for off_ricetta in dizionario_off[product]: 
                        triple_str = f"<{off_ricetta}> <https://schema.org/sameAs> <{hum_ricetta}> .\n"
                        f_out.write(triple_str)

        del df_merge_chunk
        gc.collect() 

        chunk_time = time.time() - chunk_start
        avg_time_per_chunk = (time.time() - start_total) / (numchunk + 1)
        remaining_chunks = total_chunks - (numchunk + 1)
        est_remaining = avg_time_per_chunk * remaining_chunks
        if numchunk % 100 == 0:
            print(f"Chunk time: {chunk_time:.2f}s — Estimated remaining: {est_remaining/60:.1f} min")
        numchunk += 1

    total_time = time.time() - start_total
    print(f"\nTotal processing time: {total_time/60:.2f} minutes")



Processing chunk 1/6
Chunk time: 1.29s — Estimated remaining: 0.1 min

Total processing time: 0.11 minutes
