In [1]:
import pyspark
from pyspark.sql import functions as f, Window
import pandas as pd
from kedro.pipeline import *
from kedro.io import *
from kedro.runner import *

import pickle
import os
from pyspark.sql import SparkSession, DataFrame, functions as f
from typing import Dict

In [2]:
def create_spark_session() -> SparkSession:
    """
    Placeholder function to create the spark session for a run.

    Builds (or fetches, if one is already active) a local-mode session named
    "comm-analytics" with the memory, S3A and SQL settings listed below.

    Returns:
        The active SparkSession, so callers can bind it directly instead of
        calling ``SparkSession.builder.getOrCreate()`` a second time.
    """
    builder = (
        SparkSession.builder.config("spark.driver.memory", "16g")
        .config("spark.executor.memory", "16g")
        .config("spark.driver.maxResultSize", "8g")
        .master("local[*]")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config(
            "spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.1.1,com.crealytics:spark-excel_2.11:0.11.1",
        )
        # NOTE(review): the S3 credentials are empty placeholders, and these keys lack the
        # `spark.hadoop.` prefix used for fs.s3a.impl above - confirm they take effect.
        .config("fs.s3a.access.key", "")
        .config("fs.s3a.secret.key", "")
        .config("fs.s3a.maxConnections", "5000")
        .config("spark.sql.execution.arrow.enabled", "true")
        .config("spark.debug.maxToStringFields", "100")
        .config("fs.s3a.connection.maximum", "5000")
        .config("spark.sql.shuffle.partitions", "8")
        .config("spark.sql.codegen.wholeStage", "false")
        .appName("comm-analytics")
    )
    return builder.getOrCreate()

In [3]:
# Configure and start the session first, then bind a handle to it:
# getOrCreate() hands back the session that already exists.
create_spark_session()
spark = SparkSession.builder.getOrCreate()

In [4]:
from pyspark.sql.types import StringType, DateType, StructField, StructType, IntegerType, DoubleType

In [None]:
# Toy activity log for a single customer: interleaved "event" and "call" rows,
# with the dates kept as ISO-formatted strings.
data = [
    ["WDEM04144542", activity, day]
    for activity, day in [
        ("event", "2017-01-25"),
        ("call", "2017-01-26"),
        ("call", "2017-01-27"),
        ("event", "2017-01-28"),
        ("call", "2017-02-07"),
        ("call", "2017-02-12"),
        ("event", "2017-02-15"),
        ("event", "2017-02-20"),
        ("call", "2017-02-26"),
        ("call", "2017-03-19"),
        ("call", "2017-03-26"),
    ]
]

# Every column is declared as a plain string.
schema = StructType(
    [StructField(column, StringType()) for column in ("external_id", "activity", "date")]
)

df_data = spark.createDataFrame(data, schema=schema)

In [None]:
# Parse the "yyyy-MM-dd" strings in `date` into a real date column.
formatted_dates = df_data.withColumn("date", f.to_date("date", "yyyy-MM-dd"))

In [None]:
# Label each row with the last calendar day of its month.
with_month = formatted_dates.withColumn("month", f.last_day("date"))

In [None]:
# Eyeball the frame together with the derived month-end column.
with_month.show()

In [None]:
# Remove the helper column again. Note that `with_month` is rebound here, so from
# this cell on the name refers to a frame that no longer has a `month` column.
with_month = with_month.drop("month")

In [None]:
# Expose the frame to Spark SQL under the view name `df`.
with_month.createOrReplaceTempView("df")

In [None]:
# For every call, look up the most recent event of the same customer that happened
# strictly before it; event rows themselves get NULL.
# The lookup is a conditional MAX over the self-join, which is deterministic and only
# considers `event` rows. Grouping by the left-hand row makes a trailing distinct()
# unnecessary.
spark.sql(
"""
SELECT a.external_id, a.activity, a.date,
CASE
WHEN a.activity = 'event' THEN NULL
ELSE MAX(CASE WHEN b.activity = 'event' AND b.date < a.date THEN b.date END)
END AS last_event_date
FROM df a INNER JOIN df b ON a.external_id = b.external_id
GROUP BY a.external_id, a.activity, a.date
""").show()

