## Additional questions for Machine Learning Engineer (MLE) candidates
1. Predict the expected load (requests/second) in the next minute <br>
2. Predict the session length for a given IP<br>
3. Predict the number of unique URL visits by a given IP<br>

## Import Libraries

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import *
# The star import provides window, avg, mean, min, max, col, lit, countDistinct, ...
# used below. Note that it shadows Python's built-in min/max in this notebook.
from pyspark.sql.functions import *

from pyspark.sql.window import Window

In [2]:
# SparkSession
spark = SparkSession.builder \
    .master("local") \
    .appName("Predictive Analytics") \
    .config("spark.executor.memory", "1gb") \
    .getOrCreate()
sc = spark.sparkContext

## Load Data

In [3]:
# read log file
rdd = sc.textFile('/Users/NK/Projects/pytm/2015_07_22_mktplace_shop_web_log_sample.log')
# split by " "
rdd = rdd.map(lambda line: line.split(" "))

In [4]:
# Keep timestamp, ip and link 
# convert the RDD to DF 
df = rdd.map(lambda line: Row(timestamp=line[0], ipaddress=line[2].split(':')[0], link=line[12])).toDF()
df.show(5)

+---------------+--------------------+--------------------+
|      ipaddress|                link|           timestamp|
+---------------+--------------------+--------------------+
|123.242.248.130|https://paytm.com...|2015-07-22T09:00:...|
|  203.91.211.44|https://paytm.com...|2015-07-22T09:00:...|
|    1.39.32.179|https://paytm.com...|2015-07-22T09:00:...|
| 180.179.213.94|https://paytm.com...|2015-07-22T09:00:...|
| 120.59.192.208|https://paytm.com...|2015-07-22T09:00:...|
+---------------+--------------------+--------------------+
only showing top 5 rows
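The cell above relies on fixed positions after splitting each line on single spaces: field 0 is the timestamp, field 2 is `client_ip:port`, and field 12 is the URL inside the quoted request. The quoted user agent also contains spaces, but it comes after field 12, so those indices stay stable. The sketch below reproduces that logic in plain Python on a hypothetical log line, assuming the classic AWS ELB access-log layout; the line is made up and is not taken from the sample file.

```python
# Hypothetical log line in the classic AWS ELB access-log layout (assumption);
# all values are illustrative, not taken from the sample file.
SAMPLE = (
    '2015-07-22T09:00:28.019143Z my-loadbalancer 123.242.248.130:54635 '
    '10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 '
    '"GET https://paytm.com:443/shop/p/example HTTP/1.1" '
    '"Mozilla/5.0 (Windows NT 6.1; rv:39.0) Gecko/20100101 Firefox/39.0" '
    'ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2'
)


def parse_line(line):
    """Return (timestamp, ip, url) using the same positions as the Spark cell."""
    fields = line.split(" ")
    timestamp = fields[0]
    ip = fields[2].split(":")[0]  # drop the client port
    url = fields[12]              # token after the quoted HTTP method ('"GET')
    return timestamp, ip, url


print(parse_line(SAMPLE))
```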



## 1. Predict the expected load (requests/second) in the next minute 
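Approach, using a 60-second time range:

1. Cast `timestamp` to a timestamp type.
2. Assign each request to a 60-second window (`time_range`).
3. Group by `time_range` and count the requests in each window (`hit_min`).
4. Divide by 60 to get the average request rate in requests/second (`hit_sec`).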
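The windowing steps above amount to flooring each timestamp to the start of its 60-second bucket, counting requests per bucket, and dividing by 60. When `window()` is given only a window duration, Spark produces the same kind of non-overlapping (tumbling) buckets. A minimal plain-Python sketch of that aggregation, on made-up timestamps:

```python
from collections import Counter
from datetime import datetime

# Made-up ISO-8601 request timestamps (illustrative only).
timestamps = [
    "2015-07-22T09:00:01.5Z",
    "2015-07-22T09:00:30.0Z",
    "2015-07-22T09:00:59.9Z",
    "2015-07-22T09:01:00.0Z",
]


def minute_bucket(ts):
    """Floor a timestamp to the start of its 60-second tumbling window."""
    dt = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
    return dt.replace(second=0, microsecond=0)


# hit_min: number of requests that fall in each window.
hits_per_min = Counter(minute_bucket(ts) for ts in timestamps)
# hit_sec: average requests/second within each window = hits in the window / 60.
hits_per_sec = {start: n / 60.0 for start, n in sorted(hits_per_min.items())}
for start, rate in hits_per_sec.items():
    print(start.isoformat(), hits_per_min[start], round(rate, 4))
```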

In [5]:
df = df.withColumn('timestamp', df['timestamp'].cast(TimestampType()))
df_range = df.select(window("timestamp", "60 seconds").alias('time_range'),'timestamp',"ipaddress","link")
df_range.show(5)

+--------------------+--------------------+---------------+--------------------+
|          time_range|           timestamp|      ipaddress|                link|
+--------------------+--------------------+---------------+--------------------+
|[2015-07-22 05:00...|2015-07-22 05:00:...|123.242.248.130|https://paytm.com...|
|[2015-07-22 05:00...|2015-07-22 05:00:...|  203.91.211.44|https://paytm.com...|
|[2015-07-22 05:00...|2015-07-22 05:00:...|    1.39.32.179|https://paytm.com...|
|[2015-07-22 05:00...|2015-07-22 05:00:...| 180.179.213.94|https://paytm.com...|
|[2015-07-22 05:00...|2015-07-22 05:00:...| 120.59.192.208|https://paytm.com...|
+--------------------+--------------------+---------------+--------------------+
only showing top 5 rows



In [6]:
df_range.describe().show()

+-------+------------+--------------------+
|summary|   ipaddress|                link|
+-------+------------+--------------------+
|  count|     1158500|             1158500|
|   mean|        null|                null|
| stddev|        null|                null|
|    min|1.186.101.79|http://123.249.24...|
|    max|  99.8.170.3|https://www.paytm...|
+-------+------------+--------------------+



In [7]:
# Number of hits per time range regardless of ipaddress
hit_per_timerange = df_range.groupBy('time_range').count()
hit_per_timerange.show(5, False)

+------------------------------------------+-----+
|time_range                                |count|
+------------------------------------------+-----+
|[2015-07-22 12:11:00, 2015-07-22 12:12:00]|24120|
|[2015-07-22 07:57:00, 2015-07-22 07:58:00]|21   |
|[2015-07-22 11:51:00, 2015-07-22 11:52:00]|9    |
|[2015-07-22 06:34:00, 2015-07-22 06:35:00]|20915|
|[2015-07-22 14:04:00, 2015-07-22 14:05:00]|16340|
+------------------------------------------+-----+
only showing top 5 rows



In [8]:
# rename count column 
hit_per_timerange = hit_per_timerange.withColumnRenamed('count', 'hit_min')

In [9]:
# Sort by time_range
sorted_hit_per_timerange = hit_per_timerange.orderBy('time_range', ascending=True)
sorted_hit_per_timerange.show(5, False)

+------------------------------------------+-------+
|time_range                                |hit_min|
+------------------------------------------+-------+
|[2015-07-21 22:40:00, 2015-07-21 22:41:00]|4681   |
|[2015-07-21 22:41:00, 2015-07-21 22:42:00]|6787   |
|[2015-07-21 22:42:00, 2015-07-21 22:43:00]|5586   |
|[2015-07-21 22:43:00, 2015-07-21 22:44:00]|4734   |
|[2015-07-21 22:44:00, 2015-07-21 22:45:00]|4680   |
+------------------------------------------+-------+
only showing top 5 rows



In [13]:
# Convert hits per minute into the average request rate (hits per second) for each window
Features = sorted_hit_per_timerange.withColumn("hit_sec", col("hit_min")/60.0)

In [69]:
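# Optional: add a row id (not used later). The Id column in the output below
# suggests these two lines were uncommented when the cell was run.
#from pyspark.sql.functions import monotonically_increasing_id
#Features = Features.select("*").withColumn("Id", monotonically_increasing_id())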
Features.show(10)

+--------------------+-------+--------------------+-----------+
|          time_range|hit_min|             hit_sec|         Id|
+--------------------+-------+--------------------+-----------+
|[2015-07-21 22:40...|   4681|   78.01666666666667|          0|
|[2015-07-21 22:41...|   6787|  113.11666666666666| 8589934592|
|[2015-07-21 22:42...|   5586|                93.1|17179869184|
|[2015-07-21 22:43...|   4734|                78.9|25769803776|
|[2015-07-21 22:44...|   4680|                78.0|34359738368|
|[2015-07-21 22:45...|    323|   5.383333333333334|42949672960|
|[2015-07-22 01:09...|      1|0.016666666666666666|51539607552|
|[2015-07-22 01:10...|  10099|  168.31666666666666|60129542144|
|[2015-07-22 01:11...|  11670|               194.5|68719476736|
|[2015-07-22 01:12...|  12255|              204.25|77309411328|
+--------------------+-------+--------------------+-----------+
only showing top 10 rows



## Feature Engineering
### - Date Time Features
Date Time Features: these are components of the time step itself for each observation. <br>
### - Lag Features
Lag Features: these are values at prior time steps. <br>
### - Rolling Window Statistics
Window Features: these are a summary of values over a fixed window of prior time steps.<br>
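To make the three feature families concrete, the sketch below builds them for a single time step from a list of per-minute rates, using only values at or before that step, which is what "prior time steps" implies. The rates are the first six `hit_sec` values printed earlier (consecutive minutes starting at 22:40); the function name, the two lags and the trailing window of 3 are illustrative choices, not the notebook's.

```python
from datetime import datetime, timedelta
from statistics import mean

# First six per-second rates from the Features table above, one per minute.
start = datetime(2015, 7, 21, 22, 40)
rates = [78.01666666666667, 113.11666666666666, 93.1, 78.9, 78.0, 5.383333333333334]
times = [start + timedelta(minutes=i) for i in range(len(rates))]


def make_features(t, n_lags=2, window=3):
    """Features for step t built only from steps <= t (no future values)."""
    feats = {
        # Date-time features: components of the timestamp itself.
        "hour": times[t].hour,
        "minute": times[t].minute,
        # Lag features: values at prior time steps.
        **{f"lag_{k}": rates[t - k] for k in range(1, n_lags + 1)},
    }
    past = rates[t - window + 1 : t + 1]  # the current step and the two before it
    # Rolling-window statistics over the trailing window.
    feats.update(roll_min=min(past), roll_max=max(past), roll_mean=mean(past))
    return feats


t = 3
features, label = make_features(t), rates[t + 1]  # label = the next minute's rate
print(features, label)
```

If you paste this into the notebook itself, run it in a fresh interpreter: `from pyspark.sql.functions import *` replaces the built-in `min`/`max` there.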

### Features and Labels

In [None]:
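# Window1: exactly the next row -> used as the label. Minutes with no requests have
# no row, so "next" means the next non-empty window.
Window1 = Window.orderBy("time_range").rowsBetween(1, 1)
# Window0: the current row AND the next row. Because this span contains the label,
# F1-F3 leak the target; a window over prior steps only would be rowsBetween(-1, 0).
Window0 = Window.orderBy("time_range").rowsBetween(0, 1)
# avg/min/max/mean are the PySpark column functions from the star import above.
avg_hit_sec = avg(Features['hit_sec']).over(Window1)
min_hit_sec = min(Features['hit_sec']).over(Window0)
max_hit_sec = max(Features['hit_sec']).over(Window0)
mean_hit_sec = mean(Features['hit_sec']).over(Window0)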

In [77]:
Features_target = Features.select(Features['time_range'], Features['hit_sec'],
                                  min_hit_sec.alias("F1"), max_hit_sec.alias("F2"),mean_hit_sec.alias("F3"),
                                  avg_hit_sec.alias("hit_in_next_min"))
Features_target.show(10)

+--------------------+--------------------+--------------------+------------------+------------------+--------------------+
|          time_range|             hit_sec|                  F1|                F2|                F3|     hit_in_next_min|
+--------------------+--------------------+--------------------+------------------+------------------+--------------------+
|[2015-07-21 22:40...|   78.01666666666667|   78.01666666666667|113.11666666666666| 95.56666666666666|  113.11666666666666|
|[2015-07-21 22:41...|  113.11666666666666|                93.1|113.11666666666666|103.10833333333332|                93.1|
|[2015-07-21 22:42...|                93.1|                78.9|              93.1|              86.0|                78.9|
|[2015-07-21 22:43...|                78.9|                78.0|              78.9|             78.45|                78.0|
|[2015-07-21 22:44...|                78.0|   5.383333333333334|              78.0| 41.69166666666667|   5.383333333333334|
...
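The printed rows make the overlap in `Window0` visible. With `rowsBetween(0, 1)`, F1 and F2 are the minimum and maximum of the current and next window's rates and F3 is their mean, so both `F1 + F2 - hit_sec` and `2 * F3 - hit_sec` reproduce `hit_in_next_min` exactly. The check below uses the first four printed rows (values copied from the output). It suggests that the near-perfect fit reported for the linear model reflects this overlap rather than genuine forecasting skill.

```python
# First rows of Features_target as printed above: (hit_sec, F1, F2, F3, hit_in_next_min)
rows = [
    (78.01666666666667, 78.01666666666667, 113.11666666666666, 95.56666666666666, 113.11666666666666),
    (113.11666666666666, 93.1, 113.11666666666666, 103.10833333333332, 93.1),
    (93.1, 78.9, 93.1, 86.0, 78.9),
    (78.9, 78.0, 78.9, 78.45, 78.0),
]

for hit_sec, f1, f2, f3, label in rows:
    # min(a, b) + max(a, b) == a + b, so subtracting the current value a leaves b.
    recovered_from_min_max = f1 + f2 - hit_sec
    # mean(a, b) == (a + b) / 2, so 2 * mean - a == b as well.
    recovered_from_mean = 2 * f3 - hit_sec
    print(label, recovered_from_min_max, recovered_from_mean)
```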

In [78]:

Features_target.printSchema()

root
 |-- time_range: struct (nullable = false)
 |    |-- start: timestamp (nullable = true)
 |    |-- end: timestamp (nullable = true)
 |-- hit_sec: double (nullable = true)
 |-- F1: double (nullable = true)
 |-- F2: double (nullable = true)
 |-- F3: double (nullable = true)
 |-- hit_in_next_min: double (nullable = true)



In [79]:
Features_target.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| hit_sec | 110 | 175.53030303030303 | 153.99226406331138 | 0.016666666666666666 | 440.95 |
| F1 | 110 | 137.06893939393942 | 143.92251310400198 | 0.016666666666666666 | 401.5833333333333 |
| F2 | 110 | 213.65590909090906 | 154.3774496161323 | 0.03333333333333333 | 440.95 |
| F3 | 110 | 175.36242424242423 | 142.21541791932123 | 0.025 | 421.26666666666665 |
| hit_in_next_min | 109 | 176.4249235474006 | 154.41610897182733 | 0.016666666666666666 | 440.95 |


In [96]:
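# Drop rows containing nulls: the last window has no following row, so its
# label (hit_in_next_min) is null (count 109 vs 110 above).
Features_target = Features_target.na.drop()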

In [97]:
# "time_range" is not required for the analysis
Features_target_s = Features_target.drop("time_range")
Features_target_s.describe().toPandas().transpose()

| summary | count | mean | stddev | min | max |
|---|---|---|---|---|---|
| hit_sec | 109 | 176.7637614678899 | 154.15668055485474 | 0.016666666666666666 | 440.95 |
| F1 | 109 | 137.9495412844037 | 144.289267848117 | 0.016666666666666666 | 401.5833333333333 |
| F2 | 109 | 215.23914373088684 | 154.1907413864437 | 0.03333333333333333 | 440.95 |
| F3 | 109 | 176.59434250764525 | 142.28145164642547 | 0.025 | 421.26666666666665 |
| hit_in_next_min | 109 | 176.4249235474006 | 154.41610897182733 | 0.016666666666666666 | 440.95 |


## Correlation Analysis

In [82]:

import six
for i in Features_target_s.columns:
    if not( isinstance(Features_target_s.select(i).take(1)[0][0], six.string_types)):
        print( "Correlation to hit_in_next_min for ", i, Features_target_s.stat.corr('hit_in_next_min',i))

('Correlation to hit_in_next_min for ', 'hit_sec', 0.7033871078152598)
('Correlation to hit_in_next_min for ', 'F1', 0.888602586524289)
('Correlation to hit_in_next_min for ', 'F2', 0.8720443352929288)
('Correlation to hit_in_next_min for ', 'F3', 0.9229445782978808)
('Correlation to hit_in_next_min for ', 'hit_in_next_min', 1.0)
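`DataFrame.stat.corr` computes the Pearson correlation coefficient (it currently supports only the Pearson method). For reference, a plain-Python version of the same formula, checked on small made-up series:

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of standard deviations."""
    n = len(xs)
    mx, my = sum(xs) / n, sum(ys) / n
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = sqrt(sum((x - mx) ** 2 for x in xs))
    sy = sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)  # the 1/(n-1) normalisation factors cancel


xs = [1.0, 2.0, 3.0, 4.0]
print(pearson(xs, [2 * x + 1 for x in xs]))  # perfectly linear relationship
print(pearson(xs, [-x for x in xs]))         # perfectly inverse relationship
```

A column correlated with itself always scores 1.0, which is why `hit_in_next_min` appears with 1.0 in the loop output above.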


## Prepare data for Machine Learning

In [98]:
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler(inputCols = ['hit_sec', 'F1', 'F2', 'F3'], outputCol = 'features')
vfeatures_df = vectorAssembler.transform(Features_target_s)
vfeatures_df = vfeatures_df.select(['features', 'hit_in_next_min'])
vfeatures_df.show(3)

+--------------------+------------------+
|            features|   hit_in_next_min|
+--------------------+------------------+
|[78.0166666666666...|113.11666666666666|
|[113.116666666666...|              93.1|
|[93.1,78.9,93.1,8...|              78.9|
+--------------------+------------------+
only showing top 3 rows



In [99]:
# Random 70/30 split. No seed is given, so the split and the metrics below vary between runs.
splits = vfeatures_df.randomSplit([0.7, 0.3])
train_df = splits[0]
test_df = splits[1]
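`randomSplit` assigns minutes to train and test at random, so the model is evaluated on minutes that sit between training minutes. For a question framed as "the next minute", a chronological split (train on earlier windows, test on later ones) is the more usual check. A minimal sketch on rows already sorted by time; the 70/30 ratio mirrors the cell above, and the helper name `chronological_split` is hypothetical.

```python
def chronological_split(rows, train_frac=0.7):
    """Split time-ordered rows: the first train_frac go to training, the rest to test."""
    cut = int(len(rows) * train_frac)
    return rows[:cut], rows[cut:]


# Rows must already be sorted by time (e.g. by window start).
minutes = list(range(10))  # stand-ins for 10 consecutive per-minute rows
train, test = chronological_split(minutes)
print(train, test)
```

In the notebook, one way to get the same effect before `time_range` is dropped is to filter `Features_target` on `col("time_range.start")` against a cutoff timestamp.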

## Linear Regression

In [100]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol = 'features', labelCol='hit_in_next_min', maxIter=10, regParam=0.3)
lr_model = lr.fit(train_df)
# Print the coefficients and intercept for linear regression model
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))

Coefficients: [-0.981349941554299,0.6404834977003858,0.6372013072833304,0.7031158193211009]
Intercept: 0.201502195949
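A fitted linear model predicts with the dot product of its coefficients and the feature vector, plus the intercept. The sketch applies the printed coefficients (in `VectorAssembler` order: `hit_sec`, `F1`, `F2`, `F3`) to the first row of `Features_target`, whose label is about 113.12. Read together, the weights roughly amount to `1.98 * F3 - 0.98 * hit_sec`, because F1 and F2 get nearly equal weights and F1 + F2 equals 2 * F3 in this data.

```python
# Coefficients and intercept printed by lr_model above (order: hit_sec, F1, F2, F3).
coefficients = [-0.981349941554299, 0.6404834977003858, 0.6372013072833304, 0.7031158193211009]
intercept = 0.201502195949

# First row of Features_target: hit_sec, F1, F2, F3 (its hit_in_next_min is 113.11666666666666).
x = [78.01666666666667, 78.01666666666667, 113.11666666666666, 95.56666666666666]

# Linear-model prediction: weighted sum of the features plus the intercept.
prediction = sum(w * v for w, v in zip(coefficients, x)) + intercept
print(round(prediction, 3))
```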


In [101]:
# Summarize the model over the training set and print out some metrics
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)

RMSE: 1.140578
r2: 0.999949
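RMSE is the square root of the mean squared residual, and R² is one minus the ratio of the residual sum of squares to the total sum of squares; these are the quantities that `LinearRegressionSummary` and `RegressionEvaluator` (`metricName="rmse"` / `"r2"`) report. A plain-Python sketch on made-up numbers:

```python
from math import sqrt


def rmse(y_true, y_pred):
    """Root mean squared error."""
    n = len(y_true)
    return sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / n)


def r2(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_t = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean_t) ** 2 for t in y_true)
    return 1.0 - ss_res / ss_tot


y_true = [10.0, 20.0, 30.0, 40.0]
y_pred = [12.0, 18.0, 33.0, 37.0]
print(rmse(y_true, y_pred), r2(y_true, y_pred))
```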


In [106]:
print("Coefficient Standard Errors: " + str(trainingSummary.coefficientStandardErrors))
print("T Values: " + str(trainingSummary.tValues))
print("P Values: " + str(trainingSummary.pValues))



Coefficient Standard Errors: [0.002346198936135336, 0.00943586490329917, 0.00943386066606993, 0.01855189515439349, 0.2545809332533291]
T Values: [-418.27226431650365, 67.87756122668164, 67.54406598086669, 37.899945718191915, 0.791505449266184]
P Values: [0.0, 0.0, 0.0, 0.0, 0.43157283292268445]
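Each t-value is the coefficient divided by its standard error; the fifth entry in each list is the intercept. The p-value is the two-sided tail probability of that t-value under Student's t distribution. The standard library has no t CDF, so the sketch below substitutes a standard-normal approximation, which should be close here given 64 residual degrees of freedom (69 training rows minus 4 coefficients and the intercept). Treat the approximate p-values as a sanity check, not a replacement for the ones Spark reports.

```python
from math import erfc, sqrt

# Values printed above; the last entry of each list is the intercept.
estimates = [-0.981349941554299, 0.6404834977003858, 0.6372013072833304,
             0.7031158193211009, 0.201502195949]
std_errors = [0.002346198936135336, 0.00943586490329917, 0.00943386066606993,
              0.01855189515439349, 0.2545809332533291]

# t = estimate / standard error
t_values = [b / se for b, se in zip(estimates, std_errors)]

# Two-sided p-value under a normal approximation: P(|Z| > |t|) = erfc(|t| / sqrt(2)).
p_approx = [erfc(abs(t) / sqrt(2)) for t in t_values]

for t, p in zip(t_values, p_approx):
    print(f"t = {t:10.4f}   approx p = {p:.4f}")
```

The large intercept p-value (about 0.43) says the intercept is not distinguishable from zero, while the four feature weights are.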


In [107]:
train_df.describe().show()

+-------+--------------------+
|summary|     hit_in_next_min|
+-------+--------------------+
|  count|                  69|
|   mean|  180.95217391304345|
| stddev|  160.39693058916941|
|    min|0.016666666666666666|
|    max|              440.95|
+-------+--------------------+



In [108]:
# Predictions
lr_predictions = lr_model.transform(test_df)
lr_predictions.select("prediction","hit_in_next_min","features").show(5)

from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction", \
                 labelCol="hit_in_next_min",metricName="r2")
print("R Squared (R2) on test data = %g" % lr_evaluator.evaluate(lr_predictions))

+-------------------+--------------------+--------------------+
|         prediction|     hit_in_next_min|            features|
+-------------------+--------------------+--------------------+
| 164.68177612566063|              166.35|[0.01666666666666...|
|  166.6263359189836|  168.31666666666666|[0.01666666666666...|
| 189.28540130728084|  191.23333333333332|[0.01666666666666...|
|0.21828319525161566|0.016666666666666666|[0.03333333333333...|
|  160.4962069650283|  162.11666666666667|[0.03333333333333...|
+-------------------+--------------------+--------------------+
only showing top 5 rows

R Squared (R2) on test data = 0.999947


In [109]:
test_result = lr_model.evaluate(test_df)
print("Root Mean Squared Error (RMSE) on test data = %g" % test_result.rootMeanSquaredError)

Root Mean Squared Error (RMSE) on test data = 1.04584


## Decision tree regression

In [114]:
from pyspark.ml.regression import DecisionTreeRegressor
dt = DecisionTreeRegressor(featuresCol ='features', labelCol = 'hit_in_next_min')
dt_model = dt.fit(train_df)
dt_predictions = dt_model.transform(test_df)
dt_predictions.select('prediction', 'hit_in_next_min', 'features').show(5)

+------------------+--------------------+--------------------+
|        prediction|     hit_in_next_min|            features|
+------------------+--------------------+--------------------+
|205.68333333333337|              166.35|[0.01666666666666...|
|205.68333333333337|  168.31666666666666|[0.01666666666666...|
|205.68333333333337|  191.23333333333332|[0.01666666666666...|
|             67.25|0.016666666666666666|[0.03333333333333...|
|205.68333333333337|  162.11666666666667|[0.03333333333333...|
+------------------+--------------------+--------------------+
only showing top 5 rows



In [115]:
dt_evaluator = RegressionEvaluator(
    labelCol="hit_in_next_min", predictionCol="prediction", metricName="rmse")
rmse = dt_evaluator.evaluate(dt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 40.9492
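A regression tree such as `DecisionTreeRegressor` grows by choosing, at each node, the feature threshold that most reduces the variance (equivalently, the squared error) of the label in the two children; variance is the impurity Spark uses for regression trees. Each leaf then predicts the mean label of its training rows, which is why several test rows above receive the identical prediction 205.68. The toy sketch below searches all thresholds on one feature for the best single split. It is a simplification, not Spark's implementation, which bins continuous features (`maxBins`, default 32) and recurses up to `maxDepth` (default 5).

```python
def sse(values):
    """Sum of squared deviations from the mean (n times the variance)."""
    if not values:
        return 0.0
    m = sum(values) / len(values)
    return sum((v - m) ** 2 for v in values)


def best_split(xs, ys):
    """Return (threshold, left_mean, right_mean) minimising the children's total SSE."""
    pairs = sorted(zip(xs, ys))
    best = None
    for i in range(1, len(pairs)):
        if pairs[i - 1][0] == pairs[i][0]:
            continue  # no threshold separates equal feature values
        threshold = (pairs[i - 1][0] + pairs[i][0]) / 2
        left = [y for _, y in pairs[:i]]
        right = [y for _, y in pairs[i:]]
        cost = sse(left) + sse(right)
        if best is None or cost < best[0]:
            best = (cost, threshold, sum(left) / len(left), sum(right) / len(right))
    return best[1:]


# Toy data: the label jumps once the feature passes about 5.
xs = [1.0, 2.0, 3.0, 6.0, 7.0, 8.0]
ys = [10.0, 11.0, 12.0, 50.0, 51.0, 52.0]
print(best_split(xs, ys))  # each side of the split predicts that side's mean label
```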


In [116]:
# Feature importances; indices follow the VectorAssembler order (hit_sec, F1, F2, F3)
dt_model.featureImportances

SparseVector(4, {0: 0.0442, 1: 0.0297, 2: 0.0501, 3: 0.876})
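The indices in `featureImportances` follow the `inputCols` order given to the `VectorAssembler`, and Spark normalizes the importances to sum to 1. Mapping them back to names makes the result easier to read; the values below are copied from the output above.

```python
# Importances as printed above (SparseVector index -> importance).
importances = {0: 0.0442, 1: 0.0297, 2: 0.0501, 3: 0.876}
feature_names = ["hit_sec", "F1", "F2", "F3"]  # VectorAssembler inputCols order

named = {feature_names[i]: w for i, w in importances.items()}
ranked = sorted(named.items(), key=lambda kv: kv[1], reverse=True)
for name, weight in ranked:
    print(f"{name:8s} {weight:.4f}")
```

Inside the notebook, the same mapping can be built directly with `dict(zip(feature_names, dt_model.featureImportances.toArray()))`.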

## Gradient-boosted tree regression

In [112]:
from pyspark.ml.regression import GBTRegressor
gbt = GBTRegressor(featuresCol = 'features', labelCol = 'hit_in_next_min', maxIter=10)
gbt_model = gbt.fit(train_df)
gbt_predictions = gbt_model.transform(test_df)
gbt_predictions.select('prediction', 'hit_in_next_min', 'features').show(5)

+------------------+--------------------+--------------------+
|        prediction|     hit_in_next_min|            features|
+------------------+--------------------+--------------------+
|208.68233932001274|              166.35|[0.01666666666666...|
|208.68233932001274|  168.31666666666666|[0.01666666666666...|
|208.68233932001274|  191.23333333333332|[0.01666666666666...|
| 67.10159959983535|0.016666666666666666|[0.03333333333333...|
| 206.1692497878579|  162.11666666666667|[0.03333333333333...|
+------------------+--------------------+--------------------+
only showing top 5 rows



In [113]:
gbt_evaluator = RegressionEvaluator(
    labelCol="hit_in_next_min", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

Root Mean Squared Error (RMSE) on test data = 39.83
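Gradient-boosted trees build an ensemble sequentially: each new tree is fitted to the residuals of the current ensemble (for squared loss, the residuals are the negative gradient), and its output is added scaled by a step size. The sketch below shows that loop with depth-1 trees (stumps) on a single feature, starting from the mean label. It mirrors the idea behind `GBTRegressor` (whose defaults include `lossType="squared"` and `stepSize=0.1`) but is a simplified illustration on made-up data, not Spark's implementation.

```python
def fit_stump(xs, residuals):
    """Best single-threshold split of xs, predicting the mean residual on each side."""
    pairs = sorted(zip(xs, residuals))
    best = None
    for i in range(1, len(pairs)):
        if pairs[i - 1][0] == pairs[i][0]:
            continue
        left = [r for _, r in pairs[:i]]
        right = [r for _, r in pairs[i:]]
        lm, rm = sum(left) / len(left), sum(right) / len(right)
        cost = sum((r - lm) ** 2 for r in left) + sum((r - rm) ** 2 for r in right)
        if best is None or cost < best[0]:
            best = (cost, (pairs[i - 1][0] + pairs[i][0]) / 2, lm, rm)
    _, thr, lm, rm = best
    return lambda x: lm if x <= thr else rm


def gradient_boost(xs, ys, n_trees=10, step_size=0.1):
    """Squared-error boosting: each stump is fitted to the current residuals."""
    base = sum(ys) / len(ys)  # start from the mean label
    trees = []

    def predict(x):
        return base + sum(step_size * tree(x) for tree in trees)

    for _ in range(n_trees):
        residuals = [y - predict(x) for x, y in zip(xs, ys)]
        trees.append(fit_stump(xs, residuals))
    return predict


def mse(model, xs, ys):
    """Mean squared error of a model over the given points."""
    return sum((y - model(x)) ** 2 for x, y in zip(xs, ys)) / len(ys)


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [5.0, 6.0, 7.0, 20.0, 21.0, 22.0, 40.0, 41.0]
for n in (1, 10, 50):
    print(n, round(mse(gradient_boost(xs, ys, n_trees=n), xs, ys), 3))
```

Training error keeps falling as trees are added; in Spark the number of trees is `maxIter` (10 in the cell above), and a smaller step size usually needs more iterations to reach the same fit.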
