In [1]:
%run /Users/production@aarp.com/Utils/env_variables

In [2]:
%run /Users/production@aarp.com/Utils/Utils

In [3]:
# S3 path kept in `tempdir`; the name suggests a staging directory for
# Redshift copies, but no visible cell uses it -- TODO confirm.
tempdir = 's3a://aarp-testing/tmp/redshift_copy'

## Facts about this analysis
* ### 10,000,000,000+ rows of data processed
* ### Spark cluster with 12 r4.2xlarge instances; 732 GB of RAM, 96 CPUs

## Sidenote:

* ### I like flags. 
* ### I also like window functions. 
* ### I don't like serial processing. Bottlenecks kill me. Parallelize it all

In [6]:
from pyspark.sql.functions import udf, lag, when
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql import functions as func

In [7]:
# JDBC connection string for the Redshift `dev` database, with the user and
# password interpolated as query parameters.
# rs_user / rs_password are not defined in any visible cell; presumably they
# come from the %run'd env_variables notebook -- confirm.
# NOTE(review): the resulting URL contains the password in plain text, so
# avoid printing or logging rs_url.
rs_url = 'jdbc:redshift://aarp-rs-temp.c0vmann988mu.us-east-1.redshift.amazonaws.com:5439/dev?user={}&password={}'.format(rs_user, rs_password)

# 1. Pull Data

### Import the Data from Redshift

#### Set conversion event types and start time

#### Pull all engagement events

#### Pull distinct engagement events
##### We pull distinct conversion events so that multiple conversions of the same type by the same user on the same day are collapsed into one. This removes paths to conversion that are too short and would otherwise skew our analysis

#### Pull individual dimension data

#### Union conversion and engagement data to have all event data

# 2. Create Flags

#### Create the conversion flag, to distinguish conversion events

### We need to identify when there is a new user as we do not want user paths to get mixed
#### Order by the individual_key and the date, then create a column that has the previous user id, call it the last_individual_key column

In [16]:
# A single, unpartitioned window ordered by user and then by date, so each
# row's predecessor is the previous event in that overall ordering.
# NOTE(review): a window with no partition columns is evaluated on one
# partition -- confirm this is acceptable at this data volume.
last_user_partition = Window.orderBy('individual_key', 'date_key')

# Record the individual_key of the preceding row (null for the very first row).
events_df = events_df.withColumn(
    'last_individual_key',
    func.lag('individual_key').over(last_user_partition),
)
events_df.cache()

### Use the last_individual_key column to identify a new user
#### The new_user_flag column is 1 if the row starts a new user, 0 if it belongs to the same user as the previous row

In [18]:
from pyspark.sql.functions import col, expr, when

# SQL expression yielding 0 when the row has the same individual_key as the
# previous row or has no previous row (last_individual_key is null), else 1.
new_user_flag = expr( """IF(individual_key = last_individual_key or last_individual_key is null, 0, 1)""")

# Attach the flag and mark the resulting frame for caching.
events_df = events_df.withColumn('new_user_flag', new_user_flag).cache()

### Use new user flag and conversion flag to mark paths to conversion.


#### Create a column that increments every time there is a new user or a conversion. This column is then a marker for new conversion paths

In [20]:
# Unpartitioned window ordered by user, then by date.
indiv_date_key_partition = Window.orderBy('individual_key', 'date_key')

# Running total of (conversion_flag + new_user_flag) along that ordering: the
# total steps up on every conversion or change of user, so rows sharing a
# value share a conversion path.
events_df = events_df.withColumn(
    'conversion_path',
    func.sum(
        func.col('conversion_flag') + func.col('new_user_flag')
    ).over(indiv_date_key_partition),
).cache()

### Remove last conversion from path
#### Since the user has not converted since their last conversion flag, we want to flag the last conversion path for a user