In [None]:
# Reduced sample: a single event followed by three calls for one customer.
data = [
    ["WDEM04144542", activity, day]
    for activity, day in [
        ("event", "2017-02-20"),
        ("call", "2017-02-26"),
        ("call", "2017-03-19"),
        ("call", "2017-03-26"),
    ]
]

# Every column is declared as a plain string.
schema = StructType(
    [StructField(column, StringType()) for column in ("external_id", "activity", "date")]
)

df = spark.createDataFrame(data, schema=schema)

In [None]:
# Pair every row with each *event* row of the same customer. The right-hand side is
# restricted to events so that `last_event_date` can never carry the date of a call.
test = df.alias('df1').join(
        df.filter(f.col("activity") == "event").alias('df2'), how="inner", on=["external_id"]
    ).select('df1.*', f.col("df2.date").alias("last_event_date"))

In [None]:
# Keep only call rows, paired with candidate dates that lie strictly before the call.
test1 = test.filter(
    (f.col("activity") != "event") & (f.col("last_event_date") < f.col("date"))
)

In [None]:
# Inspect the remaining (call, earlier date) pairs.
test1.show()

In [None]:
# Collapse to one row per call, keeping the latest of the earlier dates.
# max() replaces first(): first() after a groupBy depends on row order, which Spark
# does not guarantee after a shuffle. ISO "yyyy-MM-dd" strings sort chronologically.
test1.groupBy('external_id', 'activity', 'date').agg(
    f.max("last_event_date").alias("last_event_date")
).show()

In [None]:
# Full toy activity log for one customer: interleaved "event" and "call" rows,
# with the dates kept as ISO-formatted strings.
data = [
    ["WDEM04144542", activity, day]
    for activity, day in [
        ("event", "2017-01-25"),
        ("call", "2017-01-26"),
        ("call", "2017-01-27"),
        ("event", "2017-01-28"),
        ("call", "2017-02-07"),
        ("call", "2017-02-12"),
        ("event", "2017-02-15"),
        ("event", "2017-02-20"),
        ("call", "2017-02-26"),
        ("call", "2017-03-19"),
        ("call", "2017-03-26"),
    ]
]

# Every column is declared as a plain string.
schema = StructType(
    [StructField(column, StringType()) for column in ("external_id", "activity", "date")]
)

df = spark.createDataFrame(data, schema=schema)

In [None]:
# Pair every row with each *event* row of the same customer. The right-hand side is
# restricted to events so that `last_event_date` can never carry the date of a call.
test = df.alias('df1').join(
        df.filter(f.col("activity") == "event").alias('df2'), how="inner", on=["external_id"]
    ).select('df1.*', f.col("df2.date").alias("last_event_date"))

In [None]:
# Print up to 1000 rows of the joined pairs without truncating cell values.
test.show(1000, False)

In [None]:
# max() replaces last(): last() after a groupBy depends on row order, which Spark
# does not guarantee after a shuffle. ISO "yyyy-MM-dd" strings sort chronologically.
# NOTE(review): `test` is not restricted to earlier dates at this point, so this yields
# the latest joined date for every row - confirm that is the intent.
test.groupBy('external_id', 'activity', 'date').agg(
    f.max("last_event_date").alias("last_event_date")
).show()

In [5]:
# Event dates for one customer, kept as ISO-formatted strings.
data = [
    ["WDEM04144542", event_day]
    for event_day in ("2017-01-25", "2017-01-28", "2017-02-15", "2017-02-20")
]

# Both columns are declared as plain strings.
schema = StructType(
    [StructField(column, StringType()) for column in ("external_id", "event_date")]
)

df_event = spark.createDataFrame(data, schema=schema)

In [6]:
# Call dates for the same customer, kept as ISO-formatted strings.
data = [
    ["WDEM04144542", call_day]
    for call_day in (
        "2017-01-26",
        "2017-01-27",
        "2017-02-07",
        "2017-02-12",
        "2017-02-26",
        "2017-03-19",
        "2017-03-26",
    )
]

