In [None]:
import re
import datetime
from pychattr.channel_attribution import MarkovModel
import pandas as pd
import numpy as np
from sklearn.metrics import classification_report
import warnings
warnings.filterwarnings('ignore')

In [None]:
# pd.set_option('display.max_colwidth', None)
# import findspark
# findspark.init("/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark","/usr/bin/python2.7")

# import os
# os.environ["JAVA_HOME"] = "/usr/java/jdk1.8.0_181-cloudera"

# from pyspark.sql import functions as F
# from pyspark import SparkContext
# from pyspark.sql import SparkSession,HiveContext,Window
# from pyspark.sql.types import IntegerType, FloatType, DoubleType, ArrayType, StringType, DecimalType
# from pyspark.sql.functions import *

# spark_session = SparkSession.builder.enableHiveSupport().appName("artefact_attribution_analysis") \
#     .config("spark.driver.memory","50g") \
#     .config("spark.pyspark.driver.python","/usr/bin/python2.7")\
#     .config("spark.pyspark.python","/usr/bin/python2.7") \
#     .config("spark.yarn.executor.memoryOverhead","8G") \
#     .getOrCreate()
# hc = HiveContext(spark_session.sparkContext)

##### Fetch data and data processing

In [None]:
# Pull the ad-hoc APP activity table from Hive and materialise it as a pandas DataFrame.
# NOTE(review): `hc` (HiveContext) is only created in the commented-out Spark setup cell
# above, so this cell raises NameError on a fresh kernel unless that setup is restored.
activity = hc.sql(
    "select * from marketing_modeling.dw_ts_adhoc_app_activity_i"
).toPandas()

In [None]:
# deal_time.csv holds the `mobile` and `behavior_time` columns pulled from
# marketing_modeling.dw_deliver_behavior (tab-separated, no header row).
deal_time = pd.read_csv(
    'data/deal_time.csv',
    sep='\t',
    names=['mobile', 'deal_time'],
)

In [None]:
# Raw touchpoint events (tab-separated, no header row) and the touchpoint id mapping table.
df = pd.read_csv(
    'data/tp_analysis_base.csv',
    header=None,
    sep='\t',
)
id_mapping = pd.read_csv('data/id_mapping.csv')

In [None]:
# Name the positional columns, then keep them in (mobile, action_time, touchpoint_id) order.
df = (
    df.rename(columns={0: 'mobile', 1: 'touchpoint_id', 2: 'action_time'})
    [['mobile', 'action_time', 'touchpoint_id']]
)
df.head()

In [None]:
def preprocess(df, mobile_to_remove_path='mobile_to_remove.csv'):
    """Clean the raw touchpoint events.

    Steps:
      1. Drop rows containing any missing value.
      2. Cast ``mobile`` to the pandas string dtype and keep only values made of
         a leading ``1`` followed by ten digits.
      3. Remove every mobile listed in the dealer blocklist CSV.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a ``mobile`` column. The frame is NOT modified.
    mobile_to_remove_path : str, optional
        CSV with a ``mobile`` column of numbers to exclude. Defaults to the
        path the notebook has always used.

    Returns
    -------
    pandas.DataFrame
        A new, filtered DataFrame.
    """
    # dropna() (not inplace) so the caller's DataFrame is left untouched.
    cleaned = df.dropna()
    cleaned = cleaned.assign(mobile=cleaned['mobile'].astype('string'))

    ## filter the uncleaned mobile (raw string: `\d` is a regex escape, not a Python one)
    cleaned = cleaned[cleaned.mobile.str.contains(r'^1\d{10}$')]

    ## phone number of dealer to remove
    # Read the blocklist as text: with inferred dtypes the numbers come back as
    # integers and would never match the string-typed `mobile` column.
    mobile_to_remove = pd.read_csv(mobile_to_remove_path, dtype={'mobile': str})
    # The index column written by to_csv may or may not be present.
    mobile_to_remove = mobile_to_remove.drop(columns=['Unnamed: 0'], errors='ignore')
    mobile_to_remove_list = mobile_to_remove.mobile.dropna().unique()

    return cleaned[~cleaned.mobile.isin(mobile_to_remove_list)]

