# Prueba 1 obtener datos de facturas en PDF 
##### 


In [2]:
#######################################################################################################################################
# Paquetes extra
#######################################################################################################################################

%additional_python_modules pandas

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.8 
Additional python modules to be included:
pandas


In [1]:
#######################################################################################################################################
# Configuración para la máquina y creación de la sesión de spark
#######################################################################################################################################

%idle_timeout 60
%glue_version 4.0
%worker_type G.4X
%number_of_workers 10

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import boto3

# spark = GlueContext(SparkContext.getOrCreate()).spark_session
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Current idle_timeout is None minutes.
idle_timeout has been set to 60 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.4X
Previous number of workers: None
Setting new number of workers to: 10
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.4X
Number of Workers: 10
Idle Timeout: 60
Session ID: e2a6aac6-2648-45d9-98a8-206e9df17a9d
Applying the following default arguments:
--glue_kernel_version 1.0.8
--enable-glue-datacatalog true
--additional-python-modules pandas
Waiting for session e2a6aac6-2648-45d9-98a8-206e9df17a9d to get into ready status...
Session e2a6aac6-2648-45d9-98a8-206e9df17a9d has been created.



In [2]:

#######################################################################################################################################
# Import de librerias
#######################################################################################################################################

import boto3
import xml.etree.ElementTree as ET
import re

from calendar import isleap
from datetime import datetime, timedelta

from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, to_date, date_format, when, lower, year
from pyspark.sql import Row

import pandas as pd
import numpy as np





In [3]:
#######################################################################################################################################
# Carga de librerías necesarias y funciones comunes
#######################################################################################################################################
 
from pyspark.sql.functions import count
import pandas as pd
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame
 
sc.addFile('s3://naturgy-platacom-pro/teams/prevision/funciones/test/compartido/guardar.py')
sc.addFile('s3://naturgy-platacom-pro/teams/prevision/funciones/test/compartido/borrar.py')
sc.addFile('s3://naturgy-platacom-pro/teams/prevision/funciones/test/compartido/leer.py')
sc.addFile('s3://naturgy-platacom-pro/teams/prevision/funciones/test/compartido/archivos.py')
 
from guardar import *
from borrar import *
from leer import *
from archivos import *




In [4]:
#######################################################################################################################################
# Configuración de variables de entrada
#######################################################################################################################################

# Parámetros de entrada para AWS para obtener las facturas y exportar excel

s3_uri      = 's3://naturgy-platacom-pro/teams/prevision/archivos/test/electricidad/liqportugalREN/facturas'
bucket_name = "naturgy-platacom-pro"
s3_key      = "teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/facturas.xlsx"





In [5]:
#######################################################################################################################################
# Función auxiliar para separar nombre del bucket y key de la "uri"
#######################################################################################################################################

def parse_s3_uri(uri):
    if not uri.startswith("s3://"):
        raise ValueError(f"URI inválido: {uri}")
    parts = uri[5:].split("/", 1)
    bucket_name = parts[0]
    key = parts[1] if len(parts) > 1 else ""
    return bucket_name, key




In [6]:
#Cambiar el nombre de las etiquetas del xml a los siguientes nombres:
tag_mapping = {
    "IssueDate"    : "Fecha de Emisión",
    "Name1"        : "Descrição1", 
    "Name2"        : "Descrição2", 
    "Name3"        : "Descrição3", 
    "Name4"        : "Descrição4", 
    "PayableAmount": "Importe Total Nº Documento",
    "PriceAmount1" : "Valor1",
    "PriceAmount2" : "Valor2",
    "PriceAmount3" : "Valor3"
}

#Todas estas etiquetas no son necesarias para la BBDD
etiquetas_a_eliminar = {
     "DigestValue",
     "SigningTime",
     "MimeType",
     "ReferencedSignatureID",
     "CustomizationID",
     "InvoiceTypeCode",
     "DocumentCurrencyCode",
     "Description",
     "StreetName",
     "CompanyID",
     "CityName",
     "PostalZone",
     "IdentificationCode",
     "BaseQuantity", 
     "Percent",
     "ElectronicMail",
     "TaxAmount", 
     "TaxableAmount", 
     "LineExtensionAmount", 
     "TaxExclusiveAmount",
     "TaxInclusiveAmount",
     "InvoicedQuantity",
     "Telephone",
     "UBLVersionID",
     "Name",
     "ID1",  "ID3",  "ID4",  "ID5",
     "ID6",  "ID7",  "ID8",  "ID9",
     "ID11", "ID12", "ID13", "ID14",
     "ID15", "ID16", "ID17", "ID18", 
     "ID19", "ID20", "ID21", "ID22", 
     "ID10"}

