# Brukerhistorie 1
Som bruker ønsker jeg å strømme AIS-data slik at jeg kan få tilgang til sanntids- og historiske skipsposisjoner filtrert etter mine behov.  

Funksjonalitet: 
- Det skal være mulig å hente AIS-data direkte fra fil. Prosessen omhandler strømming som benytter “partial read" som bruker kan filtrere ut data. 

Akseptkriterier: 
- Jeg skal kunne hente AIS-data direkte fra filer med geografisk filtrering og kolonnefiltrering.  
- Jeg skal kunne filtrere data etter mine egne behov. 

Mål: 
- Strømmingen av AIS-data skal være en effektiv og brukervennlig prosess. Brukeren skal kunne filtrere data etter egne behov. Prosessen skal også benytte “partial read”, slik at data oppdateres automatisk og ytelsen optimaliseres. 

# Packages

In [0]:
from sedona.spark import *
import requests
import os
import json
from pyspark.sql import Row
import pyspark.sql.functions as F
from pyspark.sql.functions import expr, col, to_timestamp, hour, date_format, lit, to_timestamp, to_date,count
from pyspark.sql.types import IntegerType, DoubleType, FloatType, LongType, TimestampType, DateType, StringType
from pyspark.sql import DataFrame
from pyspark.sql.utils import AnalysisException
import pandas as pd
import geopandas as gpd
from shapely import wkb
from datetime import datetime, date   
import time
import ipywidgets as widgets
from IPython.display import display, clear_output


# Constant variable

In [0]:
AIS_TABLE ="land_techtroll_dev.bronze.ais"
PLACE_NAMES_CRS = 4258
EPSG = 4326

# Config
Define dev_config for the development environment and prod_config for the production environment.</br></br>
Each configuration contains:
- catalog_name: the name of the catalog.
- landing_zone_prefix: file path to the "landing zone" where new data is typically placed initially.
- location_prefix: file path to where static data is stored in the cloud.
- static_data_prefix: general file path to static data.

In [0]:
%run ./config

{'catalog_name': 'land_techtroll_dev',
 'landing_zone_prefix': '/Volumes/land_techtroll_dev/external_dev/landing_zone',
 'location_prefix': '/Volumes/land_techtroll_dev/external_dev/static_data/cloudFiles',
 'static_data_prefix': '/Volumes/land_techtroll_dev/external_dev/static_data',
 'env': 'dev'}

# Set catelog
In this notebook, the catalog is set to `land_techtroll_dev`, as this is where the relevant databases are located. Setting the catalog ensures that all subsequent queries reference the correct data environment, making it easier to access and manage the necessary tables for our testing and development.


In [0]:
spark.sql(f'USE CATALOG {spark.conf.get("conf.catalog_name")}')

DataFrame[]

# Get tabell from database
The database follows the Medallion Architecture, which organizes data into layers:
- Bronze: Raw or minimally processed data (used in this case).
- Silver: Cleaned and structured data.
- Gold: Aggregated, business-ready data

This notebook using only bronze layer to work with raw data for early-stage testing and development.

Important note! <i> Spark table has geometry data as binary, hence it need to be convert</i>

In [0]:
# Read AIS data from database
AIS_DF = spark.table(AIS_TABLE)
# Convert binary geometry to points
AIS_DF = AIS_DF.withColumn("geometry", expr("ST_AsText(ST_GeomFromWKB(geometry))"))

# GIS query

In [0]:
def create_buffer_radius(lon, lat, radius_m=500):
    data = [(1, lon, lat)]
    df_raw = spark.createDataFrame(data, ["id", "lon", "lat"])
    df_raw.createOrReplaceTempView("geom_points")

    query = f"""
        SELECT 
            id,
            st_asgeojson(
                ST_Buffer(
                    ST_SetSRID(ST_Point(lon, lat), {EPSG}),
                    {radius_m}, True
                )
            ) AS geom
        FROM geom_points
    """
    buffer_df = spark.sql(query)
    return buffer_df


