## Panggil Data Clean

In [1]:
# For 3.3.1
# NOTE(review): "3.3.1" is presumably a version tag for the runtime — confirm what it refers to.
# Register Sedona functions on the Spark session.
# `spark` is not defined in the visible cells; presumably it is injected by the
# notebook kernel before this cell runs — verify.
from sedona.register import SedonaRegistrator
SedonaRegistrator.registerAll(spark)

True

In [2]:
#For 3.3.2
from shapely.geometry import Point, Polygon, mapping
import h3.api.numpy_int as h3int

In [3]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import col, when

In [4]:
# Storage locations: the shared bucket prefix and the "212112409/" sub-folder under it
base_path = "s3a://ungp-ais-data-historical-backup/user_temp/"
path_unique = f"{base_path}212112409/"

In [10]:
# Read Data: one parquet dataset per year (2019 through 2024), read in year order
(
    data_2019,
    data_2020,
    data_2021,
    data_2022,
    data_2023,
    data_2024,
) = (
    spark.read.parquet(path_unique + f"ais-ihs-indonesia-{year}.parquet")
    for year in range(2019, 2025)
)

In [8]:
# List the distinct vessel_type values present in the 2022 data (up to 100 rows, untruncated)
(
    data_2022
    .select("vessel_type")
    .distinct()
    .show(n=100, truncate=False)
)

+------------------------------------+
|vessel_type                         |
+------------------------------------+
|Sailing                             |
|Tanker                              |
|Towing                              |
|Unknown                             |
|Other                               |
|Tug                                 |
|Pleasure Craft                      |
|Passenger                           |
|Fishing                             |
|Pilot                               |
|WIG                                 |
|Dredging                            |
|Not Available                       |
|Cargo                               |
|Reserved                            |
|HSC                                 |
|UNAVAILABLE                         |
|Law Enforcement                     |
|Military                            |
|Diving                              |
|Spare                               |
|SAR                                 |
|Ships Not Party to Armed

In [9]:
# For 2022 rows whose vessel_type is "Tug", list the distinct ShiptypeLevel5 values
(
    data_2022
    .where(col("vessel_type") == "Tug")
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=100, truncate=False)
)

+----------------------------------------+
|ShiptypeLevel5                          |
+----------------------------------------+
|Tug                                     |
|Utility Vessel                          |
|Products Tanker                         |
|Anchor Handling Vessel                  |
|Articulated Pusher Tug                  |
|General Cargo Ship                      |
|Anchor Handling Tug Supply              |
|Passenger/Ro-Ro Ship (Vehicles)         |
|Pusher Tug                              |
|Container Ship (Fully Cellular)         |
|Fishing Vessel                          |
|Salvage Ship                            |
|Landing Craft                           |
|Crew Boat                               |
|Offshore Tug/Supply Ship                |
|Platform Supply Ship                    |
|Buoy Tender                             |
|Bulk Carrier                            |
|Cement Carrier                          |
|Diving Support Vessel                   |
|Chemical/P

In [7]:
# Count the records in each yearly dataframe (counts run in year order)
(
    count_2019,
    count_2020,
    count_2021,
    count_2022,
    count_2023,
    count_2024,
) = (
    yearly_df.count()
    for yearly_df in (data_2019, data_2020, data_2021, data_2022, data_2023, data_2024)
)

# Print the record count for each year, one line per year
print("\n".join(
    f"Jumlah record data {year}: {total}"
    for year, total in zip(
        range(2019, 2025),
        (count_2019, count_2020, count_2021, count_2022, count_2023, count_2024),
    )
))

Jumlah record data 2019: 252515833
Jumlah record data 2020: 295553890
Jumlah record data 2021: 252772186
Jumlah record data 2022: 319879984
Jumlah record data 2023: 380596722
Jumlah record data 2024: 612394139


## Filter Kapal Pertambangan Penggalian

In [11]:
from functools import reduce
from pyspark.sql import functions as F

from pyspark.sql.functions import col