meses_numericos = {
    "janeiro": "01", "fevereiro": "02", "março": "03", "abril": "04",
    "maio": "05", "junho": "06", "julho": "07", "agosto": "08",
    "setembro": "09", "outubro": "10", "novembro": "11", "dezembro": "12"
}

meses_numericos2 = {
    "Jan": "01", "Fev": "02", "Mar": "03", "Abr": "04",
    "Mai": "05", "Jun": "06", "Jul": "07", "Ago": "08",
    "Set": "09", "Out": "10", "Nov": "11", "Dez": "12"
}

#######################################################################################################################################
# Obtención y edición de un archivo xml
#######################################################################################################################################

def procesar_archivo_xml(bucket_name, key, s3_client):
    
    obj         = s3_client.get_object(Bucket=bucket_name, Key=key)
    xml_content = obj['Body'].read().decode('utf-8')
    
    root = ET.fromstring(xml_content)
    
    #Inicialización de variables 
    rows                    = []
    registration_date_count = 0
    id_count                = 0
    name_count              = 0
    price_count             = 0
    codigo                  = 0
    mes_periodo             = 0
    mes_emision             = 0
    contador                = 0
    tipo_documento          = False
    inicio_tabla            = False
    semana                  = False
    Fecha_tabla             = False
    issue_date              = False
    corre                   = False

    #Bucle para recorrer las etiquetas obtenidas a través del xml
    #Recorre línea por línea
    for elem in root.iter():
        
        etiqueta = elem.tag
        etiqueta_limpia = re.sub(r"\{.*?\}", "", etiqueta)  # Eliminar todo lo que esté dentro de {} para quedarnos solo con el nombre de la etiqueta
        
        #Obtener la información relacionda con la etiqueta
        valor = elem.text.strip() if elem.text else ""
        
        # restricción: valores con menos de 100 caracteres
        if valor and len(valor) <= 100:
            rows.append((etiqueta_limpia, valor))
        
        #Tratamiento de los pares etiqueta - valor        
        for i, (etiqueta, valor) in enumerate(rows):
        
            #Numeración de las etiquetas ID ya que solo es relevante el ID2, las demás serán eliminadas
            if etiqueta == "ID":
                id_count += 1
                rows[i] = (f"ID{id_count}", valor)
            
            #Obtención del número de documento
            if etiqueta == "ID2":
                    if '/' in valor:
                        codigo = int(valor.split('/')[-1]) 
                    else:
                        codigo = int(valor)  # Si no hay '/', usar el valor directamente
            #El código regresenta el número de documento, ya que las autofacturaciones tienen una distribución del xml distinta que los demás documentos. 
    