In [22]:
# Number each user's conversion paths 1, 2, 3, ... in conversion_path order.
# dense_rank() is used rather than rank(): every row of a path ties on
# conversion_path, and rank() would skip ahead by the number of tied rows,
# so its value would depend on how many events a path contains rather than
# on the path's position. A later cell compares conversion_rank with a count
# of conversions, which needs a gap-free ordinal.
# Ordering by individual_key inside a partition keyed on individual_key is
# redundant, so only conversion_path is used for ordering.
conversion_rank_window = Window.partitionBy('individual_key').orderBy('conversion_path')

events_df = events_df.withColumn('conversion_rank', func.dense_rank().over(conversion_rank_window))

### Flag non-converters

#### Create a dataframe of distinct individual keys from the converter dataframe.

In [24]:
# One row per individual_key found in the conversion events, with the number
# of conversion rows for that key (the count column is named 'count(1)').
converter_df = (
    conversion_events_df
    .select('individual_key')
    .groupBy('individual_key')
    .agg({'*': 'count'})
)


In [25]:
# Rename the columns so they do not clash with events_df in the join below:
# individual_key -> converter, count(1) -> num_conversions.
converter_df = converter_df.select(
    col('individual_key').alias('converter'),
    col('count(1)').alias('num_conversions'),
)


#### Left outer join converter keys to events dataframe, this flags keys as converters or non-converters

In [27]:
# Left outer join keeps every event row; rows whose individual_key has no
# match in converter_df get null for converter and num_conversions.
events_df = events_df.join(
    converter_df,
    on=events_df.individual_key == converter_df.converter,
    how='left_outer',
)

## Create column to be used for model
#### If the person has never converted, or it is their last path to conversion, it is a 0; otherwise it's a 1

In [29]:
# Label expression, evaluated top to bottom:
#   0 when the row has no matching converter (never converted),
#   0 when conversion_rank equals num_conversions,
#   1 otherwise.
model_column = (
    when(col('converter').isNull(), 0)
    .when(col('conversion_rank') == col('num_conversions'), 0)
    .otherwise(1)
)

#### Add model column flag to the dataframe

In [31]:
# Materialise the label expression as the 'successful_conversion' column.
events_df = events_df.withColumn('successful_conversion', model_column)

In [32]:
# Draw a small sample (with replacement, fraction 0.0001) of the events and
# register it as a temp view named 'events_temp_table' for SQL access.
sample_events_df = events_df.sample(withReplacement=True, fraction=0.0001)
sample_events_df.createOrReplaceTempView('events_temp_table')

# 3. Group dataframe around path

#### Now that we have a key for conversion path, we can group engagements by that flag to give us our data

#### Widen the data with some one hot encoding

### Group by conversion path and individual

### Aggregate all touch points

### Create a time duration column

# 4. Train First Model

#### Import machine learning libraries

#### Split data into training and test set

#### Convert to Vectors for machine learning

#### Initialize model

#### Construct Parameter Grid

#### Build Pipeline
##### In the future we can turn our above data transformations into estimators and add them into the pipeline

#### Use above objects to build Cross Validator

#### Fit Cross Validator to Model
##### This took a long time

#### Print coefficients to determine performance

# 5. Survival Analysis

#### In order to mitigate numerous activities from creating noise in the data we conducted a survival analysis on each type of event.
#### logistic regression does not factor in time to activity. Cannot neglect the fact that ads fade from memory over time, so to compensate we conducted a survival analysis. 
#### Limiting the time scope of our analysis to two years to censor data

In [49]:
# Total number of rows in conversion_events_df, as a Python int (count() is an
# action and triggers a Spark job). Used below as the starting pool from which
# per-duration converters are subtracted.
total_conversions = conversion_events_df.count()

#### Conduct survival analysis on each type of event

In [51]:
# Aggregates computed per duration value: the number of rows with that
# duration, plus the sum of each event column listed in column_name.
agg_list = [func.count('*').alias('converters_on_day')]
agg_list.extend(func.sum(event).alias('total_' + event) for event in column_name)