# ShiptypeLevel5 values to filter on.
# NOTE: filter_by_ship_types (defined below) matches each entry as a substring
# via Column.contains, so a short entry such as "Products Tanker" also selects
# longer values that contain it (e.g. "Chemical/Products Tanker").
ship_types = [
    "Ore Carrier",
    "Bulk/Caustic Soda Carrier (CABU)",
    "Bulk/Oil Carrier (OBO)",
    "Bulk/Sulphuric Acid Carrier",
    "Bulk/Oil/Chemical Carrier (CLEANBU)",
    "Ore/Oil Carrier",
    "Aggregates Carrier",
    "Limestone Carrier",
    "General Cargo/Tanker (Container/Oil/Bulk - COB Ship)",
    "Molten Sulphur Tanker",
    "CNG Tanker",
    "Combination Gas Tanker (LNG/LPG)",
    "LNG Tanker",
    "Liquefied Hydrogen Tanker",
    "LPG Tanker",
    "LPG/Chemical Tanker",
    "Asphalt/Bitumen Tanker",
    "Coal/Oil Mixture Tanker",
    "Crude Oil Tanker",
    "Crude/Oil Products Tanker",
    "Shuttle Tanker",
    "Products Tanker",
    "Tanker (Unspecified)",
    "Bitumen Tank Barge, Non Propelled",
    "Bulk Aggregates Barge, Non Propelled",
    "Crude Oil Tank Barge, Non Propelled",
    "Hopper Barge, Non Propelled",
    "LNG Tank Barge, Non Propelled",
    "LPG Tank Barge, Non Propelled",
    "LPG Tanker, Inland Waterways",
    "Oil Tanker, Inland Waterways",
    "Stone Carrier"
]

# Keep only the rows whose ShiptypeLevel5 matches one of the entries in ship_types
def filter_by_ship_types(data, ship_types, exact=False):
    """Return the rows of ``data`` whose ``ShiptypeLevel5`` matches ``ship_types``.

    Args:
        data: Spark DataFrame that has a ``ShiptypeLevel5`` column.
        ship_types: Iterable of ship-type strings to match against.
        exact: When False (the default, identical to the original behaviour),
            a row is kept if its ``ShiptypeLevel5`` *contains* any entry as a
            substring. When True, a row is kept only if its ``ShiptypeLevel5``
            is equal to one of the entries.

    Returns:
        The filtered DataFrame. An empty ``ship_types`` selects no rows.

    NOTE(review): with substring matching, the entry "Products Tanker" also
    selects "Chemical/Products Tanker", which is not itself listed in
    ``ship_types`` and does show up in this notebook's filtered outputs.
    Pass ``exact=True`` if only the listed types are wanted — confirm intent.
    """
    ship_type_col = F.col("ShiptypeLevel5")

    if exact:
        # Exact membership test: only values literally present in ship_types pass.
        return data.filter(ship_type_col.isin(list(ship_types)))

    # OR together one `contains` test per entry, starting from a constant False
    # so that an empty ship_types yields a predicate that matches nothing.
    predicate = reduce(
        lambda acc, keyword: acc | ship_type_col.contains(keyword),
        ship_types,
        F.lit(False),
    )
    return data.filter(predicate)

# Apply the ship-type filter to every yearly dataframe
(
    data_2019_filtered,
    data_2020_filtered,
    data_2021_filtered,
    data_2022_filtered,
    data_2023_filtered,
    data_2024_filtered,
) = (
    filter_by_ship_types(yearly_df, ship_types)
    for yearly_df in (data_2019, data_2020, data_2021, data_2022, data_2023, data_2024)
)

In [14]:
from pyspark.sql.functions import when, col

# Classify vessel_type as "Cargo", "Tanker" or "Unknown" based on ShiptypeLevel5
def classify_vessel_type(df):
    """Return ``df`` with ``vessel_type`` set from ``ShiptypeLevel5``.

    The ``vessel_type`` column is written with ``withColumn``, so an existing
    column of that name is replaced. The value is "Cargo" when
    ``ShiptypeLevel5`` matches the cargo regex, otherwise "Tanker" when it
    matches the tanker regex, otherwise "Unknown". The regexes are applied
    with ``rlike`` and are not anchored.
    """
    cargo_regex = (
        "Ore Carrier|Bulk/Caustic Soda Carrier \\(CABU\\)|Bulk/Oil Carrier \\(OBO\\)|"
        "Bulk/Sulphuric Acid Carrier|Bulk/Oil/Chemical Carrier \\(CLEANBU\\)|Ore/Oil Carrier|"
        "Aggregates Carrier|Limestone Carrier|General Cargo/Tanker \\(Container/Oil/Bulk - COB Ship\\)|"
        "Bulk Aggregates Barge, Non Propelled|Hopper Barge, Non Propelled|Stone Carrier"
    )
    tanker_regex = (
        "Molten Sulphur Tanker|CNG Tanker|Combination Gas Tanker \\(LNG/LPG\\)|LNG Tanker|"
        "Liquefied Hydrogen Tanker|LPG Tanker|LPG/Chemical Tanker|Asphalt/Bitumen Tanker|"
        "Coal/Oil Mixture Tanker|Crude Oil Tanker|Crude/Oil Products Tanker|Shuttle Tanker|"
        "Products Tanker|Tanker \\(Unspecified\\)|Bitumen Tank Barge, Non Propelled|"
        "Crude Oil Tank Barge, Non Propelled|LNG Tank Barge, Non Propelled|LPG Tank Barge, Non Propelled|"
        "LPG Tanker, Inland Waterways|Oil Tanker, Inland Waterways"
    )

    # The cargo test comes first, so it wins if a value matches both regexes.
    label = when(col("ShiptypeLevel5").rlike(cargo_regex), "Cargo")
    label = label.when(col("ShiptypeLevel5").rlike(tanker_regex), "Tanker")
    # Anything that matches neither regex is labelled 'Unknown'.
    label = label.otherwise("Unknown")

    return df.withColumn("vessel_type", label)

