### Dynamic Fee: Processing Data con Spark
Primera parte del procesamiento de datos. 

In [None]:
import datetime
import wasabisql
import pytz
import pandas as pd
from pyspark.sql.functions import udf, lit
from math import radians, cos, sin, asin, sqrt, atan2
import numpy as np
from functools import reduce
from pyspark.sql import DataFrame

In [None]:
sc.applicationId

### Levantamos el dataframe de eventos

In [None]:
# para levantar
events = sqlContext.read.load('dataset/DynFeeEvents-BR-1WeekTo25jul.parquet', format="parquet")

In [None]:
events.count()

#### Agrupar por usuarios
 
Agrupar hace que la lista events se separe en dos partes: una es la key, el userid, y la otra parte son las caraterísticas de las acciones. Más adelante el haber separado por usuario va a hacer que los procesos y funciones como el sorting se hagan por usuario. (para tener en cuenta)
 
(!) CLAVE: los distintos eventos están guardados en diferentes máquinas. Al agrupar por usuario, si eso es relevante para las funciones que voy a aplicar de ahí en adelante, hace que todos los procesos sean más eficientes porque hace que todas las acciones de un mismo usuario se re-ubiquen en una misma máquina!!

In [None]:
users = events.rdd.groupBy(lambda x: x.userid)

#### Filtrar a los usuarios compradores
#### Esta función marca con True a los userids que tienen al menos una compra. Esto no es un dataframe sino una lista / diccionario. Entonces le estoy pidiendo usuario por usuario que recorra su lista de características y busque si la palabra "thanks" aparece al menos una vez dentro de la sub-lista de flow

In [None]:
def tieneThanks(pair):
    #Filter no sabe de pairs.
    evs = pair[1]
    for event in evs:
        if event.fl.lower() == 'thanks':
            return True
    return False

# pero esto al final!!
# Luego filtro y separo a los que dan True como resultado de la funcion TieneThanks.
buyers = users.filter(tieneThanks)
checkers = users.filter(lambda x: not tieneThanks(x))

#### Para los compradores, me quedo solo con las acciones previas al primer thanks

In [None]:
def filterPreThanks(evs):
    # obtengo el minimo datetime de los thanks, el mas viejo
    min_datetime = min( [ ev.datetime for ev in evs if ev.fl.lower() == 'thanks'] )
    # todos los eventos anteriores a min_datetime 
    result = [ev for ev in evs if ev.datetime <= min_datetime]
    return result
#todos los eventos antes del thanks y el primer thanks
buyers_first_thanks = buyers.mapValues(filterPreThanks)

#buyers_first_thanks.take(10)

#### Ya puedo volver a unir buyers y checkers

In [None]:
allusers = buyers_first_thanks.union(checkers)

#### Eventos ordenados por usuario por timestamp
##### Para ordenar convierto a diccionario

In [None]:
def toDict(evs):
    return [ev.asDict() for ev in evs]

def sortEvs(evs):
    return sorted(evs, key=lambda x: x['datetime'])

In [None]:
allusers = allusers.mapValues(toDict).mapValues(sortEvs)

### Voy a separar el análisis en 2 partes
#### 1) POR CARACT: DE NAVEGACION: Contadores de sesión, acciones por flow, dias de navegacion
#### 2) POR CARACT DEL VIAJE: Características del viaje del flow más profundo más reciente

## 1) POR NAVEGACION: Contadores de sesión, acciones por flow, dias de navegacion

#### Agrego la diferencia de tiempo entre sesiones

In [None]:
def agregarDiff(evs):
    new_evs = []
    evs = list(evs) # convierto a evs, el input de la funcion, en lista, porque la parte Values del RDD es iterable pero no indexable
    prev = evs[0]['datetime'] # ahora que converti en lista, puedo indexar evs[0]
    for event in evs:
        diff = event['datetime'] - prev
        prev = event['datetime']
        event['diff'] = diff  
        new_evs.append(event) #los nuevos eventos los fui acumulando en la variable new_events
    return new_evs

allusers_con_diff = allusers.mapValues(agregarDiff)