# Both columns are declared as plain strings.
schema = StructType(
    [StructField(column, StringType()) for column in ("external_id", "call_date")]
)

df_call = spark.createDataFrame(data, schema=schema)

In [10]:
# Left-join each call to every event of the same customer: one row per (call, event)
# pair, with the event date exposed as `last_event_date`.
test = (
    df_call.alias("df1")
    .join(df_event.alias("df2"), on="external_id", how="left")
    .select("df1.*", f.col("df2.event_date").alias("last_event_date"))
)

In [11]:
# Size of the pairing: 7 calls x 4 events gives the 28 rows recorded below.
test.count()

28

In [12]:
# Preview the (call, event) pairs.
test.show()

+------------+----------+---------------+
| external_id| call_date|last_event_date|
+------------+----------+---------------+
|WDEM04144542|2017-01-26|     2017-01-25|
|WDEM04144542|2017-01-26|     2017-01-28|
|WDEM04144542|2017-01-26|     2017-02-15|
|WDEM04144542|2017-01-26|     2017-02-20|
|WDEM04144542|2017-01-27|     2017-01-25|
|WDEM04144542|2017-01-27|     2017-01-28|
|WDEM04144542|2017-01-27|     2017-02-15|
|WDEM04144542|2017-01-27|     2017-02-20|
|WDEM04144542|2017-02-07|     2017-01-25|
|WDEM04144542|2017-02-07|     2017-01-28|
|WDEM04144542|2017-02-07|     2017-02-15|
|WDEM04144542|2017-02-07|     2017-02-20|
|WDEM04144542|2017-02-12|     2017-01-25|
|WDEM04144542|2017-02-12|     2017-01-28|
|WDEM04144542|2017-02-12|     2017-02-15|
|WDEM04144542|2017-02-12|     2017-02-20|
|WDEM04144542|2017-02-26|     2017-01-25|
|WDEM04144542|2017-02-26|     2017-01-28|
|WDEM04144542|2017-02-26|     2017-02-15|
|WDEM04144542|2017-02-26|     2017-02-20|
+------------+----------+---------------+

In [13]:
# Keep only the events that happened on or before the call they are paired with.
test1 = test.filter(f.col("last_event_date") <= f.col("call_date"))

In [14]:
# Inspect the surviving (call, earlier-or-same-day event) pairs.
test1.show()

+------------+----------+---------------+
| external_id| call_date|last_event_date|
+------------+----------+---------------+
|WDEM04144542|2017-01-26|     2017-01-25|
|WDEM04144542|2017-01-27|     2017-01-25|
|WDEM04144542|2017-02-07|     2017-01-25|
|WDEM04144542|2017-02-07|     2017-01-28|
|WDEM04144542|2017-02-12|     2017-01-25|
|WDEM04144542|2017-02-12|     2017-01-28|
|WDEM04144542|2017-02-26|     2017-01-25|
|WDEM04144542|2017-02-26|     2017-01-28|
|WDEM04144542|2017-02-26|     2017-02-15|
|WDEM04144542|2017-02-26|     2017-02-20|
|WDEM04144542|2017-03-19|     2017-01-25|
|WDEM04144542|2017-03-19|     2017-01-28|
|WDEM04144542|2017-03-19|     2017-02-15|
|WDEM04144542|2017-03-19|     2017-02-20|
|WDEM04144542|2017-03-26|     2017-01-25|
|WDEM04144542|2017-03-26|     2017-01-28|
|WDEM04144542|2017-03-26|     2017-02-15|
|WDEM04144542|2017-03-26|     2017-02-20|
+------------+----------+---------------+



In [15]:
# One row per call, carrying the most recent event on or before it.
# max() replaces last(): last() after a groupBy depends on row order, which Spark
# does not guarantee after a shuffle. ISO "yyyy-MM-dd" strings sort chronologically,
# so max() picks the latest event date.
test2 = test1.groupBy('external_id', 'call_date').agg(
    f.max("last_event_date").alias("last_event_date")
)

In [16]:
# Show each call next to its last preceding event date.
test2.show()