# Apply the classification to each filtered dataset, rebinding the same names
(
    data_2019_filtered,
    data_2020_filtered,
    data_2021_filtered,
    data_2022_filtered,
    data_2023_filtered,
    data_2024_filtered,
) = tuple(
    classify_vessel_type(filtered_df)
    for filtered_df in (
        data_2019_filtered,
        data_2020_filtered,
        data_2021_filtered,
        data_2022_filtered,
        data_2023_filtered,
        data_2024_filtered,
    )
)

In [17]:
# Show sample results: mmsi, the new vessel_type and its source ShiptypeLevel5 (up to 1000 rows)
(
    data_2019_filtered
    .select("mmsi", "vessel_type", "ShiptypeLevel5")
    .show(n=1000, truncate=False)
)

+---------+-----------+------------------------+
|mmsi     |vessel_type|ShiptypeLevel5          |
+---------+-----------+------------------------+
|760001120|Tanker     |LPG Tanker              |
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker     |Chemical/Products Tanker|
|525015751|Tanker   

In [18]:
# Count the records in each filtered dataframe (counts run in year order)
(
    count_2019_filtered,
    count_2020_filtered,
    count_2021_filtered,
    count_2022_filtered,
    count_2023_filtered,
    count_2024_filtered,
) = (
    filtered_df.count()
    for filtered_df in (
        data_2019_filtered,
        data_2020_filtered,
        data_2021_filtered,
        data_2022_filtered,
        data_2023_filtered,
        data_2024_filtered,
    )
)

# Print the record count for each year, one line per year
print("\n".join(
    f"Jumlah record data {year}: {total}"
    for year, total in zip(
        range(2019, 2025),
        (
            count_2019_filtered,
            count_2020_filtered,
            count_2021_filtered,
            count_2022_filtered,
            count_2023_filtered,
            count_2024_filtered,
        ),
    )
))

Jumlah record data 2019: 58260835
Jumlah record data 2020: 70345762
Jumlah record data 2021: 64127910
Jumlah record data 2022: 83410247
Jumlah record data 2023: 93431553
Jumlah record data 2024: 143527172