def if_has_endpoint(touchpoint_id, list1):
    """Return 1 when ``touchpoint_id`` is a member of ``list1``, otherwise 0."""
    return int(touchpoint_id in list1)

In [None]:
df = preprocess(df)

# Link each consumer's journey: order events by mobile (descending) and time (ascending),
# then collect the touchpoint ids of every mobile into one list per row.
# NOTE(review): action_time has not been parsed to datetime at this point, so the
# ordering relies on its raw representation sorting chronologically; confirm.
df = df.sort_values(by=['mobile', 'action_time'], ascending=[False, True])
df_paths = (
    df.groupby('mobile')['touchpoint_id']
    .aggregate(lambda ids: ids.tolist())
    .reset_index()
)

##### 总人数 & 车主/意向/机会比例 

In [None]:
def get_proportion(cyberster_activity_mobile, tp_id):
    """Compute owner share, opportunity share and headcount for one activity cohort.

    Reads the module-level ``df`` (touchpoint events) and ``deal_time`` frames.

    Parameters
    ----------
    cyberster_activity_mobile : iterable
        Mobiles belonging to the cohort.
    tp_id : list
        Touchpoint ids that count as taking part in the activity.

    Returns
    -------
    tuple
        ``(owner_percentage, chance_percentage, total)`` where
        * ``owner_percentage``: share of rows whose first activity touch is later
          than ``deal_time``;
        * ``chance_percentage``: among the remaining rows, those whose first activity
          touch precedes their first ``004000000000_tp`` event, plus those with no
          such event, divided by the total;
        * ``total``: number of rows with at least one activity touch.
    """
    cohort_events = df[df.mobile.isin(cyberster_activity_mobile)]

    # First activity touch per mobile, joined with the deal time.
    first_touch = (
        cohort_events[cohort_events.touchpoint_id.isin(tp_id)]
        .groupby('mobile', as_index=False)['action_time']
        .min()
        .merge(deal_time, on='mobile', how='left')
    )
    touched_after_deal = first_touch.action_time > first_touch.deal_time
    owner_percentage = (
        first_touch[touched_after_deal].mobile.count() / first_touch.mobile.count()
    )

    # First '004000000000_tp' event per mobile.
    first_intent = (
        cohort_events[cohort_events.touchpoint_id == '004000000000_tp']
        .groupby('mobile', as_index=False)['action_time']
        .min()
        .rename(columns={'action_time': 'min_yixiang'})
    )
    first_touch = first_touch.merge(first_intent, on='mobile', how='left')

    non_owners = first_touch[~(first_touch.action_time > first_touch.deal_time)]
    touched_before_intent = non_owners[
        non_owners.action_time < non_owners.min_yixiang
    ].mobile.count()
    without_intent = non_owners.min_yixiang.isnull().sum()
    total = first_touch.mobile.count()

    chance_percentage = (touched_before_intent + without_intent) / total
    return owner_percentage, chance_percentage, total

In [None]:
# 1) Cyberster: owner share, opportunity share, total headcount
is_cyberster = (
    (activity.touchpoint_id == '008002006005_tp')
    & (activity.activity_type == 'Cyberster')
)
cyberster_activity_mobile = activity[is_cyberster].mobile.unique()
owner_percentage, chance_percentage, num = get_proportion(
    cyberster_activity_mobile, ['008002006005_tp']
)
print(owner_percentage, chance_percentage, num)

In [None]:
# 2) Points collection: owner share, opportunity share, total headcount
is_points = (
    (activity.touchpoint_id == '008002006005_tp')
    & (activity.activity_type == '集点积分')
)
points_activity_mobile = activity[is_points].mobile.unique()
owner_percentage, chance_percentage, num = get_proportion(
    points_activity_mobile, ['008002006005_tp']
)
print(owner_percentage, chance_percentage, num)

In [None]:
# 3) Community activities: owner share, opportunity share, total headcount
community_tp = ['008002006004_tp', '008002006002_tp', '008002006003_tp']
community_activity_mobile = df[df.touchpoint_id.isin(community_tp)].mobile.unique()
owner_percentage, chance_percentage, num = get_proportion(
    community_activity_mobile, community_tp
)
print(owner_percentage, chance_percentage, num)

