In [95]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T
import re
# Start (or reuse) the Spark session for this notebook, and obtain the
# SparkContext that the RDD-based log parsing below relies on.
spark = (
    SparkSession.builder
    .appName('Analysis')
    .getOrCreate()
)
sc = pyspark.SparkContext.getOrCreate()

Processing & Analytical goals:
------------

In [96]:
# One log line -> 15 capture groups. Around the 12th group the pattern skips
# one token on each side (presumably the request's method and protocol inside
# the quoted request string -- confirm against the log format).
regx = r"^(\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) (\S+) \S+ (\S+) \S+ (.*) (\S+) (\S+)$"


def _to_typed_record(fields):
    """Convert the 15 captured string fields of one log line into a typed tuple.

    Parameters
    ----------
    fields : list of str
        Result of ``re.split(regx, line)[1:16]``; it has 15 items when the
        line matched ``regx`` and fewer otherwise.

    Returns
    -------
    tuple or None
        ``(str, str, str, str, float, float, float, int, int, int, int,
        str, str, str, str)``, or ``None`` when the line did not match the
        pattern or a numeric field could not be converted.
    """
    if len(fields) != 15:
        # Unmatched line: re.split returned the whole line as a single item.
        return None
    try:
        return (fields[0], fields[1], fields[2], fields[3],
                float(fields[4]), float(fields[5]), float(fields[6]),
                int(fields[7]), int(fields[8]), int(fields[9]), int(fields[10]),
                fields[11], fields[12], fields[13], fields[14])
    except ValueError:
        return None


rdd = sc.textFile("data/2015_07_22_mktplace_shop_web_log_sample.log.gz").map(lambda x: re.split(regx, x)[1:16])
# Drop lines that cannot be parsed instead of failing the whole job on the
# first malformed record.
rdd_ = rdd.map(_to_typed_record).filter(lambda rec: rec is not None)

In [97]:
# (column name, Spark type) pairs for the parsed log records, listed in
# column order. Every column is declared nullable.
_SCHEMA_COLUMNS = [
    ("timestamp", T.StringType()),
    ("elb", T.StringType()),
    ("client_port", T.StringType()),
    ("backend_port", T.StringType()),
    ("request_processing_time", T.DoubleType()),
    ("backend_processing_time", T.DoubleType()),
    ("response_processing_time", T.DoubleType()),
    ("elb_status_code", T.LongType()),
    ("backend_status_code", T.LongType()),
    ("received_bytes", T.LongType()),
    ("sent_bytes", T.LongType()),
    ("request", T.StringType()),
    ("user_agent", T.StringType()),
    ("ssl_cipher", T.StringType()),
    ("ssl_protocol", T.StringType()),
]

Schema = T.StructType(
    [T.StructField(col_name, col_type, True) for col_name, col_type in _SCHEMA_COLUMNS]
)

In [98]:
# Alternative kept for reference: read the file directly as space-separated text.
# df = spark.read.csv("data/2015_07_22_mktplace_shop_web_log_sample.log.gz", Schema, sep = " ", ignoreLeadingWhiteSpace=True, ignoreTrailingWhiteSpace=True).cache()

# Build the typed DataFrame from the parsed RDD using the explicit schema.
df = spark.createDataFrame(data=rdd_, schema=Schema)

1) Sessionize the web log by IP. Sessionize = aggregate all page hits by visitor/IP during a session.
---------
https://en.wikipedia.org/wiki/Session_(web_analytics)

