In [None]:
# Import Required Libraries
import csv
import os
import shutil
import tempfile
import time
import traceback
import warnings
from datetime import datetime, timezone
from urllib.parse import quote, urlencode

import numpy as np
import pandas as pd
import requests
from pandas import json_normalize

# Silence every warning for the rest of the kernel session.
# NOTE(review): this also hides pandas chained-assignment / FutureWarning and
# Python DeprecationWarning messages that point at real problems; consider
# narrowing the filter to specific categories or modules.
warnings.filterwarnings('ignore')

In [None]:
# API Key and Default Parameters
# Credentials are read from environment variables so real keys never have to be
# typed into (and committed with) the notebook; the placeholder strings are only
# a fallback for when the variables are not set.
API_KEY = os.environ.get('KPLER_API_KEY', 'API_KEY')                  # Kpler API key
API_KEY_MT = os.environ.get('MARINETRAFFIC_API_KEY', 'API_KEY_MT')    # MarineTraffic Vessel Positions API key

# Default API parameters
DEFAULT_ORIGIN_DATE_START = '2024-10-15'       # Default origin start date (YYYY-MM-DD)
DEFAULT_ORIGIN_DATE_END = '2024-10-24'         # Default origin end date (YYYY-MM-DD)
DEFAULT_DESTINATION_DATE_START = None          # Optional, leave None if not required
DEFAULT_DESTINATION_DATE_END = None            # Optional, leave None if not required
DEFAULT_VESSELS = []                           # Optional, list of vessels as strings
DEFAULT_TRADE_STATUS = 'in transit'            # Default trade status filter
DEFAULT_SIZE = 100                             # Default data limit (Trades API 'size' parameter)

In [None]:
def get_api_params():
    """
    Prompt for the Trades API query parameters, falling back to the configured defaults.

    The user is asked for the trade status first, then the data limit. Surrounding
    whitespace is stripped from each answer; an empty answer selects the default.

    Returns
    -------
    dict
        {'tradeStatus': ..., 'size': ...}. Entered values are strings; defaults keep
        the type of DEFAULT_TRADE_STATUS / DEFAULT_SIZE.
    """
    trade_status = input(f"Enter Trade Status (default: {DEFAULT_TRADE_STATUS}): ").strip()
    size = input(f"Enter Data Limit (default: {DEFAULT_SIZE}): ").strip()
    return {
        'tradeStatus': trade_status or DEFAULT_TRADE_STATUS,
        'size': size or DEFAULT_SIZE,
    }

# Construct Trades API URL
def construct_trades_url(params):
    """
    Build the Kpler cargo-trades request URL from a dict of query parameters.

    - Falsy values (None, '', 0, empty list) are skipped.
    - List values become repeated ``key[]=item`` pairs, each item converted to str
      and stripped of surrounding whitespace.
    - Every key and value is percent-encoded, so spaces, '&' or '=' in user input
      cannot corrupt the query string. Spaces become %20, and '[]' is kept literal
      for array keys.

    Parameters
    ----------
    params : dict
        Mapping of query-parameter name to a scalar or list value.

    Returns
    -------
    str
        The full request URL (no trailing '?' when there are no parameters).
    """
    base_url = 'https://api.kpler.com/v2/cargo/trades'
    pairs = []
    for key, value in params.items():
        if not value:
            continue
        if isinstance(value, list):
            pairs.extend((f"{key}[]", str(item).strip()) for item in value)
        else:
            pairs.append((key, value))

    query = urlencode(pairs, quote_via=quote, safe='[]')
    return f"{base_url}?{query}" if query else base_url

# Fetch Trades Data
def fetch_trades_data(url, headers, timeout=60):
    """
    GET the Kpler Trades endpoint and return the decoded JSON body.

    Parameters
    ----------
    url : str
        Fully built request URL (see construct_trades_url).
    headers : dict
        HTTP headers, including the Authorization header.
    timeout : float, optional
        Seconds to wait for the server before giving up (default 60). Without a
        timeout, requests can block indefinitely on an unresponsive server.

    Returns
    -------
    The parsed JSON response on HTTP 200; otherwise None (the status code and
    response text are printed).

    Raises
    ------
    requests.RequestException
        On network errors or when the timeout expires.
    """
    response = requests.get(url, headers=headers, timeout=timeout)
    if response.status_code != 200:
        print(f"Error {response.status_code}: {response.text}")
        return None

    print("Trades API call successful.")
    return response.json()

