In [1]:
# encoding: utf-8

############################################################
# Prepare training set using over/undersampling techniques #
############################################################

In [2]:
import numpy as np
import pandas as pd
from scipy import stats

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
from pyspark.conf import SparkConf

import multiprocessing

In [3]:
# Create the SparkContext and SparkSession used to process the files.
# Leave two cores free for the OS / notebook, but never request fewer than
# one: Spark rejects 'local[0]' and cannot parse a negative thread count,
# which is what `cpu_count() - 2` yields on a machine with <= 2 cores.
cores = max(1, multiprocessing.cpu_count() - 2)

conf = SparkConf()
# SparkConf setters mutate `conf` in place (and return it for chaining).
# NOTE(review): spark.driver.memory only takes effect if set before the driver
# JVM starts, i.e. this must be the first SparkContext created in the kernel.
conf.setMaster('local[' + str(cores) + ']') \
    .setAppName('example') \
    .set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .set('spark.driver.memory', '15g')

sc = SparkContext(conf=conf)
spark = SparkSession(sc)

In [4]:
# Fixed seed passed to randomSplit and to both sample() calls below,
# so every run produces the same train/test split and resampled sets.
my_seed = 1980

In [5]:
# Directory holding the input CSV and receiving the output folders.
# Keep the trailing slash: file names are appended by plain concatenation.
my_path = '../data/'

In [6]:
# Load the app customers CSV: comma-separated, first row is the header,
# column types inferred by Spark from the data.
app = spark.read.format('csv') \
                .options(sep=',', header='True', inferSchema='True') \
                .load(my_path + 'c5_e1_1_app.csv')

In [7]:
# Target column followed by the selected feature columns v<N>.
feature_ids = (2, 3, 10, 12, 13, 16, 17, 18, 21, 29,
               32, 33, 34, 37, 38, 50, 51, 57, 58, 61)
cols = ['label'] + ['v{}'.format(idx) for idx in feature_ids]



In [8]:
app = app.select(*cols)  # keep only the label and the chosen features

In [9]:
# Separate a held-out test set: 70% train / 30% test, seeded for reproducibility.
train, test = app.randomSplit([.7, .3], my_seed)
# `train` feeds several later cells (CSV write, class counts, both resamplings),
# so cache it rather than re-reading and re-splitting the CSV each time.
# `app` itself is never cached, so there is nothing to unpersist.
train.cache()

DataFrame[label: int, v2: int, v3: int, v10: int, v12: int, v13: int, v16: int, v17: int, v18: int, v21: int, v29: double, v32: double, v33: double, v34: double, v37: double, v38: double, v50: double, v51: double, v57: double, v58: double, v61: int]

In [10]:
# Save the training split as one CSV part file with a header row.
# mode='overwrite' keeps Restart & Run All working: the default 'error' mode
# fails with "path already exists" once the folder has been written.
train.repartition(1).write.csv(my_path + 'c5_e3_1_train', mode='overwrite', header=True)

In [11]:
# Save the test split as one CSV part file with a header row; overwrite so
# re-running the notebook does not fail on the existing output folder.
test.repartition(1).write.csv(my_path + 'c5_e3_1_test', mode='overwrite', header=True)

In [12]:
# Class sizes in the training split: label 1 is treated as the minority class.
total_count = train.count()
min_count = train.filter(train['label'] == 1).count()
maj_count = total_count - min_count  # every row that is not label 1

In [13]:
# Random undersampling: keep every minority (label = 1) row and a random
# subset of majority (label = 0) rows of roughly the same size.
# To balance the classes the keep-fraction is min_count / maj_count (dividing
# by total_count under-samples the majority). float() replaces np.float, which
# NumPy removed in 1.24, and avoids integer division on old Pythons.
if maj_count == 0:
    raise ValueError('training set has no label = 0 rows to undersample')
# sample() without replacement needs a fraction in [0, 1].
under_fraction = min(1.0, min_count / float(maj_count))
train_under = train.where('label = 1') \
                   .union(train.where('label = 0')
                              .sample(False, under_fraction, my_seed))

In [14]:
# Save the undersampled training set as one CSV part file with a header row;
# overwrite so re-running the notebook does not fail on the existing folder.
train_under.repartition(1).write.csv(my_path + 'c5_e3_1_train_under', mode='overwrite', header=True)

In [15]:
# Random oversampling: keep every majority (label = 0) row and draw minority
# (label = 1) rows with replacement. With replacement, `fraction` is the expected
# number of copies per row, so maj_count / min_count yields ~maj_count minority rows.
# float() replaces np.float, which NumPy removed in 1.24.
if min_count == 0:
    raise ValueError('training set has no label = 1 rows to oversample')
over_fraction = maj_count / float(min_count)
train_over = train.where('label = 0') \
                  .union(train.where('label = 1')
                             .sample(True, over_fraction, my_seed))

In [16]:
# Save the oversampled training set as one CSV part file with a header row;
# overwrite so re-running the notebook does not fail on the existing folder.
train_over.repartition(1).write.csv(my_path + 'c5_e3_1_train_over', mode='overwrite', header=True)

In [17]:
# Shut down Spark via the session: this stops the underlying SparkContext
# and also clears the active/default session.
spark.stop()