In [1]:
# Read the flattened training CSV: comma-delimited, first row is the header,
# and column types are inferred by Spark.
csv_reader = (
    sqlContext.read.format('com.databricks.spark.csv')
    .option('delimiter', ',')
    .option('header', 'true')
    .option('inferschema', 'true')
)
DF = csv_reader.load("dbfs:/mnt/s3/data/train_v2_flatten.csv")
#display(DF)

In [2]:
import pyspark.sql.functions as F
import pyspark.sql.types as T
def deleteNull_castToFloat(x):
  """Convert a micro-unit amount string into a float amount.

  The last six characters of ``x`` are treated as the fractional part
  (millionths) and everything before them as the whole part, e.g.
  ``'12345678'`` becomes ``12.345678``.

  Args:
    x: digit string, or one of the placeholders ``None``, ``'(not set)'``, ``'No'``.

  Returns:
    float: ``0.0`` for any placeholder, otherwise the parsed amount.

  Raises:
    ValueError: if ``x`` is a non-placeholder string that is not an integer literal.
  """
  # Every placeholder must yield a Python float: this function is registered as
  # a FloatType UDF, and the original returned the int 0 for '(not set)',
  # which does not match the declared return type.
  if x is None or x in ('(not set)', 'No'):
    return 0.0

  length = len(x)
  if length > 6:
    whole = int(x[:length - 6])
    micros = int(x[length - 6:])
  else:
    # Six characters or fewer: the whole string is the fractional part.
    whole = 0
    micros = int(x)
  return float(whole) + float(micros) / 1000000
# Spark UDF wrapper around deleteNull_castToFloat, declared to return FloatType.
udf_deleteNull_castToFloat = F.udf(deleteNull_castToFloat, T.FloatType())

def deleteNull_castToInt(x):
  """Parse ``x`` as an int, mapping the placeholder values to 0.

  ``None``, ``'(not set)'`` and ``'No'`` all yield 0; any other value is
  handed to ``int()`` unchanged.
  """
  placeholders = (None, '(not set)', 'No')
  return 0 if x in placeholders else int(x)
# Spark UDF wrapper around deleteNull_castToInt, declared to return IntegerType.
udf_deleteNull_castToInt = F.udf(deleteNull_castToInt, T.IntegerType())

def deleteNull_castToInt_visitNumber(x):
  """Parse a visit number as an int, returning 0 for unusable values.

  Unusable values are the placeholders ``None``, ``'(not set)'`` and ``'No'``,
  plus any string longer than 10 characters.
  """
  for placeholder in (None, '(not set)', 'No'):
    if x == placeholder:
      return 0
  # Strings longer than 10 characters are discarded rather than parsed.
  if len(x) > 10:
    return 0
  return int(x)
# Spark UDF wrapper around deleteNull_castToInt_visitNumber, declared to return IntegerType.
udf_deleteNull_castToInt_visitNumber = F.udf(deleteNull_castToInt_visitNumber, T.IntegerType())

In [3]:
from pyspark.sql.functions import col
# Project the raw frame down to the columns used for feature engineering,
# cleaning the numeric fields through the UDFs and casting the two time
# fields to strings.
selected_columns = [
    udf_deleteNull_castToFloat("totals_transactionRevenue").alias("revenue"),
    udf_deleteNull_castToFloat("totals_totalTransactionRevenue").alias("total_revenue"),
    "device_operatingSystem",
    "device_browser",
    "geoNetwork_country",
    "channelGrouping",
    col('visitStartTime').cast("string").alias("startTime_cast"),
    col('date').cast("string").alias("date_cast"),
    "fullVisitorId",
    udf_deleteNull_castToInt("totals_hits").alias("hits"),
    udf_deleteNull_castToInt("totals_pageviews").alias("pageview"),
    udf_deleteNull_castToInt_visitNumber("visitNumber").alias("visitNumber"),
    "geoNetwork_networkDomain",
    "geoNetwork_region",
]
DF_select = DF.select(*selected_columns)

In [4]:
from pyspark.sql.functions import unix_timestamp, to_date
from pyspark.sql.functions import from_unixtime
from pyspark.sql.functions import month, dayofweek, hour, year, weekofyear, dayofyear

