In [None]:
import numpy as np
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Imputer, StandardScaler, VectorAssembler
from pyspark.sql.functions import when



In [None]:
# Load the loan CSV from S3. The first row is treated as the header and
# Spark infers each column's type from the data.
raw_data = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(r"s3://loanclub0610/spark/file.csv")
)


In [None]:
# Fill missing values in three numeric columns. Input and output names are
# identical, so the imputed values overwrite the original columns. No strategy
# is passed, so the Imputer's default strategy applies.
imputed_cols = ['percent_bc_gt_75', 'mo_sin_old_il_acct', 'bc_open_to_buy']
imputer = Imputer(inputCols=imputed_cols, outputCols=imputed_cols)

# `model` here is the fitted imputer; the name is rebound to the
# logistic-regression model further down the notebook.
model = imputer.fit(raw_data)
raw_data = model.transform(raw_data)

# Preview the first rows after imputation.
raw_data.show(5)

In [None]:
# Columns combined into the single "features" vector used for modelling.
# NOTE(review): only three of these are imputed earlier in the notebook;
# 'is_il' and 'mths_since_recent_inq' are assembled as-is. Confirm they
# contain no nulls, since VectorAssembler rejects null inputs by default.
cols = [
    'percent_bc_gt_75',
    'mo_sin_old_il_acct',
    'is_il',
    'bc_open_to_buy',
    'mths_since_recent_inq',
]
assembler = VectorAssembler(inputCols=cols, outputCol="features")

# Drop any "features" column left over from a previous run of this cell:
# the assembler raises if its output column already exists, which made the
# cell fail when re-executed. drop() is a no-op when the column is absent,
# so the first run behaves exactly as before.
raw_data = assembler.transform(raw_data.drop("features"))
raw_data.select("features").show(truncate=False)

In [None]:
# Scale the assembled "features" vector into "Scaled_features" using the
# StandardScaler's default settings.
# NOTE(review): the scaler is fitted on the full dataset before the
# train/test split, so test-set statistics influence the scaling. Consider
# fitting on the training split only.
standardscaler = StandardScaler(inputCol="features", outputCol="Scaled_features")

# Drop any "Scaled_features" column left over from a previous run of this
# cell: the scaler raises if its output column already exists. drop() is a
# no-op when the column is absent, so the first run behaves exactly as before.
unscaled = raw_data.drop("Scaled_features")
raw_data = standardscaler.fit(unscaled).transform(unscaled)
raw_data.select("features", "Scaled_features").show(5)

In [None]:
# 80/20 random split into training and hold-out sets; the fixed seed keeps
# the split repeatable for the same input DataFrame.
train, test = raw_data.randomSplit(weights=[0.8, 0.2], seed=12345)

In [None]:
# Class balance of the training split: total rows, rows with status == 1,
# and the remainder. The totals are kept as floats for the ratio maths below.
status_only = train.select("status")
dataset_size = float(status_only.count())
numPositives = status_only.where('status == 1').count()
numNegatives = float(dataset_size - numPositives)

# Share of positive rows, as a percentage of the training split.
per_ones = (float(numPositives) / float(dataset_size)) * 100

print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

In [None]:
# Fraction of training rows whose status is not 1. It is used as the weight
# for positive rows, so the rarer class receives the larger weight.
BalancingRatio = numNegatives / dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

In [None]:
# Per-row weights for the weighted logistic regression: rows with
# status == 1 get BalancingRatio, all other rows get its complement.
positive_weight = BalancingRatio
negative_weight = 1 - BalancingRatio
weight_expr = when(train["status"] == 1, positive_weight).otherwise(negative_weight)

train = train.withColumn("classWeights", weight_expr)
train.select("classWeights").show(5)

In [None]:
# Weighted logistic regression on the scaled feature vector, predicting
# "status" and limited to 10 optimiser iterations.
lr = LogisticRegression(
    featuresCol="Scaled_features",
    labelCol="status",
    weightCol="classWeights",
    maxIter=10,
)

# Fit on the weighted training split, then score both splits so train and
# test performance can be compared.
model = lr.fit(train)
predict_train, predict_test = model.transform(train), model.transform(test)

# Spot-check true labels against predicted labels on the hold-out set.
predict_test.select("status", "prediction").show(10)

In [None]:
# Evaluator that scores the "rawPrediction" column against the "status"
# label. The original call was missing its closing parenthesis, which made
# this cell a SyntaxError. areaUnderROC is the evaluator's default metric
# and is named explicitly because the report that follows prints the ROC area.
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="status",
    metricName="areaUnderROC",
)

In [None]:
# Inspect a few hold-out predictions alongside their raw scores and
# class probabilities.
predict_test.select("status", "rawPrediction", "prediction", "probability").show(5)

# Evaluate both splits with the same evaluator; a large gap between the two
# values would suggest overfitting.
train_auc = evaluator.evaluate(predict_train)
test_auc = evaluator.evaluate(predict_test)
print("The area under ROC for train set is {}".format(train_auc))
print("The area under ROC for test set is {}".format(test_auc))