In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,lit, udf
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.util import MLUtils

from user_agent_extractor import parse_device_type, parse_browser_type, parse_browser_version

In [2]:
def parse_device_type_function(agent):
    return parse_device_type(agent)

def parse_browser_type_function(agent):
    return parse_browser_type(agent)

def parse_browser_version_function(agent):
    return parse_browser_version(agent)

In [3]:
spark = SparkSession.builder.master("local[4]").getOrCreate()

In [4]:
parse_device_type_f = udf(parse_device_type_function)
parse_browser_type_f = udf(parse_browser_type_function)
parse_browser_version_f = udf(parse_browser_version_function)

In [5]:
events = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("sample_data/events.csv")
    .withColumn("DEVICE_TYPE", parse_device_type_f(col("USER_AGENT")))
    .withColumn("BROWSER_TYPE", parse_browser_type_f(col("USER_AGENT")))
    .withColumn("BROWSER_VERSION", parse_browser_version_f(col("USER_AGENT"))))

                                                                                

In [6]:
events.show()

[Stage 2:>                                                          (0 + 1) / 1]

+--------------------+-------------+------------+----------------+---------------+------------------+--------------------+-----------+--------------------+---------------+
|          USER_AGENT|HOTEL_CITY_ID|SITE_COUNTRY|HOTEL_COUNTRY_ID|TIME_TO_ARRIVAL|TIME_SPENT_ON_SITE|                  ID|DEVICE_TYPE|        BROWSER_TYPE|BROWSER_VERSION|
+--------------------+-------------+------------+----------------+---------------+------------------+--------------------+-----------+--------------------+---------------+
|Mozilla/5.0 (Linu...|        65018|          IL|              72|             14|               292|6c1030008ddb237e1...|     Mobile|    Samsung Internet|            7.0|
|Mozilla/5.0 (Linu...|        65018|          IL|              72|             14|               292|14ad1535c071e8246...|     Mobile|    Samsung Internet|            7.0|
|Mozilla/5.0 (Linu...|        65018|          IL|              72|             14|               292|8c062fd9c998f9827...|     Mobile|    Sa

                                                                                

In [7]:
conversions = (spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("sample_data/conversions.csv")
    .withColumn("CONVERTED", lit(1.0)))

                                                                                

In [8]:
labeled_events = events.join(conversions, "ID", "left")

labeled_events.printSchema()
labeled_events.show()

root
 |-- ID: string (nullable = true)
 |-- USER_AGENT: string (nullable = true)
 |-- HOTEL_CITY_ID: integer (nullable = true)
 |-- SITE_COUNTRY: string (nullable = true)
 |-- HOTEL_COUNTRY_ID: integer (nullable = true)
 |-- TIME_TO_ARRIVAL: integer (nullable = true)
 |-- TIME_SPENT_ON_SITE: integer (nullable = true)
 |-- DEVICE_TYPE: string (nullable = true)
 |-- BROWSER_TYPE: string (nullable = true)
 |-- BROWSER_VERSION: string (nullable = true)
 |-- CONVERTED: double (nullable = true)





+--------------------+--------------------+-------------+------------+----------------+---------------+------------------+-----------+-------------+---------------+---------+
|                  ID|          USER_AGENT|HOTEL_CITY_ID|SITE_COUNTRY|HOTEL_COUNTRY_ID|TIME_TO_ARRIVAL|TIME_SPENT_ON_SITE|DEVICE_TYPE| BROWSER_TYPE|BROWSER_VERSION|CONVERTED|
+--------------------+--------------------+-------------+------------+----------------+---------------+------------------+-----------+-------------+---------------+---------+
|0030d576a174e5cbd...|Mozilla/5.0 (Linu...|        11635|          IT|               8|           -999|               527|     Mobile|Chrome Mobile|      74.0.3729|     null|
|00d48f83ffd53d695...|Mozilla/5.0 (X11;...|        68550|          GB|             178|           -999|                45|   Computer|       Chrome|      74.0.3729|     null|
|00e6b9e26f77f81dd...|Mozilla/5.0 (iPho...|        40075|          US|               1|              1|               474|   

                                                                                

In [10]:
as_double = (lambda v: float(v) if v is not None else 0.0)

In [11]:
featureData = (labeled_events.rdd.map(lambda r: LabeledPoint(
          as_double(r["CONVERTED"]),
          Vectors.dense(
              as_double(r["HOTEL_CITY_ID"]),
              as_double(r["TIME_TO_ARRIVAL"]),
              as_double(r["TIME_SPENT_ON_SITE"]))))).toDF()

                                                                                

In [12]:
training_and_test_data = MLUtils.convertVectorColumnsToML(featureData).randomSplit([0.7, 0.3])


21/08/11 16:10:52 WARN MLUtils: Vector column conversion has serialization overhead. Please migrate your datasets and workflows to use the spark.ml package.


In [13]:
lr = LogisticRegression(probabilityCol = "probability")


In [14]:
model = lr.fit(training_and_test_data[0])


21/08/11 16:11:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
21/08/11 16:11:39 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
                                                                                

In [15]:
auc = model.summary.areaUnderROC


                                                                                

In [16]:
print('Training AUC: ' + str(auc))


Training AUC: 0.8101708481920293


In [17]:
test = model.transform(training_and_test_data[1])
test.select("features", "label", "probability").show()

+------------------+-----+--------------------+
|          features|label|         probability|
+------------------+-----+--------------------+
|  [0.0,-999.0,0.0]|  0.0|[0.88017451639178...|
|  [0.0,-999.0,0.0]|  0.0|[0.88017451639178...|
|  [0.0,-999.0,4.0]|  0.0|[0.88024394765198...|
|  [0.0,-999.0,4.0]|  1.0|[0.88024394765198...|
|  [0.0,-999.0,9.0]|  0.0|[0.88033068784675...|
| [0.0,-999.0,18.0]|  0.0|[0.88048668339736...|
| [0.0,-999.0,24.0]|  0.0|[0.88059058276873...|
| [0.0,-999.0,40.0]|  1.0|[0.88086726610550...|
| [0.0,-999.0,45.0]|  0.0|[0.88095361588460...|
| [0.0,-999.0,54.0]|  0.0|[0.88110890907983...|
| [0.0,-999.0,59.0]|  0.0|[0.88119510733391...|
| [0.0,-999.0,60.0]|  0.0|[0.88121234049504...|
| [0.0,-999.0,61.0]|  0.0|[0.88122957149336...|
| [0.0,-999.0,63.0]|  0.0|[0.88126402700223...|
| [0.0,-999.0,65.0]|  0.0|[0.88129847386191...|
| [0.0,-999.0,77.0]|  1.0|[0.88150497346468...|
| [0.0,-999.0,77.0]|  1.0|[0.88150497346468...|
| [0.0,-999.0,78.0]|  0.0|[0.88152216772