###############################################################################################
# Ajustes en etiquetas y valores
###############################################################################################

            #Autofacturaciones
            if str(codigo).startswith(("100","200",  "300")):
            #Tanto la sociedad como el emisor (REN) tienen la misma etiqueta = "RegistrationName" pero la sociedad es la primera que aparece
                for j, (sub_etiqueta, sub_valor) in enumerate(rows):
                    if sub_etiqueta == "RegistrationName":
                        registration_date_count += 1
                        if registration_date_count == 1:
                            rows[j] = ("Sociedad", sub_valor)
                        elif registration_date_count == 2:
                            rows[j] = ("Emisor", sub_valor)
                            
                    elif sub_etiqueta == "IssueDate":
                        mes_emision = int(sub_valor.split("-")[1]) #Se necesita el mes de emisión para calcular la versión del documento

                    elif sub_etiqueta == "Name":
                        #En las autofacturaciones el xml presenta el valor correspondiente a cada concepto y posteriorimente con la misma etiqueta un 0. 
                        #Es por eso que vamos a asignar Name1 para guardarlo y después MimeType para borrarlo
                        contador += 1
                        if contador == 1:  # Si es una aparición impar
                            rows[j] = ("Name1", sub_valor)
                        elif contador == 2:
                            rows[j] = ("MimeType", sub_valor)
                        elif contador == 3:
                            rows[j] = ("Name2", sub_valor)
                        elif contador == 4:
                            rows[j] = ("MimeType", sub_valor)
                            
                            
                    #La etiqueta "Description" contiene la información del inicio y fin del periodo        
                    elif sub_etiqueta == "Description":
                        patron = r"(\d{4})(\d{2})(\d{2})\s*a\s*(\d{4})(\d{2})(\d{2})" #Fecha sigue el siguiente formato: GGS YYYYmmdd a YYYYmmdd - GGS
                        match = re.search(patron, sub_valor)
                        if match and not inicio_tabla: #Si econtró ese formato en la fecha entonces:
                            inicio_periodo = f"{match.group(3)}-{match.group(2)}-{match.group(1)}"
                            fin_periodo = f"{match.group(6)}-{match.group(5)}-{match.group(4)}"
                            rows.append(("Inicio de periodo", inicio_periodo))
                            rows.append(("Fin de periodo", fin_periodo))
                            rows.append(("Tipo Periodo","Semanal"))
                            
                            mes_periodo = int(match.group(2)) #Se necesita el mes de periodo para calcular la versión del documento
                            
                            inicio_tabla = True
                        else:
                            # Buscamos si es una corrección de documento
                            patron_correccion = r"Correc[çc]ão do documento Nº:\s*(\d+)"
                            match_correccion = re.search(patron_correccion, sub_valor)

                            if match_correccion and not corre:
                                correccion = match_correccion.group(1)
                                print(correccion)
                                rows.append(("Correccion", correccion))
                                corre = True
                
                        
            #Demás documentos
            else:        
                 for j, (sub_etiqueta, sub_valor) in enumerate(rows):
                    if sub_etiqueta == "RegistrationName":
                        registration_date_count += 1
                        if registration_date_count == 1:
                            rows[j] = ("Emisor", sub_valor)
                        elif registration_date_count == 2:
                            rows[j] = ("Sociedad", sub_valor)
                    #En este caso el orden viene contrario a las autofacturaciones, primero Emisor (REN) y después la sociedda
                                                
                    elif sub_etiqueta == "IssueDate":
                        mes_emision = int(sub_valor.split("-")[1])
                        
                                                            
                    #La etiqueta "Name" contiene los conceptos de los distintos cargos de las facturas
                    elif sub_etiqueta == "Name":
                        name_count += 1
                        rows[j] = (f"Name{name_count}", sub_valor)

                    elif sub_etiqueta == "Description":
                        match_semana = re.search(r"(\d+)\s*ª\s*semana", sub_valor)  # Número de semana en el siguiente formato: GGS ##ª semana
                        if match_semana and not semana:
                            numero_semana = match_semana.group(1)  # Solo dígitos
                            rows.append(("Número de semana (Periodo)", numero_semana))
                            semana = True

                        # Expresión que busca ambos formatos de fecha en la misma línea
                        match = re.search(r"(\d{2}-\d{2}-\d{4}) a (\d{2}-\d{2}-\d{4})|(\d{4}\d{2}\d{2}) a (\d{4}\d{2}\d{2})", sub_valor)
                        if match and not inicio_tabla:
                            if match.group(1) and match.group(2):  # Formato dd-mm-YYYY a dd-mm-YYYY
                                inicio_periodo, fin_periodo = match.group(1), match.group(2)
                            else:  # Formato YYYYMMDD a YYYYMMDD
                                inicio_periodo = f"{match.group(3)[6:8]}-{match.group(3)[4:6]}-{match.group(3)[:4]}"
                                fin_periodo = f"{match.group(4)[6:8]}-{match.group(4)[4:6]}-{match.group(4)[:4]}"

                            rows.append(("Inicio de periodo", inicio_periodo))
                            rows.append(("Fin de periodo", fin_periodo))
                            rows.append(("Tipo Periodo", "Semanal"))
                            mes_periodo = int(inicio_periodo.split('-')[1]) 
                            inicio_tabla = True
                        else:
                            # Buscamos si es una corrección de documento
                            patron_correccion = r"Correc[çc]ão do documento Nº:\s*(\d+)"
                            match_correccion = re.search(patron_correccion, sub_valor)

                            if match_correccion and not corre:
                                correccion = match_correccion.group(1)
                                print(correccion)
                                rows.append(("Correccion", correccion))
                                corre = True
                


                        match_mes_anio = re.search(r"(Set|Out|Nov|Dez|Jan|Fev|Mar|Abr|Mai|Jun|Jul|Ago)\.?(\d{4})", sub_valor) #Fecha en el siguiente formato: Tarifa Social Mmm.YYY

                               #En el caso de facturas sociales los periodos son mensuales    
                        if match_mes_anio and not inicio_tabla:
                            mes_abreviado = match_mes_anio.group(1)
                            anio = int(match_mes_anio.group(2)) 

                            # Determinar si el año es bisiesto
                            es_bisiesto = isleap(anio)

                            # Diccionario de días por mes según si el año es bisiesto o no
                            meses = {
                                "Set": ("01-09-{0}", "30-09-{0}"),
                                "Out": ("01-10-{0}", "31-10-{0}"),
                                "Nov": ("01-11-{0}", "30-11-{0}"),
                                "Dez": ("01-12-{0}", "31-12-{0}"),
                                "Jan": ("01-01-{0}", "31-01-{0}"),
                                "Fev": ("01-02-{0}", "29-02-{0}" if es_bisiesto else "28-02-{0}"),
                                "Mar": ("01-03-{0}", "31-03-{0}"),
                                "Abr": ("01-04-{0}", "30-04-{0}"),
                                "Mai": ("01-05-{0}", "31-05-{0}"),
                                "Jun": ("01-06-{0}", "30-06-{0}"),
                                "Jul": ("01-07-{0}", "31-07-{0}"),
                                "Ago": ("01-08-{0}", "31-08-{0}")}

                            if mes_abreviado in meses:
                                inicio, fin = meses[mes_abreviado]
                                rows.append(("Inicio de periodo", inicio.format(anio)))
                                rows.append(("Fin de periodo", fin.format(anio)))
                                rows.append(("Número de semana (Periodo)", "n/a"))
                                inicio_tabla = True
                                rows.append(("Tipo Periodo","Mensual"))
                                mes_periodo = int(meses_numericos2.get(mes_abreviado))  # Obtener el número del mes
                                inicio_tabla = True

                                                    
            #Precios correspondientes a los "names"
            if etiqueta == "PriceAmount":
                price_count += 1
                rows[i] = (f"PriceAmount{price_count}", valor)
                
            elif etiqueta == "Description":
                match_mes_anio = re.search(r"(janeiro|fevereiro|março|abril|maio|junho|julho|agosto|setembro|outubro|novembro|dezembro)\s(\d{4})", sub_valor)
                if match_mes_anio and not inicio_tabla:
                    mes_extenso = match_mes_anio.group(1).lower()
                    anio = int(match_mes_anio.group(2))

                    # Determinar si el año es bisiesto
                    es_bisiesto = isleap(anio)

                    # Diccionario de días por mes según si el año es bisiesto o no
                    meses = {
                        "setembro":  ("01-09-{0}", "30-09-{0}"),
                        "outubro":   ("01-10-{0}", "31-10-{0}"),
                        "novembro":  ("01-11-{0}", "30-11-{0}"),
                        "dezembro":  ("01-12-{0}", "31-12-{0}"),
                        "janeiro":   ("01-01-{0}", "31-01-{0}"),
                        "fevereiro": ("01-02-{0}", "29-02-{0}" if es_bisiesto else "28-02-{0}"),
                        "março":     ("01-03-{0}", "31-03-{0}"),
                        "abril":     ("01-04-{0}", "30-04-{0}"),
                        "maio":      ("01-05-{0}", "31-05-{0}"),
                        "junho":     ("01-06-{0}", "30-06-{0}"),
                        "julho":     ("01-07-{0}", "31-07-{0}"),
                        "agosto":    ("01-08-{0}", "31-08-{0}"),
                    }

                    if mes_extenso in meses:
                        inicio, fin = meses[mes_extenso]
                        rows.append(("Inicio de periodo", inicio.format(anio)))
                        rows.append(("Fin de periodo", fin.format(anio)))
                        rows.append(("Número de semana (Periodo)", "n/a"))
                        rows.append(("Tipo Periodo", "Mensual"))

                        # Obtener el número del mes
                        mes_periodo = int(meses_numericos.get(mes_extenso))
                        inicio_tabla = True

    #Eliminación de información no relevante
    rows = [row for row in rows if row[0] not in etiquetas_a_eliminar]

    #Mapeo de nombres definido al principio de la celda
    rows = [(tag_mapping.get(row[0], row[0]), row[1]) for row in rows]

    columnas = [row[0] for row in rows]  # Los atributos están en la primera posición de cada fila
    valores = [row[1] for row in rows]  # Los valores están en la segunda posición de cada fila
    
    df_spark = transformar_dataframe(columnas, valores)
    
    #Cálculo de la versión
    dif = mes_emision - mes_periodo
    
    version = 100

    if dif in [7, 8, -4, -5, -6]:
        version = 4
    elif dif in [4, 5, -8, -7]:
        version = 3
    elif dif in [2, 3, -10, -9]:
        version = 2
    elif dif in [0, 1, -11]:
        version = 1
    else:
        version = dif

    # Añadir la nueva columna al DataFrame
    df_spark = df_spark.withColumn("Versión", lit(version))
    df_spark = df_spark.withColumn("ID2", lit(codigo))

    return df_spark




