In [41]:
!pip install pyarrow




[notice] A new release of pip is available: 23.2.1 -> 24.0
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import requests
import json
import math

def crear_rambla(id, dateLastValueReported, description, locality, postalCode, source, location,
                 section, nextSection, previousSection, name, isPartOf):
    """Create a ``Ravine`` (rambla) entity in the local Orion-LD broker.

    Builds an NGSI-LD payload and POSTs it to
    ``http://localhost:1026/ngsi-ld/v1/entities/``. If the broker answers with a
    non-success status, the status code and response body are printed.

    Args:
        id: Suffix for the entity id (``urn:ngsi-ld:Ravine:{id}``) and for the
            ``identifier`` property (``Ravine:{id}``).
        dateLastValueReported: Accepted for signature parity with the other
            ``crear_*`` helpers; it is not included in the payload.
        description, source, name: Stored as plain Properties.
        locality, postalCode: Stored inside the ``address`` Property. Country
            ``ES`` and region ``Murcia`` are fixed.
        location: Coordinates of a GeoJSON ``Point``. NOTE(review): GeoJSON
            expects ``[lon, lat]``; confirm the order callers pass.
        section, nextSection, previousSection, isPartOf: Optional links. A
            string is stored as a Relationship (``object``). Any other non-None
            value is stored as ``{"value": ...}``. ``None`` omits the attribute.
    """
    rambla_content = {
        "id": f"urn:ngsi-ld:Ravine:{id}",
        "identifier": {
            "type": "Property",
            "value": f"Ravine:{id}"
        },
        "type": "Ravine",
        "description": {
            "type": "Property",
            "value": description
        },
        "address": {
            "type": "Property",
            "value": {
                "addressCountry": "ES",
                "addressRegion": "Murcia",
                "addressLocality": locality,
                "postalCode": postalCode,
            },
            "verified": {
                "type": "Property",
                "value": True
            }
        },
        "source": {
            "type": "Property",
            "value": source
        },
        "location": {
            "type": "GeoProperty",
            "value": {
                "type": "Point",
                "coordinates": location
            }
        },
        "name": {
            "type": "Property",
            "value": name
        },
        "@context": "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"
    }

    def _add_optional_link(property_name, value):
        """Attach ``value`` under ``property_name`` unless it is None.

        Strings are treated as target entity ids and stored as a Relationship.
        Any other value is stored as ``{"value": value}``.
        """
        if isinstance(value, str):
            rambla_content[property_name] = {
                "type": "Relationship",
                "object": value
            }
        elif value is not None:
            rambla_content[property_name] = {
                "value": value
            }

    # Same insertion order as before: nextSection, previousSection, section, isPartOf.
    for property_name, value in (
        ("nextSection", nextSection),
        ("previousSection", previousSection),
        ("section", section),
        ("isPartOf", isPartOf),
    ):
        _add_optional_link(property_name, value)

    url = "http://localhost:1026/ngsi-ld/v1/entities/"

    headers = {
        "Content-Type": "application/ld+json"
    }

    response = requests.post(url, headers=headers, json=rambla_content)
    # The response used to be discarded, so a rejected payload (e.g. 409 for an
    # existing id, 400 for a bad attribute) went unnoticed. Report it instead.
    if not response.ok:
        print(f"Error creating entity {rambla_content['id']}: {response.status_code} {response.text}")
    

In [2]:
def crear_device(id, alternateName, controlled_entity, controlled_properties, description, postalCode, locality, location, name, source, dateLastValueReported):
    """Create a sensor ``Device`` entity in the local Orion-LD broker.

    The device serves the fixed area "Mar Menor" and is linked to a single
    controlled asset through the ``controlledAsset`` Relationship. After the
    POST, the function prints a "Device" banner, the payload, and the HTTP
    status code.

    Args:
        id: Suffix for the entity id ``urn:ngsi-ld:Device:{id}``.
        controlled_entity: Entity id placed, wrapped in a list, in ``controlledAsset``.
        controlled_properties: Value of the ``controlledProperty`` Property.
        locality, postalCode: Stored inside the ``address`` Property.
        location: Coordinates of a GeoJSON ``Point``.
        alternateName, description, name, source, dateLastValueReported:
            Stored as plain Properties.
    """
    def as_property(val):
        # Normalized NGSI-LD Property wrapper.
        return {"type": "Property", "value": val}

    # Keys are added in the order the payload is printed.
    device_content = {"id": f"urn:ngsi-ld:Device:{id}"}
    device_content["alternateName"] = as_property(alternateName)
    device_content["areaServed"] = as_property("Mar Menor")
    device_content["type"] = "Device"
    device_content["controlledAsset"] = {"type": "Relationship", "object": [controlled_entity]}
    device_content["controlledProperty"] = as_property(controlled_properties)
    device_content["dateLastValueReported"] = as_property(dateLastValueReported)
    device_content["description"] = as_property(description)
    device_content["deviceCategory"] = as_property(["sensor"])
    address_value = {
        "addressCountry": "ES",
        "addressRegion": "Murcia",
        "addressLocality": locality,
        "postalCode": postalCode,
    }
    device_content["address"] = {**as_property(address_value), "verified": as_property(True)}
    device_content["source"] = as_property(source)
    device_content["location"] = {
        "type": "GeoProperty",
        "value": {"type": "Point", "coordinates": location},
    }
    device_content["name"] = as_property(name)
    device_content["@context"] = "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"

    response = requests.post(
        "http://localhost:1026/ngsi-ld/v1/entities/",
        headers={"Content-Type": "application/ld+json"},
        json=device_content,
    )
    print("Device")
    print(device_content)
    print(response.status_code)