In [None]:
# 4) APP livestream: owner share, opportunity share, total headcount
app_activity_mobile = df[df.touchpoint_id.str.startswith('008002002')].mobile.unique()
# Every mapped touchpoint id under the 008002002 prefix.
app_id = [
    tp for tp in id_mapping.touchpoint_id.tolist()
    if tp.startswith('008002002')
]
owner_percentage, chance_percentage, num = get_proportion(app_activity_mobile, app_id)
print(owner_percentage, chance_percentage, num)

In [None]:
# 5) Referral rewards: owner share, opportunity share, total headcount
# Cohort = mobiles with the 008002006001_tp touchpoint, plus mobiles whose
# 008002006005_tp activity is of type '推荐有礼'.
# (The original first line was missing the opening `[` after `df`, a syntax error.)
liebian_activity_mobile = list(df[df.touchpoint_id == '008002006001_tp'].mobile.unique())
tuijian_activity_mobile = list(
    activity[
        (activity.touchpoint_id == '008002006005_tp')
        & (activity.activity_type == '推荐有礼')
    ].mobile.unique()
)
# De-duplicated union of both sources.
liebian_activity_mobile = list(set(liebian_activity_mobile) | set(tuijian_activity_mobile))
owner_percentage, chance_percentage, num = get_proportion(
    liebian_activity_mobile, ['008002006001_tp', '008002006005_tp']
)
print(owner_percentage, chance_percentage, num)

In [None]:
# 6) Others: owner share, opportunity share, total headcount
# Any 008002006005_tp activity whose type is not one of the three named campaigns.
named_activity_types = ['推荐有礼', 'Cyberster', '集点积分']
is_other = (
    (activity.touchpoint_id == '008002006005_tp')
    & ~activity.activity_type.isin(named_activity_types)
)
qita_activity_mobile = list(activity[is_other].mobile.unique())
owner_percentage, chance_percentage, num = get_proportion(
    qita_activity_mobile, ['008002006005_tp']
)
print(owner_percentage, chance_percentage, num)

##### 到店&成交转化率

In [None]:
# Flag, per journey, which activity cohorts the mobile belongs to (1 = member, 0 = not).
cohort_mobiles = {
    'cyberster': cyberster_activity_mobile,
    'points': points_activity_mobile,
    'community': community_activity_mobile,
    'app': app_activity_mobile,
    'liebian': liebian_activity_mobile,
    'others': qita_activity_mobile,
}
for cohort_name, cohort_members in cohort_mobiles.items():
    df_paths[cohort_name] = df_paths.mobile.isin(cohort_members).astype('int')

In [None]:
adhoc = df_paths.copy()

# One 0/1 column per activity touchpoint: does the journey contain it?
activity_touchpoints = [
    '008002006004_tp', '008002006002_tp', '008002006003_tp',
    '008002002000_tp', '008002002001_tp', '008002002002_tp',
    '008002002003_tp', '008002002004_tp', '008002002005_tp',
    '008002002006_tp', '008002002007_tp', '008002002008_tp',
    '008002006001_tp', '008002006005_tp',
    '008002003000_tp', '008002003001_tp', '008002003002_tp', '008002003003_tp',
]
for tp in activity_touchpoints:
    # Bind tp as a default argument so each lambda checks its own touchpoint.
    adhoc['if_' + tp] = adhoc.touchpoint_id.apply(
        lambda path, tp=tp: if_has_endpoint(tp, path)
    )

In [None]:
# 0/1 flags for the conversion touchpoints found anywhere in the journey.
conversion_touchpoints = {
    'if_go_store': '006000000000_tp',
    'if_order': '011000000000_tp',
    'if_finish_drive': '007003000000_tp',
    'if_review_drive': '007004000000_tp',
}
for flag_name, tp in conversion_touchpoints.items():
    adhoc[flag_name] = adhoc.touchpoint_id.apply(
        lambda path, tp=tp: if_has_endpoint(tp, path)
    )
