#### Paquetes necesarios

In [None]:
import datetime
import wasabisql

#### Source of Truth Loader
Me permite traer todos los archivos .avro en los que están distribuidos los datos de navegación de UPA entre dos fechas.

#### TIP 1: Empezá con small data
Para armar un script empezá trayendo una hora. Cuando el script entero funcione, traé el resto de los datos que necesites. Esto es porque cuanto más datos pido, más tarda en procesar, así que empezar por pocos datos agiliza el proceso de escribir el script. 

#### TIP 2: Bajar datos en tandas
A Spark le cuesta trabajar con demasiados datos avro. Cada día de datos se compone de unos ~6 archivos. Si necesitás más de una semana de datos, bajá en tandas de a una semana y luego uní. Si no, hacen paro los python workers.
http://stackoverflow.com/questions/34461804/stackoverflow-due-to-long-rdd-lineage

In [None]:
loader = wasabisql.SOTLoader(sqlContext)

# Los inputs de la función getEvents son una fecha de inicio, una de fin, y los campos que querés

In [None]:
start = datetime.datetime(2016, 6, 12, 5)
end = datetime.datetime(2016, 6, 12, 6)
campos = ['userid', 'datetime', 'pr', 'fl', 'cc', 'ci', 'co', 'dc', 'hr', 'hid', 'di','hc', 'pri', 
                           'pritax', 'cur', 'exch']

events = loader.getEvents(campos, start, end)

#### events es un DataFrame de Spark

(Está organizado en columnas y está distribuido en partecitas en diferentes máquinas). Un Dataframe tiene adentro un RDD y un schema para organizar ese RDD. 
A data frame is a table, or two-dimensional array-like structure, in which each column contains measurements on one variable, and each row contains one case. So, a DataFrame has additional metadata due to its tabular format, which allows Spark to run certain optimizations on the finalized query.
An RDD, on the other hand, is merely a Resilient Distributed Dataset that is more of a blackbox of data that cannot be optimized as the operations that can be performed against it are not as constrained. Also, RDD is immutable.
However, you can go from a DataFrame to an RDD via its rdd method, and you can go from an RDD to a DataFrame (if the RDD is in a tabular format) via the toDF method
In general it is recommended to use a DataFrame where possible due to the built in query optimization.

source: http://stackoverflow.com/questions/31508083/difference-between-dataframe-and-rdd-in-spark

#### TIP 3: Filtrar todo lo que no necesitás lo antes posible
Así no estás procesando datos que nunca terminás usando y que vuelven más lentos tus procesos.

### Filtrar con condiciones tipo SQL

In [None]:
# Aplicar filtros sobre cada columna
events = events.filter("length(userid) = 36")\ #me quedo solo con acciones donde el userid respeta el formato habitual
                .filter('lower(fl) in ("search", "detail", "checkout", "thanks")')\ #elijo solo determinados flows
                .filter('fl not like "ab%"')\ # Me quedo con las acciones donde el flow no empieza en "ab" 
                .filter('fl != "event-suscription"') # Excluyo acciones donde el flow sea event-subscription
                .filter('lower(pr) in ("hotels")')\ # Me quedo solo con acciones donde el producto es hotels
                .filter('lower(cc) in ("mx")')\ # Me quedo solo con acciones del site MX
                .filter("dc is not null") # Me quedo solo con acciones donde el campo dc no está vacío

In [None]:
# Puedo elegir quedarme con solo algunos campos
campos_sin_pr = ['userid', 'datetime', 'fl', 'cc', 'ci', 'co', 'dc', 'hr', 'hid', 'di','hc', 'pri', 
                           'pritax', 'cur', 'exch']                
events = events[campos_sin_pr]

#### Unir los DF (si hice mi getEvents por partes)
Si solo son dos Dataframes los que tengo que unir, simplemente puedo hacer:

In [None]:
events = eventsA.unionAll(eventsB)
# Si son varios Dataframes los que tengo que unir puedo hacer esta funcion:

In [None]:
from functools import reduce
from pyspark.sql import DataFrame

def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)

In [None]:
allevents = allunionAll(eventsA, eventsB, eventsC)

#### ------------------------------------------------------------------------------------------------------------------------------------------

### Cómo bajar muchos días de datos de forma automatizada
En vez de hacer  "events = loader.getEvents(campos, start, end)" una y otra vez cambiando start y end para después unir todos esos dataframes con unionAll, se puede hacer esto:

