# Cuaderno de ingesta de datos

En este bloque traeremos desde datos abiertos.

In [0]:
# Paso 1: Descargar los datos y leerlos en pandas, luego convertir a Spark

import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

# Iniciar sesión Spark
spark = SparkSession.builder.getOrCreate()

# URLs de los datasets
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"

# Función para descargar y leer CSV desde la web
def descargar_csv(url):
    try:
        response = requests.get(url, timeout=10)  # Evita bloqueo por espera larga
        response.raise_for_status()  # Lanza error si el código de estado no es 200
        return pd.read_csv(StringIO(response.text))
    except requests.exceptions.RequestException as e:
        print(f"Error al descargar los datos desde {url}:\n{e}")
        return pd.DataFrame()  # Retorna un DataFrame vacío si falla

# Descargar y leer en pandas
df_secop_pd = descargar_csv(url_secop)
df_men_pd = descargar_csv(url_men)

# Verificar si los DataFrames no están vacíos antes de convertir
if not df_secop_pd.empty and not df_men_pd.empty:
    # Convertir a DataFrame Spark
    df_secop = spark.createDataFrame(df_secop_pd)
    df_men = spark.createDataFrame(df_men_pd)

    # Mostrar en Databricks
    display(df_secop)
    display(df_men)
else:
    print("Alguno de los DataFrames está vacío. Verifica la conexión o URLs.")

In [0]:
# Celda 1: Leer datos desde los archivos CSV que subiste a Volumes

# Rutas locales dentro del entorno Databricks (Volumes)
url_secop = "/Volumes/main/diplomado_datos/manual/df_secop.csv"
url_men = "/Volumes/main/diplomado_datos/manual/df_men.csv"

# Leer los archivos usando Spark
# "header" indica que los nombres de las columnas están en la primera fila
# "inferSchema" permite que Spark adivine automáticamente los tipos de datos
df_secop = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(url_secop)
df_men = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(url_men)

# Mostrar los primeros registros en Databricks usando .show()
print("Datos del SECOP cargados:")
df_secop.show()

print("Datos del MEN cargados:")
df_men.show()

In [0]:
display(df_men.limit(10))
display(df_secop.limit(10))

In [0]:
df_secop.count()
df_men.count()

In [0]:
# Celda 2: Guardar los DataFrames como tablas Delta

# La función .saveAsTable() guarda los datos y registra la tabla en el Unity Catalog.
# El modo "overwrite" reemplaza la tabla si ya existe, ideal para actualizaciones.

df_secop.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.secop")
df_men.write.format("delta").mode("overwrite").saveAsTable("main.diplomado_datos.men_estadisticas")

print("¡Tablas guardadas exitosamente en el catálogo 'main', esquema 'diplomado_datos'!")

In [0]:
# Importar librerías necesarias
import pandas as pd

# Ruta local del archivo
file_path = "/Volumes/main/diplomado_datos/manual/df_secop.csv"

# Leer el archivo CSV con pandas usando coma como delimitador
df_secop_pd = pd.read_csv(file_path, delimiter=',', header=0, on_bad_lines='skip')
print(df_secop_pd.columns.tolist())

# 🔧 Limpieza de texto y filas incompletas
for col in ["objeto_a_contratar", "objeto_del_proceso"]:
    df_secop_pd[col] = df_secop_pd[col].astype(str).str.replace("\n", " ").str.replace('"', "").str.strip()

df_secop_pd = df_secop_pd.dropna(thresh=10)  # Elimina filas con menos de 10 columnas válidas

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)

# Mostrar los datos limpios
display(df_secop)

## DataSets

In [0]:
from pyspark.sql.functions import col

# 🧠 Obtener el esquema de la tabla destino
target_schema = spark.table("main.diplomado_datos.secop").schema

# 🔄 Alinear cada columna del DataFrame con el tipo de dato de la tabla
df_secop_aligned = df_secop.select(
    [col(field.name).cast(field.dataType) for field in target_schema.fields]
)

# 📝 Escribir en la tabla Delta usando append y mergeSchema
df_secop_aligned.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("main.diplomado_datos.secop")

print("✅ Datos añadidos correctamente a la tabla 'main.diplomado_datos.secop'")

In [0]:
import pandas as pd
import os

# 📂 Ruta basada en tu ubicación actual en Databricks
ruta_base = "/Volumes/main/diplomado_datos/manual"

# 📌 Buscar todos los archivos que comienzan con 'secop_lote_' y terminan en '.csv'
archivos_lotes = [f for f in os.listdir(ruta_base) if f.startswith("secop_lote_") and f.endswith(".csv")]

# 🔢 Ordenar los archivos por número de lote (usando el número en el nombre)
archivos_lotes.sort(key=lambda x: int(x.split("_")[-1].split(".")[0]))

