# Cuaderno de Ingesta de datos

En este bloque traeremos la información desde datos abiertos

In [0]:
# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"

# Descargar contenido
response_secop = requests.get(url_secop)
response_men = requests.get(url_men)

# Convertir contenido a pandas usando StringIO
df_secop_pd = pd.read_csv(StringIO(response_secop.text))
df_men_pd = pd.read_csv(StringIO(response_men.text))

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)
df_men = spark.createDataFrame(df_men_pd)

# Mostrar en Databricks
display(df_secop)
display(df_men)


In [0]:
df_secop_pd = df_secop.toPandas()
df_men_pd = df_men.toPandas()

In [0]:
# Celda 2: Guardar los DataFrames como tablas Delta
# La función .saveAsTable() guarda los datos y registra la tabla en el Unity Catalog.
# El modo "overwrite" reemplaza la tabla si ya existe, ideal para actualizaciones.
df_secop.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.secop")
df_men.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.men_estadisticas")

print("¡Tablas guardadas exitosamente en el catálogo 'main', esquema 'diplomado_datos'!")

In [0]:
# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000&$offset=100000"

# Descargar contenido
response_secop = requests.get(url_secop)

# Convertir contenido a pandas usando StringIO
df_secop_pd = pd.read_csv(StringIO(response_secop.text), delimiter=',', header=0)

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)

# Mostrar en Databricks
display(df_secop)

 

In [0]:
df_secop.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.secop")
df_men.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.men_estadisticas")

In [0]:
    from pyspark.sql.functions import col



target_schema = spark.table("main.diplomado_datos.secop").schema

df_secop_aligned = df_secop.select(
    [col(field.name).cast(field.dataType) for field in target_schema.fields]
)


df_secop_aligned.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("main.diplomado_datos.secop")

# descarga de datos en bloques

In [0]:
total_registros = 19446266
offset_inicial = 200000
limite = 100000
paginas_faltantes = ((total_registros - offset_inicial) // limite) + 1

print(f"Quedan {paginas_faltantes} bloques por descargar...")

In [0]:
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, lit

spark = SparkSession.builder.getOrCreate()

def descargar_guardar_secop(offset_inicial, limite, total_registros, tabla_destino):
    """
    Descarga y guarda bloques de datos del SECOP en una tabla Delta de Databricks.
    
    Args:
        offset_inicial (int): Offset desde el cual empezar la descarga.
        limite (int): Cantidad de filas por bloque.
        total_registros (int): Total estimado de registros a descargar.
        tabla_destino (str): Nombre completo de la tabla Delta (ej: "main.diplomado_datos.secop").
    """

    paginas_faltantes = ((total_registros - offset_inicial) // limite) + 1
    print(f"🔁 Descargando {paginas_faltantes} bloques desde offset {offset_inicial}...")

    invalid_values = ["NO DEFINIDO", "NO APLICA", "N/A", "SIN DATO", "NULL", "S/I", "", "NO_REGISTRA"]

    for i in range(paginas_faltantes):
        offset = offset_inicial + (i * limite)
        url = f"https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit={limite}&$offset={offset}"

        print(f"⬇️  Página {i+1}/{paginas_faltantes} - Offset: {offset}")
        response = requests.get(url)

        if response.status_code == 200:
            df_secop_pd = pd.read_csv(
                StringIO(response.text),
                dtype=str,
                low_memory=False
            )

            if not df_secop_pd.empty:
                df_secop = spark.createDataFrame(df_secop_pd)

                # Obtener el schema objetivo de la tabla Delta
                target_schema = spark.table(tabla_destino).schema
                df_secop_aligned = df_secop

                # Alinear columnas usando try_cast para evitar errores de tipo
                for field in target_schema.fields:
                    col_name = field.name
                    data_type = field.dataType.simpleString()  # ejemplo: "bigint", "double", "string"

                    if col_name in df_secop.columns:
                        if data_type.startswith("bigint") or data_type.startswith("int"):
                            df_secop_aligned = df_secop_aligned.withColumn(
                                col_name,
                                expr(f"try_cast({col_name} AS bigint)")
                            )
                        elif data_type.startswith("double") or data_type.startswith("float") or data_type.startswith("decimal"):
                            df_secop_aligned = df_secop_aligned.withColumn(
                                col_name,
                                expr(f"try_cast({col_name} AS double)")
                            )
                        else:
                            df_secop_aligned = df_secop_aligned.withColumn(
                                col_name,
                                col(col_name).cast(field.dataType)
                            )
                    else:
                        df_secop_aligned = df_secop_aligned.withColumn(
                            col_name, lit(None).cast(field.dataType)
                        )

                # Escribir en la Delta Table
                df_secop_aligned.write.format("delta") \
                    .mode("append") \
                    .option("mergeSchema", "true") \
                    .saveAsTable(tabla_destino)

                print(f"✅ Página {i+1} almacenada con {df_secop_pd.shape[0]} filas")
            else:
                print(f"⚠️ Página {i+1} llegó vacía. Terminando descarga.")
                break
        else:
            print(f"❌ Error HTTP {response.status_code} en página {i+1}.")
            break


In [0]:
# Parámetros de entrada
offset_inicial = 200000  # O el último offset que cargaste
limite = 100000
total_registros = 19446266
tabla_destino = "main.diplomado_datos.secop"

# Ejecutar función
descargar_guardar_secop(offset_inicial, limite, total_registros, tabla_destino)
