# CREAR UNA BASE DE DATOS EN HIVE METAESTORE USANDO PYSPARK

## 1.	CREAR UNA SPARKSESION

In [None]:
from pyspark.sql import SparkSession
# Crear SparkSession con soporte para Hive
spark = SparkSession.builder \
    .appName("Ejemplo Hive Metastore") \
    .enableHiveSupport() \
    .getOrCreate()

## 2.	CREAR LA BASE DE DATOS

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS pyspark_hive")
spark.sql("SHOW DATABASES").show()


## 3.	CREAR LA TABLA EN LA BASE DE DATOS

In [None]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS pyspark_hive.empleado (
        ID INT,
        NOMBRE STRING,
        TELEFONO STRING,
        CORREO STRING,
        FECHA_INGRESO DATE,
        EDAD INT,
        SALARIO DOUBLE,
        ID_EMPRESA INT
    )
    STORED AS PARQUET
""")

## 4.	CARGA DE DATOS - FLUJO CORRECTO
1.	Subir el CSV de LOCAL â†’ HDFS
2.	Leer desde HDFS con PySpark
3.	Limpiar / convertir tipos
4.	Insertar en tabla Hive


### 3.1. Subir el archivo de LOCAL a HDFS
Ejecuta en la terminal (NO en PySpark):

hdfs dfs -mkdir -p /data/pyspark

hdfs dfs -put -f /home/hadoop/dataset/empleado.csv /data/pyspark/

Verifica:

hdfs dfs -ls /data/pyspark

Debe aparecer: empleado.csv

### 3.2 Leer el CSV desde HDFS en PySpark

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

ruta_hdfs = "hdfs:///data/pyspark/empleado.csv"

schema_raw = StructType([
    StructField("ID", StringType(), True),
    StructField("NOMBRE", StringType(), True),
    StructField("TELEFONO", StringType(), True),
    StructField("CORREO", StringType(), True),
    StructField("FECHA_INGRESO", StringType(), True),
    StructField("EDAD", StringType(), True),
    StructField("SALARIO", StringType(), True),
    StructField("ID_EMPRESA", StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .schema(schema_raw) \
    .csv(ruta_hdfs)

### 3.3 Limpiar y convertir datos (CLAVE)

In [None]:
df_limpio = df \
    .withColumn("ID", col("ID").cast("int")) \
    .withColumn("EDAD", col("EDAD").cast("int")) \
    .withColumn("SALARIO", col("SALARIO").cast("double")) \
    .withColumn("ID_EMPRESA", col("ID_EMPRESA").cast("int")) \
    .withColumn(
        "FECHA_INGRESO",
        coalesce(
            to_date(col("FECHA_INGRESO"), "yyyy-MM-dd"),
            to_date(col("FECHA_INGRESO"), "dd/MM/yyyy")
        )
    ) \
    .withColumn(
        "CORREO",
        when(trim(col("CORREO")).isin("", "NA", "N/A"), None)
        .otherwise(col("CORREO"))
    )

# Opcional (recomendado):

df_limpio = df_limpio.filter(col("ID").isNotNull())
