In [1]:
pip install rdflib



In [2]:
pip install datetime



In [3]:
pip install tqdm



In [4]:
pip install psutil



In [5]:
import pandas as pd
import os
from tqdm import tqdm
import datetime
import re

from rdflib import Graph, Literal, RDF, RDFS, URIRef, Namespace
from rdflib.plugins.sparql import prepareQuery
from rdflib.namespace import XSD

In [6]:
# To measure the usage of RAM
import psutil

In [7]:
# Use your personal account!
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [8]:
global pbar

chunksize = 10000

BTP = Namespace("http://www.dei.unipd.it/~gdb/ontology/btp/")

global viaChiarini_gp, giardiniMargherita_gp, portaSanFelice_gp
viaChiarini_gp = [44.4997732567231, 11.2873095406444]
giardiniMargherita_gp = [44.4830615285162, 11.3528830371546] # via Medaro Bottonelli
portaSanFelice_gp = [44.4991470592725, 11.3270506316853]

In [9]:
# Function to populate the coils dataset
def coils_process_chunk(chunk, piece, year_dataset):
    # Graph
    chunk_set = set()

    for index, row in chunk.iterrows():

        # I check if the record is valid or not -> must have all the field not NaN
        if row['Livello'] == '' or row['tipologia'] == '' or row['codice arco'] == '':
            # I skip the record -> next record
            continue

        # else: is valid -> continue

        ## COIL:
        # -uri: coil_ + id number.
        # -attributi: hasID
        # -object properties: hasLevel, hasType, isOn, and isPlacedOn.

        Coil = "btp:coil_"+str(row['ID_univoco_stazione_spira'])

        # PollutionCoils and SimpleCoils are subclasses of Coil
        chunk_set.add("btp:PollutionCoil rdfs:subClassOf btp:Coil .")
        chunk_set.add("btp:SimpleCoil rdfs:subClassOf btp:Coil .")

        # Cast to float
        latitudine = row['latitudine']
        longitudine = row['longitudine']

        if(type(latitudine) == str):
            latitudine = latitudine.replace(',', '')
            # From 113473933293812,00 to 11.3473933293812
            latitudine = latitudine[:2] + '.' + latitudine[2:]
            # Cast to float
            latitudine = float(latitudine)
        if(type(longitudine) == str):
            longitudine = longitudine.replace(',', '')
            # From 44500438455000,00 to 44.500438455000
            longitudine = longitudine[:2] + '.' + longitudine[2:]
            longitudine = float(longitudine)

        # Pollution coils -> must be around 300 m
        if ((latitudine <= viaChiarini_gp[0] + 0.0027) and (latitudine >= viaChiarini_gp[0] + 0.0027)) and ((longitudine <= viaChiarini_gp[1] + 0.0013) and (longitudine >= viaChiarini_gp[1] - 0.0013)):
            chunk_set.add(Coil + " a btp:PollutionCoil")
            PollutionStation = "btp:controlUnitViaChiarini"
            chunk_set.add(PollutionStation + " a btp:PollutionStation .")
            chunk_set.add(PollutionStation + " btp:isNearTo " + Coil +" .")
        elif ((latitudine <= giardiniMargherita_gp[0] + 0.0027) and (latitudine >= giardiniMargherita_gp[0] + 0.0027)) and ((longitudine <= giardiniMargherita_gp[1] + 0.0013) and (longitudine >= giardiniMargherita_gp[1] - 0.0013)):
            chunk_set.add(Coil + " a btp:PollutionCoil")
            PollutionStation = "btp:controlUnitGiardiniMargherita"
            chunk_set.add(PollutionStation + " a btp:PollutionStation .")
            chunk_set.add(PollutionStation + " btp:isNearTo " + Coil +" .")
        elif ((latitudine <= portaSanFelice_gp[0] + 0.0027) and (latitudine >= portaSanFelice_gp[0] + 0.0027)) and ((longitudine <= portaSanFelice_gp[1] + 0.0013) and (longitudine >= portaSanFelice_gp[1] - 0.0013)):
            chunk_set.add(Coil + " a btp:PollutionCoil")
            PollutionStation = "btp:controlUnitPortaSanFelice"
            chunk_set.add(PollutionStation + " a btp:PollutionStation .")
            chunk_set.add(PollutionStation + " btp:isNearTo " + Coil +" .")
        else:
            chunk_set.add(Coil + " a btp:SimpleCoil")


        for i in range(2, 26):
            date_obj = datetime.datetime.strptime(str(row['data']), '%Y-%m-%d')
            VehicleDetection = "btp:vehicleDetection_"+str(row['ID_univoco_stazione_spira'])+"_"+date_obj.strftime('%Y-%m-%d')+"_"+str(i-2).zfill(2)+":00-"+str(i-1).zfill(2)+":00"
            chunk_set.add(VehicleDetection + " a btp:VehicleDetection .")
            chunk_set.add(VehicleDetection + " btp:isObserved " + Coil + " .")
            chunk_set.add(Coil + " btp:hasObserve " + VehicleDetection + " .")

        Level = "btp:level_"+str(int(row['Livello']))
        chunk_set.add(Level + " a btp:Level .")
        chunk_set.add(Coil + " btp:hasLevel " + Level + " .")

        Type = URIRef(BTP["type_"+str(row['tipologia'])])
        Type = "btp:type_"+str(row['tipologia'])
        chunk_set.add(Coil + " a btp:Type .")
        chunk_set.add(Coil + " btp:hasType " + Type + " .")

        chunk_set.add(Coil + " btp:hasID " + Literal(str(row['codice spira']), datatype=XSD.string))

        RoadArch = "btp:road_"+str(row['codice arco'])
        chunk_set.add(Coil + " a btp:RoadArch .")
        chunk_set.add(Coil + " btp:isOn " + RoadArch + " .")
        chunk_set.add(RoadArch + " btp:isPlacedOn " + Coil + " .")

    pbar.update(len(chunk))

    return chunk_set