+------------+----------+---------------+
| external_id| call_date|last_event_date|
+------------+----------+---------------+
|WDEM04144542|2017-01-26|     2017-01-25|
|WDEM04144542|2017-01-27|     2017-01-25|
|WDEM04144542|2017-02-07|     2017-01-28|
|WDEM04144542|2017-02-12|     2017-01-28|
|WDEM04144542|2017-02-26|     2017-02-20|
|WDEM04144542|2017-03-19|     2017-02-20|
|WDEM04144542|2017-03-26|     2017-02-20|
+------------+----------+---------------+



In [38]:
# Add the gap in days between the call and its last event, plus the month-end
# label of the call, and put the columns in reporting order.
test3 = (
    test2.withColumn(
        "days_diff", f.datediff(f.col("call_date"), f.col("last_event_date"))
    )
    .withColumn("month", f.last_day("call_date"))
    .select("external_id", "month", "days_diff", "call_date", "last_event_date")
)

In [39]:
# Show the per-call day gaps and month labels.
test3.show()

+------------+----------+---------+----------+---------------+
| external_id|     month|days_diff| call_date|last_event_date|
+------------+----------+---------+----------+---------------+
|WDEM04144542|2017-01-31|        1|2017-01-26|     2017-01-25|
|WDEM04144542|2017-01-31|        2|2017-01-27|     2017-01-25|
|WDEM04144542|2017-02-28|       10|2017-02-07|     2017-01-28|
|WDEM04144542|2017-02-28|       15|2017-02-12|     2017-01-28|
|WDEM04144542|2017-02-28|        6|2017-02-26|     2017-02-20|
|WDEM04144542|2017-03-31|       27|2017-03-19|     2017-02-20|
|WDEM04144542|2017-03-31|       34|2017-03-26|     2017-02-20|
+------------+----------+---------+----------+---------------+



In [33]:
# test4 = test3.select('external_id', 'month', 'days_diff')

In [34]:
# test4.show()

+------------+----------+---------+
| external_id|     month|days_diff|
+------------+----------+---------+
|WDEM04144542|2017-01-31|        1|
|WDEM04144542|2017-01-31|        2|
|WDEM04144542|2017-02-28|       10|
|WDEM04144542|2017-02-28|       15|
|WDEM04144542|2017-02-28|        6|
|WDEM04144542|2017-03-31|       27|
|WDEM04144542|2017-03-31|       34|
+------------+----------+---------+



In [42]:
# Per customer and month, count the calls placed at most 30 days after their
# last preceding event.
(
    test3.filter(f.datediff(f.col("call_date"), f.col("last_event_date")) <= 30)
    .groupBy("external_id", "month")
    .count()
    .show()
)

+------------+----------+-----+
| external_id|     month|count|
+------------+----------+-----+
|WDEM04144542|2017-01-31|    2|
|WDEM04144542|2017-02-28|    3|
|WDEM04144542|2017-03-31|    1|
+------------+----------+-----+



In [88]:
def calculate_feature(
    df: DataFrame, feature_name: str, max_days: int = 30
) -> DataFrame:
    """
    Count, per customer and month, the calls made shortly after an event.

    Args:
        df: Frame with `external_id`, `month`, `call_date` and `last_event_date`
            columns.
        feature_name: Name given to the resulting count column.
        max_days: Largest allowed gap, in days, between `last_event_date` and
            `call_date` for a call to be counted. Defaults to 30.

    Returns:
        One row per (`ext_id`, `mnth`) with the count in `feature_name`. The key
        columns are renamed from `external_id` / `month`.
    """
    days_since_event = f.datediff(f.col("call_date"), f.col("last_event_date"))
    return (
        df.filter(days_since_event <= max_days)
        .groupBy("external_id", "month")
        .count()
        .withColumnRenamed("count", feature_name)
        .withColumnRenamed("external_id", "ext_id")
        .withColumnRenamed("month", "mnth")
    )