# Parse the yyyyMMdd string into a date, then derive week-of-year and
# day-of-year from it.
DF_select = (
    DF_select
    .withColumn('date_cast', to_date('date_cast', 'yyyyMMdd'))
    .withColumn("yearweek", weekofyear("date_cast"))
    .withColumn("yearday", dayofyear("date_cast"))
)
# Currently disabled derivations, kept for reference:
#DF_select = DF_select.withColumn("month", month("date_cast"))
#DF_select = DF_select.withColumn("weekday", dayofweek("date_cast"))
#DF_select = DF_select.withColumn("year", year("date_cast"))
#DF_select = DF_select.withColumn("startTime_timestamp", from_unixtime("startTime_cast"))
#DF_select = DF_select.withColumn("hour", hour("startTime_timestamp"))
#DF_select = DF_select.drop('startTime_timestamp', "startTime_cast", 'date_cast')
DF_select.cache()

In [5]:
# Start the feature frame from the cleaned selection; later cells rebind DF_fea
# as they add columns, while DF_select keeps the original selection.
DF_fea = DF_select

In [6]:
#Generating features grouped by geoNetwork_networkDomain


# pageview and hits statistics per network domain.
# All six aggregates are computed in a single groupBy and attached with a single
# join, instead of one groupBy + join round trip per aggregate. The resulting
# column names and their order are unchanged.
network_domain_stats = DF_fea.groupby('geoNetwork_networkDomain').agg(
    # pageview
    F.sum('pageview').alias('sum_pageviews_per_network_domain'),
    F.avg('pageview').alias('avg_pageviews_per_network_domain'),
    F.count('pageview').alias('count_pageviews_per_network_domain'),
    # hits
    F.sum('hits').alias('sum_hits_per_network_domain'),
    F.avg('hits').alias('avg_hits_per_network_domain'),
    F.count('hits').alias('count_hits_per_network_domain'),
)

DF_fea = DF_fea.join(network_domain_stats, "geoNetwork_networkDomain")

# DF_fea = DF_fea.join(DF_fea.groupby('geoNetwork_networkDomain').sum('revenue').withColumnRenamed('sum(revenue)', 'sum_revenue_per_day'), "geoNetwork_networkDomain")

# DF_fea = DF_fea.join(DF_fea.groupby('geoNetwork_networkDomain').avg('revenue').withColumnRenamed('avg(revenue)', 'avg_revenue_per_day'), "geoNetwork_networkDomain")

In [7]:
#Generating features grouped by dayofyear & weekofyear

# Total and mean hits per day-of-year, computed in one aggregation and attached
# with one join (previously one aggregation + join per statistic).
# The grouping key is renamed before joining so the join condition is
# unambiguous, and the renamed key is dropped afterwards.
DF_yearday = (
    DF_fea.groupby('yearday')
    .agg(F.sum('hits').alias('sum_hits_per_day'),
         F.avg('hits').alias('avg_hits_per_day'))
    .withColumnRenamed("yearday", 'yearday2')
)

DF_fea = DF_fea.join(DF_yearday, DF_yearday.yearday2 == DF_fea.yearday).drop('yearday2')

# Same statistics per week-of-year.
DF_yearweek = (
    DF_fea.groupby('yearweek')
    .agg(F.sum('hits').alias('sum_hits_per_week'),
         F.avg('hits').alias('avg_hits_per_week'))
    .withColumnRenamed("yearweek", 'yearweek2')
)

DF_fea = DF_fea.join(DF_yearweek, DF_yearweek.yearweek2 == DF_fea.yearweek).drop('yearweek2')

# DF_fea = DF_fea.join(DF_fea.groupby('dayofyear').sum('revenue').withColumnRenamed('sum(revenue)', 'sum_revenue_per_day'), "dayofyear")

# DF_fea = DF_fea.join(DF_fea.groupby('dayofyear').avg('revenue').withColumnRenamed('avg(revenue)', 'avg_revenue_per_day'), "dayofyear")

In [8]:
#Generating features grouped by geoNetwork_region

# Mean pageviews and mean hits per region, computed in one aggregation and
# attached with one join instead of two separate groupBy + join passes.
# The sum/count variants per region (sum_pageviews_per_region,
# count_pageviews_per_region, sum_hits_per_region, count_hits_per_region)
# are intentionally not generated.
region_stats = DF_fea.groupby('geoNetwork_region').agg(
    F.avg('pageview').alias('avg_pageviews_per_region'),
    F.avg('hits').alias('avg_hits_per_region'),
)

DF_fea = DF_fea.join(region_stats, "geoNetwork_region")

In [9]:
#Generating features grouped by geoNetwork_country

# Sum, mean and count of pageviews per country, computed in one aggregation
# and attached with one join (previously three groupBy + join passes).
country_pageview_stats = DF_fea.groupby('geoNetwork_country').agg(
    F.sum('pageview').alias('sum_pageviews_per_country'),
    F.avg('pageview').alias('avg_pageviews_per_country'),
    F.count('pageview').alias('count_pageviews_per_country'),
)

