In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, to_date, when, lit, collect_set,
    min, max, first, size, date_format
)
from pyspark.sql.window import Window
from pyspark.sql.types import StringType
from pyspark.sql.functions import datediff
# Initialize Spark session
spark = SparkSession.builder \
    .appName("AIS Data Processing - Final Format") \
    .config("spark.driver.memory", "10g") \
    .config("spark.executor.memory", "10g") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/ais_training_data.new_specific_mmsi_data_cname") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/ais_training_data.station") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

    # .config("spark.mongodb.input.pipeline", "[{ \"$limit\": 50000 }]") \
    
# Load limited data
df = spark.read.format("mongo").load()

# Preprocessing
df = df.withColumn("date", to_date("Date", "yyyy-MM-dd")) \
       .withColumn("Nav_Status_LC", col("Nav_Status").cast(StringType())) \
       .withColumn("Is_At_Port", when(
           (col("IsPortStop") == True) |
           (col("Nav_Status_LC").rlike("(?i)moored|at anchor|restricted manoeuvrability")),
           lit(1)
       ).otherwise(0))

# Persist to prevent re-computation
#df = df.persist()

# Window for getting last port
w = Window.partitionBy("MMSI").orderBy(col("date").desc())

# Last port using window
last_port_df = df.filter(col("Is_At_Port") == 1) \
    .withColumn("Last_Port", first("NearestPort", ignorenulls=True).over(w)) \
    .select("MMSI", "Last_Port") \
    .dropna(subset=["Last_Port"]).dropDuplicates(["MMSI"])

# Base Aggregation
agg_df = df.groupBy("MMSI").agg(
    min("date").alias("First_Port_Date"),
    max("date").alias("Last_Seen_Date"),
    collect_set(when(col("Is_At_Port") == 1, col("NearestPort"))).alias("Visited_Ports"),
    collect_set(when(col("Is_At_Port") == 1, col("date"))).alias("Stationing_Day_List"),
    first(when(col("Is_At_Port") == 1, col("NearestPort")), ignorenulls=True).alias("First_Port"),
    first("Ship_Type", ignorenulls=True).alias("Ship_Type")
)

agg_df = agg_df.withColumn("Stationing_Days", size(col("Stationing_Day_List")))

# Join last port info
agg_df = agg_df.join(last_port_df, on="MMSI", how="left")

# Calculate overall turnaround time (first to last seen)

agg_df = agg_df.withColumn(
    "Overall_Turnaround_Time_Days",
    datediff("Last_Seen_Date", "First_Port_Date")
)


# Final formatting with date strings
final_df = agg_df.select(
    "MMSI",
    date_format("First_Port_Date", "yyyy-MM-dd").alias("First_Port_Date"),
    date_format("Last_Seen_Date", "yyyy-MM-dd").alias("Last_Seen_Date"),
    "Stationing_Days",
    "Overall_Turnaround_Time_Days",
    "First_Port",
    "Last_Port",
    "Visited_Ports",
    "Ship_Type" 
)

# Save to MongoDB
final_df.write.format("mongo").mode("overwrite").option("collection", "station").save()


25/05/21 18:19:44 WARN Utils: Your hostname, talal resolves to a loopback address: 127.0.1.1; using 192.168.0.119 instead (on interface wlo1)
25/05/21 18:19:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/talal/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/talal/.ivy2/cache
The jars for the packages stored in: /home/talal/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e7e1006d-57c3-4cac-b2ba-6f041639ed63;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 150ms :: artifacts dl 7ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   

In [10]:
from pymongo import MongoClient
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_absolute_error
import joblib
import numpy as np

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["ais_training_data"]
collection = db["station"]

# Load data
data = list(collection.find({
    "Overall_Turnaround_Time_Days": {"$exists": True},
    "Stationing_Days": {"$exists": True}
}))
df = pd.DataFrame(data)

# Clean and preprocess
df["Stationing_Days"] = df["Stationing_Days"].apply(lambda x: 1 if x == 0 else x)

