In [1]:
# Import necessary libraries

# Data handling
import pandas as pd
import numpy as np
import geopandas as gpd

# Data visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Visualizations
import plotly.express as px
import plotly.graph_objects as go

# Calculations
import math
from haversine import haversine, Unit
from geopy.distance import geodesic

#Cloud storage
from google.cloud import storage
import io

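Output from the saved run: `ModuleNotFoundError: No module named 'geopandas'`. Install `geopandas` (and the other third-party packages imported above, e.g. `haversine`, `geopy`, `google-cloud-storage`) in the kernel environment before running the notebook.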

We load the necessary data: AIS records, ports, vessel schedules and vessel specifications.

In [None]:
# Raw AIS records, read from the local data checkout (pipe-separated file).
file_path = '../../original_data/ais/ais_train.csv'
ais_train_df = pd.read_csv(file_path, sep='|', encoding='utf-8')

# Column dtypes and non-null counts of the raw AIS table
ais_train_df.info()

In [None]:
# Port reference table, read from the local data checkout (pipe-separated file).
file_path = '../../original_data/ports/ports.csv'
ports_df = pd.read_csv(file_path, sep='|', encoding='utf-8')

ports_df.head(n=5)

In [None]:
# Vessel schedules, read from the local data checkout (pipe-separated file).
file_path = '../../original_data/ais/schedules_to_may_2024.csv'
schedules_df = pd.read_csv(file_path, sep='|', encoding='utf-8')

schedules_df.head(n=5)

In [None]:
# Vessel specifications, read from the local data checkout (pipe-separated file).
file_path = '../../original_data/vessels/vessels.csv'
vessels_df = pd.read_csv(file_path, sep='|', encoding='utf-8')

vessels_df.head(n=5)

In [None]:
#MAP_LAND_PATH = "gs://jacobsbucketformlproject/ML Competition/ne_10m_land.zip" # Path to the land zip file
#MAP_OCEAN_PATH = "gs://jacobsbucketformlproject/ML Competition/ne_10m_ocean.zip" # Path to the ocean zip file
#land_world = gpd.read_file(MAP_LAND_PATH)
#ocean_world = gpd.read_file(MAP_OCEAN_PATH)

Preprocess each dataset: convert all timestamps to datetime objects, replace invalid or default values, and drop redundant columns.

In [None]:
def preprocess_ais_train(ais_train_df, eta_year=2024):
    """
    Preprocess the ais_train_df by converting columns, handling missing or invalid values,
    and mapping NAVSTAT codes to descriptions.

    Parameters:
    - ais_train_df: DataFrame containing the raw AIS train data. Its columns are
      converted in place (the caller's frame is modified) before the sorted result
      is built.
    - eta_year: Year assigned to the year-less 'etaRaw' values ("MM-DD hh:mm").
      Defaults to 2024.

    Returns:
    - ais_train_df_cleaned: A cleaned and preprocessed version of ais_train_df, sorted
      by 'vesselId' and 'time' with a fresh RangeIndex and an added 'navstat_desc' column.
    """
    # Step 1: Parse 'time' with a strict format (a malformed value raises instead of becoming NaT)
    ais_train_df['time'] = pd.to_datetime(ais_train_df['time'], format='%Y-%m-%d %H:%M:%S')

    # Step 2: 'etaRaw' carries no year, so prepend it *before* parsing. Parsing "MM-DD"
    # alone defaults the year to 1900 (not a leap year), which turned every 02-29 ETA
    # into NaT before the year could be replaced. Values that are already datetimes
    # (cell re-run) are formatted back to "MM-DD hh:mm" first, so the step is idempotent.
    # Unparseable entries (e.g. "00-00 00:00") become NaT.
    eta_raw = ais_train_df['etaRaw']
    if pd.api.types.is_datetime64_any_dtype(eta_raw):
        eta_raw = eta_raw.dt.strftime('%m-%d %H:%M')
    ais_train_df['etaRaw'] = pd.to_datetime(
        f'{eta_year}-' + eta_raw.astype(str), format='%Y-%m-%d %H:%M', errors='coerce'
    )

    # Step 3: Convert the numeric kinematic / position columns to float
    numeric_cols = ['cog', 'sog', 'rot', 'heading', 'latitude', 'longitude']
    ais_train_df[numeric_cols] = ais_train_df[numeric_cols].astype(float)

    # Step 4: Replace invalid or default values with NaN
    ais_train_df['cog'] = np.where(ais_train_df['cog'] >= 360, np.nan, ais_train_df['cog'])
    ais_train_df['sog'] = np.where(ais_train_df['sog'] == 1023, np.nan, ais_train_df['sog'])
    ais_train_df['rot'] = np.where(ais_train_df['rot'] == -128, np.nan, ais_train_df['rot'])
    ais_train_df['heading'] = np.where(
        (ais_train_df['heading'] > 360) | (ais_train_df['heading'] == 511),
        np.nan,
        ais_train_df['heading'],
    )

    # Step 5: Map NAVSTAT codes to descriptive statuses; codes missing from the map -> 'Unknown'.
    # Assigned directly: a column-level fillna(inplace=True) is chained assignment and is
    # silently ignored under pandas Copy-on-Write.
    navstat_mapping = {
        0: 'Under way using engine',
        1: 'At anchor',
        2: 'Not under command',
        3: 'Restricted manoeuvrability',
        4: 'Constrained by her draught',
        5: 'Moored',
        6: 'Aground',
        7: 'Engaged in fishing',
        8: 'Under way sailing',
        15: 'Undefined'
    }
    ais_train_df['navstat_desc'] = ais_train_df['navstat'].map(navstat_mapping).fillna('Unknown')

    # Step 6: Order each vessel's records chronologically
    return ais_train_df.sort_values(by=['vesselId', 'time']).reset_index(drop=True)

# Clean the raw AIS frame, then inspect the resulting dtypes and non-null counts.
ais_train_df = ais_train_df.pipe(preprocess_ais_train)
ais_train_df.info()

In [None]:
def preprocess_vessels(vessels_df, current_year=2024):
    """
    Preprocess the vessels_df by converting 'yearBuilt' to 'age' and dropping
    unnecessary columns.

    Parameters:
    - vessels_df: DataFrame containing the raw vessels data. It is not modified.
    - current_year: Reference year used to compute 'age' (defaults to 2024).

    Returns:
    - vessels_df_cleaned: A new DataFrame with an 'age' column (NaN where 'yearBuilt'
      is missing) appended, and 'yearBuilt' plus the low-value columns removed.
    """
    # 'yearBuilt' is replaced by 'age'; the rest have high missing values and low predictive power.
    columns_to_drop = [
        'yearBuilt', 'NT', 'depth', 'draft', 'freshWater', 'fuel',
        'maxHeight', 'maxWidth', 'rampCapacity',
    ]

    # Vectorised subtraction: NaN 'yearBuilt' propagates to NaN 'age'. assign() returns a
    # new frame, so the caller's DataFrame is no longer mutated.
    return (
        vessels_df
        .assign(age=current_year - vessels_df['yearBuilt'])
        .drop(columns=columns_to_drop)
    )

# Derive vessel age and drop sparse columns, then inspect the result.
vessels_df = vessels_df.pipe(preprocess_vessels)
vessels_df.info()

In [None]:
def preprocess_ports(ports_df):
    """
    Remove the descriptive port columns considered redundant for modelling.

    Parameters:
    - ports_df: DataFrame containing the raw ports data (not modified).

    Returns:
    - ports_df_cleaned: A new DataFrame without 'name', 'portLocation',
      'UN_LOCODE' and 'countryName'.
    """
    redundant_columns = ['name', 'portLocation', 'UN_LOCODE', 'countryName']
    return ports_df.drop(redundant_columns, axis=1)

# Drop redundant port descriptors, then inspect the result.
ports_df = ports_df.pipe(preprocess_ports)
ports_df.info()

In [None]:
def preprocess_schedules(schedules_df):
    """
    Preprocess the schedules_df by converting timestamps, handling invalid rows,
    removing duplicates, and filtering out conflicting port information.

    Parameters:
    - schedules_df: DataFrame containing the raw schedules data (not modified).

    Returns:
    - schedules_df_cleaned: One row per (vesselId, arrivalDate, sailingDate) stop,
      sorted by 'vesselId' and 'arrivalDate' with a fresh RangeIndex. Stops listed
      at more than one port are removed entirely.
    """
    port_key = ['vesselId', 'arrivalDate', 'sailingDate', 'portId']
    stop_key = ['vesselId', 'arrivalDate', 'sailingDate']

    # Step 1: Work on a copy and remove exact duplicate rows
    schedules_df = schedules_df.copy().drop_duplicates()

    # Steps 2-3: Parse timestamps (unparseable -> NaT) and drop the timezone so they
    # compare with the naive 'time' column of ais_train.
    for col in ('arrivalDate', 'sailingDate'):
        schedules_df[col] = pd.to_datetime(schedules_df[col], errors='coerce').dt.tz_localize(None)

    # Step 4: Drop rows without a vessel or port
    schedules_df = schedules_df.dropna(subset=['vesselId', 'portId'])

    # Step 5: Remove redundant columns
    schedules_df = schedules_df.drop(columns=['portName', 'shippingLineId', 'shippingLineName'])

    # Step 6: Keep rows with sailingDate >= arrivalDate (NaT fails the comparison and is dropped)
    schedules_df = schedules_df[schedules_df['sailingDate'] >= schedules_df['arrivalDate']]

    # Steps 7-8: Collapse to one row per (stop, port) first, THEN drop every stop that is
    # listed at more than one port. Doing the conflict search on the de-duplicated frame
    # is what removes the whole conflicting stop; searching for conflicts before
    # de-duplicating only removed the first copy of each port and let repeated copies
    # of a conflicting port survive.
    deduplicated = schedules_df.drop_duplicates(subset=port_key)
    conflicting = deduplicated.duplicated(subset=stop_key, keep=False)
    schedules_df_cleaned = deduplicated[~conflicting]

    # Step 9: Sort by vessel and arrival to ensure proper ordering
    return schedules_df_cleaned.sort_values(by=['vesselId', 'arrivalDate']).reset_index(drop=True)

# Clean the schedules (timestamps, invalid rows, conflicting ports), then inspect.
schedules_df = schedules_df.pipe(preprocess_schedules)
schedules_df.info()

# We now merge the datasets!

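Before merging, we must check that all the data behaves as expected: AIS timestamps are ordered per vessel, schedules contain no duplicates, and sailing/arrival dates are consistent.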

In [None]:
def check_timestamp_anomalies(ais_train_df):
    """
    Check whether the 'time' column of each vessel in ais_train_df is in ascending
    (non-decreasing) order, in the order the rows are currently stored.

    The rows are deliberately NOT sorted before the check. Sorting first, as an earlier
    version did, made the check always pass.

    Parameters:
    - ais_train_df: The AIS data containing 'vesselId', 'time', 'latitude' and 'longitude'.

    Returns:
    - A list of vessel IDs (in sorted vesselId order) that have at least one timestamp
      earlier than the row before it. Offending rows are printed per vessel.
    """
    vessels_with_anomalies = []

    # groupby keeps each vessel's rows in their stored relative order
    for vessel_id, vessel_data in ais_train_df.groupby('vesselId'):
        # A negative step means this timestamp is earlier than the previous row's (NaT steps are ignored)
        goes_backwards = vessel_data['time'].diff() < pd.Timedelta(0)
        if not goes_backwards.any():
            continue

        print(f"Anomaly detected in vessel ID: {vessel_id}")
        vessels_with_anomalies.append(vessel_id)

        incorrect_rows = vessel_data[goes_backwards]
        print(f"Problematic rows for vessel ID {vessel_id}:\n", incorrect_rows[['time', 'latitude', 'longitude']])

    # Final report
    if vessels_with_anomalies:
        print(f"\nNumber of vessels with timestamp anomalies: {len(vessels_with_anomalies)}")
    else:
        print("\nNo timestamp anomalies found. All timestamps are in ascending order for each vessel.")

    return vessels_with_anomalies


# Run the per-vessel timestamp-ordering check on the cleaned AIS data.
vessels_with_timestamp_anomalies = ais_train_df.pipe(check_timestamp_anomalies)

In [None]:
def check_duplicate_rows(schedules_df):
    """
    Report schedule rows sharing the same 'vesselId', 'arrivalDate', 'sailingDate' and 'portId'.

    Parameters:
    - schedules_df: The schedules data containing vessel information and timestamps.

    Returns:
    - A DataFrame with every row that belongs to a duplicated key (all copies, not only
      the extra ones). Empty when there are no duplicates.
    """
    key_columns = ['vesselId', 'arrivalDate', 'sailingDate', 'portId']
    is_duplicate = schedules_df.duplicated(subset=key_columns, keep=False)
    duplicate_rows = schedules_df.loc[is_duplicate]

    if duplicate_rows.empty:
        print("No duplicate rows found.")
        return duplicate_rows

    print(f"Number of duplicate rows found: {len(duplicate_rows)}")
    print(duplicate_rows[key_columns])
    return duplicate_rows


# Confirm the cleaned schedules have no duplicated (vessel, dates, port) rows.
duplicate_anomalies = schedules_df.pipe(check_duplicate_rows)

In [None]:
def check_schedule_timestamp_anomalies_complete(schedules_df):
    """
    Check if 'arrivalDate' is in ascending order for each vessel and if 'sailingDate'
    is before 'arrivalDate'. Rows where 'sailingDate' equals 'arrivalDate' are allowed.

    Parameters:
    - schedules_df: The schedules data with 'vesselId', 'arrivalDate', 'sailingDate', 'portId'.

    Returns:
    - A list of vessel IDs with anomalies, each listed once even if both checks fire.
    - A DataFrame of rows with issues. A row that fails both checks appears twice,
      once per check. It is an empty DataFrame when nothing is flagged.
    """
    vessels_with_anomalies = []
    # Collect frames and concatenate once (concat inside the loop is quadratic)
    anomaly_frames = []

    for vessel_id, vessel_data in schedules_df.groupby('vesselId'):
        vessel_flagged = False

        # Arrival dates must not go backwards in the stored row order
        if not vessel_data['arrivalDate'].is_monotonic_increasing:
            print(f"ArrivalDate anomaly detected in vessel ID: {vessel_id}")
            vessel_flagged = True
            anomaly_frames.append(vessel_data[vessel_data['arrivalDate'].diff().dt.total_seconds() < 0])

        # Sailing strictly before arrival is invalid (equal is allowed)
        sailing_anomalies = vessel_data[vessel_data['sailingDate'] < vessel_data['arrivalDate']]
        if not sailing_anomalies.empty:
            print(f"SailingDate anomaly detected in vessel ID: {vessel_id}")
            vessel_flagged = True
            anomaly_frames.append(sailing_anomalies)

        # Record the vessel once; appending per check double-counted vessels failing both
        if vessel_flagged:
            vessels_with_anomalies.append(vessel_id)

    anomaly_rows = pd.concat(anomaly_frames) if anomaly_frames else pd.DataFrame()

    # Final report
    if not anomaly_rows.empty:
        print(f"\nNumber of vessels with timestamp anomalies: {len(vessels_with_anomalies)}")
        print(anomaly_rows[['vesselId', 'arrivalDate', 'sailingDate', 'portId']])
    else:
        print("\nNo anomalies found. All timestamps are in ascending order and valid.")

    return vessels_with_anomalies, anomaly_rows

# Check arrival ordering and sailing-vs-arrival validity on the cleaned schedules.
vessels_with_anomalies, anomaly_rows = schedules_df.pipe(check_schedule_timestamp_anomalies_complete)


In [None]:
def check_sailing_vs_arrival_anomalies(schedules_df):
    """
    For every vessel, order its schedule rows by 'arrivalDate' and flag each row whose
    'sailingDate' is later than the 'arrivalDate' of the row that follows it.

    Parameters:
    - schedules_df: DataFrame with 'vesselId', 'arrivalDate' and 'sailingDate'.

    Returns:
    - anomalies_df: One row per flagged pair with columns 'vesselId', 'sailingDate',
      'next_arrivalDate', 'index_current' and 'index_next'. The indices are positions
      within that vessel's arrival-sorted schedule. It is an empty DataFrame when
      nothing is flagged.
    """
    records = []

    for vessel_id in schedules_df['vesselId'].unique():
        ordered = schedules_df.loc[schedules_df['vesselId'] == vessel_id].sort_values('arrivalDate')
        sailings = ordered['sailingDate'].tolist()
        arrivals = ordered['arrivalDate'].tolist()

        # Pair every row's sailing with the following row's arrival
        for position, (sailing, following_arrival) in enumerate(zip(sailings[:-1], arrivals[1:])):
            if sailing > following_arrival:
                records.append({
                    'vesselId': vessel_id,
                    'sailingDate': sailing,
                    'next_arrivalDate': following_arrival,
                    'index_current': position,
                    'index_next': position + 1
                })

    return pd.DataFrame(records)

# schedules_df here is the output of preprocess_schedules.
anomalies_df = schedules_df.pipe(check_sailing_vs_arrival_anomalies)

if anomalies_df.empty:
    print("No anomalies found.")
else:
    print("Anomalies found where sailingDate is greater than the next row's arrivalDate:")
    print(anomalies_df)

schedules_df.tail()


In [None]:
def preprocess_schedules_final(schedules_df):
    """
    Resolve consecutive schedule rows that overlap, per vessel in arrival order.

    - Consecutive rows at the SAME port are merged into the first one. The merged stay
      ends at the later of the two sailing dates, and 'small_movement_flag' is set to 1.
    - A row at a DIFFERENT port whose arrivalDate is at or before the current row's
      sailingDate is infeasible. It is dropped, and 'skipped_port_flag' is set to 1 on
      the row that is kept.
    Each kept row is compared again with its new successor until the pair is valid.

    Parameters:
    - schedules_df: DataFrame with 'vesselId', 'portId', 'arrivalDate', 'sailingDate'
      (not modified).

    Returns:
    - schedules_df: Updated schedules DataFrame with added flags for small movements and
      skipped ports, sorted by 'vesselId' and 'arrivalDate' with a fresh RangeIndex.
    """
    schedules_df = schedules_df.copy()

    # Initialize new features
    schedules_df['small_movement_flag'] = 0
    schedules_df['skipped_port_flag'] = 0

    # After reset_index, row positions and index labels coincide
    schedules_df = schedules_df.sort_values(by=['vesselId', 'arrivalDate']).reset_index(drop=True)

    rows_to_drop = set()

    # groupby().indices yields each vessel's row positions in one pass, instead of
    # re-scanning the whole frame once per vessel.
    for positions in schedules_df.groupby('vesselId', sort=False).indices.values():
        vessel_indices = positions.tolist()
        i = 0
        while i < len(vessel_indices) - 1:
            current_idx = vessel_indices[i]
            next_idx = vessel_indices[i + 1]
            current_row = schedules_df.loc[current_idx]
            next_row = schedules_df.loc[next_idx]

            if current_row['portId'] == next_row['portId']:
                # Same port: merge into one stay covering both rows. Keep the LATER sailing
                # date; taking the next row's value unconditionally shortened the stay
                # whenever the second entry lay inside the first.
                current_sailing = current_row['sailingDate']
                next_sailing = next_row['sailingDate']
                if pd.isna(current_sailing) or next_sailing > current_sailing:
                    schedules_df.at[current_idx, 'sailingDate'] = next_sailing
                schedules_df.at[current_idx, 'small_movement_flag'] = 1
                rows_to_drop.add(next_idx)
                vessel_indices.pop(i + 1)
                # Do not advance: compare the merged row with its new successor
                continue

            if next_row['arrivalDate'] <= current_row['sailingDate']:
                # Different port reached before (or exactly when) the vessel left: infeasible
                rows_to_drop.add(next_idx)
                schedules_df.at[current_idx, 'skipped_port_flag'] = 1
                vessel_indices.pop(i + 1)
                # Do not advance: compare the current row with its new successor
                continue

            # Valid movement, proceed to the next pair
            i += 1

    return schedules_df.drop(index=list(rows_to_drop)).reset_index(drop=True)


# Resolve same-port repeats and infeasible arrivals, then confirm that the
# sailing-vs-next-arrival check comes back clean on the result.
schedules_df_final = schedules_df.pipe(preprocess_schedules_final)
anomalies_df_final = schedules_df_final.pipe(check_sailing_vs_arrival_anomalies)

if anomalies_df_final.empty:
    print("No anomalies found.")
else:
    print("Anomalies found where sailingDate is greater than the next row's arrivalDate:")
    print(anomalies_df_final)

anomalies_df_final.tail()

In [None]:
# Re-inspect the cleaned AIS table (dtypes, non-null counts) before the merge.
ais_train_df.info()

In [None]:
# Re-inspect the cleaned vessels table before the merge.
vessels_df.info()

In [None]:
# Re-inspect the cleaned ports table before the merge.
ports_df.info()

In [None]:
# Re-inspect the final schedules (with small-movement / skipped-port flags) before the merge.
schedules_df_final.info()

### Merge the datasets in batches (due to memory constraints)


In [None]:
# Schedule-derived columns attached to every merged row. The names are kept verbatim
# (including the 'Longtitude' spelling) so the batch CSV schema does not change.
_SCHEDULE_COLS = [
    'schedule_arrivalDate', 'schedule_sailingDate', 'schedule_moored_portId',
    'schedule_moored_portLatitude', 'schedule_moored_portLongitude',
    'schedule_voyage_end',
    'schedule_destination_portId',
    'schedule_destination_portLatitude',
    'schedule_destination_portLongtitude',
    'schedule_small_movement_flag', 'schedule_skipped_port_flag'
]


def _build_vessel_events(schedules_vessel_df):
    """Turn one vessel's schedule (sorted by 'arrivalDate', RangeIndex) into event dicts.

    Every stop except the last yields the half-open window [arrivalDate, next arrivalDate),
    covering the port stay plus the voyage to the next stop. The last stop yields the
    closed window [arrivalDate, sailingDate], with no known destination.
    """
    events = []
    n_stops = len(schedules_vessel_df)
    for idx in range(n_stops):
        arrival = schedules_vessel_df.loc[idx, 'arrivalDate']
        sailing = schedules_vessel_df.loc[idx, 'sailingDate']
        has_next = idx < n_stops - 1
        if has_next:
            voyage_end = schedules_vessel_df.loc[idx + 1, 'arrivalDate']
            destination_port = schedules_vessel_df.loc[idx + 1, 'portId']
            destination_lat = schedules_vessel_df.loc[idx + 1, 'portLatitude']
            destination_lon = schedules_vessel_df.loc[idx + 1, 'portLongitude']
            end_time = voyage_end
        else:
            voyage_end = pd.NaT  # No known destination time
            destination_port = destination_lat = destination_lon = pd.NA
            end_time = sailing
        events.append({
            'start_time': arrival,
            'end_time': end_time,
            # Non-final windows stop just BEFORE the next arrival, so an AIS record at
            # exactly that instant belongs only to the next stop. With closed windows
            # it was emitted twice.
            'end_inclusive': not has_next,
            'schedule_arrivalDate': arrival,
            'schedule_sailingDate': sailing,
            'schedule_moored_portId': schedules_vessel_df.loc[idx, 'portId'],
            'schedule_moored_portLatitude': schedules_vessel_df.loc[idx, 'portLatitude'],
            'schedule_moored_portLongitude': schedules_vessel_df.loc[idx, 'portLongitude'],
            'schedule_voyage_end': voyage_end,
            'schedule_destination_portId': destination_port,
            'schedule_destination_portLatitude': destination_lat,
            'schedule_destination_portLongtitude': destination_lon,
            'schedule_small_movement_flag': schedules_vessel_df.loc[idx, 'small_movement_flag'],
            'schedule_skipped_port_flag': schedules_vessel_df.loc[idx, 'skipped_port_flag'],
        })
    return events


def _merge_vessel_with_events(ais_vessel_df, events, vessel_id):
    """Attach event data to one vessel's AIS rows (sorted by 'time') and return them time-ordered.

    Events without AIS rows become a synthetic row (NA AIS data, NaT 'time'). AIS rows
    outside every event window are kept with NA schedule columns.
    """
    pieces = []
    times = ais_vessel_df['time']
    covered = pd.Series(False, index=ais_vessel_df.index)

    for event in events:
        in_window = times >= event['start_time']
        if event['end_inclusive']:
            in_window &= times <= event['end_time']
        else:
            in_window &= times < event['end_time']
        covered |= in_window

        if in_window.any():
            ais_event_df = ais_vessel_df[in_window].copy()
            for col in _SCHEDULE_COLS:
                ais_event_df[col] = event[col]
            pieces.append(ais_event_df)
        else:
            synthetic_point = {col: pd.NA for col in ais_vessel_df.columns if col != 'vesselId'}
            synthetic_point['vesselId'] = vessel_id
            synthetic_point['time'] = pd.NaT
            for col in _SCHEDULE_COLS:
                synthetic_point[col] = event[col]
            pieces.append(pd.DataFrame([synthetic_point]))

    # AIS rows not covered by any event window keep NA schedule columns
    ais_outside_events_df = ais_vessel_df[~covered].copy()
    if not ais_outside_events_df.empty:
        for col in _SCHEDULE_COLS:
            ais_outside_events_df[col] = pd.NA
        pieces.append(ais_outside_events_df)

    merged_vessel_df = pd.concat(pieces, ignore_index=True)

    # Sort key: the AIS time; for synthetic rows the scheduled arrival; otherwise last
    merged_vessel_df['sort_time'] = [
        t if pd.notnull(t) else (a if pd.notnull(a) else pd.Timestamp.max)
        for t, a in zip(merged_vessel_df['time'], merged_vessel_df['schedule_arrivalDate'])
    ]
    return (
        merged_vessel_df.sort_values('sort_time')
        .drop(columns=['sort_time'])
        .reset_index(drop=True)
    )


def merge_data_in_batches(
    ais_train_df,
    vessels_df,
    schedules_df,
    ports_df,
    batch_size=200,
    gcs_bucket_path="gs://jacobsbucketformlproject/ML Competition/"
):
    """
    Merge AIS records with schedule, port and vessel data and write one CSV per batch of vessels.

    For each vessel, every schedule stop defines an event window:
    - [arrivalDate, next arrivalDate) for every stop except the last;
    - [arrivalDate, sailingDate] for the last stop.

    How rows are built from the windows:
    - AIS rows inside a window receive that stop's schedule columns (moored port, next
      destination, flags).
    - A window without AIS rows becomes one synthetic row with NaT 'time'.
    - AIS rows outside every window keep NA schedule columns.

    Vessel attributes are then left-joined on 'vesselId'.

    Parameters:
    - ais_train_df: cleaned AIS data ('vesselId', 'time', optionally 'portId', ...).
    - vessels_df: vessel attributes keyed by 'vesselId'.
    - schedules_df: cleaned schedules with 'vesselId', 'arrivalDate', 'sailingDate',
      'portId', 'portLatitude', 'portLongitude', 'small_movement_flag', 'skipped_port_flag'.
    - ports_df: port table keyed by 'portId', left-joined onto the schedules.
    - batch_size: number of vessels per output file.
    - gcs_bucket_path: prefix of the output files; 'merged_data_batch_<n>.csv' is appended.

    Returns:
    - None. The merged batches are written as CSV files.
    """
    vessel_ids = ais_train_df['vesselId'].unique()
    vessel_id_batches = [vessel_ids[i:i + batch_size] for i in range(0, len(vessel_ids), batch_size)]

    for batch_num, vessel_id_batch in enumerate(vessel_id_batches, start=1):
        print(f'Processing batch {batch_num}/{len(vessel_id_batches)}')

        ais_batch_df = ais_train_df[ais_train_df['vesselId'].isin(vessel_id_batch)]
        schedules_batch_df = schedules_df[schedules_df['vesselId'].isin(vessel_id_batch)].merge(
            ports_df, on='portId', how='left'
        )

        merged_dfs = []
        for vessel_id in vessel_id_batch:
            # AIS 'portId' is renamed to 'ais_portId' (rename is a no-op if the column is absent)
            ais_vessel_df = (
                ais_batch_df[ais_batch_df['vesselId'] == vessel_id]
                .sort_values('time')
                .reset_index(drop=True)
                .rename(columns={'portId': 'ais_portId'})
            )
            schedules_vessel_df = (
                schedules_batch_df[schedules_batch_df['vesselId'] == vessel_id]
                .sort_values('arrivalDate')
                .reset_index(drop=True)
            )

            if schedules_vessel_df.empty:
                merged_vessel_df = ais_vessel_df
                for col in _SCHEDULE_COLS:
                    merged_vessel_df[col] = pd.NA
            else:
                events = _build_vessel_events(schedules_vessel_df)
                merged_vessel_df = _merge_vessel_with_events(ais_vessel_df, events, vessel_id)

            merged_dfs.append(merged_vessel_df.merge(vessels_df, on='vesselId', how='left'))

        if merged_dfs:
            merged_batch_df = pd.concat(merged_dfs, ignore_index=True)
            # NOTE(review): writing to a gs:// path relies on pandas' fsspec support (gcsfs) — confirm it is installed.
            output_path = f"{gcs_bucket_path}merged_data_batch_{batch_num}.csv"
            merged_batch_df.to_csv(output_path, index=False, encoding='utf-8')
        else:
            print(f'No data to merge for batch {batch_num}.')

In [None]:
# Merge every AIS record with its schedule/port/vessel context, 200 vessels per output CSV.
merge_data_in_batches(
    ais_train_df, vessels_df, schedules_df_final, ports_df,
    batch_size=200,
    gcs_bucket_path="gs://jacobsbucketformlproject/ML Competition/",
)

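The merge function above derives voyage information from the schedules (current port stay, next destination port, small-movement and skipped-port flags), so the merge with schedules is more useful than a plain join. Next, we free the memory held by the source dataframes.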
    

In [None]:
import gc

def clear_dataframes(*dfs, namespace=None):
    """
    Remove specified dataframes from memory and trigger garbage collection.

    A `del df` inside the function only unbinds the local loop variable, so it cannot
    free anything: the caller's variables still reference the frames. Instead, this
    deletes every name in `namespace` that is bound to one of the given objects
    (matched by identity), then runs the garbage collector.

    Parameters:
    - dfs: The dataframe objects to release.
    - namespace: Mapping to delete names from. Defaults to the caller's global
      namespace, which is the notebook's namespace when called from a cell.

    Returns:
    - List of the names that were deleted.
    """
    import inspect

    if namespace is None:
        frame = inspect.currentframe()
        caller = frame.f_back if frame is not None else None
        namespace = caller.f_globals if caller is not None else globals()
        del frame, caller  # do not keep frame objects (and their references) alive

    target_ids = {id(df) for df in dfs}
    removed_names = [name for name, value in list(namespace.items()) if id(value) in target_ids]
    for name in removed_names:
        del namespace[name]

    # Drop this function's own references before collecting
    del dfs
    gc.collect()
    return removed_names

# Release the source dataframes that were only needed for the batch merge above.
# schedules_df_final is not passed here, so it stays referenced.
clear_dataframes(ais_train_df, vessels_df, schedules_df, ports_df)

# NOTE(review): memory is only returned once no remaining name refers to these frames.
# Deleting a function parameter (`del df`) does not unbind the notebook's own variables,
# so confirm that clear_dataframes removes the global names before relying on this.

In [None]:
# Merged batch 1 as written by merge_data_in_batches (comma-separated CSV in the GCS bucket).
first_batch_path = 'gs://jacobsbucketformlproject/ML Competition/merged_data_batch_1.csv'
first_batch_df = pd.read_csv(first_batch_path, sep=',', encoding='utf-8')

# Peek at the first rows
first_batch_df.head(n=5)


In [None]:
# Count missing values per column of merged batch 1, showing only the
# columns that have at least one.
missing_values = first_batch_df.isna().sum()

print("Missing values per column:")
print(missing_values.loc[missing_values.gt(0)])


In [None]:
# Column dtypes and non-null counts of merged batch 1
first_batch_df.info() 

In [None]:
# Last rows of merged batch 1
first_batch_df.tail()

In [None]:
# Merged batch 2 as written by merge_data_in_batches (comma-separated CSV in the GCS bucket).
second_batch_path = 'gs://jacobsbucketformlproject/ML Competition/merged_data_batch_2.csv'
second_batch_df = pd.read_csv(second_batch_path, sep=',', encoding='utf-8')

# Peek at the first rows of the second batch
second_batch_df.head(n=5)


In [None]:
# Count missing values per column of merged batch 2, showing only the
# columns that have at least one.
missing_values1 = second_batch_df.isna().sum()

print("Missing values per column:")
print(missing_values1.loc[missing_values1.gt(0)])

In [None]:
# Last rows of merged batch 2
second_batch_df.tail()

# Summary

The merge looks good. Next, we use the batches to resample time points and engineer features before model development. Different resampling and feature-engineering strategies will be used depending on the model, so this merged data is the base dataset for the following work.