def get_geometry_within(df, lon, lat, radius_m=500, include_all_columns=False):
    # Create the buffer
    buffer_df = create_buffer_radius(lon, lat, radius_m)
    
    # Register input DataFrame
    df.createOrReplaceTempView("Within_view")

    # Select either all columns or a default subset
    if include_all_columns:
        select_stmt = "SELECT *"
    else:
        select_stmt = """
            SELECT
                mmsi,
                ship_name,
                ship_type,
                date_time_utc,
                geometry
        """

    # Run spatial query directly on the buffer
    query = f"""
        {select_stmt}
        FROM Within_view
        WHERE ST_Intersects(
            ST_GeomFromText(geometry),
            ST_Transform(
                ST_SetSRID(
                    ST_Buffer(
                        ST_SetSRID(ST_Point({lon}, {lat}), {EPSG}),
                        {radius_m}, True
                    ),
                    {EPSG}
                ),
                'EPSG:{EPSG}'
            )
        )
    """
    within_df = spark.sql(query)
    return within_df, buffer_df


# Map visualization 

In [0]:
def prepare_for_kepler(df: DataFrame) -> DataFrame:
    """
    Cleans a DataFrame for visualization in SedonaKepler.
    
    - Converts DecimalType to FloatType
    - Converts DateType to String ("yyyy-MM-dd")

    Args:
        df (DataFrame): The input PySpark DataFrame

    Returns:
        DataFrame: Cleaned DataFrame ready for Kepler.gl
    """
    # Convert decimal columns to float
    decimal_cols = [f.name for f in df.schema.fields if "decimal" in str(f.dataType).lower()]
    for col_name in decimal_cols:
        df = df.withColumn(col_name, col(col_name).cast("float"))

    # Convert date columns to string (yyyy-MM-dd)
    if "date" in df.columns:
        df = df.withColumn("date", date_format(col("date"), "yyyy-MM-dd HH:mm"))

    return df

def visualize_map(filter_df: DataFrame, name: str = None):
    """
    Visualizes a filtered PySpark DataFrame on a Kepler-style interactive map.

    Parameters:
    filter_df (DataFrame): The filtered DataFrame containing spatial data (geometry or coordinates).
    name (str, optional): A name or label to assign to the map layer. Defaults to None.

    Returns:
    None: Displays the interactive map in the notebook environment.
    """
    # Preprocess the DataFrame to make it compatible with Kepler visualization
    filtered_df_clean = prepare_for_kepler(filter_df)

    # Create an empty interactive map using SedonaKepler
    map = SedonaKepler.create_map()

    # Add the cleaned DataFrame to the map, with an optional layer name
    SedonaKepler.add_df(map, filtered_df_clean, name=name)

    # Display the map in the notebook
    return map



# Optimization

In [0]:
def difference_between_rows(df: DataFrame, filter_df: DataFrame) -> int:
    """
    Calculates and prints the difference in row counts between the original and filtered DataFrames.

    Parameters:
    df (DataFrame): The original (unfiltered) DataFrame.
    filter_df (DataFrame): The filtered version of the DataFrame.

    Returns:
    int: The number of rows removed during filtering.
    """
    # Count rows in both DataFrames
    original_count = df.count()
    filtered_count = filter_df.count()

    # Print summary
    print(f"The original DataFrame has {original_count} rows.")
    print(f"The filtered DataFrame has {filtered_count} rows.")
    print(f"The difference is {original_count - filtered_count} rows.")

    # Return the difference in row count
    return original_count - filtered_count

In [0]:
def difference_between_size(filter_df: DataFrame) -> float:
    """
    Compares the disk size (in MB) of the original Delta table and a filtered DataFrame.

    This function:
    - Writes the filtered DataFrame to a temporary table
    - Compares table sizes using DESCRIBE DETAIL
    - Cleans up the temporary table afterward
    - Returns the size difference in megabytes

    Parameters:
    original_table (str): The name of the original Delta table (e.g., 'bronze.ais').
    filter_df (DataFrame): The filtered DataFrame to compare.

    Returns:
    float: The difference in size (original - filtered) in megabytes.
    """
    tmp_table = "bronze.ais_tmp"

    try:
        # Save the filtered DataFrame as a temporary table
        filter_df.write.mode("overwrite").saveAsTable(tmp_table)
        
        # Get size of the original table
        original_info = spark.sql(f"DESCRIBE DETAIL land_techtroll_dev.bronze.ais")
        original_size_bytes = original_info.select("sizeInBytes").collect()[0][0]
        original_size_mb = original_size_bytes / (1024 * 1024)
        original_size_kb = original_size_bytes / 1024 

        # Get size of the temporary (filtered) table
        tmp_info = spark.sql(f"DESCRIBE DETAIL land_techtroll_dev.bronze.ais_tmp")
        tmp_size_bytes = tmp_info.select("sizeInBytes").collect()[0][0]
        print(tmp_size_bytes)
        tmp_size_mb = tmp_size_bytes / 1024 

        # Calculate and print the size difference
        size_diff_mb = original_size_kb - tmp_size_mb
        print(f"Original table size: {original_size_mb:.2f} MB")
        print(f"Filtered table size: {tmp_size_mb:.2f} KB")
        print(f"Difference in size: {size_diff_mb:.2f} KB")

        return size_diff_mb

    except AnalysisException as e:
        print(f"[ERROR] Table not found or query failed: {e}")
        return 0.0
    except Exception as e:
        print(f"[ERROR] Unexpected error: {e}")
        return 0.0
    finally:
        # Always drop the temporary table, even if something fails
        try:
            spark.sql(f"DROP TABLE IF EXISTS {tmp_table}")
            print(f"Temporary table '{tmp_table}' dropped.")
        except Exception as drop_err:
            print(f"[WARNING] Could not drop temporary table: {drop_err}")