In [None]:
# #### Agrego contador de sesiones
# #### la sesion cambia con 30 minutos de inactividad

In [None]:
def add_session(new_evs):
    current_session = 1
    for ev in new_evs:
        if ev['diff'] > 30*60*1000:
            current_session = current_session + 1
        ev['session'] = current_session
    return new_evs

allusers_con_session = allusers_con_diff.mapValues(add_session)

In [None]:
# ### Quiero saber la cantidad de sesiones, de dias de sesiones distintas y de acciones antes de una compra
# - 1) Me quedo con el maximo del numero de sesion por usuario
# - 2) agrego el timestamp de la primera accion de la sesion. Busco el dia al que corresponde y cuento cuantos dias distintos busco.

In [None]:
# #### Agrego el día del año porque así veo cuantos días distintos dedicó a navegar el sitio hasta comprar

In [None]:
# #### Funcion que pasa datetime de GMT a horario local
# (dependiendo del pais del evento) Si el input es un timestamp

In [None]:
def tstampAsDatetime(tstamp, cc):
    cc = cc.upper() # paso a mayuscula el pais porque despues uso ese codigo para buscar el timezone en una libreria
    fecha = datetime.datetime.fromtimestamp(tstamp/1000) # paso datetime a fecha. esta funcion toma el datetime sin los decimales
    tzinfoMI = pytz.timezone('America/New_York') # fecha es "naive". Desconoce su timezone. Busco el TZ de Miami/NY aca
    fechaconsciente = tzinfoMI.localize(fecha) # hago que fecha sea consciente de tu timezone
    tzname = pytz.country_timezones[cc][0] # country_timezones es una libreria de nombres de timezones.
    tzcode = pytz.timezone(tzname) # Vero agrego esto. CHEQUEAR
    
    # la funcion me devuelve la fecha del evento en horario local del evento, al aplicar el timezone local. 
    return fechaconsciente.astimezone(tzcode) # Vero cambio tzname por tzcode

In [None]:
def addLocalDayOfYear(new_evs):
    for ev in new_evs:
        ev['local_day_of_year'] = tstampAsDatetime(ev['datetime'], ev['cc']).strftime('%j') #strftime('%j'): fecha a n° de dia en el año
    # la funcion me devuelve la fecha del evento en horario local del evento, al aplicar el timezone local. 
    return new_evs

allusers_con_localday = allusers_con_session.mapValues(addLocalDayOfYear)

In [None]:
# #### Sort por datetime de nuevo por las dudas

In [None]:
allusers_condia_sort = allusers_con_localday.mapValues(sortEvs)

In [None]:
def agregarDayNumber(evs):
    new_evs = []
    #local_day = new_evs['local_day_of_year']
    day_number = 1
    prev = evs[0]['local_day_of_year'] # ahora que converti en lista, puedo indexar evs[0]

    for event in evs:
        if event['local_day_of_year'] > prev :
            day_number = day_number + 1
            
        event['planning_day_number'] = day_number
        prev = event['local_day_of_year']
        #new_evs = new_evs + [event]
        new_evs.append(event)

    return new_evs

allusers_con_planning_day = allusers_condia_sort.mapValues(agregarDayNumber)

In [None]:
# #### Paso a fecha el datetime de la accion

In [None]:
def addActionDate(new_evs):
    for ev in new_evs:
        ev['action_date'] = tstampAsDatetime(ev['datetime'], ev['cc']).strftime("%Y-%m-%d")  #strftime: fecha a n° de dia en el año
    # la funcion me devuelve la fecha del evento en horario local del evento, al aplicar el timezone local. 
    return new_evs

allusers_con_actiondate = allusers_con_planning_day.mapValues(addActionDate)

In [None]:
# ### Funcion que me dice el numero de sesiones que hizo un usuario y me dice cuantas acciones de cada tipo hizo
# #### Para compradores, son las acciones hasta el primer thanks. Para no compradores, son todas las acciones dentro del periodo.

In [None]:
def maxSession(evs):
    session_numbers = [event['session'] for event in evs]
    return max(session_numbers)

