# Notebook Data Ingestion

In this section, we will retrieve data from "Datos abiertos".

#### Initial data load

In [0]:

# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# Iniciar Spark
spark = SparkSession.builder.getOrCreate()

# URLs para los primeros 100,000 registros
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"
 
# Descargar contenido
response_secop = requests.get(url_secop)
response_men = requests.get(url_men)

# Leer como pandas y forzar todo a string (para evitar errores de tipado)
df_secop_pd = pd.read_csv(StringIO(response_secop.text)).astype(str)
df_men_pd = pd.read_csv(StringIO(response_men.text)).astype(str)

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)
df_men = spark.createDataFrame(df_men_pd)

# Mostrar en Databricks
display(df_secop)
display(df_men)

In [0]:
df_secop.count()


In [0]:
%sql 
-- CREATE CATALOG main;
-- CREATE SCHEMA IF NOT EXISTS main.diplomado_datos;

In [0]:
spark.sql("USE CATALOG main")

In [0]:
from pyspark.sql.functions import col, when

# 1. Define las columnas numéricas que tienes en secop según tu esquema
columnas_numericas = ["codigo_entidad_en_secop", "valor_contrato"]

# 2. Mostrar valores no numéricos en cada columna numérica (solo para secop)
for columna in columnas_numericas:
    print(f"Valores no numéricos en columna '{columna}':")
    df_secop.filter(~col(columna).rlike("^[0-9]+$")).select(columna).distinct().show(truncate=False)

# 3. Función safe_cast para casteo seguro de todo el dataframe según esquema destino
def safe_cast(df, target_schema):
    df_casted = df
    for field in target_schema.fields:
        name = field.name
        dtype = field.dataType
        if dtype.simpleString() in ['int', 'bigint', 'double', 'float', 'long']:
            df_casted = df_casted.withColumn(
                name,
                when(col(name).rlike("^[0-9]+$"), col(name).cast(dtype)).otherwise(None)
            )
        else:
            df_casted = df_casted.withColumn(name, col(name).cast(dtype))
    return df_casted

# 4. Obtener esquemas separados para cada tabla
target_schema_secop = spark.table("main.diplomado_datos.secop").schema
target_schema_men = spark.table("main.diplomado_datos.men_estadisticas").schema

# 5. Aplicar safe_cast a cada DataFrame con su esquema correspondiente
df_secop_aligned = safe_cast(df_secop, target_schema_secop)
df_men_aligned = safe_cast(df_men, target_schema_men)

print("Validación con casteo seguro completados.")


In [0]:
# 6. Guardar las tablas con overwrite
df_secop_aligned.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("main.diplomado_datos.secop")

df_men_aligned.write.format("delta").mode("overwrite").option("mergeSchema", "true").saveAsTable("main.diplomado_datos.men_estadisticas")

print("¡Tablas guardadas exitosamente en el catálogo 'main', esquema 'diplomado_datos'!")

#### Complete records download

In [0]:
import requests

# Consultar cuántos registros hay actualmente en el dataset SECOP
count_url = "https://www.datos.gov.co/resource/rpmr-utcd.json?$select=count(*)"
response = requests.get(count_url)

if response.status_code == 200:
    total_records = int(response.json()[0]['count'])
    print(f"Total de registros detectados: {total_records}")
else:
    print("No se pudo obtener el total de registros. Usando valor por defecto.")

total_records = 19446266  # Valor fijo como respaldo
    

In [0]:
## AJUSTES PARA EJECUCION MANUAL DESDE DONDE FALLA EL JOB

# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when

# URLs para los primeros 100,000 registros
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"

offset = 16800000
total_records = 19446266  # Valor fijo como respaldo

In [0]:
import time
from pyspark.sql.functions import col, when

limit = 100000
# offset = 100000

## print(f"Iniciando la carga desde offset {offset}")

