In [58]:
!pip install pyspark



In [134]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.appName('Bank').getOrCreate()

In [135]:
# đọc dữ liệu với spark
df = spark.read.csv('/content/sample_data/bank.csv',header=True)

df.printSchema()

root
 |-- age: string (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: string (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: string (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- campaign: string (nullable = true)
 |-- pdays: string (nullable = true)
 |-- previous: string (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [136]:
import pyspark.sql.types as tp

# xác định lược đồ
df_schema = tp.StructType([
    tp.StructField(name= 'age',      dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'job', dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'marital',       dataType= tp.StringType(),   nullable= True),
    tp.StructField(name= 'education',  dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'default',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'balance',       dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'housing',      dataType= tp.StringType(),   nullable= True),
    tp.StructField(name= 'loan', dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'contact',       dataType= tp.StringType(),   nullable= True),
    tp.StructField(name= 'day',  dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'month',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'duration',       dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'campaign', dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'pdays',       dataType= tp.IntegerType(),   nullable= True),
    tp.StructField(name= 'previous',  dataType= tp.IntegerType(),    nullable= True),
    tp.StructField(name= 'poutcome',   dataType= tp.StringType(),    nullable= True),
    tp.StructField(name= 'deposit',       dataType= tp.StringType(),    nullable= True) 
])

# đọc dữ liệu với lược đồ đã xác định
df = spark.read.csv('/content/sample_data/bank.csv',schema= df_schema,header= True)

# in lược đồ
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- deposit: string (nullable = true)



In [137]:
# bỏ cột
df2 = df.drop(*['default','contact','poutcome'])
df2.columns

['age',
 'job',
 'marital',
 'education',
 'balance',
 'housing',
 'loan',
 'day',
 'month',
 'duration',
 'campaign',
 'pdays',
 'previous',
 'deposit']

In [138]:
# lấy kích thước của dữ liệu
(df.count() , len(df.columns))

(11162, 17)

In [139]:
# lấy bản tóm tắt của các cột dữ liệu
df.select('age', 'day', 'balance').describe().show()

+-------+------------------+------------------+------------------+
|summary|               age|               day|           balance|
+-------+------------------+------------------+------------------+
|  count|             11162|             11162|             11162|
|   mean|41.231947679627304|15.658036194230425|1528.5385235620856|
| stddev|11.913369192215518| 8.420739541006462| 3225.413325946149|
|    min|                18|                 1|             -6847|
|    max|                95|                31|             81204|
+-------+------------------+------------------+------------------+



In [140]:
# import sql với pyspark
import pyspark.sql.functions as f

# tìm giá trị null ở mỗi cột
df_agg = df.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in df.columns])
df_agg.show()

+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|age|job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+
|  0|  0|      0|        0|      0|      0|      0|   0|      0|  0|    0|       0|       0|    0|       0|       0|      0|
+---+---+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+



In [141]:
# dùng group by
df.groupBy('education').count().show()

+---------+-----+
|education|count|
+---------+-----+
|  unknown|  497|
| tertiary| 3689|
|secondary| 5476|
|  primary| 1500|
+---------+-----+



In [142]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# tạo đối tượng lớp StringIndexer và chỉ định cột đầu vào và đầu ra
SI_education = StringIndexer(inputCol='education',outputCol='education_Index')
SI_marital = StringIndexer(inputCol='marital',outputCol='marital_Index')
SI_job = StringIndexer(inputCol='job',outputCol='job_Index')
SI_default = StringIndexer(inputCol='default',outputCol='default_Index')
SI_housing = StringIndexer(inputCol='housing',outputCol='housing_Index')
SI_loan = StringIndexer(inputCol='loan',outputCol='loan_Index')
SI_contact = StringIndexer(inputCol='contact',outputCol='contact_Index')
SI_month = StringIndexer(inputCol='month',outputCol='month_Index')
SI_poutcome = StringIndexer(inputCol='poutcome',outputCol='poutcome_Index')
SI_deposit = StringIndexer(inputCol='deposit',outputCol='deposit_Index')

# chuyển đổi dữ liệu
df = SI_education.fit(df).transform(df)
df = SI_marital.fit(df).transform(df)
df = SI_job.fit(df).transform(df)
df = SI_default.fit(df).transform(df)
df = SI_housing.fit(df).transform(df)
df = SI_loan.fit(df).transform(df)
df = SI_contact.fit(df).transform(df)
df = SI_month.fit(df).transform(df)
df = SI_poutcome.fit(df).transform(df)
df = SI_deposit.fit(df).transform(df)

# xem dữ liệu sau khi chuyển đổi
df.select('education', 'education_Index', 'marital', 'marital_Index').show(20)

+---------+---------------+--------+-------------+
|education|education_Index| marital|marital_Index|
+---------+---------------+--------+-------------+
|secondary|            0.0| married|          0.0|
|secondary|            0.0| married|          0.0|
|secondary|            0.0| married|          0.0|
|secondary|            0.0| married|          0.0|
| tertiary|            1.0| married|          0.0|
| tertiary|            1.0|  single|          1.0|
| tertiary|            1.0| married|          0.0|
|secondary|            0.0|divorced|          2.0|
|secondary|            0.0| married|          0.0|
|secondary|            0.0|  single|          1.0|
|secondary|            0.0|  single|          1.0|
|secondary|            0.0| married|          0.0|
| tertiary|            1.0| married|          0.0|
| tertiary|            1.0|  single|          1.0|
| tertiary|            1.0|  single|          1.0|
| tertiary|            1.0|divorced|          2.0|
|  primary|            2.0|  si

**One-Hot Encoding**

> Dùng để mã hóa các dữ liệu bị thiếu. Đầu tiên, chúng ta cần sử dụng StringIndexer để chuyển biến thành dạng số và sau đó sử dụng OneHotEncoderEstimator để mã hóa nhiều cột của tập dữ liệu.



In [143]:
from pyspark.ml.feature import OneHotEncoder

# tạo đối tượng và chỉ định cột đầu vào và đầu ra
OHE = OneHotEncoder(inputCols=['education_Index', 'marital_Index'],outputCols=['education_OHE', 'marital_OHE'])

# chuyển đổi dữ liệu
df = OHE.fit(df).transform(df)

# xem dữ liệu
df.select('education', 'education_Index', 'education_OHE', 'marital', 'marital_Index', 'marital_OHE').show(10)


+---------+---------------+-------------+--------+-------------+-------------+
|education|education_Index|education_OHE| marital|marital_Index|  marital_OHE|
+---------+---------------+-------------+--------+-------------+-------------+
|secondary|            0.0|(3,[0],[1.0])| married|          0.0|(2,[0],[1.0])|
|secondary|            0.0|(3,[0],[1.0])| married|          0.0|(2,[0],[1.0])|
|secondary|            0.0|(3,[0],[1.0])| married|          0.0|(2,[0],[1.0])|
|secondary|            0.0|(3,[0],[1.0])| married|          0.0|(2,[0],[1.0])|
| tertiary|            1.0|(3,[1],[1.0])| married|          0.0|(2,[0],[1.0])|
| tertiary|            1.0|(3,[1],[1.0])|  single|          1.0|(2,[1],[1.0])|
| tertiary|            1.0|(3,[1],[1.0])| married|          0.0|(2,[0],[1.0])|
|secondary|            0.0|(3,[0],[1.0])|divorced|          2.0|    (2,[],[])|
|secondary|            0.0|(3,[0],[1.0])| married|          0.0|(2,[0],[1.0])|
|secondary|            0.0|(3,[0],[1.0])|  single|  

**Vector Assembler**


> Kết hợp danh sách các cột thành một vector



In [144]:
from pyspark.ml.feature import VectorAssembler

# chỉ định các cột đầu vào và đầu ra của vector assembler
assembler = VectorAssembler(inputCols=[
                                       'education_Index',
                                       'marital_Index',
                                       'education_OHE',
                                       'marital_OHE'],
                           outputCol='vector')

# điền các chỗ trống là 0
df_schema = df.fillna(0)

# transform the data
final_df = assembler.transform(df)

# view the transformed vector
final_df.select('vector').show()

+--------------------+
|              vector|
+--------------------+
| (7,[2,5],[1.0,1.0])|
| (7,[2,5],[1.0,1.0])|
| (7,[2,5],[1.0,1.0])|
| (7,[2,5],[1.0,1.0])|
|(7,[0,3,5],[1.0,1...|
|[1.0,1.0,0.0,1.0,...|
|(7,[0,3,5],[1.0,1...|
| (7,[1,2],[2.0,1.0])|
| (7,[2,5],[1.0,1.0])|
|(7,[1,2,6],[1.0,1...|
|(7,[1,2,6],[1.0,1...|
| (7,[2,5],[1.0,1.0])|
|(7,[0,3,5],[1.0,1...|
|[1.0,1.0,0.0,1.0,...|
|[1.0,1.0,0.0,1.0,...|
|(7,[0,1,3],[1.0,2...|
|[2.0,1.0,0.0,0.0,...|
| (7,[2,5],[1.0,1.0])|
| (7,[2,5],[1.0,1.0])|
| (7,[1,2],[2.0,1.0])|
+--------------------+
only showing top 20 rows



**Logistic Regression**

In [145]:
df.show()

+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+---------------+-------------+---------+-------------+-------------+----------+-------------+-----------+--------------+-------------+-------------+-------------+
|age|        job| marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|deposit|education_Index|marital_Index|job_Index|default_Index|housing_Index|loan_Index|contact_Index|month_Index|poutcome_Index|deposit_Index|education_OHE|  marital_OHE|
+---+-----------+--------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+-------+---------------+-------------+---------+-------------+-------------+----------+-------------+-----------+--------------+-------------+-------------+-------------+
| 59|     admin.| married|secondary|     no|   2343|    yes|  no|unknown|  5|  may|    1042|       1|   -1|     

In [146]:
import pyspark.sql.functions as F

df_ = df.drop(*['job','marital','education','default','housing','loan','contact','month','poutcome','deposit','education_OHE','marital_OHE','deposit'])
df_.show()

+---+-------+---+--------+--------+-----+--------+---------------+-------------+---------+-------------+-------------+----------+-------------+-----------+--------------+-------------+
|age|balance|day|duration|campaign|pdays|previous|education_Index|marital_Index|job_Index|default_Index|housing_Index|loan_Index|contact_Index|month_Index|poutcome_Index|deposit_Index|
+---+-------+---+--------+--------+-----+--------+---------------+-------------+---------+-------------+-------------+----------+-------------+-----------+--------------+-------------+
| 59|   2343|  5|    1042|       1|   -1|       0|            0.0|          0.0|      3.0|          0.0|          1.0|       0.0|          1.0|        0.0|           0.0|          1.0|
| 56|     45|  5|    1467|       1|   -1|       0|            0.0|          0.0|      3.0|          0.0|          0.0|       0.0|          1.0|        0.0|           0.0|          1.0|
| 41|   1270|  5|    1389|       1|   -1|       0|            0.0|         

In [154]:
df_.toPandas()

Unnamed: 0,age,balance,day,duration,campaign,pdays,previous,education_Index,marital_Index,job_Index,default_Index,housing_Index,loan_Index,contact_Index,month_Index,poutcome_Index,deposit_Index
0,59,2343,5,1042,1,-1,0,0.0,0.0,3.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0
1,56,45,5,1467,1,-1,0,0.0,0.0,3.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0
2,41,1270,5,1389,1,-1,0,0.0,0.0,2.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0
3,55,2476,5,579,1,-1,0,0.0,0.0,4.0,0.0,1.0,0.0,1.0,0.0,0.0,1.0
4,54,184,5,673,2,-1,0,1.0,0.0,3.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
11157,33,1,20,257,1,-1,0,2.0,1.0,1.0,0.0,1.0,0.0,0.0,5.0,0.0,0.0
11158,39,733,16,83,4,-1,0,0.0,0.0,4.0,0.0,0.0,0.0,1.0,3.0,0.0,0.0
11159,32,29,19,156,2,-1,0,0.0,1.0,2.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0
11160,43,0,8,9,2,172,5,0.0,0.0,2.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0


In [155]:
class_name = 'label'
feature_names = df_.columns[:-1]
print(class_name)
print(feature_names)

label
['age', 'balance', 'day', 'duration', 'campaign', 'pdays', 'previous', 'education_Index', 'marital_Index', 'job_Index', 'default_Index', 'housing_Index', 'loan_Index', 'contact_Index', 'month_Index', 'poutcome_Index']


In [156]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler()
assembler.setInputCols(feature_names).setOutputCol('features')

transformed_data = assembler.transform(df_)
transformed_data.show()

+---+-------+---+--------+--------+-----+--------+---------------+-------------+---------+-------------+-------------+----------+-------------+-----------+--------------+-------------+--------------------+
|age|balance|day|duration|campaign|pdays|previous|education_Index|marital_Index|job_Index|default_Index|housing_Index|loan_Index|contact_Index|month_Index|poutcome_Index|deposit_Index|            features|
+---+-------+---+--------+--------+-----+--------+---------------+-------------+---------+-------------+-------------+----------+-------------+-----------+--------------+-------------+--------------------+
| 59|   2343|  5|    1042|       1|   -1|       0|            0.0|          0.0|      3.0|          0.0|          1.0|       0.0|          1.0|        0.0|           0.0|          1.0|(16,[0,1,2,3,4,5,...|
| 56|     45|  5|    1467|       1|   -1|       0|            0.0|          0.0|      3.0|          0.0|          0.0|       0.0|          1.0|        0.0|           0.0|      

In [160]:
training_data, test_data = transformed_data.randomSplit([0.8,0.2])

In [165]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

model = LogisticRegression(featuresCol = 'features',labelCol='deposit_Index')
M = model.fit(training_data)
predictions = M.transform(test_data)

multi_evaluator = MulticlassClassificationEvaluator(labelCol = 'deposit_Index', metricName = 'accuracy')
print('Logistic Regression Accuracy:', multi_evaluator.evaluate(predictions))

Logistic Regression Accuracy: 0.796730004418913