# 🧩 Combinar todos los archivos en un solo DataFrame
df_secop = pd.concat([
    pd.read_csv(os.path.join(ruta_base, archivo), low_memory=False)
    for archivo in archivos_lotes
], ignore_index=True)

# 📊 Verificación rápida
print(f"✅ Total de registros combinados: {df_secop.shape[0]}")
print(f"🔬 Columnas disponibles: {df_secop.columns.tolist()}")

# 🧼 (Opcional) Validar duplicados y nulos
print(f"🔍 Duplicados: {df_secop.duplicated().sum()}")
print("🧹 Nulos por columna:")
print(df_secop.isnull().sum())

# 💾 Guardar el archivo combinado en la misma ruta
df_secop.to_csv(os.path.join(ruta_base, "secop_completo.csv"), index=False)
print("📦 Archivo combinado guardado como 'secop_completo.csv'")

In [0]:
df_raw = spark.read.csv(
    path="/Volumes/main/diplomado_datos/manual/secop_lote_*.csv",
    header=True,
    sep=",",
    quote='"',
    escape='"',
    multiLine=True,
    encoding="UTF-8",
    inferSchema=False
)

# Verificar número de columnas
print(len(df_raw.columns))
print(df_raw.columns)


In [0]:
from pyspark.sql.functions import col

df_limpio = df_raw.dropna(subset=[
    "valor_contrato",
    "fecha_de_firma_del_contrato",
    "fecha_inicio_ejecuci_n",
    "fecha_fin_ejecuci_n",
    "documento_proveedor"
])


In [0]:
from pyspark.sql.functions import to_timestamp, regexp_replace

df_limpio = df_limpio.withColumn("valor_contrato", regexp_replace("valor_contrato", "[^0-9]", "").cast("long"))

df_limpio = df_limpio.withColumn("fecha_de_firma_del_contrato", to_timestamp("fecha_de_firma_del_contrato"))
df_limpio = df_limpio.withColumn("fecha_inicio_ejecuci_n", to_timestamp("fecha_inicio_ejecuci_n"))
df_limpio = df_limpio.withColumn("fecha_fin_ejecuci_n", to_timestamp("fecha_fin_ejecuci_n"))


In [0]:
from pyspark.sql.functions import col, to_timestamp

# Filtrar solo las filas con fechas válidas en el formato YYYY-MM-DDTHH:MM:SS
df_limpio = df_raw.filter(
    col("fecha_de_firma_del_contrato").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
)

# Convertir esa columna a timestamp
df_limpio = df_limpio.withColumn(
    "fecha_de_firma_del_contrato", to_timestamp("fecha_de_firma_del_contrato")
)

In [0]:
from pyspark.sql.functions import col, to_timestamp

# Primero, filtrar solo las filas con fechas válidas en las 3 columnas
df_fechas_validas = df_raw.filter(
    col("fecha_de_firma_del_contrato").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}") &
    col("fecha_inicio_ejecuci_n").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}") &
    col("fecha_fin_ejecuci_n").rlike(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}")
)

# Luego convertirlas a timestamp
df_limpio = (
    df_fechas_validas
    .withColumn("fecha_de_firma_del_contrato", to_timestamp("fecha_de_firma_del_contrato"))
    .withColumn("fecha_inicio_ejecuci_n", to_timestamp("fecha_inicio_ejecuci_n"))
    .withColumn("fecha_fin_ejecuci_n", to_timestamp("fecha_fin_ejecuci_n"))
)


In [0]:
from pyspark.sql.functions import to_timestamp
df_limpio = df_limpio.withColumn(
    "fecha_de_firma_del_contrato",
    to_timestamp("fecha_de_firma_del_contrato")
)
df_limpio = df_limpio.withColumn(
    "fecha_inicio_ejecuci_n",
    to_timestamp("fecha_inicio_ejecuci_n")
).withColumn(
    "fecha_fin_ejecuci_n",
    to_timestamp("fecha_fin_ejecuci_n")
)

In [0]:
df_limpio.select(
    "fecha_de_firma_del_contrato", 
    "fecha_inicio_ejecuci_n", 
    "fecha_fin_ejecuci_n"
).show(20, truncate=False)


In [0]:
from pyspark.sql.functions import col, count, when, isnan, trim

columnas_clave = [
    "valor_contrato",
    "nombre_de_la_entidad",
    "numero_de_proceso",
    "tipo_documento_proveedor",
    "documento_proveedor"
]

df_limpio.select([
    count(
        when(
            col(c).isNull() | (trim(col(c)) == ""),
            c
        )
    ).alias(f"{c}_vacios")
    for c in columnas_clave
]).show()