DF_fea = DF_fea.join(country_pageview_stats, "geoNetwork_country")


In [10]:
# Sum, mean and count of hits per country, computed in one aggregation and
# attached with one join (previously three groupBy + join passes).
country_hits_stats = DF_fea.groupby('geoNetwork_country').agg(
    F.sum('hits').alias('sum_hits_per_country'),
    F.avg('hits').alias('avg_hits_per_country'),
    F.count('hits').alias('count_hits_per_country'),
)

DF_fea = DF_fea.join(country_hits_stats, "geoNetwork_country")

In [11]:
#Generating features grouped by fullVisitorId

# Total pageviews and total hits per visitor, computed in one aggregation and
# attached with one join instead of two separate groupBy + join passes.
# The avg/count variants per visitor are intentionally not generated.
visitor_stats = DF_fea.groupby('fullVisitorId').agg(
    F.sum('pageview').alias('sum_pageviews_per_fullVisitorId'),
    F.sum('hits').alias('sum_hits_per_fullVisitorId'),
)

DF_fea = DF_fea.join(visitor_stats, "fullVisitorId")



In [12]:
# Expose the current feature frame to SQL under the name "DF".
sqlContext.registerDataFrameAsTable(DF_fea, "DF")

# Ratio of each visitor's totals to the mean of their region.
# These are row-wise expressions over columns already on DF_fea, so they are
# added with withColumn. Joining DF_fea to a per-row projection of itself on
# fullVisitorId (a key that repeats across a visitor's sessions) would emit one
# output row per pair of matching rows and multiply the data.
DF_fea = (
    DF_fea
    .withColumn('user_pageviews_to_region',
                F.col('sum_pageviews_per_fullVisitorId') / F.col('avg_pageviews_per_region'))
    .withColumn('user_hits_to_region',
                F.col('sum_hits_per_fullVisitorId') / F.col('avg_hits_per_region'))
)

In [13]:
# Ad-hoc query against the registered "DF" table: visitNumber divided by
# pageview for each row. The resulting DataFrame is not assigned to a variable.
sqlContext.sql("select fullVisitorId, visitNumber/pageview as Test from DF")

In [14]:
%sql

-- Attempts to append visitNumber/pageview (aliased Test) to the table "DF".
-- NOTE(review): "DF" is registered from a DataFrame via registerDataFrameAsTable;
-- confirm that INSERT INTO is supported for such a table. The SELECT also yields
-- a single column, which presumably does not match DF's column list - verify.
insert into DF select visitNumber/pageview as Test from DF

In [15]:
# Mean visitNumber per visitor; the aggregate column is named "avg(visitNumber)".
DF_select_group = DF_select.groupby('fullVisitorId').agg(F.avg('visitNumber'))

In [16]:
# Attach each visitor's mean visitNumber to their rows.
# DF_select_group is derived from DF_select, so an expression join on
# DF_select.fullVisitorId == DF_select_group.fullVisitorId compares two references
# to the same source column and leaves two fullVisitorId columns in the output.
# Joining by column name avoids both and matches the other joins in this notebook.
display(DF_select.join(DF_select_group, "fullVisitorId"))

In [17]:
# process geoNetwork_country
import numpy as np
# Row counts per country, used to bucket infrequent countries.
country_counts = DF_select.groupBy("geoNetwork_country").count()

# Countries with more than 5000 and fewer than 10000 rows.
country_5k_10k = np.array(
    country_counts.filter("count>5000").filter("count<10000")
    .select('geoNetwork_country').toPandas().geoNetwork_country
)
# Countries with at most 5000 rows.
country_0_5k = np.array(
    country_counts.filter("count<=5000")
    .select('geoNetwork_country').toPandas().geoNetwork_country
)

def process_country(x):
  """Collapse missing, malformed and infrequent countries into bucket labels.

  Returns 'Other' for ``None``, ``'(not set)'`` or any value longer than 40
  characters; 'Other_0_5k' / 'Other_5k_10k' when the value appears in the
  corresponding module-level array; otherwise the value itself.
  """
  if x is None or x == '(not set)' or len(x) > 40:
    return 'Other'
  if x in country_0_5k:
    return 'Other_0_5k'
  if x in country_5k_10k:
    return 'Other_5k_10k'
  return x
  
  
# Register the bucketing function as a string-returning UDF and add the bucketed column.
udf_process_country = F.udf(process_country, T.StringType())
DF_fea = DF_fea.withColumn('process_country', udf_process_country(F.col('geoNetwork_country')))