In [7]:
def transformar_dataframe(columnas, valores):
    #Para asignar una fila a cada cocepto de la factura (una fila para encargo de regulacion, otra para desvío defecto y otra para desvío excesso)
    #Cada una con su valor correspondiente pero con las demás celdas duplicadas
    
    if isinstance(valores, list) and all(isinstance(v, str) for v in valores):
        valores = [valores]  # Convertir a una lista de listas

    # Crear el DataFrame inicial
    df_spark = spark.createDataFrame(valores, columnas)
    
    # Identificar columnas de Descripcion y Valor
    descripcion_cols = [c for c in columnas if "Descrição" in c]
    
    valor_cols = [c for c in columnas if "Valor" in c]

    if not len(descripcion_cols):
        raise ValueError("No se encontraron columnas de 'Descrição' en los datos proporcionados.")
    
    if not len(valor_cols):
        raise ValueError("No se encontraron columnas de 'Valor' en los datos proporcionados.")

    # Crear una lista de DataFrames con un solo valor de 'Descrição' y 'Valor' en cada uno
    df_final = None
    for desc_col, val_col in zip(descripcion_cols, valor_cols):
        # Crear un DataFrame temporal con un solo valor de 'Descrição' y 'Valor', manteniendo los demás datos duplicados
        temp_df = df_spark.select(
            *[col(c) for c in columnas if c not in descripcion_cols + valor_cols],  # Mantener las columnas que no son Descrição/Valor
            F.col(desc_col).alias("Descrição"),
            F.col(val_col).alias("Valor")
        )

        # Unir los DataFrames temporales para crear el DataFrame final
        df_final = temp_df if df_final is None else df_final.union(temp_df)

    return df_final





