
# Hands-On Session 3: Data Processing with Apache Spark

## Objectives:
- Understand and practice data processing with Apache Spark.
- Use Spark to run efficient data operations on large datasets.
- Apply advanced Spark techniques to real-world use cases.

### 1. Introduction to Spark DataFrames
A Spark DataFrame is a distributed collection of data organized into named columns, with operations optimized for large-scale processing. Conceptually it closely resembles a pandas DataFrame or a table in an RDBMS.

- **Task 1**: Create a simple DataFrame in Spark and explore some of the basic functions available.

In [4]:
# A simple Spark DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.show()

+------------+-------------+------+
|EmployeeName|   Department|Salary|
+------------+-------------+------+
|        Axel|           HR|  4200|
|       James|        Sales|  3000|
|     Michael|        Sales|  4600|
|      Robert|        Sales|  4100|
|       Maria|      Finance|  3000|
|       Selly|Cust. Service|  3250|
+------------+-------------+------+



### 2. Basic Transformations with DataFrames
Data processing involves transformations such as filtering, selection, and aggregation. Spark provides efficient ways to perform these operations.

- **Task 2**: Use the `filter`, `select`, and `groupBy` operations to extract information from the data, then aggregate it with functions such as `mean`, `max`, and `sum` to gain insight into the dataset.

In [None]:
# Example DataFrame transformations (each one is a method of the DataFrame df)
df.filter(df['Salary'] > 3000).show()
df.select('EmployeeName', 'Salary').show()
df.groupBy('Department').avg('Salary').show()

In [None]:
# Simple Spark DataFrame and the basic filter() operation
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.filter(df['Salary'] > 4000).show()

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|        Axel|        HR|  4200|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+



In [None]:
# Simple Spark DataFrame with the basic filter() and toPandas() operations
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.filter(df.Department == "Sales").toPandas()

  EmployeeName Department  Salary
0        James      Sales    3000
1      Michael      Sales    4600
2       Robert      Sales    4100


In [None]:
# Simple Spark DataFrame and the basic select() operation
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.select('EmployeeName', 'Salary').show()

+------------+------+
|EmployeeName|Salary|
+------------+------+
|        Axel|  4200|
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
|       Selly|  3250|
+------------+------+



In [None]:
# Simple Spark DataFrame and the basic select() operation
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.select("EmployeeName", "Department").show()

+------------+-------------+
|EmployeeName|   Department|
+------------+-------------+
|        Axel|           HR|
|       James|        Sales|
|     Michael|        Sales|
|      Robert|        Sales|
|       Maria|      Finance|
|       Selly|Cust. Service|
+------------+-------------+



In [None]:
# Simple Spark DataFrame and the basic groupBy() operation
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.groupBy('Department').avg('Salary').show()

+-------------+-----------+
|   Department|avg(Salary)|
+-------------+-----------+
|        Sales|     3900.0|
|           HR|     4200.0|
|      Finance|     3000.0|
|Cust. Service|     3250.0|
+-------------+-----------+



In [None]:
# Simple Spark DataFrame with groupBy() and aggregate functions
from pyspark.sql import SparkSession
# Import the functions module under an alias so that Spark's max, min, and sum
# do not shadow the Python built-ins of the same name
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df.groupBy('Department').agg(
    F.count("EmployeeName").alias("Employee Count"),
    F.mean("Salary").alias("Average Salary"),
    F.max("Salary").alias("Max Salary"),
    F.min("Salary").alias("Min Salary"),
    F.sum("Salary").alias("Total Salary")
).show()

+-------------+--------------+--------------+----------+----------+------------+
|   Department|Employee Count|Average Salary|Max Salary|Min Salary|Total Salary|
+-------------+--------------+--------------+----------+----------+------------+
|        Sales|             3|        3900.0|      4600|      3000|       11700|
|           HR|             1|        4200.0|      4200|      4200|        4200|
|      Finance|             1|        3000.0|      3000|      3000|        3000|
|Cust. Service|             1|        3250.0|      3250|      3250|        3250|
+-------------+--------------+--------------+----------+----------+------------+
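
As a sanity check on the aggregation above, the same per-department statistics can be reproduced in plain Python without Spark. This is only a minimal sketch for cross-checking small inputs; the helper name `summarize_by_department` is hypothetical and not part of the notebook or of PySpark.

```python
from collections import defaultdict

# Same rows as the Spark DataFrame: (EmployeeName, Department, Salary)
data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]


def summarize_by_department(rows):
    """Group salaries by department and compute count, mean, max, min, sum."""
    salaries = defaultdict(list)
    for _name, department, salary in rows:
        salaries[department].append(salary)
    return {
        department: {
            'count': len(values),
            'mean': sum(values) / len(values),
            'max': max(values),
            'min': min(values),
            'sum': sum(values),
        }
        for department, values in salaries.items()
    }


summary = summarize_by_department(data)
for department, stats in summary.items():
    print(department, stats)
```

For the Sales department this gives a count of 3, a mean of 3900.0, and a sum of 11700, which agrees with the Sales row in the Spark output.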



### 3. Working with Complex Data Types
Spark supports complex data types such as maps, arrays, and structs, which make it possible to work with nested data inside a DataFrame.

- **Task 3**: Explore how to process complex data types in Spark DataFrames.

In [None]:
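# Example of complex data type manipulation
# withColumn() returns a new DataFrame, so keep the result before using SalaryBonus
df_bonus = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
df_bonus.show()
df_bonus.withColumn('TotalCompensation', df_bonus['Salary'] + df_bonus['SalaryBonus']).show()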

In [None]:
# Complex data type manipulation
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

df1 = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
print("     Employee Salary Bonus per Department")
df1.show()

df2 = df1.withColumn('TotalCompensation', df1['Salary'] + df1['SalaryBonus'])
print("                   Total Employee Compensation")
df2.show()

     Employee Salary Bonus per Department
+------------+-------------+------+-----------+
|EmployeeName|   Department|Salary|SalaryBonus|
+------------+-------------+------+-----------+
|        Axel|           HR|  4200|      420.0|
|       James|        Sales|  3000|      300.0|
|     Michael|        Sales|  4600|      460.0|
|      Robert|        Sales|  4100|      410.0|
|       Maria|      Finance|  3000|      300.0|
|       Selly|Cust. Service|  3250|      325.0|
+------------+-------------+------+-----------+

                   Total Employee Compensation
+------------+-------------+------+-----------+-----------------+
|EmployeeName|   Department|Salary|SalaryBonus|TotalCompensation|
+------------+-------------+------+-----------+-----------------+
|        Axel|           HR|  4200|      420.0|           4620.0|
|       James|        Sales|  3000|      300.0|           3300.0|
|     Michael|        Sales|  4600|      460.0|           5060.0|
|      Robert|        Sales|  4100|      410.0|           4510.0|
|       Maria|      Finance|  3000|      300.0|           3300.0|
|       Selly|Cust. Service|  3250|      325.0|           3575.0|
+------------+-------------+------+-----------+-----------------+
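
The cells in this section derive plain numeric columns. For the array, map, and struct types named in the section introduction, the following plain-Python sketch illustrates the row-level effect of three common operations on nested data: flattening an array into one row per element (the idea behind Spark's `explode`), looking up a key in a map, and reading a field of a struct. The column names `Skills`, `Contact`, and `Address`, all the values in them, and the three helper functions are hypothetical illustrations, not part of the notebook; the mapping to Spark types in the comments is only an approximate correspondence.

```python
# Hypothetical nested records; each key plays the role of one DataFrame column.
employees = [
    {
        'EmployeeName': 'Axel',
        'Skills': ['recruiting', 'payroll'],       # list  ~ array column
        'Contact': {'email': 'axel@example.com'},  # dict  ~ map column
        'Address': ('Jakarta', 'ID'),              # tuple ~ struct (city, country)
    },
    {
        'EmployeeName': 'James',
        'Skills': ['negotiation'],
        'Contact': {'email': 'james@example.com', 'phone': '555-0100'},
        'Address': ('Bandung', 'ID'),
    },
]


def explode_skills(rows):
    """Return one (name, skill) pair per array element."""
    return [(row['EmployeeName'], skill)
            for row in rows
            for skill in row['Skills']]


def lookup_contact(rows, key):
    """Return (name, value) per row; rows without the key yield None."""
    return [(row['EmployeeName'], row['Contact'].get(key)) for row in rows]


def select_city(rows):
    """Return (name, city), reading the first field of the struct-like tuple."""
    return [(row['EmployeeName'], row['Address'][0]) for row in rows]


print(explode_skills(employees))
print(lookup_contact(employees, 'phone'))
print(select_city(employees))
```

Flattening turns two input rows into three output rows, which is the main difference from the column arithmetic used in the cells above, where the row count stays the same.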

### 4. Advanced Data Operations
Use Spark for advanced operations such as window functions and user-defined functions (UDFs), and for query optimization.

- **Task 4**: Implement a window function to compute running totals or rankings.

In [None]:
# Window function: rank employees by salary (ascending) within each department
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy('Department').orderBy('Salary')
df2.withColumn('Rank', F.rank().over(windowSpec)).show()

+------------+-------------+------+-----------+-----------------+----+
|EmployeeName|   Department|Salary|SalaryBonus|TotalCompensation|Rank|
+------------+-------------+------+-----------+-----------------+----+
|       Selly|Cust. Service|  3250|      325.0|           3575.0|   1|
|       Maria|      Finance|  3000|      300.0|           3300.0|   1|
|        Axel|           HR|  4200|      420.0|           4620.0|   1|
|       James|        Sales|  3000|      300.0|           3300.0|   1|
|      Robert|        Sales|  4100|      410.0|           4510.0|   2|
|     Michael|        Sales|  4600|      460.0|           5060.0|   3|
+------------+-------------+------+-----------+-----------------+----+
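
Task 4 also mentions running totals, which the cell above does not compute. The plain-Python sketch below shows, under stated assumptions, what a per-department rank and a row-by-row running total look like for the same six rows. The function name `rank_and_running_total` is hypothetical. The rank follows the usual "ties share a rank, then a gap" rule, and the running total adds one row at a time, which in Spark would correspond to a window frame built with `rowsBetween(Window.unboundedPreceding, Window.currentRow)`.

```python
from itertools import groupby

# (EmployeeName, Department, Salary), same rows as the DataFrame in this notebook
data = [('Axel', 'HR', 4200),
        ('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000),
        ('Selly', 'Cust. Service', 3250)]


def rank_and_running_total(rows):
    """Per department, ordered by ascending salary: rank plus cumulative salary."""
    # Sorting by (department, salary) plays the role of partitionBy + orderBy.
    ordered = sorted(rows, key=lambda r: (r[1], r[2]))
    result = []
    for department, members in groupby(ordered, key=lambda r: r[1]):
        running_total = 0
        previous_salary = None
        rank = 0
        for position, (name, _dept, salary) in enumerate(members, start=1):
            if salary != previous_salary:
                rank = position  # tied salaries keep the rank of the first tied row
            running_total += salary
            previous_salary = salary
            result.append((name, department, salary, rank, running_total))
    return result


ranked = rank_and_running_total(data)
for row in ranked:
    print(row)
```

For these six rows the rank values agree with the `Rank` column in the Spark output, and the last Sales row carries a running total of 11700, the department's total salary.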



### 5. Conclusion and Further Exploration
Review what you have learned about data processing with Spark and explore further techniques for optimizing your data processing.

**Task 5**:
- Download a large dataset from [Kaggle](https://www.kaggle.com/) or another source.
- Upload the downloaded CSV file, then load the data into PySpark.
- Once the data is loaded with PySpark, manipulate it to extract the information you need.

[Coffee Sales Dataset](https://www.kaggle.com/datasets/navjotkaushal/coffee-sales-dataset) from Kaggle

In [1]:
# Upload the .csv file
from google.colab import files
uploaded = files.upload()

Saving Coffe_sales.csv to Coffe_sales.csv


In [7]:
# Read the dataset from the .csv file and display it as a table
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()
dfCoffe = spark.read.csv("Coffe_sales.csv", header=True, inferSchema=True)

dfCoffe.show()

+-----------+---------+-----+-------------------+-----------+-------+----------+-----------+---------+----------+--------------------+
|hour_of_day|cash_type|money|        coffee_name|Time_of_Day|Weekday|Month_name|Weekdaysort|Monthsort|      Date|                Time|
+-----------+---------+-----+-------------------+-----------+-------+----------+-----------+---------+----------+--------------------+
|         10|     card| 38.7|              Latte|    Morning|    Fri|       Mar|          5|        3|2024-03-01|2025-09-09 10:15:...|
|         12|     card| 38.7|      Hot Chocolate|  Afternoon|    Fri|       Mar|          5|        3|2024-03-01|2025-09-09 12:19:...|
|         12|     card| 38.7|      Hot Chocolate|  Afternoon|    Fri|       Mar|          5|        3|2024-03-01|2025-09-09 12:20:...|
|         13|     card| 28.9|          Americano|  Afternoon|    Fri|       Mar|          5|        3|2024-03-01|2025-09-09 13:46:...|
|         13|     card| 38.7|              Latte|  Afte

In [6]:
# Read the dataset from the .csv file and display its column schema
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()
dfCoffe = spark.read.csv("Coffe_sales.csv", header=True, inferSchema=True)

dfCoffe.printSchema()

root
 |-- hour_of_day: integer (nullable = true)
 |-- cash_type: string (nullable = true)
 |-- money: double (nullable = true)
 |-- coffee_name: string (nullable = true)
 |-- Time_of_Day: string (nullable = true)
 |-- Weekday: string (nullable = true)
 |-- Month_name: string (nullable = true)
 |-- Weekdaysort: integer (nullable = true)
 |-- Monthsort: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Time: timestamp (nullable = true)



In [8]:
# Show the most recent sales first (sorted by Date, descending)
from pyspark.sql.functions import col

dfCoffe.orderBy(col("Date").desc()).show()

+-----------+---------+-----+-------------------+-----------+-------+----------+-----------+---------+----------+--------------------+
|hour_of_day|cash_type|money|        coffee_name|Time_of_Day|Weekday|Month_name|Weekdaysort|Monthsort|      Date|                Time|
+-----------+---------+-----+-------------------+-----------+-------+----------+-----------+---------+----------+--------------------+
|         10|     card|35.76|              Latte|    Morning|    Sun|       Mar|          7|        3|2025-03-23|2025-09-09 10:07:...|
|         10|     card|35.76|         Cappuccino|    Morning|    Sun|       Mar|          7|        3|2025-03-23|2025-09-09 10:34:...|
|         14|     card|35.76|              Cocoa|  Afternoon|    Sun|       Mar|          7|        3|2025-03-23|2025-09-09 14:43:...|
|         14|     card|35.76|              Cocoa|  Afternoon|    Sun|       Mar|          7|        3|2025-03-23|2025-09-09 14:44:...|
|         15|     card|25.96|          Americano|  Afte

In [None]:
# Show the number of records in the Coffee Sales dataset
print(f"Number of rows in the 'Coffee Sales' dataset: {dfCoffe.count()} rows\n")

print("Number of rows per coffee name:")
dfCoffe.groupBy("coffee_name").count().show()

Number of rows in the 'Coffee Sales' dataset: 3547 rows

Number of rows per coffee name:
+-------------------+-----+
|        coffee_name|count|
+-------------------+-----+
|           Espresso|  129|
|Americano with Milk|  809|
|              Latte|  757|
|          Americano|  564|
|      Hot Chocolate|  276|
|              Cocoa|  239|
|            Cortado|  287|
|         Cappuccino|  486|
+-------------------+-----+



In [None]:
# Count the number of transactions per coffee, most popular first
dfCoffe.groupBy("coffee_name").count().orderBy(col("count").desc()).show()

# Count the number of distinct menu items
num_menu = dfCoffe.select("coffee_name").distinct().count()
print(f"Number of menu items: {num_menu}")

+-------------------+-----+
|        coffee_name|count|
+-------------------+-----+
|Americano with Milk|  809|
|              Latte|  757|
|          Americano|  564|
|         Cappuccino|  486|
|            Cortado|  287|
|      Hot Chocolate|  276|
|              Cocoa|  239|
|           Espresso|  129|
+-------------------+-----+

Number of menu items: 8


In [15]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, year

# Filtering: keep only the sales from 2025
df_2025 = dfCoffe.filter(year(col("Date")) == 2025)

# Window that orders each menu item's sales from newest to oldest.
# Rows that share the same Date have no defined order among themselves.
windowSpec = Window.partitionBy("coffee_name").orderBy(col("Date").desc())
df_with_row = df_2025.withColumn("row_num", row_number().over(windowSpec))

# Keep the most recent record per menu item
df_latest_price = (df_with_row
                   .filter(col("row_num") == 1)
                   .select(
                       col("coffee_name").alias("Menu"),
                       col("money").alias("Price")))

# Display the catalog of menu items on sale
print("====== Menu Catalog ======")
df_latest_price.show()

+-------------------+-----+
|               Menu|Price|
+-------------------+-----+
|          Americano|25.96|
|Americano with Milk|30.86|
|         Cappuccino|35.76|
|              Cocoa|35.76|
|            Cortado|25.96|
|           Espresso|21.06|
|      Hot Chocolate|35.76|
|              Latte|35.76|
+-------------------+-----+
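
The "number the rows per group, then keep row 1" pattern used for this catalog can be hard to picture at first. The plain-Python sketch below shows the same idea on a handful of records: filter to one year, then keep the newest record per coffee. The sample records, their prices, and the function name `latest_price_per_menu` are hypothetical and are not taken from the Coffee Sales dataset.

```python
from datetime import date

# Hypothetical sales records: (coffee_name, sale_date, money)
sales = [
    ('Latte', date(2025, 3, 22), 30.0),
    ('Latte', date(2025, 3, 23), 32.0),
    ('Espresso', date(2025, 3, 20), 20.0),
    ('Espresso', date(2024, 12, 31), 18.0),
    ('Americano', date(2024, 6, 1), 25.0),
]


def latest_price_per_menu(records, year):
    """For each coffee, keep the price of its most recent sale in the given year."""
    latest = {}
    for name, sale_date, money in records:
        if sale_date.year != year:
            continue  # plays the role of the year filter
        # Strictly newer dates win; on equal dates the first record seen is kept.
        if name not in latest or sale_date > latest[name][0]:
            latest[name] = (sale_date, money)
    # Sort by menu name so the catalog prints in a stable order.
    return {name: money for name, (_sale_date, money) in sorted(latest.items())}


catalog = latest_price_per_menu(sales, 2025)
for menu, price in catalog.items():
    print(menu, price)
```

In this sample, Americano has no sales in 2025 and therefore does not appear in the result, which is also how the year filter behaves in the Spark version.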

