In [None]:
# Importar configuraciones
import sys
import os
sys.path.append(os.path.abspath('./src'))
os.makedirs("resultados", exist_ok=True)

from config.spark_config import SparkSession
from etl.transformaciones import TransformacionesVentas
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
spark = SparkSession.getActiveSession()
if spark:
    spark.stop()


In [None]:
# Inicializar Spark
spark = SparkSession.builder \
    .appName("AnalisisVentas-Notebook") \
    .enableHiveSupport() \
    .getOrCreate()
transformaciones = TransformacionesVentas(spark)
spark.conf.get("spark.sql.catalogImplementation")


In [None]:
# Cargar datos
ventas_df, productos_df = transformaciones.cargar_datos()

print("üìä Vista previa de ventas:")
ventas_df.show(5)

print("\nüì¶ Vista previa de productos:")
productos_df.show(5)

print(f"\nTotal ventas: {ventas_df.count()}")
print(f"Total productos: {productos_df.count()}")


In [None]:
# Esquema de datos
print("Esquema ventas:")
ventas_df.printSchema()

print("\nEsquema productos:")
productos_df.printSchema()



In [None]:
# Calcular m√©tricas
ventas_completas_df, metricas_df = transformaciones.calcular_metricas(ventas_df, productos_df)

print("üìà M√©tricas por categor√≠a:")
metricas_df.show()


In [None]:
# An√°lisis temporal
analisis_temporal_df = transformaciones.analisis_temporal(ventas_df)

print("üìÖ An√°lisis por fecha:")
analisis_temporal_df.show()



In [None]:
# Top clientes
top_clientes_df = transformaciones.top_clientes(ventas_df, top_n=5)

print("üëë Top 5 clientes:")
top_clientes_df.show()


In [None]:
# An√°lisis adicional: Ventas por producto
ventas_por_producto = ventas_completas_df.groupBy("nombre", "categoria") \
    .agg(
        F.sum("cantidad").alias("unidades_vendidas"),
        F.sum("venta_total").alias("ingresos"),
        F.round(F.avg("precio_unitario"), 2).alias("precio_promedio")
    ) \
    .orderBy(F.desc("ingresos"))

print("üèÜ Productos m√°s vendidos:")
ventas_por_producto.show()


In [None]:
# Visualizaci√≥n (si matplotlib est√° disponible)
try:
    import matplotlib.pyplot as plt
    
    # Convertir a pandas para visualizaci√≥n
    metricas_pandas = metricas_df.toPandas()
    
    plt.figure(figsize=(10, 6))
    plt.bar(metricas_pandas['categoria'], metricas_pandas['ingresos_totales'])
    plt.title('Ingresos por Categor√≠a')
    plt.xlabel('Categor√≠a')
    plt.ylabel('Ingresos Totales')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
except ImportError:
    print("Matplotlib no disponible para visualizaci√≥n")


In [None]:
# Esquema y Contenido
metricas_df.printSchema()
metricas_df.show()


In [None]:
# Modo no profesional
# Guardar resultados
print("üíæ Guardando resultados...")

# Guardar como Parquet (formato optimizado para Spark)
metricas_df.write.mode("overwrite").parquet("file:///home/hadoop/proyecto-spark-vscode/resultados/metricas_ventas.parquet")
ventas_completas_df.write.mode("overwrite").parquet("file:///home/hadoop/proyecto-spark-vscode/resultados/ventas_completas.parquet")

print("‚úÖ Resultados guardados en formato Parquet")


In [None]:
# Modo din√°mica y portable
import os

print("üíæ Guardando resultados...")

# Obtener ruta base del proyecto din√°micamente
project_root = os.getcwd()

# Carpeta resultados
output_dir = os.path.join(project_root, "resultados")

# Crear carpeta si no existe
os.makedirs(output_dir, exist_ok=True)

# Convertir a formato que Spark entiende
output_uri = f"file://{output_dir}"

# Guardar
metricas_df.write.mode("overwrite").parquet(
    f"{output_uri}/metricas_ventas.parquet"
)

ventas_completas_df.write.mode("overwrite").parquet(
    f"{output_uri}/ventas_completas.parquet"
)

print("‚úÖ Resultados guardados en formato Parquet")


In [None]:
# Modo clasico en local
os.makedirs("resultados", exist_ok=True)

metricas_df.write.mode("overwrite").parquet("resultados/metricas_ventas.parquet")
ventas_completas_df.write.mode("overwrite").parquet("resultados/ventas_completas.parquet")


In [None]:
# Trabajar con Hive (si est√° configurado)
try:
    # Crear tabla Hive temporal
    ventas_completas_df.createOrReplaceTempView("ventas_completas")
    
    # Consulta SQL
    print("üîç Consulta SQL a trav√©s de Hive:")
    resultado_sql = spark.sql("""
        SELECT categoria, 
               SUM(venta_total) as ingresos_totales,
               AVG(precio_unitario) as precio_promedio
        FROM ventas_completas
        GROUP BY categoria
        ORDER BY ingresos_totales DESC
    """)
    
    resultado_sql.show()
    
except Exception as e:
    print(f"Hive no disponible: {e}")


In [None]:
# Detener Spark
from config.spark_config import detener_spark_session
detener_spark_session(spark)