In [None]:
# Convert the json response to a Pandas dataframe and keep only the data fields of interest
def normalize_trades_data(data):
    """
    Flatten the Trades API response into one row per vessel record with selected fields.

    Each element of ``data`` is expected to hold a 'vessels' list. Every vessel record
    becomes a row, and nested dicts are flattened with '_' (e.g. vessel.imo ->
    'vessel_imo'). Trade-level fields are attached with a 'trades_' prefix. Meta fields
    missing from a trade become NaN (errors='ignore').

    Parameters
    ----------
    data : list of dict
        Decoded JSON from the Trades endpoint.

    Returns
    -------
    pd.DataFrame
        The columns of interest, in a fixed order, restricted to rows whose
        'vessel_imo' is neither missing nor ''. A column absent from every record is
        returned as all-NaN instead of raising KeyError. The result is an independent
        copy, so callers may assign into it.
    """
    df_trades = pd.json_normalize(
        data,
        record_path=['vessels'],
        meta=['id', 'status',
              ['vessel', 'imo'],
              ['vessel', 'name'],
              ['vessel', 'type'],
              ['vessel', 'cargoType'],
              ['origin', 'zone', 'name'],
              ['origin', 'arrival'],
              ['origin', 'departure'],
              ['destination', 'zone', 'name'],
              ['destination', 'arrival'],
              ['destination', 'departure'],
        ],
        sep='_',
        errors='ignore',
        meta_prefix='trades_'  # Prefix all meta fields to avoid conflicts
    )

    columns_of_interest = [
        'voyageId', 'vessel_id', 'vessel_name', 'vessel_mmsi', 'vessel_imo',
        'vessel_deadWeightTonnage', 'vessel_capacity', 'trades_id', 'trades_status',
        'trades_origin_zone_name', 'trades_destination_zone_name',
    ]
    # reindex (rather than df[[...]]) tolerates optional fields the API omitted entirely.
    trades = df_trades.reindex(columns=columns_of_interest)

    has_imo = trades['vessel_imo'].notna() & (trades['vessel_imo'] != '')
    return trades.loc[has_imo].copy()

In [None]:
# Fetch Compliance Data
def fetch_compliance_data(imo_list, start_date, end_date, headers, timeout=60):
    """
    POST a list of IMO numbers to the Kpler vessel-risks endpoint and return the JSON body.

    Parameters
    ----------
    imo_list : iterable
        IMO numbers to look up, sent as the JSON request body. NumPy scalars are
        converted to plain Python values because the json encoder cannot serialise them.
    start_date, end_date
        Currently unused: the request body contains only the IMO list. They are kept
        so existing call sites keep working.
    headers : dict
        HTTP headers, including the Authorization header.
    timeout : float, optional
        Seconds to wait for the server before giving up (default 60).

    Returns
    -------
    The parsed JSON response on HTTP 200; otherwise None (the status code and
    response text are printed).

    Raises
    ------
    requests.RequestException
        On network errors or when the timeout expires.
    """
    compliance_url = 'https://api.kpler.com/v2/compliance/vessel-risks'

    # Compliance API expects the request body as a list of IMO numbers
    body = [imo.item() if isinstance(imo, np.generic) else imo for imo in imo_list]

    response_compliance = requests.post(compliance_url, headers=headers, json=body, timeout=timeout)

    if response_compliance.status_code != 200:
        print(f"Error {response_compliance.status_code}: {response_compliance.text}")
        return None

    print("Compliance API call successful.")
    return response_compliance.json()