# Save filtrered df

In [0]:
def save_filtered_df(
    filtered_df: DataFrame,
    filename: str,
    path: str = "/Workspace/Users/andrine.flatby@kartverket.no/techtroll-data-ingestor/src/databricks/nedlastet_filer",
    file_format: str = "csv",
    include_timestamp: bool = False
) -> None:
    """
    Saves a PySpark DataFrame to disk as CSV, Parquet, or JSON, with optional timestamp in filename.

    Parameters:
    filtered_df (DataFrame): The filtered PySpark DataFrame to save.
    filename (str): Base name for the file (without extension).
    path (str): Directory path where the file will be saved.
    file_format (str): File format: 'csv', 'parquet', or 'json'. Default is 'csv'.
    include_timestamp (bool): If True, adds a timestamp to the filename.

    Returns:
    None
    """
    try:
        # Convert to pandas DataFrame
        pandas_df = filtered_df.toPandas()

        # Validate format
        file_format = file_format.lower()
        if file_format not in {"csv", "parquet", "json"}:
            raise ValueError(f"Unsupported file format: {file_format}")

        # Optional timestamp
        if include_timestamp:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"{filename}_{timestamp}"

        # Construct full file path
        full_path = os.path.join(path, f"{filename}.{file_format}")

        # Ensure path exists
        os.makedirs(path, exist_ok=True)

        # Save based on format
        if file_format == "csv":
            pandas_df.to_csv(full_path, index=False)
        elif file_format == "parquet":
            pandas_df.to_parquet(full_path, index=False)
        elif file_format == "json":
            pandas_df.to_json(full_path, orient="records", lines=True)

        print(f"[INFO] File saved to: {full_path}")

    except Exception as e:
        print(f"[ERROR] Failed to save DataFrame: {e}")

# User case examples

## Henting av historiske skipsdata innenfor en gitt radius 

Som havnesjef i Kristiansand kommune ønsker jeg å vite hvilke skip som befant seg innen 6 km fra Kristiansand sentrum på en bestemt dato. 

Akseptkriterier: 
- Bruker skal kunne skrive inn forskjellige datoer. 
- Når testing med forskjellige datoer gir ulik output. 

Input:
- Koordinater (58.103141, 8.034570)  
- Dato (f.eks. 2024-12-28) 
- Radius i meter (f.eks. 6000) 

 

In [0]:
# Clear previous widgets
dbutils.widgets.removeAll()

# Define widgets
dbutils.widgets.text("latitude", "58.103141", "Latitude")
dbutils.widgets.text("longitude", "8.034570", "Longitude")
dbutils.widgets.text("radius", "6000", "Radius (m)")
dbutils.widgets.text("date", "2024-12-28", "Dato (YYYY-MM-DD)")

In [0]:
from datetime import datetime
from pyspark.sql.functions import to_date, col, lit

try:
    # Get and convert input values
    lat = float(dbutils.widgets.get("latitude"))
    lon = float(dbutils.widgets.get("longitude"))
    radius = int(dbutils.widgets.get("radius"))
    date_str = dbutils.widgets.get("date")

    # Validate date
    selected_date = datetime.strptime(date_str, "%Y-%m-%d").date()
    print(f"🔍 Søker etter skip innen {radius} meter fra ({lat}, {lon}) på {selected_date}…")

    # Filter AIS data
    AIS_filtered = AIS_DF.filter(
        to_date(col("date")) == to_date(lit(selected_date))
    )
    AIS_unique = AIS_filtered.dropDuplicates(["mmsi"])

    # Spatial filter
    df, buffer_df = get_geometry_within(AIS_unique, lon, lat, radius)

    if df.count() == 0:
        print("❗ Ingen skip funnet innenfor gitt radius.")
    else:
        limited_df = df.limit(10)
        print(f"✅ Fant {df.count()} skip. Viser 10 første.")
        display(limited_df.toPandas())

        # Show map
        print("🗺️ Viser kart…")
        map_df = SedonaKepler.create_map()
        map_df.add_data(prepare_for_kepler(df).toPandas(), "Skip innen radius")
        map_df.add_data(buffer_df.toPandas(), "Buffer")
        display(map_df)
        difference_between_rows(AIS_DF, df)
        difference_between_size(df)
