In [1]:
import findspark
findspark.init('/home/ubuntu/spark-2.1.1-bin-hadoop2.7')
import pyspark
import numpy as np
from pyspark.sql.functions import isnull, isnan
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName('Preparation').getOrCreate()

In [2]:
age= spark.read.csv('age_gender_bkts.csv',header=True,inferSchema=True)
country= spark.read.csv('countries.csv',header=True,inferSchema=True)
session= spark.read.csv('sessions.csv',header=True,inferSchema=True)
train_users = spark.read.csv('train_users_2.csv',header=True,inferSchema=True)
test_users = spark.read.csv('test_users.csv',header=True,inferSchema=True)

In [3]:
age.drop('year').dtypes
country.drop('lat_destination','lng_destination').dtypes

[('country_destination', 'string'),
 ('distance_km', 'double'),
 ('destination_km2', 'double'),
 ('destination_language ', 'string'),
 ('language_levenshtein_distance', 'double')]

In [4]:
session.select("action","action_type","action_detail","device_type").na.fill("NaN").show()
session.select("secs_elapsed").na.fill(19402).show(5)

+--------------------+-----------+--------------------+---------------+
|              action|action_type|       action_detail|    device_type|
+--------------------+-----------+--------------------+---------------+
|              lookup|        NaN|                 NaN|Windows Desktop|
|      search_results|      click| view_search_results|Windows Desktop|
|              lookup|        NaN|                 NaN|Windows Desktop|
|      search_results|      click| view_search_results|Windows Desktop|
|              lookup|        NaN|                 NaN|Windows Desktop|
|      search_results|      click| view_search_results|Windows Desktop|
|              lookup|        NaN|                 NaN|Windows Desktop|
|         personalize|       data|wishlist_content_...|Windows Desktop|
|               index|       view| view_search_results|Windows Desktop|
|              lookup|        NaN|                 NaN|Windows Desktop|
|      search_results|      click| view_search_results|Windows D

In [5]:
train_users = train_users.withColumn('gender', regexp_replace('gender', 'OTHER', 'NaN'))
train_users = train_users.withColumn('gender', regexp_replace('gender', '-unknown-', 'NaN'))
test_users = test_users.withColumn('gender', regexp_replace('gender', 'OTHER', 'NaN'))
test_users = test_users.withColumn('gender', regexp_replace('gender', '-unknown-', 'NaN'))
test_users.groupBy('gender').count().orderBy('count').show()

+------+-----+
|gender|count|
+------+-----+
|  MALE|13769|
|FEMALE|14483|
|   NaN|33844|
+------+-----+



In [6]:
train_users= train_users.withColumn('age', regexp_replace('age', 'age>114', '-1'))
train_users= train_users.withColumn('age', regexp_replace('age', 'age<18', '-1'))
train_users.select('age').describe().show()

+-------+------------------+
|summary|               age|
+-------+------------------+
|  count|            125461|
|   mean| 49.66833517985669|
| stddev|155.66661183021571|
|    min|               1.0|
|    max|              99.0|
+-------+------------------+



train_users= train_users.withColumn('date_first_active', train_users["timestamp_first_active"] / 1000000)
train_users.show()

In [7]:
train_users.date_account_created= train_users.date_account_created.astype('date')
train_users.date_first_booking= train_users.date_first_booking.astype('date')
#train_users.date_first_active = train_users.date_first_active.astype('date')
train_users.age= train_users.age.astype('int')

In [8]:
train_users.na.fill("NaN")
test_users.na.fill("NaN")

DataFrame[id: string, date_account_created: timestamp, timestamp_first_active: bigint, date_first_booking: string, gender: string, age: double, signup_method: string, signup_flow: int, language: string, affiliate_channel: string, affiliate_provider: string, first_affiliate_tracked: string, signup_app: string, first_device_type: string, first_browser: string]

In [9]:
session_new=session.groupBy("user_id").sum("secs_elapsed")
new_train=train_users.join(session_new,train_users.id== session_new.user_id)
new_train.drop('user_id')
new_train.dtypes
#new_train.describe().show()

