# UUI pairs analysis (Last 7 day)

### Using user behaviour to examine SFE-SIP results
- Dataset: SFE-SIP results (2019-09-30)
- Dataset: User Behaviour Feature (2019-09-24 ~ 2019-09-30)

In [1]:
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.types as T
from pyspark.ml.linalg import Vectors, VectorUDT

In [2]:
def get_spark_session(**kwargs):
    """Initializing Spark context.

    Parameters
    ----------
    kwargs : dict
        Variable number of keyword arguments to initialize the SparkContext
        object

    Returns
    -------
     : SparkContext object

    """
    conf = SparkConf() \
        .setAppName(kwargs.get('app_name', 'test')) \
        .set("spark.executor.memory", kwargs.get('executor_memory', '10g')) \
        .set("spark.driver.memory", kwargs.get('driver_memory', '30g')) \
        .set("spark.driver.maxResultSize",
             kwargs.get('max_result_size', '100g')) \
        .set("spark.executor.instances", kwargs.get('num_executors', '200')) \
        .set("spark.executor.cores", kwargs.get('num_cores', '5')) \
        .set("spark.sql.crossJoin.enabled", True)  \
        .set("spark.cores.max", kwargs.get('cores_max', '1000'))    \
        .set("spark.network.timeout", kwargs.get('timeout', '3600s')) \
        .set("spark.executor.heartbeatInterval", kwargs.get('heartbeat', '3600s')) \
        .set("spark.sql.shuffle.partitions", kwargs.get('num_partitions', '4000')) \
        .set("spark.yarn.queue", kwargs.get('spark_yarn_queue', 'recommend'))

    spark_session = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
    
    return spark_session

In [3]:
your_name = "LiuMing" # put your name here
spark_appname = "pyspark_user_path_explore_{}".format(your_name)

# spark conf parameter
executor_memory = '10g'
driver_memory = '30g'

# get the spark object
spark = get_spark_session(app_name=spark_appname,
                         executor_memory=executor_memory,
                         driver_memory=driver_memory)

sc = spark.sparkContext
spark.sql('use shopee')

DataFrame[]

# 1. Understand sfe-sip result

In [4]:
cluster_path = '/projects/regds_uui/uui_live/sfe/output/sfe_sip/sfe_sip_cluster_result_ID_1569772800_1569859200.parquet'
full_edges_path = '/projects/regds_uui/uui_live/sfe/output/sfe_sip/sfe_sip_full_edges_ID_1569772800_1569859200.parquet'

In [5]:
cluster_df = spark.read.parquet(cluster_path)
edges_df = spark.read.parquet(full_edges_path)

In [6]:
cluster_df = cluster_df.select('userid','cluster_id')
cluster_df.show(5)

+--------+----------+
|  userid|cluster_id|
+--------+----------+
|17607429|  17607429|
|23822481|  23822481|
|29588253|  29588253|
|36272550|  36082738|
|36705001|  36701514|
+--------+----------+
only showing top 5 rows



In [7]:
edges_df.show(5)

+--------+--------+----------+
|   node1|   node2|similarity|
+--------+--------+----------+
| 4259299| 8481659|      0.08|
| 3469106|10246004|  0.126875|
| 7775188|10740863|   0.00375|
|17540641|19700906|   0.52125|
|10637643|21006791|  0.251875|
+--------+--------+----------+
only showing top 5 rows



In [8]:
cluster1 = cluster_df.selectExpr('userid as u1', 'cluster_id as c1')
edges_df = edges_df.join(cluster1, 
                         cluster1.u1 == edges_df.node1,
                         how='left').na.fill(0)

cluster2 = cluster_df.selectExpr('userid as u2', 'cluster_id as c2')
edges_df = edges_df.join(cluster2, 
                         cluster2.u2 == edges_df.node2,
                         how='left').na.fill(1)
edges_df.show(3)

+-------+--------+----------+---+---+---+---+
|  node1|   node2|similarity| u1| c1| u2| c2|
+-------+--------+----------+---+---+---+---+
|4259299| 8481659|      0.08|  0|  0|  1|  1|
|3469106|10246004|  0.126875|  0|  0|  1|  1|
|7775188|10740863|   0.00375|  0|  0|  1|  1|
+-------+--------+----------+---+---+---+---+
only showing top 3 rows



In [9]:
@F.udf(T.StringType())
def label(c1, c2, sim):
    if sim >= 0.8:
        return 'uui_high_score_pairs'
    elif sim >= 0.7 and c1 == c2:
        return 'uui_grey_score_pairs'
    elif sim >= 0.7:
        return 'non_uui_grey_score_pairs'
    elif c1==c2:
        return 'uui_low_score_pairs'
    else:
        return 'non_uui_low_score_pairs'

In [10]:
edges_df = edges_df.withColumn('label', label(F.col('c1'), F.col('c2'), F.col('similarity')))
edges_df.show(4)

+--------+--------+----------+---+---+---+---+--------------------+
|   node1|   node2|similarity| u1| c1| u2| c2|               label|
+--------+--------+----------+---+---+---+---+--------------------+
| 4259299| 8481659|      0.08|  0|  0|  1|  1|non_uui_low_score...|
| 3469106|10246004|  0.126875|  0|  0|  1|  1|non_uui_low_score...|
| 7775188|10740863|   0.00375|  0|  0|  1|  1|non_uui_low_score...|
|17540641|19700906|   0.52125|  0|  0|  1|  1|non_uui_low_score...|
+--------+--------+----------+---+---+---+---+--------------------+
only showing top 4 rows



In [11]:
edges_df.groupBy('label').count().show(5, False)

+------------------------+--------+
|label                   |count   |
+------------------------+--------+
|uui_high_score_pairs    |787353  |
|non_uui_low_score_pairs |27975953|
|uui_grey_score_pairs    |1273694 |
|non_uui_grey_score_pairs|2856372 |
|uui_low_score_pairs     |278154  |
+------------------------+--------+



# 2. Select active user of the same period of those uui edges (09-24 ~ 09-30)

active user = user which has larger than 5 or 10 sessions in select period

In [12]:
def pull_all_action_data(spark, country, begin, end):
    """ 
    pull all user action data (click & view) on certain day 

    Args:
        spark: spark session object
        country: str country code e.g. 'ID'
        date: str e.g. ('2019-09-09')

    Returns:
        action_df: spark dataframe
            - schema: uid, page_type, page_section, operation, 
                event_timestamp
    """

    VIEW_QUERY = """  
        SELECT 
            userid as uid, 
            page_type,
            page_section[0] as section,
            operation,
            target_type,
            event_timestamp,
            hour
        FROM 
            user_behavior_mart_dwd__view_event
        WHERE 
            grass_region='%s' and utc_date >='%s' and utc_date <='%s'
    """ % (country, begin, end)

    CLICK_QUERY = """
        SELECT 
            userid as uid, 
            page_type,
            page_section[0] as section,
            operation,
            target_type,
            event_timestamp,
            hour
        FROM 
            user_behavior_mart_dwd__click_event
        WHERE 
            grass_region='%s' and utc_date >='%s' and utc_date <='%s'
    """ % (country, begin, end)

    click_df = spark.sql(CLICK_QUERY)
    view_df = spark.sql(VIEW_QUERY)
    union_df = click_df\
        .union(view_df)\
        .orderBy(['uid', 'event_timestamp'])\
        .where(F.col('uid').isNotNull())
    action_df = union_df.withColumn('event_timestamp', F.col('event_timestamp')/1000)
    return action_df


