In [1]:
import findspark
findspark.init() # By default findspark looks at the SPARK_HOME system environment variable

import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

import spark_utils as su

In [2]:
# Start Spark through the project helper in spark_utils. The returned object is
# indexed by key in this notebook: 'spark' is used to read the input data and
# 'sc' is stopped in the final cell.
spark = su.start_spark()

In [3]:
# Explicit schema for the raw click log: five integer identifiers, two
# timestamps (declared nullable) and the integer target column.
click_log_schema = StructType([
    StructField('ip', IntegerType()),
    StructField('app', IntegerType()),
    StructField('device', IntegerType()),
    StructField('os', IntegerType()),
    StructField('channel', IntegerType()),
    StructField('click_time', TimestampType(), True),
    StructField('attributed_time', TimestampType(), True),
    StructField('is_attributed', IntegerType()),
])

# Read the training CSV with the schema above. The file has a header row and
# the reader runs in DROPMALFORMED mode, so malformed records are discarded
# rather than raising an error.
csv_reader = spark['spark'].read
train_df = csv_reader.csv(
    'file:/C:/train.csv',
    schema=click_log_schema,
    header=True,
    mode='DROPMALFORMED',
    timestampFormat='yyyy-MM-dd HH:mm:ss',
)

In [4]:
# Run the project's transformation step from spark_utils on the raw frame.
# NOTE(review): judging by the output shown below, this adds derived columns
# such as click_time_wday, click_time_hour and the *_cat / *_custom_score
# features -- confirm against spark_utils.do_transformations.
train_df = su.do_transformations(train_df, spark['spark'])

# Keep the transformed frame cached because several later cells reuse it.
# MEMORY_AND_DISK replaces the former MEMORY_AND_DISK_SER: in PySpark the *_SER
# constants were deprecated aliases of the non-SER levels (since Spark 2.0) and
# are no longer available in newer PySpark releases, where the old name raises
# AttributeError. The storage behaviour is the same.
train_df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

# Peek at a single, untruncated row to sanity-check the resulting columns.
train_df.show(n = 1, truncate = False)

+-------+---+---+------+-----+-------------------+---------------+---------------+--------------------+-------------+---------------+-----------------+---------------+----------+-------------------+------+-----------------+-------+------------------+-----------+--------------------+
|channel|app|os |device|ip   |click_time         |click_time_wday|attributed_time|attributed_time_wday|is_attributed|click_time_hour|n_previous_clicks|click_time_diff|device_cat|device_custom_score|os_cat|os_custom_score  |app_cat|app_custom_score  |channel_cat|channel_custom_score|
+-------+---+---+------+-----+-------------------+---------------+---------------+--------------------+-------------+---------------+-----------------+---------------+----------+-------------------+------+-----------------+-------+------------------+-----------+--------------------+
|463    |14 |13 |1     |52557|2017-11-08 22:19:08|Wed            |null           |null                |0            |22             |772            

In [5]:
# Columns removed before modelling: the raw identifier columns and the raw
# timestamp columns (plus the weekday derived from attributed_time).
columns_to_drop = [
    'app',
    'device',
    'os',
    'ip',
    'channel',
    'click_time',
    'attributed_time',
    'attributed_time_wday',
]
for_modelling = train_df.drop(*columns_to_drop)

In [67]:
# Class balance of the target: number of rows for each is_attributed value.
target_column = 'is_attributed'
class_counts = train_df.select(target_column).groupBy(target_column).count()
class_counts.show()

+-------------+---------+
|is_attributed|    count|
+-------------+---------+
|            1|   456846|
|            0|184447044|
+-------------+---------+



In [68]:
# Percentage of rows per class. The counts are copied by hand from the
# is_attributed groupBy output; n_rows is their sum.
n_attributed = 456846
n_not_attributed = 184447044
n_rows = 184903890

for heading, class_count in (('% 1s', n_attributed), ('% 0s', n_not_attributed)):
    print(heading)
    print((class_count*100)/n_rows)

% 1s
0.2470721410998979
% 0s
99.75292785890011


De primeras, cogeremos todos los 1s y un número de 0s más elevado (por ejemplo, el 0.80% de los 0s). Para train forzaremos que haya igual número de 0s que de 1s; para train-dev, dev y test dará igual, siempre y cuando las variables independientes tengan la misma distribución de valores.

In [6]:
# 184,903,890 rows

custom_seed = 16121993

# Stratified sampling on the target: keep every row with is_attributed == 1
# and a 0.0080 fraction of the rows with is_attributed == 0.
sampling_fractions = {1: 1, 0: 0.0080}

# Four samples drawn with the same fractions; only the seed differs.
(for_modelling_balanced1,
 for_modelling_balanced2,
 for_modelling_balanced3,
 for_modelling_balanced4) = (
    for_modelling.sampleBy('is_attributed', sampling_fractions, seed=sample_seed)
    for sample_seed in (custom_seed, 1, 2, 3)
)

In [7]:
# Collect each sampled frame into pandas and write it to its own CSV file in
# the working directory, without the index and with missing values written as
# the literal text 'null'.
balanced_exports = (
    (for_modelling_balanced1, './train_data_all_changes_made_balanced1.csv'),
    (for_modelling_balanced2, './train_data_all_changes_made_balanced2.csv'),
    (for_modelling_balanced3, './train_data_all_changes_made_balanced3.csv'),
    (for_modelling_balanced4, './train_data_all_changes_made_balanced4.csv'),
)
for balanced_df, csv_path in balanced_exports:
    balanced_df.toPandas().to_csv(csv_path, index = False, na_rep = 'null')

Parar SparkContext

In [12]:
# Shut down the SparkContext held under the 'sc' key of the object returned by
# su.start_spark(); this is the last step of the notebook.
spark['sc'].stop()