def safe_cast(df, target_schema):
    df_casted = df
    for field in target_schema.fields:
        name = field.name
        dtype = field.dataType
        if dtype.simpleString() in ['int', 'bigint', 'double', 'float', 'long']:
            df_casted = df_casted.withColumn(
                name,
                when(col(name).rlike("^[0-9]+$"), col(name).cast(dtype)).otherwise(None)
            )
        else:
            df_casted = df_casted.withColumn(name, col(name).cast(dtype))
    return df_casted

start_time = time.time()  # Tiempo inicio

while offset < total_records:
    print(f"Descargando registros desde {offset} hasta {offset + limit}...")

    url_secop = f"https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit={limit}&$offset={offset}"
    response_secop = requests.get(url_secop)

    df_secop_pd = pd.read_csv(
        StringIO(response_secop.text),
        delimiter=',',
        header=0,
        dtype=str,
        low_memory=False
    )
    
    if df_secop_pd.empty:
        print("No hay más datos para descargar.")
        break

    df_secop_spark = spark.createDataFrame(df_secop_pd.astype(str))

    target_schema = spark.table("main.diplomado_datos.secop").schema

    df_secop_aligned = safe_cast(df_secop_spark, target_schema)

    df_secop_aligned.write.format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .saveAsTable("main.diplomado_datos.secop")

    print(f"Datos del offset {offset} guardados.")
    offset += limit

end_time = time.time()  # Tiempo fin

total_seconds = end_time - start_time
print(f"Carga completa de SECOP en {total_seconds:.2f} segundos.")

# Opcional: formato legible horas, minutos, segundos
hours = int(total_seconds // 3600)
minutes = int((total_seconds % 3600) // 60)
seconds = int(total_seconds % 60)

print(f"Tiempo total: {hours}h {minutes}m {seconds}s")


#### Data Exploration

In [0]:
df_secop = spark.table("main.diplomado_datos.secop")
df_secop.printSchema()

In [0]:
df_secop.select("nivel_entidad").distinct().orderBy("nivel_entidad").show(100, truncate=False)

In [0]:
df_secop.select("departamento_entidad").distinct().orderBy("departamento_entidad").show(100, truncate=False)

In [0]:
df_secop.select("tipo_documento_proveedor").distinct().orderBy("tipo_documento_proveedor").show(100, truncate=False)

In [0]:
df_secop.select("modalidad_de_contrataci_n").distinct().orderBy("modalidad_de_contrataci_n").show(100, truncate=False)

In [0]:
df_secop.select("origen").distinct().orderBy("origen").show(100, truncate=False)

#### Data cleaning

In [0]:
# User-Defined Function

from pyspark.sql.functions import udf, trim
from pyspark.sql.types import StringType

# UDF para capitalizar cada palabra y conservar tildes
def capitalizar(texto):
    if texto is None:
        return None
    return ' '.join(word.capitalize() for word in texto.strip().split())

capitalizar_udf = udf(capitalizar, StringType())

In [0]:
columnas_a_limpieza = [
    "nivel_entidad",
    "nombre_de_la_entidad",
    "nit_de_la_entidad",
    "departamento_entidad",
    "municipio_entidad",
    "estado_del_proceso",
    "modalidad_de_contrataci_n",
    "tipo_de_contrato",
    "nom_raz_social_contratista",
    "tipo_documento_proveedor"
]


In [0]:
for col_name in columnas_a_limpieza:
    df_secop = df_secop.withColumn(
        col_name,
        capitalizar_udf(trim(df_secop[col_name]))
    )

In [0]:
df_secop.select("municipio_entidad").distinct().orderBy("municipio_entidad").show(100, truncate=False)


#### Manual adjustments

In [0]:
from pyspark.sql.functions import when, col

df_secop = df_secop.withColumn(
    "departamento_entidad",
    when(col("departamento_entidad") == "Distrito Capital De Bogotá", "Bogotá D.C.")
    .otherwise(col("departamento_entidad"))
)

In [0]:
df_secop = df_secop.withColumn(
    "tipo_de_contrato",
    when(col("tipo_de_contrato") == "Suministros", "Suministro")
    .otherwise(col("tipo_de_contrato"))
)