def calculate_last_event_date(df_call: DataFrame, df_event: DataFrame) -> DataFrame:
    """
    Attach to every call the most recent event on or before the call date.

    Args:
        df_call: Calls, with at least `external_id` and `call_date` columns.
        df_event: Events, with at least `external_id` and `event_date` columns.

    Returns:
        One row per (`external_id`, `call_date`) with columns `external_id`,
        `month` (last day of the call's month), `call_date` and `last_event_date`.
        Calls without any event on or before them are dropped by the date filter.
    """
    return (
        df_call.alias("a")
        .join(df_event.alias("b"), how="left", on=["external_id"])
        .select("a.*", f.col("b.event_date").alias("last_event_date"))
        # Unmatched calls have a null event date and fail this comparison too.
        .filter("last_event_date <= call_date")
        .groupBy("external_id", "call_date")
        # max() rather than last(): last() depends on row order, which Spark does
        # not guarantee after the shuffle caused by groupBy. Dates (or ISO
        # "yyyy-MM-dd" strings) sort chronologically, so max() is the latest event.
        .agg(f.max("last_event_date").alias("last_event_date"))
        .withColumn("month", f.last_day(f.col("call_date")))
        .select("external_id", "month", "call_date", "last_event_date")
    )

Testing

In [93]:
# Test fixture: four attended events for customer "1".
data = [
    ["1", day, "Attended", event_id]
    for day, event_id in [
        ("2017-01-25", "AAA"),
        ("2017-01-28", "BBB"),
        ("2017-02-15", "CCC"),
        ("2017-02-20", "DDD"),
    ]
]

# All columns start out as strings; `date` is parsed right below.
schema = StructType(
    [
        StructField(column, StringType())
        for column in ("external_id", "date", "status", "event_id")
    ]
)

# Parse the date and label each event with the last day of its month.
df_event = (
    spark.createDataFrame(data, schema=schema)
    .withColumn("date", f.to_date(f.col("date"), "yyyy-MM-dd"))
    .withColumn("month", f.last_day(f.col("date")))
)

In [94]:
# Check the event fixture, including the derived month-end column.
df_event.show()

+-----------+----------+--------+--------+----------+
|external_id|      date|  status|event_id|     month|
+-----------+----------+--------+--------+----------+
|          1|2017-01-25|Attended|     AAA|2017-01-31|
|          1|2017-01-28|Attended|     BBB|2017-01-31|
|          1|2017-02-15|Attended|     CCC|2017-02-28|
|          1|2017-02-20|Attended|     DDD|2017-02-28|
+-----------+----------+--------+--------+----------+



In [83]:
# Test fixture: seven face-to-face VIMPAT calls for customer "1".
data = [
    ["1", day, "VIMPAT", "Face to Face"]
    for day in (
        "2017-01-26",
        "2017-01-27",
        "2017-02-07",
        "2017-02-12",
        "2017-02-26",
        "2017-03-19",
        "2017-03-26",
    )
]

# All columns start out as strings; `date` is parsed right below.
schema = StructType(
    [
        StructField(column, StringType())
        for column in ("external_id", "date", "product_category", "channel")
    ]
)

df_call = spark.createDataFrame(data, schema=schema).withColumn(
    "date", f.to_date(f.col("date"), "yyyy-MM-dd")
)

In [84]:
# Check the call fixture.
df_call.show()

+-----------+----------+----------------+------------+
|external_id|      date|product_category|     channel|
+-----------+----------+----------------+------------+
|          1|2017-01-26|          VIMPAT|Face to Face|
|          1|2017-01-27|          VIMPAT|Face to Face|
|          1|2017-02-07|          VIMPAT|Face to Face|
|          1|2017-02-12|          VIMPAT|Face to Face|
|          1|2017-02-26|          VIMPAT|Face to Face|
|          1|2017-03-19|          VIMPAT|Face to Face|
|          1|2017-03-26|          VIMPAT|Face to Face|
+-----------+----------+----------------+------------+



In [85]:
# Test fixture: the (customer, month-end) spine covering January to March 2017.
data = [["1", month_end] for month_end in ("2017-01-31", "2017-02-28", "2017-03-31")]

schema = StructType(
    [StructField(column, StringType()) for column in ("external_id", "month")]
)

# Parse `month` into a date column.
df_spine = spark.createDataFrame(data, schema=schema).withColumn(
    "month", f.to_date(f.col("month"), "yyyy-MM-dd")
)