def map_feature(action_df, feature_map):
    """ 
    map raw action (page_type + page_section + operation) to selected
    concise action features.

    Args:
        action_df: spark dataframe with raw action featrue
            - schema: page_type, page_section, operation, ...
        feature_map: spark dataframe which record mapping relationship 
            - schema: page_type, page_section, operation, action_feature

    Returns:
        action_df: spark dataframe with concise action featrue
            - schema: action_feature, ...
    """

    # concat operation - page type - page section
    feature_map = feature_map.na.fill('null')
    feature_map = feature_map.withColumn('crossfeature',
                                         F.concat(
                                             F.col('operation'),
                                             F.lit('_'),
                                             F.col('page_type'),
                                             F.lit('_'),
                                             F.col('section'),
                                             F.lit('_'),
                                             F.col('target_type')))

    action_df = action_df.na.fill('null')
    action_df = action_df.withColumn('cross_feature',
                                     F.concat(
                                         F.col('operation'),
                                         F.lit('_'),
                                         F.col('page_type'),
                                         F.lit('_'),
                                         F.col('section'),
                                         F.lit('_'),
                                         F.col('target_type')))

    # join action_df with feature_map
    action_df = action_df.join(
        feature_map,
        action_df.cross_feature == feature_map.crossfeature)

    # delete useless columns
    action_df = action_df\
        .where(F.col('action_feature_v2') != 'ignore')
    action_df = action_df\
        .selectExpr('uid', 'action_feature_v2 as action_feature', 'event_timestamp', 'hour')

    return action_df


def split_session(action_df, session_idle_threshold=10 * 60):
    """ 
    calculate intervals between event_timestamp. split sessions using 
    predefined threshold (default 10 min). generate session id.

    Args:
        action_df: spark dataframe
            - schema: uid|event_timestamp|...
        session_idle_threshold: int (miliseconds)

    Returns:
        action_df: spark dataframe
            - schema: uid|event_timestamp|session_id|...
    """

    # Get intervals in miliseconds between click or views
    op_window = Window.partitionBy('uid').orderBy('event_timestamp')
    action_df = action_df.withColumn('last_op_timestamp',
                                     F.lag(action_df.event_timestamp)
                                     .over(op_window)
                                     )
    action_df = action_df.withColumn('op_interval',
                                     F.when(
                                         F.isnull(action_df.event_timestamp - action_df.last_op_timestamp), 0)
                                     .otherwise(action_df.event_timestamp - action_df.last_op_timestamp)
                                     )
    action_df = action_df.drop('last_op_timestamp')

    # get session split signal
    action_df = action_df.withColumn('session_start',
                                     F.when(action_df.op_interval >
                                            session_idle_threshold, 1)
                                     .otherwise(0)
                                     )

    # Keep This Interval For Calculateing op_interval matrix
    # action_df = action_df.drop('op_interval')

    # get the cum sum of session_start to get human split session id.
    windowval = (Window.partitionBy('uid').orderBy('event_timestamp')
                 .rangeBetween(Window.unboundedPreceding, 0))
    action_df = action_df.withColumn('session_id',
                                     F.sum('session_start').over(windowval)
                                     )
    action_df = action_df.drop('session_start')

    return action_df


def construct_2gram(action_df):
    """ 
    construct 2 gram of action feature within each human split session.
    add session_start and session_end to the beginning and end of each session.

    Args:
        action_df: spark dataframe

    Returns:
        action_df: spark dataframe
    """

    # add extra row showing "session_end" for each user * session partition
    session_end_df = action_df.groupBy(['uid', 'session_id']).agg(
            F.max('event_timestamp').alias('event_timestamp'),
            F.max('hour').alias('hour'),
            F.lit(0.1).alias('op_interval')
        )
    session_end_df = session_end_df.withColumn('event_timestamp', session_end_df.event_timestamp + 0.1)
    session_end_df = session_end_df.withColumn('action_feature', F.lit('session_end'))

    # pay attention to schema ! do not mix them
    action_df = action_df.select('uid','session_id','event_timestamp', 'hour', 'op_interval', 'action_feature')
    action_df = action_df.union(session_end_df)

    # generate action 2 gram with in each session
    session_window = Window.partitionBy(["uid", "session_id"]).orderBy(['uid', 'event_timestamp'])
    action_df = action_df.withColumn(
        'last_action', F.lag(action_df.action_feature).over(session_window))
    action_df = action_df.withColumn(
        'action_2gram',
        F.when(
            F.isnull(action_df.last_action), F.concat(F.lit('session_begin-'), F.col('action_feature')))
        .otherwise(F.concat(F.col('last_action'), F.lit('-'), F.col('action_feature')))
    )
    action_df = action_df.drop('last_action')

    return action_df


def get_action_features():
    """ get action features from file """

    feature_map_file = 'feature_mapping_v2.csv'
    df = pd.read_csv(feature_map_file)

    action_features = sorted(list(df['action_feature_v2'].unique()))
    action_features.insert(0, 'session_begin')
    action_features.append('session_end')

    return action_features


def get_feature_map(action_features):
    """ get action features from file """
    action_map = {}
    for i, action in enumerate(action_features):
        action_map[action] = i

    action_2gram_map = {}
    for j, action1 in enumerate(action_features):
        for k, action2 in enumerate(action_features):
            action_2gram_map[action1 + '-' + action2] = j * len(action_features) + k
    
    time_map = {}
    for hour in range(24):
        time_map[hour] = hour

    return action_map, action_2gram_map, time_map


#### Global Variable
action_features = get_action_features()
action_map, action_2gram_map, time_map = get_feature_map(action_features)

@F.udf(T.ArrayType(T.IntegerType()))
def map_array1(columns):  
    index_map = action_map
    dense_vec = [0] * len(index_map)
    for row in columns:
        content, value  = row[0], row[1]
        try:
            dense_vec[index_map[content]] = value
        except:
            pass
    return dense_vec

@F.udf(T.ArrayType(T.IntegerType()))
def map_array2(columns):

    index_map = action_2gram_map
    dense_vec = [0] * len(index_map)
    for row in columns:
        content, value  = row[0], row[1]
        try:
            dense_vec[index_map[content]] = value
        except:
            pass
    return dense_vec

@F.udf(T.ArrayType(T.DoubleType()))
def map_array4(columns):

    index_map = action_2gram_map
    dense_vec = [0] * len(index_map)
    for row in columns:
        content, value  = row[0], row[1]
        try:
            dense_vec[index_map[content]] = value
        except:
            pass
    return dense_vec

