**Tugas 1** : Buat DataFrame sederhana di Spark dan eksplorasi beberapa fungsi dasar yang tersedia.

In [None]:
#Contoh membuat DataFrame sederhana dan operasi dasar
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [('James', 'Sales', 3000),
        ('Michael', 'Sales', 4600),
        ('Robert', 'Sales', 4100),
        ('Maria', 'Finance', 3100)]
columns = ['EmployeeName', 'Departement', 'Salary']

df = spark.createDataFrame(data, schema=columns)
df.show()

+------------+-----------+------+
|EmployeeName|Departement|Salary|
+------------+-----------+------+
|       James|      Sales|  3000|
|     Michael|      Sales|  4600|
|      Robert|      Sales|  4100|
|       Maria|    Finance|  3100|
+------------+-----------+------+



**Tugas 2** : Gunakan operasi filter,  select, GroupBy untuk mengekstrak informasi dari data, serta lakukan agragasi data untuk mendapatkan insight tentang dataset menggunakan perintah seperti mean, max, sum.

In [None]:
from pyspark.sql.functions import col, avg, max, sum, lit

#Menampilkan skema dataframe
df.printSchema()

#Memilih kolom tertentu
print("Kolom EmployeeName dan Salary")
df_cobaselect = df.select("EmployeeName", "Salary")
df_cobaselect.show()

# Menyaring baris
print("Gaji  di atas 4000")
df_saring_gaji = df.filter(col("Salary") >  4000)
df_saring_gaji.show()

#Menambah atau mengubah kolom
print("Menambah kolom")
df_kolom_baru = df.withColumn("Bonus", lit(500))
df_kolom_baru.show()

# Mengelompokkan dan mengagregasi
print("Mengelompokkan dan mengagregasi")
df_kelompok_rata_gaji = df.groupBy("Departement").agg(
    avg("Salary").alias("Rata-rata Gaji"),
    max("Salary").alias("Gaji tertinggi"),
    sum("Salary").alias("Total Gaji"))
df_kelompok_rata_gaji.show()

#Mengurutkan data dari tertinggi ke terendah
print("Mengurutkan data dari tertinggi ke terendah")
df_urut =df.orderBy(col("Salary").desc())
df_urut.show()

#Mengubah nama kolom
print("Mengubah nama kolom")
df_ubah_nama = df.withColumnRenamed("EmployeeName", "Nama pekerja")
df_ubah_nama.show()

root
 |-- EmployeeName: string (nullable = true)
 |-- Departement: string (nullable = true)
 |-- Salary: long (nullable = true)

Kolom EmployeeName dan Salary
+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3100|
+------------+------+

Gaji  di atas 4000
+------------+-----------+------+
|EmployeeName|Departement|Salary|
+------------+-----------+------+
|     Michael|      Sales|  4600|
|      Robert|      Sales|  4100|
+------------+-----------+------+

Menambah kolom
+------------+-----------+------+-----+
|EmployeeName|Departement|Salary|Bonus|
+------------+-----------+------+-----+
|       James|      Sales|  3000|  500|
|     Michael|      Sales|  4600|  500|
|      Robert|      Sales|  4100|  500|
|       Maria|    Finance|  3100|  500|
+------------+-----------+------+-----+

Mengelompokkan dan mengagregasi
+-----------+--------------+--------------+----------+
|Departement|Rata

**Tugas 3** : Eksplorasi bagaimana mengolah tipe data kompleks dalam Spark DataFrames.

In [None]:
# Contoh manipulasi tipe data kompleks
SalaryBonus = df.withColumn('SalaryBonus', df['Salary'] * 0.1)
SalaryBonus.show()
TotalCompensation = SalaryBonus.withColumn('TotalCompensation', SalaryBonus['Salary'] + SalaryBonus['SalaryBonus'])
TotalCompensation.show()

+------------+-----------+------+-----------+
|EmployeeName|Departement|Salary|SalaryBonus|
+------------+-----------+------+-----------+
|       James|      Sales|  3000|      300.0|
|     Michael|      Sales|  4600|      460.0|
|      Robert|      Sales|  4100|      410.0|
|       Maria|    Finance|  3100|      310.0|
+------------+-----------+------+-----------+

+------------+-----------+------+-----------+-----------------+
|EmployeeName|Departement|Salary|SalaryBonus|TotalCompensation|
+------------+-----------+------+-----------+-----------------+
|       James|      Sales|  3000|      300.0|           3300.0|
|     Michael|      Sales|  4600|      460.0|           5060.0|
|      Robert|      Sales|  4100|      410.0|           4510.0|
|       Maria|    Finance|  3100|      310.0|           3410.0|
+------------+-----------+------+-----------+-----------------+



In [None]:
# ArrayType
from pyspark.sql import SparkSession
from pyspark.sql.functions  import col, explode

spark = SparkSession.builder.appName("Tipe_Kompleks").getOrCreate()

