Dalam latihan ini kita akan melakukan pemrosesan data menggunakan data sales.

Format data adalah sbb :
- seq
- product_name
- qty
- date
- Size
- length
- cust_no

Kita akan menggunakan dataset yang mengandung bad records, atau baris yang formatnya tidak valid.

In [None]:
!wget https://www.dropbox.com/s/gktr38tfatnhcf8/salesdata_bad.csv

# Mempersiapkan environment

##a. Install pyspark package
Untuk menjalankan notebook ini di Google colab, langkah pertama yang perlu dilakukan adalah menginstall package `pyspark`, karena package tersebut tidak disertakan secara default.

Langkah ini perlu dilakukan setiap membuka session/notebook baru.

Instalasi kita lakukan dengan perintah `pip`

In [None]:
!pip install pyspark


##b. Create spark session

In [None]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [None]:
spark = SparkSession.builder.appName('Data Preprocessing').getOrCreate()

# 1.Loading Data : Menangani *bad data*



Cek format file yang sudah kita download dengan perintah `head`

In [None]:
!head salesdata_bad.csv

In [None]:
!wc -l salesdata_bad.csv

Load ke dataframe, karena mengandung header, kita set `header = True`

In [None]:
dfu = spark.read.csv("salesdata_bad.csv", header=True)
dfu.show(5)

Tampilkan skema dataframe. Secara default seluruh kolom dibaca sebagai string.

In [None]:
dfu.printSchema()

Tampilkan summary statistik

In [None]:
dfu.describe().show()

Dari statistik di atas kita bisa melihat bahwa terdapat record dengan nilai `product_name` dan/atau `qty` yang null

Kita juga bisa melihat adanya nilai non numerik pada kolom `qty`

##1.1 Mendefinisikan Skema

Pada umumnya data yang kita hadapi mengandung record-record invalid.

Untuk menanganinya pada saat loading, kita bisa mendefinisikan skema untuk dataframe yang akan kita gunakan.

Kita akan membutuhkan `StructType, StructField, IntegerType, StringType, DateType` dari library `pyspark.sql.types`.

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

In [None]:
salesSchema = StructType([
    StructField("seq", IntegerType(), True),
    StructField("product_name", StringType(), True),
    StructField("qty", IntegerType(), True),
    StructField("salesdate", StringType(), True),
    StructField("size", StringType(), True),
    StructField("length", StringType(), True),
    StructField("cust_no", IntegerType(), True)])


##1.2 Menangani data reject/malformed

Spark menyediakan 3 mode setting untuk loading file csv dan json.

1. `PERMISSIVE` : load record yang invalid, selama null constrain terpenuhi
2. `DROPMALFORMED` : ignore/skip record yang tidak sesuai formatnya
3. `FAILFAST` : throw exception ketika menemukan record yang invalid

Secara default `mode` diset ke `PERMISSIVE`.

Ketika menggunakan mode ``PERMISSIVE``, semua data diload ke dataframe, dan kolom yang tidak valid diset ke NULL, karena skema yang digunakan nullability-nya diset `True`.


In [None]:
dfu_p = spark.read.csv("salesdata_bad.csv", header=True, schema=salesSchema)
print("Loaded records: ", dfu_p.count())
dfu_p.describe().show()

Ketika mode diset`DROPMALFORMED`, record dengan format invalid tidak di-load.


In [None]:
dfu_d = spark.read.csv("salesdata_bad.csv", header=True, schema=salesSchema, mode="DROPMALFORMED")
print("Loaded records: ", dfu_d.count())
dfu_d.describe().show()
dfu_d.count()

In [None]:
#dfu_d.show(100)

Ketika `mode` diset `FAILFAST`, terjadi error karena ada exception. Kita bisa memilih apa yang akan dilakukan dengan melakukan exception handling.

In [None]:
dfu_d = spark.read.csv("salesdata_bad.csv", header=True, schema=salesSchema, mode="FAILFAST")
print("Loaded records: ", dfu_d.count())
dfu_d.describe().show()

#2.Menangani NULL

##2.1 Menampilkan nilai NULL

Kita dapat memeriksa record mana yang memiliki null di salah satu kolom dengan memilih record menggunakan fungsi `isNull()` pada kolom tersebut.

Di sini terlihat perbedaan antara hasil loading menggunakan mode `PERMISIVE` dan `DROPMALFORMED`

In [None]:
print("PERMISSIVE with seq = null")
dfu_p[dfu_p["seq"].isNull()].show()
print("DROPMALFORMED with seq = null")
#dfu_d[dfu_d["seq"].isNull()].show()