In [None]:
# Array handling: expand list-of-dict columns into flat, per-key columns
def expand_and_select_columns(df, column_name, selected_keys, prefix):
    """
    Expand a column holding lists of dictionaries into one column per (key, position).

    For every key in ``selected_keys``, columns ``f"{prefix}_{key}_{i}"`` are created for
    i = 1..N, where N is the longest list found for that key.
    - Values for 'startDate' and 'endDate' are passed through convert_unix_to_datetime.
    - Missing keys, non-dict list items, None values and short lists yield NaN.
    - Cells that are not lists are treated as empty lists, and a message is printed.

    The input DataFrame is not modified. A new DataFrame is returned with the array
    column dropped and the expanded columns appended after the remaining columns.
    If ``column_name`` is absent, a message is printed and ``df`` is returned unchanged.
    """
    if column_name not in df.columns:
        print(f"Column {column_name} not found in the DataFrame.")
        return df

    # Normalise cells on a local Series so the caller's DataFrame is left untouched.
    cells = df[column_name]
    if not cells.map(lambda cell: isinstance(cell, list)).all():
        print(f"Non-list entries found in column {column_name}. Converting to empty lists where necessary.")
        cells = cells.map(lambda cell: cell if isinstance(cell, list) else [])

    new_columns = {}
    for key in selected_keys:
        # Pull the key out of each dict in every list; non-dict items contribute None.
        extracted = cells.map(lambda items: [d.get(key) if isinstance(d, dict) else None for d in items])

        if key in ['startDate', 'endDate']:
            extracted = extracted.map(lambda items: [convert_unix_to_datetime(ts) for ts in items])

        extracted = extracted.map(lambda items: [np.nan if item is None else item for item in items])

        # default=0: an empty frame would otherwise give max() == NaN and range(NaN) fails.
        max_len = max((len(items) for items in extracted), default=0)
        for i in range(max_len):
            # to_numpy() avoids index re-alignment when building the frame below.
            new_columns[f"{prefix}_{key}_{i+1}"] = extracted.map(
                lambda items, i=i: items[i] if i < len(items) else np.nan
            ).to_numpy()

    expanded = pd.DataFrame(new_columns, index=df.index)
    return pd.concat([df.drop(columns=[column_name]), expanded], axis=1)

def convert_unix_to_datetime(unix_timestamp):
    """
    Convert a Unix timestamp in seconds to a UTC 'YYYY/MM/DD HH:MM:SS' string.

    The value is truncated with int() first, so numeric strings and floats are accepted.
    None and float NaN are treated as missing and return NaN without a message. Values
    that cannot be converted (non-numeric strings, out-of-range numbers) are reported
    and also return NaN.
    """
    if unix_timestamp is None or (isinstance(unix_timestamp, float) and np.isnan(unix_timestamp)):
        return np.nan
    try:
        # Timezone-aware fromtimestamp replaces datetime.utcfromtimestamp, which is
        # deprecated since Python 3.12; the formatted UTC wall-clock value is identical.
        moment = datetime.fromtimestamp(int(unix_timestamp), tz=timezone.utc)
    except (TypeError, ValueError, OverflowError, OSError) as e:
        print(f"Error converting timestamp: {unix_timestamp}. Error: {e}")
        return np.nan
    return moment.strftime('%Y/%m/%d %H:%M:%S')

# Normalize the compliance API response into a flat DataFrame
def normalize_compliance_data(data):
    """
    Flatten the compliance response and expand its nested list columns.

    Steps:
    1. Flatten ``data`` with pd.json_normalize, joining nested keys with '_'.
    2. Keep the required columns that are present; any that are missing are
       reported in a warning message.
    3. Expand each list-of-dict column into per-key, per-position columns via
       expand_and_select_columns. Timestamps in the historicalData columns are
       converted there.

    Returns
    -------
    pd.DataFrame
        The processed compliance table.
    """
    df_compliance = pd.json_normalize(data, sep='_')

    required_columns = [
        'vessel_imo', 'vessel_mmsi', 'vessel_callsign', 'vessel_shipname',
        'vessel_flag', 'vessel_countryCode', 'vessel_typeName',
        'vessel_typeSummary', 'vessel_particulars_gt', 'vessel_particulars_yob',
        'vessel_vesselCompanies',
        'compliance_sanctionRisks_sanctionedVessels_isSanctioned',
        'compliance_sanctionRisks_sanctionedVessels_historicalData',
        'compliance_sanctionRisks_sanctionedCompanies_isSanctioned',
        'compliance_sanctionRisks_sanctionedCompanies_companies',
        'compliance_sanctionRisks_sanctionedCompanies_historicalData',
    ]

    missing_columns = [col for col in required_columns if col not in df_compliance.columns]
    if missing_columns:
        print(f"Warning: Missing columns - {missing_columns}")

    available_columns = [col for col in required_columns if col in df_compliance.columns]
    # Explicit copy: the expansion step rewrites columns and must not operate on a
    # view of df_compliance (chained-assignment hazard).
    compliance = df_compliance[available_columns].copy()

    # (array column, keys to keep from each element, output column prefix)
    array_expansions = [
        ('vessel_vesselCompanies',
         ['typeName', 'name'],
         'vessel_vesselCompanies'),
        ('compliance_sanctionRisks_sanctionedCompanies_companies',
         ['name'],
         'sanctionedCompanies_companies'),
        ('compliance_sanctionRisks_sanctionedVessels_historicalData',
         ['name', 'startDate', 'endDate'],
         'sanctionedVessels_historicalData'),
        ('compliance_sanctionRisks_sanctionedCompanies_historicalData',
         ['name', 'startDate', 'endDate'],
         'sanctionedCompanies_historicalData'),
    ]
    for column_name, selected_keys, prefix in array_expansions:
        compliance = expand_and_select_columns(
            compliance,
            column_name=column_name,
            selected_keys=selected_keys,
            prefix=prefix,
        )

    return compliance

