# Interview challenge for PayPay.
Data: marketplace web session logs in the `data` folder

Processing & Analytical goals:
    
    - Sessionize the web log by IP. Sessionize = aggregate all page hits by visitor/IP during a session. https://en.wikipedia.org/wiki/Session_(web_analytics)

    - Determine the average session time

    - Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.

    - Find the most engaged users, i.e. the IPs with the longest session times


In [7]:
# Libraries and setup
# Auto reload changes
%load_ext autoreload
%autoreload 2

import sys
sys.path.append("../")

# source dependencies, log_file_schema & data handler
from src.dependencies import *
from src.log_file_schema import schema
from src.data_handler import DataHandler

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [8]:
# Create (or reuse) the Spark session, running on the local master
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PayPay_DS_Challange")
    .getOrCreate()
)

def duration(start, end):
    """
    Return the time elapsed between two timestamps, in seconds.

    @param start: timestamp marking the beginning of the interval
    @param end: timestamp marking the end of the interval
    @return: (end - start) in seconds as a float; 0.0 when the difference
             cannot be computed (for example when either value is None)
    """
    try:
        return (end - start).total_seconds()
    except (TypeError, AttributeError):
        # TypeError: the operands cannot be subtracted (e.g. one is None).
        # AttributeError: the difference has no total_seconds().
        # The fallback is a float so that it agrees with the FloatType
        # declared for the UDF that wraps this function.
        return 0.0

# Wrap duration() with udf(), declaring FloatType as its return type, so that
# it can be applied to timestamp columns in the functions below.
get_duration = udf(duration, FloatType())

In [17]:
def sessionize(df_logs, gd):
    """
    Label every page hit with the session it belongs to.

    Hits are processed per "client_ip" in "current_timestamp" order. A hit
    is flagged as starting a new session when the gap to the same IP's
    previous hit (computed by gd) is greater than session_time.
    session_time is read from outside this function (earlier notes say
    15 mins - confirm where it is defined).

    Columns added:
        previous_timestamp: timestamp of the IP's previous hit
        session_duration: gd(previous_timestamp, current_timestamp)
        is_new_session: 1 when the gap exceeds session_time, else 0
        count_session: running total of is_new_session for the IP
        ip_session_info: <client_ip>_<count_session>

    The distinct (ip_session_info, count_session) pairs are also shown,
    highest count_session first.

    @param df_logs: log records with "client_ip", "current_timestamp" and
                    "request_url" columns
    @param gd: column function returning the duration between two timestamps
    @return: the selected columns, ordered by count_session descending
    """
    per_ip_by_time = Window.partitionBy("client_ip").orderBy("current_timestamp")

    # Timestamp of the previous hit from the same IP
    previous_hit = lag(col("current_timestamp")).over(per_ip_by_time)
    with_previous = df_logs.withColumn("previous_timestamp", previous_hit)

    # Gap between the previous hit and this one
    gap = gd(col("previous_timestamp"), col("current_timestamp"))
    with_gap = with_previous.withColumn("session_duration", gap)

    # A gap longer than the session timeout opens a new session
    opens_session = when((col("session_duration") > session_time), 1).otherwise(0)
    with_flag = with_gap.withColumn("is_new_session", opens_session)

    # Running number of sessions opened so far for this IP
    sessions_so_far = sum(col("is_new_session")).over(per_ip_by_time)
    with_count = with_flag.withColumn("count_session", sessions_so_far)

    session_key = concat_ws("_", col("client_ip"), col("count_session"))
    labelled = with_count.withColumn("ip_session_info", session_key)

    output_columns = [
        "ip_session_info", "client_ip", "request_url",
        "previous_timestamp", "current_timestamp",
        "session_duration", "is_new_session", "count_session",
    ]
    sessionized_df = labelled.select(output_columns) \
                             .orderBy("count_session", ascending=False)

    session_overview = sessionized_df[["ip_session_info", "count_session"]].distinct()
    session_overview.orderBy("count_session", ascending=False).show()

    return sessionized_df


def average_session_time(df_ip_session):
    """
    Show the average session time and return the hits with per-session gaps.

    For each hit, the gap to the previous hit of the same session
    ("ip_session_info") is computed with get_duration. The gaps are summed
    per session and the mean of those totals is shown.

    Columns added to the returned data:
        previous_timestamp_session: timestamp of the session's previous hit
        current_session_duration: gap between that hit and the current one

    @param df_ip_session: sessionized page hits
    @return: df_ip_session with the two columns above (one row per hit)
    """
    per_session_by_time = Window.partitionBy("ip_session_info") \
                                .orderBy("current_timestamp")

    previous_in_session = lag(df_ip_session["current_timestamp"]).over(per_session_by_time)
    with_previous = df_ip_session.withColumn("previous_timestamp_session",
                                             previous_in_session)

    gap_in_session = get_duration(col("previous_timestamp_session"),
                                  col("current_timestamp"))
    df_session = with_previous.withColumn("current_session_duration", gap_in_session)

    # Total time per session
    session_total = sum("current_session_duration").alias("total_session_time")
    df_session_total = df_session.groupby("ip_session_info").agg(session_total).cache()

    # Mean of the per-session totals
    overall_mean = mean("total_session_time").alias("avg_session_time")
    df_session_avg = df_session_total.select([overall_mean]).cache()

    df_session_avg.show()

    return df_session


