In [6]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import pandas as pd
import sys
import os
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import to_timestamp
import time
from pyspark.sql import functions as fn

import datetime
from pyspark.sql.functions import year, month, dayofmonth, hour, minute, second
from pyspark.mllib.stat import Statistics
import pandas as pd

In [7]:
spark = SparkSession.builder.appName('data-cleaning').\
                        config("spark.executor.instances", '3').\
                        config("spark.executor.memory", '40g').\
                        config('spark.executor.cores', '5').\
                        config('spark.cores.max', '5').appName('data_clean').\
                        getOrCreate()

In [8]:
sqlContext = SQLContext(spark.sparkContext)

In [9]:
train_data = spark.read.format('csv').option('header', 'true').load('../modeled_data/spark_data3.csv')

In [10]:
train_data.count()

77157

In [11]:
train_data = train_data.drop('latitude18').drop('longitude19').drop('_c0').withColumnRenamed('latitude5', 'latitude').withColumnRenamed('longitude4', 'longitude')
train_data.limit(5).toPandas()

Unnamed: 0,device_id,label_id,app_id,event_id,gender,age,group,timestamp,longitude,latitude,...,town,country,year,month,day,hour,minute,seconds,time_of_day,age_group
0,-6754902882206380496,704,-145658454112781034,35963,M,33,M32-38,2016-05-01 08:55:39,106.56,29.53,...,Chongqing,China,2016,5,1,8,55,39,morning,26-35
1,5416618857406916680,209,2460654806659045896,1773101,M,30,M29-31,2016-05-05 12:08:32,106.56,29.53,...,Chongqing,China,2016,5,5,12,8,32,morning,26-35
2,5416618857406916680,209,2460654806659045896,1773101,M,30,M29-31,2016-05-05 12:08:32,106.56,29.53,...,Chongqing,China,2016,5,5,12,8,32,morning,26-35
3,1698428484639625968,303,-145658454112781034,37174,M,27,M27-28,2016-05-06 07:57:01,116.21,34.26,...,Henan,China,2016,5,6,7,57,1,morning,26-35
4,1698428484639625968,303,-145658454112781034,37174,M,27,M27-28,2016-05-06 07:57:01,116.21,34.26,...,Henan,China,2016,5,6,7,57,1,morning,26-35


In [12]:
training, test = train_data.randomSplit([0.8, 0.2], 0)

In [13]:
training.count()

61677

In [14]:
test.count()

15480

In [18]:
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

float_columns = ['device_id', 'app_id', 'label_id', 'event_id', 'longitude', 'latitude']
int_columns = ['is_active', 'age', 'is_installed', 'day', 'hour', 'minute', 'seconds']
string_columns = ['gender', 'group', 'category', 'phone_brand', 'device_model', 'town', 'country']

training = train_data.select(*(col(c).cast("float").alias(c) for c in float_columns), \
                                                 *(col(c).cast("int").alias(c) for c in int_columns), \
                                                 *(col(c).alias(c) for c in string_columns))
test = train_data.select(*(col(c).cast("float").alias(c) for c in float_columns), \
                                                 *(col(c).cast("int").alias(c) for c in int_columns), \
                                                 *(col(c).alias(c) for c in string_columns))

In [19]:
training.printSchema()

root
 |-- device_id: float (nullable = true)
 |-- app_id: float (nullable = true)
 |-- label_id: float (nullable = true)
 |-- event_id: float (nullable = true)
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- is_active: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- is_installed: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- seconds: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- group: string (nullable = true)
 |-- category: string (nullable = true)
 |-- phone_brand: string (nullable = true)
 |-- device_model: string (nullable = true)
 |-- town: string (nullable = true)
 |-- country: string (nullable = true)



