In [1]:
import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import functions as F
import json
import seaborn as sns
import matplotlib.pyplot as plt
import warnings
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.sql.types import IntegerType

warnings.filterwarnings('ignore')

In [2]:
# quick function to view wide data transposed to long
def preview(df, n=4):
    return pd.DataFrame(df.take(n), columns=df.columns).T

In [3]:
with open('schema.json', 'r') as f:
    schema_json = json.load(f)

columns = [column['name'] for column in schema_json['data']['columns']]
columns = ['bid_amount'] + columns

In [4]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

df = spark.read.csv('auctions_rev_2.csv',
                     sep=',',
                     inferSchema=True,
                     header=True)

In [5]:
old_columns = df.schema.names

for old,new in zip(old_columns,columns):
    df = df.withColumnRenamed(old, new)

#df.printSchema()

In [6]:
# build out a schema
# count missing values per feature, unique values per feature
missing = preview(df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]))
unique = preview(df.agg(*(F.countDistinct(F.col(c)).alias(c) for c in df.columns)))
missing_unique = pd.concat([missing,unique],axis=1)
missing_unique.reset_index(inplace=True)

pyspark_dtype = [column[1] for column in df.dtypes]

nanigans_dtype = [column['metadata']['vw_column_type'] 
 if 'vw_column_type' in column['metadata'].keys() 
 else None 
 for column in schema_json['data']['columns']]

schema = pd.concat([missing_unique, pd.DataFrame(nanigans_dtype), pd.DataFrame(pyspark_dtype)], axis=1)
schema.columns = ['name','missing','unique','nanigans_dtype','pyspark_dtype']

In [7]:
#schema

In [8]:
# fill NA string variables with 'missing', NA int variables with 0
# Checked that this will create distinct new categories

df = df.na.fill('missing').na.fill(0)

In [9]:
# number of rows
df.count()

140249

In [10]:
# Turn add size into height, length, and area.
# Log of area plus an insignificant 1, just to get a non null value for the zero area
split_col = F.split(df['impressionFeatures^adSize'], 'x')
df = df.withColumn('ad_height', split_col.getItem(0).cast(IntegerType()))
df = df.withColumn('ad_width', split_col.getItem(1).cast(IntegerType()))
df = df.withColumn('ad_area', df.ad_height * df.ad_width)
df = df.withColumn('log_ad_area', F.log(df.ad_area + 1))

In [11]:
# Extract viewability score as last 4 characters to get just the number, not the source
df = df.withColumn('view_score', df['extensions^ViewabilityScore'].substr(-4, 4).cast(IntegerType()))

In [12]:
#preview(df)

In [13]:
# Categorical variables with too many categories!
# Create a dictionary for features we want to keep. Stuff outside the list will get 'other'

reduce_categories = {}
reduce_categories['deviceFeatures^deviceInfoBrowser'] = ['firefox','mobile','chrome']
reduce_categories['deviceFeatures^deviceInfoModel'] = ['chrome','iphone','android','null','ipad','firefox','edge','safari','internet explorer','sm']
reduce_categories['deviceFeatures^deviceInfoOs'] = ['Windows','Android','iOS','macOS','Mac_OS_X']
reduce_categories['deviceFeatures^devicePlatform'] = ['desktop','mobile']
reduce_categories['extensions^ViewabilityScore'] = ['desktop','mobile']
reduce_categories['impressionFeatures^adPosition'] = ['ABOVE_THE_FOLD','UNKNOWN','BELOW_THE_FOLD']
reduce_categories['nanigansFeatures^exchange'] = ['INDEXEXCHANGE','APPNEXUS']
reduce_categories['nanigansFeatures^strategyGroupId'] = [1677,73445,1679,1712]
reduce_categories['geoFeatures^geoMetro'] = [0,807,506,623,602,511,504,803,524,825,751,505,517,519,533,618,528,510,534,862,532]