In [None]:
# Fetch the positions for the IMO numbers using the MarineTraffic API
def fetch_ais_data(api_key, imo_list, timespan=1440, buffer_time=120, request_timeout=30):
    """
    Fetch AIS data for a list of IMO numbers from the MarineTraffic API.

    One request is made per IMO, and the function sleeps ``buffer_time`` seconds
    between requests (not after the last one). Failures for a single IMO are printed
    and skipped.

    Parameters:
    - api_key (str): The API key for authentication (embedded in the request URL).
    - imo_list (list): List of IMO numbers to fetch data for.
    - timespan (int): Timespan in minutes for the data (default is 1440 minutes, i.e., 24 hours).
    - buffer_time (int): Time in seconds to wait between API calls (default is 120 seconds).
    - request_timeout (float): Seconds to wait for each HTTP response before giving up (default 30).

    Returns:
    - pd.DataFrame with columns IMO, LAT, LON, TIMESTAMP when any records were returned.
      The frame is empty (with those columns) if the records lacked any of them.
    - None when no records were retrieved (including an empty imo_list).
    """
    print("Starting AIS data retrieval...")
    url_template = f'https://services.marinetraffic.com/api/exportvessel/{api_key}/v:6/timespan:{timespan}/imo:{{imo}}/protocol:jsono'
    expected_columns = ['IMO', 'LAT', 'LON', 'TIMESTAMP']
    all_ais_data = []
    total = len(imo_list)

    for idx, imo in enumerate(imo_list, start=1):
        print(f"[{idx}/{total}] Fetching AIS data for IMO: {imo}...")
        try:
            response = requests.get(url_template.format(imo=imo), timeout=request_timeout)
            if response.ok:
                data = response.json()
                if isinstance(data, list):
                    for record in data:
                        record['IMO'] = imo  # Add IMO to each record
                        all_ais_data.append(record)
                    print(f"SUCCESS: Data fetched and processed for IMO {imo}.")
                else:
                    print(f"WARNING: Unexpected data format for IMO {imo}.")
            else:
                print(f"ERROR: Failed to fetch data for IMO {imo}. Status: {response.status_code} {response.reason}")
        except Exception as e:
            # Connection errors may quote the request URL, which contains the API key;
            # mask it so the key does not end up in saved notebook output.
            message = str(e).replace(str(api_key), '***') if api_key else str(e)
            print(f"ERROR: Exception occurred for IMO {imo}: {message}")

        # Add a buffer time (in seconds) between API calls
        if idx < total:  # Avoid waiting after the last call
            print(f"Waiting for {buffer_time} seconds before the next call...")
            time.sleep(buffer_time)

    if not all_ais_data:
        print("No AIS data retrieved.")
        return None

    df_ais = pd.DataFrame(all_ais_data)
    # Retain only IMO, LAT, LON and TIMESTAMP
    if set(expected_columns).issubset(df_ais.columns):
        df_ais = df_ais[expected_columns]
    else:
        df_ais = pd.DataFrame(columns=expected_columns)
    print("AIS data retrieval complete.")
    return df_ais

In [None]:
# Merge the trades and compliance DataFrames on the vessel IMO number, then add AIS position data
def _as_nullable_imo(series):
    """Coerce an IMO column to pandas' nullable Int64; missing or non-numeric values become <NA>."""
    return pd.to_numeric(series, errors='coerce').astype('Int64')


