# ETL SIG

In [1]:
%pip install dbfread2
%pip install psycopg2

Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.



[notice] A new release of pip is available: 24.3.1 -> 25.1.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [3]:
from dbfread2 import DBF
import psycopg2
import psycopg2.extras
import time
import csv
from io import StringIO
from datetime import datetime
from typing import Iterable, List, Dict, Callable, Optional, Any

## 1. ACTUALIZACION DE LAS TABLAS DEL SIAF

### Conexión a la base de datos

In [4]:
# Connect to the PostgreSQL database
conn = psycopg2.connect(
    dbname="bytsscom_unmsm",
    user="postgres",
    password="postgres",
    host="localhost",
    port="5432"
)
conn.autocommit = True
cur = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)

### Leemos el backup para actualizar las tablas

In [5]:
certificado = DBF('DATA/certificado.dbf', record_factory=dict, encoding='cp1252',char_decode_errors='replace')
certificado_fase = DBF('DATA/certificado_fase.dbf', record_factory=dict, encoding='cp1252',char_decode_errors='replace')
certificado_secuencia = DBF('DATA/certificado_secuencia.dbf', record_factory=dict, encoding='cp1252',char_decode_errors='replace')
certificado_meta = DBF('DATA/certificado_meta.dbf', record_factory=dict, encoding='cp1252',char_decode_errors='replace')
expediente_fase = DBF('DATA/expediente_fase.dbf', record_factory=dict, encoding='cp1252',char_decode_errors='replace')

In [9]:
for row in certificado:
    if row['ANO_EJE'] == '2025':
        print(row)