onehot_vars = ['deviceFeatures^deviceInfoBrowser',
             'deviceFeatures^deviceInfoModel',
             'deviceFeatures^deviceInfoOs',
             'deviceFeatures^devicePlatform',
             'impressionFeatures^adPosition',
             'nanigansFeatures^exchange',
             'nanigansFeatures^strategyGroupId',
             'geoFeatures^geoMetro',
             'pubFeatures^rqstSiteOrAppPublisherId',
             'timingFeatures^utcHourDayIndex',
             'timingFeatures^utcHourIndex',]

numerical_vars = ['bid_amount',
                  'clientFeatures^numericProductViewCountInLast01d',
                  'clientFeatures^numericProductViewCountInLast07d',
                  'clientFeatures^numericProductViewCountInLast28d',
                  'clientFeatures^numericProductViewCountInLast45d',
                  'ad_height',
                  'ad_width',
                  'ad_area',
                  'log_ad_area',
                  'view_score']

keep_vars = ['deviceFeatures^deviceInfoBrowser',
             'deviceFeatures^deviceInfoModel',
             'deviceFeatures^deviceInfoOs',
             'deviceFeatures^devicePlatform',
             'extensions^ViewabilityScore',
             'impressionFeatures^adPosition',
             'nanigansFeatures^exchange',
             'nanigansFeatures^strategyGroupId',
             'geoFeatures^geoMetro',
             'pubFeatures^rqstSiteOrAppPublisherId',
             'timingFeatures^utcHourDayIndex',
             'timingFeatures^utcHourIndex',
             'ad_height',
             'ad_width',
             'ad_area',
             'view_score']

reduce_vars = ['deviceFeatures^deviceInfoBrowser',
             'deviceFeatures^deviceInfoModel',
             'deviceFeatures^deviceInfoOs',
             'deviceFeatures^devicePlatform',
             'extensions^ViewabilityScore',
             'impressionFeatures^adPosition',
             'nanigansFeatures^exchange',
             'nanigansFeatures^strategyGroupId',
             'geoFeatures^geoMetro',]


In [14]:
# replace rare categories with 'other'
for var in reduce_vars:
    df = df.withColumn(var, F.when(df[var].isin(reduce_categories[var]),df[var]).otherwise('other'))
    
for var in onehot_vars:
    df = df.withColumn(var, df[var].cast('string'))

In [15]:
#preview(df)

In [16]:
# int_vars = schema[(schema.nanigans_dtype=="CATEGORICAL") & (schema.pyspark_dtype == 'int')]
# str_vars = schema[(schema.nanigans_dtype=="CATEGORICAL") & (schema.pyspark_dtype == 'string')]

In [17]:
# list(to_string.name)

In [18]:
#[col(c).cast("string") for c in to_string]

# df2 = df.select([F.col(c).cast(F.StringType()).alias(c) for c in list(to_string.name)])

In [19]:
df = df.withColumn('dt',F.to_timestamp(df.event_date, 'yyyy-MM-dd HH:mm:ss'))

In [20]:
indexers = [
    StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c))
    for c in onehot_vars
]

encoder = OneHotEncoderEstimator(
    inputCols=[indexer.getOutputCol() for indexer in indexers],
    outputCols=[
        "{0}_encoded".format(indexer.getOutputCol()) for indexer in indexers]
)

assembler = VectorAssembler(
    inputCols=encoder.getOutputCols() + numerical_vars,
    outputCol="features"
)

pipeline = Pipeline(stages=indexers + [encoder, assembler])

df = pipeline.fit(df).transform(df)

In [21]:
df.select('features').take(2)