except Exception as e:
    print(f"❌ Feil under kjøring: {e}")


🔍 Søker etter skip innen 6000 meter fra (58.1467, 7.9956) på 2024-12-28…
✅ Fant 14 skip. Viser 10 første.


Unnamed: 0,mmsi,ship_name,ship_type,date_time_utc,geometry
0,209489000,JOIDES RESOLUTION,0,2024-12-28 05:59:48,POINT (7.98836166 58.140025)
1,257037970,HESTMANDEN,0,2024-12-28 05:55:21,POINT (7.98891166 58.10998666)
2,257042740,SEA PUFFIN 1,47,2024-12-28 05:59:32,POINT (8.03364 58.150175)
3,257076500,LOS113,50,2024-12-28 05:59:58,POINT (7.99569166 58.139405)
4,257130700,MAARTEN,60,2024-12-28 05:10:00,POINT (7.991666 58.141666)
5,257165800,RESCUE BERGESEN D.Y.,54,2024-12-28 05:58:35,POINT (8.03365 58.14987666)
6,257226500,GAMLE OKSOEY,90,2024-12-28 05:57:33,POINT (7.98890166 58.11077166)
7,257376700,MS BRAGDOYA,60,2024-12-28 05:57:20,POINT (7.97109166 58.12357333)
8,257874800,MOLLY,53,2024-12-28 03:30:30,POINT (7.995 58.14)
9,257878000,BOB,52,2024-12-28 05:09:20,POINT (7.99 58.141666)


🗺️ Viser kart…
User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