In [3]:
def crear_device_measurement(id, deviceType, value, controlledProperty, observedAt, depth, target, refDevice, unitCode, name, source, dateLastValueReported):
    """Create a ``DeviceMeasurement`` entity in the local Orion-LD broker.

    Args:
        id: Suffix for the entity id ``urn:ngsi-ld:DeviceMeasurement:{id}``.
        deviceType, controlledProperty, dateLastValueReported, source, name:
            Stored as plain Properties.
        value: Reading stored in ``numValue`` together with ``observedAt``. A
            NaN reading is replaced by the ``-99`` sentinel. Values that cannot
            be NaN-tested (e.g. None or strings) are sent unchanged.
        observedAt: Timestamp attached to ``numValue``.
        depth, target: Optional Properties, omitted when None.
        refDevice: Suffix of the owning device, linked as the Relationship
            ``urn:ngsi-ld:Device:{refDevice}``.
        unitCode: Optional. When not None it is stored as a separate ``unit``
            Property.

    Prints the payload and the HTTP status code of the POST.
    """
    device_measure_content = {
        "id": f"urn:ngsi-ld:DeviceMeasurement:{id}",
        "deviceType": {
            "type": "Property",
            "value": deviceType
        },
        "type": "DeviceMeasurement",
        "controlledProperty": {
            "type": "Property",
            "value": controlledProperty
        },
        "dateLastValueReported": {
            "type": "Property",
            "value": dateLastValueReported
        },
        "numValue": {
            "type": "Property",
            "value": value,
            "observedAt": observedAt,
        },
        "refDevice": {
            "type": "Relationship",
            "object": f"urn:ngsi-ld:Device:{refDevice}",
        },
        "source": {
            "type": "Property",
            "value": source
        },
        "name": {
            "type": "Property",
            "value": name
        },
        "@context": "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"
    }

    if unitCode is not None:
        device_measure_content["unit"] = {
            "type": "Property",
            "value": unitCode
        }

    # math.isnan raises TypeError for None/str readings. Only numbers can be NaN,
    # so anything else is left as-is instead of crashing (modify_entity guards too).
    try:
        value_is_nan = math.isnan(value)
    except TypeError:
        value_is_nan = False
    if value_is_nan:
        device_measure_content["numValue"]["value"] = -99

    if depth is not None:
        device_measure_content["depth"] = {
            "type": "Property",
            "value": depth
        }

    if target is not None:
        device_measure_content["target"] = {
            "type": "Property",
            "value": target
        }

    url = "http://localhost:1026/ngsi-ld/v1/entities/"

    headers = {
        "Content-Type": "application/ld+json"
    }
    print("DeviceMeasurement")
    print(device_measure_content)

    response = requests.post(url, headers=headers, json=device_measure_content)
    print(response.status_code)


In [4]:
def crear_water_network(id, isComposedOf, description, name, location=None):
    """Create a ``WaterNetwork`` entity (a polygon area) in the local Orion-LD broker.

    Args:
        id: Suffix for the entity id ``urn:ngsi-ld:WaterNetwork:{id}``.
        isComposedOf: Inserted into the payload as-is, not wrapped in a
            Property/Relationship. NOTE(review): the caller presumably passes an
            already-formed NGSI-LD attribute; confirm.
        description, name: Stored as plain Properties.
        location: GeoJSON Polygon coordinates (a list of closed linear rings).
            When None, a fixed 4-point outline is used. NOTE(review): those
            default pairs look like ``[lat, lon]``, while GeoJSON expects
            ``[lon, lat]``; confirm.

    Prints a "WaterNetwork" banner and the HTTP status code. The response body
    is also printed when the request fails.
    """
    if location is None:
        # Built per call instead of as a mutable default argument, which would
        # be one list object shared by every call.
        location = [[[37.7138, -1.1874], [37.7157, -0.8588], [37.7646, -1.1439], [37.7138, -1.1874]]]

    water_network_content = {
        "id": f"urn:ngsi-ld:WaterNetwork:{id}",
        "type": "WaterNetwork",
        "isComposedOf": isComposedOf,
        "description": {
            "type": "Property",
            "value": description
        },
        "location": {
            "type": "GeoProperty",
            "value": {
                "type": "Polygon",
                "coordinates": location
            }
        },
        "address": {
            "type": "Property",
            "value": {
                "addressCountry": "ES",
                "addressRegion": "Murcia"
            },
            "verified": {
                "type": "Property",
                "value": True
            }
        },
        "name": {
            "type": "Property",
            "value": name
        },
        "@context": "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"
    }

    url = "http://localhost:1026/ngsi-ld/v1/entities/"

    headers = {
        "Content-Type": "application/ld+json"
    }
    response = requests.post(url, headers=headers, json=water_network_content)
    print("WaterNetwork")
    print(response.status_code)
    if not response.ok:
        # Show the broker's error description so a rejected payload is diagnosable.
        print(response.text)

In [5]:
def crear_piezometric_net(id, description, name, source="http://155.54.95.167/", location=None):
    """Create a ``PiezometricNet`` entity (a polygon area) in the local Orion-LD broker.

    Args:
        id: Suffix for the entity id ``urn:ngsi-ld:PiezometricNet:{id}``. The
            full URN is also stored in the ``identifier`` Property.
        description, name, source: Stored as plain Properties.
        location: GeoJSON Polygon coordinates (a list of closed linear rings).
            When None, a fixed 7-point outline is used. NOTE(review): those
            default pairs look like ``[lat, lon]``, while GeoJSON expects
            ``[lon, lat]``; confirm.

    Prints a "PiezometricNet" banner and the HTTP status code. The response
    body is also printed when the request fails.
    """
    if location is None:
        # Built per call instead of as a shared mutable default argument.
        location = [[[37.6410, -0.7628], [37.7171, -0.8621], [37.8293, -0.7844], [37.8367, -0.8037],
                     [37.7225, -0.9086], [37.6178, -0.7755], [37.6410, -0.7628]]]

    piezometric_net_content = {
        "id": f"urn:ngsi-ld:PiezometricNet:{id}",
        "identifier": {
            "type": "Property",
            "value": f"urn:ngsi-ld:PiezometricNet:{id}"
        },
        "type": "PiezometricNet",
        "address": {
            "type": "Property",
            "value": {
                "addressCountry": "ES",
                "addressRegion": "Murcia"
            },
            "verified": {
                "type": "Property",
                "value": True
            }
        },
        "description": {
            "type": "Property",
            "value": description
        },
        "location": {
            "type": "GeoProperty",
            "value": {
                "type": "Polygon",
                "coordinates": location
            }
        },
        "name": {
            "type": "Property",
            "value": name
        },
        "source": {
            "type": "Property",
            "value": source
        },
        "@context": "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"
    }

    url = "http://localhost:1026/ngsi-ld/v1/entities/"

    headers = {
        "Content-Type": "application/ld+json"
    }
    response = requests.post(url, headers=headers, json=piezometric_net_content)
    print("PiezometricNet")
    print(response.status_code)
    if not response.ok:
        # Show the broker's error description so a rejected payload is diagnosable.
        print(response.text)

In [6]:
import math

