# Pre-Processing Pipeline Stage

## Import Dependencies

In [1]:
import pandas  as pd
import numpy   as np
import seaborn as sb
import time    as ti

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg         import Vectors
from pyspark.ml.feature        import OneHotEncoderEstimator, StringIndexer, VectorAssembler, Imputer, StandardScaler
from pyspark.ml                import Pipeline

from pyspark.sql               import SparkSession, SQLContext
from pyspark.sql.types         import StructType, StructField, StringType, FloatType, BooleanType, IntegerType, DoubleType, BinaryType
from pyspark.sql.functions     import countDistinct, col

from os.path                   import exists

## Setup Spark Application

In [2]:
# Create (or reuse) the notebook's Spark session with explicit memory limits,
# then expose the underlying SparkContext and a SQLContext built on top of it.
ss = (
    SparkSession.builder
    .config('spark.executor.memory', '4G')
    .config('spark.driver.memory', '40G')
    .config('spark.driver.maxResultSize', '10G')
    .getOrCreate()
)
sc = ss.sparkContext
sq = SQLContext(sc)

## Read Training Data

In [3]:
%%time

# One-time conversion: parse the tab-separated training file with an explicit
# schema and cache it as Parquet so later runs skip the CSV parse.
if not exists('../data/train.parquet.original'):

    # Schema: `ctr`, then 13 float columns i01..i13, then 26 string columns s01..s26.
    ds = StructType([StructField('ctr', FloatType(), True)] +
                    [StructField(f'i{f:02}', FloatType(), True) for f in range(1, 14)] +
                    [StructField(f's{f:02}', StringType(), True) for f in range(1, 27)])

    # NOTE(review): header='true' makes Spark treat the first line of the file
    # as a header and drop it -- confirm that train.txt really has a header row.
    df = (
        sq.read.format('csv')
          .options(header = 'true', delimiter = '\t')
          .schema(ds)
          .load('../data/train.txt')
    )

    df.write.parquet('../data/train.parquet.original')

# Always continue from the cached Parquet copy; `tf` is a reproducible 1% sample.
df = ss.read.parquet('../data/train.parquet.original')
tf = df.sample(fraction = 0.01, seed = 2019)

# Column groups by name prefix. The numeric list previously used the 's' filter
# as well, which made it identical to the categorical list.
num_columns = [f for f in df.columns if f.startswith('i')]
cat_columns = [f for f in df.columns if f.startswith('s')]
cat_indexes = [f'{f}_index' for f in cat_columns]

CPU times: user 4.32 ms, sys: 647 µs, total: 4.96 ms
Wall time: 2.26 s


## Split Dev and Test; Save Out

**TODO:** this section has no cells yet — the dev/test split is not implemented in this notebook.

## Index Categorical Features

In [4]:
%%time

# Fit one StringIndexer per categorical column and cache the indexed frame.
#
# The cache test looks for Spark's `_SUCCESS` marker instead of the directory
# itself: the marker is only written when a write job commits, so the leftovers
# of an aborted write (this cell has failed with an executor heartbeat timeout,
# see the traceback below it) are not mistaken for a valid cache.
if not exists('../data/train.parquet.indexed/_SUCCESS'):

    # handleInvalid='keep' maps values unseen during fit (and nulls) to an
    # extra index instead of raising.
    stages   = [StringIndexer(inputCol = f, outputCol = f_index).setHandleInvalid('keep')
                for f, f_index in zip(cat_columns, cat_indexes)]
    pipeline = Pipeline(stages = stages)
    model    = pipeline.fit(df)
    df       = model.transform(df)

    # All original columns are kept and the index columns are written as
    # produced by StringIndexer; no float cast or column projection is applied
    # here (that code was previously parked in a bare string literal).
    # 'overwrite' clears any partial output left by an earlier failed attempt.
    df.write.mode('overwrite').parquet('../data/train.parquet.indexed')

df = ss.read.parquet('../data/train.parquet.indexed')
tf = df.sample(fraction = 0.01, seed = 2019)

