# Librerias a usar para el parser

In [2]:
from typing import Optional, List, Dict, Literal
from dataclasses import dataclass
from pathlib import Path
from datetime import date
import pandas as pd
import math as mh
import json
import re

# Mejoras de constantes

In [3]:
escala: Dict[str, float] = {'0': 1e3, '6': 1e4, '8': 1e5}
orden: Dict[str,int] = {'1':3, '2':1}

# Paths y REGEX

In [16]:
# Paths y expresiones

PATH = Path("G:\\Otros ordenadores\\Mi PC\\IGAC\\archivosPrueba\\Archivos_Pruebas\\L5_GSI_1")
L5Crudos = PATH / 'CRUDOS_L5'
L5Orden = PATH / 'Orden_L5' / 'L5_Principio_Linea;Fin_linea.txt'
crudos = [p for p in L5Crudos.iterdir() if p.is_file() is True]

# Estandarizador de Nomenclaturas

In [17]:
# Función que estandariza las nomenclaturas
def vert_std(codigo:str) -> str:

    # VERT: Patron que estandariza los vertices XXXX-XXXX-09
    VERT_RE = re.compile(r'^([A-Z0-9]+)([A-Z]{2,4})(\d{1,2})$')

    code_mayusc = codigo.replace("-", "").replace(" ", "").upper()
    m = VERT_RE.match(code_mayusc)
    if m:
        g1, g2, g3 = m.groups()
        return f"{g1}-{g2}-{g3}"
    
    return codigo

# Clasificador de Orden

In [None]:
# Función que ordena el archivo orden
def clasi_orden(forden:Path, headline:bool = True) -> List[Dict[str,str]]:
    output = []

    if headline:
        with open(forden, "r", encoding="utf-8") as f:
            # suponemos primera línea encabezado
            for i, linea in enumerate(f.readlines()[1:]):
                linea = linea.strip()
                if not linea or ";" not in linea:
                    continue
                a, b = linea.split(";")
                a = vert_std(a)
                b = vert_std(b)
                if i % 2 == 0:
                    output.append({"ini":a,"fin":b,"sentido":"I"})
                else:
                    output.append({"ini":a,"fin":b,"sentido":"R"})
    else:
        with open(forden, "r", encoding="utf-8") as f:
            # suponemos primera línea encabezado
            for i, linea in enumerate(f.readlines()):
                linea = linea.strip()
                if not linea or ";" not in linea:
                    continue
                a, b = linea.split(";")
                a = vert_std(a)
                b = vert_std(b)
                if i % 2 == 0:
                    output.append({"ini":a,"fin":b,"sentido":"I"})
                else:
                    output.append({"ini":a,"fin":b,"sentido":"R"})

    with open("../LeicaGSI_LS10_Parser/data/orden_niv.json", "w") as f:
        json.dump(output, f, indent=2)

    return output

clasi_orden(L5Orden)