def maxPlanningDay(evs):
    planning_days = [event['planning_day_number'] for event in evs]
    return max(planning_days)

def getFeatures(pair):
    userid = pair[0]
    evs = pair[1]
    
    result = {}
    result['search'] = 0
    result['detail'] = 0
    result['checkout'] = 0
    result['thanks'] = 0
    
    for ev in evs:
        result[ev['fl']] = result[ev['fl']] + 1
    
    result['max_session'] = maxSession(evs)
    result['max_planning_days'] = maxPlanningDay(evs)
    result['cant_actions'] = len(evs)
    result['userid'] = userid
    compras = [event for event in evs if event['fl'] == 'thanks']
    comprador = len(compras) > 0
    result['comprador'] = comprador
    
    return result

# Luego filtro y separo a los que dan True como resultado de la funcion TieneThanks.
allusers_maxsession = allusers_con_planning_day.map(getFeatures)

In [None]:
# ##### Paso esta tabla a Dataframe

In [None]:
# El problema para pasar de rdd a dataframe con createDataFrame es que inferschema infiere el schema del primer dict, 
# pero no todos los dict tienen el mismo schema (porque no todos los eventos tienen los mismos campos)

#sqlContext.createDataFrame(allusers_maxsession).printSchema()

In [None]:
# Hago yo el schema, asumiendo que son todos strings
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, DateType                            ,TimestampType,LongType, BooleanType

def inferirSchema(campos):
    schema_campo = []
    for k in campos:
        if k in ['cant_actions', 'max_planning_days', 'max_session', 'checkout', 'detail','thanks', 'search']:
            schema_campo.append(StructField(k,  IntegerType(), True))
        elif k in ['comprador']:
            schema_campo.append(StructField(k, BooleanType(), True))
        elif k in ['datetime', 'diff', 'flow_depth', 'planning_day_number', 'session']:
            schema_campo.append(StructField(k, LongType(), True))
        else:
            schema_campo.append(StructField(k, StringType(), True))
    return StructType(schema_campo)

In [None]:
campos_agregados = ['userid' ,'cant_actions', 'max_planning_days', 'max_session','search', 'detail','checkout', 'thanks', 'comprador']

schema = inferirSchema(campos_agregados)

print schema

In [None]:
# ahora si, create dataframe
user_navigation_statsDF = sqlContext.createDataFrame(allusers_maxsession, schema)

In [None]:
# # 2) POR VIAJE: Características del viaje del flow más profundo más reciente
# 

# ### Primero me quedo con una acción relevante por userid

# - Para los compradores me quedo con la información de viaje del thanks, 
# - Para los no compradores me quedo con la información del viaje del flow más profundo más reciente

In [None]:
# allusers ya estaba ordenado por datetime

def keepRelevantAction(pair):
    userid = pair[0]
    evs = pair[1]
    
    evs = list(evs) # convierto a evs, el input de la funcion, en lista, porque la parte Values del RDD es iterable pero no indexable
    
    for event in evs: 
        if event['fl'] == 'thanks':
            event['flow_depth'] = 4
        elif event['fl'] == 'checkout':
            event['flow_depth'] = 3
        elif event['fl'] == 'detail':
            event['flow_depth'] = 2
        else:
            event['flow_depth'] = 1
            
    max_flow = max( [ event['flow_depth'] for event in evs] )
    max_flow_datetime = max( [ event['datetime'] for event in evs if event['flow_depth'] == max_flow] ) #devolver timestamp mas reciente del flujo mas profundo
    
    result = [event for event in evs if event['datetime'] == max_flow_datetime]
    return result

relevant_actions = allusers_con_actiondate.flatMap(keepRelevantAction)

In [None]:
# ##### Paso esta tabla a Dataframe

In [None]:
# El problema para pasar de rdd a dataframe con createDataFrame es que inferschema infiere el schema del primer dict, 
# pero no todos los dict tienen el mismo schema (porque no todos los eventos tienen los mismos campos)

#sqlContext.createDataFrame(relevant_actions).printSchema()