Py4JJavaError: An error occurred while calling o1674.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 55 in stage 53.0 failed 1 times, most recent failure: Lost task 55.0 in stage 53.0 (TID 4372, localhost, executor driver): ExecutorLostFailure (executor driver exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 336930 ms
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 33 more


In [None]:
# Bind `xf` to the current frame. This is a second reference to the same
# DataFrame object, not a copy; later reassignments of `df` do not affect `xf`.
xf = df

In [None]:
# Cast every categorical index column to float in place (same column name).
# Uses `cat_indexes` from the data-loading cell; the previous `cat_features`
# name is not defined anywhere in this notebook.
# NOTE(review): Spark 'float' is 32-bit, so indices above 2**24 are not exactly
# representable -- confirm the largest category count stays below that.
for c in cat_indexes:
    df = df.withColumn(c, col(c).cast('float'))

In [None]:
# Build summary statistics for `xf`. The result is itself a Spark DataFrame,
# so the notebook displays its repr; use .show() or .toPandas() to see values.
xf.describe()

In [None]:
# Persist the current frame once. The default write mode raises if the path
# already exists, so the guard keeps the cell re-runnable, matching the cache
# pattern used by the earlier Parquet cells.
if not exists('../data/train.parquet'):
    df.write.parquet('../data/train.parquet')

In [None]:
# Rebind `df` to the frame stored at '../data/train.parquet', so the following
# cells read from the saved Parquet data.
df = ss.read.parquet('../data/train.parquet')

In [None]:
# Count the distinct values of each categorical column, one Spark job per
# column, printing the count and the wall-clock seconds each job took.
# Iterates `cat_columns`; the previous `cat_features` name is never defined.
cat_distinct = {}

for f in cat_columns:
    s  = ti.time()
    cat_distinct[f] = df.agg(countDistinct(f)).collect()[0][0]
    print( f'{f} : {cat_distinct[f]:>8} : {ti.time() - s:.3f}' )

# Total number of distinct category values across all categorical columns.
print( f'sum : {sum(cat_distinct.values()):>8}' )

In [None]:
# Count the distinct values of each categorical column and time each job.
# Iterates `cat_columns`; the previous `cat_features` name is never defined.
# NOTE(review): this cell repeats the preceding cell's computation verbatim --
# presumably one of the two was meant for a different frame or column set;
# confirm, or delete the duplicate.
cat_distinct = {}

for f in cat_columns:
    s  = ti.time()
    cat_distinct[f] = df.agg(countDistinct(f)).collect()[0][0]
    print( f'{f} : {cat_distinct[f]:>8} : {ti.time() - s:.3f}' )

print( f'sum : {sum(cat_distinct.values()):>8}' )

In [None]:
%%time
# Fill missing values in the numeric columns in place (output names equal the
# input names), using Imputer's default strategy, then show per-column summary
# statistics of the imputed frame.
# Uses `num_columns`; the previous `num_features` name is never defined.
imputer = Imputer(inputCols = num_columns, outputCols = num_columns)
model   = imputer.fit(df)
xf      = model.transform(df)
xf.describe(num_columns).toPandas().T

In [None]:
%%time
# Pack the numeric columns into a single vector column 'num_features'.
# Uses `num_columns` / `cat_indexes`; the previous `num_features` and
# `cat_features` names are never defined.
num_assembler = VectorAssembler(inputCols = num_columns, outputCol = 'num_features')
# VectorAssembler only accepts numeric, boolean and vector inputs, so the
# categorical assembler takes the index columns rather than the raw strings.
# NOTE(review): it is currently not applied (see the commented line below);
# confirm `xf` carries the *_index columns before enabling it.
cat_assembler = VectorAssembler(inputCols = cat_indexes, outputCol = 'cat_features')
xf            = num_assembler.transform(xf)
#xf            = cat_assembler.transform(xf)

xf.describe(num_columns).toPandas().T

In [None]:
%%time
# Rescale the assembled numeric vector to unit standard deviation without
# centering it, then summarise the scaled frame as a transposed pandas table.
scaler = StandardScaler(
    inputCol  = 'num_features',
    outputCol = 'num_features_scaled',
    withMean  = False,
    withStd   = True,
)
model = scaler.fit(xf)
xf_sd = model.transform(xf)

xf_sd.describe().toPandas().T

In [None]:
%%time
# Peek at the first 10 rows of the scaled numeric feature vector.
xf_sd.select('num_features_scaled').head( 10 )

In [None]:
%%time
# Peek at the first 10 rows of the individual numeric columns, for comparison
# with the scaled vector. Uses `num_columns`; `num_features` is never defined.
xf_sd.select(num_columns).head( 10 )

In [None]:
# Define the end-to-end preprocessing pipeline:
#   1. one StringIndexer per categorical column (unseen values kept),
#   2. a StringIndexer turning `ctr` into `label`,
#   3. a VectorAssembler joining index and numeric columns into `features`.
# Uses `cat_columns` / `num_columns`; the previous `cat_features` and
# `num_features` names are never defined.
# NOTE(review): if the frame this is fitted on already contains the *_index
# columns, or nulls in the numeric columns, the stages are likely to fail --
# confirm the input frame before fitting.
indexes = [cat + '_index' for cat in cat_columns]

stages  = [StringIndexer(inputCol = cat, outputCol = cat_index).setHandleInvalid('keep')
           for cat, cat_index in zip(cat_columns, indexes)]
stages += [StringIndexer(inputCol = 'ctr', outputCol = 'label'),
           VectorAssembler(inputCols = indexes + num_columns, outputCol = 'features')]

process = Pipeline(stages = stages)

In [None]:
# Fit every stage of the `process` pipeline on `df`; rebinds `model` to the
# resulting fitted pipeline.
model   = process.fit(df)

In [None]:
%%time
# Apply the fitted `model` to the sampled frame `tf`; rebinds `xf` to the result.
xf = model.transform(tf)