KeplerGl(data={'Skip innen radius': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13], 'columns': ['mms…

The original DataFrame has 1654172 rows.
The filtered DataFrame has 14 rows.
The difference is 1654158 rows.
2444
Original table size: 45.16 MB
Filtered table size: 2.39 KB
Difference in size: 46241.77 KB
Temporary table 'bronze.ais_tmp' dropped.


## Strømming av skipsdata innenfor en gitt radius 
Som havnesjef i Kristiansand kommune ønsker jeg å vite hvilke passasjerskip (type 60) som befant seg innen 6km fra Kristiansand sentrum i et bestemt tidsrom (simulert sanntid). 

- Bruker skal kunne skrive inn forskjellig avstand for overvåking/visualisering 

- Når testing med forskjellige avstander gir ulik output. 

Input
- Passasjerskip (ship_type = 60) 
- En gitt radius i meter 
- Start time og end time


In [0]:
# Clear existing widgets
dbutils.widgets.removeAll()

# Define input widgets
dbutils.widgets.text("latitude", "58.1467", "Latitude")
dbutils.widgets.text("longitude", "7.9956", "Longitude")
dbutils.widgets.text("radius", "6000", "Radius (m)")

ship_type_rows = AIS_DF.select("ship_type").distinct().collect()
ship_type_list = sorted([str(row["ship_type"]) for row in ship_type_rows])
dbutils.widgets.dropdown("ship_type", ship_type_list[0], ship_type_list)

dbutils.widgets.text("date", "2024-12-28", "Dato (YYYY-MM-DD)")
dbutils.widgets.text("start_time", "12:00", "Starttid (HH:MM)")
dbutils.widgets.text("end_time", "13:00", "Sluttid (HH:MM)")

In [0]:
from datetime import datetime
from pyspark.sql.functions import col, lit, count
from pyspark.sql.types import TimestampType

try:
    # Read inputs
    lat = float(dbutils.widgets.get("latitude"))
    lon = float(dbutils.widgets.get("longitude"))
    radius = int(dbutils.widgets.get("radius"))
    ship_type = dbutils.widgets.get("ship_type")
    date_str = dbutils.widgets.get("date")
    start_time_str = dbutils.widgets.get("start_time")
    end_time_str = dbutils.widgets.get("end_time")

    # Parse datetime
    start_dt = datetime.strptime(f"{date_str} {start_time_str}", "%Y-%m-%d %H:%M")
    end_dt = datetime.strptime(f"{date_str} {end_time_str}", "%Y-%m-%d %H:%M")

    if end_dt <= start_dt:
        raise ValueError("❌ Sluttid må være etter starttid.")

    print(f"🔍 Søker etter AIS-data for {ship_type} den {date_str} mellom {start_time_str} og {end_time_str}...")

    # Filter DataFrame
    AIS_filtered = AIS_DF.filter(
        (col("date") == lit(date_str)) &
        (col("ship_type") == ship_type) &
        (col("date_time_utc") >= lit(start_dt).cast(TimestampType())) &
        (col("date_time_utc") <= lit(end_dt).cast(TimestampType()))
    )

    # Spatial filter
    print("📍 Utfører geografisk søk...")
    df_full, buffer_df = get_geometry_within(AIS_filtered, lon, lat, radius, include_all_columns=True)

    total_count = df_full.count()
    if total_count == 0:
        print("❗ Ingen skip funnet innenfor radius og tidsintervall.")
    else:
        registrations_per_ship = df_full.groupBy("mmsi").agg(count("*").alias("registrations_in_timespan"))
        df_joined = df_full.join(registrations_per_ship, on="mmsi", how="left")

        df_limited = df_joined.limit(10)

        print(f"✅ Funnet {total_count} registreringer. Viser {df_limited.count()} rader.")
        df_limited.display()

        print("🗺️ Viser kart…")
        map_df = SedonaKepler.create_map()
        map_df.add_data(prepare_for_kepler(df_limited).toPandas(), "Skip")  # ✅ use limited DF
        map_df.add_data(buffer_df.toPandas(), "Buffer")
        display(map_df)
        difference_between_rows(AIS_DF, df_joined)
        difference_between_size(df_limited)
except Exception as e:
    print(f"❌ Feil: {e}")


🔍 Søker etter AIS-data for 60 den 2024-12-28 mellom 12:00 og 13:00...
📍 Utfører geografisk søk...
✅ Funnet 895 registreringer. Viser 10 rader.


mmsi,date_time_utc,longitude,latitude,status,course_over_ground,speed_over_ground,rate_of_turn,maneuvre,imo,callsign,ship_name,ship_type,length,draught,data_source,ais_class,hex_7,hex_14,geometry,date,registrations_in_timespan
219348000,2024-12-28T12:59:50Z,7.98709333,58.143015,0,65.0,0.0,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.98709333 58.143015),2024-12-28,492
219348000,2024-12-28T12:59:40Z,7.98709333,58.143015,0,245.0,0.0,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.98709333 58.143015),2024-12-28,492
219348000,2024-12-28T12:59:31Z,7.98709333,58.143015,0,65.0,0.0,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.98709333 58.143015),2024-12-28,492
219348000,2024-12-28T12:59:21Z,7.98709333,58.143015,0,245.0,0.0,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.98709333 58.143015),2024-12-28,492
219348000,2024-12-28T12:59:10Z,7.987095,58.143015,0,110.0,0.1,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.987095 58.143015),2024-12-28,492
219348000,2024-12-28T12:59:01Z,7.98709333,58.143015,0,173.0,0.1,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.98709333 58.143015),2024-12-28,492
219348000,2024-12-28T12:58:50Z,7.987095,58.143015,0,65.0,0.0,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.987095 58.143015),2024-12-28,492
219348000,2024-12-28T12:58:40Z,7.987095,58.143015,0,290.0,0.1,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.987095 58.143015),2024-12-28,492
219348000,2024-12-28T12:58:31Z,7.987095,58.14301666,0,348.0,0.1,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.987095 58.14301666),2024-12-28,492
219348000,2024-12-28T12:58:21Z,7.98709666,58.143015,0,213.0,0.1,0,0,9586617,OYPJ2,BERGENSFJORD,60,170,6.5,G,A,608155172278894591,639680369667918607,POINT (7.98709666 58.143015),2024-12-28,492


🗺️ Viser kart…
User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


KeplerGl(data={'Skip': {'index': [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 'columns': ['mmsi', 'date_time_utc', 'longitu…

The original DataFrame has 1654172 rows.
The filtered DataFrame has 895 rows.
The difference is 1653277 rows.
7031
Original table size: 45.16 MB
Filtered table size: 6.87 KB
Difference in size: 46237.29 KB
Temporary table 'bronze.ais_tmp' dropped.