In [None]:
# agrego la variable que cree a los campos que habia pedido
# diff, sesion, planning_day_number y local_day_of_year ya no me sirven
campos_sin_pr.append("action_date")

In [None]:
# Hago yo el schema con la funcion que ya tenia
schema = inferirSchema(campos_sin_pr)

print schema

In [None]:
# ahora si, create dataframe
user_relevant_actionDF = sqlContext.createDataFrame(relevant_actions, schema)

In [None]:
# ### Uno las dos partes

In [None]:
# uno los dos dataframes
all_data = user_relevant_actionDF.join(user_navigation_statsDF, user_relevant_actionDF.userid==user_navigation_statsDF.userid, 'outer')        .drop(user_navigation_statsDF["userid"])

In [None]:
# #### Agrego GeoData para obtener destination type y horario local de ci y co

In [None]:
# importo la data de los iatas de un csv ya calculado. 
# no funciona poner .schema(customSchema) y aplicar mi propio schema 
#iata_data = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load("dataset/geoData.csv")
iata_data = sqlContext.read.load("dataset/geoData.parquet", format="parquet")

In [None]:
iata_data.registerTempTable("iata_data2")

In [None]:
iatadata = sqlContext.sql('''
SELECT upper(iata) iata_dest, CAST(latitude AS DOUBLE) latitude_dest, CAST(longitude AS DOUBLE) longitude_dest, 
       upper(country) country_dest, upper(continent) continent_dest, lat_country, lon_country
FROM iata_data2 
''')

In [None]:
# hago un merge de los dos dataframes para agregar country, lat, long y continente
all_data = all_data.join(iatadata, all_data.dc==iatadata.iata_dest, 'left')        .drop(all_data["dc"])

In [None]:
# #### CC Coords 
# Me quedo con las coordenadas centrales del país del sitio, pidiendo de la tabla de Geo el primer iata de una ciudad en MX.

In [None]:
mx_iatas = iata_data.where(iata_data['country'] == site) 

In [None]:
cc_lat = mx_iatas.select("lat_country").first()[0]
cc_lon = mx_iatas.select("lon_country").first()[0]

In [None]:
cc_lat, cc_lon

In [None]:
# #### Checkpoint time!

In [None]:
all_data = checkpoint(all_data)

In [None]:
# el count solo se hace para activar el checkpoint
all_data.count()

In [None]:
# ### Preparo las variables

In [None]:
all_data.registerTempTable("all_data2")

In [None]:
# en este paso:
# - pasé las fechas de ci y co a formato de fecha, 
# - calculé duración y anticipación
# - clasifiqué a los usuarios en couple, single y family
# -  clasifiqué los destinos en domestic, latam o rest_of_world
# - arreglé las variables de precio que a veces separan decimal con coma y a veces con punto

alldata_ant_dur = sqlContext.sql('''
SELECT userid, upper(cc) cc, cant_actions actions_count, max_planning_days planning_days_count, max_session session_count, search search_count,
    detail detail_count, checkout checkout_count, thanks thanks_count, comprador, 
    lower(fl) fl,
    CAST(action_date AS DATE) action_date,
    CAST(ci AS DATE) ci_date,
    CAST(co AS DATE) co_date,
    DATEDIFF(CAST(co AS DATE), CAST(ci AS DATE)) AS duration,
    DATEDIFF(CAST(ci AS DATE), CAST(action_date AS DATE)) AS anticipation, 
    di,
    CASE WHEN di IN ("2|0|0", "2|0", "2") THEN "couple"
         WHEN di IN ("1|0|0", "1|0", "1") THEN "single"
                                          ELSE "family"
         END AS traveler_type,
    CASE WHEN cc != country_dest AND continent_dest IN ("AMC", "SA") THEN "latam"
         WHEN cc = country_dest THEN "domestic"
         ELSE "r_o_w"
         END AS destination_type,
    CAST(hc AS INT),
    CAST(hr AS INT),
    CAST(hid AS INT), 
    CAST(REGEXP_REPLACE(pritax , ',', '.') AS DOUBLE) pritax,
    CAST(REGEXP_REPLACE(pri , ',', '.') AS DOUBLE) pri,
    CAST(REGEXP_REPLACE(exch , ',', '.') AS DOUBLE) exch,
    upper(cur) cur,
    iata_dest, latitude_dest, longitude_dest, country_dest, continent_dest,
    xClient
FROM all_data2 
''')