In [99]:
def sessionize(df, time_frame=15 * 60):
    """Append an integer ``session_id`` column that buckets each IP's hits.

    For every ``client_ip`` the rows are ordered by ``timestamp`` and the gaps
    between consecutive hits are summed. That running total is the number of
    seconds elapsed since the IP's first hit; dividing it by ``time_frame``
    and truncating gives the session index.

    NOTE: this is fixed-width windowing anchored at each IP's first hit, not
    an inactivity timeout. Two hits a few seconds apart fall into different
    sessions when they straddle a window boundary.

    Parameters
    ----------
    df : DataFrame
        Must contain ``client_ip`` and ``timestamp`` columns.
    time_frame : int or float, optional
        Width of one session window in seconds. Defaults to 15 minutes.

    Returns
    -------
    DataFrame
        All input columns plus ``session_id``.

    Raises
    ------
    ValueError
        If ``time_frame`` is not strictly positive.
    """
    if time_frame <= 0:
        raise ValueError("time_frame must be a positive number of seconds")
    w = Window.partitionBy("client_ip").orderBy("timestamp")
    # Seconds since the previous hit of the same IP; the first hit has no
    # predecessor, so its gap is defined as 0.
    diff = F.coalesce(F.unix_timestamp(F.col("timestamp")) - F.unix_timestamp(F.lag(F.col("timestamp"), 1).over(w)), F.lit(0))
    cum_diff = F.sum(diff).over(w)
    subgroup = (cum_diff / time_frame).cast('integer').alias("session_id")
    return df.select("*", subgroup)

In [100]:
def get_avg_session_time(df):
    """Determine the average session time, in minutes.

    A session is one (client_ip, session_id) group; its length is the span
    between its earliest and latest ``timestamp`` plus one second, divided
    by 60. Returns the mean of those lengths as a plain Python value.
    """
    first_hit = F.unix_timestamp(F.min("timestamp"))
    last_hit = F.unix_timestamp(F.max("timestamp"))
    length_minutes = ((last_hit - first_hit + 1) / 60).alias("session_length")

    per_session = df.groupby("client_ip", "session_id").agg(length_minutes)
    overall = per_session.agg(F.avg("session_length").alias("avg_session_time"))
    return overall.collect()[0]["avg_session_time"]

def get_most_engaged_user(df):
    """Find the most engaged users, i.e. the IPs with the longest session times.

    Returns one row per (client_ip, session_id) with the session's first and
    last timestamp and its length in minutes, longest sessions first.
    """
    session_start = F.min("timestamp")
    session_end = F.max("timestamp")
    length_minutes = (F.unix_timestamp(session_end) - F.unix_timestamp(session_start) + 1) / 60

    per_session = df.groupby("client_ip", "session_id").agg(
        session_start.alias("from_timestamp"),
        session_end.alias("to_timestamp"),
        length_minutes.alias("session_length"),
    )
    return per_session.orderBy(F.desc("session_length"))

def get_unique_url_request(df):
    """Determine unique URL visits per session.

    Each distinct ``request`` value is counted once per
    (client_ip, session_id) group; the count is returned in the
    ``unique_url_request`` column.
    """
    distinct_requests = F.countDistinct("request").alias("unique_url_request")
    sessions = df.groupby("client_ip", "session_id")
    return sessions.agg(distinct_requests)

def analyze(df):
    """Summarise every (client_ip, session_id) group in a single pass.

    Output columns: the two grouping keys, ``from_timestamp`` and
    ``to_timestamp`` (earliest / latest hit), ``session_length`` in minutes
    (span plus one second, divided by 60) and ``unique_url_request`` (number
    of distinct ``request`` values).
    """
    session_start = F.min("timestamp")
    session_end = F.max("timestamp")
    length_minutes = (F.unix_timestamp(session_end) - F.unix_timestamp(session_start) + 1) / 60

    return df.groupby("client_ip", "session_id").agg(
        session_start.alias("from_timestamp"),
        session_end.alias("to_timestamp"),
        length_minutes.alias("session_length"),
        F.countDistinct("request").alias("unique_url_request"),
    )
    

In [101]:
# Keep only what sessionization needs: the client IP (the part of
# "client_port" before the ':'), the timestamp cut to its leading 19
# characters and cast to a real timestamp, and the lower-cased request.
client_ip_col = F.split(F.col("client_port"), ':')[0].alias("client_ip")
event_time_col = F.col("timestamp").substr(0, 19).cast('timestamp').alias("timestamp")
request_col = F.lower(F.col("request")).alias("request")

df_1 = df.select(client_ip_col, event_time_col, request_col)

# Tag every hit with a session id, then collapse to one cached row per session.
df_2 = sessionize(df_1)
df_3 = analyze(df_2).cache()

2) Determine the average session time
----------

In [102]:
# df_3 already holds exactly one row per (client_ip, session_id) and is cached,
# so counting its rows gives the number of sessions without re-running the
# window and group-by over df_2.
print("Number of Distinct Sessions = {}".format(df_3.count()))
print("Average Session Time (Minutes) = {}".format(df_3.agg(F.avg("session_length").alias("avg_session_time")).first()["avg_session_time"]))