def crear_sounding(id, locality, postalCode, location, description, source, dateLastValueReported, numberInNetwork, name, isPartOf):
    """Create a ``SoundingPlace`` entity in the local Orion-LD broker.

    ``dateLastValueReported`` is accepted but not sent. When ``isPartOf`` is
    not None, it is appended (after ``@context``) as a Relationship. After the
    POST, the payload dict and the raw ``Response`` object are printed.
    """
    def prop(val):
        # Normalized NGSI-LD Property wrapper.
        return {"type": "Property", "value": val}

    address_value = {
        "addressCountry": "ES",
        "addressRegion": "Murcia",
        "addressLocality": locality,
        "postalCode": postalCode
    }
    # (key, value) pairs in the exact order they appear in the payload.
    fields = [
        ("id", f"urn:ngsi-ld:SoundingPlace:{id}"),
        ("identifier", prop(f"SoundingPlace:{id}")),
        ("type", "SoundingPlace"),
        ("category", prop("sensor")),
        ("address", {**prop(address_value), "verified": prop(True)}),
        ("location", {"type": "GeoProperty", "value": {"type": "Point", "coordinates": location}}),
        ("description", prop(description)),
        ("source", prop(source)),
        ("numberInNetwork", prop(numberInNetwork)),
        ("name", prop(name)),
        ("@context", "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"),
    ]
    if isPartOf is not None:
        fields.append(("isPartOf", {"type": "Relationship", "object": isPartOf}))
    piezometer_content = dict(fields)

    response = requests.post(
        "http://localhost:1026/ngsi-ld/v1/entities/",
        headers={"Content-Type": "application/ld+json"},
        json=piezometer_content,
    )
    print(piezometer_content)
    print(response)

In [7]:
import requests
import json

def crear_boya(id, dateLastValueReported, description, postalCode, locality, location, name, source, closeMeasurements=None):
    """Create a ``Buoy`` entity in the local Orion-LD broker.

    ``dateLastValueReported`` is accepted but not sent. ``closeMeasurements`` is
    inserted verbatim (just before ``@context``) only when it is not None.
    Request failures, including non-2xx statuses, are caught and printed rather
    than raised.
    """
    def _prop(val):
        # Normalized NGSI-LD Property wrapper.
        return {"type": "Property", "value": val}

    buoy_content = {
        "id": f"urn:ngsi-ld:Buoy:{id}",
        "identifier": _prop(f"Buoy:{id}"),
        "type": "Buoy",
        "description": _prop(description),
        "address": _prop({
            "addressCountry": "ES",
            "addressRegion": "Murcia",
            "addressLocality": locality,
            "postalCode": postalCode
        }),
        "location": {
            "type": "GeoProperty",
            "value": {"type": "Point", "coordinates": location}
        },
        "name": _prop(name),
        "source": _prop(source),
    }
    if closeMeasurements is not None:
        buoy_content["closeMeasurements"] = closeMeasurements
    buoy_content["@context"] = "https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld"

    endpoint = "http://localhost:1026/ngsi-ld/v1/entities/"
    try:
        reply = requests.post(endpoint, headers={"Content-Type": "application/ld+json"}, json=buoy_content)
        # Turn 4xx/5xx answers into exceptions so they are reported below.
        reply.raise_for_status()
    except requests.exceptions.RequestException as exc:
        print(f"Error creating entity: {exc}")

In [8]:
def modify_entity(value, observedAt, unitcode, url, depth=None, place=None, isComposedOf=False):
    """PATCH an entity's attributes at ``url`` (callers pass ``.../entities/<id>/attrs``).

    By default the payload updates ``numValue`` (value + ``observedAt``, plus
    optional ``depth`` / ``measurementPlace`` sub-properties and ``unitCode``)
    and sets ``dateLastValueReported`` to ``observedAt``. With
    ``isComposedOf=True`` the payload is only ``{"isComposedOf": value}``.

    A float NaN ``value`` is replaced by ``-99`` in both modes. The payload and
    the HTTP status are printed; the response body is printed when the status
    is not 204.
    """
    headers = {
        'Content-Type': 'application/json',
        'Link': '<https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"'
    }

    if isinstance(value, float) and math.isnan(value):
        value = -99

    if isComposedOf:
        payload = {"isComposedOf": value}
    else:
        reading = {"type": "Property", "value": value, "observedAt": observedAt}
        # Optional sub-properties of numValue, added in this fixed order.
        for sub_name, sub_value in (("depth", depth), ("measurementPlace", place)):
            if sub_value is not None:
                reading[sub_name] = {"type": "Property", "value": sub_value}
        if unitcode is not None:
            reading["unitCode"] = unitcode
        payload = {
            "numValue": reading,
            "dateLastValueReported": {"type": "Property", "value": observedAt},
        }

    print("modify_content")
    print(payload)
    response = requests.patch(url, headers=headers, json=payload)
    print(response.status_code)
    try:
        if response.status_code != 204:
            print(response.text)
    finally:
        response.close()

In [9]:
def create_attribute(newAttributeName, value, observedAt, url, multipleValue=None):
    """POST a new attribute named ``newAttributeName`` to the attrs endpoint ``url``.

    The attribute body depends on the arguments:
      * ``multipleValue`` given -> ``{"value": [multipleValue]}`` (``value`` and
        ``observedAt`` are ignored)
      * ``observedAt`` is None  -> ``{"value": value}``
      * otherwise               -> ``{"value": value, "observedAt": observedAt}``

    The response body is printed whenever the status is not 204.
    """
    if multipleValue is not None:
        attribute = {"value": [multipleValue]}
    else:
        attribute = {"value": value}
        if observedAt is not None:
            attribute["observedAt"] = observedAt

    response = requests.post(
        url,
        headers={
            'Content-Type': 'application/json',
            'Link': '<https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"'
        },
        json={newAttributeName: attribute},
    )
    try:
        succeeded = response.status_code == 204
        if not succeeded:
            print(response.text)
    finally:
        response.close()

In [10]:
def search_close_entities(coordinate_lat, coordinate_long, id_entity, max_distance=4000):
    """Return ids of nearby Buoy / SoundingPlace / Ravine entities.

    Runs an NGSI-LD geo-query (``georel=near;maxDistance==<max_distance>``)
    around ``[coordinate_lat, coordinate_long]`` against the local broker,
    requesting ``keyValues`` output.

    Args:
        coordinate_lat, coordinate_long: Query point, sent in this order.
            NOTE(review): GeoJSON expects ``[lon, lat]``; this matches how the
            notebook stores locations, but confirm.
        id_entity: Ids containing this string are excluded (substring match,
            presumably to drop the queried entity itself). NOTE(review): a
            substring also drops ids like ``...:10`` when ``id_entity`` ends in
            ``:1``; confirm callers rely on this.
        max_distance: Search radius passed to ``maxDistance`` (default 4000,
            the previous hard-coded value).

    Returns:
        List of entity id strings whose id contains "Buoy", "SoundingPlace" or
        "Ravine".

    Raises:
        requests.HTTPError: if the broker answers with an error status.
    """
    url = 'http://localhost:1026/ngsi-ld/v1/entities'
    headers = {
        'Accept': 'application/json',
        'Link': '<https://raw.githubusercontent.com/mariete1223/MarMenor/main/data_models_description/datamodels.context-ngsi.jsonld>; rel="http://www.w3.org/ns/json-ld#context"; type="application/ld+json"'
    }

    params = {
        'geometry': 'Point',
        'coordinates': f'[{coordinate_lat},{coordinate_long}]',
        'georel': f'near;maxDistance=={max_distance}',
        'options': 'keyValues',
    }

    response = requests.get(url, headers=headers, params=params)
    # An error body is a JSON object, not a list. Iterating it used to fail with
    # an unrelated AttributeError, so fail with the HTTP error instead.
    response.raise_for_status()
    entities = response.json()

    wanted_types = ("Buoy", "SoundingPlace", "Ravine")
    close_ids = []
    for entity in entities:
        entity_id = entity.get("id")
        # Entities without an id used to crash the type filter with a TypeError.
        if not entity_id or id_entity in entity_id:
            continue
        if any(kind in entity_id for kind in wanted_types):
            close_ids.append(entity_id)

    return close_ids

