In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=9a97c3ee4c523e267dc3e2eb9e78e4894270bbdd728ed9c7bebc7fad844f916d
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('learn_data_cleansing').getOrCreate()

customers_df = spark.read.format('csv').option("inferSchema","true").option("header","true").load("./customers.csv")

#temp table
customers_df.createOrReplaceTempView("customers_table")

In [4]:
print(customers_df.printSchema())
customers_df.summary().show()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- home_address: string (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)

None
+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|summary|       customer_id|customer_name|           gender|               age|        home_address|         zip_code|     city|               state|  country|
+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|  count|              1007|         1007|              989|              1007|                1007|             1007|     1007|                1007|

In [7]:
from pyspark.sql.functions import isnull, isnan

customers_df.where(isnull('gender') | isnan('gender')).show()
print("Jumlah missing value: ", customers_df.where(isnull('gender') | isnan('gender')).count())

+-----------+-------------+------+---+--------------------+--------+--------------------+--------------------+---------+
|customer_id|customer_name|gender|age|        home_address|zip_code|                city|               state|  country|
+-----------+-------------+------+---+--------------------+--------+--------------------+--------------------+---------+
|         39|     fulan 39|  NULL| 80|7440 Cameron Esta...|    4622|North Victoriache...|  Northern Territory|Australia|
|        168|    fulan 168|  NULL| 27|2781 Berge MallSu...|    1975|      North Leoburgh|   Western Australia|Australia|
|        322|    fulan 322|  NULL| 30|593 Becker Circle...|    1640|          Jacobiview|   Western Australia|Australia|
|        393|    fulan 393|  NULL| 34|5158 Levi HillSui...|    1474|          Johnsburgh|          Queensland|Australia|
|        442|    fulan 442|  NULL| 26|5157 Feil RoadApt...|    7249|          Port Chloe|     New South Wales|Australia|
|        720|    fulan 720|  NUL

In [8]:
print("Jumlah duplikasi : ", (customers_df.count() - customers_df.distinct().count()))

Jumlah duplikasi :  6


In [9]:
from pyspark.sql.functions import col

new_customers_df = customers_df.withColumn("customer_id", col("customer_id").cast("string"))
new_customers_df = new_customers_df.withColumn("zip_code", col("zip_code").cast("string"))
new_customers_df.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- home_address: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)



In [10]:
new_customers_df.na.drop()
# new_customers_df.na.drop(how="any") # Menghapus seluruh baris data yang mengandung missing value (walaupun hanya 1)
# customers_df.na.drop(how="all") # Menghapus seluruh baris data jika seluruh baris tersebut bernilai null atau nan
# new_customers_df.na.drop(thresh=2) # Menghapus seluruh baris data jika baris tersebut mengandung minimal 2 missing value

DataFrame[customer_id: string, customer_name: string, gender: string, age: int, home_address: string, zip_code: string, city: string, state: string, country: string]

In [11]:
new_customers_df = new_customers_df.na.drop()
print("Jumlah missing value : ", new_customers_df.where(isnull('gender') | isnan('gender')).count())

Jumlah missing value :  0


In [12]:
customers_df.na.fill("Prefer not to say", subset=["gender"])

DataFrame[customer_id: int, customer_name: string, gender: string, age: int, home_address: string, zip_code: int, city: string, state: string, country: string]

In [13]:
from pyspark import pandas as ps

customers_df_pandas = ps.read_csv("./customers.csv")
customers_df_pandas["age"].interpolate(method='linear')