In [8]:

def procesar_xml_en_carpeta(s3_uri):
    bucket_name, prefix = parse_s3_uri(s3_uri)
    s3_client = boto3.client('s3')#Hacemos la conexión con s3

    # Inicializar un DataFrame vacío
    df_spark_all = None
    df_spark = None
    issue_date_dict = {}
    all_columns = set()
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    from pyspark.sql import functions as F
    from pyspark.sql.functions import first, coalesce

    if 'Contents' not in response:
            raise ValueError("No se encontraron archivos en la carpeta especificada.")
    
    for obj in response['Contents']:
        key = obj['Key']
        if key.endswith('.xml'): #De la carpeta de s3 tomamos solo los archivos que terminen con .xml
            
            print(f": {key}")
    
            # Procesar el archivo XML
            df_spark = procesar_archivo_xml(bucket_name, key, s3_client)

            if df_spark is None:
                print(f"Advertencia: El archivo {key} no generó un DataFrame válido.")
                continue
                
            # Obtener columnas del DataFrame procesado
            all_columns.update(df_spark.columns)
            
            # Asegurar que todas las columnas están presentes en todos los archivos
            for column in all_columns:
                if column not in df_spark.columns:
                    df_spark = df_spark.withColumn(column, lit(None))
               
            # Si es el primer DataFrame, inicializarlo
            if df_spark_all is None:
                df_spark_all = df_spark
            else:
                df_spark_all = df_spark_all.unionByName(df_spark, allowMissingColumns=True)
    
    if df_spark_all is not None:
        
        df_spark_all = df_spark_all.withColumn(
        #Asignamos el nombre de tipo de documento dependiendo de los 4 primero dígitos del ID2
                "Tipo Documento",
                when(col("ID2").startswith("3102"), lit("NOTA DE CREDITO"))
                .when(col("ID2").startswith("3103"), lit("NOTA DE DEBITO"))
                .when(col("ID2").startswith("3104"), lit("FATURA"))
                .when(col("ID2").startswith("1000"), lit("AUTOFATURAÇÃO"))
                .when(col("ID2").startswith("2000"), lit("AUTOFATURAÇÃO - NOTA DE CREDITO"))
                .when(col("ID2").startswith("3000"), lit("AUTOFATURAÇÃO - NOTA DE DEBITO"))
                .otherwise(lit(None))
            )

        df_spark_all = df_spark_all.withColumn(
        #Diferenciación entre proovedor y cliente, util para 
                "Tipo Orden",
                when(
                    col("ID2").startswith("3102") |
                    col("ID2").startswith("3103") |
                    col("ID2").startswith("3104"), lit("Proveedor"))
                .when(
                    col("ID2").startswith("1000") |
                    col("ID2").startswith("2000") |
                    col("ID2").startswith("3000"), lit("Cliente"))
                .otherwise(lit(None))
            )
        
        df_spark_all = df_spark_all.withColumn(
        #Archivos que no se procesaron correctamente y hace falta indicar que son semanales. 
                "Tipo Periodo",
                when(
                    col("ID2").startswith("3104403300") |
                    col("ID2").startswith("3102403944") |
                    col("ID2").startswith("3102403945") |
                    col("ID2").startswith("3102401420") |
                    col("ID2").startswith("3104402728") |
                    col("ID2").startswith("3102402086"), lit("Semanal"))
                .otherwise(col("Tipo Periodo"))
            )
        
        #cambio de nombre de ID2 a Nº de Documento
        df_spark_all = df_spark_all.withColumnRenamed("ID2", "Nº Documento")

        # Filtrar y agregar la columna Fecha de Vencimiento solo para ciertos tipos de documento
        df_spark_all = df_spark_all.withColumn(
                "Fecha de Vencimiento",
                when(
                    (col("Tipo Documento").isin("FATURA", "NOTA DE DEBITO")) & (col("DueDate").isNotNull()),
                    col("DueDate")
                ).otherwise(lit(None))
            )
        #Únicos archivos que la qtiqueta de DueDate tiene la fecha de vencimiento correcta. 
        #Más adelante se procesará la fecha de Vencimiento de los demás archivos

        df_spark_all = df_spark_all.drop("DueDate")
        
        # Aplicar alias
        df_original = df_spark_all.alias("original")
        df_correccion = df_spark_all.alias("correccion")

        # Hacer el join
        df_joined = df_correccion.join(
            df_original,
            col("correccion.Correccion") == col("original.`Nº Documento`"),
            how="left"
        )
        
        columnas_a_sobrescribir = [
                "Tipo Periodo",
                "Inicio de periodo",
                "Fin de periodo",
                "Versión",
                "Número de semana (Periodo)"
            ]


        columnas_base = [
            col(f"correccion.`{c}`").alias(c)
            for c in df_spark_all.columns
            if c not in columnas_a_sobrescribir
        ]


        # Sobrescribir solo las columnas que deben corregirse
        df_spark_all = df_joined.select(
            *columnas_base,
            when(col("correccion.Correccion").isNotNull(), col("original.`Tipo Periodo`")).otherwise(col("correccion.`Tipo Periodo`")).alias("Tipo Periodo"),
            when(col("correccion.Correccion").isNotNull(), col("original.`Inicio de periodo`")).otherwise(col("correccion.`Inicio de periodo`")).alias("Inicio de periodo"),
            when(col("correccion.Correccion").isNotNull(), col("original.`Fin de periodo`")).otherwise(col("correccion.`Fin de periodo`")).alias("Fin de periodo"),
            when(col("correccion.Correccion").isNotNull(), col("original.`Versión`")).otherwise(col("correccion.`Versión`")).alias("Versión"),
            when(col("correccion.Correccion").isNotNull(), col("original.`Número de semana (Periodo)`")).otherwise(col("correccion.`Número de semana (Periodo)`")).alias("Número de semana (Periodo)")
        )
        