[{'ini': 'B70-CW-7', 'fin': '23660003', 'sentido': 'I'},
 {'ini': '23660003', 'fin': 'B70-CW-7', 'sentido': 'R'},
 {'ini': '23660003', 'fin': 'A2-CB-3', 'sentido': 'I'},
 {'ini': 'A2-CB-3', 'fin': '23660003', 'sentido': 'R'},
 {'ini': 'A2-CB-3', 'fin': 'A3-CB-3', 'sentido': 'I'},
 {'ini': 'A3-CB-3', 'fin': 'A2-CB-3', 'sentido': 'R'},
 {'ini': 'A3-CB-3', 'fin': '4-CB-3', 'sentido': 'I'},
 {'ini': '4-CB-3', 'fin': 'A3-CB-3', 'sentido': 'R'},
 {'ini': '4-CB-3', 'fin': '5-CB-3', 'sentido': 'I'},
 {'ini': '5-CB-3', 'fin': '4-CB-3', 'sentido': 'R'},
 {'ini': '5-CB-3', 'fin': '6-CB-3', 'sentido': 'I'},
 {'ini': '6-CB-3', 'fin': '5-CB-3', 'sentido': 'R'},
 {'ini': '6-CB-3', 'fin': '23189001', 'sentido': 'I'},
 {'ini': '23189001', 'fin': '6-CB-3', 'sentido': 'R'},
 {'ini': '23189001', 'fin': '8-CB-3', 'sentido': 'I'},
 {'ini': '8-CB-3', 'fin': '23189001', 'sentido': 'R'},
 {'ini': '8-CB-3', 'fin': '9-CB-3', 'sentido': 'I'},
 {'ini': '9-CB-3', 'fin': '8-CB-3', 'sentido': 'R'},
 {'ini': '9-CB-3',

# Generador de Fechas

In [19]:
def ext_fecha(fname:str):

    # FECHA6: Extrae fechas
    FECHA6_RE = re.compile(r'(\d{6})')

    m = FECHA6_RE.search(fname)
    if not m:
        return None
    
    y, mth, d = int(m.group(1)[:2]), int(m.group(1)[2:4]), int(m.group(1)[4:6])

    # regla para siglo: 00–49 → 2000s, 50–99 → 1900s
    year = 2000 + y if y < 50 else 1900 + y

    try:
        return date(year, mth, d).isoformat()
    except ValueError:
        return None
    

# Calculo Colimación

In [30]:
def colimacion(lineas,file):
    """
    La función colimación tiene como proposito calcular el error de colimación en metros para posteriormente
    ser tenido en cuenta en el ajuste. La metodología con la que es calculada es la mencionada en el manual de 
    Leica para equipos LS10/LS15.
    
    Parametros:
        - lineas: Es el conjunto de lineas del archivo para que la misma función examine si hay o no las mediciones de
            colimación con los nombres presentados en el manual de Leica A1, B1, B2 y A2.

        - file: Es el archivo que se desea procesar para la automatización del proceso.

    Retorna: Retorna un vector con dos valores, un real y un string, los cuales son descritos acontinuación:
        1) El real es el cálculo de la colimación ofrecido en el manual del equipo leica y que es también mencionado
            en los equipos de trimble. El método de colimación es mediante el método de forstner y posteriormente
            es pasado a metros con 4 decimales independientementes si la medida esta en mm, decimas de mm o centecimas
            de mm.

        2) El segundo valor es referente al tipo de archivo descargado, los archivos de leica tipo GSI permiten
            la descarga de la información en 8 o 16 bits, lo cual afecta el número de caracteres que se imprimen en 
            el archivo de texto, esta información puede ser relevante para el control de calidad según los lineamientos
            que estan siendo actualizados por el equipo de nivelacion.

    TODO: La presente función contempla registros posiblemente ofrecidos por el equipo LEICA DNA 03, se debe realizar
        el código que permita obtener la información de ambos equipos debe ser desarrollado o complementado a este.
    """
    # Patron de las medidas de colimación leica 
    patron_colimacion = r'^(\d{6})\+(\S{8,16})\s+(\S{6})([+-]\S{8,16})\s(\S{6})([+-]\S{8,16})'
    colimacion = []
    tipo_gsi = None

    # Recorre todas las líneas del archivo 
    for linea in lineas:
        linea = linea.strip()
        try:
            pat_col = re.search(patron_colimacion,linea) # patron de colimación
            if pat_col is not None: # Verifica si el patron de colimación existe
                try:
                    tipo_gsi = f'{len(pat_col.group(2))} bits' # Obtiene el tipo de archivo gsi que se esta manejando
                    data_col = ['A1','B1','B2','A2']
                    if pat_col.group(2).lstrip('0') in data_col:
                        try:
                            conversor = escala.get(pat_col.group(3)[-1])
                            if pat_col.group(6) is not None and pat_col.group(4) is not None and conversor is not None:
                                alt = float(pat_col.group(6))
                                dist = float(pat_col.group(4))
                                col_data = {
                                    "id_colimacion": pat_col.group(2).lstrip('0'),
                                    "alt.obs" : float(alt)/conversor,
                                    "dist.hor": float(dist)/conversor
                                }
                                colimacion.append(col_data)
                            else:
                                pass

                        except ValueError as e:
                            print(f"Error al obtener los datos de colimación en: {Path(file)}")
                            return None
                    else:
                        pass
                except Exception as e:
                    print(f"El error {e} se presenta en el archivo: {Path(file)}")
            else:
                pass
        except Exception as e:
            print(f"El error {e} se presenta en el archivo: {Path(file)}")

    try:
        output = mh.atan(((colimacion[0]['alt.obs'] - colimacion[1]['alt.obs']) + 
                            (colimacion[2]['alt.obs'] - colimacion[3]['alt.obs'])) / 
                            ((colimacion[0]['dist.hor'] - colimacion[1]['dist.hor']) + 
                            (colimacion[2]['dist.hor'] - colimacion[3]['dist.hor']))) * (6378137/3600)
        return [round(output,5),tipo_gsi]
    except Exception as e:
                        print(f"El error: {e} \nSe encuentra en el archivo: {Path(file)}")
                        return None

# Obtención de los datos crudos

In [21]:
def parser_leica_dna03(archivos,json_file):

    ruta = Path(json_file)

    if not ruta.exists():
        raise FileNotFoundError(f"No se encontró el JSON en: {ruta}")

    patron_metodo_observacion = r'^(41\S{4})\+(\S{8,16})'
    patron_inicio = r'^(\d{6,7})\+(\S{8,16})\s(83\S{4})([+-]\S{8,16})'
    patron_fin = r'^(\d{6,7})\+(\S{8,16})\s(573\S{3})([+-]\S{8,16})\s(574\S{3})([+-]\S{8,16})\s(83\S{4})([+-]\S{8,16})'

    orden_archivo = None
    nom_ini = None
    nom_fin = None
    dist_hor_total = None
    dif_dist_hor = None
    ceros_ini = None
    alt_obs = None
    datos_crudos = []

    with open(Path(json_file), "r", encoding="utf-8") as f:
        print("Archivo json cargado")
        json_data = json.load(f)

    for archivo in archivos:
        with open(Path(archivo),"r") as file:
            lineas = file.readlines()
            col = colimacion(lineas,archivo)
            if col is None:
                raise ValueError (f'No se logro calcular la colimación')
            
            for linea in lineas:
                linea = linea.strip()
                # Permite obtener el orden del archivo (si la nivelación se realizo con metodología de orden 1, 2 o 3)
                if re.search(patron_metodo_observacion,linea) is not None:
                    orden_archivo = orden.get(linea[-1])
                
                # Permite obtener el nombre y la medida con la cual se inicio el vértice a nivelar
                if re.search(patron_inicio, linea):
                    est_inicio = re.search(patron_inicio, linea)
                    if est_inicio is not None:
                        nom_ini = vert_std(est_inicio.group(2).lstrip('0'))
                        ceros_ini = float(est_inicio.group(4))

                # Permite obtener el nombre, altura, distancia, diferencia de distancias del vértice al cual se llega
                if re.search(patron_fin, linea):
                    est_fin = re.search(patron_fin,linea)
                    if est_fin is not None:
                        nomen = vert_std(est_fin.group(2).lstrip('0'))
                        if any(ptos['fin'] == nomen for ptos in json_data):
                            nom_fin = vert_std(est_fin.group(2).lstrip('0')) # Obtiene el nombre del vértice
                            conversor = escala.get(est_fin.group(3)[-1]) # Obtiene el conversor
                            if conversor is not None:
                                dif_dist_hor = float(est_fin.group(4))/conversor # Obtiene la diferencia de distancias horizontales (balanceo)
                                dist_hor_total = float(est_fin.group(6))/conversor # Obtiene la distancia total
                                alt_obs = float(est_fin.group(8))/conversor
                                if ceros_ini is not None:
                                    ceros_ini = ceros_ini/conversor
                        
                                # Mejorar el método de entrega de las observaciones, es preferible indicar en que sistema esta (mm,dmm,cmm)
                                new_data = {
                                    "ini": nom_ini,
                                    "fin": nom_fin,
                                    "dis.total": dist_hor_total,
                                    "dif.dist.hor": dif_dist_hor,
                                    "ceros.iniciales": ceros_ini,
                                    "dif.alt.obs": alt_obs,
                                    "orden_obs": orden_archivo,
                                    "colimacion": col[0],
                                    "tipo.archivo": col[1],
                                    "nombre.archivo": archivo.name,
                                    "fecha": ext_fecha(archivo.name)
                                }

                                datos_crudos.append(new_data)       

                    else:
                        continue
                else:
                    continue

    return datos_crudos
        

# Parser Leica DNA03 8 - 16 bits

In [22]:
def parser_ini(files, order_info, output_info_xlsx):

    datos_crudos = parser_leica_dna03(files,order_info)

    new_info = {(n["ini"], n["fin"]): n for n in datos_crudos}

    with open(order_info, "r", encoding="utf-8") as f:
            json_data = json.load(f)

    for data in json_data:
        clave = (data['ini'], data['fin'])
        if clave in new_info:
            for k, v in new_info[clave].items():
                if k not in ("ini", "fin"):   # proteger claves
                    data[k] = v 

    with open(order_info, "w") as f:
        json.dump(json_data, f, indent=2) 
        
    pd.DataFrame(json_data).to_excel(output_info_xlsx, index=False)

# Parser de estaciones de Inicio

In [32]:
archivos = sorted([p for p in L5Crudos.iterdir() if p.is_file() is True])

parser_ini(archivos,"../LeicaGSI_LS10_Parser/data/orden_niv.json","../LeicaGSI_LS10_Parser/data/prueba_crudos.xlsx")


Archivo json cargado
