In [2]:
!pip install pandas==2.2.2 numpy pyspark==3.4.1


Collecting pyspark==3.4.1
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285391 sha256=6699598cd65debf8e025f46b350e14b28c734fa73b56fded9f79bd93dd3bd362
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


1. Pengenalan Spark DataFrames

In [6]:
# Contoh membuat DataFrame sederhana dan operasi dasar
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan6').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.show()

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+



2. Transformasi Dasar dengan DataFrames

In [7]:
# Contoh operasi transformasi DataFrame
df.select('EmployeeName', 'Salary').show()
df.filter(df['Salary'] > 3000).show()
df.groupBy('Department').avg('Salary').show()

+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
+------------+------+

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3900.0|
|   Finance|     3000.0|
+----------+-----------+



Tugas 2: Gunakan operasi filter, select, groupBy untuk mengekstrak informasi dari data, serta lakukan agregasi data untuk mendapatkan insight tentang dataset menggunakan perintah seperti mean, max, sum.

In [8]:
# Mengelompokkan berdasarkan 'Department' dan menghitung rata-rata gaji
df.groupBy('Department').avg('Salary').show()

# Mengelompokkan berdasarkan 'Department' dan menghitung gaji maksimum
df.groupBy('Department').max('Salary').show()

# Mengelompokkan berdasarkan 'Department' dan menghitung total gaji
df.groupBy('Department').sum('Salary').show()


+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3900.0|
|   Finance|     3000.0|
+----------+-----------+

+----------+-----------+
|Department|max(Salary)|
+----------+-----------+
|     Sales|       4600|
|   Finance|       3000|
+----------+-----------+

+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|      11700|
|   Finance|       3000|
+----------+-----------+



3. Bekerja dengan Tipe Data Kompleks

In [10]:
# Contoh manipulasi tipe data kompleks
# Tambahkan kolom SalaryBonus
df_with_bonus = df.withColumn('SalaryBonus', df['Salary'] * 0.1)

# Tambahkan kolom TotalCompensation berdasarkan kolom Salary dan SalaryBonus
df_with_total_comp = df_with_bonus.withColumn('TotalCompensation', df_with_bonus['Salary'] + df_with_bonus['SalaryBonus'])

# Tampilkan hasil akhir
df_with_total_comp.show()


+------------+----------+------+-----------+-----------------+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|
+------------+----------+------+-----------+-----------------+
|       James|     Sales|  3000|      300.0|           3300.0|
|     Michael|     Sales|  4600|      460.0|           5060.0|
|      Robert|     Sales|  4100|      410.0|           4510.0|
|       Maria|   Finance|  3000|      300.0|           3300.0|
+------------+----------+------+-----------+-----------------+



Tugas 3: Eksplorasi bagaimana mengolah tipe data kompleks dalam Spark DataFrames.

In [12]:
## Array di PySpark

from pyspark.sql.functions import array

# Membuat kolom baru 'Benefits' yang berupa array dari dua elemen
df_with_array = df.withColumn('Benefits', array(df['Salary'], df['Salary'] * 0.2))
df_with_array.show(truncate=False)

+------------+----------+------+---------------+
|EmployeeName|Department|Salary|Benefits       |
+------------+----------+------+---------------+
|James       |Sales     |3000  |[3000.0, 600.0]|
|Michael     |Sales     |4600  |[4600.0, 920.0]|
|Robert      |Sales     |4100  |[4100.0, 820.0]|
|Maria       |Finance   |3000  |[3000.0, 600.0]|
+------------+----------+------+---------------+



In [13]:
## Manipulasi Array

from pyspark.sql.functions import col

# Mengakses elemen pertama dan kedua dari kolom 'Benefits'
df_with_array.select(col('EmployeeName'), col('Benefits')[0].alias('SalaryComponent1'), col('Benefits')[1].alias('SalaryComponent2')).show()


+------------+----------------+----------------+
|EmployeeName|SalaryComponent1|SalaryComponent2|
+------------+----------------+----------------+
|       James|          3000.0|           600.0|
|     Michael|          4600.0|           920.0|
|      Robert|          4100.0|           820.0|
|       Maria|          3000.0|           600.0|
+------------+----------------+----------------+



In [14]:
## Struct di PySpark

from pyspark.sql.functions import struct

# Membuat kolom baru 'EmployeeInfo' yang berisi nama dan departemen sebagai satu struct
df_with_struct = df.withColumn('EmployeeInfo', struct(df['EmployeeName'], df['Department']))
df_with_struct.show(truncate=False)


+------------+----------+------+----------------+
|EmployeeName|Department|Salary|EmployeeInfo    |
+------------+----------+------+----------------+
|James       |Sales     |3000  |{James, Sales}  |
|Michael     |Sales     |4600  |{Michael, Sales}|
|Robert      |Sales     |4100  |{Robert, Sales} |
|Maria       |Finance   |3000  |{Maria, Finance}|
+------------+----------+------+----------------+



In [15]:
## Akses dan Manipulasi Struct

