# Cuaderno de ingesta de datos

En este bloque traeremos desde datos abiertos.

In [0]:
# Paso 1: Descargar los datos con requests y leerlos en pandas
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000"
url_men = "https://www.datos.gov.co/resource/nudc-7mev.csv?$limit=100000"
 
# Descargar contenido
response_secop = requests.get(url_secop)
response_men = requests.get(url_men)

# Convertir contenido a pandas usando StringIO
df_secop_pd = pd.read_csv(StringIO(response_secop.text))
df_men_pd = pd.read_csv(StringIO(response_men.text))

# Convertir pandas a Spark
df_secop = spark.createDataFrame(df_secop_pd)
df_men = spark.createDataFrame(df_men_pd)

# Mostrar en Databricks
display(df_secop)
display(df_men)

In [0]:
df_secop.count()
df_men.count()

In [0]:
spark.sql("USE CATALOG main")

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS main.diplomado_datos;

In [0]:
display(df_secop.head(10))

In [0]:
import requests
import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession

# Crear sesión de Spark
spark = SparkSession.builder.getOrCreate()

# URL del dataset
url_secop = "https://www.datos.gov.co/resource/rpmr-utcd.csv?$limit=100000&$offset=100000"

# Descargar contenido
response_secop = requests.get(url_secop)

# Leer CSV en pandas
df_secop_pd = pd.read_csv(StringIO(response_secop.text), delimiter=",", low_memory=False)

# Limpiar nombres de columnas (opcional pero recomendado)
df_secop_pd.columns = [col.strip().lower().replace(" ", "_") for col in df_secop_pd.columns]

# Verifica que los datos se cargaron correctamente en pandas
print(df_secop_pd.head())

# Convertir a Spark DataFrame
df_secop = spark.createDataFrame(df_secop_pd)

# Mostrar en Databricks
display(df_secop)

# DataSets


In [0]:
from pyspark.sql.functions import col

# Get the target schema
target_schema = spark.table("main.diplomado_datos.secop").schema

# Select and cast columns that exist in both df_secop and target_schema
df_secop_aligned = df_secop.select(
    [col(field.name).cast(field.dataType) for field in target_schema.fields if field.name in df_secop.columns]
)

# Write the aligned DataFrame to the Delta table
df_secop_aligned.write.format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .saveAsTable("main.diplomado_datos.secop")