**Diseño de la base de datos segun la estructura del archivo csv**

In [None]:
DROP TABLE IF EXISTS postulacion CASCADE;
DROP TABLE IF EXISTS postulante CASCADE;
DROP TABLE IF EXISTS carrera CASCADE;
DROP TABLE IF EXISTS ubicacion CASCADE;
DROP TABLE IF EXISTS proceso_admision CASCADE;

CREATE TABLE proceso_admision (
    id SERIAL PRIMARY KEY,
    fecha_corte DATE,
    anio_adm INT,
    tipo_proceso VARCHAR(10),
    desc_proceso VARCHAR(200),
    modalidad VARCHAR(100),
    sede VARCHAR(100)
);

CREATE TABLE ubicacion (
    ubigeo VARCHAR(6) PRIMARY KEY,
    departamento VARCHAR(100),
    provincia VARCHAR(100),
    distrito VARCHAR(100)
);

CREATE TABLE carrera (
    id SERIAL PRIMARY KEY,
    id_area INT,
    area VARCHAR(100),
    facultad VARCHAR(150),
    escuela_academica VARCHAR(150)
);

CREATE TABLE postulante (
    uuid TEXT PRIMARY KEY,
    sexo VARCHAR(20),
    edad INT,
    fecha_nacimiento DATE,
    ubigeo VARCHAR(6) REFERENCES ubicacion(ubigeo)
);

CREATE TABLE postulacion (
    id SERIAL PRIMARY KEY,
    uuid TEXT REFERENCES postulante(uuid),
    id_proceso INT REFERENCES proceso_admision(id),
    id_carrera INT REFERENCES carrera(id),
    nota NUMERIC(10,2),
    situacion VARCHAR(100)
);


#Script de Spark para cargar todas las tablas generadas anteriormente




In [None]:
procesoDF_clean   → proceso_admision
ubicacionDF_clean → ubicacion
carreraDF_clean   → carrera
postulanteDF      → postulante
postulacionDF     → postulacion

**Lectura del archivo CSV en Spark para crear un DataFrame**

In [None]:
val df = spark.read.option("header", "true").
  option("delimiter", ";").
  csv("C:/Users/LESLIE/Downloads/datosabiertos_postulantes2025.csv")


**Divimos el DataFrame original en varias tablas limpias y normalizadas, cada una enfocada segun la estructura de nuestras tablas de base de datos: ubicación, carrera, proceso, postulante y postulación.**

In [None]:
import org.apache.spark.sql.functions._

In [None]:
// ---------------------------
//   UBICACIÓN
// ---------------------------
val ubicacionDF_clean = df.select(
  $"UBIGEO",
  $"DEPARTAMENTO",
  $"PROVINCIA",
  $"DISTRITO"
).distinct()

// ---------------------------
//   CARRERA
// ---------------------------
val carreraDF_clean = df.select(
  $"ID_AREA".cast("int"),
  $"AREA",
  $"FACULTAD",
  $"ESCUELA_ACADEMICA"
).distinct()

// ---------------------------
//   PROCESO DE ADMISIÓN
// ---------------------------
val procesoDF_clean = df.select(
  to_date($"FECHA_CORTE", "yyyyMMdd").as("fecha_corte"),
  $"ANIO_ADM".cast("int").as("anio_adm"),
  $"TIPO_PROCESO",
  $"DESC_PROCESO",
  $"MODALIDAD",
  $"SEDE"
).distinct()

// ---------------------------
//   POSTULANTE
//   (solo datos personales)
// ---------------------------
val postulanteDF = df.select(
  $"UUID",
  $"SEXO",
  $"EDAD".cast("int"),
  to_date($"FECHA_NACIMIENTO", "yyyyMMdd").as("fecha_nacimiento"),
  $"UBIGEO"
).distinct()

// ---------------------------
//   POSTULACIÓN
// ---------------------------
val postulacionDF = df.select(
  $"UUID",
  to_date($"FECHA_CORTE", "yyyyMMdd").as("fecha_corte"),
  $"ANIO_ADM".cast("int").as("anio_adm"),
  $"ID_AREA".cast("int").as("id_area"),
  $"NOTA".cast("double"),
  $"SITUACION"
)

**Cargamos a Postgres**

Exportamos los DataFrames generados a nuestras tablas de PostgreSQL mediante JDBC

In [None]:
ubicacionDF_clean.write.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "ubicacion").
  option("user", "postgres").
  option("password", "123456").
  mode("append").
  save()


In [None]:
carreraDF_clean.write.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "carrera").
  option("user", "postgres").
  option("password", "123456").
  mode("append").
  save()


In [None]:
procesoDF_clean.write.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "proceso_admision").
  option("user", "postgres").
  option("password", "123456").
  mode("append").
  save()


In [None]:
postulanteDF.write.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "postulante").
  option("user", "postgres").
  option("password", "123456").
  mode("append").
  save()


Como la tabla postulacion no puede cargarse directamente porque depende de:


*   proceso_admision(id)
*   carrera(id)
*   postulante(uuid)

Antes de insertar una postulación, PostgreSQL nos exige que estos IDs existan en sus tablas correspondientes

**Generación de la tabla postulacion mediante joins para obtener las claves foráneas antes de cargarla en PostgreSQL**

In [None]:
val procesoPG = spark.read.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "proceso_admision").
  option("user", "postgres").
  option("password", "123456").
  load()

val carreraPG = spark.read.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "carrera").
  option("user", "postgres").
  option("password", "123456").
  load()


In [None]:
val post_proceso = postulacionDF.join(procesoPG,
    postulacionDF("fecha_corte") === procesoPG("fecha_corte") &&
    postulacionDF("anio_adm") === procesoPG("anio_adm")
  )


In [None]:
val post_final = post_proceso.join(carreraPG,
    post_proceso("id_area") === carreraPG("id_area")
  ).select(
    $"UUID".as("uuid"),
    procesoPG("id").as("id_proceso"),
    carreraPG("id").as("id_carrera"),
    $"NOTA",
    $"SITUACION"
  )


In [None]:
post_final.write.format("jdbc").
  option("url", "jdbc:postgresql:postgres").
  option("dbtable", "postulacion").
  option("user", "postgres").
  option("password", "123456").
  mode("append").
  save()
