# NIVEL 1
## Setup
Importo librerias necesarias para usar pyspark asi como ciertas funciones

In [101]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, explode
from pyspark.sql.types import StringType, MapType, IntegerType, TimestampType, StructType, StructField
import os
import json


Set del JAVA_HOME para correcto funcionamiento
cambiar por JAVA_HOME de la maquina donde se ejecuta

In [102]:
os.environ["JAVA_HOME"] = "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home"

Inicializacion del spark contex

In [103]:
spark = SparkSession \
    .builder \
    .appName("testing") \
    .getOrCreate()

## Funcion parser
- Función usada para convertir el formato de la columna experiments a un CSV, dado que spark.read.csv no soporta el tipo MapType()
- uso UDF para poder usar esta funcion con spark

In [104]:
def parser(value):
    json_str = value.replace(" ", "").replace("{", "{\"").replace("}", "\"}").replace("=", "\":\"").replace(",", "\",\"")
    return json.loads(json_str)
parser_udf = udf(parser, MapType(StringType(), StringType()))


## Lectura del dataset

- Se define la ruta del archivo csv
- Se define la estructura de los datos. (spark la puede deducir, solo se hace por seguridad)

In [105]:
data_path = "../data/data.csv"
schema = StructType([
    StructField('event_name', StringType(), True),
    StructField('item_id', IntegerType(), True), 
    StructField('timestamp', TimestampType(), True), 
    StructField('site', StringType(), True), 
    StructField('experiments', StringType(), True), 
    StructField('user_id', IntegerType(), True), 
])
data = spark.read.schema(schema).option("header","true").csv(data_path)

## Transformación

- Se aplica la funcion parser sobre la columna experiments para volver la columna a MapType
- Se explotan los datos

In [106]:
new_data = data.withColumn("parsed_experiments",parser_udf("experiments"))
new_data = new_data.select(*schema.fieldNames(), explode("parsed_experiments").alias("experiment_id","version_id"))

## Agrupamiento

- Se crea una vista para ejecutar sentencia SQL.
- Se ejecuta SELECT GROUP BY requerido

In [107]:
new_data.createOrReplaceTempView("final_data")
result = spark.sql("""
SELECT experiment_id, version_id, count(*) as numero_compras
FROM final_data
WHERE event_name='BUY'
GROUP BY experiment_id, version_id
""")

## Resultado

- Se muestra resultado

In [108]:
result.show(truncate=False)



+----------------------+----------+--------------+
|experiment_id         |version_id|numero_compras|
+----------------------+----------+--------------+
|buyingflow/secure_card|4612      |31            |
|buyingflow/user-track |6796      |1088          |
|buyingflow/address_hub|3574      |922           |
+----------------------+----------+--------------+



                                                                                