In [10]:
# Function that populates the vehicle count dataset
def vehicle_count_process_chunk(chunk, piece, year_dataset):

    vc_set = set()

    for index, row in chunk.iterrows():

        # I check if the record is valid or not -> must have all the field not NaN
        if row['Livello'] == '' or row['tipologia'] == '':
            # I skip the record -> next record
            continue
        # else: is valid -> continue

        for i in range(2, 26):

            ## VEHICLEDETECTION:
            # -uri: vehicleDetection_ + id number + _ + date.
            # -attributi: hasCount.
            # -object properties: isObserved, hasObserve, isObservedOnPeriod, and hasObservedOnPeriod.

            date_obj = datetime.datetime.strptime(str(row['data']), '%Y-%m-%d')
            VehicleDetection = "btp:vehicleDetection_"+str(row['ID_univoco_stazione_spira'])+"_"+date_obj.strftime('%Y-%m-%d')+"_"+str(i-2).zfill(2)+":00-"+str(i-1).zfill(2)+":00"
            vc_set.add(VehicleDetection + "a btp:VehicleDetection .")

            vc_set.add(VehicleDetection + "btp:hasCount" + Literal(row.iloc[i], datatype=XSD.integer) + " .")

            # # PERIOD:
            # -uri: period_ + date + _ + hour1 + _ + hour2.
            # -attributi: startTime and endTime.
            # -object properties: onDay.

            date_obj = datetime.datetime.strptime(str(row['data']), '%Y-%m-%d')
            Period = "period_"+date_obj.strftime('%Y-%m-%d')+"_"+str(i-2).zfill(2)+":00-"+str(i-1).zfill(2)+":00"
            vc_set.add(Period + " a btp:Period .")

            vc_set.add(Period + " btp:isObservedOnPeriod " + VehicleDetection + " .")
            vc_set.add(VehicleDetection + " btp:hasObserve " + Period + " .")

            startTime = str(i-2).zfill(2)+":00"
            date_obj = datetime.datetime.strptime(str(row['data']), '%Y-%m-%d')

            vc_set.add(Period + " btp:startTime " + Literal(date_obj.strftime('%Y-%m-%d')+"T"+startTime, datatype=XSD.dateTime) + " .")

            endTime = str(i-1).zfill(2)+":00"

            # If the endTime is 24 -> date+1 and endTime = 00
            if(endTime == '24:00'):
                endTime = '00:00'
                # I add one day
                date_obj = date_obj + datetime.timedelta(days=1)

            vc_set.add(Period + " btp:endTime " + Literal(date_obj.strftime('%Y-%m-%d')+"T"+endTime, datatype=XSD.dateTime) + " .")

            ## Convert day from italian to english ex: lunedì -> monday
            day_value = ''
            if 'Giorno della settimana' in row:
                day_value = str(row['Giorno della settimana']).lower()
            elif 'giorno della settimana' in row:
                day_value = str(row['giorno della settimana']).lower()

            match day_value:
                case 'lunedì':
                    DayWeek = "btp:Monday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case 'martedì':
                    DayWeek = "btp:Tuesday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case 'mercoledì':
                    DayWeek = "btp:Wednesday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case 'giovedì':
                    DayWeek = "btp:Thursday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case 'venerdì':
                    DayWeek = "btp:Friday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case 'sabato':
                    DayWeek = "btp:Saturday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case 'domenica':
                    DayWeek = "btp:Sunday"
                    vc_set.add(DayWeek + " a btp:DayWeek .")
                    vc_set.add(Period + " btp:onDay " + DayWeek + " .")
                case _:
                    # No day provided
                    pass

    pbar.update(len(chunk))

    return vc_set