In [None]:
# #### Ratio finde
# Nro. de viernes, sábados y domingos de estadía, dividido la duración total

In [None]:
# Define udf
#### cuenta como finde las noches de ci viernes,sábado y domingo.
def calcRatioFinde(ci, duracion):
    
    if ci is None or duracion is None:
        return None
    
    dia_ci = ci.isoweekday()
    # cuantas semanas completas de 7 dias (con dos dias de finde) hay en la duracion del viaje?
    # trunc(duracion/7)
    semanas_completas = int(duracion/7)
    
    # el resto de los dias fuera de las semanas completas:
    resto_dias = duracion - semanas_completas * 7
   
    # cuento los dias de fin de semana extras a la semana completa
    if dia_ci == 5: # si el grupo de dias extra empieza un viernes, como maximo pueden contener vie, sab y dom (3 dias) de finde
        dia_finde_extra = min(3, resto_dias) 
    elif dia_ci == 6:  # si el grupo de dias extra empieza un sabado, como maximo pueden contener sab y dom (2 dias) de finde
        dia_finde_extra = min(2, resto_dias) 
    elif dia_ci == 7:  # si el grupo de dias extra empieza un domingo, si son 6 dias extra contienen un domingo un un viernes
        if resto_dias == 6:
            dia_finde_extra = 2
        else:
            dia_finde_extra = 1
    elif dia_ci + resto_dias == 6: # abarca todos los casos en que el grupo de dias extra  empieza lu/ma/mi/ju y termina en vi
        dia_finde_extra = 1
    elif dia_ci + resto_dias == 7: # abarca todos los casos en que el grupo de dias extra  empieza lu/ma/mi/ju y termina en sa
        dia_finde_extra = 2
    elif dia_ci + resto_dias > 8: # abarca los casos en que el grupo de dias extra empieza lu/ma/mi/ju e incluye un dom
        dia_finde_extra = 3
    else:
        dia_finde_extra = 0
    
    # Saco el numero de dias de finde (dos por semana completa, mas los dias extra)
    # el round es porque python, si divide integers, redondea a integer la respuesta
    dias_finde_total = semanas_completas * 2 + round(dia_finde_extra,2)
    
    # saco el ratio "dias de finde" / "duracion" del viaje
    ratio_finde = dias_finde_total / duracion
    
    return round(ratio_finde, 2)


# In[ ]:

udfcalcRatioFinde = udf(calcRatioFinde, DoubleType())

alldata_rfinde= alldata_ant_dur.withColumn("ratio_finde", udfcalcRatioFinde("ci_date", "duration"))


# #### Distancia en km

# In[ ]:

# define udf
#Calculate the great circle distance between two points on the earth (specified in decimal degrees)
def haversine(lon1, lat1, lon2, lat2): 
    
    if lon1 is None or lon2 is None:
        return None
    
    radius = 6371 # Radius of earth in kilometers. Use 3956 for miles    
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    #c = 2 * asin(sqrt(a)) 
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    return round(c * radius, 2)


# In[ ]:

udfhaversine = udf(haversine, DoubleType())

alldata_dist = alldata_rfinde.withColumn("distance_km", udfhaversine("longitude_dest", "latitude_dest", lit(cc_lon), lit(cc_lat)))
# como la udf toma los parametros de las columnas de mi DF, no puedo darle un valor fijo como input 
# pero si uso lit() me crea una columna con ese valor constante.

In [None]:
# PDF['ratio_finde'] = PDF.apply(lambda r: calcRatioFinde(r.ci_date, r.duration), axis=1)

In [None]:
# #### Guardar como Pandas DF ####

In [None]:
alldataPDF = alldata_dist.toPandas()

In [None]:
alldataPDF.to_csv("dynamic_fee_processed_user_actions_MX_4WeeksTo18jul.csv", sep=',', index=False)