{'ANO_EJE': '2025', 'SEC_EJEC': '000088', 'CERTIFICAD': '0000000001', 'TIPO_CERTI': '2', 'ESTADO_REG': 'A', 'COD_ERROR': '00', 'COD_MENSA': '0000', 'ESTADO_ENV': 'T', 'TIPO_OPERA': ''}
{'ANO_EJE': '2025', 'SEC_EJEC': '000088', 'CERTIFICAD': '0000000002', 'TIPO_CERTI': '2', 'ESTADO_REG': 'A', 'COD_ERROR': '00', 'COD_MENSA': '0000', 'ESTADO_ENV': 'T', 'TIPO_OPERA': ''}
{'ANO_EJE': '2025', 'SEC_EJEC': '000088', 'CERTIFICAD': '0000000003', 'TIPO_CERTI': '2', 'ESTADO_REG': 'A', 'COD_ERROR': '00', 'COD_MENSA': '0000', 'ESTADO_ENV': 'T', 'TIPO_OPERA': ''}
{'ANO_EJE': '2025', 'SEC_EJEC': '000088', 'CERTIFICAD': '0000000004', 'TIPO_CERTI': '2', 'ESTADO_REG': 'A', 'COD_ERROR': '00', 'COD_MENSA': '0000', 'ESTADO_ENV': 'T', 'TIPO_OPERA': ''}
{'ANO_EJE': '2025', 'SEC_EJEC': '000088', 'CERTIFICAD': '0000000005', 'TIPO_CERTI': '2', 'ESTADO_REG': 'A', 'COD_ERROR': '00', 'COD_MENSA': '0000', 'ESTADO_ENV': 'T', 'TIPO_OPERA': ''}
{'ANO_EJE': '2025', 'SEC_EJEC': '000088', 'CERTIFICAD': '0000000006', 'TIPO

### Creacion de funciones para migrar datos

In [6]:
from typing import List, Dict, Any, Callable, Union

def simple_filter(fields: Dict[str, Union[List[Any], None]]) -> Callable[[Dict[str, Any]], bool]:
    """
    Genera un filter_fn que:
      - Rechaza si rec[field] is None
      - Si valid_values es lista, rechaza si rec[field] not in valid_values
    fields: { campo_sql: None | [val1, val2, ...] }
    """
    def filter_fn(rec: Dict[str, Any]) -> bool:
        for field, valid_values in fields.items():
            # 1) Obtener el valor sin confundir falsy con None
            if field in rec:
                value = rec[field]
            elif field.lower() in rec:
                value = rec[field.lower()]
            elif field.upper() in rec:
                value = rec[field.upper()]
            else:
                # si ni siquiera existe la clave, filtramos
                print('no existe: ',field)
                return False

            # 2) Filtrar nulos reales
            if value is None:
                return False

            # 3) Si hay lista de valores, comprobar pertenencia
            if valid_values is not None and value not in valid_values:
                return False

        return True

    return filter_fn



In [7]:
def bulk_load_csv(
    cur,
    table_name: str,
    columns: List[str],
    records: Iterable[Dict[str, Any]],
    field_map: Optional[Dict[str, str]] = None,
    filter_fn: Optional[Callable[[Dict[str, Any]], bool]] = None,
    truncate: bool = False
) -> None:
    """
    Carga datos masivamente a una tabla PostgreSQL usando COPY.

    Parámetros:
    - cur: cursor de psycopg2 (o similar).
    - table_name: tabla destino con esquema (e.g. 'esquema.tabla').
    - columns: lista de nombres de columnas en el orden deseado.
    - records: iterable de diccionarios donde las claves son los nombres de campos de entrada.
    - field_map: diccionario opcional para mapear columnas destino a claves de los registros
                 (e.g. {'columna_destino': 'campo_entrada'}).
    - filter_fn: función opcional que recibe un registro y devuelve True si debe incluirse.
    - truncate: si True, vacía la tabla y deshabilita triggers antes de cargar; luego los habilita.
    """
    # 1) Truncate + deshabilitar triggers (si aplica)
    start_time = time.time()
    if truncate:
        cur.execute(f"TRUNCATE TABLE {table_name};")
        cur.execute(f"ALTER TABLE {table_name} DISABLE TRIGGER ALL;")

    # 2) Preparar buffer CSV
    buffer = StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    writer.writerow(columns)
    count = 0

    # 3) Escribir filas aplicando filtro y mapeo
    for rec in records:
        if filter_fn and not filter_fn(rec):
            continue
        row = []
        for col in columns:
            # Si hay mapeo para esta columna, usar esa clave en el registro
            source_field = field_map[col] if field_map and col in field_map else col
            row.append(rec.get(source_field))
        writer.writerow(row)
        count += 1
    buffer.seek(0)

    # 4) Ejecutar COPY a PostgreSQL
    cols_sql = ', '.join(columns)
    copy_sql = (
        f"COPY {table_name} ({cols_sql}) "
        "FROM STDIN WITH (FORMAT csv, HEADER true);"
    )
    cur.copy_expert(copy_sql, buffer)

    # 5) Rehabilitar triggers (si antes los deshabilitamos)
    if truncate:
        cur.execute(f"ALTER TABLE {table_name} ENABLE TRIGGER ALL;")

    elapsed = time.time()- start_time 

    print(f"Carga masiva completada en {table_name}\n{count} registros migrados en {round(elapsed,2)} segundos.")


### Migracion de las tablas

In [8]:
bulk_load_csv(
    cur=cur,
    table_name='bytsscom_bytsiaf.certificado',
    columns=[
        'ANO_EJE',
        'CERTIFICADO',
        'SEC_EJEC',
        'TIPO_CERTIFICADO',
        'ESTADO_REGISTRO',
        'COD_ERROR',
        'COD_MENSA',
        'ESTADO_ENVIO'
    ],
    field_map = {
        'CERTIFICADO':'CERTIFICAD',
        'TIPO_CERTIFICADO':'TIPO_CERTI',
        'ESTADO_REGISTRO':'ESTADO_REG',
        'ESTADO_ENVIO':'ESTADO_ENV'
    },
    records=certificado,
    truncate=True,
    filter_fn= simple_filter({
        'CERTIFICAD': None,
        'ANO_EJE': ['2024', '2025'],
        'ESTADO_REG':['A'],
        'TIPO_CERTI':['2']
    })
)

Carga masiva completada en bytsscom_bytsiaf.certificado
32300 registros migrados en 1.36 segundos.


In [9]:
bulk_load_csv(
    cur=cur,
    table_name='bytsscom_bytsiaf.certificado_fase',
    columns=[
        'ANO_EJE',
        'SEC_EJEC',
        'CERTIFICADO',
        'SECUENCIA',
        'SECUENCIA_PADRE',
        'FUENTE_FINANC',
        'ETAPA',
        'TIPO_ID',
        'RUC',
        'ES_COMPROMISO',
        'MONTO',
        'MONTO_COMPROMETIDO',
        'MONTO_NACIONAL',
        'GLOSA',
        'ESTADO_REGISTRO',
        'COD_ERROR',
        'COD_MENSA',
        'ESTADO_ENVIO',
        'SALDO_NACIONAL',
        'IND_ANULACION',
        'TIPO_FINANCIAMIENTO',
        'TIPO_OPERACION',
        'SEC_AREA'
    ],
    field_map = {
        'CERTIFICADO':'CERTIFICAD',
        'SECUENCIA_PADRE':'SECUENCIA_',
        'FUENTE_FINANC':'FUENTE_FIN',
        'ES_COMPROMISO':'ES_COMPROM',
        'MONTO_COMPROMETIDO':'MONTO_COMP',
        'MONTO_NACIONAL': 'MONTO_NACI',
        'ESTADO_REGISTRO':'ESTADO_REG',
        'ESTADO_ENVIO':'ESTADO_ENV',
        'SALDO_NACIONAL':'SALDO_NACI',
        'IND_ANULACION': 'IND_ANULAC',
        'TIPO_FINANCIAMIENTO': 'TIPO_FINAN',
        'TIPO_OPERACION': 'TIPO_OPERA',

    },
    records=certificado_fase,
    truncate=True,
    filter_fn= simple_filter({
        'CERTIFICAD': None,
        'ANO_EJE': ['2024', '2025']
    })
)

Carga masiva completada en bytsscom_bytsiaf.certificado_fase
77947 registros migrados en 14.16 segundos.


In [10]:

bulk_load_csv(
    cur=cur,
    table_name='bytsscom_bytsiaf.certificado_secuencia',
    columns= [
        'ANO_EJE',
        'SEC_EJEC',
        'CERTIFICADO',
        'SECUENCIA',
        'CORRELATIVO',
        'COD_DOC',
        'NUM_DOC',
        'FECHA_DOC',
        'ESTADO_REGISTRO',
        'ESTADO_ENVIO',
        'IND_CERTIFICACION',
        'ESTADO_REGISTRO2',
        'ESTADO_ENVIO2',
        'MONTO',
        'MONTO_COMPROMETIDO',
        'MONTO_NACIONAL',
        'MONEDA',
        'TIPO_CAMBIO',
        'COD_ERROR',
        'COD_MENSA',
        'TIPO_REGISTRO',
        'FECHA_BD_ORACLE',
        'ESTADO_CTB',
        'SECUENCIA_SOLICITUD',
        'FECHA_CREACION_CLT',
        'FECHA_MODIFICACION_CLT',
        'FLG_INTERFASE'
    ],
    records=certificado_secuencia,
    field_map = {
        'CERTIFICADO':'CERTIFICAD',
        'CORRELATIVO':'CORRELATIV',
        'MONTO_COMPROMETIDO':'MONTO_COMP',
        'MONTO_NACIONAL': 'MONTO_NACI',
        'ESTADO_REGISTRO':'ESTADO_REG',
        'ESTADO_ENVIO':'ESTADO_ENV',
        'IND_CERTIFICACION': 'IND_CERTIF',
        'TIPO_CAMBIO':'TIPO_CAMBI',
        'TIPO_REGISTRO':'TIPO_REGIS',
        'FECHA_BD_ORACLE':'FECHA_BD_O',
        'FECHA_CREACION_CLT': 'FECHA_CREA',
        'FECHA_MODIFICACION_CLT':'FECHA_MODI'

    },
    truncate=True,
    filter_fn= simple_filter({
        'CERTIFICAD': None,
        'ANO_EJE': ['2024', '2025']
    })
)

Carga masiva completada en bytsscom_bytsiaf.certificado_secuencia
84447 registros migrados en 15.72 segundos.


In [11]:
bulk_load_csv(
    cur=cur,
    table_name='bytsscom_bytsiaf.certificado_meta',
    columns=[
        'ANO_EJE',
        'SEC_EJEC',
        'CERTIFICADO',
        'SECUENCIA',
        'CORRELATIVO',
        'ID_CLASIFICADOR',
        'SEC_FUNC',
        'MONTO',
        'MONTO_COMPROMETIDO',
        'MONTO_NACIONAL',
        'ESTADO_REGISTRO',
        'COD_ERROR',
        'COD_MENSA',
        'ESTADO_ENVIO',
        'MONTO_NACIONAL_AJUSTE',
        'SYS_COD_CLASIF',
        'SYS_ID_CLASIFICADOR'
    ],
    records=certificado_meta,
    truncate=True,
    field_map = {
        'CERTIFICADO':'CERTIFICAD',
        'CORRELATIVO':'CORRELATIV',
        'MONTO_COMPROMETIDO':'MONTO_COMP',
        'MONTO_NACIONAL': 'MONTO_NACI',
        'ESTADO_REGISTRO':'ESTADO_REG',
        'ESTADO_ENVIO':'ESTADO_ENV',
        'ID_CLASIFICADOR':'ID_CLASIFI',
        'MONTO_NACIONAL_AJUSTE': 'MONTO_NAC2'
    },
    filter_fn= simple_filter({
        'CERTIFICAD': None,
        'ANO_EJE': ['2024', '2025']
    })
)

Carga masiva completada en bytsscom_bytsiaf.certificado_meta
106557 registros migrados en 13.56 segundos.


In [12]:
bulk_load_csv(
    cur=cur,
    table_name='bytsscom_bytsiaf.expediente_fase',
    columns = [
        'ANO_EJE',
        'SEC_EJEC',
        'EXPEDIENTE',
        'CICLO',
        'FASE',
        'SECUENCIA',
        'SECUENCIA_PADRE',
        'SECUENCIA_ANTERIOR',
        'MES_CTB',
        'MONTO_NACIONAL',
        'MONTO_SALDO',
        'ORIGEN',
        'FUENTE_FINANC',
        'MEJOR_FECHA',
        'TIPO_ID',
        'RUC',
        'TIPO_PAGO',
        'TIPO_RECURSO',
        'TIPO_COMPROMISO',
        'ORGANISMO',
        'PROYECTO',
        'ESTADO',
        'ESTADO_ENVIO',
        'ARCHIVO',
        'TIPO_GIRO',
        'TIPO_FINANCIAMIENTO',
        'COD_DOC_REF',
        'FECHA_DOC_REF',
        'NUM_DOC_REF',
        'CERTIFICADO',
        'CERTIFICADO_SECUENCIA',
        'SEC_EJEC_RUC'
    ],
    records=expediente_fase,
    truncate=True,
    field_map =  {
        'SECUENCIA_PADRE':'SECUENCIA2',
        'SECUENCIA_ANTERIOR':'SECUENCIA_',
        'MONTO_NACIONAL':'MONTO_NACI',
        'MONTO_SALDO':'MONTO_SALD',
        'FUENTE_FINANC':'FUENTE_FIN',
        'MEJOR_FECHA':'MEJOR_FECH',
        'TIPO_RECURSO':'TIPO_RECUR',
        'TIPO_COMPROMISO':'TIPO_COMPR',
        'TIPO_FINANCIAMIENTO':'TIPO_FINAN',
        'COD_DOC_REF':'COD_DOC_RE',
        'FECHA_DOC_REF':'FECHA_DOC_',
        'NUM_DOC_REF':'NUM_DOC_RE',
        'CERTIFICADO':'CERTIFICAD',
        'CERTIFICADO_SECUENCIA':'CERTIFICA2',
        'SEC_EJEC_RUC':'SEC_EJEC_R'
    },
    filter_fn= simple_filter({
        'CERTIFICAD': None,
        'ANO_EJE': ['2024', '2025']
    })
)

Carga masiva completada en bytsscom_bytsiaf.expediente_fase
300688 registros migrados en 77.75 segundos.


## 2. Procesamiento de la información

In [18]:
import io

In [27]:
def list_to_tuple_string(data):
    # Escapar comas en texto usando comillas dobles
    formatted = [f'"{item}"' if ',' in item else item for item in data]
    # Unir elementos con comas y espacios
    return f"{','.join(formatted)}"

In [38]:
def add_info_table(sql_query:str,name_table_target:str,cursor,columns_target:list):
    cursor.execute(sql_query)
    cur.execute(f"TRUNCATE TABLE {name_table_target};")
    cur.execute(f"ALTER TABLE {name_table_target} DISABLE TRIGGER ALL;")
    cur.execute(sql_query)
    data = cursor.fetchall()
        # Construir el string de valores para el comando COPY
    values = "\n".join(
        [list_to_tuple_string([str(item) for item in row]) for row in data]
    )

    # Comando SQL COPY usando datos generados
    copy_sql = f"""
        COPY {name_table_target} {f"({', '.join(columns_target)})"}
        FROM stdin
        DELIMITER ','
        CSV;
    """
    
    # Ejecutar el comando COPY
    cursor.copy_expert(copy_sql, io.StringIO(values))
    print(f"Datos copiados a la tabla {name_table_target}.")

In [39]:
query_dim_area = """
    select
        id_area,
        area_name,
        cod_siaf_area,
        area_display_name,
        case id_parent_area when 10468 then 0
            else 1 end as is_central
    from bytsscom_bytcore.area
    where length(area_name) = 3 and cod_siaf_area not in ('0000', '0001')
    union
    select
        id_area,
        area_name,
        cod_siaf_area,
        area_display_name,
        case id_parent_area when 10468 then 0
            else 1 end as is_central
    from bytsscom_bytcore.area
    where id_parent_area = 10030 and length(area_name) = 5
    and id_area in (10383, 10390, 10395, 10385)
    union
    select
        0 as id_area,
        '000' as area_name,
        '0000' as cod_siaf_area,
        'UNMSM' as area_display_name
        , 2 as is_central
    union
    select
        11327 as id_area,
        'D65' as area_name,
        '0001' as cod_siaf_area,
        'ADMINISTRACION CENTRAL' as area_display_name
        , 2 as is_central;

    """

In [41]:
add_info_table(query_dim_area,'sistema_informacion_gerencial.dm_area',cur,['id_area','area_name','cod_siaf_area','area_display_name','is_central'])

Datos copiados a la tabla sistema_informacion_gerencial.dm_area.