duration_df = final_df.groupBy('duration').agg(*agg_list)

# Running total of converters_on_day in duration order (up to and including
# the current duration), subtracted from the overall conversion count.
sum_converted_window = Window.orderBy('duration')
duration_df = duration_df.withColumn(
    'remaining_converters',
    total_conversions - func.sum('converters_on_day').over(sum_converted_window),
)

# Hazard per event type: events summed at this duration divided by the
# remaining converters at this duration.
for event in column_name:
  duration_df = duration_df.withColumn(
      event + '_hazard_probability',
      func.col('total_' + event) / func.col('remaining_converters'),
  )
duration_df.cache()

### Create Dataframe of Survival Rates

In [53]:
from pyspark.ml.linalg import Vectors

# Two parallel accumulators for the survival table: survival_rate_data holds
# one entry per output column, survival_rate_columns the matching names.
survival_rate_data = []
# collect() brings the 'duration' column to the driver as a list of Row
# objects (one field each), in whatever order Spark returns them.
duration_column = duration_df.select('duration').collect()
# NOTE(review): Vectors.dense is handed a list of Rows rather than a flat list
# of numbers -- confirm this produces the intended 1-D vector of durations.
survival_rate_data.append(Vectors.dense(duration_column))
survival_rate_columns = ['duration']



#### Define function to get survival data from hazard data
##### Luckily data is only length of duration of data, no more than two years, so we can use regular python

In [55]:
def calculate_survival_rate(hazard_list):
  """Convert per-period hazard probabilities into survival probabilities.

  The survival probability at position n is the running product
  (1 - h_0) * (1 - h_1) * ... * (1 - h_n). The product is seeded with 1.0;
  seeding it with 0 would force every survival value to 0.

  Args:
    hazard_list: iterable of hazard probabilities in time order. Each item
      may be a plain number or a single-field sequence (such as a collected
      Spark Row), in which case its first field is used.

  Returns:
    A list of survival probabilities, the same length as the input; an empty
    input gives an empty list.
  """
  survival_list = []
  survival_val = 1.0
  for hazard in hazard_list:
    # Rows returned by DataFrame.collect() are tuples; unwrap the value.
    if isinstance(hazard, (tuple, list)):
      hazard = hazard[0]
    survival_val = (1 - hazard) * survival_val
    survival_list.append(survival_val)
  return survival_list

#### Loop thru each event type and create columns from hazard data

In [57]:
# Collect the hazard table once, sorted by duration. A single ordered collect
# keeps each duration aligned with its hazard values and guarantees that the
# running product in calculate_survival_rate follows time order (separate,
# unordered collect() calls give no such guarantee).
ordered_rows = duration_df.orderBy('duration').collect()

event_survival_data = []
event_survival_columns = []
for event in column_name:
  # Pull plain numbers out of the Rows; the helper does arithmetic on them.
  # NOTE(review): a null hazard (e.g. from a zero denominator) arrives here as
  # None and would fail in the helper -- confirm this cannot occur.
  hazard_list = [row[event + '_hazard_probability'] for row in ordered_rows]
  event_survival_data.append(calculate_survival_rate(hazard_list))
  event_survival_columns.append(event + '_survival_probability')

# Keep the notebook-level accumulators populated as before.
survival_rate_data.extend(event_survival_data)
survival_rate_columns.extend(event_survival_columns)

# createDataFrame expects one tuple per row, so transpose the per-column lists
# into (duration, survival_1, survival_2, ...) rows.
survival_rows = list(zip([row['duration'] for row in ordered_rows], *event_survival_data))
survival_df = spark.createDataFrame(survival_rows, ['duration'] + event_survival_columns)

#### Join data back to path dataframe on the duration of each path

