### 1. Introduction to Spark DataFrames
A Spark DataFrame is a distributed collection of rows organized into named columns, similar to a pandas DataFrame or a table in an RDBMS. Spark optimizes operations on it for processing large datasets.

- **Task 1**: Create a simple DataFrame in Spark and explore some of the basic functions available.

In [None]:
# Example: creating a simple DataFrame and running basic operations
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3000)]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.show()

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+



### 2. Basic Transformations with DataFrames
Data processing involves transformations such as filtering, selection, and aggregation. Spark provides efficient ways to perform these operations.

- **Task 2**: Use the filter, select, and groupBy operations to extract information from the data, and aggregate the data with functions such as mean, max, and sum to gain insight into the dataset.

In [None]:
# Example DataFrame transformations (each method is called on df)
df.select('EmployeeName', 'Salary').show()
df.filter(df['Salary'] > 3000).show()
df.groupBy('Department').avg('Salary').show()

In [None]:
# Select specific columns
print("EmployeeName and Salary columns")
df.select('EmployeeName', 'Salary').show()

# Filter salaries above 3000
print("Salary above 3000")
df.filter(df['Salary'] > 3000).show()

# Group by Department and compute the average salary
print("Mean Salary per Department")
df.groupBy('Department').avg('Salary').show()

# Group by Department and compute the maximum salary
print("Salary Max per Department")
df.groupBy('Department').max('Salary').show()

# Group by Department and compute the total salary
print("Total Salary per Department")
df.groupBy('Department').sum('Salary').show()


EmployeeName and Salary columns
+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
+------------+------+

Salary above 3000
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

Mean Salary per Department
+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3900.0|
|   Finance|     3000.0|
+----------+-----------+

Salary Max per Department
+----------+-----------+
|Department|max(Salary)|
+----------+-----------+
|     Sales|       4600|
|   Finance|       3000|
+----------+-----------+

Total Salary per Department
+----------+-----------+
|Department|sum(Salary)|
+----------+-----------+
|     Sales|      11700|
|   Finance|       3000|
+----------+-----------+



### 3. Working with Complex Data Types
Spark supports complex data types such as maps, arrays, and structs, which allow richer operations on nested or multi-valued data.

- **Task 3**: Explore how to process complex data types in Spark DataFrames.

In [None]:
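# Example: adding derived columns with withColumn
# withColumn returns a new DataFrame, so keep the result before referencing SalaryBonus
df_bonus = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
df_bonus.show()
df_bonus.withColumn('TotalCompensation', df_bonus['Salary'] + df_bonus['SalaryBonus']).show()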

In [None]:
# Add a SalaryBonus column (10% of Salary)
df = df.withColumn('SalaryBonus', df['Salary'] * 0.1)

# Add a TotalCompensation column (Salary + SalaryBonus)
df = df.withColumn('TotalCompensation', df['Salary'] + df['SalaryBonus'])

# Show the final result
df.show()

+------------+----------+------+-----------+-----------------+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|
+------------+----------+------+-----------+-----------------+
|       James|     Sales|  3000|      300.0|           3300.0|
|     Michael|     Sales|  4600|      460.0|           5060.0|
|      Robert|     Sales|  4100|      410.0|           4510.0|
|       Maria|   Finance|  3000|      300.0|           3300.0|
+------------+----------+------+-----------+-----------------+



### 4. Advanced Data Operations
Use Spark for advanced operations such as window functions and user-defined functions (UDFs), and for optimizing queries.

- **Task 4**: Implement window functions to compute running totals or rankings.

In [None]:
# Example using window functions
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy('Department').orderBy('Salary')
df.withColumn('Rank', F.rank().over(windowSpec)).show()

+------------+----------+------+-----------+-----------------+----+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|Rank|
+------------+----------+------+-----------+-----------------+----+
|       Maria|   Finance|  3000|      300.0|           3300.0|   1|
|       James|     Sales|  3000|      300.0|           3300.0|   1|
|      Robert|     Sales|  4100|      410.0|           4510.0|   2|
|     Michael|     Sales|  4600|      460.0|           5060.0|   3|
+------------+----------+------+-----------+-----------------+----+



In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, sum as _sum, rank

# Window partitioned by Department and ordered by Salary
windowDept = Window.partitionBy("Department").orderBy(col("Salary"))

# Running total of Salary per Department
df_running_total = df.withColumn(
    "RunningTotal",
    _sum("Salary").over(windowDept)
)

# Rank by Salary within each Department
df_rank = df_running_total.withColumn(
    "Rank",
    rank().over(windowDept)
)

df_rank.show()


+------------+----------+------+-----------+-----------------+------------+----+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|RunningTotal|Rank|
+------------+----------+------+-----------+-----------------+------------+----+
|       Maria|   Finance|  3000|      300.0|           3300.0|        3000|   1|
|       James|     Sales|  3000|      300.0|           3300.0|        3000|   1|
|      Robert|     Sales|  4100|      410.0|           4510.0|        7100|   2|
|     Michael|     Sales|  4600|      460.0|           5060.0|       11700|   3|
+------------+----------+------+-----------+-----------------+------------+----+



### 5. Conclusion and Further Exploration
Review what you have learned about data processing with Spark and explore further techniques for optimizing your data processing.
<br>**Task 5**:
- Download a large dataset from [Kaggle](https://www.kaggle.com/) or another source.
- Take the downloaded CSV file, then load and store it in a PySpark DataFrame.
- Once the data is loaded in PySpark, manipulate it to obtain the information you need.

In [48]:
from pyspark.sql import SparkSession

# Create the SparkSession
spark = SparkSession.builder.appName("anime").getOrCreate()

# Read the CSV into a Spark DataFrame
df_anime = spark.read.csv("anime.csv", header=True, inferSchema=True)

# Show the first rows of the DataFrame
df_anime.show(5)

+--------+--------------------+--------------------+-----+--------+------+-------+
|anime_id|                name|               genre| type|episodes|rating|members|
+--------+--------------------+--------------------+-----+--------+------+-------+
|   32281|      Kimi no Na wa.|Drama, Romance, S...|Movie|       1|  9.37| 200630|
|    5114|Fullmetal Alchemi...|Action, Adventure...|   TV|      64|  9.26| 793665|
|   28977|            Gintama°|Action, Comedy, H...|   TV|      51|  9.25| 114262|
|    9253|         Steins;Gate|    Sci-Fi, Thriller|   TV|      24|  9.17| 673572|
|    9969|       Gintama&#039;|Action, Comedy, H...|   TV|      51|  9.16| 151266|
+--------+--------------------+--------------------+-----+--------+------+-------+
only showing top 5 rows



**Basic Transformations with DataFrames**

In [55]:
# Filter: keep anime with rating > 9.2
df_anime.filter(df_anime['rating'] > 9.2).select("name", "rating").show(5, truncate=False)

# Select: pick specific columns only
df_anime.select("name", "type", "episodes").show(5, truncate=False)

# GroupBy + aggregation: average rating per anime type
df_anime.groupBy("type").mean("rating").show(5)

# Another GroupBy + aggregation: total members per genre string
df_anime.groupBy("genre").sum("members").show(5, truncate=False)

+--------------------------------+------+
|name                            |rating|
+--------------------------------+------+
|Kimi no Na wa.                  |9.37  |
|Fullmetal Alchemist: Brotherhood|9.26  |
|Gintama°                        |9.25  |
|Kahei no Umi                    |9.33  |
|Mogura no Motoro                |9.5   |
+--------------------------------+------+
only showing top 5 rows

+--------------------------------+-----+--------+
|name                            |type |episodes|
+--------------------------------+-----+--------+
|Kimi no Na wa.                  |Movie|1       |
|Fullmetal Alchemist: Brotherhood|TV   |64      |
|Gintama°                        |TV   |51      |
|Steins;Gate                     |TV   |24      |
|Gintama&#039;                   |TV   |51      |
+--------------------------------+-----+--------+
only showing top 5 rows

+-------+-----------------+
|   type|      avg(rating)|
+-------+-----------------+
|     TV|6.902299101062382|
|   NULL| 

**Working with Complex Data Types**

In [50]:
from pyspark.sql.functions import split, struct

# Build an array column from genre (split on commas)
df_complex = df_anime.withColumn("genre_array", split(df_anime["genre"], ","))

# Build a struct column that combines rating and members
df_complex = df_complex.withColumn("stats", struct("rating", "members"))

# Inspect the result
df_complex.select("name", "genre_array", "stats").show(5, truncate=False)

+--------------------------------+--------------------------------------------------------------------+--------------+
|name                            |genre_array                                                         |stats         |
+--------------------------------+--------------------------------------------------------------------+--------------+
|Kimi no Na wa.                  |[Drama,  Romance,  School,  Supernatural]                           |{9.37, 200630}|
|Fullmetal Alchemist: Brotherhood|[Action,  Adventure,  Drama,  Fantasy,  Magic,  Military,  Shounen] |{9.26, 793665}|
|Gintama°                        |[Action,  Comedy,  Historical,  Parody,  Samurai,  Sci-Fi,  Shounen]|{9.25, 114262}|
|Steins;Gate                     |[Sci-Fi,  Thriller]                                                 |{9.17, 673572}|
|Gintama&#039;                   |[Action,  Comedy,  Historical,  Parody,  Samurai,  Sci-Fi,  Shounen]|{9.16, 151266}|
+--------------------------------+--------------

**Advanced Data Operations**

In [54]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, rank, sum as _sum

# WindowSpec for ranking by rating within each type
windowRank = Window.partitionBy("type").orderBy(col("rating").desc())

# WindowSpec for a running total of members within each type
windowRunning = Window.partitionBy("type").orderBy(col("rating").desc()).rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Rank each anime by rating within its type (built from df_anime, not the employee DataFrame)
df_rank = df_anime.withColumn("Rank", rank().over(windowRank))

df_final = df_rank.withColumn("RunningMembers", _sum("members").over(windowRunning))

# Show the result
df_final.select("name", "type", "rating", "members", "Rank", "RunningMembers").show(5, truncate=False)


+---------------------------------------------------+-----+------+-------+----+--------------+
|name                                               |type |rating|members|Rank|RunningMembers|
+---------------------------------------------------+-----+------+-------+----+--------------+
|Taka no Tsume 8: Yoshida-kun no X-Files            |Movie|10.0  |13     |1   |13            |
|Mogura no Motoro                                   |Movie|9.5   |62     |2   |75            |
|Kimi no Na wa.                                     |Movie|9.37  |200630 |3   |200705        |
|Kahei no Umi                                       |Movie|9.33  |44     |4   |200749        |
|Gintama Movie: Kanketsu-hen - Yorozuya yo Eien Nare|Movie|9.1   |72534  |5   |273283        |
+---------------------------------------------------+-----+------+-------+----+--------------+
only showing top 5 rows