data = [(1, ["Membaca", "Coding", "Traveling"]),
        (2, ["Gaming", "Musik"])]

df = spark.createDataFrame(data, ["id","hobbies"])
print("Contoh tipe array")
df.show(truncate=False)

#Mengolah array
df_exploded = df.withColumn("Hobby", explode(col("Hobbies")))
print("Array setelah diolah")
df_exploded.show(truncate=False)


# MapType
data = [(1, {"Big Data Teori": 90, "Bahasa Inggris": 85}),
        (2, {"Big Data Teori": 78, "Bahasa Inggris": 92})]

df = spark.createDataFrame(data, ["id", "Nilai"])
print("Contoh tipe map")
df.show(truncate=False)

# Mengambil nilai dari map
print("Mengambil nilai dari map")
df.select(col("id"), col("Nilai")["Big Data Teori"].alias("Nilai Big Data")).show()


#  StructType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [
    (1, ("Alice", 23)),
    (2, ("Bob", 30))
]

schema =  StructType([
    StructField("id",IntegerType(), True),
    StructField("orang", StructType([
        StructField("nama", StringType(), True),
        StructField("umur", IntegerType(), True)
    ]))
])

df = spark.createDataFrame(data, schema)
print("Contoh tipe struct")
df.show(truncate=False)

# Mengakses field di dalam struct
print("Mengakses field di dalam struct")
df.select(col("id"), col("orang.nama"),col("orang.umur")).show()


Contoh tipe array
+---+----------------------------+
|id |hobbies                     |
+---+----------------------------+
|1  |[Membaca, Coding, Traveling]|
|2  |[Gaming, Musik]             |
+---+----------------------------+

Array setelah diolah
+---+----------------------------+---------+
|id |hobbies                     |Hobby    |
+---+----------------------------+---------+
|1  |[Membaca, Coding, Traveling]|Membaca  |
|1  |[Membaca, Coding, Traveling]|Coding   |
|1  |[Membaca, Coding, Traveling]|Traveling|
|2  |[Gaming, Musik]             |Gaming   |
|2  |[Gaming, Musik]             |Musik    |
+---+----------------------------+---------+

Contoh tipe map
+---+--------------------------------------------+
|id |Nilai                                       |
+---+--------------------------------------------+
|1  |{Big Data Teori -> 90, Bahasa Inggris -> 85}|
|2  |{Big Data Teori -> 78, Bahasa Inggris -> 92}|
+---+--------------------------------------------+

Mengambil nilai dari 

**Tugas 4** : Implementasikan window function untuk menghitung running total  atau rangking.

In [None]:
# Contoh menggunakan window funtions untuk rangking
from pyspark.sql.window import Window
from pyspark.sql import functions as F

windowSpec = Window.partitionBy('Departement').orderBy('Salary')
df_with_rank = df_ubah_nama.withColumn('Rank', F.rank().over(windowSpec))
print("Penggunaan window functions untuk ranking")
df_with_rank.show()

# Contoh menggunakan window functions untuk running
df_running = df_ubah_nama.withColumn('Running Total', F.sum('Salary').over(windowSpec))
print("Penggunaan window function untuk running")
df_running.show()

Penggunaan window functions untuk ranking
+------------+-----------+------+----+
|Nama pekerja|Departement|Salary|Rank|
+------------+-----------+------+----+
|       Maria|    Finance|  3100|   1|
|       James|      Sales|  3000|   1|
|      Robert|      Sales|  4100|   2|
|     Michael|      Sales|  4600|   3|
+------------+-----------+------+----+

Penggunaan window function untuk running
+------------+-----------+------+-------------+
|Nama pekerja|Departement|Salary|Running Total|
+------------+-----------+------+-------------+
|       Maria|    Finance|  3100|         3100|
|       James|      Sales|  3000|         3000|
|      Robert|      Sales|  4100|         7100|
|     Michael|      Sales|  4600|        11700|
+------------+-----------+------+-------------+



In [None]:
!pip  install  pyspark



In [4]:
# Menggunakan dataset dari kaggle
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AnalisisDatasetNetflix").getOrCreate()

# Load  dataset Netflix
df = spark.read.csv("/netflix_titles_nov_2019.csv", header=True, inferSchema=True)

df.show(20,truncate=False)

df.printSchema()

+--------+--------------------------------------+--------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------+-----------------+------------+------+---------+-----------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------+-------+
|show_id |title                                 |director                  |cast                                                                                                                                                                                                                                                                 

In [5]:
from pyspark.sql.functions import split, explode, col

# Memisahkan genre dari array, lalu explode DUARR
df_genre = df.withColumn("genre", explode(split(col("listed_in"),", ")))

# Mengurutkan genre dari yang terbanyak
df_genre.groupBy("genre").count().orderBy(col("count").desc()).show(20, truncate=False)