In [59]:
# Attach the per-duration survival probabilities to every path row.
# Joining on the column name (rather than an equality expression) keeps a
# single 'duration' column in the result; left_outer keeps every path row even
# if its duration has no survival entry.
final_weighted_df = final_df.join(survival_df, on='duration', how='left_outer')

#### Calculate survival analysis weighted touchpoint columns
##### Multiply the survival analysis probability by the number of touchpoint engagements

In [61]:
# For every event type add 'weighted_<event>' = survival probability * count.
# DataFrames are immutable: withColumn returns a new frame, so the result must
# be assigned back or the new column is lost.
for event in column_name:
  final_weighted_df = final_weighted_df.withColumn(
      'weighted_' + event,
      final_weighted_df[event + '_survival_probability'] * final_weighted_df[event],
  )

#### Create dataframe with weighted columns

In [63]:
# Every column whose name starts with 'weighted', followed by the label column
# so that 'successful_conversion' is the last column of the selection.
weighted_columns = [
    name for name in final_weighted_df.columns if name.startswith('weighted')
] + ['successful_conversion']

ml_df = final_weighted_df.select(weighted_columns)

# 6. Machine Learning on survival analysis weighted data

In [65]:
# Identifier/date columns that must not be fed to the model.
drop_list = ['individual_key', 'conversion_path', 'min_date', 'max_date']

# Filter the weighted frame built in the previous cell instead of rebuilding
# ml_df from final_df: selecting from final_df would discard the survival
# weighted columns this section is meant to train on, and would no longer
# guarantee that the label is the last column, which the vectorising step
# below relies on.
ml_df = ml_df.select([keep for keep in ml_df.columns if keep not in drop_list])

In [66]:
# 70/30 random split with a fixed seed so the split is repeatable.
trainingData, testData = ml_df.randomSplit([0.7, 0.3], seed=100)

In [67]:
def _to_features_label(row):
  """Split a row into (dense vector of all fields but the last, last field)."""
  return (Vectors.dense(row[0:-1]), row[-1])


# Reshape both splits into the two-column (features, label) layout.
testData = testData.rdd.map(_to_features_label).toDF(["features", "label"])
trainingData = trainingData.rdd.map(_to_features_label).toDF(["features", "label"])

In [68]:
# Fresh logistic regression estimator with default parameters.
# LogisticRegression is not imported in any visible cell; presumably it is
# imported in an earlier cell of the notebook -- confirm.
lr = LogisticRegression()

In [69]:
# Hyper-parameter grid: 3 regParam x 3 elasticNetParam x 3 maxIter values.
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, [0.01, 0.5, 2.0])
grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
grid_builder.addGrid(lr.maxIter, [1, 5, 10])
paramGrid = grid_builder.build()

In [70]:
# Evaluator for the binary classifier.
# rawPredictionCol must name the model's raw prediction output, not the label.
# The train/test frames were rebuilt above with exactly two columns,
# "features" and "label", so a "successful_conversion" column no longer
# exists; the label is read from "label" and the scores from "rawPrediction".
evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="rawPrediction")

In [71]:
# Create 2-fold CrossValidator (numFolds=2) over the parameter grid, scored by
# a default BinaryClassificationEvaluator.
# NOTE(review): `pipeline` is not defined in any visible cell; if it was built
# around an earlier LogisticRegression instance, confirm that the grid built
# from the current `lr` still applies to it.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=2)  

In [72]:
# Run the cross-validated grid search on the training split; the fitted
# result is kept in wlr_cv_model.
wlr_cv_model = crossval.fit(trainingData)

In [73]:
# Apply the fitted cross-validation model to the held-out test split.
predictions = wlr_cv_model.transform(testData)

In logistic regression, we were interested in studying how risk
factors were associated with presence or absence of disease.
Sometimes, though, we are interested in how a risk factor or
treatment affects time to disease or some other event. Or we
may have study dropout, and therefore subjects who we are
not sure if they had disease or not. In these cases, logistic
regression is not appropriate.