In [1]:
import findspark
findspark.init()

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
from pyspark.mllib.regression import LabeledPoint

In [2]:
spark = SparkSession \
    .builder \
    .appName("Logistic regression") \
    .getOrCreate()

sc = spark.sparkContext

### Schema of data file Attribute Information:
{P=Positive, A=Average, N=Negative, B=Bankruptcy, NB=Non Bankruptcy}

1. Industrial Risk: {P,A,N}
2. Management Risk: {P,A,N}
3. Financial Flexibilitiy: {P,A,N}
4. Credibility: {P,A,N}
5. Competitiveness: {P,A,N}
6. Operating Risk: {P,A,N}
7. Class: {B,NB}

### Transform each qualititative data in the dataset into a double numeric value

In [3]:
def getDoubleValue(input):
    result = 0.0
    if (input == 'P'):
        result = 3.0
    if (input == 'A'): 
        result = 2.0
    if (input == 'N'): 
        result = 1.0
    if (input == 'NB'): 
        result = 1.0        
    if (input == 'B'): 
        result = 0.0      
    return result

### Read data into memory-in lazy loading (RDD way)

In [4]:
data = sc.textFile("./dataset/Qualitative_Bankruptcy.txt")
data.count()

250

### Prepare data for the logistic regression algorithm

In [5]:
dataTuple=data.map(lambda o: (getDoubleValue(o.split(",")[0]),getDoubleValue(o.split(",")[1]),getDoubleValue(o.split(",")[2]),getDoubleValue(o.split(",")[3]),getDoubleValue(o.split(",")[4]),getDoubleValue(o.split(",")[5]),getDoubleValue(o.split(",")[6])))

In [6]:
df=dataTuple.toDF(['Industrial Risk','Management Risk','Financial Flexibility','Credibility','Competitiveness','Operating Risk','Class'])

In [7]:
df.show(5)

+---------------+---------------+---------------------+-----------+---------------+--------------+-----+
|Industrial Risk|Management Risk|Financial Flexibility|Credibility|Competitiveness|Operating Risk|Class|
+---------------+---------------+---------------------+-----------+---------------+--------------+-----+
|            3.0|            3.0|                  2.0|        2.0|            2.0|           3.0|  1.0|
|            1.0|            1.0|                  2.0|        2.0|            2.0|           1.0|  1.0|
|            2.0|            2.0|                  2.0|        2.0|            2.0|           2.0|  1.0|
|            3.0|            3.0|                  3.0|        3.0|            3.0|           3.0|  1.0|
|            1.0|            1.0|                  3.0|        3.0|            3.0|           1.0|  1.0|
+---------------+---------------+---------------------+-----------+---------------+--------------+-----+
only showing top 5 rows



In [8]:
df.printSchema()

root
 |-- Industrial Risk: double (nullable = true)
 |-- Management Risk: double (nullable = true)
 |-- Financial Flexibility: double (nullable = true)
 |-- Credibility: double (nullable = true)
 |-- Competitiveness: double (nullable = true)
 |-- Operating Risk: double (nullable = true)
 |-- Class: double (nullable = true)



In [9]:
df.groupby('Class').count().show()

+-----+-----+
|Class|count|
+-----+-----+
|  0.0|  107|
|  1.0|  143|
+-----+-----+



In [10]:
ignore = ['Class']
assembler = VectorAssembler(
    inputCols=[x for x in df.columns if x not in ignore],
    outputCol='features')

assembler_df = assembler.transform(df)

In [11]:
assembler_df.show(5)