0       30.0
1       69.0
2       59.0
3       67.0
4       30.0
5       40.0
6       76.0
7       75.0
8       51.0
9       70.0
10      39.0
11      78.0
12      42.0
13      36.0
14      34.0
15      75.0
16      32.0
17      79.0
18      41.0
19      55.0
20      31.0
21      20.0
22      32.0
23      49.0
24      75.0
25      50.0
26      55.0
27      79.0
28      20.0
29      38.0
30      36.0
31      32.0
32      44.0
33      55.0
34      80.0
35      27.0
36      75.0
37      34.0
38      80.0
39      59.0
40      65.0
41      37.0
42      34.0
43      60.0
44      27.0
45      68.0
46      54.0
47      79.0
48      56.0
49      63.0
50      52.0
51      47.0
52      51.0
53      62.0
54      45.0
55      33.0
56      65.0
57      39.0
58      58.0
59      20.0
60      65.0
61      32.0
62      78.0
63      75.0
64      64.0
65      34.0
66      59.0
67      23.0
68      79.0
69      48.0
70      63.0
71      42.0
72      48.0
73      79.0
74      75.0
75      51.0
76      62.0

In [14]:
new_customers_df.where(new_customers_df.age > 100).show()

+-----------+-------------+-----------------+---+--------------------+--------+----------+------------------+---------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|      city|             state|  country|
+-----------+-------------+-----------------+---+--------------------+--------+----------+------------------+---------+
|        216|    fulan 216|Prefer not to say|500|038 Haley MewsApt...|    3991| Bayertown|Northern Territory|Australia|
|        961|    fulan 961|Prefer not to say|700|29 Farrell Parade...|    6528|New Joseph|   South Australia|Australia|
+-----------+-------------+-----------------+---+--------------------+--------+----------+------------------+---------+



In [15]:
from pyspark.sql.functions import when

new_customers_df = new_customers_df.withColumn(
    "age", when(new_customers_df.age == 700,70)
    .when(new_customers_df.age == 500,50)
    .otherwise(new_customers_df.age)
)

new_customers_df.summary().show()

+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|summary|       customer_id|customer_name|           gender|               age|        home_address|         zip_code|     city|               state|  country|
+-------+------------------+-------------+-----------------+------------------+--------------------+-----------------+---------+--------------------+---------+
|  count|               989|          989|              989|               989|                 989|              989|      989|                 989|      989|
|   mean|498.27805864509605|         NULL|             NULL|49.876643073811934|                NULL|5026.199191102123|     NULL|                NULL|     NULL|
| stddev|287.67376465771207|         NULL|             NULL|17.651855611617894|                NULL|2880.569897954812|     NULL|                NULL|     NULL|
|    min|                 1|      fulan 

In [18]:
#Drop duplicate
new_customers_df = new_customers_df.dropDuplicates()
print("Jumlah duplikasi: ", (new_customers_df.count() - new_customers_df.distinct().count()))

Jumlah duplikasi:  0


In [19]:
#Preprocessing

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler

# Mengubah nilai pada kolom age menjadi vector
assembler = VectorAssembler().setInputCols(['age']).setOutputCol('vec_age')
preprocess_customers_df = assembler.transform(new_customers_df)

standard_scaler = StandardScaler().setInputCol('vec_age').setOutputCol("StandardScaler_age")
preprocess_customers_df = standard_scaler.fit(preprocess_customers_df).transform(preprocess_customers_df)
preprocess_customers_df.show(5)

+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|             city|               state|  country|vec_age|  StandardScaler_age|
+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+
|        145|    fulan 145|Prefer not to say| 43|2044 Johns DriveA...|    8332|    Lake Madeline|Australian Capita...|Australia| [43.0]| [2.435520713222145]|
|        210|    fulan 210|Prefer not to say| 79|476 Mueller Crest...|    5663|West Claudiaburgh|     New South Wales|Australia| [79.0]| [4.474561310338359]|
|        289|    fulan 289|Prefer not to say| 43|068 Logan CourtSu...|     612|   East Kaylabury|Australian Capita...|Australia| [43.0]| [2.435520713222145]|
|        574|    fulan 574|Prefer not to say| 53|180

In [20]:
#Min Max Scaling
from pyspark.ml.feature import MinMaxScaler

min_max_scaler = MinMaxScaler().setInputCol("vec_age").setOutputCol("MinMaxScaler_age")
preprocess_customers_df = min_max_scaler.fit(preprocess_customers_df).transform(preprocess_customers_df)

preprocess_customers_df.show(5)