In [21]:
#Correlation
indexer = feature.StringIndexer(inputCol="gender", outputCol="gender_label")
category = feature.StringIndexer(inputCol='category', outputCol='category_encoded')
brand = feature.StringIndexer(inputCol='phone_brand', outputCol='phone_brand_encoded')
group = feature.StringIndexer(inputCol='group', outputCol='group_encoded')
is_active = feature.StringIndexer(inputCol='is_active', outputCol='is_active_encoded')
device_model = feature.StringIndexer(inputCol='device_model', outputCol='device_model_encoded')
town_model = feature.StringIndexer(inputCol='town', outputCol='town_encoded')
country_model = feature.StringIndexer(inputCol='country', outputCol='country_encoded')

correlation_pipeline = Pipeline(stages=[indexer, category, brand, group, is_active, device_model, town_model, country_model])
encoded_data = correlation_pipeline.fit(training).transform(training)

encoded_data = encoded_data['gender_label','device_id', 'app_id', 'category_encoded', 'phone_brand_encoded', 'group_encoded', 'is_active_encoded', 'device_model_encoded',\
                           'town_encoded', 'country_encoded', 'label_id', 'event_id', 'longitude', 'latitude', 'day', 'hour', 'minute']
col_names = encoded_data.columns
features = encoded_data.rdd.map(lambda row: row[0:])
features
corr_mat=Statistics.corr(features, method="pearson")
corr_df = pd.DataFrame(corr_mat)

corr_df.columns = col_names

corr = corr_df.corr()
corr.style.background_gradient(cmap='coolwarm')

Unnamed: 0,gender_label,device_id,app_id,category_encoded,phone_brand_encoded,group_encoded,is_active_encoded,device_model_encoded,town_encoded,country_encoded,label_id,event_id,longitude,latitude,day,hour,minute
gender_label,1.0,-0.0258661,-0.0765893,-0.148078,-0.146786,0.919765,0.0803768,-0.147658,-0.0625919,0.0284544,0.10037,-0.0792575,-0.0549348,-0.0766222,-0.0480448,-0.0836153,-0.120349
device_id,-0.0258661,1.0,-0.0310098,-0.00179136,-0.165642,-0.0487753,-0.0778395,-0.163258,-0.17118,0.0385876,-0.0453874,-0.0579293,-0.0447738,-0.088855,-0.0433448,-0.0273423,-0.0606061
app_id,-0.0765893,-0.0310098,1.0,-0.0908934,-0.0991888,-0.0699078,0.123239,-0.0922381,-0.101638,-0.0151537,-0.0834778,-0.0758088,-0.012333,-0.0538789,-0.0513495,-0.0467462,-0.0682035
category_encoded,-0.148078,-0.00179136,-0.0908934,1.0,-0.0400255,-0.0807549,-0.0486989,-0.0640188,-0.0209079,-0.0120656,-0.764392,-0.0273412,0.00849255,0.0202918,-0.045524,-0.01372,-0.00985871
phone_brand_encoded,-0.146786,-0.165642,-0.0991888,-0.0400255,1.0,-0.073672,-0.00036408,0.808499,-0.0499345,0.179146,-0.0391763,-0.0759901,-0.273286,-0.284511,-0.119586,-0.0979594,-0.0763347
group_encoded,0.919765,-0.0487753,-0.0699078,-0.0807549,-0.073672,1.0,0.0677084,-0.1077,-0.0609363,-0.0160901,0.0149373,-0.0780187,-0.0217275,-0.0693358,-0.0740593,-0.0434463,-0.113863
is_active_encoded,0.0803768,-0.0778395,0.123239,-0.0486989,-0.00036408,0.0677084,1.0,0.0111316,-0.0744969,0.0408949,-0.12422,-0.101054,-0.0833675,-0.100383,-0.0389588,0.0389417,-0.169219
device_model_encoded,-0.147658,-0.163258,-0.0922381,-0.0640188,0.808499,-0.1077,0.0111316,1.0,0.000306828,0.103089,-0.0364941,-0.0725456,-0.188982,-0.196144,-0.119065,-0.105101,-0.115671
town_encoded,-0.0625919,-0.17118,-0.101638,-0.0209079,-0.0499345,-0.0609363,-0.0744969,0.000306828,1.0,-0.251059,-0.0421473,-0.074953,0.222913,0.451437,-0.0730673,-0.120973,-0.0922845
country_encoded,0.0284544,0.0385876,-0.0151537,-0.0120656,0.179146,-0.0160901,0.0408949,0.103089,-0.251059,1.0,0.022348,0.0176476,-0.961669,-0.8668,0.0670137,0.0399594,-0.043215


