# Introduction to Supervised Learning for Big Data

Contoh berikut adalah implementasi supervised learning dalam konteks big data. Terutama, fokus pada kecepatan komputasi.

Sebelum dimulai, berikut adalah metode untuk mengukur waktu eksekusi.

In [1]:
import time

In [2]:
start_time = time.time()
time.sleep(3)  # an operation you want to evaluate
elapsed_time = time.time() - start_time
print('Elapsed time: {} seconds'.format(elapsed_time))

Elapsed time: 3.0028927326202393 seconds


Menggunakan cara ini, mari kita ukur berapa lama waktu eksekusi sistem linear.

Pertama, terdapat matrix $X\in \mathbb{R}^{N\times d}$ dan $Y \in \mathbb{R}^{N\times 1}$ untuk beberapa integer pofitif $N$ dan $d < N$.

In [3]:
import numpy as np

In [4]:
N = 10000
d = 500
X = np.random.normal(loc=0, scale=1, size=[N, d])
Y = np.random.normal(loc=0, scale=1, size=[N, 1])

print(X)
print(Y)

[[ 0.0092584  -0.05155933 -0.63938133 ... -0.81276714  0.18985544
   0.51100434]
 [-1.07290968  0.79838761 -0.30924359 ... -0.1107372  -0.47734671
   2.03921444]
 [ 0.16161299 -1.53734186 -0.06170273 ... -1.31733631 -0.17028783
  -1.4085652 ]
 ...
 [ 1.12600886 -0.19601986 -1.28124697 ...  0.71852638  1.57901976
  -0.46438197]
 [ 0.85178491  0.42955739 -0.17704912 ...  2.42262787  0.84601429
  -0.50677911]
 [ 1.59915763 -0.76716477 -0.24586984 ...  0.39212999  0.72757443
  -1.54608596]]
[[ 0.49162459]
 [ 0.28031612]
 [-0.89856809]
 ...
 [-0.21447013]
 [ 0.14521757]
 [ 0.45407945]]


Untuk persamaan sistem linear $Y = XA$, solusi kuadrat terkecil untuk sistem ini dikenal sebagai:

\begin{equation*}
A = ((X^\top X)^{-1}X^\top)Y
\end{equation*}

Untuk menghitung persamaan in, pendekatan yang sederhana adalah (1) hitung $X^\top X$ first, (2) hitung invers $(X^\top X)^{-1}$, (3) kalikan $X^\top$ ke hasil, dan terakhir (4) kalikan $Y$. Berikut ini adalah analisis tentang berapa lama waktu komputasi yang diperlukan untuk setiap langkah.

In [5]:
start_time = time.time()
XTX = np.matmul(X.T, X)
XTX_elapsed_time = time.time() - start_time
print('Elapsed time for XTX: {} seconds'.format(XTX_elapsed_time))

start_time = time.time()
inv = np.linalg.inv(XTX)
inv_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse: {} seconds'.format(inv_elapsed_time))

start_time = time.time()
invXT = np.matmul(inv, X.T)
invXT_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse times XT: {} seconds'.format(invXT_elapsed_time))

start_time = time.time()
A = np.matmul(invXT, Y)
A_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse times XT times Y: {} seconds'.format(A_elapsed_time))

print('Total: {} seconds'.format(XTX_elapsed_time + inv_elapsed_time + invXT_elapsed_time + A_elapsed_time))

Elapsed time for XTX: 0.4805736541748047 seconds
Elapsed time for the inverse: 0.11841201782226562 seconds
Elapsed time for the inverse times XT: 0.6154403686523438 seconds
Elapsed time for the inverse times XT times Y: 0.009085416793823242 seconds
Total: 1.2235114574432373 seconds


Kini, trik sederhana dapat membuat perbedaan besar dalam waktu komputasi. Pertimbangkan persamaan yang sama seperti di atas, tetapi kali ini, mari kita ubah sedikit urutan komputasinya.

\begin{equation*}
A = (X^\top X)^{-1}(X^\top Y)
\end{equation*}

Yaitu, kali ini, kita akan (1) menghitung $X^\top X$ terlebih dahulu, (2) mengambil invers $(X^\top X)^{-1}$, (3) menghitung $X^\top Y$, dan terakhir (4) mengalikan $(X^\top X)^{-1}$ dan $X^\top Y$. Langkah (1) dan (2) sama, tetapi (3) dan (4) dalam urutan yang berbeda. Mari kita lihat berapa banyak waktu yang diperlukan untuk menghitung solusi dengan strategi ini.

In [6]:
start_time = time.time()
XTX = np.matmul(X.T, X)
XTX_elapsed_time = time.time() - start_time
print('Elapsed time for XTX: {} seconds'.format(XTX_elapsed_time))

start_time = time.time()
inv = np.linalg.inv(XTX)
inv_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse: {} seconds'.format(inv_elapsed_time))

start_time = time.time()
XTY = np.matmul(X.T, Y)
XTY_elapsed_time = time.time() - start_time
print('Elapsed time for XTY: {} seconds'.format(XTY_elapsed_time))

start_time = time.time()
A = np.matmul(inv, XTY)
A_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse times XTY: {} seconds'.format(A_elapsed_time))

print('Total: {} seconds'.format(XTX_elapsed_time + inv_elapsed_time + XTY_elapsed_time + A_elapsed_time))

Elapsed time for XTX: 0.5222883224487305 seconds
Elapsed time for the inverse: 0.0856313705444336 seconds
Elapsed time for XTY: 0.009995460510253906 seconds
Elapsed time for the inverse times XTY: 0.001585245132446289 seconds
Total: 0.6195003986358643 seconds


Perhatikan pengurangan waktu komputasi yang signifikan?

### Tugas
- Langkah manakah yang menunjukkan perbedaan terbesar?
- Mengapa?
- Tetapkan $d = 500$ tetapi cobalah untuk meningkatkan $N$ dari 10.000 menjadi 20.000, 50.000, dan 100.000. Bagaimana waktu komputasi berubah? Apakah ada pola tertentu?
- Tetapkan $N = 10000$ tetapi tingkatkan $d$ dari 500 menjadi 1.000, 2.000, dan 5.000. Bagaimana waktu komputasi berubah? Apakah ada pola tertentu?