#### 1) hacés una lista vacía a la cual vas a ir appendeando los dataframes de events de cada día

In [None]:
allevents = []

#### 2) Determinás que campos vas a pedir y el inicio y fin del período
(la fecha de fin no entra en el período así que sumar un día)

In [None]:
campos = ['userid', 'datetime', 'pr', 'fl', 'cc', 'ci', 'co', 'dc', 'hr', 'hid', 'di','hc', 'pri', 
                               'pritax', 'cur', 'exch']

start_range = datetime.datetime(2016, 6, 12, 0)
end_range = datetime.datetime(2016, 6, 19, 0)

#### Se necesitan dos funciones adicionales: 
- generateDays, que genera los rangos de días para que loader traiga de a uno.
- getEventsDay que usa getEvents para bajar los eventos de ese día, aplica el filtro, crea un DataFrame por día y hace un checkpoint para cada dataframe (que contiene un día de eventos)

(!) El checkpoint es importante porque cuando llega el momento de la acción (un count, take, etc.) para procesarla Spark arrastra la información de todas las transformaciones que hubo antes. El checkpoint borra toda esa historia previa y se queda solo con el DataFrame, haciendo que sea más fácil de procesar. 
 
En este caso, el checkpoint está metido dentro de la función que busca los eventos de cada día, getEventsDay, así se hacen mini checkpoints por día antes de appendear ese DataFrame a la lista vacía.

In [None]:
def generateDays(start, end):
    one_day = datetime.timedelta(1)
    i = 0
    while start + one_day*i < end:
        yield start + one_day*i
        i+=1
        
def getEventsDay(start, end):
    events = loader.getEvents(campos, start, end)
    events = events.filter("length(userid) = 36")                    .filter('lower(fl) in ("search", "detail", "checkout", "thanks")')                    .filter('lower(pr) in ("hotels")')                    .filter('lower(cc) in ("mx")')                    .filter("dc is not null")
    events = checkpoint(events)
    events.count() # este count es solo para aplicar una acción que corra las transformaciones (el filter)
    return events

In [None]:
for start in generateDays(start_range, end_range):
    end = start + datetime.timedelta(1)
    events = getEventsDay(start, end)
    allevents.append(events)

#### 4) Unís la lista de DF en un solo DF
Si pedís más de una semana de datos, hacerlo en diferentes Dataframes eventsA, eventsB, eventsC. Luego unir con está función:

In [None]:
events = reduce(DataFrame.unionAll, allevents)

#### ------------------------------------------------------------------------------------------------------------------------------------------

### Guardar los datos
Si bajaste una gran cantidad de días, y eso te llevó horas, y vas a seguir usando el DF por varios días, y ya está lo suficientemente filtrado, es una buena idea guardarlo en el storage de p13n. 
 
(!) En el storage de p13n no se guardan porquerías y en lo posible tampoco archivos demasiado chicos. Esto es porque sí o sí el sistema de archivos está configurado para guardar lo que sea en varios archivos de 128Mb entonces aún con un archivo chico, las porquerías llenan el disco rápido.

In [None]:
# se guarda en la carpeta /dataset
# poner un nombre representativo de lo que contiene
# usar formato parquet
# si vas a guardar varias veces el mismo archivo actualizandolo, poner Overwrite para que no salte un error
events.write.option("header", "true").save('dataset/DynFeeEvents28d-MX.parquet', format="parquet", mode="Overwrite")

#### Después el DF guardado por Spark (es distinto si lo guardaste con Pandas!) se levanta así:

In [None]:
events = sqlContext.read.load("dataset/DynFeeEvents28d-MX.parquet", format="parquet")

#### -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### Agrupar por usuarios
Puedo hacer esto cuando quiero sacar diferentes métricas para un mismo usuario. Ej: cuántas sesiones tuvo, cuántos destinos distintos vio.
También puedo hacerlo cuando quiero quedarme con sólo una acción relevante para cada usuario.
 
Partimos de un dataframe de Spark que estaba distribuido en partecitas en diferentes máquinas. El resultado de group by es un rdd distribuido donde los distintos eventos de un mismo usuario quedan guardados en la misma máquina. Esto hace que todos los procesos/funciones que aplique de acá en adelante sean más eficientes!

In [None]:
users = events_rdd.groupBy(lambda x: x.userid)