[('id', 'string'),
 ('date_account_created', 'timestamp'),
 ('timestamp_first_active', 'bigint'),
 ('date_first_booking', 'timestamp'),
 ('gender', 'string'),
 ('age', 'string'),
 ('signup_method', 'string'),
 ('signup_flow', 'int'),
 ('language', 'string'),
 ('affiliate_channel', 'string'),
 ('affiliate_provider', 'string'),
 ('first_affiliate_tracked', 'string'),
 ('signup_app', 'string'),
 ('first_device_type', 'string'),
 ('first_browser', 'string'),
 ('country_destination', 'string'),
 ('user_id', 'string'),
 ('sum(secs_elapsed)', 'double')]

In [10]:
new_test=test_users.join(session_new,test_users.id== session_new.user_id)
new_test=new_test.drop('user_id')
#new_test.dtypes


Feature selection
Delete the timestamp_first_active|date_first_booking\id

In [11]:
new_train=new_train.select('gender','age','signup_method','signup_flow','language','affiliate_channel','affiliate_provider','first_affiliate_tracked','signup_app','first_device_type','first_browser','sum(secs_elapsed)','country_destination')
cols=new_train.columns
new_train.printSchema()

root
 |-- gender: string (nullable = true)
 |-- age: string (nullable = true)
 |-- signup_method: string (nullable = true)
 |-- signup_flow: integer (nullable = true)
 |-- language: string (nullable = true)
 |-- affiliate_channel: string (nullable = true)
 |-- affiliate_provider: string (nullable = true)
 |-- first_affiliate_tracked: string (nullable = true)
 |-- signup_app: string (nullable = true)
 |-- first_device_type: string (nullable = true)
 |-- first_browser: string (nullable = true)
 |-- sum(secs_elapsed): double (nullable = true)
 |-- country_destination: string (nullable = true)



In [19]:
# Import the relevant packages.
from pyspark.ml.feature import (VectorAssembler,VectorIndexer,OneHotEncoder,StringIndexer)

# First create a string indexer which converts every string into a number, such as male = 0 and female = 1.
# A number will be assigned to every category in the column.
gender_indexer=StringIndexer(inputCol='gender',outputCol='genderIndex')
signup_method_indexer=StringIndexer(inputCol='signup_method',outputCol='signup_methodIndex')
language_indexer=StringIndexer(inputCol='language',outputCol='languageIndex')
affiliate_channel_indexer=StringIndexer(inputCol='affiliate_channel',outputCol='affiliate_channelIndex')
affiliate_provider_indexer=StringIndexer(inputCol='affiliate_provider',outputCol='affiliate_providerIndex')
first_affiliate_tracked_indexer=StringIndexer(inputCol='first_affiliate_tracked',outputCol='first_affiliate_trackedIndex')
signup_app_indexer=StringIndexer(inputCol='signup_app',outputCol='signup_appIndex')
first_device_type_indexer=StringIndexer(inputCol='first_device_type',outputCol='first_device_typeIndex')
first_browser_indexer=StringIndexer(inputCol='first_browser',outputCol='first_browserIndex')
country_destination_indexer=StringIndexer(inputCol='country_destination',outputCol='country_destinationIndex')
# Now we can one hot encode these numbers. This converts the various outputs into a single vector.
# Multiple columns are collapsed into one. 
# This makes it easier to process when you have multiple classes.
gender_encoder = OneHotEncoder(inputCol='genderIndex',outputCol='genderVec')
signup_method_encoder = OneHotEncoder(inputCol='signup_methodIndex',outputCol='signup_methodVec')
language_encoder = OneHotEncoder(inputCol='languageIndex',outputCol='languageVec')
affiliate_channel_encoder = OneHotEncoder(inputCol='affiliate_channelIndex',outputCol='affiliate_channelVec')
affiliate_provider_encoder = OneHotEncoder(inputCol='affiliate_providerIndex',outputCol='affiliate_providerVec')
first_affiliate_tracked_encoder = OneHotEncoder(inputCol='first_affiliate_trackedIndex',outputCol='first_affiliate_trackedVec')
signup_app_encoder = OneHotEncoder(inputCol='signup_appIndex',outputCol='signup_appVec')
first_device_encoder = OneHotEncoder(inputCol='first_deviceIndex',outputCol='first_deviceVec')
first_browser_encoder = OneHotEncoder(inputCol='first_browserIndex',outputCol='first_browserVec')