# if_drive is 1 when either of the two drive touchpoints is present.
drive_hits = adhoc['if_finish_drive'] + adhoc['if_review_drive']
adhoc['if_drive'] = (drive_hits >= 1).astype('int')

In [None]:
# Referral-rewards membership must come from `tuijian_activity_mobile` (activity type
# '推荐有礼'). The original used `qita_activity_mobile` here, which made the `tuijian`
# flag an exact copy of `others`.
adhoc['tuijian'] = adhoc.mobile.isin(tuijian_activity_mobile).astype('int')

# Keep a cohort flag only when the journey itself also contains 008002006005_tp
# (both indicators equal 1, i.e. their sum is 2).
for cohort_name in ['cyberster', 'points', 'others', 'tuijian']:
    adhoc[cohort_name] = (
        (adhoc[cohort_name] + adhoc.if_008002006005_tp) == 2
    ).astype('int')

In [None]:
def exclude_endpoint(touchpoint_id, list1):
    """Trim a journey so that it starts at the first ``touchpoint_id``.

    Returns ``list1`` unchanged (the same object) when ``touchpoint_id`` is not
    in it; otherwise returns the slice from its first occurrence, inclusive,
    to the end.
    """
    if touchpoint_id not in list1:
        return list1
    first_hit = list1.index(touchpoint_id)
    return list1[first_hit:]

In [None]:
# 1) APP livestream: store visits & deals
tmp = adhoc.copy()
in_app = tmp.app == 1
# Trim each APP journey to start at the listed touchpoints, applied one after another.
for tp in ['008002002001_tp', '008002002002_tp', '008002002003_tp']:
    tmp.loc[in_app, 'touchpoint_id'] = tmp.loc[in_app, 'touchpoint_id'].apply(
        lambda path, tp=tp: exclude_endpoint(tp, path)
    )
# Recompute the conversion flags on the trimmed journeys.
tmp['if_go_store'] = tmp.touchpoint_id.apply(lambda path: if_has_endpoint('006000000000_tp', path))
tmp['if_order'] = tmp.touchpoint_id.apply(lambda path: if_has_endpoint('011000000000_tp', path))

# Store visits
print(tmp[(tmp.app == 1) & (tmp.if_go_store == 1)].mobile.count())
# Deals
print(tmp[(tmp.app == 1) & (tmp.if_order == 1)].mobile.count())

In [None]:
# 2) Cyberster: store visits & deals
tmp = adhoc.copy()
in_cyberster = tmp.cyberster == 1
# Trim each Cyberster journey to start at its first 008002006005_tp touch.
tmp.loc[in_cyberster, 'touchpoint_id'] = tmp.loc[in_cyberster, 'touchpoint_id'].apply(
    lambda path: exclude_endpoint('008002006005_tp', path)
)
# Recompute the conversion flags on the trimmed journeys.
for flag_name, tp in [
    ('if_go_store', '006000000000_tp'),
    ('if_order', '011000000000_tp'),
    ('if_finish_drive', '007003000000_tp'),
    ('if_review_drive', '007004000000_tp'),
]:
    tmp[flag_name] = tmp.touchpoint_id.apply(lambda path, tp=tp: if_has_endpoint(tp, path))
tmp['if_drive'] = ((tmp['if_finish_drive'] + tmp['if_review_drive']) >= 1).astype('int')

# Store visits
print(tmp[(tmp.cyberster == 1) & (tmp.if_go_store == 1)].mobile.nunique())
# Deals
print(tmp[(tmp.cyberster == 1) & (tmp.if_order == 1)].mobile.nunique())

In [None]:
# 3) Points collection: store visits & deals
tmp = adhoc.copy()
# Trim each points-cohort journey to start at its first 008002006005_tp touch.
tmp.loc[tmp.points == 1,'touchpoint_id'] = tmp.loc[tmp.points == 1,'touchpoint_id'].apply(lambda x:exclude_endpoint('008002006005_tp',x))
# Recompute the conversion flags on the trimmed journeys.
tmp['if_go_store'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('006000000000_tp',x))
tmp['if_order'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('011000000000_tp',x))
tmp['if_finish_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007003000000_tp',x))
tmp['if_review_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007004000000_tp',x))
tmp['if_drive'] = ((tmp['if_finish_drive'] + tmp['if_review_drive'])>=1).astype('int')