Number of Distinct Sessions = 115936
Average Session Time (Minutes) = 1.3433538043978264


3) Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.
----------------

In [103]:
# Rows of df_3 are sessions, so this counts sessions whose distinct-URL
# count is exactly one.
single_url_sessions = df_3.filter(F.col("unique_url_request") == 1).count()
print("Number of clients which made request to only 1 unique URL = {}".format(single_url_sessions))

print()
print("20 Clients with most unique URL visits per session")
(
    df_3
    .select("client_ip", "from_timestamp", "to_timestamp", "unique_url_request")
    .orderBy(F.desc("unique_url_request"))
    .show(20, truncate=False)
)

Number of clients which made request to only 1 unique URL = 25923

20 Clients with most unique URL visits per session
+-------------+-------------------+-------------------+------------------+
|client_ip    |from_timestamp     |to_timestamp       |unique_url_request|
+-------------+-------------------+-------------------+------------------+
|119.81.61.166|2015-07-22 16:10:28|2015-07-22 16:25:05|8016              |
|52.74.219.71 |2015-07-22 16:10:28|2015-07-22 16:25:05|5478              |
|52.74.219.71 |2015-07-22 10:30:28|2015-07-22 10:39:47|5057              |
|106.186.23.95|2015-07-22 21:05:28|2015-07-22 21:10:13|4320              |
|119.81.61.166|2015-07-22 17:40:28|2015-07-22 17:45:28|3928              |
|119.81.61.166|2015-07-22 18:00:28|2015-07-22 18:05:27|3637              |
|119.81.61.166|2015-07-22 02:40:06|2015-07-22 02:45:03|3334              |
|52.74.219.71 |2015-07-22 18:00:28|2015-07-22 18:05:27|2907              |
|119.81.61.166|2015-07-22 21:05:28|2015-07-22 21:10:04|28

4) Find the most engaged users, ie the IPs with the longest session times.
------------

In [104]:
# Longest sessions first; session_length is expressed in minutes.
print("Printing 20 Most Engaged Users with there session start and end time, session length in minutes")
(
    df_3
    .select("client_ip", "from_timestamp", "to_timestamp", "session_length")
    .orderBy(F.desc("session_length"))
    .show(20, truncate=False)
)

Printing 20 Most Engaged Users with there session start and end time, session length in minutes
+---------------+-------------------+-------------------+--------------+
|client_ip      |from_timestamp     |to_timestamp       |session_length|
+---------------+-------------------+-------------------+--------------+
|120.56.178.102 |2015-07-22 10:45:54|2015-07-22 11:00:53|15.0          |
|112.79.37.154  |2015-07-22 10:45:42|2015-07-22 11:00:41|15.0          |
|103.29.159.62  |2015-07-22 10:45:35|2015-07-22 11:00:34|15.0          |
|122.176.150.156|2015-07-22 10:31:43|2015-07-22 10:46:42|15.0          |
|106.76.143.18  |2015-07-22 10:46:34|2015-07-22 11:01:33|15.0          |
|220.227.161.206|2015-07-22 10:34:11|2015-07-22 10:49:10|15.0          |
|117.218.65.155 |2015-07-22 10:46:47|2015-07-22 11:01:46|15.0          |
|180.188.249.253|2015-07-22 10:46:23|2015-07-22 11:01:22|15.0          |
|14.99.239.11   |2015-07-22 10:34:37|2015-07-22 10:49:36|15.0          |
|165.241.31.254 |2015-07-22 

Additional questions for Machine Learning Engineer (MLE) candidates:
---------------

In [220]:
# Per-request features: client IP, hour and minute sliced out of the raw
# timestamp string, and the timestamp itself truncated to minute resolution.
raw_ts = F.col("timestamp")
df_t_1 = (
    df
    .withColumn("client_ip", F.split(F.col("client_port"), ':')[0])
    .withColumn("hour", raw_ts.substr(12, 2).cast('integer'))
    .withColumn("minute", raw_ts.substr(15, 2).cast('integer'))
    .withColumn("timestamp", raw_ts.substr(0, 16).cast('timestamp'))
)

