In [None]:
from pyspark.sql import HiveContext , Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import date

from pyspark.sql.functions import col, expr, udf, coalesce, lit
from pyspark.sql.types import *

In [9]:
# Module metadata: contact of the notebook author (value is redacted).
__author__ = '*****@***.com'
# Application name handed to SparkSession.builder.appName() in
# create_spark_context(); get_logger() names its log4j logger after the
# Spark context's appName.
APP_NAME = 'customer_service_recontact_rate'

In [10]:
def create_spark_context():
    """Build (or reuse) the Hive-enabled SparkSession for this job.

    The session is named ``APP_NAME`` and is configured so that Hive dynamic
    partitioning is enabled in non-strict mode with up to 100000 dynamic
    partitions.

    Returns:
        The SparkSession returned by ``getOrCreate()``.
    """
    # Settings are applied in this order, one ``.config()`` call each.
    hive_settings = {
        "hive.exec.dynamic.partition": "true",
        "hive.exec.dynamic.partition.mode": "nonstrict",
        "hive.exec.max.dynamic.partitions": "100000",
    }

    builder = SparkSession.builder.appName(APP_NAME)
    for setting_name, setting_value in hive_settings.items():
        builder = builder.config(setting_name, setting_value)

    return builder.enableHiveSupport().getOrCreate()

In [11]:
def get_logger(spark):
    """Return the JVM-side log4j logger for this Spark application.

    Args:
        spark: an active SparkSession; its ``sparkContext`` supplies both the
            JVM gateway and the application name used as the logger name.

    Returns:
        The log4j logger object, with its level set to INFO.
    """
    spark_ctx = spark.sparkContext
    jvm_log4j = spark_ctx._jvm.org.apache.log4j

    app_logger = jvm_log4j.LogManager.getLogger(spark_ctx.appName)
    app_logger.setLevel(jvm_log4j.Level.INFO)
    return app_logger

In [12]:
from pyspark.sql.functions import expr

def get_data(spark):
    """Load inbound, staff-handled contacts joined to their reservations.

    Runs a Spark SQL query over ``table_contact`` joined to
    ``table_reservation`` on ``reservation_id``, keeping only rows where
    ``has_staff_interaction`` is true and ``direction`` is ``'inbound'``.
    The query result is cached, then two helper columns are added:

    * ``date``      - ``contact_date`` parsed with ``to_date`` and the
                      ``yyyy-MM-dd`` pattern.
    * ``timestamp`` - ``date`` cast to a timestamp and then to a long
                      (epoch seconds in Spark).

    Args:
        spark: an active SparkSession with access to both source tables.

    Returns:
        The contact-level DataFrame with the two extra columns.
    """
    query = """
        SELECT DISTINCT action_id,
               ticket_id,
               a.reservation_id,
               a.created_date AS contact_date,
               channel,
               checkin,
               topic,
               subtopic,
               booking_window,
               DATEDIFF(checkin, a.created_date) AS contact_days_before_checkin
        FROM table_contact a
        JOIN table_reservation r
            ON a.reservation_id = r.reservation_id
        WHERE has_staff_interaction == true
        AND direction =='inbound'
        """
    df = spark.sql(query).cache()

    # 'MM' is month-of-year. The previous pattern used 'mm', which is
    # minute-of-hour, so the month of a string contact_date was never parsed.
    df = df.withColumn("date", expr("to_date(contact_date, 'yyyy-MM-dd')"))
    df = df.withColumn('timestamp', df.date.astype('Timestamp').cast("long"))

    return df

In [13]:
from pyspark.sql import Window
from pyspark.sql import functions as sf

def window_groupby(df):
    """Count contacts per ticket over same-day, 3-day and 7-day look-backs.

    For every row, three window counts of ``action_id`` are computed within
    the row's ``ticket_id`` partition:

    * ``sameday_recontact`` - rows sharing the row's ``date``.
    * ``3day_recontact``    - rows dated from 3 days earlier up to the row's date.
    * ``7day_recontact``    - rows dated from 7 days earlier up to the row's date.

    Args:
        df: contact-level DataFrame with at least ``ticket_id``,
            ``action_id`` and ``date`` columns.

    Returns:
        DataFrame with the three count columns added, ``action_id`` and
        ``channel`` dropped, and exact duplicate rows removed.
    """
    # Order the range frames by a whole-day number instead of epoch seconds.
    # The previous code scaled days by 96400 "seconds per day" (a day has
    # 86400), and a seconds-based frame is also sensitive to 23h/25h DST days
    # when dates are cast to timestamps. Counting in days avoids both issues.
    day_number = sf.datediff(sf.col('date'), sf.lit('1970-01-01').cast('date'))

    def _lookback_window(n_days):
        # Frame covering [current day - n_days, current day] within a ticket.
        return (
            Window.partitionBy('ticket_id')
            .orderBy(day_number)
            .rangeBetween(-n_days, 0)
        )

    recontact_same_day = _lookback_window(0)
    recontact_within_3days = _lookback_window(3)
    recontact_within_7days = _lookback_window(7)

    # Column order (7-day, 3-day, same-day) is kept as before so the schema of
    # the written table does not change.
    df_recontact = (
        df
        .withColumn('7day_recontact', sf.count('action_id').over(recontact_within_7days))
        .withColumn('3day_recontact', sf.count('action_id').over(recontact_within_3days))
        .withColumn('sameday_recontact', sf.count('action_id').over(recontact_same_day))
    )

    # Dropping the per-action columns lets rows that differ only by action or
    # channel collapse into one.
    df_recontact = df_recontact.drop('action_id', 'channel').dropDuplicates()

    return df_recontact

In [14]:
def write_to_hadoop(spark_df, tableName, writeMode):
    """Persist a DataFrame as an ORC-format table.

    Args:
        spark_df: the DataFrame to save.
        tableName: name of the destination table.
        writeMode: save mode forwarded to ``saveAsTable`` (``main`` passes
            ``'overwrite'``).

    Returns:
        None.
    """
    table_writer = spark_df.write
    table_writer.saveAsTable(tableName, format='orc', mode=writeMode)

In [None]:
def main():
    """Run the recontact pipeline end to end.

    Steps: create the Spark session and logger, load contact-level data,
    compute the same-day / 3-day / 7-day recontact counts, derive two 0/1
    indicator columns from those counts, and overwrite ``table_recontact``
    with the result.

    Returns:
        The DataFrame that was written.
    """
    spark = create_spark_context()
    logger = get_logger(spark)

    logger.info('Starting')
    logger.info('Getting source data on contact level')
    contacts = get_data(spark)

    logger.info('Applying window function to group by recontacts')
    recontacts = window_groupby(contacts)
    logger.info('Get same day, 3 days, 7 days recontacts')

    logger.info('Create binary indicator')

    def _grew(narrow_col, wide_col):
        # 1 when the wider window counted more contacts than the narrower one.
        return sf.when(sf.col(narrow_col) < sf.col(wide_col), 1).otherwise(0)

    flagged = (
        recontacts
        .withColumn('duration_3day', _grew('sameday_recontact', '3day_recontact'))
        .withColumn('duration_7day', _grew('3day_recontact', '7day_recontact'))
    )

    logger.info('write spark dataframe to Hadoop')
    print('Writing data to Hive')
    write_to_hadoop(flagged, 'table_recontact', 'overwrite')

    print('DONE !')
    logger.info('Finished')

    return flagged

In [None]:
# Run the pipeline only when this file is executed directly, not when it is
# imported as a module.
if __name__ == '__main__':
    main()