In [52]:
# Disabled cell: the whole loader is kept as an inert string literal. Its first
# line says there is no weekly data for the CTD buoys ("NO HAY DATOS SEMANALES
# PARA BOYAS CTD").
# When active, it built `dfs`: one DataFrame per buoy sub-folder, outer-merged on
# "Date" across that folder's CSVs, with columns renamed to "<file>_<suffix>".
# NOTE(review): the folder names printed below this cell are stale output from
# an earlier run; evaluating this string prints nothing.
''' NO HAY DATOS SEMANALES PARA BOYAS CTD
import os
import glob
import pandas as pd
import time

# Especifica la ruta de la carpeta principal
carpeta = "C:/Users/marie/OneDrive/Escritorio/Antiguo ordenador/Universidad/Master 1º/TFM/NGSI-LD/marMenorDockerCompose/node-app/historicData/SAIHdatasActualizados/BoyasProf/"

# Utiliza os.listdir() para obtener una lista de elementos en la carpeta
elementos_en_carpeta = os.listdir(carpeta)

# Filtra solo las carpetas en la lista de elementos
carpetas = [elemento for elemento in elementos_en_carpeta if os.path.isdir(os.path.join(carpeta, elemento))]

# DataFrame vacío para almacenar los datos
dfs = {}

# Imprime la lista de carpetas
for carpeta_actual in carpetas:
    print(carpeta_actual)
    df_total = pd.DataFrame()
    archivos_csv = glob.glob(os.path.join(carpeta+carpeta_actual, "*.csv"))
    for archivo in archivos_csv:
        
        nombre_archivo = archivo.split("\\")[-1].split(".")[0]
        df_temporal = pd.read_csv(archivo)
        df_temporal.columns = [df_temporal.columns[0]] + [f"{nombre_archivo}_{col.split('_')[1]}" for col in df_temporal.columns[1:]]
        df_temporal = df_temporal.loc[:, ~df_temporal.columns.str.contains("None")]
        
        if(len(df_total) == 0):
            df_total = df_temporal
        else:
            #Hacer un merge de df_total y df_temporal por la columna "Date"
            df_total = pd.merge(df_total, df_temporal, on="Date", how="outer")
    dfs[carpeta_actual] = df_total

for key, df in dfs.items():
    # Convierte la columna "Date" a tipo datetime
    df["Date"] = pd.to_datetime(df["Date"])
    # Formatea la columna "Date" en el nuevo formato
    df["Date"] = df["Date"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
'''

CTD-E1
CTD-E10
CTD-E11
CTD-E12
CTD-E2
CTD-E3
CTD-E4
CTD-E5
CTD-E6
CTD-E7


CTD-E8
CTD-E9


In [53]:
# Disabled cell, kept as an inert string literal. When active, it:
#   * loaded the buoy -> entity id and entity -> sensor -> DeviceMeasurement id
#     mappings from pickles;
#   * turned every CSV row except the last into a 21-column row of the Orion-LD
#     Postgres `attributes` table, with NaN stored as -99;
#   * bulk-inserted those rows with executemany in batches of more than 100000.
# NOTE(review): DB credentials are hard-coded inside the string, and a failed
# batch is never cleared from `datos_a_insertar`. Fix both before re-enabling.
# The "Insertando ..." lines below this cell are stale output from an earlier run.
'''
import psycopg2
from datetime import datetime
import uuid
import pickle

maper_property_unitcode = { "Temperatura": "CEL", "Conductividad": "microseconds/cm", "Piezometrico":"msnm", "Salinidad":"PSU",
       "Total solido disuelto (TDS)": "mg/l", "Materia Organiza":None,"Clorofila":None,"PH":"pH",
       "Polietileno":None,"Turbidez":"NTU","Transparencia":None,"Oxigeno":None
}

# Abre el archivo .pkl en modo de lectura binaria
with open('./variables_entidades/boya_entity.pkl', 'rb') as f:
    # Carga la variable desde el archivo
    boya_entity = pickle.load(f)
with open('./variables_entidades/link_boya_sensores.pkl', 'rb') as f:
    # Carga la variable desde el archivo
    link_boya_sensores = pickle.load(f)

# Conectarse a la base de datos
conn = psycopg2.connect(
    host="localhost",
    port="5432",
    database="orion",
    user="orion",
    password="orion"
)

cursor = conn.cursor()
datos_a_insertar = []

# Recorremos las distintas Boyas
for carpeta, valorCarpeta in dfs.items():
    # Obtenemos el ID de la boya a partir del nombre de la carpeta
    id_entity = boya_entity[carpeta]
    sensores = list(set( val.split("_")[0] for val in valorCarpeta.columns))
    # Recorremos por cada fila del csv
    for index, row in valorCarpeta.iterrows():
        if index == len(valorCarpeta)-1:
            continue
        # Para cada columna del csv
        for contador,valorSensor in enumerate(sensores):
            if valorSensor == "Date":
                continue
            
            # Recorremos todas las columnas que empiecen por ese sensor ya que en los csv hay varias segun la profundidad
            for number, columnName in enumerate([ col for col in valorCarpeta.columns if col.startswith(valorSensor)]):
                depth = columnName.split("_")[1]
                depth = float(depth)
                
                # Recuperamos el ID del device measurement a partir de la entidad y el nombre de la propiedad controlada
                id_device_measurement = link_boya_sensores[id_entity][valorSensor][number]

                fecha_hora = datetime.strptime(row["Date"], "%Y-%m-%dT%H:%M:%SZ")
                
                value = row[columnName]
                if math.isnan(row[columnName]):
                    value = -99

                # Almacenamos en una variable los datos que vamos a insertar habiendonos fijado en el esquema de la base de datos
                datos_a_insertar.append(
                    (f'urn:ngsi-ld:attribute:{uuid.uuid4()}', 'https://smartdatamodels.org/dataModel.DeviceMeasurement/numValue', 'Replace', id_device_measurement, fecha_hora, True, maper_property_unitcode[valorSensor], 'None', 'Number', None, None, value, None, None, None, None, None, None, None, None, datetime.now())
                )
                
                # En vez de insertar los datos en la base de datos cada vez que se recorre una fila del csv,
                # se almacenan en una lista y se insertan en la base de datos cuando la lista tiene más de 100000 filas
                if len(datos_a_insertar) > 100000:
                    consulta_insercion = f"INSERT INTO attributes VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
                    print(f"Insertando {len(datos_a_insertar)} filas en la tabla attributes")
                    try:
                        # Ejecutar la consulta para insertar los datos
                        cursor.executemany(consulta_insercion, datos_a_insertar)
                        conn.commit()
                        datos_a_insertar = []
                        print(f"Se han insertado {cursor.rowcount} filas en la tabla attributes")
                    except (Exception, psycopg2.DatabaseError) as error:
                        conn.rollback()
                    # Confirmar la transacción
                        print(f"Error: {error}")

# Insertar los datos restantes en la base de datos
if len(datos_a_insertar) > 0:
    consulta_insercion = f"INSERT INTO attributes VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    print(f"Insertando {len(datos_a_insertar)} filas en la tabla attributes")
    try:
        # Ejecutar la consulta para insertar los datos
        cursor.executemany(consulta_insercion, datos_a_insertar)
        conn.commit()
        datos_a_insertar = []
        print(f"Se han insertado {cursor.rowcount} filas en la tabla attributes")
    except (Exception, psycopg2.DatabaseError) as error:
        conn.rollback()
    # Confirmar la transacción
        print(f"Error: {error}")

                

# Cerrar el cursor y la conexión
cursor.close()
conn.close()
'''