# A cell only displays its last bare expression, so the store-visit count was
# silently dropped; print both figures explicitly.
# Store visits
print(tmp[(tmp.points == 1)&(tmp.if_go_store ==1)].mobile.nunique())
# Deals
print(tmp[(tmp.points == 1)&(tmp.if_order ==1)].mobile.nunique())

In [None]:
# 4) Community activities: store-visit & deal rates
tmp = adhoc.copy()
# Trim each community journey to start at the listed touchpoints, applied one after another.
tmp.loc[tmp.community == 1,'touchpoint_id'] = tmp.loc[tmp.community == 1,'touchpoint_id'].apply(lambda x:exclude_endpoint('008002006004_tp',x))
tmp.loc[tmp.community == 1,'touchpoint_id'] = tmp.loc[tmp.community == 1,'touchpoint_id'].apply(lambda x:exclude_endpoint('008002006002_tp',x))
tmp.loc[tmp.community == 1,'touchpoint_id'] = tmp.loc[tmp.community == 1,'touchpoint_id'].apply(lambda x:exclude_endpoint('008002006003_tp',x))
# Recompute the conversion flags on the trimmed journeys.
tmp['if_go_store'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('006000000000_tp',x))
tmp['if_order'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('011000000000_tp',x))
tmp['if_finish_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007003000000_tp',x))
tmp['if_review_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007004000000_tp',x))
tmp['if_drive'] = ((tmp['if_finish_drive'] + tmp['if_review_drive'])>=1).astype('int')

# A cell only displays its last bare expression, so the store-visit rate was
# silently dropped; print both figures explicitly.
# Store-visit rate (share of the community cohort)
print(tmp[(tmp.community == 1)&(tmp.if_go_store ==1)].mobile.nunique()/tmp[(tmp.community == 1)].mobile.nunique())
# Deal rate (share of the community cohort)
print(tmp[(tmp.community == 1)&(tmp.if_order ==1)].mobile.nunique()/tmp[(tmp.community == 1)].mobile.nunique())

In [None]:
# 5) Referral rewards: store visits & deals
tmp = adhoc.copy()
# Trim each referral journey to start at its first 008002006005_tp touch.
tmp.loc[tmp.tuijian == 1,'touchpoint_id'] = tmp.loc[tmp.tuijian == 1,'touchpoint_id'].apply(lambda x:exclude_endpoint('008002006005_tp',x))
# Recompute the conversion flags on the trimmed journeys.
tmp['if_go_store'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('006000000000_tp',x))
tmp['if_order'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('011000000000_tp',x))
tmp['if_finish_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007003000000_tp',x))
tmp['if_review_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007004000000_tp',x))
tmp['if_drive'] = ((tmp['if_finish_drive'] + tmp['if_review_drive'])>=1).astype('int')

# A cell only displays its last bare expression, so the store-visit count was
# silently dropped; print both figures explicitly.
# Store visits
print(tmp[(tmp.tuijian == 1)&(tmp.if_go_store ==1)].mobile.nunique())
# Deals
print(tmp[(tmp.tuijian == 1)&(tmp.if_order ==1)].mobile.nunique())

In [None]:
# 6) Others: store visits & deals
tmp = adhoc.copy()
# Trim each "others" journey to start at its first 008002006005_tp touch.
tmp.loc[tmp.others == 1,'touchpoint_id'] = tmp.loc[tmp.others == 1,'touchpoint_id'].apply(lambda x:exclude_endpoint('008002006005_tp',x))
# Recompute the conversion flags on the trimmed journeys.
tmp['if_go_store'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('006000000000_tp',x))
tmp['if_order'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('011000000000_tp',x))
tmp['if_finish_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007003000000_tp',x))
tmp['if_review_drive'] = tmp.touchpoint_id.apply(lambda x:if_has_endpoint('007004000000_tp',x))
tmp['if_drive'] = ((tmp['if_finish_drive'] + tmp['if_review_drive'])>=1).astype('int')