# One row per minute. "load" is that minute's request count divided by 60,
# i.e. requests per second.
per_minute_aggs = [
    (F.count("timestamp") / 60).alias("load"),
    F.avg("received_bytes").alias("received_bytes"),
    F.avg("sent_bytes").alias("sent_bytes"),
    F.countDistinct("request").alias("unique_request_count"),
    F.countDistinct("client_ip").alias("unique_ip_count"),
    F.first("hour").alias("hour"),
    F.first("minute").alias("minute"),
]
df_t_2 = df_t_1.groupby("timestamp").agg(*per_minute_aggs).sort(["hour", "minute"]).cache()

In [221]:
# Order the per-minute rows chronologically by the minute-truncated timestamp.
# Ordering by (hour, minute) alone would interleave rows from different days
# whenever the log spans more than one date, making lag() pick the wrong
# "previous minute".
w = Window.partitionBy().orderBy("timestamp")

# Features are the previous row's aggregates; the target is the current
# minute's load. The first row has no predecessor and is dropped.
df_t_3 = df_t_2.select(
    "hour",
    "minute",
    F.lag("sent_bytes").over(w).alias("prev_sent_bytes"),
    F.lag("received_bytes").over(w).alias("prev_received_bytes"),
    F.lag("unique_request_count").over(w).alias("prev_unique_request_count"),
    F.lag("unique_ip_count").over(w).alias("prev_unique_ip_count"),
    "load",
).where(F.col("prev_sent_bytes").isNotNull()).cache()

In [222]:
from pyspark.ml.feature import VectorAssembler

# Columns packed into the single "vectorized" feature vector, in this order.
feature_columns = [
    "hour",
    "minute",
    "prev_sent_bytes",
    "prev_received_bytes",
    "prev_unique_request_count",
    "prev_unique_ip_count",
]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="vectorized")

df_t_4 = assembler.transform(df_t_3)

In [223]:
# 80/20 random split with a fixed seed; the training part additionally gets
# the "label" column (a copy of "load") that the regressor is fitted on.
split_weights = [0.8, 0.2]
train, test = df_t_4.randomSplit(split_weights, seed=2019)
training = train.withColumn("label", F.col("load"))

1) Predict the expected load (requests/second) in the next minute
----------------

In [255]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PolynomialExpansion

# Two-stage pipeline: polynomial expansion of the assembled feature vector,
# followed by a linear regression on the expanded features.
polyExpansion = PolynomialExpansion(inputCol="vectorized", outputCol="features")
lr = LinearRegression(maxIter=100, featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[polyExpansion, lr])

# Search grid: 3 degrees x 3 regParam values x 2 intercept settings
# x 3 elastic-net mixes = 54 candidate configurations.
paramGrid = (
    ParamGridBuilder()
    .addGrid(polyExpansion.degree, [1, 2, 3])
    .addGrid(lr.regParam, [0.1, 0.01, 0.001])
    .addGrid(lr.fitIntercept, [False, True])
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
    .build()
)

# The seed pins the fold assignment so the selected model is reproducible
# across runs (same value as the train/test split seed).
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(), numFolds=5, seed=2019)
# Run 5-fold cross-validation and keep the best parameter combination.
cvModel = crossval.fit(training)

# Score the held-out test rows with the best pipeline found by cross-validation.
prediction = cvModel.transform(test)


In [256]:
# The winning pipeline has two stages: [0] polynomial expansion, [1] fitted
# linear regression. The parameter values are read through the wrapped JVM objects.
best_expansion = cvModel.bestModel.stages[0]._java_obj
best_regression = cvModel.bestModel.stages[1]._java_obj

print("Best model degree of polynomail = {}".format(best_expansion.getDegree()))
print("Best model regParam value = {}".format(best_regression.getRegParam()))
print("Best model fitIntercept value = {}".format(best_regression.getFitIntercept()))
print("Best model elasticNetParam value = {}".format(best_regression.getElasticNetParam()))

Best model degree of polynomail = 1
Best model regParam value = 0.1
Best model fitIntercept value = True
Best model elasticNetParam value = 0.0