Insertando 100001 filas en la tabla attributes
Se han insertado 100001 filas en la tabla attributes
Insertando 100001 filas en la tabla attributes
Se han insertado 100001 filas en la tabla attributes
Insertando 100001 filas en la tabla attributes
Se han insertado 100001 filas en la tabla attributes
Insertando 100001 filas en la tabla attributes
Se han insertado 100001 filas en la tabla attributes
Insertando 24986 filas en la tabla attributes
Se han insertado 24986 filas en la tabla attributes


In [54]:
# Disabled cell, kept as an inert string literal. When active, it PATCHed the
# last row of each buoy frame into the matching DeviceMeasurement entities via
# modify_entity. Each sensor has one column per depth, and the n-th matching
# column maps to the n-th id in link_boya_sensores.
# The "modify_content" lines below this cell are stale output from an earlier run.
'''
for carpeta, valorCarpeta in dfs.items():
    # Obtenemos los ultimos valores de cada columna del csv
    ultimos_valores = valorCarpeta.iloc[len(valorCarpeta)-1]
    sensores = list(set( val.split("_")[0] for val in valorCarpeta.columns))
    id_entity = boya_entity[carpeta]
    # Recorremos por cada columna del csv
    for contador,valorSensor in enumerate(sensores):
        if valorSensor == "Date":
            continue

        for number, columnName in enumerate([ col for col in valorCarpeta.columns if col.startswith(valorSensor)]):
            depth = columnName.split("_")[1]
            depth = float(depth)

            id_device_measurement = link_boya_sensores[id_entity][valorSensor][number]

            modify_entity(
                value=ultimos_valores[columnName],
                observedAt=ultimos_valores["Date"],
                unitcode=maper_property_unitcode[valorSensor],
                url="http://localhost:1026/ngsi-ld/v1/entities/"+id_device_measurement+"/attrs",
            )
'''

