# Web log challenge
Notes:
1. This is executed in a Jupyter notebook launched from the PySpark shell (Windows). Hence, `sc` (the SparkContext) and `spark` (the SparkSession) are already initialized in the environment, with all the information about the Spark cluster.
2. The shell command to launch the Jupyter notebook with this configuration on a Windows machine where PySpark is installed is `pyspark.cmd --master local[n] --conf spark.network.timeout=10000000`, where `n` is the number of local worker threads (cores). The timeout has been raised to prevent the crashes I was facing while training the models.
3. PySpark installation instructions for Windows can be found here: https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c
4. The code needs the extracted log file in the '../data/' directory. Alternatively, the path can be changed in cell In [4].


In [1]:
#importing functions and libraries

import shlex #required for easy splits of data using delimiters and respecting quotes
from pyspark.sql import functions as ps_fun  
from pyspark.sql.types import TimestampType,IntegerType, StringType
from pyspark.sql import Window
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import FeatureHasher, Bucketizer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [2]:
#define the column names for the dataset. Taken from the Amazon ELB access log documentation, with some changes made for easier readability/parsing
col_names = ['timestamp','elb'
,'client_ip_port'
,'backend_ip_port'
,'request_processing_time'
,'backend_processing_time'
,'response_processing_time'
,'elb_status_code'
,'backend_status_code'
,'received_bytes'
,'sent_bytes'
,'request'
,'user_agent'
,'ssl_cipher'
,'ssl_protocol']

In [3]:
#A few rows have a quotation mark inside a data field. This unbalances the quotes, so shlex either fails to split the line
#or splits it into the wrong number of columns. The function below tolerates such cases: for problematic rows it returns a
#list of Nones, which are filtered out later. Since such rows are rare, dropping them will not cause any significant change
#in the outcomes. However, if they were frequent, we would need a different approach to include them
def split_try(line, cols_expected):
    try:
        ret = shlex.split(line)
    except ValueError:
        #shlex raises ValueError ("No closing quotation") on unbalanced quotes
        ret = None
    #also guard against lines that split into the wrong number of fields, which would fail when converted to a DataFrame
    if ret is None or len(ret) != cols_expected:
        ret = [None] * cols_expected
    return ret
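The reason for using `shlex` rather than a plain whitespace split is that the request and user-agent fields are quoted and contain spaces. The standalone sketch below illustrates this with two made-up lines in the ELB access-log layout (they are hypothetical, not rows from the dataset): a whitespace split breaks the quoted fields apart, `shlex.split` yields the expected 15 fields, and a stray quote makes `shlex` raise `ValueError`, which is the case `split_try` turns into a row of Nones.

```python
import shlex

# Hypothetical, well-formed ELB-style line: 15 fields, two of them quoted.
good = ('2015-07-22T09:00:28.019143Z my-elb 1.2.3.4:54635 '
        '10.0.6.158:80 0.000022 0.026109 0.00002 200 200 0 699 '
        '"GET https://example.com:443/shop/item?id=7 HTTP/1.1" '
        '"Mozilla/5.0 (Windows NT 6.1)" ECDHE-RSA-AES128-GCM-SHA256 TLSv1.2')

# Hypothetical malformed line: a stray quote inside the user agent
# leaves the quotes unbalanced.
bad = good.replace('"Mozilla/5.0 (Windows NT 6.1)"',
                   '"Mozilla/5.0 (compatible; "bot)"')

naive = good.split()          # also splits inside the quoted fields
fields = shlex.split(good)    # respects the double quotes

try:
    shlex.split(bad)
    bad_raises = False
except ValueError:            # "No closing quotation"
    bad_raises = True

print(len(naive), len(fields), bad_raises)
```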


In [4]:
#Read the text files as an RDD
#Apply the splitting function written above to give the split columns. 
data1 = sc.textFile('../data/2015_07_22_mktplace_shop_web_log_sample.log').map(lambda line:split_try(line,cols_expected=15))

In [5]:
#converting the RDD to a Spark DataFrame for easy manipulation, transformation and analysis
#.cache() keeps the DataFrame's partitions in executor memory (spilling to disk if needed) once it is first computed, which
#speeds up the repeated queries below. However, for big datasets this can put significant pressure on memory.
data = data1.toDF(col_names).cache()

In [6]:
#filtering the data to remove NULL client_ip_port (NULL client_ip_port will be a result of the splitting function 
#used for problematic rows)
data = data.dropna(subset='client_ip_port')

In [7]:
#various string split operations:
#1) To get the IP from IP + port. The same IP might connect from different ports, but should be recorded as the same user
#2) To get the URL from URL + args. The same URL might be called with different arguments but, as per the details given,
#those calls should be counted as a single hit

#get IP from IP+port
split_col_ip = ps_fun.split(data['client_ip_port'], ':')
data = data.withColumn('client_ip',split_col_ip.getItem(0))

#get URL from URL + args
split_col_url = ps_fun.split(data['request'],' ')
data = data.withColumn('full_url',split_col_url.getItem(1))
split_col_url_2 = ps_fun.split(data['full_url'],'\\?')
data = data.withColumn('full_url_without_args',split_col_url_2.getItem(0))
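To make the two split operations concrete, here is a plain-Python mirror of them for single values. It is only an illustration: the helper names `client_ip` and `url_without_args` are hypothetical, and unlike Spark's `getItem`, which returns null for a missing item, Python raises `IndexError` on a malformed request.

```python
def client_ip(client_ip_port: str) -> str:
    # Same idea as split(client_ip_port, ':').getItem(0)
    return client_ip_port.split(':')[0]


def url_without_args(request: str) -> str:
    # The request field looks like 'METHOD URL PROTOCOL'; item 1 is the URL
    full_url = request.split(' ')[1]
    # Keep everything before the first '?', i.e. drop the query arguments
    return full_url.split('?', 1)[0]


print(client_ip('1.2.3.4:54635'))
print(url_without_args('GET https://example.com:443/shop/item?id=7 HTTP/1.1'))
```

Two requests to the same page with different arguments therefore map to the same `full_url_without_args` value and count as one URL.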

In [8]:
#specify the time out for sessions in seconds
time_out=15*60 

#### Processing & Analytical goals:
1. Sessionize the web log by IP. Sessionize = aggregate all page hits by visitor/IP during a session

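Approach:
1. I have used Spark functionality as much as possible
2. I experimented with different window approaches, including taking the last value over a window. However, a Spark window function cannot update a value and carry it into the computation for the next row, so that did not work. I used a cumulative-sum approach instead: flag every hit that starts a new session and take a running sum of the flags per IP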
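The lag + flag + cumulative-sum idea can be sketched in plain Python for a handful of hits. This is a local illustration under the assumption that timestamps are already unix seconds; `sessionize` is a hypothetical helper, not part of the notebook. A hit opens a new session when the gap to the previous hit of the same IP exceeds the time-out, and the running count of such hits is the session counter.

```python
from itertools import groupby

TIME_OUT = 15 * 60  # seconds, same value as time_out in the notebook


def sessionize(hits, time_out=TIME_OUT):
    """hits: iterable of (client_ip, unix_seconds).
    Returns (client_ip, unix_seconds, session_counter) tuples."""
    out = []
    # Equivalent of partitionBy("client_ip").orderBy("timestamp")
    for ip, group in groupby(sorted(hits), key=lambda h: h[0]):
        prev_ts = None        # lag('timestamp')
        session_counter = 0   # running sum of the is_new flags
        for _, ts in group:
            # coalesce(diff, -1): the first hit of an IP never opens a new session
            diff = ts - prev_ts if prev_ts is not None else -1
            is_new = 1 if diff > time_out else 0
            session_counter += is_new
            out.append((ip, ts, session_counter))
            prev_ts = ts
    return out


print(sessionize([('a', 0), ('a', 100), ('a', 1001), ('b', 50)]))
```

The comparison is strict, so a gap of exactly 15 minutes stays in the same session, as with `diff_time > time_out` in the notebook.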

In [9]:
#creating a lagged variable for timestamp. last_session_ts gives the timestamp of the previous hit (request) by the same user
window_1 = Window.partitionBy("client_ip").orderBy("timestamp")
data = data.withColumn('last_session_ts',ps_fun.lag('timestamp').over(window_1))

In [10]:
#calculating the difference between the previous hit and the current hit in seconds
#Some issues I faced:
#1) Note that the format below ignores the fractional seconds and the time zone. unix_timestamp was not reading the timestamp
#properly when I included the fractional-second and time-zone parts in the format.
#However, for this problem this may not be an issue, as we are interested in time differences in seconds and all the
#records are in the same time zone (Z, i.e. UTC)
#2) I was able to read the time zones properly using pd.to_datetime() wrapped in a UDF, but I wanted to stick purely to
#Spark functions.
#I also tried the to_timestamp() function, but did not spend a lot of time on it, as fractional seconds and the time zone
#are not very important for this problem
#Note: this relies on the lenient (Spark 2.x) parser ignoring the unparsed suffix of the string; on Spark 3.x the stricter
#parser may reject it unless spark.sql.legacy.timeParserPolicy is set to LEGACY
timeFmt = "yyyy-MM-dd'T'HH:mm:ss"
timeDiff = (ps_fun.unix_timestamp('timestamp', format=timeFmt)
            - ps_fun.unix_timestamp('last_session_ts', format=timeFmt))
#the first hit of each IP has no previous hit; -1 marks it so that it never counts as a new session
data = data.withColumn("diff_time", ps_fun.coalesce(timeDiff,ps_fun.lit(-1)))
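To put a number on what dropping the fractional seconds costs, the sketch below parses two hypothetical ELB-style timestamps both ways with Python's `datetime` (not Spark). Truncating each timestamp to whole seconds changes a difference by less than one second. That is small relative to the 15-minute time-out, although a gap within a second of the boundary can fall on the other side of it.

```python
from datetime import datetime, timezone

FULL_FMT = '%Y-%m-%dT%H:%M:%S.%fZ'   # fractional seconds plus a literal 'Z'
TRUNC_FMT = '%Y-%m-%dT%H:%M:%S'      # the part that timeFmt keeps


def seconds_between(a: str, b: str, keep_fraction: bool) -> float:
    if keep_fraction:
        ta = datetime.strptime(a, FULL_FMT).replace(tzinfo=timezone.utc)
        tb = datetime.strptime(b, FULL_FMT).replace(tzinfo=timezone.utc)
    else:
        # Keep only 'yyyy-MM-ddTHH:mm:ss', as the Spark cell effectively does
        ta = datetime.strptime(a[:19], TRUNC_FMT)
        tb = datetime.strptime(b[:19], TRUNC_FMT)
    return (tb - ta).total_seconds()


a = '2015-07-22T09:00:28.999999Z'
b = '2015-07-22T09:15:29.000001Z'
full = seconds_between(a, b, keep_fraction=True)
trunc = seconds_between(a, b, keep_fraction=False)
print(full, trunc)
```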

In [11]:
#Assigning session IDs to IPs
#1) Create a flag if diff_time is greater than time_out
#2) For each client_ip and each row, take the sum of all flags up to and including the row. Window.unboundedPreceding makes
#the frame start at the first row of the partition, so all preceding rows are included. This is done on data ordered ascending by timestamp
#3) The sum (rather, the cumulative sum) is the session_counter for that IP. The combination of IP + session_counter is the unique
#session ID
window_3 = Window.partitionBy("client_ip").orderBy("timestamp").rowsBetween(Window.unboundedPreceding,0) 
data = data.withColumn('is_new',ps_fun.when(ps_fun.col('diff_time')>time_out,1).otherwise(0))
data = data.withColumn('session_counter',ps_fun.sum('is_new').over(window_3))

In [12]:
#creating client_session as the unique identifier, defined as the concatenation of client_ip and session_counter
data=data.withColumn('client_session',ps_fun.concat(ps_fun.col('client_ip'),ps_fun.lit('____'),ps_fun.col('session_counter')))

#### Processing & Analytical goals:
2. Determine the average session time
3. Determine unique URL visits per session. To clarify, count a hit to a unique URL only once per session.
4. Find the most engaged users, i.e. the IPs with the longest session times
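Goals 2 and 3 come down to two per-session quantities: the time between the first and the last hit, and the number of distinct URLs (kept in a set, so repeated hits to one URL count once). A plain-Python sketch with hypothetical session IDs and URLs, mirroring the `groupBy`/`agg` used for `data_session`:

```python
from collections import defaultdict


def session_metrics(hits):
    """hits: iterable of (client_session, unix_seconds, url_without_args).
    Returns {client_session: (time_spent, n_pages)}."""
    times = defaultdict(list)
    urls = defaultdict(set)
    for session, ts, url in hits:
        times[session].append(ts)
        urls[session].add(url)  # a set counts each URL once per session
    return {s: (max(t) - min(t), len(urls[s])) for s, t in times.items()}


def averages(metrics):
    """Average time_spent and average n_pages over all sessions."""
    n = len(metrics)
    return (sum(t for t, _ in metrics.values()) / n,
            sum(p for _, p in metrics.values()) / n)


m = session_metrics([('a____0', 0, '/x'), ('a____0', 30, '/x'),
                     ('a____0', 90, '/y'), ('a____1', 2000, '/x'),
                     ('b____0', 10, '/z')])
print(m, averages(m))
```

One consequence of this definition: a session with a single hit has `time_spent` 0, so single-hit sessions pull the average session time down.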

In [13]:
#aggregating the data by client_session to compute session-level metrics
data_session = data.groupBy(["client_session",'client_ip','session_counter']).agg(
    ps_fun.min("timestamp").alias("ts_first"), 
    ps_fun.max("timestamp").alias("ts_last"),
    ps_fun.count("*").alias("calls"),
    ps_fun.countDistinct("full_url_without_args").alias('n_pages')
 ).withColumn('time_spent',(ps_fun.unix_timestamp('ts_last',timeFmt) - ps_fun.unix_timestamp('ts_first',timeFmt)))

In [14]:
#average time spent per session (in seconds) and average number of unique URLs per session
data_session.agg({'time_spent':'mean',
                'n_pages':'mean'}).show()

+-----------------+------------------+
|     avg(n_pages)|   avg(time_spent)|
+-----------------+------------------+
|8.044227906347272|100.75318265890739|
+-----------------+------------------+



In [15]:
#unique URL visits for each session
data_session.select(['client_session','n_pages']).show()

+-----------------+-------+
|   client_session|n_pages|
+-----------------+-------+
|1.186.143.37____0|      2|
|1.187.164.29____0|      8|
|  1.22.41.76____0|      5|
| 1.23.208.26____0|      4|
| 1.23.208.26____1|      1|
| 1.23.36.184____0|      4|
|   1.38.19.8____0|      1|
|  1.38.20.34____0|     14|
|  1.39.13.13____0|      2|
| 1.39.32.249____0|      4|
| 1.39.32.249____1|      2|
|  1.39.32.59____0|      1|
| 1.39.33.153____0|      6|
|  1.39.33.33____0|      2|
|  1.39.33.77____0|      2|
|  1.39.33.77____1|      4|
|   1.39.34.4____0|      1|
|  1.39.40.43____0|      2|
|  1.39.60.37____0|     31|
|  1.39.61.53____0|     19|
+-----------------+-------+
only showing top 20 rows



In [16]:
#aggregating the session-level metrics to the IP level
data_by_ip = data_session.groupBy('client_ip').agg(
    ps_fun.mean('time_spent').alias('average_time_spent'),
    ps_fun.sum('time_spent').alias('total_time_spent'),
    ps_fun.count('*').alias('sessions')
)


In [17]:
#top 10 users by average time spent 
data_by_ip.sort(ps_fun.desc('average_time_spent')).show(10)

+--------------+------------------+----------------+--------+
|     client_ip|average_time_spent|total_time_spent|sessions|
+--------------+------------------+----------------+--------+
|103.29.159.138|            2065.0|            2065|       1|
|125.16.218.194|            2065.0|            2065|       1|
|  14.99.226.79|            2063.0|            2063|       1|
| 122.169.141.4|            2060.0|            2060|       1|
| 14.139.220.98|            2058.0|            2058|       1|
|117.205.158.11|            2057.0|            2057|       1|
|  111.93.89.14|            2055.0|            2055|       1|
|  182.71.63.42|            2051.0|            2051|       1|
| 223.176.3.130|            2048.0|            2048|       1|
|183.82.103.131|            2042.0|            2042|       1|
+--------------+------------------+----------------+--------+
only showing top 10 rows



In [18]:
#top 10 users by total time spent 
data_by_ip.sort(ps_fun.desc('total_time_spent')).show(10)

+--------------+------------------+----------------+--------+
|     client_ip|average_time_spent|total_time_spent|sessions|
+--------------+------------------+----------------+--------+
| 220.226.206.7| 522.6923076923077|            6795|      13|
| 119.81.61.166| 683.6666666666666|            6153|       9|
|  52.74.219.71|             525.8|            5258|      10|
| 54.251.151.39|             523.6|            5236|      10|
|121.58.175.128|             498.8|            4988|      10|
| 106.186.23.95|             493.1|            4931|      10|
|  125.19.44.66|             465.3|            4653|      10|
| 54.169.191.85|           577.625|            4621|       8|
|  207.46.13.22|             453.5|            4535|      10|
|180.179.213.94| 501.3333333333333|            4512|       9|
+--------------+------------------+----------------+--------+
only showing top 10 rows



In [19]:
#sessions with the maximum time spent (the empty partitionBy() computes the maximum over all sessions)
data_session.withColumn('max_time_spent', ps_fun.max('time_spent').over(Window.partitionBy()))\
            .filter('time_spent = max_time_spent').show()

+------------------+-------------+---------------+--------------------+--------------------+-----+-------+----------+--------------+
|    client_session|    client_ip|session_counter|            ts_first|             ts_last|calls|n_pages|time_spent|max_time_spent|
+------------------+-------------+---------------+--------------------+--------------------+-----+-------+----------+--------------+
| 52.74.219.71____4| 52.74.219.71|              4|2015-07-22T10:30:...|2015-07-22T11:04:...|11609|   9530|      2069|          2069|
|119.81.61.166____4|119.81.61.166|              4|2015-07-22T10:30:...|2015-07-22T11:04:...| 1818|   1739|      2069|          2069|
|106.186.23.95____4|106.186.23.95|              4|2015-07-22T10:30:...|2015-07-22T11:04:...| 2848|   2731|      2069|          2069|
+------------------+-------------+---------------+--------------------+--------------------+-----+-------+----------+--------------+



In [20]:
#timestamp formats used below, at minute and at second precision
ts_format_minute = "yyyy-MM-dd'T'HH:mm"
ts_format_second = "yyyy-MM-dd'T'HH:mm:ss"

### Predict the session length / unique URLs for a given IP
This problem can potentially be solved by using two separate models: one for returning users and one for first-time users. The returning-users model can also use the features meant for new users, as they may add to its predictive power

Below are the features that can be used to predict session length. The same approach can be extended to predict unique URLs (the features would use URL counts etc. instead of session time, and the target would change accordingly). In any case, the two targets are likely to be fairly correlated.

#### Returning users:
There is a lot of scope for using information from the customer's past behavior. Namely, we can derive the following features from the past history (limited set, can be expanded further):
1. time_spent_last_session - Time spent in the user's previous session
2. time_spent_in_the_day_till_now - Total time spent in the user's earlier sessions on the same day, to capture users who spend a fixed amount of time online every day. Can also interact with day of the week, which will capture users spending different amounts of time on weekdays versus weekends
3. current_and_last_session_time_difference - Time between the end of the previous session and the start of the current one. Sessions happening very close to each other may lead to shorter sessions
4. average_time_spent_all_past_sessions - Mean time spent over the user's past sessions
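A plain-Python sketch of how the four returning-user features above can be computed for one IP's sessions, ordered by session counter. It assumes each session is given as a hypothetical `(date, ts_first, ts_last, time_spent)` tuple with times in unix seconds, looks back over at most `max_history` previous sessions (500 in the notebook), and restricts `time_spent_in_the_day_till_now` to earlier sessions on the same date, which is what the per-IP, per-date window `window_4` is set up for.

```python
def returning_features(sessions, max_history=500):
    """sessions: one IP's sessions as (date, ts_first, ts_last, time_spent),
    ordered by session_counter. Returns one feature dict per session."""
    rows = []
    for i, (date, ts_first, _ts_last, _time_spent) in enumerate(sessions):
        past = sessions[max(0, i - max_history):i]             # rowsBetween(-500, -1)
        same_day = [s for s in sessions[:i] if s[0] == date]   # same IP and date
        rows.append({
            'time_spent_last_session': past[-1][3] if past else None,
            'current_and_last_session_time_difference':
                ts_first - past[-1][2] if past else None,
            'average_time_spent_all_past_sessions':
                sum(s[3] for s in past) / len(past) if past else None,
            # coalesce(..., 0): no earlier session that day means 0
            'time_spent_in_the_day_till_now': sum(s[3] for s in same_day),
        })
    return rows


sessions = [('2015-07-22', 0, 60, 60),
            ('2015-07-22', 1000, 1100, 100),
            ('2015-07-23', 90000, 90030, 30)]
for row in returning_features(sessions):
    print(row)
```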

#### New users:
For new users we have to rely on overall features, for example (limited set, can be expanded further):
1. avg_session_duration_new_users
2. avg_session_duration_new_users_same_hour
3. avg_session_duration_new_users_same_day
4. avg_session_duration_new_users_same_day_hour
5. source_of_the_user
6. country_of_the_IP
7. hour_of_the_day
8. day_of_the_week

Due to time constraints, I have only illustrated training the returning-users model; however, the feature engineering has been done for both.

**Note:** I am limiting myself to the web log only. With more data, several other features could be used (sample list):
1. source - organic versus marketing versus search. An organic user may spend more time, since they came to the site with a specific purpose.
2. lat/long of the user - can be derived from the IP and used to capture location-specific trends

In [21]:
#derive session date and time features (used for both the returning-user and the new-user features)
#note: ts_first/ts_last are converted to unix time with ts_format_minute, so they are at minute precision
data_session_2 = data_session.withColumn('date_last', ps_fun.to_date('ts_last', 'yyyy-MM-dd'))\
                             .withColumn('date_first', ps_fun.to_date('ts_first', 'yyyy-MM-dd'))\
                             .withColumn('date', ps_fun.greatest('date_last', 'date_first'))\
                             .withColumn('datetime', ps_fun.to_timestamp('ts_first', ts_format_second))\
                             .withColumn('hour_of_the_day', ps_fun.hour('datetime'))\
                             .withColumn('day_of_the_week', ps_fun.dayofweek('datetime'))\
                             .withColumn('ts_first', ps_fun.unix_timestamp('ts_first', ts_format_minute))\
                             .withColumn('ts_last', ps_fun.unix_timestamp('ts_last', ts_format_minute))

In [22]:
#derive returning user features
#window_3 computes the IP-level features without restricting to a date. Its frame covers the previous 500 sessions to ignore
#data from very old sessions. 500 is an ad hoc choice and can be improved
window_3 = Window.partitionBy('client_ip').orderBy('session_counter').rowsBetween(-500,-1)
#window_4 partitions by IP and date to compute the past activity on the same day. Here we do not put a lower bound, as all
#activity in the day must be included
window_4 = Window.partitionBy(['client_ip','date']).orderBy('session_counter').rowsBetween(Window.unboundedPreceding,-1)

#actual application of aggregations over different windows
data_returning = data_session_2.withColumn('time_spent_last_session',ps_fun.last('time_spent').over(window_3))\
                             .withColumn('current_and_last_session_time_difference',ps_fun.col('ts_first')-ps_fun.last('ts_last').over(window_3))\
                             .withColumn('average_time_spent_all_past_sessions',ps_fun.mean('time_spent').over(window_3))\
                             .withColumn('time_spent_in_the_day_till_now',ps_fun.coalesce(ps_fun.sum('time_spent').over(window_4),ps_fun.lit(0)))


In [23]:
#derive new user features
#Setting different partitions to compute features at different levels: overall, by hour, by day of the week, and by day of
#the week and hour. Restricted to the previous 5000 sessions, as we want to limit the influence of very old sessions. 5000 is
#ad hoc and can be improved. window_5 has no partitionBy() columns, so Spark moves all rows into a single partition to evaluate it
window_5 = Window.partitionBy().orderBy('ts_last').rowsBetween(-5000,-1)
window_6 = Window.partitionBy('hour_of_the_day').orderBy('ts_last').rowsBetween(-5000,-1)
window_7 = Window.partitionBy('day_of_the_week').orderBy('ts_last').rowsBetween(-5000,-1)
window_8 = Window.partitionBy(['day_of_the_week','hour_of_the_day']).orderBy('ts_last').rowsBetween(-5000,-1)

#computing features
data_time_trends = data_session_2\
        .withColumn('avg_session_duration',ps_fun.mean('time_spent').over(window_5))\
        .withColumn('avg_session_duration_same_hour',ps_fun.mean('time_spent').over(window_6))\
        .withColumn('avg_session_duration_same_day',ps_fun.mean('time_spent').over(window_7))\
        .withColumn('avg_session_duration_same_day_hour',ps_fun.mean('time_spent').over(window_8))\
        .select('client_session',
                'avg_session_duration',
                'avg_session_duration_same_hour',
                'avg_session_duration_same_day',
                'avg_session_duration_same_day_hour'
                #'client_ip',
                #'ts_first',
                #'ts_last'
               )
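Each of these windows computes the mean `time_spent` of up to 5000 preceding sessions in the same partition, excluding the current one (`rowsBetween(-5000, -1)`). A plain-Python sketch of that trailing mean, where the hypothetical `key` function plays the role of `partitionBy` (a constant key corresponds to the empty `partitionBy()` of `window_5`) and sessions are assumed to be already sorted by `ts_last`:

```python
from collections import defaultdict, deque


def trailing_means(sessions, key, k=5000):
    """sessions: (ts_last, time_spent, hour_of_the_day, day_of_the_week)
    tuples sorted by ts_last. For each session, returns the mean time_spent
    over the previous k sessions sharing key(session), or None if there are none."""
    history = defaultdict(lambda: deque(maxlen=k))  # oldest entries drop out
    out = []
    for s in sessions:
        h = history[key(s)]
        out.append(sum(h) / len(h) if h else None)  # current row excluded
        h.append(s[1])
    return out


s = [(1, 10, 9, 4), (2, 20, 10, 4), (3, 30, 9, 4), (4, 40, 9, 4)]
print(trailing_means(s, key=lambda r: None, k=2))   # overall, last 2 sessions
print(trailing_means(s, key=lambda r: r[2]))        # same hour of the day
```

Because `ts_last` is stored at minute precision, sessions ending in the same minute tie on the ordering column and their relative order within the window is not guaranteed, so these features can vary slightly between runs. The sketch simply uses input order.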

In [24]:
#join new user and returning user features
data_all_features = data_returning.join(data_time_trends,on='client_session',how='outer')

In [25]:
#defining features for the models along with the target variable
feature_list_returning=[
'time_spent_last_session',
'current_and_last_session_time_difference',
'average_time_spent_all_past_sessions',
'time_spent_in_the_day_till_now',
'avg_session_duration',
'avg_session_duration_same_hour',
'avg_session_duration_same_day',
'avg_session_duration_same_day_hour',
'hour_of_the_day',
'day_of_the_week'
]
feature_list_new = [
'avg_session_duration',
'avg_session_duration_same_hour',
'avg_session_duration_same_day',
'avg_session_duration_same_day_hour',
'hour_of_the_day',
'day_of_the_week'   
]
target = 'time_spent'

In [26]:
#separating the data into returning and new users
#dropna can be avoided by better imputation of NAs (some are structural, e.g. the earliest sessions have no preceding sessions in the windows)
data_returning = data_all_features.filter('session_counter>0').select(feature_list_returning+[target]).withColumnRenamed(target,'label').dropna()
data_new = data_all_features.filter('session_counter==0').select(feature_list_new+[target]).withColumnRenamed(target,'label').dropna()

In [27]:
#show sample rows
data_returning.show()

+-----------------------+----------------------------------------+------------------------------------+------------------------------+--------------------+------------------------------+-----------------------------+----------------------------------+---------------+---------------+-----+
|time_spent_last_session|current_and_last_session_time_difference|average_time_spent_all_past_sessions|time_spent_in_the_day_till_now|avg_session_duration|avg_session_duration_same_hour|avg_session_duration_same_day|avg_session_duration_same_day_hour|hour_of_the_day|day_of_the_week|label|
+-----------------------+----------------------------------------+------------------------------------+------------------------------+--------------------+------------------------------+-----------------------------+----------------------------------+---------------+---------------+-----+
|                     52|                                    1080|                                52.0|                           

In [28]:
#defining the modelling pipeline
#I faced major runtime issues here, even on a small dataset. I am sure I am missing something in terms of distributing
#the job. I will need to dig in and figure out what engineering can be done to reduce the runtime and make the process efficient
rf = RandomForestRegressor(featuresCol='features')
hasher = FeatureHasher(inputCols=feature_list_returning,
                       outputCol="features")
pipeline = Pipeline(stages=[hasher,rf])

paramGrid = ParamGridBuilder() \
    .addGrid(rf.maxDepth, [1]) \
    .addGrid(rf.numTrees,[50])\
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=2,
                         seed=123)  

In [27]:
cvModel = crossval.fit(dataset=data_returning.cache())

In [35]:
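#the average CV metric. RegressionEvaluator defaults to RMSE, so this is the RMSE of time_spent (in seconds) averaged over the folds.
#I am limiting myself to just training due to time constraints and will not go into validation, improvement,
#analyzing importances, plotting partial dependence, and finally setting up a scoring pipeline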
cvModel.avgMetrics

[245.5129805939468]

### Additional Questions:

1) Predict the expected load (requests/second) in the next minute

#### Quality checks

In [29]:
#create time features
data = data.withColumn('timestamp_minute',ps_fun.to_timestamp(ps_fun.col('timestamp'),ts_format_minute)).\
            withColumn('hour_of_the_day',ps_fun.hour('timestamp_minute')).\
            withColumn('timestamp_second',ps_fun.to_timestamp(ps_fun.col('timestamp'),ts_format_second)).\
            withColumn('second',ps_fun.second('timestamp_second')).\
            withColumn('minute',ps_fun.minute('timestamp_second')).\
            withColumn('min-sec',ps_fun.concat('minute',ps_fun.lit('-'),'second'))

In [30]:
#the table below shows the number of unique minute-second combinations within each hour. An hour has 3,600 such combinations
#and every count below is far lower, so the data is not collected for every minute (or every second) within an hour. Next we
#check whether, when data is collected for a minute, it is collected for all the seconds in that minute
data.groupby('hour_of_the_day').agg(ps_fun.countDistinct('min-sec').alias('count')).sort(ps_fun.desc('count')).show(100)

+---------------+-----+
|hour_of_the_day|count|
+---------------+-----+
|             16|  820|
|             10|  813|
|             11|  337|
|             17|  309|
|              5|  305|
|             18|  304|
|              2|  301|
|             21|  301|
|              6|  298|
|              9|  258|
|             13|   89|
|             19|   60|
|             15|   59|
|              7|    8|
|             12|    7|
+---------------+-----+



In [31]:
#the table below illustrates that even when data is collected for a minute, it is not necessarily collected for all its seconds.
#To make any reliable predictions, the dataset should be systematically generated/sampled. To approximate this for this
#dataset, I am assuming that if all the seconds of a minute are recorded, then all the data within those seconds is
#recorded, and hence the load for that minute is reliable. Thus, for modelling I will take only those minutes that have all
#seconds recorded
data_counts=data.groupby('timestamp_minute').agg(ps_fun.countDistinct('second').alias('count_unique_second'),ps_fun.count('*').alias('requests')).sort(ps_fun.desc('requests'))
data_counts.show(100)

+-------------------+-------------------+--------+
|   timestamp_minute|count_unique_second|requests|
+-------------------+-------------------+--------+
|2015-07-22 16:43:00|                 60|   26457|
|2015-07-22 16:21:00|                 60|   24841|
|2015-07-22 10:33:00|                 60|   24642|
|2015-07-22 10:46:00|                 60|   24321|
|2015-07-22 16:11:00|                 60|   24120|
|2015-07-22 16:42:00|                 60|   24095|
|2015-07-22 09:01:00|                 60|   24076|
|2015-07-22 10:38:00|                 60|   23944|
|2015-07-22 11:01:00|                 60|   23898|
|2015-07-22 09:02:00|                 60|   23411|
|2015-07-22 10:47:00|                 60|   23387|
|2015-07-22 10:36:00|                 60|   23301|
|2015-07-22 09:03:00|                 60|   23012|
|2015-07-22 10:37:00|                 60|   22896|
|2015-07-22 10:48:00|                 60|   22885|
|2015-07-22 11:02:00|                 60|   22847|
|2015-07-22 16:22:00|          

In [33]:
#subsetting the data to minutes with all seconds recorded (reason given above)
data_minute = data_counts.filter('count_unique_second=60')

In [34]:
#I have directly used lag() to compute the requests in the last minute. A shortcoming is that if the data for a minute
#is not present, it will take the requests from the last available minute instead. This is probably a poor feature for the
#dataset at hand (which has gaps), but it can be quite useful for the full dataset
data_minute = data_minute.withColumn('requests_last_minute',ps_fun.lag('requests').over(Window.partitionBy().orderBy('timestamp_minute')))
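The shortcoming noted above (lag() picking up the last available minute across a gap) can be avoided by only using the previous minute when it is exactly 60 seconds earlier. A plain-Python sketch of the per-minute aggregation, the all-60-seconds filter and such a gap-aware lag, where `minute_load` is a hypothetical helper taking request times as integer unix seconds:

```python
from collections import Counter, defaultdict


def minute_load(hit_seconds):
    """hit_seconds: unix timestamps (int seconds) of individual requests.
    Returns {minute_start: (requests, requests_last_minute)} for complete
    minutes; the lag is None unless the previous minute is complete too."""
    requests = Counter()
    seconds_seen = defaultdict(set)
    for ts in hit_seconds:
        minute = ts - ts % 60
        requests[minute] += 1
        seconds_seen[minute].add(ts % 60)
    # Keep only minutes in which all 60 seconds were observed
    complete = {m for m, secs in seconds_seen.items() if len(secs) == 60}
    return {
        m: (requests[m], requests[m - 60] if (m - 60) in complete else None)
        for m in sorted(complete)
    }


hits = (list(range(0, 60)) + [0] * 10      # minute 0: complete, 70 requests
        + list(range(60, 120))             # minute 60: complete, 60 requests
        + list(range(180, 240)) * 2        # minute 180: complete, after a gap
        + list(range(240, 270)))           # minute 240: only 30 seconds seen
print(minute_load(hits))
```

In Spark, the same guard could be expressed by also lagging `timestamp_minute` and keeping the lagged count only when the two minutes differ by 60 seconds (for example, compared via `unix_timestamp`).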

In [35]:
#not doing a lot of feature engineering. A lot of other features can be thought of here
data_minute = data_minute.withColumn('hour_of_the_day', ps_fun.hour('timestamp_minute')).\
                          withColumn('day_of_the_week',ps_fun.dayofweek('timestamp_minute'))  

In [36]:
#for predicting something like workload, we do not need to be accurate to the units/tens place; for example, a
#workload of 200 and a workload of 210 may mean the same thing in terms of infrastructure planning. However, it is essential
#to predict jumps in the workload, for example 200 versus 1000. Hence, it is logical to bucketize the target variable
#(requests per minute). I am choosing 5 equal-width buckets here (histogram(5) splits the range between the minimum and the
#maximum evenly). This, however, should be decided together with the product team to gauge the inflection points
#for infrastructure requirements based on workloads
gre_histogram = data_minute.select('requests').rdd.flatMap(lambda x: x).histogram(5)

In [37]:
#viewing the cutpoints along with their frequency
gre_histogram

([3300.0, 7931.4, 12562.8, 17194.199999999997, 21825.6, 26457],
 [8, 5, 6, 11, 19])

In [38]:
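#converting the target variable into buckets using the above cutpoints
#note: Bucketizer buckets are [x, y) except the last, so with these splits bucket 0 stays empty and minutes at the maximum
#(26457) get a bucket of their own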
bucketizer = Bucketizer(splits=[0]+gre_histogram[0]+[float('Inf')],inputCol="requests", outputCol="label")
data_minute = bucketizer.setHandleInvalid("keep").transform(data_minute)
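`RDD.histogram(5)` returns 6 equal-width edges running exactly from the minimum to the maximum. Since `Bucketizer` buckets are `[x, y)` except the last one, which also includes `y`, prefixing `0` and appending `inf` yields 7 possible labels rather than 5. The sketch below models those semantics in plain Python (the helpers `equal_width_edges` and `bucket` are hypothetical) and compares them with one possible alternative: replacing the outer edges with `-inf`/`inf` so that exactly 5 labels remain.

```python
from bisect import bisect_right


def equal_width_edges(lo, hi, n):
    """n equal-width buckets between lo and hi, as RDD.histogram(n) returns them."""
    width = (hi - lo) / n
    return [lo + i * width for i in range(n)] + [hi]


def bucket(value, splits):
    """Bucketizer-style index: [x, y) buckets, the last one also includes y."""
    if value == splits[-1]:
        return len(splits) - 2
    return bisect_right(splits, value) - 1


edges = equal_width_edges(3300, 26457, 5)

# Splits as used in the notebook: [0] + edges + [inf]
orig = [0] + edges + [float('inf')]
print([bucket(v, orig) for v in (3300, 5586, 14463, 26457)])

# Alternative: drop the outer edges so exactly 5 buckets remain
alt = [float('-inf')] + edges[1:-1] + [float('inf')]
print([bucket(v, alt) for v in (3300, 5586, 14463, 26457)])
```

With the alternative splits the labels run from 0 to 4, so the minimum falls in the lowest bucket and the maximum shares the top bucket with the other high-load minutes.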

In [39]:
#drop the first complete minute, whose requests_last_minute is null (lag() has no preceding row)
data_minute = data_minute.dropna()

In [40]:
data_minute.show()

+-------------------+-------------------+--------+--------------------+---------------+---------------+-----+
|   timestamp_minute|count_unique_second|requests|requests_last_minute|hour_of_the_day|day_of_the_week|label|
+-------------------+-------------------+--------+--------------------+---------------+---------------+-----+
|2015-07-22 02:42:00|                 60|    5586|                6787|              2|              4|  1.0|
|2015-07-22 02:43:00|                 60|    4734|                5586|              2|              4|  1.0|
|2015-07-22 02:44:00|                 60|    4680|                4734|              2|              4|  1.0|
|2015-07-22 05:11:00|                 60|   11670|                4680|              5|              4|  2.0|
|2015-07-22 05:12:00|                 60|   12255|               11670|              5|              4|  2.0|
|2015-07-22 05:13:00|                 60|   14463|               12255|              5|              4|  3.0|
|2015-07-2

In [None]:
#defining the modelling pipeline (the same runtime issues noted for the session-length model apply here)
#maxIter=2 keeps the runtime low, but the model is likely far from converged
lr = LogisticRegression(maxIter=2,family="multinomial")
hasher = FeatureHasher(inputCols=['requests_last_minute','hour_of_the_day','day_of_the_week'],
                       outputCol="features")
pipeline = Pipeline(stages=[hasher,lr])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1])\
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(),
                          numFolds=2,
                         seed=123)  

In [25]:
#training the model with cross validation
crossval_logistic = crossval.fit(dataset=data_minute.cache())

In [26]:
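#the average CV metric. MulticlassClassificationEvaluator defaults to the (weighted) F1 score, so this is F1 averaged over the folds.
#I am limiting myself to just training due to time constraints and will not go into validation, improvement,
#analyzing importances, plotting partial dependence, and finally setting up a scoring pipeline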
crossval_logistic.avgMetrics

[0.2607669082125604]