In [40]:
from pyspark.sql.types import ArrayType, StringType

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

import pandas as pd
import matplotlib

# this requires data to be in libsvm format, good for sparse vectors but our vector has 4 points
#from pyspark.ml.classification import DecisionTreeClassifier

# https://scikit-learn.org/stable/modules/tree.html
from sklearn import tree


# do not truncate arrays
import numpy as np
np.set_printoptions(threshold=np.nan)

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# see creation of decision tree training data: https://github.com/nuria/study/tree/master/poc-bot-classifier
# df = spark.sql(" drop table if exists nuria.classifier_data_features;") \
# df = spark.sql("create table nuria.classifier_data_features as select session_length_secs, number_of_requests, \
# nocookies, label  from nuria.classifier_data_processed")

from sklearn import datasets

#iris = datasets.load_iris()

#print (iris)

In [1]:
# see how balanced are tables in terms of data

df = spark.sql("select count(*), label from nuria.classifier_data_sorted_processed group by label")
df.show(4)

+---------+---------+
| count(1)|    label|
+---------+---------+
|   178373|automated|
|   823669|     null|
|164336104|     user|
+---------+---------+



In [46]:
df = spark.sql("select count(*), label from nuria.classifier_training_data_human_sorted_processed group by label")
df.show(4)

+--------+---------+
|count(1)|    label|
+--------+---------+
|     398|automated|
|    1124|     null|
|   98478|     user|
+--------+---------+



In [2]:
df = spark.sql("select count(*), label from nuria.classifier_training_data_bot_sorted_processed group by label")
df.show(4)

+--------+---------+
|count(1)|    label|
+--------+---------+
|    2085|automated|
|     257|     null|
|    5488|     user|
+--------+---------+



In [65]:
df = spark.sql("select * from nuria.model_training_data order by sessionId")
df.show(10, False)



+--------------------------------+-------------------+------------------+---------------------+---------+-----------------+---------+
|sessionid                       |session_length_secs|number_of_requests|request_ratio_per_min|nocookies|user_agent_length|label    |
+--------------------------------+-------------------+------------------+---------------------+---------+-----------------+---------+
|0000bdb4077f6a7c1172244e623f882e|0                  |1                 |null                 |0.0      |70               |user     |
|00016ebd5652fcbcd169f742b98fb9ec|80627              |26                |0                    |26.0     |1                |automated|
|0001bbaa864fe48798b49af1eee86af3|0                  |1                 |null                 |0.0      |68               |user     |
|0001eb4aad9f994f6c3ccffe9303249a|27862              |17                |0                    |17.0     |11               |automated|
|0003093bbf6b976417c999e57aa38f3f|417                |7       

In [41]:
'''
hive (nuria)> desc model_training_data;
OK
col_name	data_type	comment
session_length_secs 	bigint
number_of_requests  	bigint
request_ratio_per_min	int
nocookies           	double
user_agent_length   	int
label               	string
'''


# move data into  two arrays
# As with other classifiers, DecisionTreeClassifier takes as input 
# two arrays: an array X, sparse or dense, of size [n_samples, n_features] holding the training samples, and an array 
# Y of integer values, size [n_samples], holding the class labels for the training samples:

df = spark.sql("select * from nuria.model_training_data order by sessionId")

def format_input_data(df):
    # vector of labels, they need to be integer values
    # we use 1 for automated, 0 for user
    labels = []
    # array with feature value, a matrix of [samples (rows) * features(feature cardinality)]
    features = []

    for row in df.collect():
        datapoint_label = row["label"]
        if datapoint_label=="automated":
            labels.append(1)
        else:
            labels.append(0)
            
        if row["request_ratio_per_min"] == None:
            request_ratio_per_min = 0
        else:
            request_ratio_per_min = row["request_ratio_per_min"]
            
        datapoint_features = [row["session_length_secs"], row["number_of_requests"],request_ratio_per_min, row["nocookies"], row["user_agent_length"]]
        
        features.append(datapoint_features)
        
    return (labels, features)

#print (labels)

(labels, features) = format_input_data(df)

#print (features) 

clf = tree.DecisionTreeClassifier()
clf = clf.fit(features, labels)


In [53]:
#tree.plot_tree(clf.fit(features, labels))

In [64]:
# get sessions from our testing data and see how are those classified

df_testing_data = spark.sql("select * from nuria.model_testing_data")

(labels_testing_data, features_testing_data) = format_input_data(df_testing_data)

predicted_labels = clf.predict(features_testing_data)

automated = 0
user = 0

for item in predicted_labels:
    if item == 0:
        automated = automated + 1
    elif item == 1:
        user = user + 1 

print ( "{0} of sessions are labeled as automated, {1} are labelled as user ".format(automated, user ))
                            
# print( predicted_labels )

# print(labels_testing_data)
  

2218 of sessions are labeled as automated, 1782 are labelled as user 
