# paths

In [2]:
!ls ../dataset/piraeus

'107782 - The Piraeus AIS Dataset for Large-Scale Maritime Data Analytics.pdf'
 ais_augmented.parquet
 ais_cleaned.parquet
 ais_loiter.parquet
 ais_loiter_pair.parquet
 ais_static
 geodata
 models
 noaa_weather
 parquet_version
 processed
 sar
 unipi_ais_dynamic_2017
 unipi_ais_dynamic_2018
 unipi_ais_dynamic_2019
 unipi_ais_dynamic_synopses


In [3]:
ls ..\dataset\piraeus\unipi_ais_dynamic_2017

ls: cannot access '..datasetpiraeusunipi_ais_dynamic_2017': No such file or directory


In [4]:
import pandas, pyarrow
print(pandas.__version__)
print(pyarrow.__version__)

2.3.3
21.0.0


In [5]:
import pyarrow.parquet as pq

pq_file = pq.ParquetFile("unipi_ais_dynamic_may2017.parquet")

# Suppose row groups are ~500k rows each
row_group_index = 6  # 6*500k = 3Mth row
table = pq_file.read_row_group(row_group_index)

df_chunk = table.to_pandas()  # Only this row group in memory
row = df_chunk.iloc[0]  # Approx 3Mth row
print(row)


t                                                1494345047000
vessel_id    b0b2bd45bbb8911fbea20744b0e8b98bbb0e76f6c3af37...
lat                                                  37.929298
lon                                                  23.682772
heading                                                   30.0
speed                                                      0.0
course                                                   170.0
Name: 0, dtype: object


## Random Access

In [6]:
import pyarrow.parquet as pq
import pandas as pd

# Load Parquet file metadata
pq_file = pq.ParquetFile("unipi_ais_dynamic_may2017.parquet")
num_rows = pq_file.metadata.num_rows
num_row_groups = pq_file.num_row_groups

print(f"Total rows: {num_rows}, Row groups: {num_row_groups}")

# Function to read a row by index
def read_row(row_idx: int) -> pd.Series:
    if row_idx < 0 or row_idx >= num_rows:
        raise IndexError("Row index out of bounds")

    cum_rows = 0
    for group_idx in range(num_row_groups):
        rg_rows = pq_file.metadata.row_group(group_idx).num_rows
        if row_idx < cum_rows + rg_rows:
            local_idx = row_idx - cum_rows
            table = pq_file.read_row_group(group_idx)
            df = table.to_pandas()
            return df.iloc[local_idx]
        cum_rows += rg_rows

# Example: read 3,000,000th row
row_3m = read_row(3_000_000)
print(row_3m)


Total rows: 4305035, Row groups: 9
t                                                1494345047000
vessel_id    b0b2bd45bbb8911fbea20744b0e8b98bbb0e76f6c3af37...
lat                                                  37.929298
lon                                                  23.682772
heading                                                   30.0
speed                                                      0.0
course                                                   170.0
Name: 0, dtype: object


In [7]:
import platform; print(platform.architecture()); import sys; print(sys.version)

('64bit', 'ELF')
3.12.12 | packaged by conda-forge | (main, Jan 26 2026, 23:51:32) [GCC 14.3.0]


# Clustering

In [26]:
"""
Unsupervised AIS clustering using cuML HDBSCAN (GPU-safe).
"""

import cudf
from cuml.cluster import HDBSCAN
from cuml.preprocessing import StandardScaler


def main():
    """
    Load, clean, scale, and cluster AIS data.
    """

    df = cudf.read_parquet(
        "unipi_ais_dynamic_may2017.parquet",
        row_groups=[6]
    )

    features = df[["lat", "lon", "speed"]].astype("float32")

    features = features.dropna()
    # features = features[~features.isin([float("inf"), float("-inf")]).any(axis=1)]

    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(features)

    X_scaled = cudf.DataFrame(
        X_scaled,
        columns=["lat", "lon", "speed"]
    ).astype("float32")

    clusterer = HDBSCAN(
        min_cluster_size=3,
        min_samples=3,
        metric="euclidean"
    )

    features["cluster_id"] = clusterer.fit_predict(X_scaled)

    print(features["cluster_id"].value_counts())


