In [1]:
# librerie

from numpy.random import choice as sample
from math import sqrt

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql import Row

######################## ml ###############################

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.evaluation import ClusteringEvaluator

######################## MLlib ###############################

from pyspark.mllib.linalg import Vectors as Vectors_mllib
from pyspark.mllib.feature import StandardScaler as StandardScaler_mllib
from pyspark.mllib.stat import Statistics

from pyspark.mllib.clustering import KMeans as KMeans_mllib
from pyspark.mllib.clustering import BisectingKMeans as BisectingKMeans_mllib
from pyspark.mllib.clustering import GaussianMixture as GaussianMixture_mllib

In [2]:
# sessione

sc = SparkContext(appName="DDAM_Project", master="local[*]")
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("DDAM_Project") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [3]:
sdf = spark.read.csv("hdfs://kddrtserver11.isti.cnr.it:9000/user/hpsa04/credit_train.csv", sep=",",
                     inferSchema=True, header=True)

columns = sdf.schema.names

# rinominare le colonne sotituendo lo spazio con l'underscore
for col in columns:
    sdf = sdf.withColumnRenamed(col, col.replace(' ', '_'))

columns = sdf.schema.names

sdf.printSchema()

root
 |-- Loan_ID: string (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Loan_Status: string (nullable = true)
 |-- Current_Loan_Amount: integer (nullable = true)
 |-- Term: string (nullable = true)
 |-- Credit_Score: integer (nullable = true)
 |-- Annual_Income: integer (nullable = true)
 |-- Years_in_current_job: string (nullable = true)
 |-- Home_Ownership: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Monthly_Debt: double (nullable = true)
 |-- Years_of_Credit_History: double (nullable = true)
 |-- Months_since_last_delinquent: string (nullable = true)
 |-- Number_of_Open_Accounts: integer (nullable = true)
 |-- Number_of_Credit_Problems: integer (nullable = true)
 |-- Current_Credit_Balance: integer (nullable = true)
 |-- Maximum_Open_Credit: integer (nullable = true)
 |-- Bankruptcies: string (nullable = true)
 |-- Tax_Liens: string (nullable = true)



In [4]:
def get_nbr_nulls(spark_df, view_name, print_result = True):
    """funzione per ottenere il numero di valori nulli presenti in ogni attributo"""
    
    spark_df.createOrReplaceTempView(view_name)
    
    columns_temp = spark_df.schema.names
    
    Project = []
    for col in columns_temp:
        Project.append('SUM(CASE WHEN {0} IS NULL THEN 1 ELSE 0 END) AS {0}'.format(col))
    Project = ', '.join(Project)

    sql = """\
    SELECT {0}
    FROM {1}\
    """.format(Project, view_name)
    
    nbr_nulls = spark.sql(sql).first().asDict()
    
    if print_result:
        for key, value in nbr_nulls.items():
            print(key + ':', '{:>10}'.format(value))
        
    return nbr_nulls

In [5]:
nbr_nulls = get_nbr_nulls(spark_df = sdf, view_name = 'Bank_Loan_Dataset')

Loan_ID:        514
Customer_ID:        514
Loan_Status:        514
Current_Loan_Amount:        514
Term:        514
Credit_Score:      19668
Annual_Income:      19668
Years_in_current_job:        514
Home_Ownership:        514
Purpose:        514
Monthly_Debt:        514
Years_of_Credit_History:        514
Months_since_last_delinquent:        514
Number_of_Open_Accounts:        514
Number_of_Credit_Problems:        514
Current_Credit_Balance:        514
Maximum_Open_Credit:        516
Bankruptcies:        514
Tax_Liens:        514


In [6]:
def get_nbr_distincts(spark_df, view_name, print_result = True):
    """funzione per ottenere il numero di valori distinti di ciascun attributo.
    il valore nullo non viene contato come valore distinto"""
    
    spark_df.createOrReplaceTempView(view_name)
    
    columns_temp = spark_df.schema.names

    Project = []
    for col in columns_temp:
        Project.append('COUNT(DISTINCT {0}) AS {0}'.format(col))
    Project = ', '.join(Project)

    sql = """\
    SELECT {0}
    FROM {1}\
    """.format(Project, view_name)
    
    nbr_distincts = spark.sql(sql).first().asDict()
    
    if print_result:
        for key, value in nbr_distincts.items():
            print(key + ':', '{:>10}'.format(value))

    return nbr_distincts

In [7]:
nbr_distincts = get_nbr_distincts(spark_df = sdf, view_name = 'Bank_Loan_Dataset')

Loan_ID:      81999
Customer_ID:      81999
Loan_Status:          2
Current_Loan_Amount:      22004
Term:          2
Credit_Score:        324
Annual_Income:      36174
Years_in_current_job:         12
Home_Ownership:          4
Purpose:         16
Monthly_Debt:      65765
Years_of_Credit_History:        506
Months_since_last_delinquent:        117
Number_of_Open_Accounts:         51
Number_of_Credit_Problems:         14
Current_Credit_Balance:      32730
Maximum_Open_Credit:      44596
Bankruptcies:          9
Tax_Liens:         13


# modifiche agli attributi

In [8]:
rdd = sdf.rdd  # di default questa trasformazione genera un RDD di Row()

'''le Row sono tipi particolari di Tuple, quindi sono oggetti immutabili.
in tutto il notebook allora per modificare gli RDD li trasformiamo temporaneamente (dentro le funzioni)
in RDD di Dictionaries.'''

'le Row sono tipi particolari di Tuple, quindi sono oggetti immutabili.\nin tutto il notebook allora per modificare gli RDD li trasformiamo temporaneamente (dentro le funzioni)\nin RDD di Dictionaries.'

gli attributi che riguardano somme di denaro sono quantità denominate in valuta russa (Rubli). Il dataset si riferisce a dati del 2016, quindi per rendere più comprensibile il significato di queste somme tutti questi attributi vengono convertiti in Euro dividendo tutti i loro valori per il tasso di cambio EUR/RUB medio arrotondato alle decine dell'anno 2016: 70.

questa modifica non impatta assolutamente nessuna analisi perché tutte le quantità monetarie vengono trasformate alla stessa maniera e quindi le proporzioni vengono mantenute. Sarà sempre possibile trasformare facilmente di nuovo in Rubli qualora sia necessario.

In [9]:
def change_value(row):
    d = row.asDict()
    
    if d['Current_Loan_Amount'] is not None:
        d['Current_Loan_Amount'] = round(d['Current_Loan_Amount']/70)
    
    if d['Annual_Income'] is not None:    
        d['Annual_Income'] = round(d['Annual_Income']/70)
    
    if d['Monthly_Debt'] is not None:
        d['Monthly_Debt'] = round(float(d['Monthly_Debt']/70), 4)
        
    if d['Current_Credit_Balance'] is not None:
        d['Current_Credit_Balance'] = round(d['Current_Credit_Balance']/70)
        
    if d['Maximum_Open_Credit'] is not None:
        d['Maximum_Open_Credit'] = round(d['Maximum_Open_Credit']/70)
    
    return Row(**d)

rdd = rdd.map(change_value)

#### ci sono alcuni attributi che hanno un Data Type incoerente con il significato dell'attributo: sono letti come stringhe ma in realtà la loro semantica ci suggerisce di trasformarli in numerici.

In [10]:
# Check dei valori distinti degli attributi con data type incoerente

problematic_columns = ['Years_in_current_job', 'Months_since_last_delinquent', 'Bankruptcies', 'Tax_Liens']

for col in problematic_columns:
    sdf.select(col).groupBy(col).count().orderBy('count', ascending=False).show(150)

+--------------------+-----+
|Years_in_current_job|count|
+--------------------+-----+
|           10+ years|31121|
|             2 years| 9134|
|             3 years| 8169|
|            < 1 year| 8164|
|             5 years| 6787|
|              1 year| 6460|
|             4 years| 6143|
|             6 years| 5686|
|             7 years| 5577|
|             8 years| 4582|
|                 n/a| 4222|
|             9 years| 3955|
|                null|  514|
+--------------------+-----+

+----------------------------+-----+
|Months_since_last_delinquent|count|
+----------------------------+-----+
|                          NA|53141|
|                          13|  922|
|                          12|  902|
|                          14|  877|
|                          15|  865|
|                          10|  861|
|                           8|  856|
|                           9|  849|
|                          18|  847|
|                          16|  837|
|                        

- 'Years_in_current_job'

rendiamo numerico quest'attributo in questo modo:

years viene tolto da ogni valore.

sono presenti 4222 valori uguali a 'n/a'. Pensiamo si possa trattare di soggetti per cui non si può dire qunati anni hanno lavorato nel lavoro corrente perché sono attualmente senza occupazione. Il valore viene quindi sostituito con 0.

10+ viene trasformato in 10 perché 10+ non ha valenza semantica non conoscendo la distribuzione dei valori specifici per questa categoria. Questa trasformazione non comporta problematiche per algoritmi di machine learning come il Decision Tree perché l'ordinamento dei valori numerici è preservato (basterà tenere presente che un eventuale split sul valore 10 sarebbe in realtà riferito a valori anche maggiori di 10). L'unica problematica apparente potrebbe manifestarsi per algoritmi basati sulle distanze (es. Clustering, PCA, KNN ecc...) perché la distanza dal valore 10 potrebbe in realtà essere una distanza molto maggiore. Ma non ci preoccupiamo di questo perché le ennuple coinvolte sono relativamente poche e l'errore non avrebbe un impatto rilevante sul calcolo della distanza multidimensionale. E anche soprattutto perché arrivati a 10 anni di lavoro in una posizione si ritiene che il fattore lavorativo non sia ormai più un problema e quindi un'ipotetica "distanza" ad esempio tra 10 e 25 anni è tutto sommato meno rilevante di una distanza, anche minore, ma ad esempio tra 10 e 1.

< 1 viene trasformato nella media dei valori tra 0 e 1, cioè 0.5.

In [11]:
def change_value(row):
    d = row.asDict()
    
    if d['Years_in_current_job'] is not None:
        d['Years_in_current_job'] = str(d['Years_in_current_job']).replace(' years', '')
        d['Years_in_current_job'] = str(d['Years_in_current_job']).replace(' year', '')
        
        if d['Years_in_current_job'] == 'n/a':
            d['Years_in_current_job'] = 0
        if d['Years_in_current_job'] == '10+':
            d['Years_in_current_job'] = 10
        if d['Years_in_current_job'] == '< 1':
            d['Years_in_current_job'] = 0.5
        
        d['Years_in_current_job'] = float(d['Years_in_current_job'])
    
    return Row(**d)

rdd = rdd.map(change_value)

- 'Months_since_last_delinquent'

sono presenti 53141 valori uguali a 'NA' (la maggior parte). la nostra interpretazione di questo valore è che il soggetto in questione non ha mai commesso nessun reato (interpretazione coerente con il fatto che i valori 'NA' sono la maggior parte).

il conteggio degli altri valori distinti è basso, decidiamo quindi di rendere l'attributo categorico raccogliendo i valori in questi range:

[0, 12); [12, 48); [48, 96); [96, +inf); 'Never committed'


In [12]:
def change_value(row):
    d = row.asDict()
    
    if d['Months_since_last_delinquent'] is not None:
        if d['Months_since_last_delinquent'] == 'NA':
            d['Months_since_last_delinquent'] = 'Never committed'
        elif int(d['Months_since_last_delinquent']) < 12:
            d['Months_since_last_delinquent'] = '[0, 12)'
        elif int(d['Months_since_last_delinquent']) < 48:
            d['Months_since_last_delinquent'] = '[12, 48)'
        elif int(d['Months_since_last_delinquent']) < 96:
            d['Months_since_last_delinquent'] = '[48, 96)'
        else:
            d['Months_since_last_delinquent'] = '[96, +inf)'
            
    return Row(**d)

rdd = rdd.map(change_value)

- 'Bankruptcies'

l'attributo viene letto come stringa perché sono presenti 204 valori uguali a 'NA'. Poichè il valore 0 è presente si pensa possa trattarsi di missing values, li trasformiamo quindi in None e rendiamo numerico l'attributo.

In [13]:
def change_value(row):
    d = row.asDict()
    
    if d['Bankruptcies'] is not None:
        if d['Bankruptcies'] == 'NA':
            d['Bankruptcies'] = None
        else:
            d['Bankruptcies'] = int(d['Bankruptcies'])
        
    return Row(**d)

rdd = rdd.map(change_value)

- 'Tax_Liens'

l'attributo viene letto come stringa perché sono presenti 10 valori uguali a 'NA'. Poichè il valore 0 è presente si pensa possa trattarsi di missing values, li trasformiamo quindi in None e rendiamo numerico l'attributo.

In [14]:
def change_value(row):
    d = row.asDict()

    if d['Tax_Liens'] is not None:
        if d['Tax_Liens'] == 'NA':
            d['Tax_Liens'] = None
        else:
            d['Tax_Liens'] = int(d['Tax_Liens'])
        
    return Row(**d)

rdd = rdd.map(change_value)

In [15]:
sdf = rdd.toDF()
sdf.printSchema()

root
 |-- Annual_Income: long (nullable = true)
 |-- Bankruptcies: long (nullable = true)
 |-- Credit_Score: long (nullable = true)
 |-- Current_Credit_Balance: long (nullable = true)
 |-- Current_Loan_Amount: long (nullable = true)
 |-- Customer_ID: string (nullable = true)
 |-- Home_Ownership: string (nullable = true)
 |-- Loan_ID: string (nullable = true)
 |-- Loan_Status: string (nullable = true)
 |-- Maximum_Open_Credit: long (nullable = true)
 |-- Monthly_Debt: double (nullable = true)
 |-- Months_since_last_delinquent: string (nullable = true)
 |-- Number_of_Credit_Problems: long (nullable = true)
 |-- Number_of_Open_Accounts: long (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Tax_Liens: long (nullable = true)
 |-- Term: string (nullable = true)
 |-- Years_in_current_job: double (nullable = true)
 |-- Years_of_Credit_History: double (nullable = true)



In [16]:
for col in problematic_columns:
    sdf.select(col).groupBy(col).count().orderBy('count', ascending=False).show(150)

+--------------------+-----+
|Years_in_current_job|count|
+--------------------+-----+
|                10.0|31121|
|                 2.0| 9134|
|                 3.0| 8169|
|                 0.5| 8164|
|                 5.0| 6787|
|                 1.0| 6460|
|                 4.0| 6143|
|                 6.0| 5686|
|                 7.0| 5577|
|                 8.0| 4582|
|                 0.0| 4222|
|                 9.0| 3955|
|                null|  514|
+--------------------+-----+

+----------------------------+-----+
|Months_since_last_delinquent|count|
+----------------------------+-----+
|             Never committed|53141|
|                    [12, 48)|25789|
|                    [48, 96)|13453|
|                     [0, 12)| 7590|
|                        null|  514|
|                  [96, +inf)|   27|
+----------------------------+-----+

+------------+-----+
|Bankruptcies|count|
+------------+-----+
|           0|88774|
|           1|10475|
|        null|  718|
|        

# Gestione degli errori

alcuni attributi hanno valori errati o non coerenti con il significato che noi reputiamo possa avere l'attributo. Alcuni di questi errori sono stati scoperti in fasi più avanzate del progetto (ex Data Understanding).

- 'Credit_Score'

ci sono 4551 valori dell'attributo 'Credit_Score' con valore superiore a 5000, che è un valore troppo distante rispetto a quelli generici che assume questo attributo (da 500 a 800). Controllando i valori distinti si è scoperto che quei valori sono errati: c'è uno '0' di troppo in fondo al numero, che eliminamo.

In [17]:
def problematic_values_count(row):
    if row['Credit_Score'] is not None:
        return row['Credit_Score'] > 5000

rdd.filter(problematic_values_count).count()

4551

In [18]:
col = 'Credit_Score'
sdf.select(col).groupBy(col).count().orderBy('count', ascending=False).show(400)

+------------+-----+
|Credit_Score|count|
+------------+-----+
|        null|19668|
|         747| 1825|
|         740| 1746|
|         746| 1742|
|         741| 1732|
|         742| 1723|
|         739| 1624|
|         745| 1612|
|         748| 1598|
|         743| 1555|
|         725| 1548|
|         724| 1522|
|         738| 1495|
|         744| 1485|
|         721| 1465|
|         723| 1421|
|         737| 1405|
|         722| 1387|
|         718| 1261|
|         750| 1234|
|         717| 1218|
|         720| 1216|
|         736| 1156|
|         734| 1147|
|         735| 1134|
|         719| 1118|
|         732| 1084|
|         733| 1073|
|         715| 1070|
|         716| 1061|
|         714| 1046|
|         713| 1017|
|         731| 1010|
|         712| 1006|
|         730|  984|
|         728|  931|
|         729|  925|
|         708|  902|
|         709|  865|
|         710|  857|
|         726|  853|
|         749|  827|
|         707|  814|
|         727|  806|
|         711

In [19]:
def errors_correction(row):
    d = row.asDict()
    if d['Credit_Score'] is not None and d['Credit_Score'] > 5000:
        d['Credit_Score'] = int(str(d['Credit_Score'])[:-1])
    return Row(**d)
    
rdd = rdd.map(errors_correction)
rdd.count()

100514

- ci sono diversi valori dell'attributo 'Current_Loan_Amount' che sono pari a 1.428.571 (99.999.999 in Rubli).  un numero eccessivamente più alto rispetto a tutti gli altri valori e che per questo viene eliminato.

In [20]:
sdf.select('Current_Loan_Amount').groupBy('Current_Loan_Amount').count().orderBy('count', ascending=False).show()

+-------------------+-----+
|Current_Loan_Amount|count|
+-------------------+-----+
|            1428571|11484|
|               null|  514|
|               3209|   70|
|               3094|   68|
|               3088|   67|
|               3099|   67|
|               3077|   66|
|               3138|   64|
|               3187|   62|
|               1576|   62|
|               3176|   62|
|               3110|   61|
|               3066|   61|
|               3195|   61|
|               3192|   59|
|               3132|   58|
|               3089|   57|
|               1845|   57|
|               3185|   56|
|               3182|   56|
+-------------------+-----+
only showing top 20 rows



In [21]:
def errors_correction(row):
    if row['Current_Loan_Amount'] is not None:
        return row['Current_Loan_Amount'] != 1428571
    else:
        return row
    
rdd = rdd.filter(errors_correction)
rdd.count()

89030

eliminiamo le righe dove entrambi gli attributi 'Loan_ID' e 'Customer_ID' sono nulli, che corrispondono alle righe dove tutti i valori di tutti gli attributi sono nulli

In [22]:
rdd = rdd.filter(lambda row: not ( (row['Loan_ID'] is None) and (row['Customer_ID'] is None) ))
rdd.count()

88516

eliminiamo le righe duplicate

In [23]:
rdd = rdd.distinct()
rdd.count()

78301

Problema: ci sono coppie di righe con tutti i valori duplicati eccetto per le due colonne 'Credit_Score' e 'Annual_Income', per le quali uno dei due valori è presente e l'altro è nullo.

Soluzione: si raggruppa per tutti gli attributi tranne quei due e poi si calcola la media di quei due. In questo modo se le uniche due righe uguali sono quelle con un valore nullo e uno non nullo per quegli attributi, lo media sarà uguale al valore non nullo; se invece ci fossero altre righe ugauli ma con altri valori diversi non nulli per quegli attributi, viene effettivamente calcolata la media, il che è auspicabile considerando che tutto il resto della riga è uguale e quindi si tratta molto probabilmente dello stesso oggetto, duplicato per errore, di cui dunque prendiamo un valore medio tra quelli presenti.

In [24]:
sdf = rdd.toDF()

sdf.createOrReplaceTempView('Bank_Loan_Dataset')

columns_temp = [col for col in columns if col != 'Credit_Score' and col != 'Annual_Income']
Project = ', '.join(columns_temp)

sql = """
SELECT {0}, BIGINT(AVG(Credit_Score)) AS Credit_Score, BIGINT(AVG(Annual_Income)) AS Annual_Income
FROM Bank_Loan_Dataset
GROUP BY {0}
""".format(Project)

sdf = spark.sql(sql)

del columns_temp

come si nota i Customer e Loan ID che si ripetevano nel dataset originale erano solo righe duplicate. Il dataset pulito non presenta nessuna riga uguale negli ID e possiamo quindi eliminarli. Anche escludendo questi attributi tutte le righe rimangono distinte.

In [25]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT COUNT(*) AS nbr_rows, COUNT(DISTINCT Customer_ID) AS nbr_customers, COUNT(DISTINCT Loan_ID) AS nbr_loans
FROM Bank_Loan_Dataset
"""

spark.sql(sql).show()

+--------+-------------+---------+
|nbr_rows|nbr_customers|nbr_loans|
+--------+-------------+---------+
|   74094|        74094|    74094|
+--------+-------------+---------+



In [26]:
columns = [col for col in columns if col != 'Customer_ID' and col != 'Loan_ID']

columns_categorical = [col.name for col in sdf.schema.fields if isinstance(col.dataType, StringType)]

columns_numerical = [col for col in columns if col not in columns_categorical]

sdf = sdf.select(columns)

In [27]:
sdf.printSchema()

root
 |-- Loan_Status: string (nullable = true)
 |-- Current_Loan_Amount: long (nullable = true)
 |-- Term: string (nullable = true)
 |-- Credit_Score: long (nullable = true)
 |-- Annual_Income: long (nullable = true)
 |-- Years_in_current_job: double (nullable = true)
 |-- Home_Ownership: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Monthly_Debt: double (nullable = true)
 |-- Years_of_Credit_History: double (nullable = true)
 |-- Months_since_last_delinquent: string (nullable = true)
 |-- Number_of_Open_Accounts: long (nullable = true)
 |-- Number_of_Credit_Problems: long (nullable = true)
 |-- Current_Credit_Balance: long (nullable = true)
 |-- Maximum_Open_Credit: long (nullable = true)
 |-- Bankruptcies: long (nullable = true)
 |-- Tax_Liens: long (nullable = true)



# Gestione dei Missing Values

In [28]:
nbr_nulls = get_nbr_nulls(spark_df = sdf, view_name = 'Bank_Loan_Dataset')

Loan_Status:          0
Current_Loan_Amount:          0
Term:          0
Credit_Score:      14947
Annual_Income:      14947
Years_in_current_job:          0
Home_Ownership:          0
Purpose:          0
Monthly_Debt:          0
Years_of_Credit_History:          0
Months_since_last_delinquent:          0
Number_of_Open_Accounts:          0
Number_of_Credit_Problems:          0
Current_Credit_Balance:          0
Maximum_Open_Credit:          2
Bankruptcies:        156
Tax_Liens:          7


Filliamo i Missing values degli attributi "Maximum_Open_Credit", "Bankruptcies" e "Tax_Liens" usando la Mediana di diversi tipi di raggruppamenti. Per questi attributi è stata fatta questa scelta perché presentano un numero relativamente basso di Missing Values.

per farlo usiamo la sintassi dell'SQL analitico per generare le nuove colonne.

In [29]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT *,
    CASE WHEN Maximum_Open_Credit IS NULL THEN
        BIGINT( PERCENTILE(Maximum_Open_Credit, 0.5) OVER(PARTITION BY Years_in_current_job, Home_Ownership, Number_of_Open_Accounts, Years_of_Credit_History) )
    ELSE Maximum_Open_Credit END AS Maximum_Open_Credit_new,
    
    CASE WHEN Bankruptcies IS NULL THEN
        BIGINT( PERCENTILE(Bankruptcies, 0.5) OVER(PARTITION BY Months_since_last_delinquent, Number_of_Credit_Problems) )
    ELSE Bankruptcies END AS Bankruptcies_new,
    
    CASE WHEN Tax_Liens IS NULL THEN
        BIGINT( PERCENTILE(Tax_Liens, 0.5) OVER(PARTITION BY Months_since_last_delinquent, Number_of_Credit_Problems) )
    ELSE Tax_Liens END AS Tax_Liens_new
    
FROM Bank_Loan_Dataset
"""

sdf = spark.sql(sql)

sdf = sdf.drop('Maximum_Open_Credit').withColumnRenamed("Maximum_Open_Credit_new", "Maximum_Open_Credit")
sdf = sdf.drop('Bankruptcies').withColumnRenamed("Bankruptcies_new", "Bankruptcies")
sdf = sdf.drop('Tax_Liens').withColumnRenamed("Tax_Liens_new", "Tax_Liens")

sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT *,
    BIGINT( PERCENTILE(Maximum_Open_Credit, 0.5) OVER(PARTITION BY Years_in_current_job,
                                                Home_Ownership,
                                                Number_of_Open_Accounts,
                                                Years_of_Credit_History) ) AS toFill_Maximum_Open_Credit,
    BIGINT( PERCENTILE(Bankruptcies, 0.5) OVER(PARTITION BY Months_since_last_delinquent,
                                                Number_of_Credit_Problems) ) AS toFill_Bankruptcies,
    BIGINT( PERCENTILE(Tax_Liens, 0.5) OVER(PARTITION BY Months_since_last_delinquent,
                                            Number_of_Credit_Problems) ) AS toFill_Tax_Liens
FROM Bank_Loan_Dataset
"""

sdf = spark.sql(sql)

def fill_nulls(row):
    d = row.asDict()
    if d['Maximum_Open_Credit'] is None:
        d['Maximum_Open_Credit'] = d['toFill_Maximum_Open_Credit']
    if d['Bankruptcies'] is None:
        d['Bankruptcies'] = d['toFill_Bankruptcies']
    if d['Tax_Liens'] is None:
        d['Tax_Liens'] = d['toFill_Tax_Liens']
    return Row(**d)

sdf = sdf.rdd.map(fill_nulls).toDF().select(columns)

In [30]:
nbr_nulls = get_nbr_nulls(spark_df=sdf, view_name = 'Bank_Loan_Dataset')

Loan_Status:          0
Current_Loan_Amount:          0
Term:          0
Credit_Score:      14947
Annual_Income:      14947
Years_in_current_job:          0
Home_Ownership:          0
Purpose:          0
Monthly_Debt:          0
Years_of_Credit_History:          0
Months_since_last_delinquent:          0
Number_of_Open_Accounts:          0
Number_of_Credit_Problems:          0
Current_Credit_Balance:          0
Maximum_Open_Credit:          0
Bankruptcies:          0
Tax_Liens:          0


gli attributi "Credit_Score" e "Annual_Income" hanno invece un numero rilevante di Missing Values. Filliamoli quindi dividendo il dataset in clusters e usando la Mediana dei valori in ciascun cluster.

# Clustering

Eseguiamo il Clustering con la libreria ML. Con MLlib non è possibile iterare con più valori per il parametro k e non è possibile calcolare l'SSE per diversi tipi di modelli.

Proviamo 3 diversi tipi di clustering per diverse iterazioni con un k randomico.

In [31]:
# Per effettuare il clustering dobbiamo considerare solo le colonne numeriche diverse da 'Credit_Score' e 'Annual_Income'.

columns_clustering = [col for col in columns_numerical if col != 'Credit_Score' and col != 'Annual_Income']

columns_clustering

['Current_Loan_Amount',
 'Years_in_current_job',
 'Monthly_Debt',
 'Years_of_Credit_History',
 'Number_of_Open_Accounts',
 'Number_of_Credit_Problems',
 'Current_Credit_Balance',
 'Maximum_Open_Credit',
 'Bankruptcies',
 'Tax_Liens']

In [32]:
def best_clustering(spark_df, columns_clustering, models=['kmeans', 'biskmeans', 'gaussmix'], k_clusters_iter=[2, 10]):
    
    """funzione che prende in input tutto il dataframe, specificando le colonne su cui si vuole eseguire il clustering.
    standardizza i dati rendendo ciascuna colonna a media 0 e varianza unitaria.
    esegue fino a 3 tipi diversi di clustering, iterando inoltre per un numero di clusters specificato.
    restituisce dataframe, modello e Silhouette del clustering migliore in termini di Silhouette."""
    
    columns = spark_df.schema.names
    
    sdf = VectorAssembler(inputCols=columns_clustering, outputCol="columns_clustering").transform(spark_df)
    scaler = StandardScaler(inputCol="columns_clustering", outputCol="scaled_columns_clustering", withStd=True, withMean=True)
    sdf = scaler.fit(sdf).transform(sdf).select(columns + ['scaled_columns_clustering'])
    
    best_silhouette = -1
    
    for k_clusters in k_clusters_iter:
        
        if 'kmeans' in models:
            model = KMeans(featuresCol='scaled_columns_clustering', predictionCol="clusters_label",
                           maxIter=20).setK(k_clusters).fit(sdf)
            sdf = model.transform(sdf)

            silhouette = ClusteringEvaluator(featuresCol='scaled_columns_clustering', predictionCol="clusters_label",
                                             metricName="silhouette").evaluate(sdf)

            if silhouette > best_silhouette:
                best_silhouette = silhouette
                best_model = model
                nbr_clusters = k_clusters
                sdf_clustered = sdf
            
            sdf = sdf.select(columns + ['scaled_columns_clustering'])
                
        if 'biskmeans' in models:
            model = BisectingKMeans(featuresCol='scaled_columns_clustering', predictionCol="clusters_label",
                                    maxIter=20).setK(k_clusters).fit(sdf)
            sdf = model.transform(sdf)

            silhouette = ClusteringEvaluator(featuresCol='scaled_columns_clustering', predictionCol="clusters_label",
                                             metricName="silhouette").evaluate(sdf)

            if silhouette > best_silhouette:
                best_silhouette = silhouette
                best_model = model
                nbr_clusters = k_clusters
                sdf_clustered = sdf
            
            sdf = sdf.select(columns + ['scaled_columns_clustering'])
                  
        if 'gaussmix' in models:
            model = GaussianMixture(featuresCol='scaled_columns_clustering', predictionCol="clusters_label",
                                    maxIter=20).setK(k_clusters).fit(sdf)
            sdf = model.transform(sdf)

            silhouette = ClusteringEvaluator(featuresCol='scaled_columns_clustering', predictionCol="clusters_label",
                                             metricName="silhouette").evaluate(sdf)

            if silhouette > best_silhouette:
                best_silhouette = silhouette
                best_model = model
                nbr_clusters = k_clusters
                sdf_clustered = sdf
            
            sdf = sdf.select(columns + ['scaled_columns_clustering'])
    
    sdf_clustered = sdf_clustered.select(columns + ['clusters_label'])
    
    return sdf_clustered, nbr_clusters, best_model, best_silhouette

In [34]:
k_clusters_list = [k for k in [2, 3, 4, 5, 8, 10] + list(sample(range(11, 30), size=5, replace=False))]

k_clusters_list = [2]

################################################################### NON Runnare

sdf, nbr_clusters, clustering_model, silhouette_score = best_clustering(spark_df=sdf,
                                                                        columns_clustering=columns_clustering,
                                                                        k_clusters_iter=k_clusters_list)

In [35]:
clustering_model

GaussianMixture_c5ad9924221d

In [36]:
nbr_clusters

2

In [37]:
silhouette_score

0.7067610880648694

il migior modello di Clustering è un Gaussian Mixture Model con k=2, cioè un modello che trova due distribuzioni Normali nel dataset che massimizzano la Log-Likelihood.

Usiamo i clusters generati da questo modello per fillare i Missing Values mancanti.

# *** sei qui

In [39]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT *,
    CASE WHEN Credit_Score IS NULL THEN
        BIGINT( PERCENTILE(Credit_Score, 0.5) OVER(PARTITION BY cluster_label, Years_in_current_job, Home_Ownership, Number_of_Open_Accounts, Years_of_Credit_History) )
    ELSE Credit_Score END AS Credit_Score_new,
    
    CASE WHEN Annual_Income IS NULL THEN
        BIGINT( PERCENTILE(Annual_Income, 0.5) OVER(PARTITION BY cluster_label, Years_in_current_job, Home_Ownership, Number_of_Open_Accounts, Years_of_Credit_History) )
    ELSE Annual_Income END AS Annual_Income_new,
    
FROM Bank_Loan_Dataset
"""

sdf = spark.sql(sql)

sdf = sdf.drop('Credit_Score').withColumnRenamed("Credit_Score_new", "Credit_Score")
sdf = sdf.drop('Annual_Income').withColumnRenamed("Annual_Income_new", "Annual_Income")

AnalysisException: "cannot resolve '`Credit_Score`' given input columns: []; line 3 pos 14;\n'Project [CASE WHEN isnull('Credit_Score) THEN 'BIGINT('PERCENTILE('Credit_Score, 0.5) windowspecdefinition('cluster_label, 'Years_in_current_job, 'Home_Ownership, 'Number_of_Open_Accounts, 'Years_of_Credit_History, unspecifiedframe$())) ELSE 'Credit_Score END AS Credit_Score_new#19578, CASE WHEN isnull('Annual_Income) THEN 'BIGINT('PERCENTILE('Annual_Income, 0.5) windowspecdefinition('cluster_label, 'Years_in_current_job, 'Home_Ownership, 'Number_of_Open_Accounts, 'Years_of_Credit_History, unspecifiedframe$())) ELSE 'Annual_Income END AS Annual_Income_new#19579, 'FROM AS Bank_Loan_Dataset#19580]\n+- OneRowRelation\n"

In [41]:
nbr_nulls = get_nbr_nulls(spark_df = sdf, view_name = 'Bank_Loan_Dataset')

Loan_Status:          0
Current_Loan_Amount:          0
Term:          0
Credit_Score:      14947
Annual_Income:      14947
Years_in_current_job:          0
Home_Ownership:          0
Purpose:          0
Monthly_Debt:          0
Years_of_Credit_History:          0
Months_since_last_delinquent:          0
Number_of_Open_Accounts:          0
Number_of_Credit_Problems:          0
Current_Credit_Balance:          0
Maximum_Open_Credit:          0
Bankruptcies:          0
Tax_Liens:          0
clusters_label:          0


# New variables creation - Features Engineering

In [52]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql ="""
SELECT *, ROUND((Monthly_Debt/Current_Loan_Amount)*100, 4) AS Installment_Rate
FROM Bank_Loan_Dataset
"""

sdf = spark.sql(sql)

Si è deciso di creare una nuova variabile "(Monthly_Debt*12)/Annual_Income " per comprendere quanto del reddito annuale percepiuto viene utilizzato per pagare il debito creato verso la banca poichè può influenzare il Loan Status

In [53]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql ="""
SELECT *, ROUND(((Monthly_Debt*12)/Annual_Income)*100, 4) AS Debt_Income_Rate
FROM Bank_Loan_Dataset
"""

sdf = spark.sql(sql)

Number_of_Credit_Problems/Number_of_Open_Accounts?

In [54]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql ="""
SELECT *, ROUND((Number_of_Credit_Problems/Number_of_Open_Accounts)*100, 4) AS Credit_Problems_Perc
FROM Bank_Loan_Dataset
"""

sdf = spark.sql(sql)

In [55]:
sdf.printSchema()

root
 |-- Loan_Status: string (nullable = true)
 |-- Current_Loan_Amount: long (nullable = true)
 |-- Term: string (nullable = true)
 |-- Credit_Score: long (nullable = true)
 |-- Annual_Income: long (nullable = true)
 |-- Years_in_current_job: double (nullable = true)
 |-- Home_Ownership: string (nullable = true)
 |-- Purpose: string (nullable = true)
 |-- Monthly_Debt: double (nullable = true)
 |-- Years_of_Credit_History: double (nullable = true)
 |-- Months_since_last_delinquent: string (nullable = true)
 |-- Number_of_Open_Accounts: long (nullable = true)
 |-- Number_of_Credit_Problems: long (nullable = true)
 |-- Current_Credit_Balance: long (nullable = true)
 |-- Maximum_Open_Credit: long (nullable = true)
 |-- Bankruptcies: long (nullable = true)
 |-- Tax_Liens: long (nullable = true)
 |-- clusters_label: integer (nullable = false)
 |-- Installment_Rate: double (nullable = true)
 |-- Debt_Income_Rate: double (nullable = true)
 |-- Credit_Problems_Perc: double (nullable = true)


la nuova variabile Installment_Rate è coerente, infatti il suo valore medio nel dataset (il tasso medio della rata del prestito) coincide con il tasso governativo decennale Russo del 2016.

In [57]:
new_columns=['Installment_Rate', 'Debt_Income_Rate' , 'Credit_Problems_Perc']

sdf.createOrReplaceTempView('Bank_Loan_Dataset')

for col in new_columns:
    sql = """
    SELECT  AVG({0}), COUNT(DISTINCT {0}) AS DistinctVal, MIN(CAST({0} AS DOUBLE)) AS MinVal, MAX(CAST({0} AS DOUBLE)) AS MaxVal, PERCENTILE(CAST({0} AS DOUBLE),0.5) AS MEDIAN
    FROM Bank_Loan_Dataset
    """.format(col)

    spark.sql(sql).show()

+---------------------+-----------+------+--------+-------+
|avg(Installment_Rate)|DistinctVal|MinVal|  MaxVal| MEDIAN|
+---------------------+-----------+------+--------+-------+
|     7.98956803249926|      55721|   0.0|282.5451|5.74305|
+---------------------+-----------+------+--------+-------+

+---------------------+-----------+------+-------+-------+
|avg(Debt_Income_Rate)|DistinctVal|MinVal| MaxVal| MEDIAN|
+---------------------+-----------+------+-------+-------+
|   17.313526831453842|      14531|   0.0|40.0007|16.9998|
+---------------------+-----------+------+-------+-------+

+-------------------------+-----------+------+------+------+
|avg(Credit_Problems_Perc)|DistinctVal|MinVal|MaxVal|MEDIAN|
+-------------------------+-----------+------+------+------+
|       1.7908512295524486|        104|   0.0| 300.0|   0.0|
+-------------------------+-----------+------+------+------+



Controlliamo le correlazione tra le nuove variabili e quelle da cui hanno origine

In [83]:
Statistics.corr(\
                sdf.select(['Installment_Rate', 'Monthly_Debt', 'Current_Loan_Amount'])\
                .rdd.map(lambda row: Vectors_mllib.dense(row)) )

array([[ 1.        ,  0.32990332, -0.38271919],
       [ 0.32990332,  1.        ,  0.43415914],
       [-0.38271919,  0.43415914,  1.        ]])

In [84]:
Statistics.corr(\
                sdf.select(['Debt_Income_Rate', 'Monthly_Debt','Annual_Income'])\
                .rdd.map(lambda row: Vectors_mllib.dense(row)))

array([[ 1., nan, nan],
       [nan,  1., nan],
       [nan, nan,  1.]])

In [85]:
Statistics.corr(\
                sdf.select(['Credit_Problems_Perc', 'Number_of_Credit_Problems', 'Number_of_Open_Accounts'])\
                .rdd.map(lambda row: Vectors_mllib.dense(row)))

array([[ 1.        ,         nan,         nan],
       [        nan,  1.        , -0.01296336],
       [        nan, -0.01296336,  1.        ]])

# Check finale ed Export nell'HDFS del Dataset definitivo

In [None]:
columns = sdf.schema.names

columns_categorical = [col.name for col in sdf.schema.fields if isinstance(col.dataType, StringType)]

columns_numerical = [col for col in columns if col not in columns_categorical]

In [None]:
columns

In [None]:
columns_categorical

In [None]:
columns_numerical

In [None]:
nbr_nulls = get_nbr_nulls(spark_df = sdf, view_name = 'Bank_Loan_Dataset')

In [None]:
nbr_distincts = get_nbr_distincts(spark_df = sdf, view_name = 'Bank_Loan_Dataset')

In [42]:
sdf.write.parquet("hdfs://kddrtserver11.isti.cnr.it:9000/user/hpsa04/bank_loan_status_dataset")

In [50]:
# il dataset poi viene letto tramite:
# sdf = spark.read.parquet("hdfs://kddrtserver11.isti.cnr.it:9000/user/hpsa04/bank_loan_status_dataset")

In [59]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT COUNT(*) AS nbr_rows
FROM Bank_Loan_Dataset
"""

spark.sql(sql).show()

+--------+
|nbr_rows|
+--------+
|   74094|
+--------+



In [60]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT Loan_Status, COUNT(*) AS nbr_rows
FROM Bank_Loan_Dataset
GROUP BY Loan_Status """

spark.sql(sql).show()

+-----------+--------+
|Loan_Status|nbr_rows|
+-----------+--------+
| Fully Paid|   51455|
|Charged Off|   22639|
+-----------+--------+



In [61]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT Loan_Status, AVG(Credit_Score) AS avg_Credit_Score
FROM Bank_Loan_Dataset
GROUP BY Loan_Status """

spark.sql(sql).show()

+-----------+-----------------+
|Loan_Status| avg_Credit_Score|
+-----------+-----------------+
| Fully Paid|719.9762023988363|
|Charged Off| 710.390470656595|
+-----------+-----------------+



In [62]:
sdf.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT Loan_Status, AVG(Annual_Income) AS avg_Annual_Income
FROM Bank_Loan_Dataset
GROUP BY Loan_Status """

spark.sql(sql).show()

+-----------+------------------+
|Loan_Status| avg_Annual_Income|
+-----------+------------------+
| Fully Paid|20161.219233612323|
|Charged Off|18111.277164439278|
+-----------+------------------+



# Ricordati!

anche se non esistono valori distinti  COUNT(\*)  può differire da COUNT(DISTINCT \*).

perché il primo conta tutte le righe mentre il secondo conta tutte e sole le righe dove non è presente neanche un NULL value

In [None]:
sql = """
SELECT COUNT(*) AS nbr_rows
FROM Bank_Loan_Dataset
"""

spark.sql(sql).show()

In [None]:
sql = """
SELECT COUNT(Credit_Score) AS nbr_rows
FROM Bank_Loan_Dataset
"""

spark.sql(sql).show()

In [None]:
sql = """
SELECT COUNT(DISTINCT *) AS nbr_rows
FROM Bank_Loan_Dataset
"""

spark.sql(sql).show()

# tentativi

In [42]:
'''  tentativo non riuscito per fillare i missing values
rdd = sdf.rdd.map(lambda row: row.asDict())

rdd.keyBy(lambda row: (row['Years_in_current_job'],
                             row['Home_Ownership'],
                             row['Number_of_Open_Accounts'],
                             row['Years_of_Credit_History'])).groupByKey()
                             
def fill_null(d):
    if d['Maximum_Open_Credit'] == None:
        d['Maximum_Open_Credit'] = 
    return d

rdd.mapValues(fill_null)
'''

"  tentativo non riuscito\nrdd = sdf.rdd.map(lambda row: row.asDict())\n\nrdd.keyBy(lambda row: (row['Years_in_current_job'],\n                             row['Home_Ownership'],\n                             row['Number_of_Open_Accounts'],\n                             row['Years_of_Credit_History'])).groupByKey()\n                             \ndef fill_null(d):\n    if d['Maximum_Open_Credit'] == None:\n        d['Maximum_Open_Credit'] = \n    return d\n\nrdd.mapValues(fill_null)\n"

In [42]:
'''
X = sdf.select(columns_clustering).rdd.map(lambda row: Vectors_mllib.dense(row))

X_scaled = StandardScaler_mllib(withMean=True, withStd=True).fit(X).transform(X)


'''''' questo da un errore inspiegabilmente
for k_clusters in sample(range(2, 10), size=1, replace=False):
    current_clustering_model = KMeans.train(X_scaled, k=k_clusters, maxIterations=20, initializationMode="random")
''''''


KMeans_model = KMeans_mllib.train(X_scaled, k = 1, maxIterations=10, initializationMode="random")


def error(point):
    center = KMeans_model.centers[KMeans_model.predict(point)]
    return sqrt(sum([x**2 for x in (point - center)]))

WSSE = X_scaled.map(lambda point: error(point)).reduce(lambda x, y: x + y)
print("Best Within Sum of Squared Error = ", WSSE)


GaussianMixture_model = GaussianMixture_mllib.train(X_scaled, k=1)

BisectingKMeans_model = BisectingKMeans_mllib.train(X_scaled, k=1, maxIterations=10)

'''

'\nX = sdf.select(columns_clustering).rdd.map(lambda row: Vectors_mllib.dense(row))\n\nX_scaled = StandardScaler_mllib(withMean=True, withStd=True).fit(X).transform(X)\n\n\n questo da un errore inspiegabilmente\nfor k_clusters in sample(range(2, 10), size=1, replace=False):\n    current_clustering_model = KMeans.train(X_scaled, k=k_clusters, maxIterations=20, initializationMode="random")\n\n\n\nKMeans_model = KMeans_mllib.train(X_scaled, k = 1, maxIterations=10, initializationMode="random")\n\n\ndef error(point):\n    center = KMeans_model.centers[KMeans_model.predict(point)]\n    return sqrt(sum([x**2 for x in (point - center)]))\n\nWSSE = X_scaled.map(lambda point: error(point)).reduce(lambda x, y: x + y)\nprint("Best Within Sum of Squared Error = ", WSSE)\n\n\nGaussianMixture_model = GaussianMixture_mllib.train(X_scaled, k=1)\n\nBisectingKMeans_model = BisectingKMeans_mllib.train(X_scaled, k=1, maxIterations=10)\n\n'

In [None]:
'''
sdf_prova.createOrReplaceTempView('Bank_Loan_Dataset')

sql = """
SELECT *,
    BIGINT( PERCENTILE(Credit_Score, 0.5) OVER(PARTITION BY cluster_label) ) AS toFill_Credit_Score,
    BIGINT( PERCENTILE(Annual_Income, 0.5) OVER(PARTITION BY cluster_label) ) AS toFill_Annual_Income
FROM Bank_Loan_Dataset
"""

sdf_prova = spark.sql(sql)

def fill_nulls(row):
    d = row.asDict()
    if d['Credit_Score'] is None:
        d['Credit_Score'] = d['toFill_Credit_Score']
    if d['Annual_Income'] is None:
        d['Annual_Income'] = d['toFill_Annual_Income']
    return Row(**d)

sdf_prova = sdf_prova.rdd.map(fill_nulls).toDF().select(columns_prova)
'''