# Show how many rows fall into each bucket, smallest first.
process_country_counts = DF_fea.groupBy("process_country").agg(F.count("process_country"))
display(process_country_counts.orderBy("count(process_country)"))
                      

In [18]:
# Bucketed country labels, ordered by ascending row count.
process_country_by_count = DF_fea.groupBy('process_country').count().orderBy('count')
country_list = np.array(process_country_by_count.toPandas()['process_country'])

In [19]:
# process OS
# Operating systems that appear in fewer than 10000 rows.
os_counts = DF_select.groupBy("device_operatingSystem").count()
OS_0_10k = np.array(
    os_counts.filter("count<10000")
    .select('device_operatingSystem').toPandas().device_operatingSystem
)


def process_OS(x):
  """Collapse missing and infrequent operating systems into bucket labels.

  Returns 'Other' for ``None`` or ``'(not set)'``, 'Other_0_10k' when the value
  appears in the module-level ``OS_0_10k`` array, otherwise the value itself.
  """
  if x is None or x == '(not set)':
    return 'Other'
  return 'Other_0_10k' if x in OS_0_10k else x
  
# Register the bucketing function as a string-returning UDF and add the bucketed column.
udf_process_OS = F.udf(process_OS, T.StringType())
DF_fea = DF_fea.withColumn('process_OS', udf_process_OS(F.col('device_operatingSystem')))

# Show how many rows fall into each bucket, smallest first.
process_OS_counts = DF_fea.groupBy("process_OS").agg(F.count("process_OS"))
display(process_OS_counts.orderBy("count(process_OS)"))

In [20]:
# Bucketed OS labels, ordered by ascending row count.
process_OS_by_count = DF_fea.groupBy('process_OS').count().orderBy('count')
OS_list = np.array(process_OS_by_count.toPandas()['process_OS'])
OS_list

In [21]:
# process device_browser
import numpy as np
# Row counts per browser, used to bucket infrequent browsers.
browser_counts = DF_select.groupBy("device_browser").count()

# Browsers with more than 5000 and fewer than 10000 rows.
browser_5k_10k = np.array(
    browser_counts.filter("count>5000").filter("count<10000")
    .select('device_browser').toPandas().device_browser
)
# Browsers with at most 5000 rows.
browser_0_5k = np.array(
    browser_counts.filter("count<=5000")
    .select('device_browser').toPandas().device_browser
)

def process_browser(x):
  """Collapse missing and infrequent browsers into bucket labels.

  Returns 'Other' for ``None`` or ``'(not set)'``; 'Other_5k_10k' /
  'Other_0_5k' when the value appears in the corresponding module-level
  array; otherwise the value itself.
  """
  if x is None or x == '(not set)':
    return 'Other'
  if x in browser_5k_10k:
    return 'Other_5k_10k'
  if x in browser_0_5k:
    return 'Other_0_5k'
  return x
  
# Register the bucketing function as a string-returning UDF and add the bucketed column.
udf_process_browser = F.udf(process_browser, T.StringType())
DF_fea = DF_fea.withColumn('process_browser', udf_process_browser(F.col('device_browser')))

# Show how many rows fall into each bucket, smallest first.
process_browser_counts = DF_fea.groupBy("process_browser").agg(F.count("process_browser"))
display(process_browser_counts.orderBy("count(process_browser)"))

In [22]:
# Bucketed browser labels, ordered by ascending row count.
process_browser_by_count = DF_fea.groupBy('process_browser').count().orderBy('count')
browser_list = np.array(process_browser_by_count.toPandas()['process_browser'])
browser_list

In [23]:
# process channelGrouping
def process_CG(x):
  """Keep the seven recognised channel groupings; map anything else to 'Other'."""
  recognised = (
      'Affiliates',
      'Paid Search',
      'Display',
      'Referral',
      'Direct',
      'Social',
      'Organic Search',
  )
  for channel in recognised:
    if x == channel:
      return channel
  return 'Other'
  
# Register the channel mapping as a string-returning UDF and add the mapped column.
udf_process_CG = F.udf(process_CG, T.StringType())
DF_fea = DF_fea.withColumn('process_CG', udf_process_CG(F.col('channelGrouping')))

# Show how many rows fall into each channel, smallest first.
process_CG_counts = DF_fea.groupBy("process_CG").agg(F.count("process_CG"))
display(process_CG_counts.orderBy("count(process_CG)"))

In [24]:
# Channel labels, ordered by ascending row count.
process_CG_by_count = DF_fea.groupBy('process_CG').count().orderBy('count')
CG_list = np.array(process_CG_by_count.toPandas()['process_CG'])
CG_list

In [25]:
# Inspect the feature frame after all aggregate and bucketed columns were added.
display(DF_fea)