[Row(features=SparseVector(3032, {0: 1.0, 3: 1.0, 10: 1.0, 16: 1.0, 19: 1.0, 23: 1.0, 25: 1.0, 46: 1.0, 3010: 1.0, 3022: 302.0, 3024: 1.0, 3025: 17.0, 3026: 84.0, 3027: 300.0, 3028: 250.0, 3029: 75000.0, 3030: 11.2253, 3031: 10.0})),
 Row(features=SparseVector(3032, {0: 1.0, 4: 1.0, 11: 1.0, 15: 1.0, 16: 1.0, 19: 1.0, 23: 1.0, 25: 1.0, 86: 1.0, 2994: 1.0, 3006: 1.0, 3022: 300.0, 3024: 3.0, 3025: 20.0, 3026: 42.0, 3027: 300.0, 3028: 250.0, 3029: 75000.0, 3030: 11.2253, 3031: 10.0}))]

In [22]:
df.groupBy('dt').count().show()

+-------------------+-----+
|                 dt|count|
+-------------------+-----+
|2019-05-29 00:00:00|11223|
|2019-05-27 00:00:00| 6972|
|2019-05-31 00:00:00| 9624|
|2019-05-30 00:00:00|11741|
|2019-05-24 00:00:00| 9099|
|2019-05-28 00:00:00|10160|
|2019-06-04 00:00:00|10891|
|2019-05-23 00:00:00|10463|
|2019-06-01 00:00:00| 7314|
|2019-05-26 00:00:00| 6359|
|2019-06-03 00:00:00|11630|
|2019-05-22 00:00:00| 9403|
|2019-05-25 00:00:00| 6800|
|2019-06-02 00:00:00| 7968|
|2019-06-05 00:00:00|10602|
+-------------------+-----+



In [23]:
train = df.where(df.dt < '2019-05-29 00:00:00')
test = df.where(df.dt >= '2019-05-29 00:00:00')

In [24]:
train.groupBy('dt').count().show()

+-------------------+-----+
|                 dt|count|
+-------------------+-----+
|2019-05-27 00:00:00| 6972|
|2019-05-24 00:00:00| 9099|
|2019-05-28 00:00:00|10160|
|2019-05-23 00:00:00|10463|
|2019-05-26 00:00:00| 6359|
|2019-05-22 00:00:00| 9403|
|2019-05-25 00:00:00| 6800|
+-------------------+-----+



In [25]:
train_positives = train.where(train.won_bid == 1)
train_negatives = train.where(train.won_bid == 0)

In [26]:
train_positives_os = train_positives.sample(withReplacement = True, fraction = 15.0, seed = 42)

In [27]:
train_os = train_negatives.union(train_positives_os)

In [28]:
#train.groupBy('geoFeatures^geoMetro').count().sort(F.desc("count")).show(200)

In [29]:
#train.groupBy('deviceFeatures^deviceInfoModel').count().show()

In [30]:
#train.groupBy('deviceFeatures^deviceInfoBrowser').count().show()

In [31]:
# Quick look at the funnel
df.groupBy('won_bid','has_click').count().show()

+-------+---------+------+
|won_bid|has_click| count|
+-------+---------+------+
|    1.0|      1.0|    31|
|    1.0|      0.0| 13616|
|    0.0|      0.0|126602|
+-------+---------+------+



In [32]:
train.groupBy('bid_amount').count().sort(F.desc("count")).show(200)

+----------+-----+
|bid_amount|count|
+----------+-----+
|         3| 5461|
|       298| 4440|
|        79| 4034|
|        36| 3685|
|        59| 3554|
|         6| 3516|
|       224| 2946|
|       223| 2080|
|       137| 1977|
|       299| 1977|
|       138| 1850|
|       302| 1201|
|       140|  996|
|       237|  912|
|       226|  850|
|        81|  843|
|        61|  786|
|       895|  670|
|       139|  600|
|       301|  578|
|        37|  531|
|       896|  508|
|       300|  503|
|       225|  491|
|       599|  426|
|       897|  399|
|       227|  394|
|       276|  343|
|       598|  302|
|       603|  282|
|       597|  259|
|       906|  253|
|       228|  249|
|       275|  248|
|       604|  235|
|       449|  204|
|       179|  202|
|      1343|  195|
|       900|  184|
|        62|  183|
|       144|  177|
|        38|  172|
|       596|  171|
|       356|  165|
|       304|  163|
|        83|  161|
|       609|  154|
|       898|  150|
|      1796|  142|
|       277|