ship_types = np.array([
    'Sailing', 'Military', 'Tug', 'Fishing', 'Pilot',
    'Other', 'Port tender', 'Cargo', 'Pleasure', 'Passenger',
    'Reserved', 'Tanker', 'SAR', 'HSC', 'Dredging',
    'Not party to conflict', 'Law enforcement', 'Towing', 'Diving',
    'Anti-pollution', 'Medical', 'Spare 1', 'WIG', 'Towing long/wide',
    'Spare 2'
], dtype=object)

df["Ship_Type"] = df["Ship_Type"].apply(lambda x: np.random.choice(ship_types) if x == "Undefined" else x)

# Filter and drop missing values
df_model = df[["Ship_Type", "First_Port", "Last_Port", "Overall_Turnaround_Time_Days", "Stationing_Days"]].dropna()

# Features and targets
X = df_model[["Ship_Type", "First_Port", "Last_Port"]]
y1 = df_model["Overall_Turnaround_Time_Days"]
y2 = df_model["Stationing_Days"]

# Preprocessing for categorical features
preprocessor = ColumnTransformer([
    ("cat", OneHotEncoder(handle_unknown='ignore'), ["Ship_Type", "First_Port", "Last_Port"])
])

# Train-test split
X_train, X_test, y1_train, y1_test = train_test_split(X, y1, test_size=0.2, random_state=42)
_, _, y2_train, y2_test = train_test_split(X, y2, test_size=0.2, random_state=42)

# Pipelines
model_turnaround = Pipeline([
    ("preprocessor", preprocessor),
    ("regressor", RandomForestRegressor(random_state=42))
])
model_stationing = Pipeline([
    ("preprocessor", preprocessor),
    ("regressor", RandomForestRegressor(random_state=42))
])

# Train models
model_turnaround.fit(X_train, y1_train)
model_stationing.fit(X_train, y2_train)

# Evaluate
pred_turn = model_turnaround.predict(X_test)
pred_stat = model_stationing.predict(X_test)

# Save models
joblib.dump(model_turnaround, "model_turnaround_simple.pkl")
joblib.dump(model_stationing, "model_stationing_simple.pkl")


['model_stationing_simple.pkl']

In [14]:
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import numpy as np

# Turnaround Evaluation
mae_turn = mean_absolute_error(y1_test, pred_turn)
rmse_turn = np.sqrt(mean_squared_error(y1_test, pred_turn))
r2_turn = r2_score(y1_test, pred_turn)

# Stationing Evaluation
mae_stat = mean_absolute_error(y2_test, pred_stat)
rmse_stat = np.sqrt(mean_squared_error(y2_test, pred_stat))
r2_stat = r2_score(y2_test, pred_stat)

print("\n🔍 Turnaround Time Metrics:")
print(f"MAE:  {mae_turn:.2f}")
print(f"RMSE: {rmse_turn:.2f}")
print(f"R²:   {r2_turn:.2f}")

print("\n🔍 Stationing Days Metrics:")
print(f"MAE:  {mae_stat:.2f}")
print(f"RMSE: {rmse_stat:.2f}")
print(f"R²:   {r2_stat:.2f}")



🔍 Turnaround Time Metrics:
MAE:  2.35
RMSE: 3.12
R²:   0.92

🔍 Stationing Days Metrics:
MAE:  1.87
RMSE: 2.65
R²:   0.95


In [12]:
import joblib
import pandas as pd

# Load models
model_turnaround = joblib.load("model_turnaround_simple.pkl")
model_stationing = joblib.load("model_stationing_simple.pkl")

def predict_simple(ship_type, first_port, last_port):
    input_df = pd.DataFrame([{
        "Ship_Type": ship_type,
        "First_Port": first_port,
        "Last_Port": last_port
    }])
    pred_turnaround = model_turnaround.predict(input_df)[0]
    pred_stationing = model_stationing.predict(input_df)[0]
    return {
        "Predicted_Overall_Turnaround_Time_Days": round(pred_turnaround, 2),
        "Predicted_Stationing_Days": max(1, round(pred_stationing))
    }

# Example usage
result = predict_simple("Cargo", "Aalborg", "Havdrup")
print(result)


{'Predicted_Overall_Turnaround_Time_Days': 183.38, 'Predicted_Stationing_Days': 48}