###############################################################################################
# Ajustes para facturas mal procesadas de NISA
###############################################################################################
#Tras un cambio en el formato del xml, las facturas de NISA no contienen la fecha de periodo de facturación 
#Por lo que hace falta escribirlas manualmente

       
        df_spark_all = (
            df_spark_all
            .withColumn("Tipo Periodo", when(col("Nº Documento") == "3102402840", lit("Semanal")).otherwise(col("Tipo Periodo")))
            .withColumn("Inicio de periodo", when(col("Nº Documento") == "3102402840", lit("10-06-2024")).otherwise(col("Inicio de periodo")))
            .withColumn("Fin de periodo", when(col("Nº Documento") == "3102402840", lit("16-06-2024")).otherwise(col("Fin de periodo")))
            .withColumn("Número de semana (Periodo)", when(col("Nº Documento") == "3102402840", lit("24")).otherwise(col("Número de semana (Periodo)")))
            .withColumn("Versión", when(col("Nº Documento") == "3102402840", lit("4")).otherwise(col("Versión")))
        )

        df_spark_all = (
            df_spark_all
            .withColumn("Inicio de periodo", when(col("Nº Documento") == "3102404235", lit("26-08-2024")).otherwise(col("Inicio de periodo")))
            .withColumn("Fin de periodo", when(col("Nº Documento") == "3102404235", lit("01-09-2024")).otherwise(col("Fin de periodo")))
            .withColumn("Número de semana (Periodo)", when(col("Nº Documento") == "3102404235", lit("35")).otherwise(col("Número de semana (Periodo)")))
            .withColumn("Tipo Periodo", when(col("Nº Documento") == "3102404235", lit("Semanal")).otherwise(col("Tipo Periodo")))
            .withColumn("Versión", when(col("Nº Documento") == "3102404235", lit("4")).otherwise(col("Versión")))
        )
        
        df_spark_all = (
            df_spark_all
            .withColumn("Inicio de periodo", when(col("Nº Documento") == "3102403945", lit("12-08-2024")).otherwise(col("Inicio de periodo")))
            .withColumn("Fin de periodo", when(col("Nº Documento") == "3102403945", lit("18-08-2024")).otherwise(col("Fin de periodo")))
            .withColumn("Número de semana (Periodo)", when(col("Nº Documento") == "3102403945", lit("33")).otherwise(col("Número de semana (Periodo)")))
            .withColumn("Tipo Periodo", when(col("Nº Documento") == "3102403945", lit("Semanal")).otherwise(col("Tipo Periodo")))
            .withColumn("Versión", when(col("Nº Documento") == "3102403945", lit("4")).otherwise(col("Versión")))
        )
        
        df_spark_all = (
            df_spark_all
            .withColumn("Inicio de periodo", when(col("Nº Documento") == "3102403944", lit("04-11-2024")).otherwise(col("Inicio de periodo")))
            .withColumn("Fin de periodo", when(col("Nº Documento") == "3102403944", lit("10-11-2024")).otherwise(col("Fin de periodo")))
            .withColumn("Número de semana (Periodo)", when(col("Nº Documento") == "3102403944", lit("45")).otherwise(col("Número de semana (Periodo)")))
            .withColumn("Tipo Periodo", when(col("Nº Documento") == "3102403944", lit("Semanal")).otherwise(col("Tipo Periodo")))
            .withColumn("Versión", when(col("Nº Documento") == "3102403944", lit("3")).otherwise(col("Versión")))
        )

        df_spark_all = (
            df_spark_all
            .withColumn("Valor",when((col("Nº Documento") == "3104404191") & (col("Descrição") == "Encargo Regulação"),lit("12190.18")).when(
                    (col("Nº Documento") == "3104404191") & (col("Descrição") == "Energia Desvio Defeito"),lit("3071.90")).when(
                    (col("Nº Documento") == "3104404191") & (col("Descrição") == "Energia Desvio Excesso"),lit("1670.40")
                ).otherwise(col("Valor"))))