In [27]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator
# Column groups used to build the model input.
all_col = [
    'revenue',
    'process_CG', 'process_browser', 'process_country', 'process_OS',
    'hour', 'weekday', 'month', 'year',
    'hits', 'pageview', 'visitNumber',
]
categorical_features = [
    'process_CG', 'process_browser', 'process_country', 'process_OS',
    'hour', 'weekday', 'month', 'year',
]
conti_features = ['hits', 'pageview', 'visitNumber']

# Pipeline stages: for each categorical feature, index its values and then
# one-hot encode the index. Rows with labels unseen at fit time are skipped.
stages = []
for i in categorical_features:
  index_col = i + "_Index"
  indexer = StringIndexer(inputCol=i, outputCol=index_col).setHandleInvalid('skip')
  one_hot = OneHotEncoderEstimator(inputCols=[index_col], outputCols=[i + "_Vec"])
  stages.extend([indexer, one_hot])

In [28]:
# Assemble every one-hot vector plus the continuous columns into "features".
# The comprehension must use its own variable `c`: the previous `i + "_Vec"`
# picked up a stale outer `i` and repeated a single column name once per
# categorical feature, so the other encoded features never reached the model.
assemblerInputs = [c + "_Vec" for c in categorical_features] + conti_features
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [29]:
# Build the training frame from DF_fea, which carries the process_* bucket
# columns (DF_select does not). The calendar columns hour / weekday / month /
# year are not produced by any earlier active cell, so they are derived here
# from date_cast (a date) and startTime_cast (epoch seconds stored as a string).
DF_train = (
    DF_fea
    .withColumn('hour', F.hour(F.from_unixtime(F.col('startTime_cast').cast('long'))))
    .withColumn('weekday', F.dayofweek('date_cast'))
    .withColumn('month', F.month('date_cast'))
    .withColumn('year', F.year('date_cast'))
    .select('revenue', 'process_CG', 'process_browser', 'process_country', 'process_OS',
            'hour', 'weekday', 'month', 'year', 'hits', 'pageview', 'visitNumber')
)
display(DF_train)

In [30]:
# Create a Pipeline.
# Chain the indexer / encoder / assembler stages into one Pipeline.
pipeline = Pipeline(stages=stages)
# Fit the stages on the training frame.
pipelineModel = pipeline.fit(DF_train)
# `model` is the transformed DataFrame (DF_train plus the stage output columns),
# not a fitted estimator.
model = pipelineModel.transform(DF_train)

In [31]:
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator
# Gradient-boosted tree regressor predicting revenue from the assembled features.
gbt = GBTRegressor(featuresCol="features", labelCol="revenue", maxIter=20, maxDepth=30)
# 80/20 split of the transformed frame; the fixed seed makes the split repeatable.
train_data, test_data = model.randomSplit([.8,.2],seed=1234)

In [32]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Build the cross-validator (it was previously used without being created).
# The estimator is the GBT regressor itself, not the feature pipeline:
# train_data already contains the assembled "features" column, and the
# parameter grid below tunes gbt's parameters.
crossval = CrossValidator(
    estimator=gbt,
    evaluator=RegressionEvaluator(predictionCol="prediction", labelCol="revenue", metricName="rmse"),
    numFolds=3,
)

# Tune gbt.maxDepth, gbt.maxBins and gbt.maxIter over the values below,
# building the parameter grid with ParamGridBuilder.
maxDepth_grid = [10,20,30]
maxBins_grid = [30,50,70]
maxIter_grid = [15,20,25]
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, maxDepth_grid)
             .addGrid(gbt.maxBins, maxBins_grid)
             .addGrid(gbt.maxIter, maxIter_grid)
             .build())

# Add the grid to the CrossValidator
crossval.setEstimatorParamMaps(paramGrid)

# Fit every grid combination and keep the best model
gbtModel = crossval.fit(train_data).bestModel

In [33]:
# Make prediction
# Score the held-out split with the best cross-validated model.
predictions = gbtModel.transform(test_data)
# Compare actual and predicted revenue for rows whose actual revenue is positive.
display(predictions.filter("revenue>0").select("revenue", "prediction"))

In [34]:
# Actual vs. predicted revenue for every row of the test split.
display(predictions.select("revenue", "prediction"))

In [35]:
# Now let's compute an evaluation metric for our test dataset
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the "revenue" label against the "prediction" column
regEval = RegressionEvaluator(metricName="rmse", labelCol="revenue", predictionCol="prediction")

# Score the test-set predictions and show the result
rmse = regEval.evaluate(predictions)
rmse