def count_unique_request(df_session):
    """
    Show the number of distinct URLs requested in each session.

    A URL requested several times within one session is counted once.

    @param df_session: page hits with "ip_session_info" and "request_url"
    """
    distinct_urls = countDistinct("request_url").alias("count_unique_requests")
    hits_by_session = df_session.groupby("ip_session_info")

    hits_by_session.agg(distinct_urls).show()


def get_longest_session_time(df_session):
    """
    Determine most engaged users, i.e. the IPs with the longest session times.

    Expects one row per page hit with the columns "client_ip",
    "ip_session_info", "is_new_session" and "session_duration" (the gap to
    the same IP's previous hit, as produced by sessionize()).

    Shows one row per IP, ordered by avg_session_time descending, with:
        session_time_all: total time spent across all sessions of the IP
        num_sessions: number of sessions of the IP
        session_duration_max: length of the IP's longest session
        avg_session_time: session_time_all / num_sessions

    @param df_session: sessionized page hits
    """
    # A gap belongs to a session only when the hit continues that session.
    # The gap on a session's first hit is idle time between two sessions
    # (or is missing for the IP's very first hit), so it contributes nothing.
    gap_within_session = when(
        (col("is_new_session") == 0) & col("session_duration").isNotNull(),
        col("session_duration")
    ).otherwise(0.0)

    # Step 1: one row per session holding that session's length
    df_session_time = df_session.groupby("client_ip", "ip_session_info") \
                        .agg(
                            sum(gap_within_session).alias("session_time")
                        )

    # Step 2: one row per IP; rows are sessions here, so count() yields the
    # number of sessions rather than the number of page hits
    df_ip_time = df_session_time.groupby("client_ip") \
                    .agg(
                        sum("session_time").alias("session_time_all"),
                        count("ip_session_info").alias("num_sessions"),
                        max("session_time").alias("session_duration_max")
                    ) \
                    .withColumn(
                            "avg_session_time",
                            col("session_time_all") / col("num_sessions")
                            ) \
                    .orderBy(col("avg_session_time"), ascending=False)

    df_ip_time.show()


def solve(spark):
    """
    Run the four analysis steps in order, printing a heading before each.

    @param spark: Spark session
    """
    df_logs = DataHandler().initialization(spark).cache()

    print("1. Sessionize the web log by IP = aggregate all page hits by visitor/IP during a session")
    sessionized = sessionize(df_logs, gd=get_duration)
    df_ip_session = sessionized.repartition(num_partitions).cache()

    print("2. Average session time")
    with_session_gaps = average_session_time(df_ip_session)
    df_session = with_session_gaps.repartition(num_partitions).cache()

    print("3. Unique URL visits per session. (count a hit to a unique URL only once per session.)")
    count_unique_request(df_session)

    print("4. Most engaged users, ie the IPs with the longest session times.)")
    get_longest_session_time(df_session)

# DE answer: run all analysis steps with the Spark session created above
solve(spark=spark)

1. Sessionize the web log by IP = aggregate all page hits by visitor/IP during a session
+-----------------+-------------+
|  ip_session_info|count_session|
+-----------------+-------------+
| 220.226.206.7_12|           12|
| 220.226.206.7_11|           11|
| 220.226.206.7_10|           10|
| 54.255.254.236_9|            9|
| 177.71.207.172_9|            9|
|  54.241.32.108_9|            9|
|  54.243.31.236_9|            9|
|   54.232.40.76_9|            9|
| 176.34.159.236_9|            9|
| 54.252.254.204_9|            9|
|  119.81.61.166_9|            9|
|  54.240.196.33_9|            9|
|   54.228.16.12_9|            9|
|  54.252.79.172_9|            9|
|  54.245.168.44_9|            9|
| 120.29.232.107_9|            9|
|168.235.197.238_9|            9|
|  107.23.255.12_9|            9|
|  106.186.23.95_9|            9|
|   185.20.4.220_9|            9|
+-----------------+-------------+
only showing top 20 rows

2. Average session time
+------------------+
|  avg_session_time|
+--