# Importing Spark

In [1]:
# import findspark
# findspark.init()
import pyspark
from pyspark.sql import SparkSession

# Obtain the session named "Python Spark" (getOrCreate reuses an existing one
# if present) and keep a handle on its SparkContext for the streaming section.
spark = (
    SparkSession.builder
    .appName("Python Spark")
    .getOrCreate()
)
sc = spark.sparkContext

# Preparing the data

In [2]:
# Load the training CSV: first row is the header, fields are comma-separated,
# and column types are inferred by Spark. The target column is renamed to
# `label`, the name the evaluator further down is configured with.
#
# The reader previously set "delimiter" twice ("|" and then ","); only the last
# value took effect, so the dead "|" option has been removed.
df_transactions = (
    spark.read
    .option("header", True)
    .option("delimiter", ",")
    .option("inferSchema", "true")
    .csv('data/train.csv')
    .withColumnRenamed('default_payment_next_month', 'label')
)

In [3]:
# Show the column names and the types Spark inferred while reading the CSV.
df_transactions.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: decimal(7,0) (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: decimal(6,0) (nullable = true)
 |-- BILL_AMT2: decimal(6,0) (nullable = true)
 |-- BILL_AMT3: decimal(7,0) (nullable = true)
 |-- BILL_AMT4: decimal(6,0) (nullable = true)
 |-- BILL_AMT5: decimal(6,0) (nullable = true)
 |-- BILL_AMT6: decimal(6,0) (nullable = true)
 |-- PAY_AMT1: decimal(6,0) (nullable = true)
 |-- PAY_AMT2: decimal(7,0) (nullable = true)
 |-- PAY_AMT3: decimal(6,0) (nullable = true)
 |-- PAY_AMT4: decimal(6,0) (nullable = true)
 |-- PAY_AMT5: decimal(6,0) (nullable = true)
 |-- PAY_AMT6: dec

In [4]:
# Split the rows with 0.8 / 0.2 weights into a training set and a held-out
# test set. The fixed seed keeps the split, and therefore the metrics reported
# below, reproducible across kernel restarts.
train, test = df_transactions.randomSplit([0.8, 0.2], seed=42)

# Preparing the model

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

In [6]:
# Input columns that are packed into the single `features` vector column.
feature_columns = ["MARRIAGE", "EDUCATION", "PAY_0", "PAY_2", "PAY_3"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Logistic regression capped at 10 iterations.
# NOTE(review): regParam=10.0 is a strong penalty and elasticNetParam=0.0
# should select pure L2 per the Spark ML docs — confirm both are intended.
lr = LogisticRegression(
    maxIter=10,
    regParam=10.0,
    elasticNetParam=0.0,
)

In [7]:
# Chain feature assembly and the classifier so they are fitted and applied as one unit.
pipeline = Pipeline(stages=[assembler, lr])

# Fitting the model

In [8]:
# Fit the pipeline stages on the training split; `model` is reused by the streaming section.
model = pipeline.fit(train)

# Evaluation of the model

In [9]:
# Score the held-out test split with the fitted pipeline.
predictions = model.transform(test)

In [10]:
# Inspect the schema of the DataFrame returned by model.transform.
predictions.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- LIMIT_BAL: decimal(7,0) (nullable = true)
 |-- SEX: integer (nullable = true)
 |-- EDUCATION: integer (nullable = true)
 |-- MARRIAGE: integer (nullable = true)
 |-- AGE: integer (nullable = true)
 |-- PAY_0: integer (nullable = true)
 |-- PAY_2: integer (nullable = true)
 |-- PAY_3: integer (nullable = true)
 |-- PAY_4: integer (nullable = true)
 |-- PAY_5: integer (nullable = true)
 |-- PAY_6: integer (nullable = true)
 |-- BILL_AMT1: decimal(6,0) (nullable = true)
 |-- BILL_AMT2: decimal(6,0) (nullable = true)
 |-- BILL_AMT3: decimal(7,0) (nullable = true)
 |-- BILL_AMT4: decimal(6,0) (nullable = true)
 |-- BILL_AMT5: decimal(6,0) (nullable = true)
 |-- BILL_AMT6: decimal(6,0) (nullable = true)
 |-- PAY_AMT1: decimal(6,0) (nullable = true)
 |-- PAY_AMT2: decimal(7,0) (nullable = true)
 |-- PAY_AMT3: decimal(6,0) (nullable = true)
 |-- PAY_AMT4: decimal(6,0) (nullable = true)
 |-- PAY_AMT5: decimal(6,0) (nullable = true)
 |-- PAY_AMT6: dec

In [11]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluator that scores a predictions DataFrame by area under the ROC curve,
# reading the true class from `label` and the scores from `rawPrediction`.
evaluator_roc = BinaryClassificationEvaluator(
    metricName='areaUnderROC',
    rawPredictionCol='rawPrediction',
    labelCol='label',
)

In [12]:
# Report the area under the ROC curve for the test-set predictions.
print('Area under ROC = %s' % evaluator_roc.evaluate(predictions))

Area under ROC = 0.6825956971417326


# Streaming data

The stream will be produced by `generate_transactions.py`.

In [13]:
from pyspark.streaming import StreamingContext

# Streaming context on the existing SparkContext; the batch interval is named
# explicitly instead of being passed as a bare positional number.
ssc = StreamingContext(sc, batchDuration=1)

In [14]:
def process(time, rdd):
    """Score one streaming micro-batch and print its area under the ROC curve.

    Each element of ``rdd`` is split on commas, the resulting columns are
    cast positionally to the names and types of ``train.schema``, the batch is
    scored with the fitted ``model`` and evaluated with ``evaluator_roc``.

    Parameters
    ----------
    time
        Batch time passed in by ``foreachRDD``; only used for the header line.
    rdd
        RDD of text lines for this batch.

    Returns
    -------
    None
        Results are reported through ``print`` only.

    Notes
    -----
    Processing is best-effort: any exception raised while scoring a batch is
    printed and swallowed so one bad batch does not stop the stream.
    """
    print("========= %s =========" % str(time))

    # An empty batch cannot be turned into a DataFrame (schema inference has
    # no row to look at), so report it explicitly instead of relying on the
    # exception raised by createDataFrame.
    if rdd.isEmpty():
        print("RDD is empty")
        return

    try:
        # Convert RDD[String] to a DataFrame of string columns (_1, _2, ...).
        split_rows = rdd.map(lambda line: line.split(','))
        raw_batch = spark.createDataFrame(split_rows)

        # Apply the training schema positionally in a single projection; this
        # also drops the untyped _N columns instead of carrying them along.
        typed_batch = raw_batch.select([
            raw_batch[source_col].cast(field.dataType).alias(field.name)
            for source_col, field in zip(raw_batch.columns, train.schema)
        ])

        # Count once and reuse the value in the report.
        batch_size = typed_batch.count()
        scored_batch = model.transform(typed_batch)
        print(
            'Area under ROC = %s (with %ld elements)'
            % (evaluator_roc.evaluate(scored_batch), batch_size)
        )
    except Exception as e:
        # Keep the stream alive, but say what kind of error was skipped.
        print("%s: %s" % (type(e).__name__, e))


In [15]:
# Text stream over the files in data/output/ (the notebook text says they are
# produced by generate_transactions.py).
stream = ssc.textFileStream('data/output/')

# Run `process` on the RDD of every batch.
stream.foreachRDD(process)

# Start the streaming computation.
ssc.start()
# NOTE(review): awaitTermination is left disabled, presumably so this cell
# returns and the stream can be stopped from a later cell — confirm.
# ssc.awaitTermination()

RDD is empty
Area under ROC = 0.6372881355932203 (with 143 elements)
RDD is empty
Area under ROC = 0.7389446302489785 (with 140 elements)
RDD is empty
Area under ROC = 0.7580645161290323 (with 37 elements)
RDD is empty
Area under ROC = 0.7276995305164321 (with 86 elements)
RDD is empty
Area under ROC = 0.6388888888888888 (with 15 elements)
RDD is empty
Area under ROC = 0.4365079365079365 (with 16 elements)
RDD is empty
Area under ROC = 0.5946428571428571 (with 43 elements)
RDD is empty
Area under ROC = 0.7056034482758621 (with 136 elements)
RDD is empty
Area under ROC = 0.6848484848484848 (with 56 elements)
RDD is empty


In [None]:
# Stop the stream gracefully and shut down the underlying SparkContext as well.
ssc.stop(stopSparkContext=True, stopGraceFully=True)