In [33]:
# SQL way to do the same thing
df.registerTempTable('data')

win_clicks = spark.sql(r"""SELECT won_bid, has_click, COUNT(*)
                            FROM data
                            GROUP BY won_bid, has_click""")
win_clicks.show()

+-------+---------+--------+
|won_bid|has_click|count(1)|
+-------+---------+--------+
|    1.0|      1.0|      31|
|    1.0|      0.0|   13616|
|    0.0|      0.0|  126602|
+-------+---------+--------+



In [34]:
# from pandas.plotting import scatter_matrix

# # sns.set_palette('dark')
# # sns.set_context('notebook')
# # sns.set_style('white')

# # get the names of columns that are integers or doubles
# numeric_features = [t[0] for t in df.dtypes if t[1] in ['int', 'double']]

# # sample 10% of this data and convert to a Pandas dataframe
# sampled_data = df.select(numeric_features).sample(False, 0.1).toPandas()

# # make the scatter plot
# axs = pd.plotting.scatter_matrix(sampled_data, figsize=(12, 12));

# # Rotate axis labels and remove axis ticks

# for i in range(len(sampled_data.columns)):
#     v = axs[i, 0]
#     v.yaxis.label.set_rotation(0)
#     v.yaxis.label.set_ha('right')
#     #v.set_yticks(())
#     h = axs[n-1, i]
#     h.xaxis.label.set_rotation(90)
#     #h.set_xticks(())

In [35]:
# categorical_columns = [column['name'] for column in schema['data']['columns']
#  if 'vw_column_type' in column['metadata'].keys() 
#  and column['metadata']['vw_column_type'] == 'CATEGORICAL']

# numerical_cat_columns = [column['name'] for column in schema['data']['columns']
#  if 'vw_column_type' in column['metadata'].keys() 
#  and column['metadata']['vw_column_type'] == 'CATEGORICAL'
#  and column['pyspark_dtype'] != 'string']

# string_cat_columns = [column['name'] for column in schema['data']['columns']
#  if 'vw_column_type' in column['metadata'].keys() 
#  and column['metadata']['vw_column_type'] == 'CATEGORICAL' 
#  and column['pyspark_dtype'] == 'string']

# string_indexes = [name + '_index' for name in string_columns]
# categorical_vectors = [name + '_vec' for name in categorical_columns]

In [36]:
#train.printSchema()

In [37]:
# scaler = StandardScaler(inputCol=numerical_vars, outputCol="scaledNumericalVars",
#                         withStd=True, withMean=True)

# # Compute summary statistics by fitting the StandardScaler
# scalerModel = scaler.fit(train)

# # Normalize each feature to have unit standard deviation.
# train_scaled = scalerModel.transform(train)
# preview(train_scaled)

In [38]:
from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol="features", outputCol="features_scaled",
                        withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(train)

# Normalize each feature to have unit standard deviation.
train_scaled = scalerModel.transform(train)
test_scaled = scalerModel.transform(test)