@F.udf(T.ArrayType(T.IntegerType()))
def map_array3(columns):

    index_map = time_map
    dense_vec = [0] * len(index_map)
    for row in columns:
        content, value  = row[0], row[1]
        try:
            dense_vec[index_map[content]] = value
        except:
            pass
    return dense_vec


def get_action_feature(action_df):
    """ get action count, in map format """

    df = action_df\
        .groupBy(['uid', 'action_feature'])\
        .agg(F.count('action_feature').alias('action_feature_count'))\
        .orderBy(['uid', 'action_feature'])

    action_distribute = df.groupBy('uid').agg(
        map_array1(F.collect_list(F.struct("action_feature",
                                          "action_feature_count"))
                 ).alias("action_count")
    ).orderBy('uid')

    return action_distribute


def get_transition_feature(action_df):
    """ get action 2 gram count and intervals, in map format """

    df = action_df\
        .groupBy(['uid', 'action_2gram'])\
        .agg(
            F.count('action_2gram').alias('action_2gram_count'),
            F.sum('op_interval').alias('action_2gram_interval'))\
        .orderBy(['uid', 'action_2gram'])

    action_2gram_distribute = df.groupBy('uid').agg(
        map_array2(F.collect_list(F.struct("action_2gram",
                                                   "action_2gram_count"))
                 ).alias("action_2gram_count"),
        map_array4(F.collect_list(F.struct("action_2gram",
                                                   "action_2gram_interval"))
                 ).alias("action_2gram_interval")
    ).orderBy('uid')
    
    return action_2gram_distribute


def get_general_feature(action_df):
    """ 
    get general feature from user behavior data, such as
        - Total session count per day 
        - Average operations per session 
    """
    # generate session level feature
    session_df = action_df.groupBy(['uid', 'session_id'])\
        .agg(F.min("event_timestamp").alias('begin'),
            F.max("event_timestamp").alias('end'),
            F.count(F.lit(1)).alias('session_ops'))\
        .orderBy(['uid', 'begin'])

    session_df = session_df.withColumn(
        'session_duration', F.col('end') - F.col('begin'))

    user_df = session_df.groupBy('uid')\
        .agg(
            F.sum("session_duration").alias('daily_active_time'),
            F.count(F.lit(1)).alias("daily_session_count"),
            F.sum("session_ops").alias("daily_operation_count")) \
        .orderBy('uid')

    df = action_df.groupBy(['uid', 'hour'])\
        .agg(
            F.count(F.lit(1)).alias('active_hour_count')
        ).orderBy('uid')

    active_hour_df = df.groupBy('uid').agg(
        map_array3(
            F.collect_list(F.struct("hour","active_hour_count"))
        ).alias("active_hour_map")
    ).orderBy('uid')

    return user_df, active_hour_df

In [13]:
action_df = pull_all_action_data(spark, 'ID', '2019-09-24', '2019-09-30')

# map feature
feature_map = spark.read.load('uui-click-path/feature_mapping_v2.csv',
                 format="csv", sep=",", inferSchema="true", header="true")

action_df = map_feature(action_df, feature_map)
action_df.cache()
action_df.show(3)

+---+--------------+---------------+----+
|uid|action_feature|event_timestamp|hour|
+---+--------------+---------------+----+
|  0|          home|3.16403013766E8|   6|
|  0|          home| 3.1640305217E8|   6|
|  0|     microsite|3.16403053182E8|   6|
+---+--------------+---------------+----+
only showing top 3 rows



In [14]:
action_df = split_session(action_df, 10 * 60)
action_df = construct_2gram(action_df)

In [15]:
action_distribute = get_action_feature(action_df)

In [16]:
action_2gram_distribute = get_transition_feature(action_df)

In [17]:
user_df, active_hour_df = get_general_feature(action_df)

In [18]:
active_user_df = user_df.where(user_df.daily_session_count >= 10)

In [19]:
user_info = (active_user_df
             .join(action_distribute, ["uid"])
             .join(action_2gram_distribute, ["uid"])
             .join(active_hour_df, ["uid"]))

In [20]:
user_info.coalesce(400).write.mode('overwrite').parquet('user_info')

# Save result to save time

In [21]:
# user_info = spark.read.parquet('user_info')

In [22]:
user_info = user_info.repartition("uid").orderBy('uid')
user_info.show(1)