# Akses elemen di dalam struct 'EmployeeInfo'
df_with_struct.select(col('EmployeeInfo.EmployeeName'), col('EmployeeInfo.Department')).show()


+------------+----------+
|EmployeeName|Department|
+------------+----------+
|       James|     Sales|
|     Michael|     Sales|
|      Robert|     Sales|
|       Maria|   Finance|
+------------+----------+



In [16]:
## Map di PySpark

from pyspark.sql.functions import create_map, lit

# Membuat kolom baru 'SalaryInfo' berupa map yang berisi gaji dan bonus
df_with_map = df.withColumn('SalaryInfo', create_map(lit('Salary'), df['Salary'], lit('Bonus'), df['Salary'] * 0.1))
df_with_map.show(truncate=False)


+------------+----------+------+----------------------------------+
|EmployeeName|Department|Salary|SalaryInfo                        |
+------------+----------+------+----------------------------------+
|James       |Sales     |3000  |{Salary -> 3000.0, Bonus -> 300.0}|
|Michael     |Sales     |4600  |{Salary -> 4600.0, Bonus -> 460.0}|
|Robert      |Sales     |4100  |{Salary -> 4100.0, Bonus -> 410.0}|
|Maria       |Finance   |3000  |{Salary -> 3000.0, Bonus -> 300.0}|
+------------+----------+------+----------------------------------+



4. Operasi Data Lanjutan

In [11]:
# Contoh menggunakan window functions
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy('Department').orderBy('Salary')
df.withColumn('Rank', F.rank().over(windowSpec)).show()

+------------+----------+------+----+
|EmployeeName|Department|Salary|Rank|
+------------+----------+------+----+
|       Maria|   Finance|  3000|   1|
|       James|     Sales|  3000|   1|
|      Robert|     Sales|  4100|   2|
|     Michael|     Sales|  4600|   3|
+------------+----------+------+----+



Tugas 4: Implementasikan window function untuk menghitung running totals atau rangkings.

In [17]:
## Running Total menggunakan sum()

# Definisikan window specification untuk running total
windowSpec_running_total = Window.partitionBy('Department').orderBy('Salary').rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Tambahkan kolom 'RunningTotal' yang menghitung total kumulatif gaji
df_with_running_total = df.withColumn('RunningTotal', F.sum('Salary').over(windowSpec_running_total))

# Tampilkan DataFrame dengan kolom running total
df_with_running_total.show()


+------------+----------+------+------------+
|EmployeeName|Department|Salary|RunningTotal|
+------------+----------+------+------------+
|       Maria|   Finance|  3000|        3000|
|       James|     Sales|  3000|        3000|
|      Robert|     Sales|  4100|        7100|
|     Michael|     Sales|  4600|       11700|
+------------+----------+------+------------+



In [18]:
## Ranking dengan dense_rank()

# Tambahkan kolom 'DenseRank' yang menggunakan window function dense_rank
df_with_dense_rank = df.withColumn('DenseRank', F.dense_rank().over(windowSpec))

# Tampilkan DataFrame dengan kolom dense rank
df_with_dense_rank.show()


+------------+----------+------+---------+
|EmployeeName|Department|Salary|DenseRank|
+------------+----------+------+---------+
|       Maria|   Finance|  3000|        1|
|       James|     Sales|  3000|        1|
|      Robert|     Sales|  4100|        2|
|     Michael|     Sales|  4600|        3|
+------------+----------+------+---------+



In [19]:
## Menghitung Moving Average

# Definisikan window specification untuk moving average
windowSpec_moving_avg = Window.partitionBy('Department').orderBy('Salary').rowsBetween(-1, 1)

# Tambahkan kolom 'MovingAvg' yang menghitung rata-rata bergerak (moving average)
df_with_moving_avg = df.withColumn('MovingAvg', F.avg('Salary').over(windowSpec_moving_avg))

# Tampilkan DataFrame dengan kolom moving average
df_with_moving_avg.show()


+------------+----------+------+---------+
|EmployeeName|Department|Salary|MovingAvg|
+------------+----------+------+---------+
|       Maria|   Finance|  3000|   3000.0|
|       James|     Sales|  3000|   3550.0|
|      Robert|     Sales|  4100|   3900.0|
|     Michael|     Sales|  4600|   4350.0|
+------------+----------+------+---------+



5. Kesimpulan dan Eksplorasi Lebih Lanjut

Tugas 5: Buat ringkasan dari semua operasi yang telah dilakukan dan bagaimana teknik ini dapat diterapkan pada proyek data Anda.

Jawaban:

Operasi yang dilakukan pada PySpark DataFrames meliputi:

1. Membuat DataFrame sederhana dan melakukan operasi dasar seperti select(), filter(), dan groupBy() untuk mengekstrak dan menganalisis data.
2. Manipulasi tipe data kompleks seperti penambahan kolom dan perhitungan menggunakan fungsi aritmatika.
3. Window functions seperti rank(), sum(), dan avg() digunakan untuk perhitungan peringkat, total kumulatif, dan rata-rata bergerak dalam partisi data.