# Paytm Interview WebLog Challenge [PySpark] | Submitted by Saurav Kaushik

## Importing required libraries

In [1]:
import os
from datetime import datetime
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from pyspark.mllib.tree import DecisionTree
from pyspark import SparkContext


## Initialization

In [2]:
os.environ["SPARK_HOME"] = "C:\opt\spark\spark-2.1.1-bin-hadoop2.7"


In [3]:
sc

<pyspark.context.SparkContext at 0x1fd717bdf98>

## Reading the challange data

In [4]:
data = sc.textFile("C:/Users/dell/Desktop/2015_07_22_mktplace_shop_web_log_sample.log", 1)


In [5]:
data.take(5)

['2015-07-22T09:00:28.019143Z marketpalce-shop 123.242.248.130:54635 10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 "GET https://paytm.com:443/shop/authresponse?code=f2405b05-e2ee-4b0d-8f6a-9fed0fcfe2e0&state=null HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.130 Safari/537.36" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2',
 '2015-07-22T09:00:27.894580Z marketpalce-shop 203.91.211.44:51402 10.0.4.150:80 0.000024 0.15334 0.000026 200 200 0 1497 "GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1; rv:39.0) Gecko/20100101 Firefox/39.0" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2',
 '2015-07-22T09:00:27.885745Z marketpalce-shop 1.39.32.179:56419 10.0.4.244:80 0.000024 0.164958 0.000017 200 200 0 157 "GET https://paytm.com:443/shop/wallet/txnhistory?page_size=10&page_number=0&channel=web&version=2 HTTP/1.1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHT

# 1. Problem Statements | Processing & Analytical goals:



## 1.A Sessionize the web log by IP. Sessionize = aggregrate all page hits by visitor/IP during a fixed time window. https://en.wikipedia.org/wiki/Session (web_analytics)  


### Modifying the data and taking the difference of time stamp from 2010-1-1


In [6]:
def ts(tp):
    return (datetime.strptime(tp, '%Y-%m-%dT%H:%M:%S.%fZ') - datetime(2010, 1, 1)).total_seconds()


def splitting(x):
    fields = x.split()
    return ((fields[2], ts(fields[0])), (fields[1], fields[3], fields[12]))


In [7]:
rdd_sess1 = data.map(splitting) \
    .sortByKey().map(lambda x: (x[0][0], (x[0][1], x[1]))) \
    .groupByKey() 


In [8]:
rdd_sess1.take(5)

[('1.186.112.52:51563',
  <pyspark.resultiterable.ResultIterable at 0x1fd7335b7b8>),
 ('1.186.146.103:50987',
  <pyspark.resultiterable.ResultIterable at 0x1fd7335b828>),
 ('1.186.180.154:50330',
  <pyspark.resultiterable.ResultIterable at 0x1fd7335b8d0>),
 ('1.186.247.60:51646',
  <pyspark.resultiterable.ResultIterable at 0x1fd7335b7f0>),
 ('1.186.28.220:38900',
  <pyspark.resultiterable.ResultIterable at 0x1fd7335b940>)]

### Let's use the fixed time window of 30 minutes which is also the mot popular standard for defing a standard. Defining the logic for the same.


In [9]:


def sessionize(data):
    prev = None
    start = 0
    output = []
    session_out = set()
    user_ip, l = data
    size = len(l) - 1
    for c, (timestamp, metadata) in enumerate(l):
        if not prev:
            prev = timestamp
            start = timestamp
        if timestamp - prev <= 900:
            session_out.add(metadata)
        else:
            delta = prev - start
            start = timestamp
            output.append((delta, len(session_out)))
            session_out = set([metadata])
        if c == size:
            delta = timestamp - start
            session_out.add(metadata)
            output.append((delta, len(session_out)))
        prev = timestamp

    return (user_ip, output)



In [10]:
rdd_sess2 = rdd_sess1.map(sessionize)

In [11]:
rdd_sess2.take(10)

[('1.186.112.52:51563', [(0.0, 1)]),
 ('1.186.146.103:50987', [(11.268976002931595, 2)]),
 ('1.186.180.154:50330', [(7.198314994573593, 3)]),
 ('1.186.247.60:51646', [(9.972968995571136, 3)]),
 ('1.186.28.220:38900', [(3.1270779967308044, 3)]),
 ('1.186.28.220:45747', [(0.0, 1)]),
 ('1.186.28.220:51962', [(0.0, 1)]),
 ('1.186.32.38:58337', [(0.0, 1)]),
 ('1.186.32.78:65243', [(0.0, 1)]),
 ('1.186.33.25:52486', [(0.0, 1)])]

## 1.B Determine the average session time

In [12]:

def avg_sess(kv):
    key, val = kv
    return (key, sum([v[0] for v in val]) / len(val))

In [13]:
avg_sess_time = rdd_sess2.map(avg_sess)

In [14]:
avg_sess_time.take(10)

[('1.186.112.52:51563', 0.0),
 ('1.186.146.103:50987', 11.268976002931595),
 ('1.186.180.154:50330', 7.198314994573593),
 ('1.186.247.60:51646', 9.972968995571136),
 ('1.186.28.220:38900', 3.1270779967308044),
 ('1.186.28.220:45747', 0.0),
 ('1.186.28.220:51962', 0.0),
 ('1.186.32.38:58337', 0.0),
 ('1.186.32.78:65243', 0.0),
 ('1.186.33.25:52486', 0.0)]

# 2. Problem Statements | Additional questions for Machine Learning Engineer (MLE) candidates:



## 2.2 Predict the session length for a given IP



## Using 2 lag regression for training machine learning model.

In [15]:


def avg_time_and_num_sess(data):
    user_ip, l = data
    return l

def ips(data):
    user_ip, l = data
    return user_ip


In [16]:
avg_time_and_num_sess_rdd = rdd_sess2.flatMap(avg_time_and_num_sess)

ips_rdd = rdd_sess2.map(ips)

In [17]:
ips_rdd.take(5)

['1.186.112.52:51563',
 '1.186.146.103:50987',
 '1.186.180.154:50330',
 '1.186.247.60:51646',
 '1.186.28.220:38900']

In [18]:
avg_time_and_num_sess_rdd.take(5)

[(0.0, 1),
 (11.268976002931595, 2),
 (7.198314994573593, 3),
 (9.972968995571136, 3),
 (3.1270779967308044, 3)]

## Creating a sparse matrix for data:

In [19]:


def data_prep(data):
    user_ip, l = data
    if len(l) < 3: return
    for i in range(0, len(l) - 2):
        yield LabeledPoint(l[i + 2][1], [l[i][1], l[i + 1][1]])




In [20]:
data = rdd_sess2.flatMap(data_prep)


## Creating training and validation sets: 75% and 25%

In [21]:
(train, val) = data.randomSplit([0.75, 0.25],seed=0)


## Not able to find a fesible way to plot in PySpark without using pandas. So, I'll be sticking to tree based models based on my understanding from the python version of the solution.

## Training the decesion tree model.

In [22]:

model = DecisionTree.trainRegressor(train, maxDepth=4,
                                 categoricalFeaturesInfo={},
                                 impurity='variance', maxBins=32)



## Visualizing the model: Let's see what we have got

In [23]:

print(model.toDebugString())


DecisionTreeModel regressor of depth 4 with 11 nodes
  If (feature 0 <= 21.0)
   If (feature 1 <= 6.0)
    If (feature 0 <= 3.0)
     If (feature 1 <= 1.0)
      Predict: 1.0324590163934426
     Else (feature 1 > 1.0)
      Predict: 1.0138888888888888
    Else (feature 0 > 3.0)
     Predict: 4.0
   Else (feature 1 > 6.0)
    If (feature 0 <= 12.0)
     Predict: 12.0
    Else (feature 0 > 12.0)
     Predict: 10.0
  Else (feature 0 > 21.0)
   Predict: 83.0



## Predicting for the validation set

In [24]:

predictions = model.predict(val.map(lambda x: x.features))
labelsAndPredictions = val.map(lambda lp: lp.label).zip(predictions)


## Calculating the Mean Square Error on the validation set 

In [25]:
MSE = labelsAndPredictions.map(lambda kv: (kv[0] - kv[1])**2).reduce(lambda x, y: x + y) / labelsAndPredictions.count()
print("Mean Squared Error = " + str(MSE))


Mean Squared Error = 0.19777190149926108


# Thank You!