+---------------+---------------+---------------------+-----------+---------------+--------------+-----+--------------------+
|Industrial Risk|Management Risk|Financial Flexibility|Credibility|Competitiveness|Operating Risk|Class|            features|
+---------------+---------------+---------------------+-----------+---------------+--------------+-----+--------------------+
|            3.0|            3.0|                  2.0|        2.0|            2.0|           3.0|  1.0|[3.0,3.0,2.0,2.0,...|
|            1.0|            1.0|                  2.0|        2.0|            2.0|           1.0|  1.0|[1.0,1.0,2.0,2.0,...|
|            2.0|            2.0|                  2.0|        2.0|            2.0|           2.0|  1.0|[2.0,2.0,2.0,2.0,...|
|            3.0|            3.0|                  3.0|        3.0|            3.0|           3.0|  1.0|[3.0,3.0,3.0,3.0,...|
|            1.0|            1.0|                  3.0|        3.0|            3.0|           1.0|  1.0|[1.0,1.0,3.0,3

In [12]:
label_df = assembler_df.withColumn("label", assembler_df["Class"])
label_df.show()

+---------------+---------------+---------------------+-----------+---------------+--------------+-----+--------------------+-----+
|Industrial Risk|Management Risk|Financial Flexibility|Credibility|Competitiveness|Operating Risk|Class|            features|label|
+---------------+---------------+---------------------+-----------+---------------+--------------+-----+--------------------+-----+
|            3.0|            3.0|                  2.0|        2.0|            2.0|           3.0|  1.0|[3.0,3.0,2.0,2.0,...|  1.0|
|            1.0|            1.0|                  2.0|        2.0|            2.0|           1.0|  1.0|[1.0,1.0,2.0,2.0,...|  1.0|
|            2.0|            2.0|                  2.0|        2.0|            2.0|           2.0|  1.0|[2.0,2.0,2.0,2.0,...|  1.0|
|            3.0|            3.0|                  3.0|        3.0|            3.0|           3.0|  1.0|[3.0,3.0,3.0,3.0,...|  1.0|
|            1.0|            1.0|                  3.0|        3.0|         

### Standard scaler

In [13]:
scaler = StandardScaler().setInputCol('features').setOutputCol('scaled_features')

scaler_model = scaler.fit(label_df)
scaler_df = scaler_model.transform(label_df)

In [14]:
parsed_data = scaler_df.rdd.map(lambda x: LabeledPoint(x[6], x[:6]))
parsed_data

PythonRDD[29] at RDD at PythonRDD.scala:53

In [15]:
parsed_data.take(5)

[LabeledPoint(1.0, [3.0,3.0,2.0,2.0,2.0,3.0]),
 LabeledPoint(1.0, [1.0,1.0,2.0,2.0,2.0,1.0]),
 LabeledPoint(1.0, [2.0,2.0,2.0,2.0,2.0,2.0]),
 LabeledPoint(1.0, [3.0,3.0,3.0,3.0,3.0,3.0]),
 LabeledPoint(1.0, [1.0,1.0,3.0,3.0,3.0,1.0])]

In [16]:
type(parsed_data)

pyspark.rdd.PipelinedRDD

### Split train & test

In [17]:
train_data, test_data = parsed_data.randomSplit([0.6, 0.4], seed=11)

print("Training Dataset Count:" + str(train_data.count()))
print("Test Dataset Count:" + str(test_data.count()))

Training Dataset Count:162
Test Dataset Count:88


### Evaluating the model on training data

In [18]:
model = LogisticRegressionWithLBFGS.train(train_data, numClasses=2)

In [19]:
labels_and_preds = parsed_data.map(lambda p: (p.label, model.predict(p.features)))

In [20]:
train_err = labels_and_preds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsed_data.count())

In [21]:
print("Training Error = " + str(train_err))

Training Error = 0.188


----------------------------------------
Exception happened during processing of request from ('127.0.0.1', 34902)
Traceback (most recent call last):
  File "/opt/conda/lib/python3.8/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/opt/conda/lib/python3.8/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/opt/conda/lib/python3.8/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/opt/conda/lib/python3.8/socketserver.py", line 720, in __init__
    self.handle()
  File "/usr/local/spark/python/pyspark/accumulators.py", line 268, in handle
    poll(accum_updates)
  File "/usr/local/spark/python/pyspark/accumulators.py", line 241, in poll
    if func():
  File "/usr/local/spark/python/pyspark/accumulators.py", line 245, in accum_updates
    num_updates = read_int(self.rfile)
  File "/usr/local/spark/python/pysp