In [7]:
def measure_computation_time(N, d):
    # Generate random data for X and Y
    X = np.random.randn(N, d)  # Random matrix X of shape (N, d)
    Y = np.random.randn(N, 1)  # Random matrix Y of shape (N, 1)

    # Measure time for each step
    start_time = time.time()
    XTX = np.matmul(X.T, X)
    XTX_elapsed_time = time.time() - start_time
    print('Elapsed time for XTX: {:.4f} seconds'.format(XTX_elapsed_time))

    start_time = time.time()
    inv = np.linalg.inv(XTX)
    inv_elapsed_time = time.time() - start_time
    print('Elapsed time for the inverse: {:.4f} seconds'.format(inv_elapsed_time))

    start_time = time.time()
    XTY = np.matmul(X.T, Y)
    XTY_elapsed_time = time.time() - start_time
    print('Elapsed time for XTY: {:.4f} seconds'.format(XTY_elapsed_time))

    start_time = time.time()
    A = np.matmul(inv, XTY)
    A_elapsed_time = time.time() - start_time
    print('Elapsed time for the inverse times XTY: {:.4f} seconds'.format(A_elapsed_time))

    total_time = XTX_elapsed_time + inv_elapsed_time + XTY_elapsed_time + A_elapsed_time
    print('Total time: {:.4f} seconds\n'.format(total_time))
    return XTX_elapsed_time, inv_elapsed_time, XTY_elapsed_time, A_elapsed_time, total_time

# Part 1: Varying N (Fix d=500)
d = 500
Ns = [10000, 20000, 50000, 100000]
print("Varying N with fixed d=500:")
for N in Ns:
    print(f'Running for N={N}, d={d}')
    measure_computation_time(N, d)

# Part 2: Varying d (Fix N=10000)
N = 10000
ds = [500, 1000, 2000, 5000]
print("\nVarying d with fixed N=10000:")
for d in ds:
    print(f'Running for N={N}, d={d}')
    measure_computation_time(N, d)


Varying N with fixed d=500:
Running for N=10000, d=500
Elapsed time for XTX: 0.4745 seconds
Elapsed time for the inverse: 0.0689 seconds
Elapsed time for XTY: 0.0072 seconds
Elapsed time for the inverse times XTY: 0.0003 seconds
Total time: 0.5510 seconds

Running for N=20000, d=500
Elapsed time for XTX: 0.8097 seconds
Elapsed time for the inverse: 0.0646 seconds
Elapsed time for XTY: 0.0129 seconds
Elapsed time for the inverse times XTY: 0.0003 seconds
Total time: 0.8875 seconds

Running for N=50000, d=500
Elapsed time for XTX: 1.6377 seconds
Elapsed time for the inverse: 0.0858 seconds
Elapsed time for XTY: 0.0526 seconds
Elapsed time for the inverse times XTY: 0.0075 seconds
Total time: 1.7835 seconds

Running for N=100000, d=500
Elapsed time for XTX: 2.0007 seconds
Elapsed time for the inverse: 0.0384 seconds
Elapsed time for XTY: 0.0561 seconds
Elapsed time for the inverse times XTY: 0.0023 seconds
Total time: 2.0975 seconds


Varying d with fixed N=10000:
Running for N=10000, d=5

##### Jelaskan analisa sesuai pertanyaan di sini

### Note: Advanced Profiling

Mengukur waktu untuk menjalankan operasi tahap demi tahahap disebut sebagai pembuatan profil. Menggunakan perintah `time` cukup mudah, tetapi terkadang kita mungkin memerlukan beberapa metode yang lebih canggih. Misalnya, Anda mungkin telah memperhatikan bahwa waktu komputasi dari kode yang sama dapat bervariasi setiap kali Anda menjalankan kode tersebut.

Salah satu cara untuk membuat profil kode Anda adalah dengan menggunakan tag `%timeit` di depan baris yang ingin Anda evaluasi. Misalnya:
```python
%timeit inv = np.linalg.inv(XTX)
```
menjalankan `inv = np.linalg.inv(XTX)` beberapa kali dan mengambil rata-rata dan deviasi standar dari waktu komputasi.

Cara lain untuk melakukannya adalah dengan menggunakan tag `%prun` di depan baris. Misalnya:
```python
%prun inv = np.linalg.inv(XTX)
```
akan memberikan perincian proses yang lebih mendalam. Namun, jika Anda tidak begitu familiar dengan pemrograman komputer, `%prun` mungkin terlalu berlebihan, karena memberikan informasi yang terlalu rinci. Dalam kasus ini, Anda cukup menggunakan `%timeit` atau metode `time.time()`.

In [8]:
%timeit inv = np.linalg.inv(XTX)

43.7 ms ± 2.72 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


In [9]:
%prun inv = np.linalg.inv(XTX)

 

## Distributed Optimization

Meskipun ada trik komputasi di atas, mungkin masih sulit untuk menghitung pseudo-invers ($(X^\top X)^{-1}X^\top Y$) karena keterbatasan ruang memori, dll. Faktanya, banyak kerangka kerja analitik big data (termasuk Spark) menggunakan strategi pengoptimalan terdistribusi. Di bawah ini, kita akan melihat contoh algoritma penurunan gradien terdistribusi untuk memahami cara kerjanya.

