In [1]:
%pip install boto3

[0mCollecting boto3
  Downloading boto3-1.28.78-py3-none-any.whl.metadata (6.7 kB)
Collecting botocore<1.32.0,>=1.31.78 (from boto3)
  Downloading botocore-1.31.78-py3-none-any.whl.metadata (6.1 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.8.0,>=0.7.0 (from boto3)
  Downloading s3transfer-0.7.0-py3-none-any.whl.metadata (1.8 kB)
Downloading boto3-1.28.78-py3-none-any.whl (135 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m135.8/135.8 kB[0m [31m63.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.31.78-py3-none-any.whl (11.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.3/11.3 MB[0m [31m233.8 MB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
[?25hDownloading s3transfer-0.7.0-py3-none-any.whl (79 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m79.8/79.8 kB[0m [31m239.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packa

In [2]:
# necessary import functions

import time
import pandas as pd
import numpy as np
import datetime
from dateutil.relativedelta import relativedelta
import base64
from IPython.display import HTML

from pyspark.sql import functions as F
from sedona.register import SedonaRegistrator
SedonaRegistrator.registerAll(spark)

True

In [3]:
import boto3
import os

ACCESS_KEY = '' # Removed
SECRET_KEY = '' # Removed

def upload_file_to_s3(query, file_path, bucket_name):
    df = query.toPandas()
    df.to_csv(file_path)
    
    s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
    file_name = os.path.basename(file_path)
    try:
        s3.upload_file(file_name, bucket_name, file_name)
        print(f"File {file_name} uploaded to {bucket_name}.")
    except Exception as e:
        print(f"Error uploading file {file_name} to {bucket_name}: {e}")

In [4]:
# helper functions
# save a local CSV from the notebook
def create_download_link(query, title="Download CSV file", filename="data.csv"):
    start_time = time.monotonic()
    df = query.toPandas()
    csv = df.to_csv()
    # with open(filename, "w", encoding="utf-8") as fout:
    #     print(csv, file=fout)
    b64 = base64.b64encode(csv.encode())
    payload = b64.decode()
    html = '<a download="{filename}" href="data:text/csv;base64,{payload}" target="_blank">{title}</a>'
    html = html.format(payload=payload, title=title, filename=filename)
    display(f"{time.monotonic() - start_time}s")
    return HTML(html)

# read in UNGP S3 data from a range of dates
def get_date_list(basepath, start_date, end_date):
    start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
    end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
    delta = end_date - start_date
    days = []
    for i in range(delta.days + 1):
        day = start_date + datetime.timedelta(days=i)
        days.append(datetime.datetime.strftime(day, "%Y-%m-%d"))
    
    paths = [basepath + f"year={day[:4]}/month={day[5:7]}/day={day[8:10]}" for day in days]
    return (paths)

In [5]:
# Entry Point of Suez Canal, Egypt	30.5852 N	32.2650 E
# Exit Point of Suez Canal, Egypt	29.9636 N	32.5618 E
# Bosphorus,strait,TUR,41.11833286,29.07183305
# Suez mid 30.443370,32.355877
# geographic locations
# locations = pd.read_csv("https://github.com/dhopp1-UNCTAD/ais_helper_files/raw/main/geographic_locations.csv")
# locations = pd.read_csv("https://raw.githubusercontent.com/UNECE/AIS/master/wpi_12nm_bounding_box_port.csv")
locations = pd.DataFrame({
    "name": [
        "Suez (EG)",
        "Bosphorus (TR)",
    ],
    "longitude": [
        32.355877,
        29.07183305,
    ],
    "latitude": [
        30.443370,
        41.11833286,
    ],
})

In [6]:
locs = locations.copy()

In [7]:
def get_data(start_date, end_date, locations, distance_parameter = "0.05"):
    # distance parameter = 0.01 = 1 kilometer radius
    # distance parameter = 0.05 = 5 kilometer radius
    # distance parameter = 0.3 = 30 kilometer radius?
    
    # suez bbox
    # latitude min 29.9 max 30.6
    # longitude min 32 max 33
    bbox_0_lat_min = 29.9
    bbox_0_lat_max = 30.6
    bbox_0_lon_min = 32
    bbox_0_lon_max = 33
    
    # bosphorus bbox
    # latitude min 41 max 41.2
    # longitude min 28.95 max 29.2
    bbox_1_lat_min = 41
    bbox_1_lat_max = 41.2
    bbox_1_lon_min = 28.95
    bbox_1_lon_max = 29.2

    # all geographies in one query
    condition_string = ""
    select_string = ""
    pos = "pos"
    for name_i in locations.name:
        name_s = name_i.replace('\'', '')
        condition_string += f"""ST_Contains(ST_Buffer(ST_Point({locations.loc[locations.name == name_i, 'longitude'].values[0]}, {locations.loc[locations.name == name_i, 'latitude'].values[0]}), {distance_parameter}), {pos})"""
        if name_i != locations.name.values[-1]:
            condition_string += " OR "
        if name_i == locations.name.values[0]:
            select_string += f"""CASE WHEN ST_Contains(ST_Buffer(ST_Point({locations.loc[locations.name == name_i, 'longitude'].values[0]}, {locations.loc[locations.name == name_i, 'latitude'].values[0]}), {distance_parameter}), {pos}) THEN '{name_s}' """
        elif name_i != locations.name.values[-1]:
            select_string += f"""WHEN ST_Contains(ST_Buffer(ST_Point({locations.loc[locations.name == name_i, 'longitude'].values[0]}, {locations.loc[locations.name == name_i, 'latitude'].values[0]}), {distance_parameter}), {pos}) THEN '{name_s}' """
        else:
            select_string += f"""WHEN ST_Contains(ST_Buffer(ST_Point({locations.loc[locations.name == name_i, 'longitude'].values[0]}, {locations.loc[locations.name == name_i, 'latitude'].values[0]}), {distance_parameter}), {pos}) THEN '{name_s}' """
            select_string += "END AS geo_name"
    
    # step 1
    # read data
    basepath = "s3a://ungp-ais-data-historical-backup/exact-earth-data/transformed/prod/"
    dates = get_date_list(basepath, start_date, end_date)
    df = spark.read.parquet(*dates)

    # create temp view to be able to use spark SQL
    df.createOrReplaceTempView("df")
    
    # print(spark.sql("SELECT * FROM df LIMIT 1").toPandas().values.tolist())
    # print(spark.sql("SELECT * FROM df LIMIT 1").toPandas().columns.tolist())

    # adding points and filtering for cargo and tankers
    step_01 = spark.sql(f"""
                    SELECT DISTINCT vessel_type, mmsi, date_year, date_month, {select_string} FROM
                    (
                    SELECT vessel_type, mmsi, date_year, date_month, ST_Point(lon, lat) as pos FROM
                    (
                        SELECT DISTINCT
                            YEAR(dt_pos_utc) as date_year, 
                            MONTH(dt_pos_utc) AS date_month,
                            mmsi, 
                            vessel_type,
                            cast(longitude as Decimal(6,3)) as lon,
                            cast(latitude as Decimal(6,3)) as lat
                        FROM df
                        WHERE vessel_type IN ('Cargo','Tanker')
                    ) AS subquery
                    WHERE
                       (lon >= {bbox_0_lon_min} AND lon <= {bbox_0_lon_max} AND lat >= {bbox_0_lat_min} AND lat <= {bbox_0_lat_max})
                    OR (lon >= {bbox_1_lon_min} AND lon <= {bbox_1_lon_max} AND lat >= {bbox_1_lat_min} AND lat <= {bbox_1_lat_max})
                    ) AS subsubquery
                    WHERE {condition_string}
                    """)
    return (step_01)

In [50]:
# queries for months
# start_month = datetime.datetime.strptime("2018-12-01", "%Y-%m-%d")
# end_month = datetime.datetime.strptime("2021-07-01", "%Y-%m-%d")

#start_month = datetime.datetime.strptime("2021-08-01", "%Y-%m-%d")
start_month = datetime.datetime.strptime("2021-08-01", "%Y-%m-%d")
end_month = datetime.datetime.strptime("2021-08-01", "%Y-%m-%d")

# start_month = datetime.datetime.strptime("2022-11-01", "%Y-%m-%d")
# end_month = datetime.datetime.strptime("2023-10-01", "%Y-%m-%d")

start_dates = []
end_dates = []

while start_month <= end_month:
    start_dates.append(datetime.datetime.strftime(start_month, "%Y-%m-%d"))
    end_date = min(start_month + relativedelta(months=1) - relativedelta(days = 1), datetime.datetime.today() - relativedelta(days=2)) # minimum between 2 days ago so don't go ahead of where there are actually files
    end_dates.append(datetime.datetime.strftime(end_date, "%Y-%m-%d"))
    start_month = start_month + relativedelta(months=1)

date_dict = {f"{x}": None for x in start_dates}

for i in range(len(start_dates)):
    date_dict[f"{start_dates[i]}"] = get_data(start_dates[i], end_dates[i], locs)

In [9]:
prefix = "strait_ships"

In [48]:
quick = []
quick += ['2021-07-01','2021-08-01','2021-09-01'] # These dates give an error

In [11]:
#for date_str in quick:
#    display((prefix, date_str))
#    #upload_file_to_s3(date_dict[date_str], f"{prefix}_{date_str}.csv", 'un-mercatorians')
#    display(create_download_link(date_dict[date_str], filename=f"{prefix}_{date_str}.csv"))

In [51]:
for date_str, query in date_dict.items():
    if date_str in quick:
        continue
    display((prefix, date_str))
    upload_file_to_s3(query, f"{prefix}_{date_str}.csv", 'un-mercatorians')
    #display(create_download_link(query, filename=f"{prefix}_{date_str}.csv"))

('strait_ships', '2021-08-01')

Py4JJavaError: An error occurred while calling o1169.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 26 in stage 476.0 failed 4 times, most recent failure: Lost task 26.3 in stage 476.0 (TID 36895, 192.168.37.204, executor 6): scala.MatchError: null
	at org.apache.spark.sql.sedona_sql.expressions.ST_Point.eval(Constructors.scala:230)
	at org.apache.spark.sql.sedona_sql.expressions.ST_Contains.eval(Predicates.scala:47)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:233)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:232)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: scala.MatchError: null
	at org.apache.spark.sql.sedona_sql.expressions.ST_Point.eval(Constructors.scala:230)
	at org.apache.spark.sql.sedona_sql.expressions.ST_Contains.eval(Predicates.scala:47)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate.eval(Unknown Source)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3(basicPhysicalOperators.scala:233)
	at org.apache.spark.sql.execution.FilterExec.$anonfun$doExecute$3$adapted(basicPhysicalOperators.scala:232)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:513)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:127)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:462)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Unknown Source)


In [None]:
#lcsv = locs[["name", "longitude", "latitude"]].to_csv()
#lb64 = base64.b64encode(lcsv.encode())
#lpayload = lb64.decode()
#lhtml = f'<a download="locs.csv" href="data:text/csv;base64,{lpayload}" target="_blank">locs_straights.csv</a>'
#display(HTML(lhtml))