In [87]:
# Check the spine fixture.
df_spine.show()

+-----------+----------+
|external_id|     month|
+-----------+----------+
|          1|2017-01-31|
|          1|2017-02-28|
|          1|2017-03-31|
+-----------+----------+



+------------+----------+
| external_id|event_date|
+------------+----------+
|WDEM04144542|2017-01-25|
|WDEM04144542|2017-01-28|
|WDEM04144542|2017-02-15|
|WDEM04144542|2017-02-20|
+------------+----------+



In [63]:
# Attach to every call the most recent event on or before the call date.
# NOTE(review): `vimpat_calls` and `events` are not defined in any cell visible here;
# presumably they are frames with `call_date` / `event_date` columns - confirm and
# define them in the notebook so it survives Restart & Run All.
vimpat_calls_within_30days = (
        vimpat_calls.alias("a")
        .join(events.alias("b"), how="left", on=["external_id"])
        .select("a.*", f.col("b.event_date").alias("last_event_date"))
        .filter("last_event_date <= call_date")
        .groupBy("external_id", "call_date")
        # max() rather than last(): last() depends on row order, which Spark does
        # not guarantee after the shuffle caused by groupBy.
        .agg(f.max("last_event_date").alias("last_event_date"))
        .withColumn("month", f.last_day(f.col("call_date")))
        .select("external_id", "month", "call_date", "last_event_date")
    )

In [64]:
# Show every call with its last event date; the 30-day filter has not been applied yet.
vimpat_calls_within_30days.show()

+------------+----------+----------+---------------+
| external_id|     month| call_date|last_event_date|
+------------+----------+----------+---------------+
|WDEM04144542|2017-01-31|2017-01-26|     2017-01-25|
|WDEM04144542|2017-01-31|2017-01-27|     2017-01-25|
|WDEM04144542|2017-02-28|2017-02-07|     2017-01-28|
|WDEM04144542|2017-02-28|2017-02-12|     2017-01-28|
|WDEM04144542|2017-02-28|2017-02-26|     2017-02-20|
|WDEM04144542|2017-03-31|2017-03-19|     2017-02-20|
|WDEM04144542|2017-03-31|2017-03-26|     2017-02-20|
+------------+----------+----------+---------------+



In [65]:
# Count, per customer and month, the calls made within 30 days after an event.
# This reuses calculate_feature(), which applies the same 30-day filter, grouping
# and renames as the chain previously written out here.
# Note: the name is rebound to the aggregated frame, so re-running this cell alone
# fails because `call_date` / `last_event_date` no longer exist.
vimpat_calls_within_30days = calculate_feature(
    vimpat_calls_within_30days, "feature_vimpat_f2f_call_30days_after_event"
)

In [66]:
# Show the final per-customer, per-month feature counts.
vimpat_calls_within_30days.show()

+------------+----------+------------------------------------------+
|      ext_id|      mnth|feature_vimpat_f2f_call_30days_after_event|
+------------+----------+------------------------------------------+
|WDEM04144542|2017-01-31|                                         2|
|WDEM04144542|2017-02-28|                                         3|
|WDEM04144542|2017-03-31|                                         1|
+------------+----------+------------------------------------------+



In [97]:
# Attach every event to its (customer, month) spine row. Months without events keep
# nulls in the event columns.
# The former `.fillna(0)` was removed: an integer fill value only applies to numeric
# columns, and the columns coming from `df_event` are dates and strings, so it had
# no effect.
spine_events = df_spine.join(df_event, how="left", on=["external_id", "month"])

In [98]:
# Show the spine with its events; the 2017-03-31 row has no matching event.
spine_events.show()

+-----------+----------+----------+--------+--------+
|external_id|     month|      date|  status|event_id|
+-----------+----------+----------+--------+--------+
|          1|2017-01-31|2017-01-25|Attended|     AAA|
|          1|2017-01-31|2017-01-28|Attended|     BBB|
|          1|2017-02-28|2017-02-15|Attended|     CCC|
|          1|2017-02-28|2017-02-20|Attended|     DDD|
|          1|2017-03-31|      null|    null|    null|
+-----------+----------+----------+--------+--------+