def combine_filtered(trades_filtered, compliance_filtered, ais_data):
    """
    Join trades, compliance and AIS position data on the vessel IMO number.

    Steps:
    1. Combine trades and compliance with an outer join on 'vessel_imo'. Vessels present
       in only one source are kept, and overlapping column names get '_trade' /
       '_compliance' suffixes.
    2. Left-join AIS rows on their 'IMO' column, which is then dropped. AIS rows without
       a usable IMO are discarded, since they cannot be attributed to a vessel.

    IMO columns are converted to the nullable 'Int64' dtype on copies of the inputs:
    - the caller's DataFrames are not modified;
    - a missing compliance IMO becomes <NA> instead of making astype(int) raise.

    Returns
    -------
    pd.DataFrame
        The combined table.
    """
    trades = trades_filtered.assign(vessel_imo=_as_nullable_imo(trades_filtered['vessel_imo']))
    compliance = compliance_filtered.assign(vessel_imo=_as_nullable_imo(compliance_filtered['vessel_imo']))
    ais = ais_data.assign(IMO=_as_nullable_imo(ais_data['IMO'])).dropna(subset=['IMO'])

    df_merged = pd.merge(
        trades,
        compliance,
        how='outer',  # keep vessels found in either source
        on='vessel_imo',
        suffixes=('_trade', '_compliance'),
    )

    df_final = pd.merge(
        df_merged,
        ais,
        how='left',  # keep every trades/compliance row, with or without a position
        left_on='vessel_imo',
        right_on='IMO',
    )

    return df_final.drop(columns=['IMO'], errors='ignore')

In [None]:
def main(output_dir='E:/'):
    """
    Run the end-to-end pipeline: Trades -> Compliance -> AIS positions -> merged CSV.

    Each stage writes its result to a CSV file in ``output_dir``. The pipeline stops with
    a printed message at the first stage that yields no data. Any exception is reported,
    with its traceback, instead of being raised, so the notebook cell completes.

    Parameters
    ----------
    output_dir : str, optional
        Directory for the CSV outputs (default 'E:/', the previously hard-coded location).
    """
    try:
        # Step 1: Retrieve API parameters
        params = get_api_params()

        # Step 2: Fetch Trades data
        trades_url = construct_trades_url(params)
        headers = {'Authorization': f'Basic {API_KEY}', 'Accept': 'application/json'}
        trades_data = fetch_trades_data(trades_url, headers)
        if not trades_data:
            print("Failed to fetch Trades data.")
            return

        df_trades = normalize_trades_data(trades_data)
        trades_file_path = os.path.join(output_dir, 'json_response_trades.csv')
        df_trades.to_csv(trades_file_path, index=False)
        print(f"Trades data saved to '{trades_file_path}'")

        # Distinct IMO numbers; convert before de-duplicating so '123' and 123 collapse.
        distinct_imo_list = (
            df_trades['vessel_imo']
            .dropna()
            .astype(int)
            .unique()
            .tolist()
        )
        if not distinct_imo_list:
            print("No vessel IMO numbers found in the Trades data.")
            return

        # Step 3: Fetch Compliance data
        compliance_data = fetch_compliance_data(
            distinct_imo_list, DEFAULT_ORIGIN_DATE_START, DEFAULT_ORIGIN_DATE_END, headers
        )
        if not compliance_data:
            print("Failed to fetch Compliance data.")
            return

        df_compliance = normalize_compliance_data(compliance_data)
        compliance_file_path = os.path.join(output_dir, 'compliance_data.csv')
        df_compliance.to_csv(compliance_file_path, index=False)
        print(f"Compliance data saved to '{compliance_file_path}'")

        # Step 4: Fetch AIS data
        df_ais = fetch_ais_data(API_KEY_MT, distinct_imo_list, timespan=1440)
        if df_ais is None:
            print("Failed to fetch AIS data.")
            return

        ais_file_path = os.path.join(output_dir, 'ais_data.csv')
        df_ais.to_csv(ais_file_path, index=False)
        print(f"AIS data saved to '{ais_file_path}'")

        # Step 5: Merge all data
        merged_data = combine_filtered(df_trades, df_compliance, df_ais)
        merged_file_path = os.path.join(output_dir, 'merged_trades_compliance_ais_data.csv')
        merged_data.to_csv(merged_file_path, index=False)
        print(f"Merged data saved to '{merged_file_path}'")
    except Exception as e:
        print(f"An error occurred: {e}")
        # The one-line message alone hides where the failure happened.
        traceback.print_exc()

# Execute Main
# Inside a Jupyter/IPython kernel __name__ is "__main__", so running this cell
# starts the pipeline (including its interactive input() prompts) immediately.
if __name__ == "__main__":
    main()