# A cell only displays its last bare expression, so the store-visit count was
# silently dropped; print both figures explicitly.
# Store visits
print(tmp[(tmp.others == 1)&(tmp.if_go_store ==1)].mobile.nunique())
# Deals
print(tmp[(tmp.others == 1)&(tmp.if_order ==1)].mobile.nunique())

##### 7天活跃率

In [None]:
# Touchpoint ids used by the 7-day activity calculation: prefixes 003, 008002,
# 002 (except 002011) and 019, concatenated in that order.
# NOTE: this rebinds `app_id`, which an earlier cell used for the 008002002 ids only.
app_id = (
    [tp for tp in id_mapping.touchpoint_id.tolist() if tp.startswith('003')]
    + [tp for tp in id_mapping.touchpoint_id.tolist() if tp.startswith('008002')]
    + [
        tp for tp in id_mapping.touchpoint_id.tolist()
        if tp.startswith('002') and not tp.startswith('002011')
    ]
    + [tp for tp in id_mapping.touchpoint_id.tolist() if tp.startswith('019')]
)

def get_seven_days_active(cyberster_activity_mobile, events=None, app_touchpoints=None):
    """Compute the 7-day active rate of a cohort.

    For each cohort member, the reference time is the earliest event whose
    touchpoint id is in ``app_touchpoints``. A member counts as active when it
    has an event with a touchpoint id starting with ``'00800200'`` strictly
    after the reference time and strictly before reference time + 7 days.

    Parameters
    ----------
    cyberster_activity_mobile : iterable
        Mobiles belonging to the cohort.
    events : pandas.DataFrame, optional
        Events with ``mobile``, ``action_time`` and ``touchpoint_id`` columns.
        Defaults to the module-level ``df``.
    app_touchpoints : list, optional
        Touchpoint ids that define the reference time. Defaults to the
        module-level ``app_id``.

    Returns
    -------
    tuple
        ``(a, b, a / b)``: active members, cohort members present in ``events``,
        and their ratio (``nan`` when the cohort is empty).
    """
    if events is None:
        events = df
    if app_touchpoints is None:
        app_touchpoints = app_id

    # Explicit copy: we add/convert columns and must not write into a slice.
    cohort = events[events.mobile.isin(cyberster_activity_mobile)].copy()
    if cohort.empty:
        # Nothing to measure; avoid ZeroDivisionError on a / b.
        return 0, 0, float('nan')
    cohort['action_time'] = pd.to_datetime(cohort['action_time'])

    first_touch = (
        cohort[cohort.touchpoint_id.isin(app_touchpoints)]
        .groupby('mobile', as_index=False)
        .action_time.min()
        .rename(columns={'action_time': 'min_time'})
    )
    # pd.Timedelta instead of the bare `timedelta`, which was never imported
    # (the notebook only has `import datetime`).
    first_touch['7_days_later'] = first_touch['min_time'] + pd.Timedelta(days=7)

    cohort = cohort.merge(first_touch, on='mobile', how='left')
    in_window = (
        cohort.touchpoint_id.str.startswith('00800200')
        & (cohort.action_time > cohort.min_time)
        & (cohort.action_time < cohort['7_days_later'])
    )
    a = cohort[in_window].mobile.nunique()
    b = cohort.mobile.nunique()
    return a, b, a / b

In [None]:
# 7-day activity for the Cyberster cohort: (active count, cohort size, rate).
get_seven_days_active(cyberster_activity_mobile)

In [None]:
# 7-day activity for the points cohort: (active count, cohort size, rate).
get_seven_days_active(points_activity_mobile)

In [None]:
# 7-day activity for the community cohort: (active count, cohort size, rate).
get_seven_days_active(community_activity_mobile)

In [None]:
# 7-day activity for the APP cohort: (active count, cohort size, rate).
get_seven_days_active(app_activity_mobile)

In [None]:
# 7-day activity for the liebian cohort: (active count, cohort size, rate).
get_seven_days_active(liebian_activity_mobile)

In [None]:
# 7-day activity for the "others" (qita) cohort: (active count, cohort size, rate).
get_seven_days_active(qita_activity_mobile)