modify_content
{'numValue': {'type': 'Property', 'value': 7.22, 'observedAt': '2023-08-24T00:00:00Z', 'unitCode': 'microseconds/cm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2023-08-24T00:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': -99, 'observedAt': '2023-08-24T00:00:00Z', 'unitCode': 'microseconds/cm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2023-08-24T00:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 7.23, 'observedAt': '2023-08-24T00:00:00Z', 'unitCode': 'microseconds/cm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2023-08-24T00:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 7.19, 'observedAt': '2023-08-24T00:00:00Z', 'unitCode': 'microseconds/cm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2023-08-24T00:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 7.19, 'observedAt': '2023-08-24T00:00:00Z', 'unitCode': 'microseconds/c

# Piezómetros

In [13]:
import os
import glob
import pandas as pd
import time
import psycopg2
from datetime import datetime
import uuid
import pickle

In [11]:
import os
import glob
import pandas as pd
import pyarrow.parquet as pq


# Base folder: one sub-folder of CSV exports per piezometer.
carpeta = "/home/thinking/raw/SAIHdatasActualizados/piezometros/"

# Keep only the sub-folders. Sorting makes the processing order (and the order
# of later inserts/prints) reproducible; os.listdir order is arbitrary.
elementos_en_carpeta = os.listdir(carpeta)
carpetas = sorted(
    elemento for elemento in elementos_en_carpeta
    if os.path.isdir(os.path.join(carpeta, elemento))
)

# folder name -> one DataFrame, outer-merged on "Date" across that folder's CSVs.
dfs = {}

for carpeta_actual in carpetas:
    # Files whose path contains "from_parquet" are excluded; presumably they are
    # derived copies of the raw exports (TODO confirm).
    archivos_csv = sorted(
        archivo
        for archivo in glob.glob(os.path.join(carpeta, carpeta_actual, "*.csv"))
        if "from_parquet" not in archivo
    )

    # None (not "len == 0") marks "nothing read yet". A CSV with a header but no
    # rows is therefore merged instead of being silently replaced by the next file.
    df_total = None
    for archivo in archivos_csv:
        df_temporal = pd.read_csv(archivo)
        if df_total is None:
            df_total = df_temporal
        else:
            df_total = pd.merge(df_total, df_temporal, on="Date", how="outer")

    # A folder without usable CSVs used to crash the Date conversion with a KeyError.
    if df_total is None or "Date" not in df_total.columns:
        print(f"Skipping {carpeta_actual}: no CSV data with a 'Date' column")
        continue
    dfs[carpeta_actual] = df_total

for key in list(dfs):
    df = dfs[key].copy()
    df["Date"] = pd.to_datetime(df["Date"])
    # Sort chronologically so the last row really is the newest reading. The
    # insert cell skips that row and the following cell PATCHes it as the
    # current value.
    df = df.sort_values("Date", ignore_index=True)
    # ISO-8601 UTC format expected by the insert cell's strptime and by the broker.
    df["Date"] = df["Date"].dt.strftime("%Y-%m-%dT%H:%M:%SZ")
    dfs[key] = df

In [14]:
import psycopg2
from datetime import datetime
import uuid

import os  # DB connection settings can be overridden via environment variables

# Unit code stored with each reading, keyed by sensor name (the CSV column name).
maper_property_unitcode = { "Temperatura": "CEL", "Conductividad": "microseconds/cm", "Piezometrico":"msnm", "Salinidad":"PSU",
       "Total solido disuelto (TDS)": "mg/l"
}

# Load the entity-id mappings from pickle files (binary mode).
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open('./variables_entidades/piezometro_entity.pkl', 'rb') as f:
    # folder name -> piezometer entity id
    piezometro_entity = pickle.load(f)
with open('./variables_entidades/link_piezometro_sensores.pkl', 'rb') as f:
    # entity id -> sensor name -> DeviceMeasurement entity id
    link_piezometro_sensores = pickle.load(f)

# Positional INSERT into Orion-LD's `attributes` table: 21 columns per row.
consulta_insercion = "INSERT INTO attributes VALUES (" + ", ".join(["%s"] * 21) + ")"
# Rows are buffered and written with executemany once more than this many accumulate.
BATCH_SIZE = 100000


def _flush_batch(cursor, conn, batch):
    """Insert ``batch`` into ``attributes`` in one transaction.

    On failure, the transaction is rolled back and the error printed. The
    caller discards the batch either way.
    """
    print(f"Insertando {len(batch)} filas en la tabla attributes")
    try:
        cursor.executemany(consulta_insercion, batch)
        conn.commit()
        print(f"Se han insertado {cursor.rowcount} filas en la tabla attributes")
    except (Exception, psycopg2.DatabaseError) as error:
        conn.rollback()
        print(f"Error: {error}")


# Defaults match the previous hard-coded values. Set ORION_DB_* to avoid keeping
# credentials in the notebook.
conn = psycopg2.connect(
    host=os.environ.get("ORION_DB_HOST", "localhost"),
    port=os.environ.get("ORION_DB_PORT", "5432"),
    database=os.environ.get("ORION_DB_NAME", "orion"),
    user=os.environ.get("ORION_DB_USER", "orion"),
    password=os.environ.get("ORION_DB_PASSWORD", "orion"),
)

cursor = conn.cursor()
datos_a_insertar = []

try:
    for carpeta, valorCarpeta in dfs.items():
        id_entity = piezometro_entity[carpeta]
        # Sensor names = column prefixes before "_", excluding the Date column.
        sensores = [s for s in set(val.split("_")[0] for val in valorCarpeta.columns) if s != "Date"]
        if not sensores:
            continue
        ultima_fila = len(valorCarpeta) - 1
        for index, row in valorCarpeta.iterrows():
            # The last row is not stored as history. The following cell PATCHes
            # it into the broker as the entity's current value instead.
            if index == ultima_fila:
                continue
            fecha_hora = datetime.strptime(row["Date"], "%Y-%m-%dT%H:%M:%SZ")
            for valorSensor in sensores:
                id_device_measurement = link_piezometro_sensores[id_entity][valorSensor]

                value = row[valorSensor]
                # Missing readings (NaN) are stored with the -99 sentinel.
                if math.isnan(value):
                    value = -99

                datos_a_insertar.append(
                    (f'urn:ngsi-ld:attributes2:{uuid.uuid4()}', 'https://smartdatamodels.org/dataModel.DeviceMeasurement/numValue', 'Replace', id_device_measurement, fecha_hora, True, maper_property_unitcode[valorSensor], 'None', 'Number', None, None, value, None, None, None, None, None, None, None, None, datetime.now())
                )

                if len(datos_a_insertar) > BATCH_SIZE:
                    _flush_batch(cursor, conn, datos_a_insertar)
                    # Always start a fresh batch. Previously a failed batch stayed
                    # in the list and was re-submitted (and failed) on every append.
                    datos_a_insertar = []

    # Insert whatever is left after the last full batch.
    if datos_a_insertar:
        _flush_batch(cursor, conn, datos_a_insertar)
        datos_a_insertar = []
finally:
    # Release the connection even if a lookup or date-parse error aborts the loop.
    cursor.close()
    conn.close()

Insertando 100001 filas en la tabla attributes
Se han insertado 100001 filas en la tabla attributes
Insertando 91759 filas en la tabla attributes
Se han insertado 91759 filas en la tabla attributes


In [15]:

# Push the last row of every piezometer frame to its DeviceMeasurement entities
# through modify_entity (PATCH .../entities/<id>/attrs). The bulk-insert cell
# skipped exactly this row, so it reaches the system only through this call.
for carpeta, valorCarpeta in dfs.items():
    # Last row, treated as the newest reading. NOTE(review): this holds only if
    # the rows are in chronological order.
    ultimos_valores = valorCarpeta.iloc[len(valorCarpeta)-1]
    print(carpeta)
    # Sensor names are the column prefixes before "_". Here each sensor is read
    # directly as a column name below.
    sensores = list(set( val.split("_")[0] for val in valorCarpeta.columns))
    id_entity = piezometro_entity[carpeta]
    for contador,valorSensor in enumerate(sensores):
        if valorSensor == "Date":
            continue
        
        # DeviceMeasurement entity id for this piezometer/sensor pair.
        id_device_measurement = link_piezometro_sensores[id_entity][valorSensor]

        modify_entity(
            value=ultimos_valores[valorSensor],
            observedAt=ultimos_valores["Date"],
            unitcode=maper_property_unitcode[valorSensor],
            url="http://localhost:1026/ngsi-ld/v1/entities/"+id_device_measurement+"/attrs",
        )

06Z06-Sondeo06
modify_content
{'numValue': {'type': 'Property', 'value': 9980.0, 'observedAt': '2024-03-19T02:00:00Z', 'unitCode': 'mg/l'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T02:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 15590.0, 'observedAt': '2024-03-19T02:00:00Z', 'unitCode': 'microseconds/cm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T02:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 9.15, 'observedAt': '2024-03-19T02:00:00Z', 'unitCode': 'PSU'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T02:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 0.0, 'observedAt': '2024-03-19T02:00:00Z', 'unitCode': 'msnm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T02:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 21.69, 'observedAt': '2024-03-19T02:00:00Z', 'unitCode': 'CEL'}, 'dateLastValueReport

In [16]:
import os
import pandas as pd
import glob
import pyarrow.parquet as pq
import numpy as np

# Root folder: one sub-folder per ravine station, each holding SAIH CSV exports.
carpeta = "/home/thinking/raw/SAIHdatasActualizados/Ramblas/"
# Cell values the exports use to mark a missing reading.
MARCADORES_AUSENTES = ["-", "null"]
# Timestamp text format used by the following cells (strptime / observedAt).
FORMATO_FECHA = "%Y-%m-%dT%H:%M:%SZ"


def _cargar_estacion(ruta_estacion):
    """Outer-merge on "Date" every CSV in `ruta_estacion`.

    Files whose path contains "from_parquet" are skipped. A None sentinel (rather
    than ``len(df) == 0``) is used so a header-only CSV is merged like any other
    file instead of being silently replaced by the next one, which made the
    result depend on glob order. Returns an empty DataFrame when the folder has
    no usable CSV.
    """
    df_total = None
    for archivo in sorted(glob.glob(os.path.join(ruta_estacion, "*.csv"))):
        if "from_parquet" in archivo:
            continue
        df_temporal = pd.read_csv(archivo)
        if df_total is None:
            df_total = df_temporal
        else:
            df_total = pd.merge(df_total, df_temporal, on="Date", how="outer")
    return pd.DataFrame() if df_total is None else df_total


def _normalizar(df):
    """Return a cleaned copy of one station frame.

    Missing-value markers become NaN, every column except "Date" is cast to
    float, rows are sorted chronologically and "Date" is rendered with
    FORMATO_FECHA. The sort matters: an outer merge orders the raw "Date"
    strings lexicographically, and later cells treat the last row as the newest
    reading (and skip it when bulk-inserting history).
    """
    columnas_numericas = df.columns.drop("Date")
    limpio = df.replace(MARCADORES_AUSENTES, np.nan)
    limpio[columnas_numericas] = limpio[columnas_numericas].astype(float)
    limpio["Date"] = pd.to_datetime(limpio["Date"])
    # Stable sort, then a fresh RangeIndex (downstream code relies on positions).
    limpio = limpio.sort_values("Date", kind="mergesort").reset_index(drop=True)
    limpio["Date"] = limpio["Date"].dt.strftime(FORMATO_FECHA)
    return limpio


# Keep only the sub-folders (one per station).
elementos_en_carpeta = os.listdir(carpeta)
carpetas = [e for e in elementos_en_carpeta if os.path.isdir(os.path.join(carpeta, e))]

# Station folder name -> normalized DataFrame.
dfs = {}
for carpeta_actual in carpetas:
    df_estacion = _cargar_estacion(os.path.join(carpeta, carpeta_actual))
    if "Date" not in df_estacion.columns:
        # Previously this crashed later with KeyError: 'Date'.
        print(f"{carpeta_actual}: no hay CSV con columna Date, se omite")
        continue
    dfs[carpeta_actual] = _normalizar(df_estacion)

In [17]:
# Imported here so the cell also runs on a fresh kernel.
import pickle


def _cargar_pickle(ruta):
    """Load and return the object pickled at `ruta`.

    NOTE: pickle.load can execute arbitrary code; only use it on files this
    project generated itself (here: ./variables_entidades/).
    """
    with open(ruta, "rb") as f:
        return pickle.load(f)


# Station folder name -> NGSI-LD Ravine entity id (see printed output).
rambla_entity = _cargar_pickle("./variables_entidades/rambla_entity.pkl")
# Ravine entity id -> sensor name -> measurement place (None when the sensor has
# no place qualifier) -> DeviceMeasurement entity id.
link_rambla_sensores = _cargar_pickle("./variables_entidades/link_rambla_sensores.pkl")

print(rambla_entity)
print(link_rambla_sensores)

{'06A02-Pozo Estrecho': 'urn:ngsi-ld:Ravine:002', '06P04-La Murta': 'urn:ngsi-ld:Ravine:010', '06P02-Torre Pacheco': 'urn:ngsi-ld:Ravine:009', '06A05-Fuente Alamo': 'urn:ngsi-ld:Ravine:005', '06P05-San Javier': 'urn:ngsi-ld:Ravine:011', '06A19-La Marana': 'urn:ngsi-ld:Ravine:008', '06A04-El Estrecho': 'urn:ngsi-ld:Ravine:004', '06A01-La Puebla': 'urn:ngsi-ld:Ravine:001', '06A03-Rbla Albujon': 'urn:ngsi-ld:Ravine:003', '01M02-Relojero': 'urn:ngsi-ld:Ravine:012', '06B01-Elevacion El Albujon': 'urn:ngsi-ld:Ravine:013', '06A18-Desemboc Rbla Albujon': 'urn:ngsi-ld:Ravine:007', '06A06-Los Cegarras': 'urn:ngsi-ld:Ravine:006'}
{'urn:ngsi-ld:Ravine:002': {'Temperatura': {None: 'urn:ngsi-ld:DeviceMeasurement:1300'}, 'Pluviometro': {None: 'urn:ngsi-ld:DeviceMeasurement:1301'}, 'Nivel': {None: 'urn:ngsi-ld:DeviceMeasurement:1302'}, 'Caudal': {None: 'urn:ngsi-ld:DeviceMeasurement:1303'}}, 'urn:ngsi-ld:Ravine:010': {'Pluviometro': {None: 'urn:ngsi-ld:DeviceMeasurement:1304'}, 'Temperatura': {None: '

In [18]:
import math
import os
import pickle
import uuid
from datetime import datetime

import psycopg2

# Sensor property name -> unit code stored with each reading.
maper_property_unitcode = {
    "Volumen": "m3",
    "Presion": "bar",
    "Presion Atmosferica": "mbar",
    "Direccion del Viento": "°",
    "Velocidad del Viento": "m/s",
    "Temperatura": "CEL",
    "Conductividad": "microseconds/cm",
    "Nivel": "m",
    "Pluviometro": "mm",
    "Caudal": "m3/s",
    "Humedad": "%",
}
# Sensor property name -> controlled-property identifier. Its keys are also the set
# of known property names: any other column name is "<property> <measurement place>".
maper_property_controlled = {
    "Volumen": "volume",
    "Presion": "pressure",
    "Presion Atmosferica": "atmosphericPressure",
    "Direccion del Viento": "windDirection",
    "Velocidad del Viento": "windSpeed",
    "Temperatura": "temperature",
    "Nivel": "waterLevel",
    "Pluviometro": "precipitation",
    "Caudal": "waterFlow",
    "Humedad": "humidity",
    "Conductividad": "conductivity",
}

# Value stored when a reading is missing (NaN).
VALOR_AUSENTE = -99
# The buffer is flushed once it holds more than this many rows.
TAMANO_LOTE = 100000
# 21 placeholders, one per column of the `attributes` table.
CONSULTA_INSERCION = "INSERT INTO attributes VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"

# Reloaded so this cell does not depend on the previous one having run.
# NOTE: pickle.load runs arbitrary code; these files must be project-generated.
with open('./variables_entidades/rambla_entity.pkl', 'rb') as f:
    rambla_entity = pickle.load(f)
with open('./variables_entidades/link_rambla_sensores.pkl', 'rb') as f:
    link_rambla_sensores = pickle.load(f)


def _resolver_sensor(nombre_columna, id_entity):
    """Return (DeviceMeasurement id, unit code) for one column base name.

    Names that are a known property (including multi-word ones such as
    "Presion Atmosferica") have no measurement place (None). Any other name that
    contains a space is split into property (first word) and place (the rest).
    """
    propiedad, lugar = nombre_columna, None
    # The original test `len(split(" ")) > 0` was always true; a split is only
    # meaningful when there actually is a space.
    if nombre_columna not in maper_property_controlled and " " in nombre_columna:
        propiedad, lugar = nombre_columna.split(" ", 1)
    return link_rambla_sensores[id_entity][propiedad][lugar], maper_property_unitcode[propiedad]


def _volcar_lote(cursor, conn, filas):
    """Insert `filas` into `attributes` in one transaction.

    On a database error the transaction is rolled back and the error is
    reported. The caller then discards the batch. Previously the failed batch
    was kept and re-sent, ever growing, on every subsequent row.
    """
    print(f"Insertando {len(filas)} filas en la tabla attributes")
    try:
        cursor.executemany(CONSULTA_INSERCION, filas)
        conn.commit()
    except (Exception, psycopg2.DatabaseError) as error:
        conn.rollback()
        print(f"Error: {error}")
        print(f"Se han descartado {len(filas)} filas")
        return
    print(f"Se han insertado {cursor.rowcount} filas en la tabla attributes")


# Connection settings can be overridden from the environment; defaults unchanged.
conn = psycopg2.connect(
    host=os.environ.get("ORION_PG_HOST", "localhost"),
    port=os.environ.get("ORION_PG_PORT", "5432"),
    database=os.environ.get("ORION_PG_DB", "orion"),
    user=os.environ.get("ORION_PG_USER", "orion"),
    password=os.environ.get("ORION_PG_PASSWORD", "orion"),
)
cursor = conn.cursor()
datos_a_insertar = []

try:
    for carpeta, valorCarpeta in dfs.items():
        id_entity = rambla_entity[carpeta]
        print(carpeta)
        # The last row is not stored here: a later cell pushes it to Orion via
        # modify_entity. Assumes rows are in chronological order.
        historico = valorCarpeta.iloc[:-1]
        if historico.empty:
            continue
        # Resolve each sensor once per station instead of once per row.
        sensores = sorted({col.split("_")[0] for col in valorCarpeta.columns} - {"Date"})
        destinos = {original: _resolver_sensor(original, id_entity) for original in sensores}

        for _, row in historico.iterrows():
            fecha_hora = datetime.strptime(row["Date"], "%Y-%m-%dT%H:%M:%SZ")
            for original, (id_device_measurement, unit_code) in destinos.items():
                value = row[original]
                if math.isnan(value):
                    value = VALOR_AUSENTE
                # Column order must match the `attributes` table; values are
                # unchanged from the previous version of this cell.
                datos_a_insertar.append((
                    f'urn:ngsi-ld:attributes3:{uuid.uuid4()}',
                    'https://smartdatamodels.org/dataModel.DeviceMeasurement/numValue',
                    'Replace',
                    id_device_measurement,
                    fecha_hora,
                    True,
                    unit_code,
                    'None',
                    'Number',
                    None, None,
                    value,
                    None, None, None, None, None, None, None, None,
                    datetime.now(),
                ))
                if len(datos_a_insertar) > TAMANO_LOTE:
                    _volcar_lote(cursor, conn, datos_a_insertar)
                    datos_a_insertar = []

    if datos_a_insertar:
        _volcar_lote(cursor, conn, datos_a_insertar)
        datos_a_insertar = []
finally:
    # Always release the cursor and connection, even if the loop raises.
    cursor.close()
    conn.close()

06A02-Pozo Estrecho
06P04-La Murta
06P02-Torre Pacheco
06A05-Fuente Alamo
06P05-San Javier
06A19-La Marana
06A04-El Estrecho
06A01-La Puebla
06A03-Rbla Albujon
01M02-Relojero
06B01-Elevacion El Albujon
Insertando 100001 filas en la tabla attributes
Se han insertado 100001 filas en la tabla attributes
06A18-Desemboc Rbla Albujon
06A06-Los Cegarras
Insertando 36727 filas en la tabla attributes
Se han insertado 36727 filas en la tabla attributes


In [19]:


# Push the most recent row of every ravine station to Orion-LD: one PATCH per
# sensor, against the DeviceMeasurement entity linked to that station/sensor/place.
for carpeta, valorCarpeta in dfs.items():
    # An empty frame has no "latest" row; iloc[-1] would raise IndexError.
    if valorCarpeta.empty:
        print(f"{carpeta}: sin datos, se omite")
        continue
    ultimos_valores = valorCarpeta.iloc[-1]
    print(carpeta)
    id_entity = rambla_entity[carpeta]
    # Column base names (text before the first "_"), minus the timestamp column;
    # sorted so the request order is reproducible between runs.
    sensores = sorted({col.split("_")[0] for col in valorCarpeta.columns} - {"Date"})
    for original in sensores:
        # Known property names (even multi-word ones) have no measurement place;
        # any other name containing a space is "<property> <place>". The previous
        # test `len(split(" ")) > 0` was always true.
        propiedad, measurementPlace = original, None
        if original not in maper_property_controlled and " " in original:
            propiedad, measurementPlace = original.split(" ", 1)

        id_device_measurement = link_rambla_sensores[id_entity][propiedad][measurementPlace]

        # NOTE(review): the bulk insert stores -99 for NaN, but a NaN latest
        # reading is forwarded here as-is; confirm modify_entity / Orion accept it.
        modify_entity(
            value=ultimos_valores[original],
            observedAt=ultimos_valores["Date"],
            unitcode=maper_property_unitcode[propiedad],
            url="http://localhost:1026/ngsi-ld/v1/entities/" + id_device_measurement + "/attrs",
        )

06A02-Pozo Estrecho
modify_content
{'numValue': {'type': 'Property', 'value': 0.0, 'observedAt': '2024-03-19T19:00:00Z', 'unitCode': 'mm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T19:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 0.0, 'observedAt': '2024-03-19T19:00:00Z', 'unitCode': 'm'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T19:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 0.0, 'observedAt': '2024-03-19T19:00:00Z', 'unitCode': 'm3/s'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T19:00:00Z'}}
204
modify_content
{'numValue': {'type': 'Property', 'value': 19.5, 'observedAt': '2024-03-19T19:00:00Z', 'unitCode': 'CEL'}, 'dateLastValueReported': {'type': 'Property', 'value': '2024-03-19T19:00:00Z'}}
204
06P04-La Murta
modify_content
{'numValue': {'type': 'Property', 'value': 22.66, 'observedAt': '2024-03-19T19:00:00Z', 'unitCode': 'CEL'}, 'dateLastValueReported'