In [0]:
#free version didn't support pd.read_csv from dbfs directly.
import pickle
import os, re, time
#import pandas as pd
from pyspark.sql import SparkSession
from collections import defaultdict
start_time=time.time()

spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

from_path="dbfs:/FileStore/tables/"

#files = os.listdir(from_path)
files=['POS_CASH_balance', 'application_test', 'application_train', 'bureau', 'bureau_balance', 'credit_card_balance', 'installments_payments', 'previous_application']

In [0]:

#test
file="dbfs:/FileStore/tables/tb_application_train.parquet"
table = spark.read.option("header", "true").option("inferschema", "true").parquet(file)


In [0]:

def table_fillna(table):
    missing_data_fill={}
    all_cols=[]
    data_types = defaultdict(list)
    for entry in table.schema.fields:
        data_types[str(entry.dataType)].append(entry.name)
    for cols in list(data_types.values()):
        all_cols.extend(cols)
    string_type = [var for var in data_types["StringType"]]
    number_type = [var for var in all_cols if var not in string_type]

    for var in string_type:
        #if table.filter(table[var].isNull()).count():
        mode_table=table.groupby(var).count().orderBy("count", ascending=False).head(2)
        mode=mode_table[0][0]
        if not mode:
            mode=mode_table[1][0]
        table=table.fillna(mode, subset=[var])
        #missing_data_fill[var] = mode
    for var in number_type:
        #if table.filter(table[var].isNull()).count():
        avg_table=table.agg({var: 'avg'})
        avg=avg_table.first()[0]
        table=table.fillna(avg, subset=[var])
        #missing_data_fill[var] = avg
    return table

In [0]:

names=locals()
for fileName in files:
  fileName='tb_'+fileName+'.parquet'
  rawData_file=from_path+fileName
  match=re.findall('tb_(.*).parquet', fileName)
  tbname='tb_'+match[0]
  filled_tbname='filled_'+tbname
  filled_tbname_path=from_path+filled_tbname+'.parquet'
  filled_table=spark.read.option("header", "true").option("inferschema", "true").parquet(filled_tbname_path)
  names[tbname]=filled_table


In [0]:

def join_table(tb_left, tb_right, key, i):
    cols_left=[entry.name for entry in tb_left.schema.fields if entry.name!=key]
    cols_right=[entry.name for entry in tb_right.schema.fields if entry.name!=key]
    cols_rep=[col for col in cols_left if col in cols_right]
    for col in cols_rep:
        tb_right=tb_right.withColumnRenamed(col,col+str(i)+'_rep')
    return tb_left.join(tb_right, on=[key], how="outer")

In [0]:
#test
tb_joined=table_fillna(table)

In [0]:
from collections import defaultdict

data_types = defaultdict(list)
for entry in tb_joined.schema.fields:
    data_types[str(entry.dataType)].append(entry.name)

strings_used = [var for var in data_types["StringType"]]

all_cols=[]
for cols in list(data_types.values()):
    all_cols.extend(cols)

missing_data_fill = {}
for var in all_cols:
    missing_data_fill[var] = 0

data_set = tb_joined.fillna(missing_data_fill)

In [0]:
#another approach to vector the features
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.sql.functions import col
def get_dummy(df, indexCol, categoricalCols,continuousCols, labelCol, dropLast=False):
    indexers=[StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols]
    encoders=[OneHotEncoder(inputCol=indexer.getOutputCol(),outputCol="{0}_encoded".format(indexer.getOutputCol()), dropLast=dropLast) for indexer in indexers]
    assembler=VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders]+continuousCols, outputCol='features')
    pipeline=Pipeline(stages=indexers+encoders+[assembler])
    model=pipeline.fit(df)
    data=model.transform(df)
    data=data.withColumn('label', col(labelCol))
    if indexCol:
        return data.select(indexCol, 'features','label')
    else:
        return data.select('features','label')
        


In [0]:
indexCol=[]
categoricalCols=[var for var in data_types["StringType"]]
continuousCols=[var for var in all_cols if var not in categoricalCols]
continuousCols.remove("TARGET")
labelCol="TARGET"
data=get_dummy(data_set,indexCol,categoricalCols,continuousCols, labelCol, dropLast=False)

from pyspark.ml.classification import RandomForestClassifier
(train_set, cv_set, test_set) = data.randomSplit([0.7,0.15,0.15], seed=0)
rf = RandomForestClassifier(labelCol = "label", featuresCol = "features", numTrees = 20)
rf_model = rf.fit(train_set)


In [0]:
data.registerTempTable('data')

In [0]:
%sql
select * from data limit 5