###############################################################################################
##
###############################################################################################
         
        #Asegurar que todas las fechas de la BBDD tenga el mismo formato
        df_spark_all = df_spark_all.withColumn("Inicio de periodo", to_date(col("Inicio de periodo"), "dd-MM-yyyy"))
        df_spark_all = df_spark_all.withColumn("Fin de periodo", to_date(col("Fin de periodo"), "dd-MM-yyyy"))
        df_spark_all = df_spark_all.withColumn("Fecha de Emisión", to_date(col("Fecha de Emisión"), "yyyy-MM-dd"))
        df_spark_all = df_spark_all.withColumn("Fecha de Vencimiento", to_date(col("Fecha de Vencimiento"), "yyyy-MM-dd"))
        
        #Cálculo del número de semana de emisión y de periodo de facturación 
        df_spark_all = df_spark_all.withColumn("Número de semana (Periodo)",when(col("Número de semana (Periodo)").isNull(), F.weekofyear(col("Inicio de periodo"))).otherwise(col("Número de semana (Periodo)")))
        df_spark_all = df_spark_all.withColumn("Número de semana (Emisión)", F.lit(None))
        df_spark_all = df_spark_all.withColumn("Número de semana (Emisión)",when(col("Número de semana (Emisión)").isNull(), F.weekofyear(col("Fecha de Emisión"))).otherwise(col("Número de semana (Emisión)")))
        df_spark_all = df_spark_all.withColumn("Año de Emisión", year(col("Fecha de Emisión")))
        
        #Para asegurar que todos los conceptos tengan el mismo formato
        df_spark_all = df_spark_all.withColumn("Descrição", F.initcap(col("Descrição")))
        
###############################################################################################
# Mapeo Fecha de Vencimiento
###############################################################################################

        # 1. Crear un DataFrame que mapea "Fecha de Emisión" a la primera "Fecha de Vencimiento" no nula.
        issue_dates_df = (
            df_spark_all
            .groupBy("Fecha de Emisión")
            .agg(first("Fecha de Vencimiento", ignorenulls=True).alias("mapped_vencimiento"))
        )

        # 2. Unir el mapeo con el DataFrame original usando "Fecha de Emisión" como llave.
        df_spark_all = df_spark_all.join(issue_dates_df, on="Fecha de Emisión", how="left")

        # 3. Rellenar los valores nulos en "Fecha de Vencimiento" con el valor del mapeo.
        df_spark_all = df_spark_all.withColumn(
            "Fecha de Vencimiento",
            coalesce(F.col("Fecha de Vencimiento"), F.col("mapped_vencimiento"))
        )

        # 4.Se elimina la columna auxiliar
        df_spark_all = df_spark_all.drop("mapped_vencimiento")
        