In [39]:
scaler_os = StandardScaler(inputCol="features", outputCol="features_scaled",
                        withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel_os = scaler_os.fit(train_os)

# Normalize each feature to have unit standard deviation.
train_os_scaled = scalerModel_os.transform(train_os)
test_os_scaled = scalerModel_os.transform(test)

In [40]:
#preview(train)

In [63]:
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, \
                                      GBTClassifier, LogisticRegression

In [64]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [44]:
areaROC = BinaryClassificationEvaluator(labelCol='won_bid',
                                        rawPredictionCol='prediction',
                                        metricName='areaUnderROC')

areaPR = BinaryClassificationEvaluator(labelCol='won_bid',
                                       rawPredictionCol='prediction',
                                       metricName='areaUnderPR')

In [68]:
#features_scaled[0]

In [69]:
# logistic regression
lr = LogisticRegression(labelCol='won_bid', 
                        featuresCol='features_scaled',
                        predictionCol='prediction')

lr_model = lr.fit(train_scaled)
lr_pred = lr_model.transform(test_scaled)

In [70]:
lr_os = LogisticRegression(labelCol='won_bid', 
                        featuresCol='features_scaled',
                        predictionCol='prediction')

lr_os_model = lr_os.fit(train_os_scaled)
lr_os_pred = lr_os_model.transform(test_os_scaled)

In [81]:
areaPR.evaluate(lr_pred)

0.22534393994960447

In [82]:
areaPR.evaluate(lr_os_pred)

0.1736083025863839

In [83]:
areaROC.evaluate(lr_pred)

0.5135583557098724

In [84]:
areaROC.evaluate(lr_os_pred)

0.6651486703165833

In [75]:
# logit with elastic net reg
lren = LogisticRegression(labelCol='won_bid', 
                        featuresCol='features_scaled',
                        predictionCol='prediction',
                        regParam=.01,
                        elasticNetParam=0.5)

lren_model = lren.fit(train_scaled)
lren_pred = lren_model.transform(test_scaled)


In [76]:
# logit with elastic net reg
lren_os = LogisticRegression(labelCol='won_bid', 
                        featuresCol='features_scaled',
                        predictionCol='prediction',
                        regParam=.01,
                        elasticNetParam=0.5)

lren_os_model = lren_os.fit(train_os_scaled)
lren_os_pred = lren_os_model.transform(test_os_scaled)


In [85]:
areaPR.evaluate(lren_pred)

0.268957021318683

In [86]:
areaPR.evaluate(lren_os_pred)

0.16321433245718822

In [87]:
areaROC.evaluate(lren_pred)

0.5065922029150671

In [88]:
areaROC.evaluate(lren_os_pred)

0.6550616193792536

In [93]:
lren_pred.select('probability')

DataFrame[probability: vector]

In [45]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

lr = LogisticRegression(labelCol='won_bid',
                            featuresCol='features',
                            predictionCol='prediction')
paramGrid = ParamGridBuilder()\
    .addGrid(lr.regParam, [0.05, 0.01, 0.005])\
    .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
    .build()

valid = TrainValidationSplit(estimator=lr,
                             estimatorParamMaps=paramGrid,
                             evaluator=areaPR,
                             trainRatio=0.8)
model = valid.fit(train)
result = model.bestModel.transform(test)

In [46]:
areaPR.evaluate(result)

0.26318906453202384

In [57]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# specify layers for the neural network:
# input layer of size 4 (features), two intermediate of size 5 and 4
# and output of size 3 (classes)
layers = [3032, 20, 2]

# create the trainer and set its parameters
mlp = MultilayerPerceptronClassifier(maxIter=20, layers=layers, featuresCol='features_scaled', labelCol='won_bid', blockSize=128, seed=42)

# train the model
mlp_model = mlp.fit(train)

Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:49868)

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1152, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/anaconda3/lib/python3.6/socket.py", line 586, in readinto
    return self._sock.recv_into(b)
ConnectionResetError: [Errno 54] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 985, in send_command
    response = connection.send_command(command)
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1164, in send_command
    "Error while receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while receiving
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anacon

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most 

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused
ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most 

In [None]:
# compute accuracy on the test set
mlp_result = mlp_model.transform(test)
predictionAndLabels = mlp_result.select("prediction", "won_bid")
evaluator = MulticlassClassificationEvaluator(metricName="accuracy")
print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

In [54]:
areaPR.evaluate(predictionAndLabels)

NameError: name 'predictionAndLabels' is not defined

In [56]:
predictionAndLabels.collect()