print("PERMISSIVE with product_name = null")
dfu_p[dfu_p["product_name"].isNull()].show()
print("DROPMALFORMED with product_name = null")
#dfu_d[dfu_d["product_name"].isNull()].show()

print("PERMISSIVE with qty = null")
dfu_p[dfu_p["qty"].isNull()].show()
print("DROPMALFORMED with qty = null")
#dfu_d[dfu_d["qty"].isNull()].show()


Pertanyaan selanjutnya adalah: Apa yang harus dilakukan dengan nilai NULL?

Ada beberapa pilihan untuk menangani NULL, misalnya menghapus record, mengupdate nilai NULL ke suatu nilai konstan, atau berdasar kolom-kolom lainnya.


##2.2 Menghapus record dengan nilai NULL

Kita dapat menghapus record yang mengandung nilai null dengan menggunakan `dropna(how, thresh, subset)`.

Ada beberapa opsi yang dapat digunakan :
- hapus record dengan nilai NULL di kolom mana saja --> set `how = `**any**
- hapus record dengan semua kolom = null --> atur `how = `**all**
- hapus record dengan jumlah kolom yg null < n --> atur `thresh = (jumlah kolom - n)`
- hapus record dengan nilai NULL di kolom tertentu --> masukkan nama kolom di `subset`


Misalnya kita ingin menghapus semua record dengan nilai null di kolom manapun (kita akan menggunakan data yang dimuat dengan mode `PERMISIVE`)


In [None]:
print(dfu_p.count())
dfu_p.dropna().count()


Jika kita ingin memperbolehkan 1 nilai null per record, kita bisa gunakan parameter `thresh`

In [None]:
dfu_p.dropna(thresh=6).count()

Untuk menghapus record dengan kolom `qty` = null

In [None]:
dfu_p = dfu_p.dropna(subset=["qty"])
dfu_p.count()

##2.3 Mengganti NULL dengan default value

Untuk mengganti nilai NULL dengan nilai tertentu, gunakan `fillna(value, subset)`.

Misalnya untuk mengganti nilai null di kolom `qty` dengan 0 :

In [None]:
df_1 = dfu_p.fillna(0,["qty"])
df_1[df_1["qty"].isNull()].show()
df_1.describe("qty").show()

##WRAPPING UP 1
============================

Try this for exercise :
- Drop records with ``seq`` and/or ``salesdate`` is null
- Set null ``qty`` to 0
- Set null ``product_name`` to "UNKNOWN"

Coba kerjakan latihan berikut ini:
- Hapus record dengan kolom `seq` dan/atau `salesdate` null
- Replace `product_name` yang null menjadi **UNKNOWN**

#3.Menangani data duplikat



###3.1 Menghapus data duplikat


Untuk menghapus record duplikat, gunakan ` dropDuplicates(subset)` atau `drop_duplicates(subset)`.


Jika kita tidak menentukan nama kolom di parameter `subset`, record yang dihapus adalah record yang semua kolomnya identik.

Dalam file kita, terdapat record seperti berikut ini:
  <br>``seq, product_name , qty, salesdate``
  <br>``88 ,Kemeja Bergaris Ungu, 1 ,2019/01/30``
  <br>``88 ,Kemeja Bergaris Ungu, 1 ,2019/01/30``
  <br>``88 ,Kemeja Hitam , 2 ,2019/01/30``



Tanpa menentukan nama kolom, hanya record kedua yang akan dibuang.

In [None]:
df_1[df_1.seq == 88].show()

In [None]:
df_1.dropDuplicates()[df_1.seq == 88].show()

Jika kita menentukan bahwa kolom `seq` harus unik, maka record kedua dan ketiga akan dihapus.

In [None]:
df_1 = df_1.dropDuplicates(["seq"])
df_1[df_1.seq == 88].show()

#4.Further Cleaning : Reformatting Columns

In this section we will use :
- ``WithColumn()`` function
- Dataframe operation with pyspark built in functions
- Conditional operation with ``when()`` and ``when().otherwise()``

DataFrame (and RDD) is immutable, so if we want to update/reformat a column, we have to add the formatted value as a new column and drop the original later

We can add new column based on existing columns's value by using ``WithColumn()`` function.


##4.1 Standarisasi nilai

Kita akan bersihkan kolom `size`. Pertama, kita tampilkan nilai unik kolom tersebut

In [None]:
df_1.select("size").distinct().show()

Kita bisa melihat ada beberapa nilai yang tidak standar seperti ``Small``, ``xs``, dan ``m``. Mari ubah nilai tidak standar menjadi 'extra small', 'small', 'medium', dan 'big'.

