In [1]:
%%capture
# Install plotly into the running kernel's environment; the %%capture cell
# magic above suppresses pip's console output.
# NOTE(review): the version is unpinned — consider pinning for reproducibility.
%pip install plotly

In [12]:
import plotly.express as px
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

In [3]:
# Input file and the timestamp layout used by its '# Timestamp' column.
AIS_CSV_PATH = 'data/aisdk-2023-05-08.csv'
TIMESTAMP_FORMAT = 'dd/MM/yyyy HH:mm:ss'

# Bounding box in degrees; both ends are exclusive.
LAT_MIN, LAT_MAX = 45, 65
LON_MIN, LON_MAX = -2, 22

# Create SparkSession
spark = SparkSession.builder.getOrCreate()

# Read CSV into DataFrame. No schema is supplied or inferred, so every column
# is read as a string.
df = spark.read.csv(AIS_CSV_PATH, header=True)

# Cast the numeric columns explicitly. Comparing a string column with an integer
# literal relies on Spark's implicit coercion, which can truncate fractional
# values (e.g. a speed of '0.4' compared as integer 0) or fail under ANSI mode.
sog_col = F.col('SOG').cast('double')
lat_col = F.col('Latitude').cast('double')
lon_col = F.col('Longitude').cast('double')

# Keep only rows reporting zero speed over ground inside the bounding box.
is_stationary = sog_col == 0
in_bounding_box = (
    (lat_col > LAT_MIN) & (lat_col < LAT_MAX) &
    (lon_col > LON_MIN) & (lon_col < LON_MAX)
)

# Project to the four columns used downstream:
#   timestamp - parsed from '# Timestamp'
#   MMSI      - passed through unchanged
#   lat/long  - coordinates rounded to 2 decimals, so nearby positions share a cell
filtered_df = (
    df
    .filter(is_stationary & in_bounding_box)
    .select(
        F.to_timestamp(F.col('# Timestamp'), TIMESTAMP_FORMAT).alias('timestamp'),
        'MMSI',
        F.round(lat_col, 2).alias('lat'),
        F.round(lon_col, 2).alias('long'),
    )
    .orderBy('timestamp')
)

In [4]:
# Length of each time bucket, and the minimum number of distinct ships that must
# share a (lat, long) cell within one bucket for that bucket to be counted.
time_window_minutes = 10
required_ship_count = 5

# Assign a window id: the index of the fixed-length time bucket that the row's
# timestamp falls into (epoch seconds divided by the bucket length, floored).
window_seconds = time_window_minutes * 60
filtered_df = filtered_df.withColumn(
    'window_id',
    F.floor(F.unix_timestamp(F.col('timestamp')) / window_seconds),
)

# Count each ship (MMSI) at most once per (lat, long, window_id), then count the
# ships per cell and window. Only cell/window pairs with at least
# required_ship_count ships are kept; window_id is dropped afterwards because
# the final result is aggregated over all windows.
window_df = (
    filtered_df
    .dropDuplicates(subset=['lat', 'long', 'MMSI', 'window_id'])
    .groupBy(['lat', 'long', 'window_id'])
    .count()
    .withColumnRenamed('count', 'total_count')
    .filter(F.col('total_count') >= required_ship_count)
    .drop('window_id')
)

# Combine counts: sum the per-window ship counts for every (lat, long) cell.
result_df = window_df.groupBy('lat', 'long').agg(F.sum('total_count').alias('count'))

In [19]:
# Materialise the aggregated Spark result as a pandas DataFrame for plotting.
final_df = result_df.toPandas()

# One marker per (lat, long) cell, sized by its summed ship count.
fig = px.scatter_mapbox(
    final_df,
    lat="lat",
    lon="long",
    size='count',
    color_discrete_sequence=["fuchsia"],
    zoom=5,
    height=500,
)

# Use the OpenStreetMap base layer and remove the figure margins.
fig.update_layout(
    mapbox_style="open-street-map",
    margin={"r": 0, "t": 0, "l": 0, "b": 0},
)
fig.show(renderer='iframe')