NameError: name 'predictionAndLabels' is not defined

In [101]:
areaPR.evaluate(result)

0.0695017194920427

In [76]:
# Random Forest (also of depth = 2)
rf = RandomForestClassifier(maxDepth=10, 
                            labelCol='won_bid',
                            featuresCol='features',
                            predictionCol='prediction')

rf_model = rf.fit(train)
rf_pred = rf_model.transform(test)

In [77]:
areaPR.evaluate(rf_pred)

0.07790333520947887

In [68]:
# Gradient Boosted Tree (depth = 2)
gbt = GBTClassifier(maxDepth=10, 
                    labelCol='won_bid', 
                    featuresCol='features',
                    predictionCol='prediction')

gbt_model = gbt.fit(train)
gbt_pred = gbt_model.transform(test)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:49868)

In [79]:
areaPR.evaluate(gbt_pred)

0.1837888636794876

In [75]:
gbt_pred

DataFrame[price_paid: double, won_bid: double, has_click: double, bid_request_id: string, clientFeatures^nanAppId: int, clientFeatures^numericProductViewCountInLast01d: double, clientFeatures^numericProductViewCountInLast07d: double, clientFeatures^numericProductViewCountInLast28d: double, clientFeatures^numericProductViewCountInLast45d: double, deviceFeatures^deviceInfoBrowser: string, deviceFeatures^deviceInfoModel: string, deviceFeatures^deviceInfoOs: string, deviceFeatures^deviceMake: string, deviceFeatures^deviceModel: string, deviceFeatures^deviceOs: string, deviceFeatures^devicePlatform: string, extensions^ViewabilityScore: string, geoFeatures^geoCountry: string, geoFeatures^geoMetro: string, geoFeatures^geoRegion: string, impressionFeatures^adPosition: string, impressionFeatures^adSize: string, impressionFeatures^adSizeAndPosition: string, impressionFeatures^skuCategory: string, nanigansFeatures^exchange: string, nanigansFeatures^strategyGroupId: string, pubFeatures^reqstappCat

In [1095]:
# Random Forest (also of depth = 2)
rf2 = RandomForestClassifier(maxDepth=10, 
                            labelCol='won_bid',
                            featuresCol='features',
                            predictionCol='prediction')

rf2_model = rf2.fit(train)
rf2_pred = rf2_model.transform(test)
areaPR.evaluate(rf2_pred)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:65145)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:65145)

In [67]:
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator

gbt = GBTClassifier(labelCol='won_bid', featuresCol='features_scaled')

paramGrid = ParamGridBuilder() \
    .addGrid(gbt.maxDepth, [10,20,None]) \
    .build()

# crossval = CrossValidator(estimator=gbt,
#                           estimatorParamMaps=paramGrid,
#                           evaluator=areaROC,
#                           numFolds=3)

trainval = TrainValidationSplit(estimator=gbt,
                          estimatorParamMaps=paramGrid,
                          evaluator=areaROC,
                          trainratio=0.8)

gbt_model = gbt.fit(train)
gbt_pred = gbt_model.transform(test)
areaPR.evaluate(gbt_pred)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:49868)
Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 61] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:49868)

# Stuff below here is for applying standard scalar on only certain rows!

In [1005]:
train2.show()

+---------+----------+
|ad_height|ad_height2|
+---------+----------+
|        0|     [0.0]|
|        0|     [0.0]|
|      300|   [300.0]|
|      320|   [320.0]|
|      300|   [300.0]|
|      300|   [300.0]|
|      300|   [300.0]|
|      300|   [300.0]|
|        0|     [0.0]|
|        0|     [0.0]|
|      300|   [300.0]|
|      300|   [300.0]|
|      300|   [300.0]|
|        0|     [0.0]|
|      300|   [300.0]|
|      300|   [300.0]|
|        0|     [0.0]|
|        0|     [0.0]|
|      300|   [300.0]|
|      300|   [300.0]|
+---------+----------+
only showing top 20 rows



