# CREAR UNA BASE DE DATOS EN HIVE METAESTORE USANDO PYSPARK

1. CREAR UNA SPARKSESION

In [None]:
# Importar configuraciones

import sys
import os
sys.path.append(os.path.abspath('./src'))

from config.spark_config import SparkSession
from etl.transformaciones import TransformacionesVentas
from pyspark.sql import functions as F
from config.hdfs_uploader import subir_a_hdfs

In [None]:
spark = SparkSession.getActiveSession()
if spark:
    spark.stop()

# Inicializar Spark
spark = SparkSession.builder \
    .appName("CrearInsertarDatosSparkHive-Notebook") \
    .enableHiveSupport() \
    .getOrCreate()

2. CREAR LA BASE DE DATOS

In [None]:
spark.sql("CREATE DATABASE IF NOT EXISTS pyspark_hive")
spark.sql("SHOW DATABASES").show()

3. CREAR LA TABLA EN LA BASE DE DATOS

In [None]:
spark.sql("""
    CREATE TABLE IF NOT EXISTS pyspark_hive.empleado (
        ID INT,
        NOMBRE STRING,
        TELEFONO STRING,
        CORREO STRING,
        FECHA_INGRESO DATE,
        EDAD INT,
        SALARIO DOUBLE,
        ID_EMPRESA INT
    )
    STORED AS PARQUET
""")

In [None]:
spark.sql("""SHOW TABLES IN pyspark_hive;""").show()

4. CARGA DE DATOS - FLUJO CORRECTO

    Subir el CSV de LOCAL ‚Üí HDFS
    Leer desde HDFS con PySpark
    Limpiar / convertir tipos
    Insertar en tabla Hive

```
hdfs dfs -mkdir -p /dataset/pyspark
hdfs dfs -put -f /home/hadoop/bigdata-spark/dataset/empleado.csv /data/pyspark/
hdfs dfs -ls /data/pyspark
```

In [None]:
#Usando la librare import os
# Ejecutar comando hdfs
os.system("hdfs dfs -mkdir -p /dataset/pyspark")
os.system("hdfs dfs -put -f /home/hadoop/bigdata-spark/dataset/empleado.csv /data/pyspark/")
os.system("hdfs dfs -ls /data/pyspark")

In [None]:
spark._jvm.org.apache.hadoop.fs.FileSystem \
    .get(spark._jsc.hadoopConfiguration()) \
    .copyFromLocalFile(
        False,
        True,
        spark._jvm.org.apache.hadoop.fs.Path("file:///home/hadoop/bigdata-spark/dataset/empleado.csv"),
        spark._jvm.org.apache.hadoop.fs.Path("/data/pyspark/empleado.csv")
    )

In [None]:
import subprocess
from datetime import datetime

# üìå Pedir ruta del archivo al usuario
local_path = input("üìÇ Ingresa la ruta completa del archivo a subir: ").strip()

# Verificar que exista
if not os.path.isfile(local_path):
    print("‚ùå El archivo no existe.")
    exit()

# Extraer nombre base
file_name = os.path.basename(local_path)

# Agregar timestamp para nombre autom√°tico
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
new_name = f"{timestamp}_{file_name}"

# Ruta destino en HDFS
hdfs_dir = "/data/pyspark"
hdfs_path = f"{hdfs_dir}/{new_name}"

print(f"üöÄ Subiendo archivo como: {new_name}")

# Crear carpeta en HDFS si no existe
subprocess.run(["hdfs", "dfs", "-mkdir", "-p", hdfs_dir])

# Subir archivo
subprocess.run(["hdfs", "dfs", "-put", "-f", local_path, hdfs_path])

# Listar contenido
subprocess.run(["hdfs", "dfs", "-ls", hdfs_dir])

print("‚úÖ Archivo cargado correctamente en HDFS")

In [None]:
# Subir archivo y obtener ruta final en HDFS
hdfs_file = subir_a_hdfs("/home/hadoop/bigdata-spark/dataset/empleado.csv")

print(f"Archivo disponible en HDFS en: {hdfs_file}")
hdfs_file

In [None]:
# Ingresar dataset/empleado.csv

hdfs_file = subir_a_hdfs()

4.2 Leer el CSV desde HDFS en PySpark

In [None]:
# Pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *

#ruta_hdfs = "hdfs:///data/pyspark/empleado.csv"
ruta_hdfs = hdfs_file

schema_raw = StructType([
    StructField("ID", StringType(), True),
    StructField("NOMBRE", StringType(), True),
    StructField("TELEFONO", StringType(), True),
    StructField("CORREO", StringType(), True),
    StructField("FECHA_INGRESO", StringType(), True),
    StructField("EDAD", StringType(), True),
    StructField("SALARIO", StringType(), True),
    StructField("ID_EMPRESA", StringType(), True)
])

df = spark.read \
    .option("header", "true") \
    .option("delimiter", ",") \
    .schema(schema_raw) \
    .csv(ruta_hdfs)

4.3 Limpiar y convertir datos (CLAVE)

In [None]:
# Pyspark
df_limpio = df \
    .withColumn("ID", col("ID").cast("int")) \
    .withColumn("EDAD", col("EDAD").cast("int")) \
    .withColumn("SALARIO", col("SALARIO").cast("double")) \
    .withColumn("ID_EMPRESA", col("ID_EMPRESA").cast("int")) \
    .withColumn(
        "FECHA_INGRESO",
        coalesce(
            to_date(col("FECHA_INGRESO"), "yyyy-MM-dd"),
            to_date(col("FECHA_INGRESO"), "dd/MM/yyyy")
        )
    ) \
    .withColumn(
        "CORREO",
        when(trim(col("CORREO")).isin("", "NA", "N/A"), None)
        .otherwise(col("CORREO"))
    )

# Opcional (recomendado):

df_limpio = df_limpio.filter(col("ID").isNotNull())

4.4 Verifica esquema antes de insertar

In [None]:
# Pyspark
df_limpio.printSchema()
df_limpio.show(5, truncate=False)

4.5 Insertar en la tabla Hive pyspark_hive.empleado

In [None]:
# Pyspark
df_limpio.write \
    .mode("append") \
    .insertInto("pyspark_hive.empleado")

4.6 Validar carga

In [None]:
# Pyspark
df = spark.sql("SELECT * FROM pyspark_hive.empleado")
df.show(5, truncate=False)
     


In [None]:
# Detener Spark
from config.spark_config import detener_spark_session
detener_spark_session(spark)