###############################################################################################
##
###############################################################################################       

        #Se mantiene un formato único por Sociedad para evitar posibles errores en la BBDD
        df_spark_all = df_spark_all.withColumn(
                "Sociedad",
                when(
                    (col("Sociedad") == "GAS NATURAL COMERCIALIZADORA, SA"),
                    lit("GAS NATURAL COMERCIALIZADORA, S.A.")
                ).when(
                    (col("Sociedad") == "NATURGY IBERIA, SOCIEDAD ANONIMA"),
                    lit("NATURGY IBERIA, S.A.")
                ).otherwise(col("Sociedad"))
            )
        
        #Mnatener un único nombre para los 3 concpetos 
        df_spark_all = df_spark_all.withColumn(
                "Descrição",
                when(
                    (col("Descrição") == "Desvios Defeito"),
                    lit("Energia Desvio Defeito")
                ).when(
                    (col("Descrição") == "Desvios Excesso"),
                    lit("Energia Desvio Excesso")
                ).when(
                    (col("Descrição") == "Enc. Reg. Consumo"),
                    lit("Encargo Regulação")
                ).when(
                    (col("Descrição") == "Ac-desvios Excesso Energia Rede-compra"),
                    lit("Energia Desvio Excesso")
                ).otherwise(col("Descrição"))
            )
        

        # Si es la primera vez que encontramos GAS NATURAL o NATURGY, añadimos el NIF
        df_spark_all = df_spark_all.withColumn("NIF_Empresa",
                when((col("Sociedad") == "GAS NATURAL COMERCIALIZADORA, S.A."),lit("ESA61797536")
                ).when((col("Sociedad") == "NATURGY IBERIA, S.A."),lit("ESA08431090")
                ).otherwise(lit(None)))
        
        df_spark_all = df_spark_all.withColumn(
            "Tipología",
            when(
                (lower(col("Descrição")) != "tarifa social-financiadores"), lit("Energía")
            ).when(
                (lower(col("Descrição")) == "tarifa social-financiadores"), lit("Factura Social")
            ).otherwise(lit(None))
        )
        from pyspark.sql.types import FloatType

        # Convertir las columnas a tipo Float
        df_spark_all = df_spark_all.withColumn("Valor", F.col("Valor").cast(FloatType())) \
                                   .withColumn("Importe Total Nº Documento", F.col("Importe Total Nº Documento").cast(FloatType()))

        # Listado de tipos negativos
        tipos_negativos = ["NOTA DE CREDITO", "AUTOFATURAÇÃO"]

        # Aplicar cambio de signo en la columna 'Valor' y 'Importe Total Nº Documento' cuando el Tipo Documento sea de los negativos
        df_spark_all = df_spark_all.withColumn(
            "Valor", 
            F.when(F.col("Tipo Documento").isin(tipos_negativos), F.col("Valor") * -1).otherwise(F.col("Valor"))
        )
        df_spark_all = df_spark_all.withColumn(
            "Importe Total Nº Documento", 
            F.when(F.col("Tipo Documento").isin(tipos_negativos), F.col("Importe Total Nº Documento") * -1).otherwise(F.col("Importe Total Nº Documento"))
        )

        # Ordenar y reorganizar las columnas
        orden_columnas = [
                "Sociedad", "NIF_Empresa", "Emisor", "Tipología", "Tipo Documento", "Tipo Orden",
                "Nº Documento", "Versión", "Fecha de Emisión", "Fecha de Vencimiento","Número de semana (Emisión)","Año de Emisión", "Tipo Periodo",
                "Inicio de periodo", "Fin de periodo","Número de semana (Periodo)", "Descrição", "Valor", "Importe Total Nº Documento", "Correccion"
            ]
        columnas_en_orden = [col for col in orden_columnas if col in df_spark_all.columns]
        df_spark_all = df_spark_all.select(*columnas_en_orden)
        
        #Ordenar los registros por fecha de emisión
        df_spark_all = df_spark_all.orderBy(col("Fecha de Emisión").desc(), col("Nº Documento").asc())
    else:
        print("No se procesaron archivos XML.")
        
    return df_spark_all





In [9]:
# Procesar todos los XML en la carpeta
df_spark = procesar_xml_en_carpeta(s3_uri)

: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200481.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200527.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200581.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200606.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200670.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200684.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200809.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200822.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200868.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200899.xml
: teams/prevision/archivos/test/electricidad/liqportugalREN/facturas/FATURA_3104200965.xml

In [10]:
delete_table_from_s3('prevision_test_electricidad','tabla400facts')

Error al obtener la información de la tabla 'prevision_test_electricidad.tabla400facts'
Detalles del error: An error occurred (EntityNotFoundException) when calling the GetTable operation: Entity Not Found
No se puede eliminar la tabla prevision_test_electricidad.tabla400facts porque no existe.


In [11]:
ddf_spark = DynamicFrame.fromDF(df_spark, glueContext, "dynamic_frame")
 
s3output_gas = glueContext.getSink(

    path="s3://naturgy-platacom-pro/teams/prevision/databases/test/electricidad/tabla400facts/",

    connection_type="s3",

    updateBehavior="UPDATE_IN_DATABASE",

    # partitionKeys=["fecha_extraccion"],

    # compression="snappy",

    enableUpdateCatalog=True,

    transformation_ctx="s3output_gas",

)
 
s3output_gas.setCatalogInfo(

  catalogDatabase= "prevision_test_electricidad", catalogTableName="tabla400facts"

)
 
s3output_gas.setFormat("glueparquet")                                         

s3output_gas.writeFrame(ddf_spark)
 

<awsglue.dynamicframe.DynamicFrame object at 0x7f6188e65840>