In [11]:
# Function that populates the vehicle accuracy dataset
def vehicle_accuracy_process_chunk(chunk):

    # Graphs
    acc_set = set()

    for index, row in chunk.iterrows():

        for i in range(2, 26):

            ## VEHICLEDETECTION:
            # -uri: vehicleDetection_ + id number + _ + date.
            # -attributi: hasAccuracy, and hasCount.

            coil = ''

            # Query to get the coil's code associated to an ID
            coil = get_coil_by_id(str(row['codice spira']))
            if coil == '':
                # I skip the record -> next record
                continue

            date_obj = datetime.datetime.strptime(str(row['data']), '%Y-%m-%d')

            VehicleDetection = "btp:vehicleDetection_"+coil+"_"+date_obj.strftime('%Y-%m-%d')+"_"+str(i-2).zfill(2)+":00-"+str(i-1).zfill(2)+":00"
            acc_set(VehicleDetection + " a btp:VehicleDetection .")
            percentage = row.iloc[i].replace('%', '')
            acc_set(VehicleDetection + " btp:hasCount " + Literal(row.iloc[i], datatype=XSD.integer) + " .")

            # # PERIOD:
            # -uri: period_ + date + _ + hour1 + _ + hour2.
            # -attributi: startTime and endTime.
            # -object properties: onDay.

            Period = "period_"+date_obj.strftime('%Y-%m-%d')+"_"+str(i-2).zfill(2)+":00-"+str(i-1).zfill(2)+":00"
            acc_set(Period + " a btp:Period .")

            acc_set(Period + " btp:isObservedOnPeriod " + VehicleDetection + " .")
            acc_set(VehicleDetection + " btp:hasObservedOnPeriod " + Period + " .")

            startTime = str(i-2).zfill(2)+":00"
            date_obj = datetime.datetime.strptime(str(row['data']), '%Y-%m-%d')

            acc_set(Period + " btp:startTime " + Literal(date_obj.strftime('%Y-%m-%d')+"T"+startTime, datatype=XSD.dateTime) + " .")

            endTime = str(i-1).zfill(2)+":00"

            # If the endTime is 24 -> date+1 and endTime = 00
            if(endTime == '24:00'):
                endTime = '00:00'
                # I add one day
                date_obj = date_obj + datetime.timedelta(days=1)

            acc_set.add(Period + " btp:endTime " + Literal(date_obj.strftime('%Y-%m-%d')+"T"+endTime, datatype=XSD.dateTime) + " .")

    pbar.update(len(chunk))