In [42]:
from pyspark.ml import Pipeline
from pyspark.ml import feature
from pyspark.ml import classification
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, \
    RegressionEvaluator

In [43]:
#Logistic
indexer = feature.StringIndexer(inputCol="gender", outputCol="gender_label")
category = feature.StringIndexer(inputCol='category', outputCol='category_encoded')
brand = feature.StringIndexer(inputCol='phone_brand', outputCol='phone_brand_encoded')
group = feature.StringIndexer(inputCol='group', outputCol='group_encoded')
is_active = feature.StringIndexer(inputCol='is_active', outputCol='is_active_encoded')
device_model = feature.StringIndexer(inputCol='device_model', outputCol='device_model_encoded')
town_model = feature.StringIndexer(inputCol='town', outputCol='town_encoded')
country_model = feature.StringIndexer(inputCol='country', outputCol='country_encoded')
category_wide = feature.StringIndexer(inputCol='category_mapped', outputCol='category_wide_encoded')

vector_assembler = feature.VectorAssembler(inputCols=['device_id', 'app_id', 'label_id', 'event_id', 'longitude', 'latitude', 'is_active', 'age', 
                                                            'day', 'hour', 'minute', 'second', 'category_encoded', 'category_wide_encoded', 'phone_brand_encoded',\
                                                     'is_active_encoded', 'device_model_encoded'],
                                        outputCol='features')
logistic = classification.LogisticRegression(labelCol='gender_label', featuresCol='features')

lr_model = Pipeline(stages=[indexer, category,category_wide, brand, group, is_active, device_model, town_model, country_model, vector_assembler, logistic])

predicted = lr_model.fit(training).transform(test)


evaluator = BinaryClassificationEvaluator(labelCol='gender_label')
evaluator.evaluate(predicted)

0.5991459815762952

In [44]:
#Random Forest
from pyspark.ml.evaluation import BinaryClassificationEvaluator, \
    MulticlassClassificationEvaluator, \
    RegressionEvaluator

indexer = feature.StringIndexer(inputCol="gender", outputCol="gender_label")
category = feature.StringIndexer(inputCol='category', outputCol='category_encoded')
brand = feature.StringIndexer(inputCol='phone_brand', outputCol='phone_brand_encoded')
group = feature.StringIndexer(inputCol='group', outputCol='group_encoded')
is_active = feature.StringIndexer(inputCol='is_active', outputCol='is_active_encoded')
device_model = feature.StringIndexer(inputCol='device_model', outputCol='device_model_encoded')
town_model = feature.StringIndexer(inputCol='town', outputCol='town_encoded')
country_model = feature.StringIndexer(inputCol='country', outputCol='country_encoded')
category_wide = feature.StringIndexer(inputCol='category_mapped', outputCol='category_wide_encoded')

vector_assembler = feature.VectorAssembler(inputCols=['device_id', 'app_id', 'label_id', 'event_id', 'longitude', 'latitude', 'is_active', 'age', 
                                                            'day', 'hour', 'minute', 'second', 'category_wide_encoded', 'phone_brand_encoded',\
                                                     'is_active_encoded'],
                                        outputCol='features')

rf = classification.RandomForestClassifier(labelCol='gender_label', featuresCol='features')
random_forest_pipeline = Pipeline(stages=[indexer, category_wide, brand, group, is_active, device_model, country_model, vector_assembler, rf])
rf_prediction = random_forest_pipeline.fit(training).transform(test)

evaluator = BinaryClassificationEvaluator(labelCol='gender_label')
evaluator.evaluate(rf_prediction)

0.7256918385442579