# PayPay Corporation Data Engineering Challenge

In [1]:
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.countDistinct

// UTC timezone
val TIME_ZONE = "UTC"   

// Session interval of 15 minutes
val TIME_INTERVAL_THRESHOLD_IN_SECONDS = 15 * 60

Intitializing Scala interpreter ...

Spark Web UI available at http://YuYu-Spectre-COM:4040
SparkContext available as 'sc' (version = 3.1.2, master = local[*], app id = local-1625621447814)
SparkSession available as 'spark'


import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag
import org.apache.spark.sql.functions.unix_timestamp
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.functions.countDistinct
TIME_ZONE: String = UTC
TIME_INTERVAL_THRESHOLD_IN_SECONDS: Int = 900


In [2]:
// Setting UTC timezone for spark session which is necessary to calculate the difference between timestamp datatypes
spark.conf.set("spark.sql.session.timeZone", TIME_ZONE)

## Example of the data from the Amazon marketplace web log file

![title](data_screenshot.png)

Screenshot for the example of the data and any other information such as the schema of the data can be found in the following [link](https://docs.aws.amazon.com/elasticloadbalancing/latest/classic/access-log-collection.html#access-log-entry-format)

In [3]:
// Read in file in dataframe
var wholeDF = spark.read.text("data/2015_07_22_mktplace_shop_web_log_sample.log.gz")

wholeDF: org.apache.spark.sql.DataFrame = [value: string]


# Prepping data before conducting analysis

In [4]:
/* Creating new dataframe to separate out columns from the original dataframe which had all data stored under one column
   Used regex pattern within the split because we are spliting on whitespace and request has multiple words within its column we want to ignore those whitespaces within double quotes
   Regex pattern was found at the following link https://stackabuse.com/regex-splitting-by-character-unless-in-quotes
*/
var splitDF = wholeDF.withColumn("_columns", split($"value", " (?=([^\"]*\"[^\"]*\")*[^\"]*$)")).select(     
    $"_columns".getItem(0).cast(DataTypes.TimestampType).as("time"),
    $"_columns".getItem(1).as("name_of_balancer"),
    $"_columns".getItem(2).as("ip_address"),
    $"_columns".getItem(3).as("private_address"),
    $"_columns".getItem(4).as("request_processing_time"),
    $"_columns".getItem(5).as("backend_processing_time"),
    $"_columns".getItem(6).as("response_processing_time"),
    $"_columns".getItem(7).as("elb_status_code"),
    $"_columns".getItem(8).as("backend_status_code"),
    $"_columns".getItem(9).as("received_bytes"),
    $"_columns".getItem(10).as("sent_bytes"),
    $"_columns".getItem(11).as("request"),
    $"_columns".getItem(12).as("user_agent"),
    $"_columns".getItem(13).as("ssl_cipher"),
    $"_columns".getItem(14).as("ssl_protocol")
)

splitDF: org.apache.spark.sql.DataFrame = [time: timestamp, name_of_balancer: string ... 13 more fields]


In [5]:
// First five rows of dataframe
splitDF.show(5)

+--------------------+----------------+--------------------+---------------+-----------------------+-----------------------+------------------------+---------------+-------------------+--------------+----------+--------------------+--------------------+--------------------+------------+
|                time|name_of_balancer|          ip_address|private_address|request_processing_time|backend_processing_time|response_processing_time|elb_status_code|backend_status_code|received_bytes|sent_bytes|             request|          user_agent|          ssl_cipher|ssl_protocol|
+--------------------+----------------+--------------------+---------------+-----------------------+-----------------------+------------------------+---------------+-------------------+--------------+----------+--------------------+--------------------+--------------------+------------+
|2015-07-22 09:00:...|marketpalce-shop|123.242.248.130:5...|  10.0.6.158:80|               0.000022|               0.026109|            

In [6]:
// Add column - previous timestamp for each record
splitDF = splitDF.withColumn("previous_time", 
                             lag($"time", 1, null).over(Window.partitionBy("ip_address").orderBy("time"))
                            )

splitDF: org.apache.spark.sql.DataFrame = [time: timestamp, name_of_balancer: string ... 14 more fields]


In [7]:
// Add column - time difference in seconds between current timestamp and previous timestamp
splitDF = splitDF.withColumn("time_difference", 
                             col("time").cast("long") - col("previous_time").cast("long")
                            )

splitDF: org.apache.spark.sql.DataFrame = [time: timestamp, name_of_balancer: string ... 15 more fields]


In [8]:
// Add column - whether the record is new session or not determined by whether time difference it is greater than the 15 minute interval or if time difference is null  
splitDF = splitDF.withColumn("is_new_session",
                             when(col("time_difference").$greater(TIME_INTERVAL_THRESHOLD_IN_SECONDS)
                                  .or(col("time_difference").isNull), 1).otherwise(0)
                            )

splitDF: org.apache.spark.sql.DataFrame = [time: timestamp, name_of_balancer: string ... 16 more fields]


In [9]:
// Add column - used to flag each new session per IP address 
splitDF = splitDF.withColumn("session_seq",
                             sum(col("is_new_session")).over(Window.partitionBy(col("ip_address")).orderBy("time"))
                            )

splitDF: org.apache.spark.sql.DataFrame = [time: timestamp, name_of_balancer: string ... 17 more fields]


# PayPay Challenges

## Challenge 1: Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a session

Explanation: from my understanding of the first challenge, we want to get count of all page hits which I interrupted a hit as request back to the load balancer (a record) so in this situation we do count and group by ip address to get count of records (hits) by ip address

In [10]:
var countByIP = splitDF.groupBy("ip_address").count()

countByIP: org.apache.spark.sql.DataFrame = [ip_address: string, count: bigint]


In [11]:
// First five rows of dataframe
countByIP.show(5, false)

+---------------------+-----+
|ip_address           |count|
+---------------------+-----+
|107.167.107.202:37235|3    |
|14.139.253.18:52731  |1    |
|116.202.36.65:41369  |1    |
|1.38.22.148:24378    |2    |
|219.64.127.129:24106 |3    |
+---------------------+-----+
only showing top 5 rows



## Challenge 2: Determine the average session time

Explanation: sum the duration of each page hit then divide by the number of sessions (average session duration is in seconds)

In [12]:
var avgSessionTime = splitDF.select(sum("time_difference") / sum("is_new_session")).withColumnRenamed("(sum(time_difference) / sum(is_new_session))", "average_session_time")

avgSessionTime: org.apache.spark.sql.DataFrame = [average_session_time: double]


In [13]:
// Time is in seconds (conversion in minutes: 25)
avgSessionTime.show()

+--------------------+
|average_session_time|
+--------------------+
|   1552.721265586177|
+--------------------+



## Challenge 3: Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session

Explanation: sum the duration of each page hit then divide by the number of sessions (average session duration is in seconds)

In [14]:
// Calculate counts of distinct URLs by ip address and session sequence
var countUniqueURL = splitDF.groupBy("ip_address", "session_seq").agg(countDistinct("request").alias("unique_url_count"));

countUniqueURL: org.apache.spark.sql.DataFrame = [ip_address: string, session_seq: bigint ... 1 more field]


In [15]:
// First five rows of dataframe
countUniqueURL.show(5, false)

+-------------------+-----------+----------------+
|ip_address         |session_seq|unique_url_count|
+-------------------+-----------+----------------+
|1.187.167.214:65257|1          |1               |
|1.187.170.77:64760 |1          |1               |
|1.187.179.217:34549|1          |3               |
|1.187.185.201:46980|1          |1               |
|1.187.202.35:38668 |1          |4               |
+-------------------+-----------+----------------+
only showing top 5 rows



## Challenge 4: Find the most engaged users, ie the IPs with the longest session times
Explanation: summarize time spent within each session (session seq) than order by total session time in descending order to display 10 top longest sessions

In [16]:
// Aggregating to determine the longest session time by session and ip address in seconds
var mostEngagedUsers = splitDF.groupBy("ip_address", "session_seq").sum("time_difference").withColumnRenamed("sum(time_difference)", "total_session_time")

mostEngagedUsers: org.apache.spark.sql.DataFrame = [ip_address: string, session_seq: bigint ... 1 more field]


In [17]:
// Display the TOP 10 longest sessions
mostEngagedUsers.sort(desc("sum(time_difference)")).show(10)

+-------------------+-----------+------------------+
|         ip_address|session_seq|total_session_time|
+-------------------+-----------+------------------+
|106.186.23.95:35629|          2|             66511|
|106.186.23.95:35632|          2|             66511|
|106.186.23.95:35626|          2|             66511|
|106.186.23.95:39247|          2|             66500|
|106.186.23.95:39646|          2|             66500|
|106.186.23.95:40448|          2|             66500|
|106.186.23.95:40598|          2|             66500|
|106.186.23.95:39944|          2|             66500|
|106.186.23.95:40184|          2|             66500|
|106.186.23.95:39994|          2|             66499|
+-------------------+-----------+------------------+
only showing top 10 rows