In [19]:
# Distinct ShiptypeLevel5 values that survived the filter in 2019
(
    data_2019_filtered
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------------------------------+
|ShiptypeLevel5                     |
+-----------------------------------+
|Chemical/Products Tanker           |
|Crude/Oil Products Tanker          |
|LPG Tanker                         |
|LNG Tanker                         |
|Products Tanker                    |
|Crude Oil Tanker                   |
|Ore Carrier                        |
|Limestone Carrier                  |
|Aggregates Carrier                 |
|Asphalt/Bitumen Tanker             |
|Combination Gas Tanker (LNG/LPG)   |
|Molten Sulphur Tanker              |
|LPG/Chemical Tanker                |
|Bulk/Caustic Soda Carrier (CABU)   |
|Shuttle Tanker                     |
|Ore/Oil Carrier                    |
|Oil Tanker, Inland Waterways       |
|Bulk/Oil/Chemical Carrier (CLEANBU)|
|Stone Carrier                      |
|Bulk/Oil Carrier (OBO)             |
|Bulk/Sulphuric Acid Carrier        |
+-----------------------------------+



In [20]:
# Distinct ShiptypeLevel5 values that survived the filter in 2020
(
    data_2020_filtered
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------------------------------+
|ShiptypeLevel5                     |
+-----------------------------------+
|Shuttle Tanker                     |
|Chemical/Products Tanker           |
|Crude/Oil Products Tanker          |
|LPG Tanker                         |
|LNG Tanker                         |
|Products Tanker                    |
|Asphalt/Bitumen Tanker             |
|Crude Oil Tanker                   |
|Ore Carrier                        |
|Aggregates Carrier                 |
|Limestone Carrier                  |
|LPG/Chemical Tanker                |
|Molten Sulphur Tanker              |
|Bulk/Oil Carrier (OBO)             |
|Bulk/Caustic Soda Carrier (CABU)   |
|Bulk/Sulphuric Acid Carrier        |
|Stone Carrier                      |
|Ore/Oil Carrier                    |
|Bulk/Oil/Chemical Carrier (CLEANBU)|
|Combination Gas Tanker (LNG/LPG)   |
|Oil Tanker, Inland Waterways       |
+-----------------------------------+



In [21]:
# Distinct ShiptypeLevel5 values that survived the filter in 2021
(
    data_2021_filtered
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------------------------------+
|ShiptypeLevel5                     |
+-----------------------------------+
|Chemical/Products Tanker           |
|Crude/Oil Products Tanker          |
|Aggregates Carrier                 |
|LPG Tanker                         |
|LNG Tanker                         |
|Products Tanker                    |
|Crude Oil Tanker                   |
|Ore Carrier                        |
|Asphalt/Bitumen Tanker             |
|Molten Sulphur Tanker              |
|Limestone Carrier                  |
|Bulk/Caustic Soda Carrier (CABU)   |
|Shuttle Tanker                     |
|Ore/Oil Carrier                    |
|Bulk/Oil/Chemical Carrier (CLEANBU)|
|Stone Carrier                      |
|LPG/Chemical Tanker                |
|Bulk/Oil Carrier (OBO)             |
|Combination Gas Tanker (LNG/LPG)   |
+-----------------------------------+



In [22]:
# Distinct ShiptypeLevel5 values that survived the filter in 2022
(
    data_2022_filtered
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=1000000, truncate=False)
)

+------------------------------------------+
|ShiptypeLevel5                            |
+------------------------------------------+
|Chemical/Products Tanker                  |
|Crude/Oil Products Tanker                 |
|LPG Tanker                                |
|LNG Tanker                                |
|Products Tanker                           |
|Asphalt/Bitumen Tanker                    |
|Crude Oil Tanker                          |
|Ore Carrier                               |
|Aggregates Carrier                        |
|Bulk/Oil/Chemical Carrier (CLEANBU)       |
|Bulk/Caustic Soda Carrier (CABU)          |
|Shuttle Tanker                            |
|Bulk/Oil Carrier (OBO)                    |
|Molten Sulphur Tanker                     |
|Limestone Carrier                         |
|Ore/Oil Carrier                           |
|LPG/Chemical Tanker                       |
|Coal/Oil Mixture Tanker                   |
|Chemical/Products Tanker, Inland Waterways|
|Stone Car

In [23]:
# Distinct ShiptypeLevel5 values that survived the filter in 2023
(
    data_2023_filtered
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------------------------------+
|ShiptypeLevel5                     |
+-----------------------------------+
|Chemical/Products Tanker           |
|Crude/Oil Products Tanker          |
|LPG Tanker                         |
|LNG Tanker                         |
|Products Tanker                    |
|Crude Oil Tanker                   |
|LPG/Chemical Tanker                |
|Ore Carrier                        |
|Aggregates Carrier                 |
|Asphalt/Bitumen Tanker             |
|Bulk/Caustic Soda Carrier (CABU)   |
|Shuttle Tanker                     |
|Bulk/Oil/Chemical Carrier (CLEANBU)|
|Molten Sulphur Tanker              |
|Stone Carrier                      |
|Limestone Carrier                  |
|Liquefied Hydrogen Tanker          |
|Combination Gas Tanker (LNG/LPG)   |
|Ore/Oil Carrier                    |
+-----------------------------------+



In [24]:
# Distinct ShiptypeLevel5 values that survived the filter in 2024
(
    data_2024_filtered
    .select("ShiptypeLevel5")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------------------------------+
|ShiptypeLevel5                     |
+-----------------------------------+
|Chemical/Products Tanker           |
|Crude/Oil Products Tanker          |
|LPG Tanker                         |
|LNG Tanker                         |
|Products Tanker                    |
|Crude Oil Tanker                   |
|Aggregates Carrier                 |
|Asphalt/Bitumen Tanker             |
|Ore Carrier                        |
|Shuttle Tanker                     |
|Limestone Carrier                  |
|Stone Carrier                      |
|Bulk/Caustic Soda Carrier (CABU)   |
|Molten Sulphur Tanker              |
|Bulk/Oil/Chemical Carrier (CLEANBU)|
|LPG/Chemical Tanker                |
|CNG Tanker                         |
|Ore/Oil Carrier                    |
|Combination Gas Tanker (LNG/LPG)   |
|Coal/Oil Mixture Tanker            |
+-----------------------------------+



In [25]:
# Distinct vessel_type labels assigned in the 2019 filtered data
(
    data_2019_filtered
    .select("vessel_type")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------+
|vessel_type|
+-----------+
|Tanker     |
|Cargo      |
+-----------+



In [26]:
# Distinct vessel_type labels assigned in the 2020 filtered data
(
    data_2020_filtered
    .select("vessel_type")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------+
|vessel_type|
+-----------+
|Tanker     |
|Cargo      |
+-----------+



In [27]:
# Distinct vessel_type labels assigned in the 2021 filtered data
(
    data_2021_filtered
    .select("vessel_type")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------+
|vessel_type|
+-----------+
|Tanker     |
|Cargo      |
+-----------+



In [28]:
# Distinct vessel_type labels assigned in the 2022 filtered data
(
    data_2022_filtered
    .select("vessel_type")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------+
|vessel_type|
+-----------+
|Tanker     |
|Cargo      |
+-----------+



In [29]:
# Distinct vessel_type labels assigned in the 2023 filtered data
(
    data_2023_filtered
    .select("vessel_type")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------+
|vessel_type|
+-----------+
|Tanker     |
|Cargo      |
+-----------+



In [30]:
# Distinct vessel_type labels assigned in the 2024 filtered data
(
    data_2024_filtered
    .select("vessel_type")
    .distinct()
    .show(n=1000000, truncate=False)
)

+-----------+
|vessel_type|
+-----------+
|Tanker     |
|Cargo      |
+-----------+



In [31]:
# Save Data: write the filtered 2020 rows as parquet, overwriting any existing output.
# The previous .option("header", True) was dropped: "header" is a CSV option and
# has no effect on the Parquet writer, so it only misled readers.
data_2020_filtered.write.mode("overwrite").parquet(path_unique + "data-filter-2020.parquet")

In [32]:
# Save Data: write the filtered 2019 and 2021 rows as parquet, overwriting any existing output.
# The previous .option("header", True) was dropped: "header" is a CSV option and
# has no effect on the Parquet writer, so it only misled readers.
data_2019_filtered.write.mode("overwrite").parquet(path_unique + "data-filter-2019.parquet")
data_2021_filtered.write.mode("overwrite").parquet(path_unique + "data-filter-2021.parquet")

In [33]:
# Save Data: write the filtered 2022-2024 rows as parquet, overwriting any existing output.
# The previous .option("header", True) was dropped: "header" is a CSV option and
# has no effect on the Parquet writer, so it only misled readers.
data_2022_filtered.write.mode("overwrite").parquet(path_unique + "data-filter-2022.parquet")
data_2023_filtered.write.mode("overwrite").parquet(path_unique + "data-filter-2023.parquet")
data_2024_filtered.write.mode("overwrite").parquet(path_unique + "data-filter-2024.parquet")

## Eksplorasi

In [34]:
# Build a per-year table of the number of distinct mmsi values in the unfiltered data
table_vessel = spark.createDataFrame([
    {"Tahun": year, "Jumlah Kapal": yearly_df.select("mmsi").distinct().count()}
    for year, yearly_df in zip(
        range(2019, 2025),
        (data_2019, data_2020, data_2021, data_2022, data_2023, data_2024),
    )
])

# Display the result
table_vessel.show()

+------------+-----+
|Jumlah Kapal|Tahun|
+------------+-----+
|       40971| 2019|
|       50438| 2020|
|       30196| 2021|
|       31434| 2022|
|       33556| 2023|
|       34693| 2024|
+------------+-----+



In [35]:
# Build a per-year table of the number of distinct mmsi values in the filtered data
# (rebinds table_vessel, replacing the table built from the unfiltered data)
table_vessel = spark.createDataFrame([
    {"Tahun": year, "Jumlah Kapal": filtered_df.select("mmsi").distinct().count()}
    for year, filtered_df in zip(
        range(2019, 2025),
        (
            data_2019_filtered,
            data_2020_filtered,
            data_2021_filtered,
            data_2022_filtered,
            data_2023_filtered,
            data_2024_filtered,
        ),
    )
])

# Display the result
table_vessel.show()

+------------+-----+
|Jumlah Kapal|Tahun|
+------------+-----+
|       11076| 2019|
|       17240| 2020|
|        7487| 2021|
|        7742| 2022|
|        7962| 2023|
|        7975| 2024|
+------------+-----+



In [36]:
# Stop the Spark session; this is the last cell of the notebook.
spark.stop()