In [2]:
import os
import pandas as pd
from tqdm import tqdm
from datetime import timedelta
import requests
from zipfile import ZipFile

def download_and_extract(zip_urls, download_folder, extract_folder, timeout=60):
    """
    Download each ZIP archive in ``zip_urls`` and extract it into ``extract_folder``.

    Parameters:
        zip_urls (list[str]): URLs of the ZIP archives, processed in order.
        download_folder (str): Where the downloaded archives are stored (created if missing).
        extract_folder (str): Where archive contents are extracted (created if missing).
        timeout (float): Seconds passed to ``requests.get`` as its connect/read timeout.
            Without a timeout a stalled server can block the call indefinitely.

    Raises:
        requests.HTTPError: for a non-success HTTP status (via ``raise_for_status``).
        requests.Timeout: when the server does not respond within ``timeout``.
    """
    from urllib.parse import urlparse

    os.makedirs(download_folder, exist_ok=True)
    os.makedirs(extract_folder, exist_ok=True)

    total = len(zip_urls)
    for idx, url in enumerate(zip_urls, start=1):
        print(f"({idx}/{total}) Downloading: {url}")
        # Name the local file after the URL *path* so query strings/fragments
        # do not end up in the filename.
        zip_filename = os.path.join(download_folder, os.path.basename(urlparse(url).path))

        # Stream to disk in chunks so large archives are not held in memory.
        with requests.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()
            with open(zip_filename, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

        print(f"Downloaded: {zip_filename}")

        with ZipFile(zip_filename, 'r') as zip_ref:
            zip_ref.extractall(extract_folder)
            print(f"Extracted: {zip_filename} to {extract_folder}")


def process_large_csv(csv_files, temp_folder, chunk_size=100000):
    """
    Read large CSV files in chunks and split their rows into one CSV per MMSI.

    Rows whose 'Navigational status' equals 'Unknown value' are dropped. Rows for
    the same MMSI from all input files are combined and written to
    ``<temp_folder>/<MMSI>.csv`` (without the index).

    Parameters:
        csv_files (list[str]): Paths of the input CSV files.
        temp_folder (str): Output folder for per-MMSI files (created if missing).
        chunk_size (int): Rows per chunk passed to ``pd.read_csv``.
    """
    os.makedirs(temp_folder, exist_ok=True)
    # Collect per-MMSI pieces and concatenate once at the end; concatenating
    # inside the loop re-copies the growing frame for every chunk (quadratic).
    mmsi_parts = {}

    for csv_file in csv_files:
        print(f"Processing {csv_file}...")
        # Count data rows (minus the header line) for the progress-bar total,
        # closing the file handle afterwards.
        with open(csv_file) as fh:
            total_rows = sum(1 for _ in fh) - 1
        with tqdm(total=total_rows, desc="Processing chunks", unit="rows") as pbar:
            for chunk in pd.read_csv(csv_file, chunksize=chunk_size):
                rows_read = len(chunk)
                # Filter out rows with 'Unknown value' in Navigational status
                chunk = chunk[chunk['Navigational status'] != 'Unknown value']
                for mmsi, group in chunk.groupby('MMSI'):
                    mmsi_parts.setdefault(mmsi, []).append(group)
                # Advance by rows *read* (not kept) so the bar reaches its total.
                pbar.update(rows_read)

    # Save individual MMSI files
    for mmsi, parts in tqdm(mmsi_parts.items(), desc="Saving MMSI files"):
        output_file = os.path.join(temp_folder, f"{mmsi}.csv")
        pd.concat(parts).to_csv(output_file, index=False)

def filter_by_area(input_folder, output_folder, area_bounds, min_rows=10):
    """
    Keep only rows inside a latitude/longitude box and save files that retain enough rows.

    Parameters:
        input_folder (str): Folder with per-vessel ``.csv`` files (non-CSV entries are ignored).
        output_folder (str): Destination folder (created if missing); files keep their names.
        area_bounds (tuple): ``(min_lat, max_lat, min_lon, max_lon)``; bounds are inclusive.
        min_rows (int): Minimum number of in-area rows a file needs to be written.
            This is a plain row count; it does NOT check that the rows are consecutive.
    """
    os.makedirs(output_folder, exist_ok=True)
    min_lat, max_lat, min_lon, max_lon = area_bounds

    for file in tqdm(os.listdir(input_folder), desc="Filtering by area"):
        if not file.endswith('.csv'):
            continue
        df = pd.read_csv(os.path.join(input_folder, file))
        # Series.between is inclusive on both ends, matching >= / <=.
        in_area = (df['Latitude'].between(min_lat, max_lat)
                   & df['Longitude'].between(min_lon, max_lon))
        df = df[in_area]
        if len(df) >= min_rows:
            df.to_csv(os.path.join(output_folder, file), index=False)

def extract_30min_sets(input_folder, output_folder, interval_minutes=30):
    """
    Cut each vessel file into fixed time windows and save windows whose navigational status never changes.

    Windows are ``interval_minutes`` long (default 30) and are anchored at the file's
    earliest valid timestamp, not at clock boundaries. Each qualifying window is
    written as ``<original stem>_<first YYYYmmdd_HHMM>-<last YYYYmmdd_HHMM>.csv``,
    where first/last are the earliest and latest actual samples in the window.
    Saved rows include the helper column ``Time_Group`` (window number).

    Column names are normalised by stripping whitespace and leading '#'/space
    characters (e.g. '# Timestamp' -> 'Timestamp'). Timestamps are parsed with
    format '%d/%m/%Y %H:%M:%S'; unparseable values are dropped. Any error while
    processing a file is printed and that file is skipped.

    Parameters:
        input_folder (str): Folder with per-vessel ``.csv`` files.
        output_folder (str): Destination folder (created if missing).
        interval_minutes (int | float): Window length in minutes.
    """
    os.makedirs(output_folder, exist_ok=True)
    window_seconds = interval_minutes * 60

    for file in tqdm(os.listdir(input_folder), desc=f"Extracting {interval_minutes}-min sets"):
        if not file.endswith('.csv'):
            continue

        file_path = os.path.join(input_folder, file)
        try:
            df = pd.read_csv(file_path)

            # Strip leading and trailing spaces and remove the '#' from column names
            df.columns = df.columns.str.strip().str.lstrip('# ')
            print(f"Cleaned column names: {list(df.columns)}")

            if 'Timestamp' not in df.columns:
                print(f"Error: 'Timestamp' column not found in {file}")
                continue

            try:
                df['Timestamp'] = pd.to_datetime(df['Timestamp'], format='%d/%m/%Y %H:%M:%S', errors='coerce')
            except Exception as e:
                print(f"Error parsing 'Timestamp' in {file}: {e}")
                continue

            # errors='coerce' turned unparseable values into NaT; drop them.
            df = df.dropna(subset=['Timestamp'])
            df = df.sort_values('Timestamp')

            # Window number = whole windows elapsed since the first sample.
            df['Time_Group'] = (df['Timestamp'] - df['Timestamp'].min()).dt.total_seconds() // window_seconds

            valid_sets = 0
            for group_id, group in df.groupby('Time_Group'):
                # Only windows with exactly one distinct status qualify
                # (all-NaN status gives nunique() == 0 and is skipped).
                if group['Navigational status'].nunique() != 1:
                    continue
                valid_sets += 1

                start_time = group['Timestamp'].min().strftime('%Y%m%d_%H%M')
                end_time = group['Timestamp'].max().strftime('%Y%m%d_%H%M')

                output_filename = f"{os.path.splitext(file)[0]}_{start_time}-{end_time}.csv"
                output_path = os.path.join(output_folder, output_filename)

                group.to_csv(output_path, index=False)
                print(f"Saved {interval_minutes}-min set: {output_filename}")

            if valid_sets == 0:
                print(f"No valid {interval_minutes}-min sets found in {file}.")

        except Exception as e:
            print(f"Error processing file {file}: {e}")

def analyze_navigational_status(input_folder):
    """
    Count window files per navigational status.

    Each ``.csv`` in ``input_folder`` counts as one window and is tallied under the
    'Navigational status' value of its first row. This presumes one status per
    file, as produced by ``extract_30min_sets``. Zero-byte and header-only files
    are skipped instead of raising.

    Returns:
        dict: status value -> number of files.
    """
    status_counts = {}

    for file in tqdm(os.listdir(input_folder), desc="Analyzing Navigational Status"):
        if not file.endswith('.csv'):
            continue
        try:
            df = pd.read_csv(os.path.join(input_folder, file))
        except pd.errors.EmptyDataError:
            # Zero-byte file: nothing to count.
            continue
        if df.empty:
            # Header-only file: .iloc[0] would raise IndexError.
            continue
        status = df['Navigational status'].iloc[0]  # Assuming consistent status in each set
        status_counts[status] = status_counts.get(status, 0) + 1

    return status_counts

def filter_and_save_csv(input_folder: str, output_folder: str, columns_to_keep: list, row_filter: str = None):
    """
    Slim every ``.csv`` in ``input_folder`` down to ``columns_to_keep`` and save it to ``output_folder``.

    For each file, in this order:
      1. rows sharing a "Timestamp" are collapsed to their first occurrence;
      2. only ``columns_to_keep`` are retained (a missing column raises KeyError);
      3. if ``row_filter`` is truthy it is applied with ``DataFrame.query``.
    The result is written as ``filtered_<original name>`` without the index.

    Parameters:
    - input_folder (str): Folder containing the input .csv files (other entries are ignored).
    - output_folder (str): Destination folder; created if it does not exist.
    - columns_to_keep (list): Column names to retain, in output order.
    - row_filter (str, optional): Query expression such as 'SOG > 0'.
                                  None or '' disables row filtering.
    """
    os.makedirs(output_folder, exist_ok=True)

    csv_names = (entry for entry in os.listdir(input_folder) if entry.endswith('.csv'))
    for name in csv_names:
        source_path = os.path.join(input_folder, name)

        slim = (
            pd.read_csv(source_path)
            .drop_duplicates(subset=["Timestamp"], keep="first")
            [columns_to_keep]
        )
        if row_filter:
            slim = slim.query(row_filter)

        target_path = os.path.join(output_folder, f'filtered_{name}')
        slim.to_csv(target_path, index=False)

        print(f"Processed {name} and saved to {target_path}")

def check_missing_values_in_csv(directory_path):
    """
    Print per-column missing-value counts for every CSV in ``directory_path``, plus a handling suggestion for each column with gaps.

    Numeric columns (any numeric dtype, per ``pd.api.types.is_numeric_dtype``) get an
    interpolate/fill suggestion; all other columns get a mode-fill/drop suggestion.
    Files are only read, never modified.
    """
    for filename in os.listdir(directory_path):
        if not filename.endswith(".csv"):
            continue
        df = pd.read_csv(os.path.join(directory_path, filename))

        missing_data = df.isnull().sum()

        print(f"Missing values in {filename}:")
        print(missing_data)

        # Only columns that actually have gaps get a suggestion (original column order).
        for column in missing_data[missing_data > 0].index:
            print(f"\nColumn: {column}")
            # is_numeric_dtype also covers float32/int32/nullable dtypes, unlike
            # a comparison against ['float64', 'int64'].
            if pd.api.types.is_numeric_dtype(df[column]):
                print("Suggested action: Interpolate missing values or fill with a default value (e.g., 0).")
            else:
                print("Suggested action: Fill missing values with the most frequent value or drop rows.")

        print("\n")

def check_and_process_csv_files(directory_path):
    """
    Clean every CSV in ``directory_path`` in place: de-duplicate, drop sparse files, fill numeric gaps.

    For each ``*.csv`` file:
      1. parse "Timestamp" and keep only the first row per timestamp;
      2. delete the file if any column is more than 50% missing;
      3. otherwise, for numeric columns that have gaps, fill each interior gap by
         linear interpolation against "Timestamp" (leading/trailing NaNs with no
         valid value on one side are left as-is), then overwrite the file.
    Non-numeric columns that have gaps are reported, not filled. A summary is
    printed at the end.

    Side effects: deletes and overwrites files in ``directory_path``.
    """
    deleted_files_count = 0
    processed_files_count = 0
    files_with_high_missing_data = 0
    files_with_non_numeric_interpolation = []
    affected_files_due_to_duplicates = 0
    deleted_rows_due_to_duplicates = 0

    for filename in os.listdir(directory_path):
        if not filename.endswith(".csv"):
            continue
        file_path = os.path.join(directory_path, filename)
        df = pd.read_csv(file_path)

        df['Timestamp'] = pd.to_datetime(df['Timestamp'])

        # Step 1: drop duplicate timestamps, keeping the first occurrence.
        before_drop = len(df)
        df = df.drop_duplicates(subset='Timestamp', keep='first')
        removed_rows = before_drop - len(df)
        if removed_rows:
            deleted_rows_due_to_duplicates += removed_rows
            affected_files_due_to_duplicates += 1

        missing_data = df.isnull().sum()
        # Avoid 0/0 for an empty frame: nothing is missing there.
        missing_fraction = missing_data / len(df) if len(df) else missing_data * 0.0

        # Step 2: delete the file if any column is more than 50% missing.
        if (missing_fraction > 0.5).any():
            # Count here: the file is skipped afterwards, so any later check never sees it.
            files_with_high_missing_data += 1
            os.remove(file_path)
            deleted_files_count += 1
            continue

        # Step 3: fill gaps only in columns that actually have some (all are <= 50% now).
        non_numeric_columns = []
        for column in missing_fraction[missing_fraction > 0].index:
            if pd.api.types.is_numeric_dtype(df[column]):
                df[column] = _interpolate_on_timestamp(df, column)
            else:
                non_numeric_columns.append(column)

        if non_numeric_columns:
            files_with_non_numeric_interpolation.append((filename, non_numeric_columns))

        # Step 4: write the cleaned data back over the original file.
        df.to_csv(file_path, index=False)
        processed_files_count += 1

    print(f"\nProcess completed: {deleted_files_count} files deleted, {processed_files_count} files processed.")
    print(f"{files_with_high_missing_data} files had columns with more than 50% missing data.")

    if affected_files_due_to_duplicates > 0:
        print(f"\n{affected_files_due_to_duplicates} files had duplicate timestamps. {deleted_rows_due_to_duplicates} rows were deleted.")

    if files_with_non_numeric_interpolation:
        print("\nInterpolation skipped for the following columns due to non-numeric data:")
        for filename, columns in files_with_non_numeric_interpolation:
            print(f"In file '{filename}', skipped interpolation for columns: {', '.join(columns)}")


def _interpolate_on_timestamp(df, column):
    """Return ``df[column]`` with interior NaNs linearly interpolated against ``df['Timestamp']``."""
    # method='time' weights by the datetime index; limit_area='inside' leaves
    # NaNs that lack a valid neighbour on either side untouched.
    by_time = pd.Series(df[column].astype(float).to_numpy(),
                        index=pd.DatetimeIndex(df['Timestamp']))
    filled = by_time.interpolate(method='time', limit_area='inside')
    return pd.Series(filled.to_numpy(), index=df.index, name=column)

def interpolate_with_time(df, column, index):
    """
    Estimate ``df.at[index, column]`` by time-weighted linear interpolation between its nearest valid neighbours.

    ``index`` is a row *label* (e.g. ``row.name`` inside ``df.apply(..., axis=1)``).
    It is converted to a position, so frames whose index has gaps (after
    ``drop_duplicates`` or filtering) are handled correctly. The index must be unique.

    Returns:
        - the existing value if it is not missing;
        - otherwise ``prev + (next - prev) * (t - t_prev) / (t_next - t_prev)``, using the
          closest earlier and later rows (in frame order) that have a valid value;
          ``prev`` if both neighbours share a timestamp;
        - the original missing value when there is no valid row on one side.

    Requires a datetime ``Timestamp`` column. Rows are taken in frame order,
    presumably sorted by time — TODO confirm for callers.
    """
    current = df.at[index, column]
    if pd.notna(current):
        # Return as-is instead of recomputing it (avoids floating-point drift).
        return current

    position = df.index.get_loc(index)
    # Positions of rows holding a usable value in this column.
    valid_positions = df[column].notna().to_numpy().nonzero()[0]
    earlier = valid_positions[valid_positions < position]
    later = valid_positions[valid_positions > position]
    if len(earlier) == 0 or len(later) == 0:
        # No bracketing values: leave the value missing.
        return current

    prev_pos, next_pos = earlier[-1], later[0]
    timestamps = df['Timestamp']
    prev_timestamp = timestamps.iloc[prev_pos]
    next_timestamp = timestamps.iloc[next_pos]
    prev_value = df[column].iloc[prev_pos]
    next_value = df[column].iloc[next_pos]

    time_diff_total = (next_timestamp - prev_timestamp).total_seconds()
    if time_diff_total == 0:
        # Neighbours share a timestamp: avoid division by zero.
        return prev_value
    time_diff_before = (df.at[index, 'Timestamp'] - prev_timestamp).total_seconds()

    return prev_value + (next_value - prev_value) * (time_diff_before / time_diff_total)


def interpolate_vessel_data(input_folder, output_folder, time_interval='10s'):
    """
    Resample every CSV in ``input_folder`` onto a regular time grid and save it to ``output_folder``.

    Numeric columns are averaged within each bin. Bins without samples become NaN;
    no values are interpolated into them. Non-numeric columns take the value at
    each bin label via forward fill, and any leading gap is back-filled. The
    output keeps the same file name, with 'Timestamp' as the first column.

    Parameters:
        input_folder (str): Folder containing CSV files with a 'Timestamp' column.
        output_folder (str): Folder to save processed files (created if missing).
        time_interval (str): pandas offset alias for the bin size, e.g. '10s' or '1min'.
            The uppercase aliases '10S' / '1T' are deprecated since pandas 2.2.

    Files that raise during processing are reported and skipped.
    """
    os.makedirs(output_folder, exist_ok=True)

    for file_name in os.listdir(input_folder):
        if not file_name.endswith(".csv"):
            continue
        file_path = os.path.join(input_folder, file_name)
        try:
            df = pd.read_csv(file_path)

            df['Timestamp'] = pd.to_datetime(df['Timestamp'])
            df = df.set_index('Timestamp')

            numeric_cols = df.select_dtypes(include='number').columns
            non_numeric_cols = df.select_dtypes(exclude='number').columns

            numeric_resampled = df[numeric_cols].resample(time_interval).mean()
            # .bfill() replaces the deprecated fillna(method='bfill').
            non_numeric_resampled = df[non_numeric_cols].resample(time_interval).ffill().bfill()

            resampled_df = pd.concat([numeric_resampled, non_numeric_resampled], axis=1).reset_index()
            output_file_path = os.path.join(output_folder, file_name)
            resampled_df.to_csv(output_file_path, index=False)

            print(f"Processed: {file_name}")

        except Exception as e:
            print(f"Error processing {file_name}: {e}")

def integrate_pipeline_with_web_sources(zip_urls, temp_folder, area_bounds, steps=(6,)):
    """
    Run the selected stages of the AIS processing pipeline.

    ``steps`` picks which stages run, replacing the former commented-out code. The
    default ``(6,)`` reproduces the previous behaviour, where only the slimming step
    was active. Stages run in ascending order:

      1. download ``zip_urls`` into ./downloaded_zips and extract into ./extracted_files
      2. split the extracted CSVs into per-MMSI files in ``temp_folder``
      3. keep rows inside ``area_bounds`` -> ./filtered_area
      4. cut 30-minute windows with a constant status -> ./30min_sets
      5. print the navigational-status distribution of ./30min_sets
      6. slim ./time_sets to selected columns -> ./time_sets_slim
      7. print a missing-value overview of ./time_sets_slim
      8. de-duplicate / impute / delete files in ./time_sets_slim (in place)
      9. resample ./time_sets to a 10 s grid -> ./time_sets_interpolated

    NOTE(review): step 4 writes ./30min_sets but steps 6 and 9 read ./time_sets —
    TODO confirm which folder is intended.
    """
    steps = set(steps)
    download_folder = './downloaded_zips'
    extract_folder = './extracted_files'
    filtered_folder = './filtered_area'
    sets_folder = './30min_sets'
    time_sets_folder = './time_sets'
    slim_folder = './time_sets_slim'

    if 1 in steps:
        download_and_extract(zip_urls, download_folder, extract_folder)

    if 2 in steps:
        extracted_csvs = [os.path.join(extract_folder, f)
                          for f in os.listdir(extract_folder) if f.endswith('.csv')]
        process_large_csv(extracted_csvs, temp_folder)

    if 3 in steps:
        filter_by_area(temp_folder, filtered_folder, area_bounds)

    if 4 in steps:
        extract_30min_sets(filtered_folder, sets_folder)

    if 5 in steps:
        status_distribution = analyze_navigational_status(sets_folder)
        print("Distribution of Navigational Status:", status_distribution)

    if 6 in steps:
        columns_to_keep = [
            'Timestamp', 'MMSI', 'Latitude', 'Longitude',
            'Navigational status', 'SOG', 'COG', 'Heading'
        ]
        # Optionally pass a row filter such as 'SOG > 0'; omitted = keep all rows.
        filter_and_save_csv(time_sets_folder, slim_folder, columns_to_keep)

    if 7 in steps:
        check_missing_values_in_csv(slim_folder)

    if 8 in steps:
        check_and_process_csv_files(slim_folder)

    if 9 in steps:
        interpolate_vessel_data(time_sets_folder, './time_sets_interpolated', '10s')

# Daily AIS archive ZIPs from web.ais.dk to feed into the pipeline.
zip_urls = [
    "https://web.ais.dk/aisdata/aisdk-2024-11-30.zip",
    "https://web.ais.dk/aisdata/aisdk-2024-11-29.zip",
    "https://web.ais.dk/aisdata/aisdk-2024-11-28.zip",
]

# Scratch folder for per-MMSI CSVs.
temp_folder = './mmsi_temp'
# Study area as (min_lat, max_lat, min_lon, max_lon), the order filter_by_area unpacks.
area_bounds = (55.0, 56.0, 7.0, 8.0)

integrate_pipeline_with_web_sources(
    zip_urls=zip_urls,
    temp_folder=temp_folder,
    area_bounds=area_bounds,
)


Processed 219596000_20241130_1957-20241130_2011.csv and saved to ./time_sets_slim/filtered_219596000_20241130_1957-20241130_2011.csv
Processed 563192100_20241126_2154-20241126_2209.csv and saved to ./time_sets_slim/filtered_563192100_20241126_2154-20241126_2209.csv
Processed 212376000_20241127_0222-20241127_0237.csv and saved to ./time_sets_slim/filtered_212376000_20241127_0222-20241127_0237.csv
Processed 255805899_20241127_0720-20241127_0735.csv and saved to ./time_sets_slim/filtered_255805899_20241127_0720-20241127_0735.csv
Processed 219026490_20241127_1819-20241127_1834.csv and saved to ./time_sets_slim/filtered_219026490_20241127_1819-20241127_1834.csv
Processed 219613000_20241129_1715-20241129_1729.csv and saved to ./time_sets_slim/filtered_219613000_20241129_1715-20241129_1729.csv
Processed 235008380_20241130_1547-20241130_1602.csv and saved to ./time_sets_slim/filtered_235008380_20241130_1547-20241130_1602.csv
Processed 246179000_20241130_2334-20241130_2334.csv and saved to ./ti

In [17]:
import os
import pandas as pd

def check_for_missing_in_specific_columns(directory_path):
    """
    Report CSV files in ``directory_path`` that have missing values in the timestamp or navigational-status column.

    The status column is matched case-insensitively (ignoring surrounding spaces).
    Both 'Navigational status', the spelling the pipeline writes, and
    'Navigational Status' are therefore checked. A file lacking a column counts as
    0 missing for that column. Results are printed; nothing is returned.
    """
    timestamp_and_status_missing_files = []

    for filename in os.listdir(directory_path):
        if not filename.endswith(".csv"):
            continue
        df = pd.read_csv(os.path.join(directory_path, filename))

        # Exact-case lookup of 'Navigational Status' never matched the pipeline's
        # 'Navigational status' column, so the check was silently skipped.
        status_col = next(
            (c for c in df.columns if str(c).strip().lower() == 'navigational status'),
            None,
        )
        missing_timestamp = int(df['Timestamp'].isnull().sum()) if 'Timestamp' in df.columns else 0
        missing_status = int(df[status_col].isnull().sum()) if status_col is not None else 0

        if missing_timestamp > 0 or missing_status > 0:
            timestamp_and_status_missing_files.append({
                'filename': filename,
                'missing_timestamp': missing_timestamp,
                'missing_status': missing_status
            })

    if timestamp_and_status_missing_files:
        print("\nFiles with missing values in 'Timestamp' or 'Navigational Status' columns:")
        for file_info in timestamp_and_status_missing_files:
            print(f"File: {file_info['filename']}, Missing 'Timestamp': {file_info['missing_timestamp']}, Missing 'Navigational Status': {file_info['missing_status']}")
    else:
        print("\nNo missing values found in 'Timestamp' or 'Navigational Status' columns.")
# Slimmed per-vessel sets written by step 6 of the pipeline.
directory_path = "./time_sets_slim"
check_for_missing_in_specific_columns(directory_path=directory_path)



No missing values found in 'Timestamp' or 'Navigational Status' columns.