In [257]:
# Side-by-side view of the actual load and the model's prediction on test rows.
(
    prediction
    .select("load", "prediction")
    .show(100, truncate=False)
)

+--------------------+-------------------+
|load                |prediction         |
+--------------------+-------------------+
|78.0                |79.86522967288911  |
|25.55               |192.06314501523212 |
|0.06666666666666667 |36.33495495168938  |
|224.55              |179.47996270635434 |
|390.18333333333334  |360.7445611799494  |
|207.35              |120.51825269450566 |
|410.7               |305.6405420405466  |
|348.5833333333333   |326.55421616874145 |
|192.83333333333334  |80.81338313203538  |
|405.35              |208.48229686086225 |
|381.4166666666667   |322.43855347671536 |
|0.5                 |-19.939321071743336|
|0.5166666666666667  |8.100657726289029  |
|0.85                |70.77168548518706  |
|0.03333333333333333 |68.31719307735177  |
|350.8666666666667   |315.53061575573884 |
|33.0                |330.00454435468475 |
|0.016666666666666666|70.36889737819303  |
|281.9               |336.5469843692852  |
|0.03333333333333333 |267.751740345182   |
|351.716666

In [258]:
# Fit statistics of the best linear-regression stage. These come from the
# model's own summary object, not from the held-out test predictions.
best_lr_stage = cvModel.bestModel.stages[1]
trainingSummary = best_lr_stage.summary

print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 99.109393
r2: 0.582128


In [134]:
from pyspark.sql import Row
# Small toy frame used to try out groupby / lag / datediff:
# (date, get_avg, get_first) per row.
_toy_records = [
    ('2016-01-01', 5, 1),
    ('2016-01-01', 5, 2),
    ('2016-01-02', 10, 3),
    ('2016-01-02', 20, 3),
    ('2016-01-10', 30, 3),
    ('2016-01-10', 10, 3),
    ('2016-01-10', 20, 3),
    ('2016-01-12', 30, 3),
    ('2016-01-12', 8, 4),
]
t = spark.createDataFrame(
    [Row(date=day, get_avg=avg_value, get_first=first_value)
     for day, avg_value, first_value in _toy_records]
)

In [135]:
# Mean of get_avg for each date, listed in date order.
mean_per_date = F.avg("get_avg").alias("get_avg")
t_ = t.groupby("date").agg(mean_per_date).sort("date")
t_.show(10, truncate=False)

+----------+-------+
|date      |get_avg|
+----------+-------+
|2016-01-01|5.0    |
|2016-01-02|15.0   |
|2016-01-10|20.0   |
|2016-01-12|19.0   |
+----------+-------+



In [136]:
# Single unpartitioned window over all rows, ordered by date.
w = Window.orderBy("date")

In [138]:
# Show every row next to the get_avg value from two rows earlier in the window.
two_rows_back = F.lag(t_["get_avg"], 2).over(w)
t_.select("*", two_rows_back).show()

+----------+-------+------------------------------------------------------------------------------+
|      date|get_avg|lag(get_avg, 2, NULL) OVER (ORDER BY date ASC NULLS FIRST unspecifiedframe$())|
+----------+-------+------------------------------------------------------------------------------+
|2016-01-01|    5.0|                                                                          null|
|2016-01-02|   15.0|                                                                          null|
|2016-01-10|   20.0|                                                                           5.0|
|2016-01-12|   19.0|                                                                          15.0|
+----------+-------+------------------------------------------------------------------------------+



In [54]:
# Day difference between each row's date and 2016-01-09. This runs on the toy
# frame `t`: the log DataFrame `df` has no "date" column, so referring to
# `df.date` only worked through leftover kernel state.
t.select(t.date, F.datediff(F.col("date"), F.lit('2016-01-09').cast('date'))).show(10, False)

+----------+----------------------------------------+
|date      |datediff(date, CAST(2016-01-09 AS DATE))|
+----------+----------------------------------------+
|2016-01-01|-8                                      |
|2016-01-01|-8                                      |
|2016-01-02|-7                                      |
|2016-01-02|-7                                      |
|2016-01-10|1                                       |
|2016-01-10|1                                       |
|2016-01-10|1                                       |
|2016-01-12|3                                       |
|2016-01-12|3                                       |
+----------+----------------------------------------+