features,label
"Map(vectorType -> sparse, length -> 245, indices -> List(0, 2, 6, 7, 9, 16, 24, 29, 35, 41, 61, 73, 124, 128, 131, 138, 140, 142, 143, 144, 145, 146, 148, 151, 152, 153, 161, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 236, 238, 239, 240, 241, 242, 243, 244), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 282780.0, -16680.0, -1578.0, -236.0, 1.0, 1.0, 1.0, 2.0, 2.0, 9.0, 1.0, 202500.0, 270000.0, 12987.0, 270000.0, 0.035792000000000004, -10244.0, 25.0, 2.0, 0.5021298056566629, 0.7518937040185983, 0.5108529061799763, 0.11744049917464455, 0.08844221905179937, 0.977734858162356, 0.7524714325927213, 0.044620715411350646, 0.07894151232418824, 0.1497246700680518, 0.22628190703667428, 0.23189350049054103, 0.06633318417239686, 0.10077477495067379, 0.1073990193325975, 0.008808672617211482, 0.02835775707579679, 0.11423100693298313, 0.08754321224758557, 0.9770653729429176, 0.7596373227337517, 0.042553137750145686, 0.07448973610917752, 0.1451926586456919, 0.22231504747851427, 0.22805849255074206, 0.06495768445657653, 0.10564485674942355, 0.10597505043712181, 0.008076387544283676, 0.02702231968598767, 0.1178499207659259, 0.08795485466574646, 0.9777522640695075, 0.7557462721916182, 0.044595101785290164, 0.07807784431137672, 0.1492127807287016, 0.22589659009264543, 0.23162493804933085, 0.06716874904939925, 0.10195447324071563, 0.10860673604899411, 0.008651013330213089, 0.028235920597261793, 0.10254666268544145, 5.0, 5.0, -768.0, 0.006402448193930645, 0.0070002105326475985, 0.0343619356973142, 0.26739526000781977, 0.26547414959848414, 1.899974435321363))",0
"Map(vectorType -> sparse, length -> 245, indices -> List(0, 2, 5, 8, 9, 16, 24, 32, 35, 41, 60, 68, 124, 128, 131, 138, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 151, 152, 153, 158, 159, 161, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 238, 244), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 282781.0, 1.0, -16812.0, -1226.0, -365.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 13.0, 1.0, 1.0, 1.0, 112500.0, 508495.5, 21541.5, 454500.0, 0.004849, -9805.0, 12.061090818687727, 2.0, 0.3462852642065439, 0.5449275995743299, 0.4471785780453068, 0.11744049917464455, 0.08844221905179937, 0.977734858162356, 0.7524714325927213, 0.044620715411350646, 0.07894151232418824, 0.1497246700680518, 0.22628190703667428, 0.23189350049054103, 0.06633318417239686, 0.10077477495067379, 0.1073990193325975, 0.008808672617211482, 0.02835775707579679, 0.11423100693298313, 0.08754321224758557, 0.9770653729429176, 0.7596373227337517, 0.042553137750145686, 0.07448973610917752, 0.1451926586456919, 0.22231504747851427, 0.22805849255074206, 0.06495768445657653, 0.10564485674942355, 0.10597505043712181, 0.008076387544283676, 0.02702231968598767, 0.1178499207659259, 0.08795485466574646, 0.9777522640695075, 0.7557462721916182, 0.044595101785290164, 0.07807784431137672, 0.1492127807287016, 0.22589659009264543, 0.23162493804933085, 0.06716874904939925, 0.10195447324071563, 0.10860673604899411, 0.008651013330213089, 0.028235920597261793, 0.10254666268544145, -303.0, 2.0))",0
"Map(vectorType -> sparse, length -> 245, indices -> List(0, 2, 5, 8, 10, 17, 25, 30, 35, 41, 61, 68, 124, 128, 131, 138, 140, 141, 142, 143, 144, 145, 146, 148, 151, 152, 153, 161, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 282782.0, 1.0, -14665.0, -695.0, -3505.0, 1.0, 1.0, 1.0, 2.0, 2.0, 9.0, 1.0, 112500.0, 502497.0, 28188.0, 454500.0, 0.019101, -2392.0, 12.061090818687727, 2.0, 0.6013498744376716, 0.6004224563230101, 0.5108529061799763, 0.11744049917464455, 0.08844221905179937, 0.977734858162356, 0.7524714325927213, 0.044620715411350646, 0.07894151232418824, 0.1497246700680518, 0.22628190703667428, 0.23189350049054103, 0.06633318417239686, 0.10077477495067379, 0.1073990193325975, 0.008808672617211482, 0.02835775707579679, 0.11423100693298313, 0.08754321224758557, 0.9770653729429176, 0.7596373227337517, 0.042553137750145686, 0.07448973610917752, 0.1451926586456919, 0.22231504747851427, 0.22805849255074206, 0.06495768445657653, 0.10564485674942355, 0.10597505043712181, 0.008076387544283676, 0.02702231968598767, 0.1178499207659259, 0.08795485466574646, 0.9777522640695075, 0.7557462721916182, 0.044595101785290164, 0.07807784431137672, 0.1492127807287016, 0.22589659009264543, 0.23162493804933085, 0.06716874904939925, 0.10195447324071563, 0.10860673604899411, 0.008651013330213089, 0.028235920597261793, 0.10254666268544145, 6.0, 1.0, 6.0, 1.0, -2291.0, 0.006402448193930645, 0.0070002105326475985, 0.0343619356973142, 0.26739526000781977, 0.26547414959848414, 1.899974435321363))",0
"Map(vectorType -> sparse, length -> 245, indices -> List(0, 2, 5, 8, 9, 16, 24, 29, 35, 43, 61, 72, 124, 128, 131, 138, 140, 141, 142, 143, 144, 145, 146, 148, 151, 152, 153, 161, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 236, 238, 239, 240, 241, 242, 243, 244), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 282783.0, 1.0, -10600.0, -4025.0, -3268.0, 1.0, 1.0, 1.0, 3.0, 3.0, 9.0, 1.0, 76500.0, 269550.0, 21294.0, 225000.0, 0.020246, -304.0, 12.061090818687727, 3.0, 0.22869327409835, 0.28074289496136084, 0.5108529061799763, 0.1619, 0.1293, 0.9841, 0.7524714325927213, 0.044620715411350646, 0.3448, 0.1667, 0.23189350049054103, 0.0416, 0.10077477495067379, 0.1532, 0.008808672617211482, 7.0E-4, 0.1649, 0.1342, 0.9841, 0.7596373227337517, 0.042553137750145686, 0.3448, 0.1667, 0.22805849255074206, 0.0426, 0.10564485674942355, 0.1596, 0.008076387544283676, 8.0E-4, 0.1634, 0.1293, 0.9841, 0.7557462721916182, 0.044595101785290164, 0.3448, 0.1667, 0.23162493804933085, 0.0424, 0.10195447324071563, 0.156, 0.008651013330213089, 8.0E-4, 0.1364, 6.0, 6.0, -1379.0, 0.006402448193930645, 0.0070002105326475985, 0.0343619356973142, 0.26739526000781977, 0.26547414959848414, 1.899974435321363))",0
"Map(vectorType -> sparse, length -> 245, indices -> List(0, 3, 6, 7, 9, 16, 25, 29, 35, 44, 60, 66, 124, 128, 131, 138, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 151, 152, 153, 161, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 197, 198, 199, 200, 201, 202, 203, 205, 206, 207, 208, 209, 211, 212, 213, 214, 215, 216, 217, 219, 220, 221, 222, 223, 225, 226, 227, 228, 229, 230, 231, 233, 234, 236, 238, 242, 244), values -> List(1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 282784.0, 2.0, -11653.0, -994.0, -4309.0, 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, 10.0, 1.0, 225000.0, 450000.0, 48595.5, 450000.0, 0.026392000000000002, -1321.0, 12.0, 4.0, 0.5021298056566629, 0.7375414129476389, 0.5673792367572691, 0.0495, 0.08844221905179937, 0.9747, 0.7524714325927213, 0.044620715411350646, 0.1034, 0.125, 0.23189350049054103, 0.06633318417239686, 0.10077477495067379, 0.0413, 0.008808672617211482, 0.0504, 0.08754321224758557, 0.9747, 0.7596373227337517, 0.042553137750145686, 0.1034, 0.125, 0.22805849255074206, 0.06495768445657653, 0.10564485674942355, 0.043, 0.008076387544283676, 0.05, 0.08795485466574646, 0.9747, 0.7557462721916182, 0.044595101785290164, 0.1034, 0.125, 0.23162493804933085, 0.06716874904939925, 0.10195447324071563, 0.042, 0.008651013330213089, 0.0325, 2.0, 2.0, -1691.0, 1.0, 1.0))",0


In [0]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator 
predictions = rf_model.transform(cv_set)
evaluator= BinaryClassificationEvaluator(labelCol = "label", rawPredictionCol="probability", metricName= "areaUnderROC")
score = evaluator.evaluate(predictions)
score


Out[20]: 0.6975436432545

In [0]:
end_time=time.time()
total_time=end_time-start_time
print(f'total time is {round(total_time/60,2)}')

total time is 32.03
