## Ejercicio 2-1

Creamos una serie de jobs para leer los datos de la activdad de usarios sobre una plataforma de E-commerce. El objetivo es proponer los siguientes tres reportes:

- Identificar la actidad  de usuarios activos únicos por día
- Calcular el promedio de artículos en los carritos de compras 
- Generar el top de artículos agregados al carrito.

#### Antes de comenzar
- ¿Cúales son las fuentes de datos que tenemos disponibles?
- ¿Quién es el propietario de los datos? / Sabemos quien produce la información
- ¿La información puede ser leída directamente por Spark?

In [19]:
## Comenzamos con Spark 
from pyspark.sql import SparkSession
import  pyspark.sql.functions as F

In [10]:
# toda aplicación de spark usar un driver que se conecta con spark 
# se crea la sesión para comunicarnos con el clúster
spark = SparkSession.builder.appName('ecommerce').getOrCreate()

In [13]:
# creamos un dataframe desde un objeto array-de-arrays y asigna los headers del DF
ecmm = spark.createDataFrame([
    ['u1','c1','i1'],
    ['u2','c2','i1'],
    ['u1','c1','i2'],
    ['u3','c3','i2'],
    ['u3','c3','i2'],
    ['u2','c2','i3'],
]).toDF("user_id","cart_id","item_id")

In [14]:
ecmm.show(3)

+-------+-------+-------+
|user_id|cart_id|item_id|
+-------+-------+-------+
|     u1|     c1|     i1|
|     u2|     c2|     i1|
|     u1|     c1|     i2|
+-------+-------+-------+
only showing top 3 rows



#### 1- Identificar la actidad  de usuarios activos únicos por día

In [18]:
activity = ecmm.select("user_id").distinct().count()
activity

3

La consulta regresa el número de usuarios activos (únicos)

#### 2.- Calcular el promedio de artículos en los carritos de compras 

In [25]:
avg_items = ecmm.select("cart_id","item_id").groupBy("cart_id") \
            .agg(F.count(F.col("item_id")).alias( "total_per_cart_items")) \
            .agg(F.avg(F.col("total_per_cart_items")).alias("avg_items_per_cart"))
                 
    

In [26]:
avg_items.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[avg(total_per_cart_items#141L)])
   +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=532]
      +- HashAggregate(keys=[], functions=[partial_avg(total_per_cart_items#141L)])
         +- HashAggregate(keys=[cart_id#57], functions=[count(item_id#58)])
            +- Exchange hashpartitioning(cart_id#57, 200), ENSURE_REQUIREMENTS, [plan_id=528]
               +- HashAggregate(keys=[cart_id#57], functions=[partial_count(item_id#58)])
                  +- Project [_2#51 AS cart_id#57, _3#52 AS item_id#58]
                     +- Scan ExistingRDD[_1#50,_2#51,_3#52]




In [27]:
avg_items.show()

+------------------+
|avg_items_per_cart|
+------------------+
|               2.0|
+------------------+



Se cálcula el promedio de artíclos en los carritos de los usuarios. 

#### 3.- Generar el top de artículos agregados al carrito.

In [40]:
top_items = ecmm.select("item_id")\
            .groupby("item_id")\
            .agg(F.count("item_id").alias("total"))\
            .sort(F.desc("total"))\
            .limit(2)

In [38]:
top_items.show()

+-------+-----+
|item_id|total|
+-------+-----+
|     i2|    3|
|     i1|    2|
+-------+-----+



La operación de calcular el top emplea agrupaciones y ordenamiento descendiente en conjunto de un límite 