In [1004]:
train2 = train.select(
   list_to_vector_udf(df["ad_height"]).alias("ad_height2")
)

scaler = StandardScaler(inputCol="ad_height2", outputCol="scaledad_height",
                        withStd=True, withMean=True)

# Compute summary statistics by fitting the StandardScaler
scalerModel = scaler.fit(train2)

# Normalize each feature to have unit standard deviation.
train_scaled = scalerModel.transform(train2)
train_scaled.select("scaledad_height").show(10)

+--------------------+
|     scaledad_height|
+--------------------+
|[-1.2477085354608...|
|[-1.2477085354608...|
|[0.40242251470944...|
|[0.5124312513874582]|
|[0.40242251470944...|
|[0.40242251470944...|
|[0.40242251470944...|
|[0.40242251470944...|
|[-1.2477085354608...|
|[-1.2477085354608...|
+--------------------+
only showing top 10 rows



In [989]:
train.withColumn("ad_height", train.ad_height.cast("tinyint"))

DataFrame[price_paid: double, won_bid: double, has_click: double, bid_request_id: string, clientFeatures^nanAppId: int, clientFeatures^numericProductViewCountInLast01d: double, clientFeatures^numericProductViewCountInLast07d: double, clientFeatures^numericProductViewCountInLast28d: double, clientFeatures^numericProductViewCountInLast45d: double, deviceFeatures^deviceInfoBrowser: string, deviceFeatures^deviceInfoModel: string, deviceFeatures^deviceInfoOs: string, deviceFeatures^deviceMake: string, deviceFeatures^deviceModel: string, deviceFeatures^deviceOs: string, deviceFeatures^devicePlatform: string, extensions^ViewabilityScore: string, geoFeatures^geoCountry: string, geoFeatures^geoMetro: string, geoFeatures^geoRegion: string, impressionFeatures^adPosition: string, impressionFeatures^adSize: string, impressionFeatures^adSizeAndPosition: string, impressionFeatures^skuCategory: string, nanigansFeatures^exchange: string, nanigansFeatures^strategyGroupId: string, pubFeatures^reqstappCat

In [978]:
type(train.select("ad_height", "ad_width"))

pyspark.sql.dataframe.DataFrame

In [974]:
train["clientFeatures^numericProductViewCountInLast28d"]

pyspark.sql.column.Column

In [994]:
from pyspark.ml.linalg import Vectors

xx =spark.createDataFrame([(Vectors.dense([0.0]),), (Vectors.dense([2.0]),)], ["a"])

In [1003]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())


In [1002]:
train.printSchema()

root
 |-- price_paid: double (nullable = false)
 |-- won_bid: double (nullable = false)
 |-- has_click: double (nullable = false)
 |-- bid_request_id: string (nullable = false)
 |-- clientFeatures^nanAppId: integer (nullable = true)
 |-- clientFeatures^numericProductViewCountInLast01d: double (nullable = false)
 |-- clientFeatures^numericProductViewCountInLast07d: double (nullable = false)
 |-- clientFeatures^numericProductViewCountInLast28d: double (nullable = false)
 |-- clientFeatures^numericProductViewCountInLast45d: double (nullable = false)
 |-- deviceFeatures^deviceInfoBrowser: string (nullable = false)
 |-- deviceFeatures^deviceInfoModel: string (nullable = false)
 |-- deviceFeatures^deviceInfoOs: string (nullable = false)
 |-- deviceFeatures^deviceMake: string (nullable = false)
 |-- deviceFeatures^deviceModel: string (nullable = false)
 |-- deviceFeatures^deviceOs: string (nullable = false)
 |-- deviceFeatures^devicePlatform: string (nullable = false)
 |-- extensions^Viewabil

In [1001]:
xx.show()

+-----+
|    a|
+-----+
|[0.0]|
|[2.0]|
+-----+