In [12]:
# Function that populates the pollution data
def pollution_process_chunk(chunk, piece, year_dataset):

    pol_set = set()

    for index, row in chunk.iterrows():

        ## POLLUTIONSTATION:
        # -uri: centralUnit + pollution name.
        # -object properties: hasRegister, and isRegistered.

        PollutionStation = "controlUnit" + (str(row['COD_STAZ']).lower()).replace(" ", "")
        pol_set.add(PollutionStation + " a btp:PollutionStation .")

        # PERIOD:
        # -uri: period_ + date + _ + hour1 + _ + hour2.
        # -attributi: startTime and endTime.
        # -object properties: onDay.

        # date format: yyyy-mm-ddThh:mm:ss+hh:mm
        # keep only the data: 'Thh:mm:ss+hh:mm' -> yyyy-mm-dd
        date_obj = datetime.datetime.strptime((str(row['DATA_INIZIO']).split('T'))[0], '%Y-%m-%d')
        # keep only the hour: 'Thh:mm:ss+hh:mm' -> hh:mm:ss
        startTime = str((((str(row['DATA_INIZIO']).split('T'))[1].split('+')[0]).split(':'))[0])+":00"
        endTime = str((((str(row['DATA_FINE']).split('T'))[1].split('+')[0]).split(':'))[0])+":00"
        Period = URIRef(BTP["period_"+date_obj.strftime('%Y-%m-%d')+"_"+startTime+"-"+endTime])

        pol_set.add(Period + " a btp:Period .")
        pol_set.add(Period + " btp:startDay " + Literal(date_obj.strftime('%Y-%m-%d'), datatype=XSD.date) + " .")
        pol_set.add(Period + " btp:endTime " + Literal(date_obj.strftime('%Y-%m-%d')+"T"+endTime, datatype=XSD.dateTime) + " .")

        ## CHEMICALDETECTION:
        # -uri: chemicalDetection_ + pollution_station_name + _ + date + _ + element.
        # -attributi: inQuantity (conversion all in ug/m), and hasChemicalName.
        # -object properties: isDetectedOnPeriod, hasDetectedOnPeriod, hasDetect, and isDetected.

        chemical_element = (row['AGENTE'].split("(")[0]).strip()
        ChemicalElement = URIRef(BTP["chemicalElement_"+chemical_element])

        date_obj = datetime.datetime.strptime((str(row['DATA_INIZIO']).split('T'))[0], '%Y-%m-%d')
        ChemicalDetection = "chemicalDetection_"+(str(row['COD_STAZ']).lower()).replace(" ", "")+"_"+date_obj.strftime('%Y-%m-%d')+"_"+startTime+"-"+endTime+"_"+chemical_element
        pol_set(ChemicalDetection + " a btp:ChemicalDetection .")

        # Cast from mg/m^3 to ug/m^3
        if(row['UM'] == 'mg/m3'):
            pol_set.add(ChemicalDetection + " btp:inQuantity " + Literal((row['VALORE']*1000), datatype=XSD.float) + " .")
        else:
            pol_set.add(ChemicalDetection + " btp:inQuantity " + Literal((row['VALORE']), datatype=XSD.float) + " .")

        ## CHEMICALELEMENT:
        # -uri: chemicalElement_ + chemical element name.
        # -object properties: hasDetect, and isDetected

        pol_set.add(ChemicalElement + " a btp:ChemicalElement .")
        pol_set.add(ChemicalDetection + " btp:hasDetected " + ChemicalElement + " .")
        pol_set.add(ChemicalElement + " btp:isDetected " + ChemicalDetection + " .")

        if len(row['AGENTE'].split("(")) > 1:
            chemical_element_name = (((row['AGENTE'].split("(")[1]).replace(")","")).strip()).lower()

            match chemical_element_name:
                case 'benzene':
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal("Benzene", datatype=XSD.string) + " .")
                case 'monossido di carbonio':
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal("Carbon monoxide", datatype=XSD.string) + " .")
                case 'monossido di azoto':
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal("Nitrogen Monoxide", datatype=XSD.string) + " .")
                case 'biossido di azoto':
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal("Nitrogen dioxide", datatype=XSD.string) + " .")
                case 'ossidi di azoto':
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal("Nitrogen oxides", datatype=XSD.string) + " .")
                case 'ozono':
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal("Ozone", datatype=XSD.string) + " .")
                case _:
                    # New element provided
                    pol_set.add(ChemicalDetection + " btp:hasChemicalName " + Literal(chemical_element_name, datatype=XSD.string) + " .")

    pbar.update(len(chunk))