# And finally, using vector assembler to turn all of these columns into one column (named features).
assembler = VectorAssembler(inputCols=['genderVec','signup_methodVec','languageVec','affiliate_channelVec',
                                       'affiliate_providerVec','first_affiliate_trackedVec','signup_appVec','first_deviceVec','first_browserVec',
                                       'age','signup_flow', 'sum(secs_elapsed)'], outputCol="features")





In [None]:
from pyspark.ml import Pipeline

# Then go through our steps. It's essentially sequential to the above.
pipeline = Pipeline(stages=[gender_indexer, signup_method_indexer, language_indexer, affiliate_channel_indexer,
                            affiliate_provider_indexer, first_affiliate_tracked_indexer, signup_app_indexer, first_device_type_indexer,
                            first_browser_indexer,country_destination_indexer,gender_encoder,signup_method_encoder,language_encoder,
                            affiliate_channel_encoder,affiliate_provider_encoder,first_affiliate_tracked_encoder,signup_app_encoder,
                            first_device_encoder,first_browser_encoder, assembler])

# Now that we've got a number of steps, let's apply it to the DataFrame.
pipeline_model = pipeline.fit(new_train)

# Incorporate results into a new DataFrame.
pipe_df = pipeline_model.transform(new_train)

# Remove all variables other than features and label. 
pipe_df = pipe_df.select('label', 'features')

In [None]:
from pyspark.ml.classification import LogisticRegression

# Split our data. Note that the new DataFrame is being used.
train_data, test_data = pipe_df.randomSplit([0.7,0.3])
print("Training Dataset Count: " + str(train_data.count()))
print("Test Dataset Count: " + str(test_data.count()))

# Instantiate the model.
lr_model = LogisticRegression(featuresCol='features',labelCol='label')

# Fit the model.
lr_model = lr_model.fit(train_data)

# And evaluate the model using the test data.
results = lr_model.transform(test_data)

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Visualising the coefficients. Sort from lowest to highest.
beta = np.sort(lr_model.coefficients)

# Plot the data.
plt.plot(beta)

# Add a label to the data.
plt.ylabel('Beta Coefficients')

# Show the graph. 
plt.show()

In [None]:
# Let's get a summary of the data.
training_summary = lr_model.summary

# Convert the DataFrame to a Pandas DataFrame.
ROC = training_summary.roc.toPandas()

# Plot the true positive and false positive rates.
plt.plot(ROC['FPR'],ROC['TPR'])

# Define the labels.
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
plt.title('ROC Curve')
plt.show()

# Print the AUC statistic. 
print('Area Under the Curve: ' + str(training_summary.areaUnderROC))

In [None]:
# Convert DataFrame to Pandas DataFrame.
pr = training_summary.pr.toPandas()

# Plot model recall and precision.
plt.plot(pr['recall'],pr['precision'])

# Define the labels and show the graph. 
plt.ylabel('Precision')
plt.xlabel('Recall')
plt.show()

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.feature import OneHotEncoderEstimator

categoricalColumns = ['gender','signup_method','language','affiliate_channel','affiliate_provider','first_affiliate_tracked','signup_app','first_device_type','first_browser']
stages = []
inputCols=[]
outputCols=[]
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
label_stringIdx = StringIndexer(inputCol = 'country_destination', outputCol = 'label')
stages += [label_stringIdx]
numericCols = ['age', 'signup_flow', 'sum(secs_elapsed)']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(new_train).transform(new_train)
output.select("features", "label").show()