if __name__ == "__main__":
    main()


cluster_id
-1    499220
Name: count, dtype: int64


Across a wide range of parameters, the algorithm consistently classified all observations as noise, indicating the absence of stable density structures in the feature space. Possible that there is a need for another model or further parameter tuning. So this will be done later after further analytics on as it reuires more data to set parameters.

The analysis was therefore redirected toward probabilistic modeling of vessel movement, where routes are represented as sequences of spatial transitions and ranked based on their empirical likelihood in the AIS data.

## Probability

## parquet conversion

In [28]:
ls ../dataset/piraeus/

[0m[01;32m'107782 - The Piraeus AIS Dataset for Large-Scale Maritime Data Analytics.pdf'[0m*
 [01;32mais_augmented.parquet[0m*
 [01;32mais_cleaned.parquet[0m*
 [01;32mais_loiter.parquet[0m*
 [01;32mais_loiter_pair.parquet[0m*
 [34;42mais_static[0m/
 [34;42mgeodata[0m/
 [34;42mmodels[0m/
 [34;42mnoaa_weather[0m/
 [34;42mparquet_version[0m/
 [34;42mprocessed[0m/
 [34;42msar[0m/
 [34;42munipi_ais_dynamic_2017[0m/
 [34;42munipi_ais_dynamic_2018[0m/
 [34;42munipi_ais_dynamic_2019[0m/
 [34;42munipi_ais_dynamic_synopses[0m/


In [29]:
ls ../dataset/piraeus/unipi_ais_dynamic_2017

[0m[01;32mREADME.md[0m*                      [01;32munipi_ais_dynamic_may2017.csv[0m*
[01;32munipi_ais_dynamic_aug2017.csv[0m*  [01;32munipi_ais_dynamic_nov2017.csv[0m*
[01;32munipi_ais_dynamic_dec2017.csv[0m*  [01;32munipi_ais_dynamic_oct2017.csv[0m*
[01;32munipi_ais_dynamic_jul2017.csv[0m*  [01;32munipi_ais_dynamic_sep2017.csv[0m*
[01;32munipi_ais_dynamic_jun2017.csv[0m*


In [30]:
ls ../dataset/piraeus/unipi_ais_dynamic_2018

[0m[01;32mREADME.md[0m*                      [01;32munipi_ais_dynamic_jun2018.csv[0m*
[01;32munipi_ais_dynamic_apr2018.csv[0m*  [01;32munipi_ais_dynamic_mar2018.csv[0m*
[01;32munipi_ais_dynamic_aug2018.csv[0m*  [01;32munipi_ais_dynamic_may2018.csv[0m*
[01;32munipi_ais_dynamic_dec2018.csv[0m*  [01;32munipi_ais_dynamic_nov2018.csv[0m*
[01;32munipi_ais_dynamic_feb2018.csv[0m*  [01;32munipi_ais_dynamic_oct2018.csv[0m*
[01;32munipi_ais_dynamic_jan2018.csv[0m*  [01;32munipi_ais_dynamic_sep2018.csv[0m*
[01;32munipi_ais_dynamic_jul2018.csv[0m*


In [31]:
ls ../dataset/piraeus/unipi_ais_dynamic_2019

[0m[01;32mREADME.md[0m*                      [01;32munipi_ais_dynamic_jun2019.csv[0m*
[01;32munipi_ais_dynamic_apr2019.csv[0m*  [01;32munipi_ais_dynamic_mar2019.csv[0m*
[01;32munipi_ais_dynamic_aug2019.csv[0m*  [01;32munipi_ais_dynamic_may2019.csv[0m*
[01;32munipi_ais_dynamic_dec2019.csv[0m*  [01;32munipi_ais_dynamic_nov2019.csv[0m*
[01;32munipi_ais_dynamic_feb2019.csv[0m*  [01;32munipi_ais_dynamic_oct2019.csv[0m*
[01;32munipi_ais_dynamic_jan2019.csv[0m*  [01;32munipi_ais_dynamic_sep2019.csv[0m*
[01;32munipi_ais_dynamic_jul2019.csv[0m*


In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([
    pa.field("t", pa.int64()),
    pa.field("vessel_id", pa.string()),
    pa.field("lat", pa.float32()),
    pa.field("lon", pa.float32()),
    pa.field("heading", pa.float32()),
    pa.field("speed", pa.float32()),
    pa.field("course", pa.float32()),
])

writer = pq.ParquetWriter(
    "../dataset/piraeus/parquet/unipi_ais_dynamic_aug2017.parquet",
    schema
)

for chunk in pd.read_csv(
    "../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_aug2017.csv",
    chunksize=500_000
):
    table = pa.Table.from_pandas(
        chunk,
        schema=schema,
        preserve_index=False
    )
    writer.write_table(table)


writer.close()

In [33]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
import glob

# Define schema
schema = pa.schema([
    pa.field("t", pa.int64()),
    pa.field("vessel_id", pa.string()),
    pa.field("lat", pa.float32()),
    pa.field("lon", pa.float32()),
    pa.field("heading", pa.float32()),
    pa.field("speed", pa.float32()),
    pa.field("course", pa.float32()),
])

# Input and output
input_dirs = [
    "../dataset/piraeus/unipi_ais_dynamic_2017",
    "../dataset/piraeus/unipi_ais_dynamic_2018",
    "../dataset/piraeus/unipi_ais_dynamic_2019",
]

# Output base directory for Parquet files
output_base_dir = "../dataset/piraeus/parquet"
Path(output_base_dir).mkdir(exist_ok=True)

# Loop over all CSVs in each year folder
for input_dir in input_dirs:
    year = Path(input_dir).name
    year_output_dir = Path(output_base_dir) / year
    year_output_dir.mkdir(exist_ok=True)

    for csv_file in sorted(Path(input_dir).glob("*.csv")):
        output_file = year_output_dir / f"{csv_file.stem}.parquet"
        print(f"Processing {csv_file} -> {output_file}")

        writer = pq.ParquetWriter(output_file, schema)

        # Read CSV in chunks and write to Parquet
        for chunk in pd.read_csv(csv_file, chunksize=500_000):
            table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
            writer.write_table(table)

        writer.close()

print("All CSVs converted to Parquet with separate files per month.")

Processing ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_aug2017.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2017/unipi_ais_dynamic_aug2017.parquet
Processing ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_dec2017.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2017/unipi_ais_dynamic_dec2017.parquet
Processing ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_jul2017.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2017/unipi_ais_dynamic_jul2017.parquet
Processing ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_jun2017.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2017/unipi_ais_dynamic_jun2017.parquet
Processing ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_may2017.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2017/unipi_ais_dynamic_may2017.parquet
Processing ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_nov2017.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2017/unipi_ais_dyna

KeyError: "name 't' present in the specified schema is not found in the columns or index"

Due to schema mismatch, proceeding t -> timestamp and skipping files which were already processed in above cell

In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path

# Input and output
input_dirs = [
    "../dataset/piraeus/unipi_ais_dynamic_2017",
    "../dataset/piraeus/unipi_ais_dynamic_2018",
    "../dataset/piraeus/unipi_ais_dynamic_2019",
]

output_base_dir = Path("../dataset/piraeus/parquet")
output_base_dir.mkdir(exist_ok=True)

# Loop over all CSVs in each year folder
for input_dir in input_dirs:
    year = Path(input_dir).name
    year_output_dir = output_base_dir / year
    year_output_dir.mkdir(exist_ok=True)

    for csv_file in sorted(Path(input_dir).glob("*.csv")):
        output_file = year_output_dir / f"{csv_file.stem}.parquet"

        if output_file.exists():
            print(f"Skipping {csv_file}, Parquet already exists.")
            continue

        print(f"Processing {csv_file} -> {output_file}")

        # Try schema with 't', fallback to 'timestamp'
        try:
            schema = pa.schema([
                pa.field("t", pa.int64()),
                pa.field("vessel_id", pa.string()),
                pa.field("lat", pa.float32()),
                pa.field("lon", pa.float32()),
                pa.field("heading", pa.float32()),
                pa.field("speed", pa.float32()),
                pa.field("course", pa.float32()),
            ])

            writer = pq.ParquetWriter(output_file, schema)
            for chunk in pd.read_csv(csv_file, chunksize=500_000):
                table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
                writer.write_table(table)
            writer.close()

        except KeyError as e:
            if "'t'" in str(e):
                print(f"'t' column missing in {csv_file}, falling back to 'timestamp'")
                schema = pa.schema([
                    pa.field("timestamp", pa.int64()),
                    pa.field("vessel_id", pa.string()),
                    pa.field("lat", pa.float32()),
                    pa.field("lon", pa.float32()),
                    pa.field("heading", pa.float32()),
                    pa.field("speed", pa.float32()),
                    pa.field("course", pa.float32()),
                ])

                writer = pq.ParquetWriter(output_file, schema)
                for chunk in pd.read_csv(csv_file, chunksize=500_000):
                    # rename 'timestamp' to match schema if needed
                    if "t" not in chunk.columns and "timestamp" in chunk.columns:
                        chunk = chunk.rename(columns={"timestamp": "timestamp"})
                    table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
                    writer.write_table(table)
                writer.close()
            else:
                raise e

print("All CSVs converted to Parquet (skipped existing, fallback applied).")


Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_aug2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_dec2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_jul2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_jun2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_may2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_nov2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_oct2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2017/unipi_ais_dynamic_sep2017.csv, Parquet already exists.
Skipping ../dataset/piraeus/unipi_ais_dynamic_2018/unipi_ais_dynamic_apr2018.csv, Parquet already exists.
Processing ../dataset/piraeus/unipi_ais_dynami

reprocess unipi_ais_dynamic_apr2018.csv so that it might not be currupted.

In [35]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path

csv_file = Path("../dataset/piraeus/unipi_ais_dynamic_2018/unipi_ais_dynamic_apr2018.csv")
output_file = Path("../dataset/piraeus/parquet/unipi_ais_dynamic_2018/unipi_ais_dynamic_apr2018.parquet")
output_file.parent.mkdir(exist_ok=True)

# Try schema with 't', fallback to 'timestamp'
try:
    schema = pa.schema([
        pa.field("t", pa.int64()),
        pa.field("vessel_id", pa.string()),
        pa.field("lat", pa.float32()),
        pa.field("lon", pa.float32()),
        pa.field("heading", pa.float32()),
        pa.field("speed", pa.float32()),
        pa.field("course", pa.float32()),
    ])

    writer = pq.ParquetWriter(output_file, schema)
    for chunk in pd.read_csv(csv_file, chunksize=500_000):
        table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
        writer.write_table(table)
    writer.close()

except KeyError:
    print(f"'t' column missing, falling back to 'timestamp'")
    schema = pa.schema([
        pa.field("timestamp", pa.int64()),
        pa.field("vessel_id", pa.string()),
        pa.field("lat", pa.float32()),
        pa.field("lon", pa.float32()),
        pa.field("heading", pa.float32()),
        pa.field("speed", pa.float32()),
        pa.field("course", pa.float32()),
    ])

    writer = pq.ParquetWriter(output_file, schema)
    for chunk in pd.read_csv(csv_file, chunksize=500_000):
        if "t" not in chunk.columns and "timestamp" in chunk.columns:
            chunk = chunk.rename(columns={"timestamp": "timestamp"})
        table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False)
        writer.write_table(table)
    writer.close()

print(f"Reprocessed {csv_file} -> {output_file}")


't' column missing, falling back to 'timestamp'
Reprocessed ../dataset/piraeus/unipi_ais_dynamic_2018/unipi_ais_dynamic_apr2018.csv -> ../dataset/piraeus/parquet/unipi_ais_dynamic_2018/unipi_ais_dynamic_apr2018.parquet