Kita bisa melakukannya dalam 3 langkah:
- ubah menjadi huruf kecil / lowercase
- hapus karakter non-alfabetikal
- ganti *xs* dan *m* masing-masing menjadi 'extra small' dan 'medium'

###1.Ubah ke lowercase

In [None]:
df_1_lower = df_1.withColumn("size_lower", F.lower("size"))
df_1_lower.select("size_lower").distinct().show()


###2.Hapus karakter non-alfabet

Kita akan gunakan regex (regular expression) untuk menghapus karakter nonalfabet, dengan fungsi `regexp_replace`.
Pola regex untuk alfabet huruf kecil adalah `[^a-z]` (karena kita sudah menerapkan `lower` sebelumnya).

In [None]:
df_1_regex = df_1_lower.withColumn(
        'size_regex',
        F.regexp_replace('size_lower', r'([^a-z])', ''))
df_1_regex.select('size_regex').distinct().show()

###3.Ubah nilai berdasar aturan tertentu

Untuk menerapkan aturan/kondisi tertentu, gunakan fungsi `when()` dan `otherwise()`

In [None]:
df_1_clean = df_1_regex.withColumn('size_clean',
                      F.when(F.col('size_regex') == "xs" , "extra small")
                        .when(F.col('size_regex') == "m", "medium")
                          .otherwise(F.col('size_regex')))

df_1_clean.select("size_clean").distinct().show()
df_1_clean.show(3)

##4.2 Date Formatting

Kolom salesdate masih berformat string. Kita bisa mengubahnya menjadi format Date dengan menggunakan fungsi `to_date()`.

*Catatan : Kita juga bisa menentukan format Date ini ketika loading.*

In [None]:
df_date = df_1_clean.withColumn("salesdate", F.to_date("salesdate",'yyyy/MM/dd'))

df_date.show()
df_date.printSchema()

##WRAPPING UP 2
=====================

Lakukan latihan berikut ini :
- Remove duplicate di kolom `seq`
- Bersihkan kolom `length`, replace null menjadi '-' and nilai lain ke dalam salah satu nilai ini : `long, midi, short`


#5.User Defined Function

UDF relatif sulit untuk dioptimasi, sehingga sebisa mungkin kita gunakan fungsi yang sudah disediakan oleh spark.

Sebelum memutuskan untuk menggunakan UDF, cek terlebih dahulu ke https://spark.apache.org/docs/latest/sql-ref.html untuk memastikan bahwa fungsi yang kita perlukan memang belum tersedia.


Untuk latihan ini, kita akan memproses kolom `product_name`.

In [None]:
df_date.show()

Kita akan ubah huruf pertama dari setiap kata menjadi uppercase, dan sisanya menjadi huruf kecil, dengan menggunakan UDF.

Pertama-tama kita buat fungsi python biasa.

In [None]:
def convertCase(str):
    myStr = ""
    words = []
    if str: words = str.split(" ")
    for x in words:
       myStr = myStr + x[0:1].upper() + x[1:len(x)].lower() + " "
    return myStr

Selanjutnya *wrap* fungsi tersebut dengan `pyspark.sql.functions.udf()`

In [None]:
from pyspark.sql.functions import udf
convertUDF = udf(lambda x: convertCase(x),StringType())

Terapkan ke dataframe, dan tampilkan hasilnya

In [None]:
df_date.select("seq", \
    convertUDF("product_name").alias("ProductName") ) \
   .show(5, truncate=False)

For exercise : gunakan UDF dan `withColumn()` to mengupdate kolom product_name.

In [None]:
df_clean_2 = df_date.withColumn("product_name", convertUDF("product_name"))
df_clean_2.show(5, truncate=False)

#6.Enrichment - Joining multiple DataFrame

Kita akan melakukan data enrichment dengan menggabungkan dataframe sales dengan data customer.

In [None]:
!wget https://www.dropbox.com/s/hsb6lfkni466hpz/customers.csv

Cek file yang kita download

In [None]:
!head customers.csv

##6.1 Loading data referensi

In [None]:
df_cust = spark.read.csv("customers.csv",header=True,inferSchema=True)
df_cust.show(5, truncate=False)

df_cust.printSchema()

##6.2 Join Dataframe

In [None]:
df_joined = df_clean_2.join(df_cust,df_clean_2.cust_no ==  df_cust.seq, how = 'left')
df_joined.show(5, truncate=False)

In [None]:
df_joined.count()

Update history

2302.1555
* _Translate dan lengkapi penjelasan_
* _Rearrange sections_
* _Code cleanup_