+------------------------+-----+
|genre                   |count|
+------------------------+-----+
|International Movies    |1797 |
|Dramas                  |1485 |
|Comedies                |990  |
|International TV Shows  |966  |
|Documentaries           |657  |
|TV Dramas               |574  |
|Action & Adventure      |532  |
|Independent Movies      |511  |
|TV Comedies             |424  |
|Crime TV Shows          |355  |
|Thrillers               |352  |
|Children & Family Movies|340  |
|Romantic Movies         |338  |
|Kids' TV                |313  |
|Stand-Up Comedy         |273  |
|Docuseries              |271  |
|Romantic TV Shows       |265  |
|Music & Musicals        |229  |
|Horror Movies           |229  |
|British TV Shows        |205  |
+------------------------+-----+
only showing top 20 rows



In [30]:
# 1. Menghitung jumlah movie dan jumlah TV Show
df_filtered_tvshowandmovie = df.filter((col("type") == "Movie") | (col("type") == "TV Show"))
print("Menghitung jumlah movie dan TV Show")
df_filtered_tvshowandmovie.groupBy("type").count().show()

# 2. Menghitung jumlah konten berdasarkan negara (Top 10)
print("Menghitung jumlah konten berdasarkan negara (Top 10)")
df.groupBy("country").count().orderBy("count", ascending=False).show(10)

# 3. Mencari film rilisan terbaru
print("Film rilisan terbaru")
df.orderBy(df.release_year.desc()).select("title", "release_year").show(5)

# 4. Menghitung jumlah konten per tahun
print("Jumlah konten per tahun")
df.groupBy("release_year").count().orderBy("release_year", ascending=False).show(10)

# 5. Memecah genre (listed_in) jadi array
from pyspark.sql.functions import split
print("pecah genre")
df_genre = df.withColumn("genre", split(df.listed_in, ","))
df_genre.select("title", "genre").show(20, truncate=False)

# 6. Menghitung jumlah konten per genre
print("Jumlah konten per genre")
df_genre.select(explode(df_genre.genre).alias("genre")).groupBy("genre").count().orderBy("count", ascending=False).show(10, truncate=False)

# 7. Rangking negara dengan jumlah konten terbanyak
windowSpec = Window.orderBy(F.col("count").desc())
df_country_rank = df.groupBy("country").count() \
    .withColumn("rank", F.rank().over(windowSpec))
print("rangking negara dengan jumlah konten terbanyak")
df_country_rank.show(10)

# 8. Mencari sutradara dengan jumlah film terbanyak
print("Sutradara dengan jumlah film terbanyak")
df.groupBy("director").count().orderBy("count", ascending=False).show(10)

# 9. Mencari film dengan judul mengandung kata "Love" hehe
print("Film dengan judul mengandung kata Love")
df.filter(df.title.contains("Love")).select("title", "release_year").show(10, truncate=False)

# 10. Menghitung jumlah konten per rating
print("Jumlah konten per rating")
df.groupBy("rating").count().orderBy("count", ascending=False).show(10)

# 11. Menghitung jumlah aktor yang muncul di cast list
from pyspark.sql.functions import split, explode
print("Jumlah aktor yang muncul  di cast list")
df_cast = df.withColumn("actor", explode(split(df.cast, ", ")))
df_cast.groupBy("actor").count().orderBy("count", ascending=False).show(10)

# 12. 5 negara teratas dengan jumlah TV Shows terbanyak
print("5 negara teratas dengan jumlah TV Shows terbanyak")
df.filter(df.type == "TV Show").groupBy("country").count().orderBy("count", ascending=False).show(5)

# 13. Mencari tahun dengan jumlah film terbanyak
print("Tahun dengan jumlah film terbanyak")
df.filter(df.type == "Movie").groupBy("release_year").count().orderBy("count", ascending=False).show(5)

Menghitung jumlah movie dan TV Show
+-------+-----+
|   type|count|
+-------+-----+
|TV Show| 1871|
|  Movie| 3854|
+-------+-----+

Menghitung jumlah konten berdasarkan negara (Top 10)
+--------------+-----+
|       country|count|
+--------------+-----+
| United States| 1900|
|         India|  697|
|          NULL|  429|
|United Kingdom|  336|
|         Japan|  168|
|        Canada|  139|
|   South Korea|  133|
|         Spain|  113|
|        France|   85|
|        Mexico|   80|
+--------------+-----+
only showing top 10 rows

Film rilisan terbaru
+------------------+------------+
|             title|release_year|
+------------------+------------+
|     William Wyler|       TV-PG|
|Maradona in Mexico|        2020|
|    I Lost My Body|        2019|
|      Holiday Rush|        2019|
|  La Reina del Sur|        2019|
+------------------+------------+
only showing top 5 rows

Jumlah konten per tahun
+------------+-----+
|release_year|count|
+------------+-----+
|       TV-PG|    1|
|     