In [13]:
# Function to save a graph
def save_graph(set, path):

    with open(path, 'w') as file:

        file.write('@prefix btp: ' + BTP + ' .\n')
        file.write('@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .\n')
        file.write('@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .\n')

        for elem in set:
            file.write(elem)

In [14]:
def get_coil_by_id(coil_id):
    # Graph
    g_coils = Graph()
    g_coils.parse('/content/coils_populated.ttl', format='turtle')

    code_coil_query = prepareQuery("""
    SELECT DISTINCT ?coil WHERE {
        ?coil btp:hasID ?id .
    FILTER (?id = ?coil_id)
                               }""" , initNs={"btp": BTP})

    res = g_coils.query(code_coil_query, initBindings={'coil_id':Literal(coil_id, datatype=XSD.string)})
    if res == [] or res == None:
        return ''
    else:
        for r in res:
            return str(r.coil).replace('http://www.dei.unipd.it/~gdb/ontology/btp/coil_', '')

In [15]:
## Datasets

# Rilevazione flusso datasets
rilevazione_flusso = []

# ONLY FOR TEST
# rilevazione_flusso.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/test/rilevazione_flusso_veicoli_2019.csv')

rilevazione_flusso.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/rilevazione_flusso_veicoli_2019.csv')
# rilevazione_flusso.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/rilevazione_flusso_veicoli_2020.csv')
# rilevazione_flusso.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/rilevazione_flusso_veicoli_2021.csv')
rilevazione_flusso.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/rilevazione_flusso_veicoli_2022.csv')

# Accuratezza spire datasets
accuratezza_spire = []

# ONLY FOR TEST
# accuratezza_spire.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/test/accuratezza_spire_2019.csv')

accuratezza_spire.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/accuratezza_spire_2019.csv')
# accuratezza_spire.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/accuratezza_spire_2020.csv')
# accuratezza_spire.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/accuratezza_spire_2021.csv')
accuratezza_spire.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/accuratezza_spire_2022.csv')

# Centraline qualità datasets
centraline = []

# ONLY FOR TEST
# centraline.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/test/dati_centraline_2019.csv')

centraline.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/dati_centraline_2019.csv')
# centraline.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/dati_centraline_2020.csv')
# centraline.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/dati_centraline_2021.csv')
centraline.append('/content/drive/MyDrive/Colab Notebooks/Graph Database/datasets/dati_centraline_2022.csv')

# Save path
save_path = '/content/drive/MyDrive/Colab Notebooks/Graph Database/rdf'

In [16]:
# I check if the folder is empty or not
if not os.listdir(save_path) == []:
    print("The folder is not empty, do you want to continue? (y/n)")
    answer = input()
    if(answer.lower() == 'y'):
        # I remove all the files in the folder
        print("Removing all the files in the folder ...")
        for file in os.listdir(save_path):
            os.remove(os.path.join(save_path, file))
        print("DONE!")
    else:
        exit()

In [None]:
print("--- populating coils ---")

coils_set = set()