+---+-----------------+-------------------+---------------------+--------------------+--------------------+---------------------+--------------------+
|uid|daily_active_time|daily_session_count|daily_operation_count|        action_count|  action_2gram_count|action_2gram_interval|     active_hour_map|
+---+-----------------+-------------------+---------------------+--------------------+--------------------+---------------------+--------------------+
|  0|858461.6318747997|               1295|               349854|[0, 390, 8923, 2,...|[0, 2, 65, 0, 0, ...| [, 27961.25, 1416...|[9813, 12630, 143...|
+---+-----------------+-------------------+---------------------+--------------------+--------------------+---------------------+--------------------+
only showing top 1 row



In [23]:
user_info.printSchema()

root
 |-- uid: long (nullable = true)
 |-- daily_active_time: double (nullable = true)
 |-- daily_session_count: long (nullable = false)
 |-- daily_operation_count: long (nullable = true)
 |-- action_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- action_2gram_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- action_2gram_interval: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- active_hour_map: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [24]:
@F.udf(T.ArrayType(T.DoubleType()))
def divide(interval, count):
    result = []
    for i in range(len(interval)):
        if count[i] and interval[i]:
            result.append(float(interval[i]) / float(count[i]))
        else:
            result.append(0.0)
    return result

@F.udf(T.ArrayType(T.DoubleType()))
def markov(count):
    dim = 32
    result = []
    for i in range(dim):
        tmp = count[i*dim:(i+1)*dim]
        s = float(sum(tmp))
        if s == 0:
            s += 1
        result.extend([x / s for x in tmp])
    return result

@F.udf(T.ArrayType(T.DoubleType()))
def normalise(count):
    s = float(sum(count))
    if s == 0:
        s += 1.0
    result = [x / s for x in count]
    return result


to_vector = F.udf(lambda a: Vectors.dense(a), VectorUDT())

In [25]:
user_info = user_info.withColumn('hour_normalise', to_vector(normalise(F.col('active_hour_map'))))

In [26]:
user_info = user_info.withColumn('action_normalise', to_vector(normalise(F.col('action_count'))))

In [27]:
user_info = user_info.withColumn('op_interval', to_vector(divide(F.col('action_2gram_interval'), 
                                                                 F.col('action_2gram_count'))))

In [28]:
user_info = user_info.withColumn('markov', to_vector(markov(F.col('action_2gram_count'))))

In [29]:
# user_info.show(1)

In [30]:
user_info.printSchema()

root
 |-- uid: long (nullable = true)
 |-- daily_active_time: double (nullable = true)
 |-- daily_session_count: long (nullable = false)
 |-- daily_operation_count: long (nullable = true)
 |-- action_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- action_2gram_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- action_2gram_interval: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- active_hour_map: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- hour_normalise: vector (nullable = true)
 |-- action_normalise: vector (nullable = true)
 |-- op_interval: vector (nullable = true)
 |-- markov: vector (nullable = true)



In [31]:
user_info.coalesce(400).write.mode('overwrite').parquet('user_info')

In [32]:
print("There are %d users which have more than 10 sessions during 09-01 ~ 09-30" % (user_info.count()))

There are 4769162 users which have more than 10 sessions during 09-01 ~ 09-30


# 3. Experiment Starts Here ...

In [12]:
user_info = spark.read.parquet('user_info')

In [13]:
user_info.printSchema()

root
 |-- uid: long (nullable = true)
 |-- daily_active_time: double (nullable = true)
 |-- daily_session_count: long (nullable = true)
 |-- daily_operation_count: long (nullable = true)
 |-- action_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- action_2gram_count: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- action_2gram_interval: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- active_hour_map: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- hour_normalise: vector (nullable = true)
 |-- action_normalise: vector (nullable = true)
 |-- op_interval: vector (nullable = true)
 |-- markov: vector (nullable = true)



In [14]:
edges_df = edges_df.selectExpr('node1', 'node2', 'similarity', 'label')
user_info_df1 = user_info.selectExpr('uid as u1', 
                                     'daily_active_time as active_time1', 
                                     'daily_session_count as session_count1',
                                     'daily_operation_count as operation_count1',
                                     'action_normalise as action_normalise1',
                                     'markov as markov1',
                                     'op_interval as interval1',
                                     'hour_normalise as hour_normalise1'
                                     )

user_info_df2 = user_info.selectExpr('uid as u2', 
                                     'daily_active_time as active_time2', 
                                     'daily_session_count as session_count2',
                                     'daily_operation_count as operation_count2',
                                     'action_normalise as action_normalise2',
                                     'markov as markov2',
                                     'op_interval as interval2',
                                     'hour_normalise as hour_normalise2'
                                     )

In [15]:
user_info_df2.printSchema()

root
 |-- u2: long (nullable = true)
 |-- active_time2: double (nullable = true)
 |-- session_count2: long (nullable = true)
 |-- operation_count2: long (nullable = true)
 |-- action_normalise2: vector (nullable = true)
 |-- markov2: vector (nullable = true)
 |-- interval2: vector (nullable = true)
 |-- hour_normalise2: vector (nullable = true)



In [16]:
total_df = edges_df
total_df = total_df.join(user_info_df1, user_info_df1.u1 == total_df.node1)
total_df = total_df.join(user_info_df2, user_info_df2.u2 == total_df.node2)
# total_df.cache()
# total_df.show(1)

In [17]:
# user_info.unpersist()
# user_info.take(1)

In [18]:
# write our UDF for cosine similarity
@F.udf(T.DoubleType())
def cos_dis(a,b):
    if (np.linalg.norm(a) * np.linalg.norm(b))== 0.0:
        return -1.0
    return 1 - float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

@F.udf(T.DoubleType())
def L2_dis(a,b):
    return float(np.linalg.norm(a-b))

result_df = total_df
result_df = result_df.withColumn("action_normalise_cos_dis", cos_dis(F.col('action_normalise1'), 
                                                                     F.col('action_normalise2')))


result_df = result_df.withColumn("markov_cos_dis", cos_dis(F.col('markov1'),
                                                           F.col('markov2')))


result_df = result_df.withColumn("interval_cos_dis", cos_dis(F.col('interval1'),
                                                                   F.col('interval2')))


result_df = result_df.withColumn("hour_normalise_cos_dis", cos_dis(F.col('hour_normalise1'),
                                                                   F.col('hour_normalise2')))


In [19]:
result_df.printSchema()

root
 |-- node1: integer (nullable = true)
 |-- node2: integer (nullable = true)
 |-- similarity: float (nullable = false)
 |-- label: string (nullable = true)
 |-- u1: long (nullable = true)
 |-- active_time1: double (nullable = true)
 |-- session_count1: long (nullable = true)
 |-- operation_count1: long (nullable = true)
 |-- action_normalise1: vector (nullable = true)
 |-- markov1: vector (nullable = true)
 |-- interval1: vector (nullable = true)
 |-- hour_normalise1: vector (nullable = true)
 |-- u2: long (nullable = true)
 |-- active_time2: double (nullable = true)
 |-- session_count2: long (nullable = true)
 |-- operation_count2: long (nullable = true)
 |-- action_normalise2: vector (nullable = true)
 |-- markov2: vector (nullable = true)
 |-- interval2: vector (nullable = true)
 |-- hour_normalise2: vector (nullable = true)
 |-- action_normalise_cos_dis: double (nullable = true)
 |-- markov_cos_dis: double (nullable = true)
 |-- interval_cos_dis: double (nullable = true)
 |-- h

In [20]:
result_df=result_df.drop('u1').drop('u2')

In [21]:
# result_df.coalesce(300).write.mode('overwrite').parquet('uui_user_sim')

# Save result to save time

In [22]:
# pair_df = pd.read_parquet('uui_user_sim')

In [23]:
pair_df = result_df.select('node1','node2',
                           'similarity','label',
                           'active_time1','active_time2',
                           'session_count1','session_count2',
                           'operation_count1','operation_count2',
                           'action_normalise_cos_dis',
                           'markov_cos_dis',
                           'hour_normalise_cos_dis',
                           'interval_cos_dis'
                          ).toPandas()

In [24]:
pair_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 854641 entries, 0 to 854640
Data columns (total 14 columns):
node1                       854641 non-null int32
node2                       854641 non-null int32
similarity                  854641 non-null float32
label                       854641 non-null object
active_time1                854641 non-null float64
active_time2                854641 non-null float64
session_count1              854641 non-null int64
session_count2              854641 non-null int64
operation_count1            854641 non-null int64
operation_count2            854641 non-null int64
action_normalise_cos_dis    854641 non-null float64
markov_cos_dis              854641 non-null float64
hour_normalise_cos_dis      854641 non-null float64
interval_cos_dis            854641 non-null float64
dtypes: float32(1), float64(6), int32(2), int64(4), object(1)
memory usage: 81.5+ MB


In [25]:
pair_df.groupby('label')['node1'].count()

label
non_uui_grey_score_pairs     36355
non_uui_low_score_pairs     809864
uui_grey_score_pairs          3783
uui_high_score_pairs          4310
uui_low_score_pairs            329
Name: node1, dtype: int64

# Active Pair Analysis

## Cosine distance distribution for each column

In [26]:
high_sim = pair_df[pair_df['action_normalise_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['markov_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['hour_normalise_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['interval_cos_dis'] < 0.05]

print('There are %d high_sim records, about %f percentage of total pairs'%
      (len(high_sim), float(len(high_sim)) / len(pair_df)))

high_sim.groupby('label')['node1'].count()

There are 17143 high_sim records, about 0.020059 percentage of total pairs


label
non_uui_grey_score_pairs    9053
non_uui_low_score_pairs     4629
uui_grey_score_pairs        1211
uui_high_score_pairs        2235
uui_low_score_pairs           15
Name: node1, dtype: int64

In [27]:
import plotly.express as px
import plotly.graph_objects as go

In [33]:
g = pair_df.groupby('label')
sample_df = g.apply(lambda x: x.sample(329).reset_index(drop=True)).reset_index(drop=True)

In [35]:
axis = 'action_normalise_cos_dis'
bins = 100

x1=sample_df[sample_df['label']=='uui_high_score_pairs'][axis].values
x2=sample_df[sample_df['label']=='uui_grey_score_pairs'][axis].values
x3=sample_df[sample_df['label']=='non_uui_grey_score_pairs'][axis].values
x4=sample_df[sample_df['label']=='uui_low_score_pairs'][axis].values
x5=sample_df[sample_df['label']=='non_uui_low_score_pairs'][axis].values

fig = go.Figure()
fig.add_trace(go.Histogram(x=x1, nbinsx=bins, name="uui_high_score_pairs"))
fig.add_trace(go.Histogram(x=x2, nbinsx=bins, name="uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x3, nbinsx=bins, name="non_uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x4, nbinsx=bins, name="uui_low_score_pairs"))
fig.add_trace(go.Histogram(x=x5, nbinsx=bins, name="non_uui_low_score_pairs"))

# Overlay both histograms
fig.update_layout(barmode='overlay',  yaxis_type="log")

# Reduce opacity to see both histograms
fig.update_traces(opacity=0.75)
fig.show()

In [36]:
axis = 'markov_cos_dis'
bins = 100

x1=sample_df[sample_df['label']=='uui_high_score_pairs'][axis].values
x2=sample_df[sample_df['label']=='uui_grey_score_pairs'][axis].values
x3=sample_df[sample_df['label']=='non_uui_grey_score_pairs'][axis].values
x4=sample_df[sample_df['label']=='uui_low_score_pairs'][axis].values
x5=sample_df[sample_df['label']=='non_uui_low_score_pairs'][axis].values

fig = go.Figure()
fig.add_trace(go.Histogram(x=x1, nbinsx=bins, name="uui_high_score_pairs"))
fig.add_trace(go.Histogram(x=x2, nbinsx=bins, name="uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x3, nbinsx=bins, name="non_uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x4, nbinsx=bins, name="uui_low_score_pairs"))
fig.add_trace(go.Histogram(x=x5, nbinsx=bins, name="non_uui_low_score_pairs"))

# Overlay both histograms
fig.update_layout(barmode='overlay',  yaxis_type="log")

# Reduce opacity to see both histograms
fig.update_traces(opacity=0.75)
fig.show()

In [37]:
axis = 'interval_cos_dis'
bins = 100

x1=sample_df[sample_df['label']=='uui_high_score_pairs'][axis].values
x2=sample_df[sample_df['label']=='uui_grey_score_pairs'][axis].values
x3=sample_df[sample_df['label']=='non_uui_grey_score_pairs'][axis].values
x4=sample_df[sample_df['label']=='uui_low_score_pairs'][axis].values
x5=sample_df[sample_df['label']=='non_uui_low_score_pairs'][axis].values

fig = go.Figure()
fig.add_trace(go.Histogram(x=x1, nbinsx=bins, name="uui_high_score_pairs"))
fig.add_trace(go.Histogram(x=x2, nbinsx=bins, name="uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x3, nbinsx=bins, name="non_uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x4, nbinsx=bins, name="uui_low_score_pairs"))
fig.add_trace(go.Histogram(x=x5, nbinsx=bins, name="non_uui_low_score_pairs"))

# Overlay both histograms
fig.update_layout(barmode='overlay',  yaxis_type="log")

# Reduce opacity to see both histograms
fig.update_traces(opacity=0.75)
fig.show()

In [38]:
axis = 'hour_normalise_cos_dis'

bins = 100

x1=sample_df[sample_df['label']=='uui_high_score_pairs'][axis].values
x2=sample_df[sample_df['label']=='uui_grey_score_pairs'][axis].values
x3=sample_df[sample_df['label']=='non_uui_grey_score_pairs'][axis].values
x4=sample_df[sample_df['label']=='uui_low_score_pairs'][axis].values
x5=sample_df[sample_df['label']=='non_uui_low_score_pairs'][axis].values

fig = go.Figure()
fig.add_trace(go.Histogram(x=x1, nbinsx=bins, name="uui_high_score_pairs"))
fig.add_trace(go.Histogram(x=x2, nbinsx=bins, name="uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x3, nbinsx=bins, name="non_uui_grey_score_pairs"))
fig.add_trace(go.Histogram(x=x4, nbinsx=bins, name="uui_low_score_pairs"))
fig.add_trace(go.Histogram(x=x5, nbinsx=bins, name="non_uui_low_score_pairs"))

# Overlay both histograms
fig.update_layout(barmode='overlay',  yaxis_type="log")

# Reduce opacity to see both histograms
fig.update_traces(opacity=0.75)
fig.show()

# UUI with user behaviour

In [39]:
pair_df['active_time_diff'] = abs(pair_df['active_time1'] - pair_df['active_time2']) / 60
pair_df['session_count_diff'] = abs(pair_df['session_count1'] - pair_df['session_count2'])
pair_df['operation_count_diff'] = abs(pair_df['operation_count1'] - pair_df['operation_count2']) 

In [40]:
stats_df = pair_df.groupby("label")[
    'active_time_diff',
    'session_count_diff',
    'operation_count_diff',
    'action_normalise_cos_dis',
    'markov_cos_dis',
    'hour_normalise_cos_dis',
    'interval_cos_dis'
].agg([np.mean, np.std])

stats_df

Unnamed: 0_level_0,active_time_diff,active_time_diff,session_count_diff,session_count_diff,operation_count_diff,operation_count_diff,action_normalise_cos_dis,action_normalise_cos_dis,markov_cos_dis,markov_cos_dis,hour_normalise_cos_dis,hour_normalise_cos_dis,interval_cos_dis,interval_cos_dis
Unnamed: 0_level_1,mean,std,mean,std,mean,std,mean,std,mean,std,mean,std,mean,std
label,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2
non_uui_grey_score_pairs,10.729777,61.929358,2.05273,5.628253,37.423766,306.591868,0.015524,0.054193,0.098752,0.164389,0.25478,0.271967,0.045032,0.133379
non_uui_low_score_pairs,183.751313,328.933677,17.848633,26.408557,866.78799,2163.437164,0.197881,0.200558,0.396411,0.19295,0.628093,0.258583,0.29215,0.271577
uui_grey_score_pairs,16.557499,36.170201,6.360825,9.176143,35.88184,94.20979,0.035899,0.075852,0.18971,0.212023,0.173485,0.195022,0.159673,0.241434
uui_high_score_pairs,24.165242,101.145815,3.80464,9.120802,79.840139,413.303389,0.02351,0.074711,0.120292,0.173867,0.103469,0.167522,0.107719,0.210396
uui_low_score_pairs,24.246352,25.786854,9.43769,9.923944,41.012158,55.047816,0.081571,0.114823,0.28954,0.257193,0.386992,0.247924,0.269222,0.264048


In [41]:
labels=['non_uui_grey_score_pairs', 
        'non_uui_low_score_pairs', 
        'uui_grey_score_pairs', 
        'uui_high_score_pairs', 
        'uui_low_score_pairs']

fig = go.Figure(data=[
    go.Bar(name='active_time_diff',         x=labels, y=stats_df['active_time_diff']['mean'].values),
    go.Bar(name='session_count_diff',       x=labels, y=stats_df['session_count_diff']['mean'].values),
    go.Bar(name='operation_count_diff',     x=labels, y=stats_df['operation_count_diff']['mean'].values),
    go.Bar(name='action_normalise_cos_dis', x=labels, y=stats_df['action_normalise_cos_dis']['mean'].values),
    go.Bar(name='markov_cos_dis',           x=labels, y=stats_df['markov_cos_dis']['mean'].values),
    go.Bar(name='interval_cos_dis',         x=labels, y=stats_df['interval_cos_dis']['mean'].values),
    go.Bar(name='hour_normalise_cos_dis',   x=labels, y=stats_df['hour_normalise_cos_dis']['mean'].values)
])
# Change the bar mode
fig.update_layout(barmode='group')
fig.show()

# make high threshold on user behaviour

### For users who spend total time larger than 30 min (in 2019-09)

In [42]:
session_count_threshold = 10
active_time_threshold = 30 * 60


df = pair_df[pair_df['session_count1'] >= session_count_threshold]
df = df[df['session_count2'] >= session_count_threshold]
df = df[df['active_time1'] >= active_time_threshold]
df = df[df['active_time2'] >= active_time_threshold]

In [43]:
df.groupby('label')['node1'].count()

label
non_uui_grey_score_pairs      1981
non_uui_low_score_pairs     484277
uui_grey_score_pairs           574
uui_high_score_pairs           668
uui_low_score_pairs             92
Name: node1, dtype: int64

In [44]:
stat_df = df.groupby("label")[
    'active_time_diff',
    'session_count_diff',
    'operation_count_diff',
    'action_normalise_cos_dis',
    'markov_cos_dis',
    'hour_normalise_cos_dis',
    'interval_cos_dis'
].agg([np.mean, np.std])

stat_df

Unnamed: 0_level_0,active_time_diff,active_time_diff,session_count_diff,session_count_diff,operation_count_diff,operation_count_diff,action_normalise_cos_dis,action_normalise_cos_dis,markov_cos_dis,markov_cos_dis,hour_normalise_cos_dis,hour_normalise_cos_dis,interval_cos_dis,interval_cos_dis
Unnamed: 0_level_1,mean,std,mean,std,mean,std,mean,std,mean,std,mean,std,mean,std
label,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2
non_uui_grey_score_pairs,134.370831,224.339413,13.536598,17.002029,555.260979,1183.694171,0.088029,0.133166,0.341891,0.156857,0.299258,0.226706,0.298823,0.245762
non_uui_low_score_pairs,282.299358,378.675723,26.328017,28.96544,1340.881052,2619.516415,0.267949,0.194955,0.466326,0.132558,0.559652,0.21332,0.40627,0.255627
uui_grey_score_pairs,36.873701,69.730057,11.010453,9.097104,90.280488,179.664307,0.046528,0.093731,0.26003,0.157165,0.214426,0.149032,0.266178,0.234715
uui_high_score_pairs,121.824815,227.638394,12.654192,16.858042,425.543413,961.202164,0.05427,0.111118,0.286524,0.152949,0.228477,0.192845,0.296477,0.26346
uui_low_score_pairs,21.01939,20.657761,10.206522,8.029531,44.347826,59.426274,0.109754,0.155864,0.344585,0.238941,0.255881,0.147156,0.356394,0.220969


In [45]:
labels=['non_uui_grey_score_pairs', 
        'non_uui_low_score_pairs', 
        'uui_grey_score_pairs', 
        'uui_high_score_pairs', 
        'uui_low_score_pairs']

fig = go.Figure(data=[
    go.Bar(name='active_time_diff',         x=labels, y=stats_df['active_time_diff']['mean'].values),
    go.Bar(name='session_count_diff',       x=labels, y=stats_df['session_count_diff']['mean'].values),
    go.Bar(name='operation_count_diff',     x=labels, y=stats_df['operation_count_diff']['mean'].values),
    go.Bar(name='action_normalise_cos_dis', x=labels, y=stats_df['action_normalise_cos_dis']['mean'].values),
    go.Bar(name='markov_cos_dis',           x=labels, y=stats_df['markov_cos_dis']['mean'].values),
    go.Bar(name='interval_cos_dis',         x=labels, y=stats_df['interval_cos_dis']['mean'].values),
    go.Bar(name='hour_normalise_cos_dis',   x=labels, y=stats_df['hour_normalise_cos_dis']['mean'].values)
])
# Change the bar mode
fig.update_layout(barmode='group')
fig.show()

In [46]:
high_sim = df[pair_df['action_normalise_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['markov_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['hour_normalise_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['interval_cos_dis'] < 0.05]

print('There are %d high_sim records, about %f percentage of total pairs'%
      (len(high_sim), float(len(high_sim)) / len(pair_df)))

high_sim.groupby('label')['node1'].count()

There are 61 high_sim records, about 0.000071 percentage of total pairs



Boolean Series key will be reindexed to match DataFrame index.



label
non_uui_grey_score_pairs    18
non_uui_low_score_pairs     36
uui_grey_score_pairs         1
uui_high_score_pairs         6
Name: node1, dtype: int64

###  For users who have total session >= 30 (in 2019 - 09)

In [47]:
session_count_threshold = 30

df = pair_df[pair_df['session_count1'] >= session_count_threshold]
df = df[df['session_count2'] >= session_count_threshold]

In [48]:
df.groupby('label')['node1'].count()

label
non_uui_grey_score_pairs      1023
non_uui_low_score_pairs     117234
uui_grey_score_pairs           764
uui_high_score_pairs           496
uui_low_score_pairs             84
Name: node1, dtype: int64

In [49]:
stat_df = df.groupby("label")[
    'active_time_diff',
    'session_count_diff',
    'operation_count_diff',
    'action_normalise_cos_dis',
    'markov_cos_dis',
    'hour_normalise_cos_dis',
    'interval_cos_dis'
].agg([np.mean, np.std])

stat_df

Unnamed: 0_level_0,active_time_diff,active_time_diff,session_count_diff,session_count_diff,operation_count_diff,operation_count_diff,action_normalise_cos_dis,action_normalise_cos_dis,markov_cos_dis,markov_cos_dis,hour_normalise_cos_dis,hour_normalise_cos_dis,interval_cos_dis,interval_cos_dis
Unnamed: 0_level_1,mean,std,mean,std,mean,std,mean,std,mean,std,mean,std,mean,std
label,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2
non_uui_grey_score_pairs,145.46323,245.247218,13.766373,16.038273,587.090909,1448.451712,0.061679,0.10778,0.339922,0.150055,0.191037,0.147319,0.276448,0.238394
non_uui_low_score_pairs,405.320549,492.040475,31.869125,32.407463,1832.818781,3460.604788,0.212805,0.191979,0.437833,0.134946,0.394679,0.189392,0.386781,0.238126
uui_grey_score_pairs,37.052923,49.349557,9.823298,7.151895,66.560209,100.431991,0.038144,0.058234,0.262836,0.165854,0.191935,0.081884,0.271166,0.2305
uui_high_score_pairs,107.597103,232.706161,12.256048,16.118405,299.832661,918.567329,0.039115,0.0882,0.278868,0.161758,0.162431,0.106769,0.252679,0.23225
uui_low_score_pairs,29.754301,26.698863,9.357143,7.351162,51.690476,47.357389,0.044557,0.060025,0.244345,0.168719,0.180371,0.057461,0.296486,0.227116


In [50]:
labels=['non_uui_grey_score_pairs', 
        'non_uui_low_score_pairs', 
        'uui_grey_score_pairs', 
        'uui_high_score_pairs', 
        'uui_low_score_pairs']

fig = go.Figure(data=[
    go.Bar(name='active_time_diff',         x=labels, y=stats_df['active_time_diff']['mean'].values),
    go.Bar(name='session_count_diff',       x=labels, y=stats_df['session_count_diff']['mean'].values),
    go.Bar(name='operation_count_diff',     x=labels, y=stats_df['operation_count_diff']['mean'].values),
    go.Bar(name='action_normalise_cos_dis', x=labels, y=stats_df['action_normalise_cos_dis']['mean'].values),
    go.Bar(name='markov_cos_dis',           x=labels, y=stats_df['markov_cos_dis']['mean'].values),
    go.Bar(name='interval_cos_dis',         x=labels, y=stats_df['interval_cos_dis']['mean'].values),
    go.Bar(name='hour_normalise_cos_dis',   x=labels, y=stats_df['hour_normalise_cos_dis']['mean'].values)
])
# Change the bar mode
fig.update_layout(barmode='group')
fig.show()

In [51]:
high_sim = df[pair_df['action_normalise_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['markov_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['hour_normalise_cos_dis'] < 0.05]
high_sim = high_sim[high_sim['interval_cos_dis'] < 0.05]

print('There are %d high_sim records, about %f percentage of total pairs'%
      (len(high_sim), float(len(high_sim)) / len(pair_df)))

high_sim.groupby('label')['node1'].count()

There are 8 high_sim records, about 0.000009 percentage of total pairs



Boolean Series key will be reindexed to match DataFrame index.



label
non_uui_grey_score_pairs    1
non_uui_low_score_pairs     5
uui_high_score_pairs        2
Name: node1, dtype: int64

# Case Study

In [52]:
high_sim.head(30)

Unnamed: 0,node1,node2,similarity,label,active_time1,active_time2,session_count1,session_count2,operation_count1,operation_count2,action_normalise_cos_dis,markov_cos_dis,hour_normalise_cos_dis,interval_cos_dis,active_time_diff,session_count_diff,operation_count_diff
21090,175924895,175925363,0.499375,non_uui_low_score_pairs,4102.377996,4082.268996,34,32,1885,1973,0.000835,0.047311,0.024098,0.000498,0.33515,2,88
187273,175846386,175848183,0.425625,non_uui_low_score_pairs,4850.214996,4298.096997,37,32,2009,1863,0.000648,0.046601,0.016386,0.0014,9.201967,5,146
259802,144437935,144443841,0.641875,non_uui_low_score_pairs,4588.993996,5126.046995,42,49,153,193,0.008286,0.040188,0.039575,0.040543,8.950883,7,40
364824,39782939,80397833,0.424375,non_uui_low_score_pairs,12425.328996,13163.690996,36,33,1702,1625,2.9e-05,0.006666,0.002626,0.004912,12.306033,3,77
526790,174577324,174581162,0.736875,non_uui_grey_score_pairs,9708.976996,10455.082998,38,33,494,529,0.002008,0.014656,0.019172,0.000387,12.4351,5,35
782564,171850824,171852395,0.85625,uui_high_score_pairs,11416.250996,8759.302997,36,30,2248,1811,3.4e-05,0.024252,0.046912,0.018795,44.282467,6,437
821657,172342718,172486816,0.416875,non_uui_low_score_pairs,12081.051996,12536.996996,37,44,3304,3646,0.000266,0.000998,0.03684,0.002624,7.599083,7,342
852744,169791521,169791809,0.844375,uui_high_score_pairs,3550.122997,4481.146995,36,50,127,210,0.012943,0.032723,0.048621,0.047882,15.517067,14,83


In [54]:
def case_study_data(u1, u2, spark=spark, country='ID', begin='2019-09-01', end='2019-09-30'):
    """ 
    pull all user action data (click & view) on certain day 

    Args:
        spark: spark session object
        country: str country code e.g. 'ID'
        date: str e.g. ('2019-09-09')

    Returns:
        action_df: spark dataframe
            - schema: uid, page_type, page_section, operation, 
                event_timestamp
    """

    VIEW_QUERY = """  
        SELECT 
            userid, 
            page_type,
            page_section[0] as section,
            operation,
            target_type,
            event_timestamp,
            hour,
            utc_date
        FROM 
            user_behavior_mart_dwd__view_event
        WHERE 
            grass_region='%s' and utc_date >='%s' and utc_date <='%s' and (userid=%d or userid=%d)
    """ % (country, begin, end, u1, u2)

    CLICK_QUERY = """
        SELECT 
            userid, 
            page_type,
            page_section[0] as section,
            operation,
            target_type,
            event_timestamp,
            hour,
            utc_date
        FROM 
            user_behavior_mart_dwd__click_event
        WHERE 
            grass_region='%s' and utc_date >='%s' and utc_date <='%s' and (userid=%d or userid=%d)
    """ % (country, begin, end, u1, u2)

    click_df = spark.sql(CLICK_QUERY)
    view_df = spark.sql(VIEW_QUERY)
    union_df = click_df.union(view_df).orderBy(['utc_date', 'userid', 'event_timestamp'])
    return union_df

In [117]:
case_study_data(130957307, 130957530, begin='2019-09-24').show(200)

+---------+----------+-------+--------------------+-----------+---------------+----+----------+
|   userid| page_type|section|           operation|target_type|event_timestamp|hour|  utc_date|
+---------+----------+-------+--------------------+-----------+---------------+----+----------+
|130957307|      null|   null|action_login_success|       null|  1569289763029|   1|2019-09-24|
|130957307|coins_page|   null|                view|       null|  1569289763472|   1|2019-09-24|
|130957530|      null|   null|action_login_success|       null|  1569289789362|   1|2019-09-24|
|130957530|coins_page|   null|                view|       null|  1569289789791|   1|2019-09-24|
|130957307|coins_page|   null|                view|       null|  1569379056995|   2|2019-09-25|
|130957307|      null|   null|action_login_success|       null|  1569379061330|   2|2019-09-25|
|130957307|      null|   null|action_login_success|       null|  1569380442768|   3|2019-09-25|
|130957307|coins_page|   null|          

In [55]:
case_study_data(174577324, 174581162, begin='2019-09-24').show(200)

+---------+-------------+-------------------+--------------------+--------------------+---------------+----+----------+
|   userid|    page_type|            section|           operation|         target_type|event_timestamp|hour|  utc_date|
+---------+-------------+-------------------+--------------------+--------------------+---------------+----+----------+
|174577324|         home|               null|                view|                null|  1569308166962|   6|2019-09-24|
|174577324|         null|               null|action_login_success|                null|  1569309667575|   7|2019-09-24|
|174577324|notifications|notification_folder|               click|                 tab|  1569309687294|   7|2019-09-24|
|174577324|       search|               null|                view|                null|  1569310148483|   7|2019-09-24|
|174577324|       search|               null|               click|                shop|  1569310178176|   7|2019-09-24|
|174577324|         shop|               

# Extreme Cases 
for user who have super long sessions or super long active time

In [61]:
pair_df[(pair_df['active_time1'] > 3 * 24 * 3600 ) | (pair_df['active_time2'] > 3 * 24 * 3600)].head(5)

Unnamed: 0,node1,node2,similarity,label,active_time1,active_time2,session_count1,session_count2,operation_count1,operation_count2,action_normalise_cos_dis,markov_cos_dis,hour_normalise_cos_dis,interval_cos_dis,active_time_diff,session_count_diff,operation_count_diff
10534,117742253,118479126,0.1825,non_uui_low_score_pairs,284775.722987,139905.793983,138,185,9763,4825,0.071234,0.264403,0.141717,0.229593,2414.498817,47,4938
11879,48135685,50323129,0.123125,non_uui_low_score_pairs,75092.924988,275233.875985,111,172,3283,16512,0.454511,0.567918,0.244148,0.406058,3335.682517,61,13229
16869,114791222,175375499,0.053125,non_uui_low_score_pairs,423649.75999,23604.071991,100,80,41410,1674,0.22447,0.482244,0.212058,0.473169,6667.428133,20,39736
17859,139638930,181289910,0.085,non_uui_low_score_pairs,259489.952979,4716.481999,221,12,21304,350,0.827196,0.708082,0.471429,0.54102,4246.224516,209,20954
18805,75674574,118037262,0.105,non_uui_low_score_pairs,87329.738992,310429.303988,91,142,6030,26290,0.182936,0.343563,0.069905,0.258415,3718.326083,51,20260


In [56]:
case_study_data(8210111, 1000000000000, begin='2019-09-24').show(200)

+-------+-------------------+--------------------+--------------------+---------------+---------------+----+----------+
| userid|          page_type|             section|           operation|    target_type|event_timestamp|hour|  utc_date|
+-------+-------------------+--------------------+--------------------+---------------+---------------+----+----------+
|8210111|streaming_room_push|                null| action_live_details|           null|  1569283195926|   0|2019-09-24|
|8210111|streaming_room_push|                null| action_live_details|           null|  1569283197925|   0|2019-09-24|
|8210111|streaming_room_push|                null| action_live_details|           null|  1569283199934|   0|2019-09-24|
|8210111|streaming_room_push|                null| action_live_details|           null|  1569283201931|   0|2019-09-24|
|8210111|streaming_room_push|                null| action_live_details|           null|  1569283203930|   0|2019-09-24|
|8210111|streaming_room_push|           

In [57]:
case_study_data(9756598, 1000000000000, begin='2019-09-24').show(200)

+-------+-------------------+-------+-------------------+-----------+---------------+----+----------+
| userid|          page_type|section|          operation|target_type|event_timestamp|hour|  utc_date|
+-------+-------------------+-------+-------------------+-----------+---------------+----+----------+
|9756598|streaming_room_push|   null|action_live_details|       null|  1569283180207|   0|2019-09-24|
|9756598|streaming_room_push|   null|action_live_details|       null|  1569283182221|   0|2019-09-24|
|9756598|streaming_room_push|   null|action_live_details|       null|  1569283184228|   0|2019-09-24|
|9756598|streaming_room_push|   null|action_live_details|       null|  1569283186887|   0|2019-09-24|
|9756598|streaming_room_push|   null|action_live_details|       null|  1569283188892|   0|2019-09-24|
|9756598|streaming_room_push|   null|action_live_details|       null|  1569283190912|   0|2019-09-24|
|9756598|streaming_room_push|   null|action_live_details|       null|  15692831929

In [60]:
pair_df[(pair_df['session_count1'] > 200 ) | (pair_df['session_count2'] > 200 )].head(5)

Unnamed: 0,node1,node2,similarity,label,active_time1,active_time2,session_count1,session_count2,operation_count1,operation_count2,action_normalise_cos_dis,markov_cos_dis,hour_normalise_cos_dis,interval_cos_dis,active_time_diff,session_count_diff,operation_count_diff
45,67940462,71956746,0.220625,non_uui_low_score_pairs,94759.356978,18349.585994,230,65,6852,2266,0.690868,0.436714,0.399676,0.214315,1273.496183,165,4586
254,1331191,6674115,0.08875,non_uui_low_score_pairs,25433.377996,114088.251979,35,218,2248,6570,0.611496,0.498877,0.260575,0.576888,1477.581233,183,4322
823,82726621,137872604,0.0075,non_uui_low_score_pairs,105078.02598,40590.460995,205,48,7324,2604,0.355554,0.610375,0.283379,0.434114,1074.79275,157,4720
934,23122487,36956629,0.4575,non_uui_low_score_pairs,55204.695982,213154.848983,184,209,6159,28421,0.007176,0.212717,0.032833,0.174839,2632.50255,25,22262
1463,97139668,151138096,0.050625,non_uui_low_score_pairs,42811.37998,6528.076997,210,31,2159,734,0.159742,0.606575,0.414183,0.20292,604.721716,179,1425


In [59]:
case_study_data(159287310, 1000000000000, begin='2019-09-24').show(200)

+---------+-------------------+--------------------+--------------------+-----------------+---------------+----+----------+
|   userid|          page_type|             section|           operation|      target_type|event_timestamp|hour|  utc_date|
+---------+-------------------+--------------------+--------------------+-----------------+---------------+----+----------+
|159287310|               null|                null|action_click_navi...|             null|  1569283227814|   0|2019-09-24|
|159287310|                 me|                null|                view|             null|  1569283229107|   0|2019-09-24|
|159287310|                 me|                null|               click|   selling_button|  1569283230689|   0|2019-09-24|
|159287310|             seller|                null|                view|             null|  1569283231668|   0|2019-09-24|
|159287310|             seller|                null|               click|          to_ship|  1569283233677|   0|2019-09-24|
|1592873