+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+--------------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|             city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|
+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+--------------------+
|        145|    fulan 145|Prefer not to say| 43|2044 Johns DriveA...|    8332|    Lake Madeline|Australian Capita...|Australia| [43.0]| [2.435520713222145]|[0.3833333333333333]|
|        210|    fulan 210|Prefer not to say| 79|476 Mueller Crest...|    5663|West Claudiaburgh|     New South Wales|Australia| [79.0]| [4.474561310338359]|[0.9833333333333333]|
|        289|    fulan 289|Prefer not to say| 43|068 Logan CourtSu...|     612|   East Kaylabury|Australi

In [21]:
from pyspark.ml.feature import StringIndexer

label_encoder = StringIndexer().setInputCol("gender").setOutputCol("label_gender")
preprocess_customers_df = label_encoder.fit(preprocess_customers_df).transform(preprocess_customers_df)
preprocess_customers_df.show(5)

+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+--------------------+------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|             city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|label_gender|
+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+--------------------+------------+
|        145|    fulan 145|Prefer not to say| 43|2044 Johns DriveA...|    8332|    Lake Madeline|Australian Capita...|Australia| [43.0]| [2.435520713222145]|[0.3833333333333333]|         0.0|
|        210|    fulan 210|Prefer not to say| 79|476 Mueller Crest...|    5663|West Claudiaburgh|     New South Wales|Australia| [79.0]| [4.474561310338359]|[0.9833333333333333]|         0.0|
|        289|    fulan 289|Prefer not to

In [22]:
  from pyspark.ml.feature import OneHotEncoder

  one_hot_encoder = OneHotEncoder().setInputCol("label_gender").setOutputCol("one_hot_gender")
  one_hot_encoder.fit(preprocess_customers_df).transform(preprocess_customers_df).show(5)

+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+--------------------+------------+--------------+
|customer_id|customer_name|           gender|age|        home_address|zip_code|             city|               state|  country|vec_age|  StandardScaler_age|    MinMaxScaler_age|label_gender|one_hot_gender|
+-----------+-------------+-----------------+---+--------------------+--------+-----------------+--------------------+---------+-------+--------------------+--------------------+------------+--------------+
|        145|    fulan 145|Prefer not to say| 43|2044 Johns DriveA...|    8332|    Lake Madeline|Australian Capita...|Australia| [43.0]| [2.435520713222145]|[0.3833333333333333]|         0.0| (2,[0],[1.0])|
|        210|    fulan 210|Prefer not to say| 79|476 Mueller Crest...|    5663|West Claudiaburgh|     New South Wales|Australia| [79.0]| [4.474561310338359]|[0.983333333333

In [23]:
from pyspark.ml.feature import VectorAssembler, PCA

#Membuat feature buatan untuk digunakan dalam proses PCA
assembler = VectorAssembler().setInputCols(['StandardScaler_age', 'MinMaxScaler_age']).setOutputCol('features')
preprocess_customers_df = assembler.transform(preprocess_customers_df)

pca = PCA().setInputCol("features").setOutputCol("PCA_age").setK(1)
preprocess_customers_df = pca.fit(preprocess_customers_df).transform(preprocess_customers_df)
preprocess_customers_df.show(5, truncate=False)

+-----------+-------------+-----------------+---+--------------------------+--------+-----------------+----------------------------+---------+-------+--------------------+--------------------+------------+--------------------------------------+---------------------+
|customer_id|customer_name|gender           |age|home_address              |zip_code|city             |state                       |country  |vec_age|StandardScaler_age  |MinMaxScaler_age    |label_gender|features                              |PCA_age              |
+-----------+-------------+-----------------+---+--------------------------+--------+-----------------+----------------------------+---------+-------+--------------------+--------------------+------------+--------------------------------------+---------------------+
|145        |fulan 145    |Prefer not to say|43 |2044 Johns DriveApt. 201  |8332    |Lake Madeline    |Australian Capital Territory|Australia|[43.0] |[2.435520713222145] |[0.3833333333333333]|0.0    