for namefile in rilevazione_flusso:

    year_dataset = namefile.split('_')[3].split('.')[0]
    piece = 0

    total_rows = len(pd.read_csv(namefile))
    pbar = tqdm(total=total_rows)

    for chunk in pd.read_csv(namefile, sep=';', chunksize=chunksize):

        # Manage NaN values
        chunk = chunk.fillna('')

        # Add the coils to the set
        coils_set.update(coils_process_chunk(chunk, piece, year_dataset))

        # Memory monitor
        if psutil.virtual_memory().percent > 85:
            save_graph(coils_set, '/content/coils_populated_'+year_dataset+'_'+str(piece)+'.txt')
            # Reset the set
            coils_set.clear()
            piece += 1
        break

    save_graph(coils_set, '/content/coils_populated_'+year_dataset+'_'+str(piece)+'.ttl')
    coils_set.clear()

    pbar.close()

--- populating coils ---


  3%|▎         | 10000/287747 [00:17<07:56, 583.30it/s]
  0%|          | 0/302872 [00:00<?, ?it/s]

In [None]:
# Free memory
del coils_set

In [None]:
print("--- populating coils ---")

vehicle_count_set = set()

for namefile in rilevazione_flusso:

    break

    year_dataset = namefile.split('_')[3].split('.')[0]
    piece = 0

    total_rows = len(pd.read_csv(namefile))
    pbar = tqdm(total=total_rows)

    for chunk in pd.read_csv(namefile, sep=';', chunksize=chunksize):

        # Manage NaN values
        chunk = chunk.fillna('')

        # Add the coils to the set
        vehicle_count_set.update(vehicle_count_process_chunk(chunk, piece, year_dataset))

        # Memory monitor
        if psutil.virtual_memory().percent > 85:
            save_graph(vehicle_count_set, '/content/vehicle_count_populated_'+year_dataset+'_'+str(piece)+'.txt')
            # Reset the set
            vehicle_count_set.clear()
            piece += 1

    save_graph(vehicle_count_set, '/content/vehicle_count_populated_'+year_dataset+'_'+str(piece)+'.txt')
    vehicle_count_set.clear()

    pbar.close()

In [None]:
# Free memory
del vehicle_count_set

In [None]:
print("--- populating vehicle accuracy ---")

acc_set = set()

for namefile in rilevazione_flusso:

    year_dataset = namefile.split('_')[3].split('.')[0]
    piece = 0

    total_rows = len(pd.read_csv(namefile))
    pbar = tqdm(total=total_rows)

    for chunk in pd.read_csv(namefile, sep=';', chunksize=chunksize):

        # Manage NaN values
        chunk = chunk.fillna('')

        # Add the coils to the set
        acc_set.update(coils_process_chunk(chunk, piece, year_dataset))

        # Memory monitor
        if psutil.virtual_memory().percent > 85:
            save_graph(coils_set, '/content/coils_populated_'+year_dataset+'_'+str(piece)+'.txt')
            # Reset the set
            acc_set.clear()
            piece += 1
        break

    save_graph(coils_set, '/content/coils_populated_'+year_dataset+'_'+str(piece)+'.ttl')
    acc_set.clear()

    pbar.close()

In [None]:
# Free memory
del acc_set

In [None]:
print("--- populating pollution data ---")

pollution_set = set()

for namefile in rilevazione_flusso:

    year_dataset = namefile.split('_')[3].split('.')[0]
    piece = 0

    total_rows = len(pd.read_csv(namefile))
    pbar = tqdm(total=total_rows)

    for chunk in pd.read_csv(namefile, sep=';', chunksize=chunksize):

        # Manage NaN values
        chunk = chunk.fillna('')

        # Add the coils to the set
        pollution_set.update(pollution_process_chunk(chunk, piece, year_dataset))

        # Memory monitor
        if psutil.virtual_memory().percent > 85:
            save_graph(coils_set, '/content/pollution_populated_'+year_dataset+'_'+str(piece)+'.txt')
            # Reset the set
            pollution_set.clear()
            piece += 1
        break

    save_graph(coils_set, '/content/pollution_populated_'+year_dataset+'_'+str(piece)+'.ttl')
    pollution_set.clear()

    pbar.close()

In [None]:
# Free memory
del pollution_set