[Gradient descent](https://en.wikipedia.org/wiki/Gradient_descent) adalah algoritma optimasi orde pertama yang populer untuk menemukan minimum lokal. Kita tidak akan membahas lebih dalam tentang apa sebenarnya algoritma gradient descent itu, tetapi berikut adalah rumus pembaruan iteratif yang digunakan oleh algoritma gradient descent:

$\theta \longleftarrow \theta - \alpha \frac{\partial\mathcal{L}}{\partial \theta}$

di mana $\theta$ adalah parameter model ($w$ dan $b$ dalam kasus regresi linier). $\frac{\partial\mathcal{L}}{\partial \theta}$ adalah turunan orde pertama (gradien) dari lost function (kesalahan) $\mathcal{L}$ sehubungan dengan parameter model $\theta$.

Ingat, lost function diformulasikan sebagai:

$\mathcal{L}(w, b) = \frac{1}{N}\sum_{i=1}^N\|x^{(i)}w+b - y^{(i)}\|^2$

Jika kita mengembangkan simbol penjumlahan, lost function akan terlihat seperti ini:

$\mathcal{L} = \mathcal{L}^{(1)} + \mathcal{L}^{(2)} + \cdots + \mathcal{L}^{(N)}$

Perhatikan $\mathcal{L}^{(i)}$ menunjukkan suku ke-$i$ dalam lost function, yang sesuai dengan titik data ke-$i$. Bila kita menghilangkan superskrip demi kesederhanaan, setiap suku $\mathcal{L}^{(i)}$ akan tampak seperti ini:

$\mathcal{L}^{(i)}=\frac{1}{N}\|xw+b - y\|^2=\frac{1}{N}\|\hat{y} - y\|^2$

Di sini notasi baru $\hat{y}:= xw+b$ telah diperkenalkan untuk menunjukkan keluaran yang diprediksi oleh model regresi linier. Dengan notasi ini, hanya diperlukan kalkulus sederhana untuk menghitung turunan orde pertama:

$\frac{\partial\mathcal{L}^{(i)}}{\partial w} = \frac{2}{N}(\hat{y} - y) x^\top$

$\frac{\partial\mathcal{L}^{(i)}}{\partial b} = \frac{2}{N}(\hat{y} - y)$

Ingat bahwa turunan di atas adalah untuk suku ke-$i$ dalam lost function, yang hanya dikaitkan dengan titik data ke-$i$. Dengan kata lain, gradien global (gradien dari seluruh lost function) hanyalah penjumlahan dari gradien lokal (gradien suku ke-$i$), yang dapat dihitung secara independen dari titik data lainnya:

$\frac{\partial\mathcal{L}}{\partial W} = \frac{\partial\mathcal{L}^{(1)}}{\partial W} + \frac{\partial\mathcal{L}^{(2)}}{\partial W} + \cdots + \frac{\partial\mathcal{L}^{(N)}}{\partial W}$

Ini berarti bahwa, tidak peduli bagaimana data didistribusikan, kita dapat menghitung gradien lokal ke-$i$ pada setiap titik data $(i)$ dan kemudian menggabungkannya untuk menghasilkan gradien global.

Untuk memahami konsepnya dengan lebih jelas, berikut ini contoh 10.000 titik data yang dibagi menjadi dua simpul komputasi (yang disimulasikan). Pertama, mari kita buat kumpulan data acak dengan $N=10.000$ dan $d=10%.

In [10]:
N = 10000
d = 10
X = np.random.normal(loc=0, scale=1, size=[N, d])
Y = np.random.normal(loc=0, scale=1, size=[N, 1])

Sekarang, mari kita bagi data.

In [11]:
N_node1 = N//2
N_node2 = N - N_node1

X_node1 = X[:N_node1, :]
Y_node1 = Y[:N_node1]
X_node2 = X[N_node1:, :]
Y_node2 = Y[N_node1:]

Meskipun, kita tidak membagi kumpulan data secara fisik, kita beranggapan bahwa `X_node1` dan `Y_node1` hanya dapat diakses dari `node1` dan `X_node2` dan `Y_node2` hanya dapat diakses dari `node2`. Sekarang, pada node `master`, perintah berikut akan dijalankan:

In [12]:
# Initialize model parameters
w = np.random.uniform(-1.0, 1.0, [d,1])
b = np.random.uniform(-1.0, 1.0)

MAX_ITER = 1000
learning_rate = 0.01
for i in range(MAX_ITER):
    # Talk to node1 and ask it to compute the local gradient.
    # (Pretend the following three lines are computed on node1)
    err_node1 = np.matmul(X_node1, w) - Y_node1
    dldw_node1 = np.mean(2*err_node1*X_node1, axis=0, keepdims=True)
    dldb_node1 = np.mean(2*err_node1, axis=0, keepdims=True)

    # Simultaneously, talk to node2 and ask the same.
    # (Pretend the following three lines are computed on node2)
    err_node2 = np.matmul(X_node2, w) - Y_node2
    dldw_node2 = np.mean(2*err_node2*X_node2, axis=0, keepdims=True)
    dldb_node2 = np.mean(2*err_node2, axis=0, keepdims=True)

    # Aggregate the gradients by weighting them with the number of data available at each node.
    dldw = (N_node1/N)*dldw_node1 + (N_node2/N)*dldw_node2
    dldb = (N_node1/N)*dldb_node1 + (N_node2/N)*dldb_node2

    # Update the model with the global gradient.
    w -= learning_rate*dldw.T
    b -= learning_rate*dldb.T

    # If the solution does not improve much, break out of the for loop
    if np.mean(dldw) < 1e-06:
        break

Tentu saja, hal di atas merupakan implementasi sederhana dari distributed gradient descent. Namun, dengan versi yang sederhana tersebut, kita dapat melihat hasilnya hampir sama dengan solusi pseudo-inverse.

In [13]:
XTX = np.linalg.inv(np.matmul(X.T, X))
XTY = np.matmul(X.T, Y)
w_true = np.matmul(XTX, XTY)
print(w)   # distributed gradient descent solution
print(w_true)  # pseudo-inverse solution (ground truth)

[[-0.04311594]
 [ 0.033538  ]
 [ 0.02315537]
 [-0.38982729]
 [-0.9304175 ]
 [ 0.6836212 ]
 [-0.04414589]
 [-0.45899713]
 [ 0.83113198]
 [-0.23302045]]
[[ 5.02539438e-03]
 [-8.56898903e-03]
 [ 1.34404986e-02]
 [ 1.83127182e-05]
 [ 5.38381977e-03]
 [-1.57746743e-04]
 [-1.39371623e-02]
 [ 7.27290828e-03]
 [-6.97840401e-03]
 [-3.17077837e-03]]


Dengan contoh di atas, saya harap konsepnya kini sudah jelas di benak Anda. Bahkan jika Anda masih belum begitu yakin apakah Anda benar-benar dapat menulis kode seperti di atas dari awal, Anda seharusnya baik-baik saja, sejauh Anda memiliki gambaran besar yang jelas. Faktanya, penerapan pengoptimalan terdistribusi dan semacamnya ditangani oleh Spark. Sebaliknya, sebagai data scientist, Anda hanya perlu memiliki pemahaman dasar tentang cara kerjanya secara mendalam. Jadi, jangan khawatir.

## Linear Regression in Spark

Spark memiliki semua implementasi yang kuat dan teroptimasi dengan baik dari metode pengoptimalan terdistribusi tersebut (dan masih banyak lagi) di balik layar. Bahkan, dari sudut pandang pengguna, 99% dari waktu, Anda tidak perlu terlalu peduli dengan apa yang terjadi di balik layar. Spark akan memilih algoritme pengoptimalan yang paling sesuai untuk Anda dan melakukan semua pekerjaan berat di balik layar.

Untuk melihat cara kerjanya, mari kita konfigurasikan Spark di Colab terlebih dahulu. (Jika Anda menjalankan notebook ini di komputer lokal dan telah mengonfigurasi Spark, Anda dapat melewati sel ini.)

In [14]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

import findspark
findspark.init()

Sekarang, mari unduh kumpulan data untuk dicoba. Untuk tutorial ini, kita akan menggunakan [kumpulan data avocado](https://github.com/chainhaus/pythoncourse/raw/refs/heads/master/avocado.csv)

In [15]:
!wget https://github.com/chainhaus/pythoncourse/raw/refs/heads/master/avocado.csv

--2024-10-25 11:31:12--  https://github.com/chainhaus/pythoncourse/raw/refs/heads/master/avocado.csv
Resolving github.com (github.com)... 140.82.114.4
Connecting to github.com (github.com)|140.82.114.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/chainhaus/pythoncourse/refs/heads/master/avocado.csv [following]
--2024-10-25 11:31:12--  https://raw.githubusercontent.com/chainhaus/pythoncourse/refs/heads/master/avocado.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1989197 (1.9M) [text/plain]
Saving to: ‘avocado.csv’


2024-10-25 11:31:13 (143 MB/s) - ‘avocado.csv’ saved [1989197/1989197]



Pada kuliah sebelumnya, kita telah mempelajari cara membaca file CSV sebagai Spark DataFrame. Di sini kita akan mengulang apa yang telah kita pelajari:

In [16]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('avocado').getOrCreate()

In [17]:
df = spark.read.csv('avocado.csv', header=True, sep=',', inferSchema=True)
df.show()

+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|
+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|
|  3|2015-12-06|        1.08|    78992.15| 1132.0| 71976.41| 72.58|   5811.16|    5677.4|    133.76|        0.0|conventional|2015|Albany|
|  4|2015-11-29|        1.28|     

Sekarang, untuk menggunakan library Spark ML, Anda harus terlebih dahulu mengonversi kolom menjadi vektor fitur. Untuk kumpulan data ini, kita diharapkan dapat memprediksi harga avocado (kolom ke-dua) menggunakan fitur-fitur seperti Total, Volume, dll. (semua kolom lainnya).

Untuk tujuan ini, Spark menyediakan metode praktis yang disebut `VectorAssembler` untuk menghasilkan vektor fitur dengan merakit kolom-kolom DataFrame.

Namun perlu dilihat bahwa `VectorAssemble` hanya untuk data numerk. Oleh sebab itu kolom non numerik dihilangkan dulu. Dan juga kolom yang tidak dibutuhkan lainnya.

In [18]:
df = df.drop('_c0', 'Date', 'type', 'year', 'region')

In [19]:
from pyspark.ml.feature import VectorAssembler
# Creates a new column called 'features' that contains feature vectors.
assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
df_vec = assembler.transform(df)
df_vec.show()

+------------+------------+-------+---------+------+----------+----------+----------+-----------+--------------------+
|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|            features|
+------------+------------+-------+---------+------+----------+----------+----------+-----------+--------------------+
|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|[1.33,64236.62,10...|
|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|[1.35,54876.98,67...|
|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|   8042.21|    103.14|        0.0|[0.93,118220.22,7...|
|        1.08|    78992.15| 1132.0| 71976.41| 72.58|   5811.16|    5677.4|    133.76|        0.0|[1.08,78992.15,11...|
|        1.28|     51039.6| 941.48| 43838.39| 75.78|   6183.95|   5986.26|    197.69|        0.0|[1.28,51039.6,941...|
|        1.26|    55979.78|1184.27| 48067.99| 43

Sekarang setelah kita membuat vektor fitur, mari kita bagi set data menjadi dua, yaitu set pelatihan dan set pengujian. Spark DataFrame menawarkan metode siap pakai untuk melakukannya:

In [20]:
df_train, df_test = df_vec.randomSplit([0.7, 0.3])  # 70% of the original data will be used for training and 30% for testing

Sekarang, bagian training sebenarnya cukup sederhana seperti dijelaskan di bawah ini.

In [21]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='AveragePrice')
lr_model = lr.fit(df_train)

Setelah proses training selesai, hasilnya dapat ditemukan dengan memanggil anggota model seperti `lr_model.coefficients` atau `lr_model.intercept`. Untuk detail selengkapnya, lihat https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegressionModel

In [22]:
print( lr_model.coefficients )  # slope of the linear equation
print( lr_model.intercept )     # intercept of the line

[1.0000000000000016,-1.2554165511188418e-19,1.2572195484961194e-19,1.2545376381167065e-19,1.2622890124481839e-19,1.2343246348503944e-19,2.0537545694172307e-21,2.360631011498478e-21]
-2.54729474236351e-15


Selain itu, Anda juga dapat melihat ringkasan hasil training dengan memanggil fungsi (`lr_model.summary`). Untuk selanjutnya, daftar fungsi-fungsi tersedia di [URL ini](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegressionModel).

In [23]:
lr_model.summary.residuals

DataFrame[residuals: double]

In [24]:
lr_model.summary.rootMeanSquaredError
# lr_model.summary.meanAbsoluteError
# lr_model.summary.meanSquaredError

6.65761461752283e-16

In [25]:
lr_model.summary.coefficientStandardErrors

[1.499209506198322e-17,
 1.0274675039389723e-19,
 1.0274621900191644e-19,
 1.0274634085446786e-19,
 1.0274810931516787e-19,
 1.0272043725145008e-19,
 6.458056880127409e-22,
 6.212280658393967e-22,
 2.2194688393483494e-17]

In [26]:
lr_model.summary.r2
lr_model.summary.r2adj
# lr_model.summary.pValues

1.0

Model yang ditraining menggunakan Spark dapat diuji dengan menggunakan metode `evaluate()`.

In [27]:
evaluation_summary = lr_model.evaluate(df_test)

Evaluation summary object pada dasarnya sama dengan model summary object. Dengan kata lain, apa yang telah Anda lihat di atas berlaku untuk ringkasan evaluasi:

In [28]:
evaluation_summary.rootMeanSquaredError

6.660947188737428e-16

In [29]:
lr_model.summary.totalIterations

0

Evaluation summary di atas pada dasarnya adalah metrics untuk mengukur kinerja regresi. Ini bisa Anda implementasikan di kasus regresi atau perhitungan estimasi lainnya. Seperti kalian lihar, nilai RMSE `Root Mean Square Error` sangat rendah, berarti keberhasilan prediksi sangat tinggi.


Sekarang, kalian pasti penasaran, bagaimana hasil prediksinya. Jalankan perintah berikut:

In [30]:
predictions = lr_model.transform(df_test)
predictions.show()

+------------+------------+----------+---------+--------+----------+----------+----------+-----------+--------------------+-------------------+
|AveragePrice|Total Volume|      4046|     4225|    4770|Total Bags|Small Bags|Large Bags|XLarge Bags|            features|         prediction|
+------------+------------+----------+---------+--------+----------+----------+----------+-----------+--------------------+-------------------+
|        0.46|  2200550.27|1200632.86|531226.65|18324.93| 450365.83| 113752.17|  330583.1|    6030.56|[0.46,2200550.27,...|0.45999999999999847|
|        0.49|  1137707.43|  738314.8|286858.37|11642.46|  100891.8|  70749.02|  30142.78|        0.0|[0.49,1137707.43,...| 0.4899999999999983|
|        0.51|    41987.86|    225.44|  5734.39|     0.0|  36028.03|    473.98|  35554.05|        0.0|[0.51,41987.86,22...| 0.5099999999999982|
|        0.51|  1442973.47|1037699.01|259846.68| 14567.4| 130860.38|   76814.4|  54045.98|        0.0|[0.51,1442973.47,...| 0.5099999999

## Transformers and Estimators

Saya harap sekarang Anda sudah lebih memahami cara kerja Spark ML. Nah, dalam latihan di atas, ada banyak detail. Namun, ada beberapa konsep penting yang perlu Anda kuasai, agar benar-benar dapat memanfaatkan Spark ML.

Kita buka load lagi file avocado.csv.

In [31]:
df = spark.read.csv('avocado.csv', header=True, sep=',', inferSchema=True)
df.show()

+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|
+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|
|  3|2015-12-06|        1.08|    78992.15| 1132.0| 71976.41| 72.58|   5811.16|    5677.4|    133.76|        0.0|conventional|2015|Albany|
|  4|2015-11-29|        1.28|     

Nah, berikut topik utama hari ini. Yang baru saja kita buat adalah DataFrame yang berisi kolom dengan variabel kategoris. Kolom category dalam bentuk string, jadi kita perlu mengonversinya menjadi nilai numerik. Ini adalah tugas sehari-hari yang cukup umum bagi ilmuwan data. (Ya, tentu saja, kita bisa saja membuat kolom `type` dengan indeks numerik di tempat pertama, tetapi kita sedang mensimulasikan situasi dunia nyata di sini.)

Di spark, ada beberapa helper method untuk melakukannya, yakni `StringIndexer` dan `OneHotEncoder`. StringIndexer secara harfiah adalah metode untuk mengonversi variabel kategoris tipe string menjadi indeks numerik. OneHotEncoder, di sisi lain, adalah metode lain yang mengonversi indeks numerik menjadi [one-hot encoder](https://en.wikipedia.org/wiki/One-hot). Ini adalah fitur yang cukup sering digunakan.

In [32]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol = 'type', outputCol = 'type_index')
df = indexer.fit(df).transform(df)
df.show()

+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|type_index|
+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|       0.0|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|       0.0|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|       0.0|
|  3|2015-12-06|        1.08|    78992.15| 1132.0| 71976.41| 72.58|   5811.16|    5677.4|    133.76|      

In [33]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['type_index'], outputCols=['type_dummy'])
df = encoder.fit(df).transform(df)
df.show()

+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+-------------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|type_index|   type_dummy|
+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+-------------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|
|  3|2015-12-06|      

Sekarang, pada contoh di atas, kami menggunakan metode `fit()` dan `transform()` pada DataFrame `df`. Kedua fungsi ini, pada kenyataannya, terkait dengan dua konsep inti yang ingin kami pelajari dalam sesi ini---yaitu **transformer** dan **estimator**.

Menurut [Dokumentasi Resmi Spark](https://spark.apache.org/docs/latest/ml-pipeline.html), **transformer** adalah abstraksi yang mencakup transformer fitur dan model yang dipelajari. Secara sederhana, transformer adalah objek yang dilengkapi dengan logika yang dapat mengubah satu DataFrame menjadi yang lain. Misalnya, transformer dapat mengambil DataFrame, membaca kolom (misalnya variabel kategoris), memetakannya ke kolom baru (misalnya indeks numerik), dan mengeluarkan DataFrame baru dengan kolom yang dipetakan ditambahkan di bagian akhir. Untuk contoh lain, transformer juga dapat mengambil DataFrame, membaca vektor fitur dalam kolom, memprediksi label untuk setiap vektor fitur, dan mengeluarkan DataFrame baru dengan label yang diprediksi. Oleh karena itu, model pembelajaran mesin (yang dilatih) adalah transformer, dalam artian yang mengubah DataFrame input menjadi DataFrame output.

Di sisi lain, **estimator** mengabstraksikan konsep algoritma pembelajaran atau algoritma apa pun yang sesuai atau dilatih pada data. Estimator secara harfiah mengambil DataFrame dan *memperkirakan* parameter dengan menggunakan nilai dalam DataFrame. Misalnya, `LinearRegression` yang kita lihat di atas adalah estimator dan memanggil `fit()` untuk melatih `LinearRegressionModel`, yang merupakan output dari `fit()`. Output `LinearRegressionModel` di sisi lain adalah transformer, yang dapat digunakan untuk mengubah DataFrame uji untuk menghasilkan label yang diprediksi.

Cara mudah untuk membedakan transformer dan estimator adalah dengan melihat metode yang dimilikinya. Secara teknis, transformer mengimplementasikan metode `transform()`, yang mengubah satu DataFrame menjadi yang lain, umumnya dengan menambahkan satu atau beberapa kolom. Di sisi lain, estimator mengimplementasikan metode `fit()`, yang mengambil DataFrame dan menghasilkan, biasanya transformer.

Dengan semua ini, mari kita uraikan sel-sel di atas. Pertama-tama, `StringIndexer` adalah estimator yang mengambil DataFrame dan menghasilkan transformer. Bagaimana saya mengetahuinya? Yah, ia dilengkapi dengan fungsi `fit()`, jadi saya dapat dengan aman berasumsi bahwa ia adalah estimator. Lebih khusus lagi, estimator StringIndexer memperkirakan jumlah kategori yang berbeda dari DataFrame dan menghasilkan transformer yang mengimplementasikan logika untuk memetakan setiap kategori yang berbeda menjadi nilai numerik. Jumlah kategori berbeda dalam DataFrame bervariasi dari satu masalah ke masalah lainnya, jadi Anda perlu memperkirakan parameter tersebut (jumlah kategori) terlebih dahulu, sebelum dapat mengubah DataFrame.

Output dari `StringIndexer.fit()`, seperti yang disebutkan di atas, adalah sebuah transformator. Ia berisi logika untuk memetakan setiap kategori string ke dalam indeks numerik. Memanggil `transform()` akan menerapkan logika tersebut dan akan mengubah DataFrame input menjadi DataFrame output.

Apakah sintaks `indexer.fit(df).transform(df)` masuk akal sekarang? Anda dapat menginterpretasikan kode `OneHotEncoder` dengan cara yang sama. Mengetahui bahwa Anda semua adalah orang pintar, saya tidak akan menjelaskan secara eksplisit apa itu baris demi baris.

Baiklah, sekarang Anda baru saja menguasai salah satu gagasan terpenting dalam Spark. Sebagai catatan kecil, Anda akan melihat bahwa satu representasi hot encoded dalam kolom `type_dummy` terlihat agak aneh:

In [34]:
df.select('type').distinct().show()

+------------+
|        type|
+------------+
|     organic|
|conventional|
+------------+



In [35]:
df.where(df['type']=='organic').show()

+---+----------+------------+------------+-----+------+----+----------+----------+----------+-----------+-------+----+------+----------+----------+
|_c0|      Date|AveragePrice|Total Volume| 4046|  4225|4770|Total Bags|Small Bags|Large Bags|XLarge Bags|   type|year|region|type_index|type_dummy|
+---+----------+------------+------------+-----+------+----+----------+----------+----------+-----------+-------+----+------+----------+----------+
|  0|2015-12-27|        1.83|      989.55| 8.16| 88.59| 0.0|     892.8|     892.8|       0.0|        0.0|organic|2015|Albany|       1.0| (1,[],[])|
|  1|2015-12-20|        1.89|     1163.03|30.24|172.14| 0.0|    960.65|    960.65|       0.0|        0.0|organic|2015|Albany|       1.0| (1,[],[])|
|  2|2015-12-13|        1.85|      995.96|10.44| 178.7| 0.0|    806.82|    806.82|       0.0|        0.0|organic|2015|Albany|       1.0| (1,[],[])|
|  3|2015-12-06|        1.84|     1158.42|90.29|104.18| 0.0|    963.95|    948.52|     15.43|        0.0|organic

Seperti yang dapat Anda lihat di atas, nilai-nilai dalam kolom `type_dummy` tampak seperti `(1,[],[])`, dan Anda mungkin bertanya-tanya, apa...? Nah, ini sebenarnya adalah representasi vektor sparse yang digunakan oleh Spark. Jika Anda pikirkan tentang hal itu, one hot encoding adalah cara yang cukup tidak efisien untuk merepresentasikan sebuah kelas. Dalam kasus kita, kita hanya memiliki dua kelas, yaitu `conventional` dan `organic`. Namun dalam banyak kasus, Anda akan sering melihat ratusan atau ribuan kategori. Dalam kasus tersebut, one-hot encoding akan berakhir tampak seperti vektor seribu dimensi (vektor dengan seribu entri) yang semuanya diisi dengan '0' kecuali untuk satu entri. Jadi, menyimpan nol yang tidak perlu dalam DataFrame adalah pemborosan memori yang sangat besar, terutama jika Anda berbicara tentang masalah big data. Atas dasar ini, Spark menggunakan representasi sparse di mana, alih-alih menyimpan semua entri dengan nol, ia hanya menyimpan elemen yang bukan nol. Untuk melakukannya, Anda memerlukan tiga hal. Pertama, Anda perlu mengetahui dimensi vektor, karena Anda akan melewatkan banyak entri. Kedua, Anda perlu mengetahui posisi nilai bukan nol, karena alasan yang jelas. Terakhir, Anda perlu mengetahui apa sebenarnya nilai-nilai tersebut.

Ketiga hal ini, pada kenyataannya, adalah apa yang disimpan Spark untuk mewakili vektor enkode one-hot. Angka pertama dalam tuple `(1,[],[])` menunjukkan dimensi vektor enkode one-hot, yang dalam kasus ini adalah 1 (= jumlah kategori - 1). Objek kedua dalam tuple adalah daftar posisi elemen bukan nol, yang dalam kasus ini adalah array kosong---artinya tidak ada elemen bukan nol. Terakhir, objek ketiga adalah daftar nilai sebenarnya dari elemen bukan nol, yang sekali lagi, dalam kasus kita adalah array kosong karena kita tidak memiliki elemen bukan nol. Jadi, rekonstruksi representasi sparse `(1,[],[])` akan menjadi `[0]` dalam representasi padat---vektor satu dimensi dengan elemen '0'.

Perhatikan bahwa ada dua `type` avocado lainnya, yakni `conventional`:

In [36]:
df.where(df['type']=='conventional').show()

+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+-------------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|type_index|   type_dummy|
+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+-------------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|   8042.21|    103.14|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|
|  3|2015-12-06|      

Apakah Anda memperhatikan bahwa sekarang kita memiliki `(1,[0],[1.0])` alih-alih `(1,[],[])`? Sekali lagi, interpretasinya sama: kita memiliki vektor satu dimensi (`1`); posisi elemen bukan nol berada pada posisi ke-0 (`[0]`); nilai elemen bukan nol adalah 1.0 (`[1.0]`).

Oke, lanjut. Sebagai langkah berikutnya, kita perlu memvektorisasi kolom untuk melatih model regresi linier. Sebenarnya, kita sudah melakukannya di atas, jadi kita cukup mendaur ulang kode sebelumnya. Berhati-hatilah karena kita akhirnya menambahkan beberapa kolom lagi sebagai hasil dari StringIndexer dan OneHotEncoder. Namun terlebih dahulu hilangkan komom non numerik lainnya

In [37]:
df = df.drop('Date', '_c0', 'type', 'year', 'region')

In [38]:
assembler = VectorAssembler(inputCols=df.columns[:-4] + ['type_dummy'], outputCol="features")
df_vec = assembler.transform(df).select(['features', 'AveragePrice'])
df_vec.show()

+--------------------+------------+
|            features|AveragePrice|
+--------------------+------------+
|[1.33,64236.62,10...|        1.33|
|[1.35,54876.98,67...|        1.35|
|[0.93,118220.22,7...|        0.93|
|[1.08,78992.15,11...|        1.08|
|[1.28,51039.6,941...|        1.28|
|[1.26,55979.78,11...|        1.26|
|[0.99,83453.76,13...|        0.99|
|[0.98,109428.33,7...|        0.98|
|[1.02,99811.42,10...|        1.02|
|[1.07,74338.76,84...|        1.07|
|[1.12,84843.44,92...|        1.12|
|[1.28,64489.17,15...|        1.28|
|[1.31,61007.1,226...|        1.31|
|[0.99,106803.39,1...|        0.99|
|[1.33,69759.01,10...|        1.33|
|[1.28,76111.27,98...|        1.28|
|[1.11,99172.96,87...|        1.11|
|[1.07,105693.84,6...|        1.07|
|[1.34,79992.09,73...|        1.34|
|[1.33,80043.78,53...|        1.33|
+--------------------+------------+
only showing top 20 rows



Nak, sekarang, apakah Anda menyadari bahwa `VectorAssembler` adalah sebuah transformator? Bagus. Saya yakin Anda menyadarinya. Apakah Anda juga menyadari bahwa sementara `StringIndexer` dan `OneHotEncoder` adalah estimator, `VectorAssembler` adalah sebuah transformator? Dapatkah Anda menebak mengapa demikian? Jelaskan!

Terakhir, kita dapat melatih model regresi linier dengan kolom `type_dummy` tambahan sebagai regresor.

In [39]:
df_train, df_test = df_vec.randomSplit([0.7, 0.3])

lr = LinearRegression(featuresCol='features', labelCol='AveragePrice')
lr_model = lr.fit(df_train)

predicted = lr_model.evaluate(df_test)

In [40]:
predicted.rootMeanSquaredError

0.0

In [41]:
predictions = lr_model.transform(df_test)
predictions.show()

+--------------------+------------+----------+
|            features|AveragePrice|prediction|
+--------------------+------------+----------+
|(8,[0,1,2,3],[1.4...|        1.41|      1.41|
|(8,[0,1,2,3],[1.8...|        1.83|      1.83|
|(8,[0,1,2,3],[2.3...|        2.36|      2.36|
|(8,[0,1,2,3],[2.7...|        2.79|      2.79|
|(8,[0,1,3,5],[1.8...|        1.87|      1.87|
|[0.51,17135.45,48...|        0.51|      0.51|
|[0.51,41987.86,22...|        0.51|      0.51|
|[0.51,1366844.88,...|        0.51|      0.51|
|[0.53,1097224.25,...|        0.53|      0.53|
|[0.53,1272428.72,...|        0.53|      0.53|
|[0.53,1613159.67,...|        0.53|      0.53|
|[0.54,1423939.62,...|        0.54|      0.54|
|[0.54,1582877.09,...|        0.54|      0.54|
|[0.55,1609195.36,...|        0.55|      0.55|
|[0.55,1977923.65,...|        0.55|      0.55|
|[0.56,9801.7,9.25...|        0.56|      0.56|
|[0.56,10571.3,0.0...|        0.56|      0.56|
|[0.56,1236204.18,...|        0.56|      0.56|
|[0.56,133952

Sekali lagi, apakah Anda memperhatikan bahwa `LinearRegression` memiliki metode `fit()` dan dengan demikian merupakan estimator? Dan objek `LinearRegressionModel`, yang dalam kasus kita disimpan dalam variabel `lr_model`, merupakan transformer?

Jadi, seperti yang Anda lihat, transformer dan estimator merupakan konsep inti dalam Spark. Faktanya, banyak contoh yang akan kita lihat nanti dalam kursus ini pada dasarnya hanyalah estimator dan transformer yang berbeda.

## Pipeline
Satu hal terakhir: seperti yang kita lihat di kelas DataFrame, Spark dibangun di atas konsep 'lazy execution', artinya, hingga pengguna meminta nilai numerik yang sebenarnya, tidak ada penghitungan angka yang dilakukan. Sebaliknya, Spark terus membangun grafik komputasi, yang menggambarkan logika tentang bagaimana angka akan dihitung. Pada akhirnya, hanya ketika pengguna meminta hasil numerik yang sebenarnya dengan memanggil, katakanlah `DataFrame.show()`, Spark benar-benar mengalirkan angka melalui grafik dan melakukan penghitungan angka yang sebenarnya. Kita melihat ini karena masalah efisiensi. Prinsip yang sama diterapkan pada transformer dan estimator. Dengan kata lain, estimator dan transformer hanya membangun grafik komputasi. Hanya ketika pengguna memanggil misalnya `DataFrame.show()`, grafik komputasi benar-benar dieksekusi dan angka mengalir melalui grafik.

Dalam beberapa hal, grafik komputasi seperti 'pipe'/'pipa', tempat data mengalir. Sebagai seorang arsitek, Anda menambahkan pipa dengan bentuk yang berbeda ke dalam pipa. Hanya saat Anda siap mengalirkan air, Anda membuka katup dan membiarkan air mengalir. Dengan menggunakan analogi yang sama, di Spark, Anda dapat (dan harus) mendefinisikan sebuah jaringan pipa. Lebih khusus lagi, Spark Pipeline adalah objek yang mencakup bagaimana transformer dan estimator (=pipa) dihubungkan. Saat pengguna mencari hasil numerik, Spark menjalankan angka dalam DataFrame (=air) melalui Pipeline dan menghasilkan sebuah hasil.

Untuk membangun sebuah Pipeline, kita harus mendefinisikan tahapan:

In [42]:
stages = []  # create an empty list

indexer = StringIndexer(inputCol = 'type', outputCol = 'type_index')
encoder = OneHotEncoder(inputCols=['type_index'], outputCols=['type_dummy'])
assembler = VectorAssembler(inputCols=df.columns[:-4] + ['type_dummy'], outputCol="features")

stages += [indexer, encoder, assembler]  # append the estimators and transformers (ordering matters)

Sekarang, menyiapkan jalur pipa sebenarnya hanya memerlukan beberapa baris kode:

In [43]:
df = spark.read.csv('avocado.csv', header=True, sep=',', inferSchema=True) # overwrite df by recreating it.

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
df_vec = pipelineModel.transform(df)
df_vec.show()

+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+-------------+--------------------+
|_c0|      Date|AveragePrice|Total Volume|   4046|     4225|  4770|Total Bags|Small Bags|Large Bags|XLarge Bags|        type|year|region|type_index|   type_dummy|            features|
+---+----------+------------+------------+-------+---------+------+----------+----------+----------+-----------+------------+----+------+----------+-------------+--------------------+
|  0|2015-12-27|        1.33|    64236.62|1036.74| 54454.85| 48.16|   8696.87|   8603.62|     93.25|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|[1.33,64236.62,10...|
|  1|2015-12-20|        1.35|    54876.98| 674.28| 44638.81| 58.33|   9505.56|   9408.07|     97.49|        0.0|conventional|2015|Albany|       0.0|(1,[0],[1.0])|[1.35,54876.98,67...|
|  2|2015-12-13|        0.93|   118220.22|  794.7|109149.67| 130.5|   8145.35|  

Sekali lagi, perhatikan bahwa Pipeline itu sendiri adalah estimator, yang dapat `fit()` ke DataFrame untuk menghasilkan transformer. Hasil dari transformer adalah DataFrame baru yang dikonversi dari DataFrame input dengan menerapkan urutan `indexer`--`encoder`--`assembler`. Output dari pipeline harus sama persis dengan yang dari bagian sebelumnya.

Jika outputnya sama, mengapa kita repot-repot menggunakan Pipeline? Pertama, penulisan kode program lebih sederhana, seperti yang mungkin telah Anda perhatikan. Ini  memiliki efek optimalisasi komputasi, karena menjadi lebih jelas bagaimana transformer dan estimator harus dihubungkan.

Untuk informasi lebih lanjut, Anda sangat dianjurkan untuk mempelajari dokumentasi resmi Spark: https://spark.apache.org/docs/latest/ml-pipeline.html

### Tugas
Buat model regresi menggunakan pipeline, dengan spesifikasi:
- gunakan kolom `region` ubah sebagai one-hot-encoder
- bagi data menjadi df_train dan df_test dengan proporsi 0/7 dan 0.3
- buat model regresinya menggunakan df_train
- uji model dengan df_test dan tampilkan RMSE
- tampilkan hasil prediksi terhadap df_test
- diskusikan apakah hasil prediksi menggunakan `region` hasilnya sama atau berbeda dibandingkan `type`

### Encode

In [44]:
# 1. Kolom region
region_indexer = StringIndexer(inputCol="region", outputCol="region_index")
region_encoder = OneHotEncoder(inputCol="region_index", outputCol="region_encoded")

# 2. Assembling feature columns
assembler = VectorAssembler(inputCols=["region_encoded", "AveragePrice", "Total Volume", "4046", "4225", "4770", "Total Bags", "Small Bags", "Large Bags", "XLarge Bags"], outputCol="features")

# 3. Membuat model regresi
lr = LinearRegression(featuresCol="features", labelCol="AveragePrice")

# 4. Membuat pipeline
stages = [region_indexer, region_encoder, assembler, lr]
pipeline = Pipeline(stages=stages)

### Spliting Data

In [45]:
df_train, df_test = df.randomSplit([0.7, 0.3], seed=42)

# 5. Melatih model regresi menggunakan df_train
pipeline_model = pipeline.fit(df_train)

### Modeling

In [46]:
df_predictions = pipeline_model.transform(df_test)

In [47]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="AveragePrice", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(df_predictions)
print(f"Root Mean Squared Error (RMSE) on test data = {rmse}")

Root Mean Squared Error (RMSE) on test data = 1.3810964163543961e-16


In [48]:
df_predictions.select("prediction", "AveragePrice", "region").show(10)

+------------------+------------+----------------+
|        prediction|AveragePrice|          region|
+------------------+------------+----------------+
|              0.78|        0.78|         Houston|
|0.8299999999999998|        0.83|        LasVegas|
|              0.87|        0.87|CincinnatiDayton|
|0.9000000000000002|         0.9|      California|
|0.9299999999999999|        0.93|         Chicago|
|0.9499999999999998|        0.95|         TotalUS|
|0.9599999999999999|        0.96|         Roanoke|
|0.9699999999999999|        0.97|           Boise|
|0.9799999999999999|        0.98| RichmondNorfolk|
|0.9799999999999999|        0.98|      Sacramento|
+------------------+------------+----------------+
only showing top 10 rows



Tulis deskripsi dan analisa kalian di sini

Model berhasil menangkap hubungan antara fitur dan variabel target di berbagai wilayah, seperti yang ditunjukkan oleh keselarasan yang tepat antara harga prediksi dan harga aktual di kota-kota seperti Houston, Las Vegas, dan Sacramento. Meskipun model berkinerja sangat baik, kesalahan yang rendah tersebut dapat mengindikasikan potensi overfitting, dan validasi atau investigasi lebih lanjut terhadap kumpulan data mungkin diperlukan untuk memastikan generalisasi yang kuat pada data yang tidak terlihat.