# Notes

Aggregate points into routes (stop/move, buffer) per Unique ship identifier (imo x mmsi)

stop is SOG<1
buffer are 22KM polygons, with overlaps combined into one polygon

if multiple port, get nearest port

Final output is save at `ki/global_routes_all`. one month is ~35Mn rows

# Initializers

In [68]:
from ais import functions as af
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [69]:
import pandas as pd
import numpy as np

In [70]:
import h3.api.numpy_int as h3int

In [71]:
pd.set_option('display.max_columns', None) #Show all columns in pandas df
pd.set_option('display.max_rows', 100) #Show 100 rows in pandas df
pd.options.display.float_format = '{:.10f}'.format #Show float with 10 decimal points in pandas df

from IPython.core.interactiveshell import InteractiveShell #allow multiple outputs in one jupyter cell
InteractiveShell.ast_node_interactivity = "all"

# Variables

In [72]:
ais_cols = ['mmsi', 'imo',
 'dt_pos_utc',
 'dt_static_utc',
 'longitude',
 'latitude',
 'vessel_name',
 'vessel_type',
 'vessel_class',
 'destination',
 'draught',
 'sog',
 'nav_status',
 'H3_int_index_8',
 'H3_int_index_9',
 'H3_int_index_12',
]

In [73]:
start_date_range = pd.date_range("2022-01-01","2022-12-31", freq="MS")
end_date_range = pd.date_range("2022-01-01","2022-12-31", freq="M")

In [74]:
bucket = "ungp-ais-data-historical-backup"
path = f"s3a://{bucket}/user_temp/adb/"

# Global Ports

ensure no duplicated h3 indices otherwise this will duplicate the AIS rows

In [75]:
poly_sdf = spark.read.parquet(f"{path}ki/global_polygon/")
poly_sdf.printSchema()

root
 |-- buffer_grouped_id: long (nullable = true)
 |-- H3_int_index_8: decimal(20,0) (nullable = true)
 |-- port_id: double (nullable = true)
 |-- port_id_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- port_count: long (nullable = true)
 |-- passage_part_id: long (nullable = true)
 |-- passage_id: long (nullable = true)



In [76]:
poly_sdf.count()
poly_sdf.select(F.countDistinct("H3_int_index_8")).show()

7694797

+------------------------------+
|count(DISTINCT H3_int_index_8)|
+------------------------------+
|                       7694797|
+------------------------------+



In [77]:
port_sdf = spark.read.parquet(f"{path}ki/global_point/")
port_sdf.printSchema()
port_sdf.count()

root
 |-- port_id: double (nullable = true)
 |-- H3_int_index_8: long (nullable = true)
 |-- H3_int_index_9: long (nullable = true)
 |-- H3_int_index_10: long (nullable = true)
 |-- H3_int_index_11: long (nullable = true)
 |-- H3_int_index_12: long (nullable = true)
 |-- __index_level_0__: long (nullable = true)



122

In [78]:
# HHH nyimpan ke cache
port_sdf.cache()

DataFrame[port_id: double, H3_int_index_8: bigint, H3_int_index_9: bigint, H3_int_index_10: bigint, H3_int_index_11: bigint, H3_int_index_12: bigint, __index_level_0__: bigint]

In [79]:
# Jadi, secara keseluruhan, kode ini menghitung jumlah nilai unik dalam beb
# erapa kolom dari DataFrame port_sdf dan menampilkan hasilnya dengan memberikan alias 
# "8", "9", "10", "11", dan "12" masing-masing. Ini memberikan wawasan tentang seberapa 
# banyak nilai unik yang terdapat dalam masing-masing kolom yang dihitung.

In [80]:
port_sdf.agg(F.countDistinct("H3_int_index_8").alias("8"),
             F.countDistinct("H3_int_index_9").alias("9"),
             F.countDistinct( "H3_int_index_10").alias("10"),
             F.countDistinct("H3_int_index_11").alias("11"),
             F.countDistinct("H3_int_index_12").alias("12"),
            ).show()

+---+---+---+---+---+
|  8|  9| 10| 11| 12|
+---+---+---+---+---+
|122|122|122|122|122|
+---+---+---+---+---+



In [81]:
# Jadi, secara keseluruhan, baris kode tersebut menghitung jumlah nilai unik yang muncul dari kombinasi 
# nilai dalam kolom "H3_int_index_8" dan "H3_int_index_9" dari DataFrame port_sdf. Hasilnya memberikan
# informasi tentang seberapa banyak kombinasi unik yang terdapat dalam kedua kolom tersebut.

In [82]:
port_sdf.select("H3_int_index_8","H3_int_index_9").distinct().count()

122

In [83]:
#there are port ids sharing the same indices. Choose H3_int_index_9, 
#port ids with same index to be investigated later

# Jadi, secara keseluruhan, baris kode tersebut menampilkan nilai maksimum dari kolom "port_id" 
# dalam DataFrame port_sdf. Ini berguna untuk mengetahui nilai maksimum dalam kolom tersebut.

In [84]:
port_sdf.select(F.max("port_id")).show()

+------------+
|max(port_id)|
+------------+
|     53133.0|
+------------+



In [27]:
# temp = port_sdf.select("port_id").distinct().toPandas()

# Build Function

## manual call 

In [85]:
i = 0
start_date = start_date_range[i]
end_date = end_date_range[i]
# start_date = "2019-01-01"
# end_date = "2019-01-07"
print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

'2022-01-01':'2022-01-31'


In [66]:
# Jadi, secara keseluruhan, kode ini membentuk DataFrame sdf dari data AIS, menambahkan kolom "stopped" 
# berdasarkan kondisi pada kolom "sog", mengisi nilai null dalam kolom "imo" dan "mmsi" dengan 0, dan mencetak 
# jumlah total baris DataFrame tersebut.

In [86]:
sdf = af.get_ais(spark, start_date = start_date, end_date = end_date, columns=ais_cols) \
        .withColumn("stopped", F.when(F.col("sog") < 1,F.lit(1)).otherwise(F.lit(0))) \
        .na.fill(0,["imo","mmsi"]) 

n = sdf.count()
print(f"Raw: {n:,}")

Raw: 708,862,027


In [87]:
# cek isi datanya 10 teratas. terlihat ada atribut baru ditambahkan yaitu stopped
sdf.show(10)

+---------+-------+-------------------+-------------------+------------+-----------+----------------+-----------+------------+-----------+-------+----+--------------------+------------------+------------------+------------------+-------+
|     mmsi|    imo|         dt_pos_utc|      dt_static_utc|   longitude|   latitude|     vessel_name|vessel_type|vessel_class|destination|draught| sog|          nav_status|    H3_int_index_8|    H3_int_index_9|   H3_int_index_12|stopped|
+---------+-------+-------------------+-------------------+------------+-----------+----------------+-----------+------------+-----------+-------+----+--------------------+------------------+------------------+------------------+-------+
|205654000|9691279|2022-01-01 21:17:43|2022-01-01 21:14:24|     3.20316|51.32248833|            DN97|Port Tender|           A|  ZEEBRUGGE|    1.2| 0.0|         Not Defined|612934698542301183|617438298168098815|630949097050129919|      1|
|533170107|9629237|2022-01-01 23:39:13|2022-01-0

In [32]:
# Dengan demikian, kode ini menggabungkan sdf dan poly_sdf berdasarkan kolom "H3_int_index_8" dengan left join, dan kemudian mencetak jumlah 
# baris hasil gabungan serta skema DataFrame hasil gabungan.

In [88]:
#attach polygon
sdf_wpoly = sdf.join(poly_sdf.drop("poly_id"),
                     on = "H3_int_index_8",
                     how='left'
                    )
#should be same as raw
n = sdf_wpoly.count()
print(f"Poly attached, should be same as raw: {n:,}")
sdf_wpoly.printSchema()

Poly attached, should be same as raw: 708,862,027
root
 |-- H3_int_index_8: long (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- imo: integer (nullable = true)
 |-- dt_pos_utc: timestamp (nullable = true)
 |-- dt_static_utc: timestamp (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- draught: double (nullable = true)
 |-- sog: double (nullable = true)
 |-- nav_status: string (nullable = true)
 |-- H3_int_index_9: long (nullable = true)
 |-- H3_int_index_12: long (nullable = true)
 |-- stopped: integer (nullable = false)
 |-- buffer_grouped_id: long (nullable = true)
 |-- port_id: double (nullable = true)
 |-- port_id_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- port_count: long (nullable = true)
 |-- passage_part_

In [89]:
# Jadi, keseluruhan, kode ini mengidentifikasi dan mencetak jumlah hexes dalam DataFrame sdf_wpoly yang memiliki lebih dari satu polygon 
# (dengan nilai "port_count" lebih dari 1) dan menampilkan jumlahnya.

In [90]:
sdf_overlap = sdf_wpoly.filter(F.col("port_count") > 1) \
         .select("H3_int_index_8","port_id_list").distinct()
print(f"Hexes with multiple polygons: {sdf_overlap.count():,}")

Hexes with multiple polygons: 497,985


In [36]:
sdf_overlap.cache()

DataFrame[H3_int_index_8: bigint, port_id_list: array<double>]

In [43]:
# melihat 10 data teratas
sdf_overlap.show(10)

+------------------+--------------------+
|    H3_int_index_8|        port_id_list|
+------------------+--------------------+
|612929995160944639|  [33340.0, 33290.0]|
|613212096108167167|[16140.0, 16145.0...|
|613231725138411519|    [8120.0, 7200.0]|
|613301768628994047|  [61340.0, 61350.0]|
|613305501177872383|[61547.0, 61555.0...|
|613481690142081023|  [39260.0, 39250.0]|
|613601501947887615|[40245.0, 40240.0...|
|613680102097551359|[48272.0, 48271.0...|
|613957316510744575|[48343.0, 48342.0...|
|614320665788415999|  [51646.0, 51645.0]|
+------------------+--------------------+
only showing top 10 rows



Note:

given point
```python
>>>x = 104.01716666666668
>>>y = 1.2565633333333333
>>>h3int.geo_to_h3(y,x, 9)
618772617860481023
>>>h3int.geo_to_h3(y,x, 8)
614269018274004991
```
BUT
```python
>>>h3int.h3_to_parent(618772617860481023, 8)
>>>614269018234159103
>>>h3int.h3_to_parent(h3int.geo_to_h3(y,x, 9), 8) == h3int.geo_to_h3(y,x, 8)
>>>False
```

In [46]:
@F.udf(returnType=IntegerType()) 
def h3_distance(h3_1, h3_2): 
    return h3int.h3_distance(h3_1, h3_2)

In [50]:
sdf_ports_ind_dist = sdf_overlap \
         .withColumn("port_id",F.explode("port_id_list")) \
         .join(port_sdf.select("port_id","H3_int_index_8")
                   .withColumnRenamed("H3_int_index_8","H3_int_index_8_ind"),
               how="left",
               on="port_id"
              ) \
         .withColumn("h3_distance",h3_distance(F.col("H3_int_index_8"), F.col("H3_int_index_8_ind"))) 
sdf_ports_ind_dist.count()

1340497

In [40]:
sdf_ports_ind_dist.printSchema()

root
 |-- port_id: double (nullable = true)
 |-- H3_int_index_8: long (nullable = true)
 |-- port_id_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- H3_int_index_8_ind: long (nullable = true)
 |-- h3_distance: integer (nullable = true)



In [49]:
# HHH hapus cache
# sdf_ports_ind_dist.unpersist()

DataFrame[port_id: double, H3_int_index_8: bigint, port_id_list: array<double>, H3_int_index_8_ind: bigint, h3_distance: int]

In [53]:
sdf_ports_nearest =  sdf_ports_ind_dist.groupBy("H3_int_index_8").agg(F.min("h3_distance").alias("h3_distance")) \
                                        .join(sdf_ports_ind_dist.select("port_id","H3_int_index_8","h3_distance"),
                                              how="left",
                                              on=['H3_int_index_8','h3_distance']) \
                                        .groupBy("H3_int_index_8").agg(F.collect_set("port_id").alias("nearest_port_id_list"),
                                                                       F.count(F.lit(1)).alias("nearest_port_count"))
                                              
sdf_ports_nearest.count()
# sdf_ports_nearest.show(n=10)

497985

In [None]:
# temp = sdf_ports_nearest.filter(F.col("nearest_port_count") > 1).select("nearest_port_id_list").distinct().toPandas()
# temp = temp.explode("nearest_port_id_list").reset_index()

In [54]:
# temp[temp.duplicated('nearest_port_id_list',keep=False)].sort_values("nearest_port_id_list").head(100)

In [57]:
sdf_ports_nearest.unpersist()

DataFrame[H3_int_index_8: bigint, nearest_port_id_list: array<double>, nearest_port_count: bigint]

In [91]:
sdf_ports_nearest.filter(F.col("nearest_port_count")>1).count()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_51/2127951053.py", line 3, in h3_distance
  File "/opt/conda/lib/python3.8/site-packages/h3/api/_api_template.py", line 272, in h3_distance
    d = _cy.distance(h1, h2)
  File "cells.pyx", line 47, in h3._cy.cells.distance
TypeError: an integer is required


DataFrame[H3_int_index_8: bigint, nearest_port_id_list: array<double>, nearest_port_count: bigint]

In [92]:
sdf_wpoly_stopped = sdf_wpoly.join(sdf_ports_nearest,
               on = ["H3_int_index_8"],
               how="left"
              ) \
         .withColumn("nearest_port", F.when(F.col("nearest_port_count")==1,F.col("nearest_port_id_list").getItem(1))) \
         .withColumn("nearest_ports", F.when(F.col("nearest_port_count")>1,F.concat_ws(",",F.col("nearest_port_id_list"))) \
                                        .otherwise(",")) \
         .withColumn("port_id", F.coalesce("port_id","nearest_port")) \
         .drop("nearest_port","nearest_port_id_list") \
         .na.fill(-1, ["port_id","passage_part_id","passage_id"]) \
         .withColumn("poly",F.concat_ws(":",
                                        F.col("stopped"),
                                        F.col("port_id"),
                                        F.col("nearest_ports"),
                                        F.col("passage_part_id"),
                                        F.col("passage_id")
                                       )
                    )

#should be same as original
sdf_wpoly_stopped.count()
sdf_wpoly_stopped.printSchema()
#1022884130

708862027

root
 |-- H3_int_index_8: long (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- imo: integer (nullable = true)
 |-- dt_pos_utc: timestamp (nullable = true)
 |-- dt_static_utc: timestamp (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- draught: double (nullable = true)
 |-- sog: double (nullable = true)
 |-- nav_status: string (nullable = true)
 |-- H3_int_index_9: long (nullable = true)
 |-- H3_int_index_12: long (nullable = true)
 |-- stopped: integer (nullable = false)
 |-- buffer_grouped_id: long (nullable = true)
 |-- port_id: double (nullable = false)
 |-- port_id_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- port_count: long (nullable = true)
 |-- passage_part_id: long (nullable = true)
 |-- passage_id: long 

In [65]:
sdf_wpoly_stopped.select("poly","stopped","port_id","nearest_ports","passage_part_id","passage_id").show(n=10)

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_51/2127951053.py", line 3, in h3_distance
  File "/opt/conda/lib/python3.8/site-packages/h3/api/_api_template.py", line 272, in h3_distance
    d = _cy.distance(h1, h2)
  File "cells.pyx", line 47, in h3._cy.cells.distance
TypeError: an integer is required


In [61]:
sdf_routes = af.assign_route(sdf_wpoly_stopped, 
                             ship_unique_identifier_cols = ['mmsi','imo'],
                             route_order_by_cols = ['dt_pos_utc', 'dt_static_utc'],
                             polygon_col_name = "poly"
                            )
sdf_routes.printSchema()

root
 |-- H3_int_index_8: long (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- imo: integer (nullable = true)
 |-- dt_pos_utc: timestamp (nullable = true)
 |-- dt_static_utc: timestamp (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- draught: double (nullable = true)
 |-- sog: double (nullable = true)
 |-- nav_status: string (nullable = true)
 |-- H3_int_index_9: long (nullable = true)
 |-- H3_int_index_12: long (nullable = true)
 |-- stopped: integer (nullable = false)
 |-- buffer_grouped_id: long (nullable = true)
 |-- port_id: double (nullable = false)
 |-- port_id_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- port_count: long (nullable = true)
 |-- passage_part_id: long (nullable = true)
 |-- passage_id: long 

In [63]:
# unique_group_cols = ['imo','mmsi','route_group']
# order_by_cols = "dt_pos_utc"
# last_cols = ["dt_pos_utc","draught","nav_status","destination"]
# first_cols = ["dt_pos_utc","buffer_grouped_id","port_id","nearest_ports","vessel_type",
#               "vessel_name","latitude","longitude","H3_int_index_8","H3_int_index_12",
#              "draught","nav_status","destination"]
# min_max_avg_cols = ["sog"]
# count_cols

sdf_agg = sdf_routes.groupBy('imo','mmsi','route_group').agg(F.max("dt_pos_utc").alias("departure_dt_pos_utc"),
                                                F.min("dt_pos_utc").alias("arrival_dt_pos_utc"),
                                                F.min_by("stopped","dt_pos_utc").alias("stopped"),
                                                F.min_by("buffer_grouped_id","dt_pos_utc").alias("buffer_grouped_id"),
                                                F.min_by("port_id","dt_pos_utc").alias("port_id"),
                                                F.min_by("nearest_ports","dt_pos_utc").alias("nearest_ports_id"),
                                                F.min_by("passage_part_id","dt_pos_utc").alias("passage_part_id"),
                                                F.min_by("passage_id","dt_pos_utc").alias("passage_id"),             
                                                F.min_by("vessel_type","dt_pos_utc").alias("vessel_type"),
                                                F.min_by("vessel_name","dt_pos_utc").alias("vessel_name"),
                                                F.min_by("latitude","dt_pos_utc").alias("latitude"),
                                                F.min_by("longitude","dt_pos_utc").alias("longitude"),
                                                F.min_by("H3_int_index_8","dt_pos_utc").alias("H3_int_index_8"),
                                                F.min_by("H3_int_index_12","dt_pos_utc").alias("H3_int_index_12"),
                                                F.min_by("draught","dt_pos_utc").alias("arrival_draught"),
                                                F.min_by("nav_status","dt_pos_utc").alias("arrival_nav_status"),
                                                F.min_by("destination","dt_pos_utc").alias("arrival_destination"),
                                                F.max_by("draught","dt_pos_utc").alias("departure_draught"),
                                                F.max_by("nav_status","dt_pos_utc").alias("departure_nav_status"),
                                                F.max_by("destination","dt_pos_utc").alias("departure_destination"),                                                F.mean("sog").alias("mean_sog"),
                                                F.max("sog").alias("max_sog"),
                                                F.min("sog").alias("min_sog"),
                                                F.count("mmsi").alias("count_ais")
                                                            )

In [64]:
print(f"grouped {sdf_agg.count():,}")
print(f"grouped distinct {sdf_agg.distinct().count():,}")

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_51/2127951053.py", line 3, in h3_distance
  File "/opt/conda/lib/python3.8/site-packages/h3/api/_api_template.py", line 272, in h3_distance
    d = _cy.distance(h1, h2)
  File "cells.pyx", line 47, in h3._cy.cells.distance
TypeError: an integer is required


In [None]:
sdf_routes.select('imo','mmsi','route_group').distinct().count()

In [70]:
def save_routes(sdf, mode, path):
    sdf.withColumn("date", F.coalesce("arrival_dt_pos_utc","departure_dt_pos_utc")) \
        .withColumn("year", F.year("date")) \
        .withColumn("month", F.month("date")) \
        .repartition(1,"year","month") \
        .write \
        .mode(mode) \
        .partitionBy("year","month") \
        .parquet(path)
    print("Saved")
    return None

In [71]:
save_routes(sdf_agg.distinct(), 
            mode="overwrite", 
            path=f"{path}ki/global_routes_all/")

Py4JJavaError: An error occurred while calling o894.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:278)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:793)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 195.0 failed 4 times, most recent failure: Lost task 0.3 in stage 195.0 (TID 3321) (192.168.93.235 executor 24): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_51/2127951053.py", line 3, in h3_distance
  File "/opt/conda/lib/python3.8/site-packages/h3/api/_api_template.py", line 272, in h3_distance
    d = _cy.distance(h1, h2)
  File "cells.pyx", line 47, in h3._cy.cells.distance
TypeError: an integer is required

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1508)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_51/2127951053.py", line 3, in h3_distance
  File "/opt/conda/lib/python3.8/site-packages/h3/api/_api_template.py", line 272, in h3_distance
    d = _cy.distance(h1, h2)
  File "cells.pyx", line 47, in h3._cy.cells.distance
TypeError: an integer is required

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:86)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1508)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


## Check

In [119]:
temp2 = sdf_routes.filter((F.col("imo")==9703174) & (F.col("mmsi")==710001788))
temp2.count()

36555

In [120]:
sdf_routes.printSchema()

root
 |-- H3_int_index_8: long (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- imo: integer (nullable = true)
 |-- dt_pos_utc: timestamp (nullable = true)
 |-- dt_static_utc: timestamp (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_class: string (nullable = true)
 |-- destination: string (nullable = true)
 |-- draught: double (nullable = true)
 |-- sog: double (nullable = true)
 |-- nav_status: string (nullable = true)
 |-- H3_int_index_9: long (nullable = true)
 |-- H3_int_index_12: long (nullable = true)
 |-- stopped: integer (nullable = false)
 |-- buffer_grouped_id: long (nullable = true)
 |-- port_id: double (nullable = false)
 |-- port_id_list: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- port_count: long (nullable = true)
 |-- passage_part_id: long (nullable = true)
 |-- passage_id: long (n

In [121]:
temp2=temp2.toPandas()
temp2.sort_values("dt_pos_utc", inplace=True, ignore_index=True)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [127]:
temp2[~temp2.nearest_port_count.isnull()]

Unnamed: 0,H3_int_index_8,mmsi,imo,dt_pos_utc,dt_static_utc,longitude,latitude,vessel_name,vessel_type,vessel_class,destination,draught,sog,nav_status,H3_int_index_9,H3_int_index_12,stopped,buffer_grouped_id,port_id,port_id_list,port_count,passage_part_id,passage_id,nearest_port_count,nearest_ports,poly,route_group


In [140]:
sdf_agg = spark.read.parquet(f"{path}ki/global_routes_all/")

In [146]:
temp = sdf_agg.filter((F.col("vessel_type")=="Cargo") & (F.col("nearest_ports_id")!= ",")).groupBy("imo").count().toPandas()

In [148]:
temp.sort_values("count", ascending=False)

Unnamed: 0,imo,count
1336,0,149515
8345,9739642,1094
15859,9701190,716
4974,9616838,682
2113,9127540,594
...,...,...
8019,201051011,1
8016,8949056,1
1640,9584229,1
8009,9735191,1


In [150]:
temp = sdf_agg.filter(F.col("imo")==9739642).toPandas()
# temp = temp[temp.mmsi==710001788].sort_values("arrival_dt_pos_utc", ignore_index=True)

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [152]:
temp.mmsi.value_counts()

431005799    2360
257551600       1
367661820       1
Name: mmsi, dtype: int64

In [154]:
temp.sort_values("arrival_dt_pos_utc", ignore_index=True, inplace=True)

In [166]:
# sdf_agg.filter(F.col("passage_id") > -1).show(n=10)

+---+---------+-----------+--------------------+-------------------+-------+-----------------+-------+----------------+---------------+----------+-----------+-----------+------------------+-------------------+------------------+------------------+---------------+--------------------+-------------------+-----------------+--------------------+---------------------+-------------------+-------+-------+---------+-------------------+----+-----+
|imo|     mmsi|route_group|departure_dt_pos_utc| arrival_dt_pos_utc|stopped|buffer_grouped_id|port_id|nearest_ports_id|passage_part_id|passage_id|vessel_type|vessel_name|          latitude|          longitude|    H3_int_index_8|   H3_int_index_12|arrival_draught|  arrival_nav_status|arrival_destination|departure_draught|departure_nav_status|departure_destination|           mean_sog|max_sog|min_sog|count_ais|               date|year|month|
+---+---------+-----------+--------------------+-------------------+-------+-----------------+-------+------------

In [163]:
temp[['arrival_dt_pos_utc','departure_dt_pos_utc','stopped','count_ais','port_id','nearest_ports_id','passage_part_id','passage_id','latitude','longitude','arrival_destination','departure_destination']].iloc[200:300]

Unnamed: 0,arrival_dt_pos_utc,departure_dt_pos_utc,stopped,count_ais,port_id,nearest_ports_id,passage_part_id,passage_id,latitude,longitude,arrival_destination,departure_destination
200,2019-01-02 15:03:58,2019-01-02 15:09:57,1,2,-1.0,34403322,-1,-1,34.0516666667,131.7833333333,>JP TMI,>JP TMI
201,2019-01-02 15:13:21,2019-01-02 15:13:21,1,1,-1.0,",",-1,-1,34.052995,131.7839733333,>JP TMI,>JP TMI
202,2019-01-02 15:24:57,2019-01-02 15:24:57,1,1,-1.0,34403322,-1,-1,34.0516666667,131.7833333333,>JP TMI,>JP TMI
203,2019-01-02 15:25:20,2019-01-02 15:25:20,1,1,-1.0,",",-1,-1,34.0529783333,131.78395,>JP TMI,>JP TMI
204,2019-01-02 15:27:58,2019-01-02 15:33:58,1,2,-1.0,34403322,-1,-1,34.0516666667,131.7833333333,>JP TMI,>JP TMI
205,2019-01-02 15:37:18,2019-01-02 16:13:20,1,4,-1.0,",",-1,-1,34.0530283333,131.7839316667,>JP TMI,>JP TMI
206,2019-01-02 16:20:18,2019-01-02 16:20:18,1,1,-1.0,34403322,-1,-1,34.0516666667,131.7833333333,>JP TMI,>JP TMI
207,2019-01-02 16:25:19,2019-01-02 16:25:19,1,1,-1.0,",",-1,-1,34.052995,131.7839633333,>JP TMI,>JP TMI
208,2019-01-02 16:29:19,2019-01-02 16:35:18,0,3,-1.0,34403322,-1,-1,34.0516666667,131.7833333333,>JP TMI,>JP TMI
209,2019-01-02 16:38:18,2019-01-02 17:32:18,0,16,-1.0,",",-1,-1,34.04,131.7833333333,>JP TMI,>JP TMI


# Call 

## Functions

In [16]:
def create_routes(start_date, 
                  end_date, 
                  poly_sdf,
                  port_sdf, 
                  columns, 
                  stats=True,
                  return_sdf = False):
    
    print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")
    
        
    sdf = af.get_ais(spark, start_date = start_date, end_date = end_date, columns=ais_cols) \
        .withColumn("stopped", F.when(F.col("sog") < 1,F.lit(1)).otherwise(F.lit(0))) \
        .na.fill(0,"imo") 
    
    
    if stats:
        print(f"Raw: {sdf.count():,}")
    
    #attach polygon
    sdf_wpoly = sdf.join(poly_sdf.drop("poly_id"),
                     on = "H3_int_index_8",
                     how='left'
                    )

    if stats:
        print(f"Poly attached, should be same as raw: {n:,}")
        
    #get polygons with overlap
    
    sdf_overlap = sdf_wpoly.filter(F.col("port_count") > 1) \
                     .select("H3_int_index_8","port_id_list").distinct()
    
    if stats:
        print(f"Hexes with multiple polygons: {sdf_overlap.count():,}")
        
    
    #get the nearest port to index
    sdf_ports_ind_dist = sdf_overlap \
         .withColumn("port_id",F.explode("port_id_list")) \
         .join(port_sdf.select("port_id","H3_int_index_8")
                   .withColumnRenamed("H3_int_index_8","H3_int_index_8_ind"),
               how="left",
               on="port_id"
              ) \
         .withColumn("h3_distance",h3_distance(F.col("H3_int_index_8"), F.col("H3_int_index_8_ind"))) 
    
    sdf_ports_nearest =  sdf_ports_ind_dist.groupBy("H3_int_index_8").agg(F.min("h3_distance").alias("h3_distance")) \
                                        .join(sdf_ports_ind_dist.select("port_id","H3_int_index_8","h3_distance"),
                                              how="left",
                                              on=['H3_int_index_8','h3_distance']) \
                                        .groupBy("H3_int_index_8").agg(F.collect_set("port_id").alias("nearest_port_id_list"),
                                                                       F.count(F.lit(1)).alias("nearest_port_count"))

             # .withColumn("nearest_port", F.when(F.col("nearest_port_count")==1,F.col("nearest_port_id_list").getItem(0))) \
         # .withColumn("nearest_ports", F.when(F.col("nearest_port_count")>1,F.concat_ws(",",F.col("nearest_port_id_list"))) \
         #                                .otherwise(",")) \
         # .withColumn("port_id", F.coalesce("port_id","nearest_port")) \
         # .drop("nearest_port","nearest_port_id_list") \
        
    
    sdf_wpoly_stopped = sdf_wpoly.join(sdf_ports_nearest,
               on = ["H3_int_index_8"],
               how="left"
              ) \
         .withColumn("ports_id_list", F.when(F.col("port_id").isNotNull(), F.array("port_id")) \
                                      .otherwise(F.when(F.col("nearest_port_id_list").isNull(), F.array(F.lit(-1))) \
                                                  .otherwise(F.col("nearest_port_id_list"))
                                                )
                    ) \
         .na.fill(-1, ["passage_part_id","passage_id"]) \
         .withColumn("poly",F.concat_ws(":",
                                        F.col("stopped"),
                                        F.col("ports_id_list"),
                                        # F.col("nearest_ports"),
                                        # F.col("passage_part_id"),
                                        F.col("passage_id")
                                       )
                    )
    
    sdf_routes = af.assign_route(sdf_wpoly_stopped, 
                             ship_unique_identifier_cols = ['mmsi','imo'],
                             route_order_by_cols = ['dt_pos_utc', 'dt_static_utc'],
                             polygon_col_name = "poly"
                            )
    
    sdf_agg = sdf_routes.groupBy('imo','mmsi','route_group').agg(F.max("dt_pos_utc").alias("departure_dt_pos_utc"),
                                                F.min("dt_pos_utc").alias("arrival_dt_pos_utc"),
                                                F.min_by("stopped","dt_pos_utc").alias("stopped"),
                                                F.min_by("buffer_grouped_id","dt_pos_utc").alias("buffer_grouped_id"),
                                                # F.min_by("port_id","dt_pos_utc").alias("port_id"),
                                                F.min_by("ports_id_list","dt_pos_utc").alias("ports_id_list"),
                                                F.min_by("passage_part_id","dt_pos_utc").alias("passage_part_id"),
                                                F.min_by("passage_id","dt_pos_utc").alias("passage_id"),             
                                                F.min_by("vessel_type","dt_pos_utc").alias("vessel_type"),
                                                F.min_by("vessel_name","dt_pos_utc").alias("vessel_name"),
                                                F.min_by("latitude","dt_pos_utc").alias("latitude"),
                                                F.min_by("longitude","dt_pos_utc").alias("longitude"),
                                                F.min_by("H3_int_index_8","dt_pos_utc").alias("H3_int_index_8"),
                                                F.min_by("H3_int_index_12","dt_pos_utc").alias("H3_int_index_12"),
                                                F.min_by("draught","dt_pos_utc").alias("arrival_draught"),
                                                F.min_by("nav_status","dt_pos_utc").alias("arrival_nav_status"),
                                                F.min_by("destination","dt_pos_utc").alias("arrival_destination"),
                                                F.max_by("draught","dt_pos_utc").alias("departure_draught"),
                                                F.max_by("nav_status","dt_pos_utc").alias("departure_nav_status"),
                                                F.max_by("destination","dt_pos_utc").alias("departure_destination"),                                                
                                                F.mean("sog").alias("mean_sog"),
                                                F.max("sog").alias("max_sog"),
                                                F.min("sog").alias("min_sog"),
                                                F.count("mmsi").alias("count_ais")
                                                            )
    
    if stats:
        print(f"Aggregated routes: {sdf_agg.count():,}")
        
        
    if return_sdf:
        return [sdf_agg, sdf, sdf_wpoly_stopped, sdf_routes]
    else:
        return sdf_agg
    # return sdf_routes

In [17]:
def save_routes(sdf, mode, path):
    sdf.withColumn("date", F.coalesce("arrival_dt_pos_utc","departure_dt_pos_utc")) \
        .withColumn("year", F.year("date")) \
        .withColumn("month", F.month("date")) \
        .repartition(1,"year","month") \
        .write \
        .mode(mode) \
        .partitionBy("year","month") \
        .parquet(path)
    print("Saved")
    return None

In [18]:
@F.udf(returnType=IntegerType()) 
def h3_distance(h3_1, h3_2): 
    return h3int.h3_distance(h3_1, h3_2)

## Run

## Sample -- seems okay

In [19]:
i = 33
start_date = start_date_range[i]
end_date = end_date_range[i]
print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

'2021-10-01':'2021-10-31'


In [23]:
sdf_agg = create_routes(
                  start_date, 
                  end_date, 
                  poly_sdf,
                  port_sdf, 
                  columns=ais_cols, 
                  stats=False,
                  return_sdf=False
                  )

'2021-10-01':'2021-10-31'


In [24]:
save_routes(sdf_agg.distinct(), 
            mode="overwrite", 
            path=f"{path}temp/KI")

Saved


In [26]:
sdf_agg = spark.read.parquet(f"{path}temp/KI")
sdf_agg.printSchema()
sdf_agg.count()

root
 |-- imo: integer (nullable = true)
 |-- mmsi: integer (nullable = true)
 |-- route_group: long (nullable = true)
 |-- departure_dt_pos_utc: timestamp (nullable = true)
 |-- arrival_dt_pos_utc: timestamp (nullable = true)
 |-- stopped: integer (nullable = true)
 |-- buffer_grouped_id: long (nullable = true)
 |-- ports_id_list: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- passage_part_id: long (nullable = true)
 |-- passage_id: long (nullable = true)
 |-- vessel_type: string (nullable = true)
 |-- vessel_name: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- H3_int_index_8: long (nullable = true)
 |-- H3_int_index_12: long (nullable = true)
 |-- arrival_draught: double (nullable = true)
 |-- arrival_nav_status: string (nullable = true)
 |-- arrival_destination: string (nullable = true)
 |-- departure_draught: double (nullable = true)
 |-- departure_nav_status: string (nullable = true)
 |--

27704946

In [27]:
sdf_agg.filter(F.col("mmsi")==225993688).count()

36

In [29]:
temp = sdf_agg.filter(F.col("mmsi")==225993688).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


In [32]:
temp

Unnamed: 0,imo,mmsi,route_group,departure_dt_pos_utc,arrival_dt_pos_utc,stopped,buffer_grouped_id,ports_id_list,passage_part_id,passage_id,vessel_type,vessel_name,latitude,longitude,H3_int_index_8,H3_int_index_12,arrival_draught,arrival_nav_status,arrival_destination,departure_draught,departure_nav_status,departure_destination,mean_sog,max_sog,min_sog,count_ais,date,year,month
0,0,225993688,0,2021-10-05 06:39:09,2021-10-05 06:28:39,0,,[-1.0],-1,-1,Unknown,LA LUNA,39.61413833,2.35654167,613496869984665599,631511268492952063,0.0,Unknown,,0.0,Unknown,,6.45,6.8,6.1,2,2021-10-05 06:28:39,2021,10
1,0,225993688,1,2021-10-05 09:28:09,2021-10-05 06:49:40,0,616.0,[38630.0],-1,-1,Unknown,LA LUNA,39.64111333,2.38929333,613496869967888383,631511268476951551,0.0,Unknown,,0.0,Unknown,,4.5,6.0,3.0,16,2021-10-05 06:49:40,2021,10
2,0,225993688,2,2021-10-05 09:59:10,2021-10-05 09:38:40,0,,[-1.0],-1,-1,Unknown,LA LUNA,39.76489167,2.60865333,613496871249248255,631511269758418431,0.0,Unknown,,0.0,Unknown,,4.6666666667,5.8,2.6,3,2021-10-05 09:38:40,2021,10
3,0,225993688,3,2021-10-05 10:36:11,2021-10-05 10:12:09,1,616.0,[38630.0],-1,-1,Unknown,LA LUNA,39.76284667,2.64056167,613496851024314367,631511249533405695,0.0,Unknown,,0.0,Unknown,,0.3,0.3,0.3,3,2021-10-05 10:12:09,2021,10
4,0,225993688,4,2021-10-05 10:48:11,2021-10-05 10:48:11,0,616.0,[38630.0],-1,-1,Unknown,LA LUNA,39.76287167,2.640425,613496851024314367,631511249533420543,0.0,Unknown,,0.0,Unknown,,1.0,1.0,1.0,1,2021-10-05 10:48:11,2021,10
5,0,225993688,5,2021-10-05 11:00:11,2021-10-05 11:00:11,1,616.0,[38630.0],-1,-1,Unknown,LA LUNA,39.76279667,2.64062667,613496851024314367,631511249533405695,0.0,Unknown,,0.0,Unknown,,0.5,0.5,0.5,1,2021-10-05 11:00:11,2021,10
6,0,225993688,6,2021-10-05 11:41:14,2021-10-05 11:11:11,0,,[-1.0],-1,-1,Unknown,LA LUNA,39.76932667,2.64619167,613496851032702975,631511249540450303,0.0,Unknown,,0.0,Unknown,,5.625,6.9,2.8,4,2021-10-05 11:11:11,2021,10
7,0,225993688,7,2021-10-06 06:05:45,2021-10-05 11:53:45,1,,[-1.0],-1,-1,Unknown,LA LUNA,39.795525,2.69382833,613496850799919103,631511249307699199,0.0,Unknown,,0.0,Unknown,,0.0344444444,0.1,0.0,90,2021-10-05 11:53:45,2021,10
8,0,225993688,8,2021-10-06 07:28:13,2021-10-06 06:16:11,0,,[-1.0],-1,-1,Unknown,LA LUNA,39.795085,2.69112167,613496850799919103,631511249307486207,0.0,Unknown,,0.0,Unknown,,5.375,6.9,3.7,8,2021-10-06 06:16:11,2021,10
9,0,225993688,9,2021-10-06 09:01:11,2021-10-06 07:38:13,0,616.0,[38630.0],-1,-1,Unknown,LA LUNA,39.75834833,2.53769333,613496871110836223,631511269618344447,0.0,Unknown,,0.0,Unknown,,6.5666666667,7.1,5.6,9,2021-10-06 07:38:13,2021,10


In [33]:
sdf_agg = spark.read.parquet(f"{path}ki/global_routes_all/year=2021/month=10/")
temp2 = sdf_agg.filter(F.col("mmsi")==225993688).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


## Loop

### 2021

In [38]:
i = 24
start_date = start_date_range[i]
end_date = end_date_range[i]
# print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

sdf_agg_route = create_routes(
                  start_date, 
                  end_date, 
                  poly_sdf,
                  port_sdf, 
                  columns=ais_cols, 
                  stats=False,
                  return_sdf=False
                  )

save_routes(sdf_agg_route.distinct(), 
            mode="overwrite", 
            path=f"{path}ki/global_routes_all/")

'2021-01-01':'2021-01-31'
Saved


In [39]:
i = 25
start_date = start_date_range[i]
end_date = end_date_range[i]
# print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

sdf_agg_route = create_routes(
                  start_date, 
                  end_date, 
                  poly_sdf,
                  port_sdf, 
                  columns=ais_cols, 
                  stats=False,
                  return_sdf=False
                  )

save_routes(sdf_agg_route.distinct(), 
            mode="append", 
            path=f"{path}ki/global_routes_all/")

'2021-02-01':'2021-02-28'
Saved


In [40]:
for i in range(26,41):
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    # print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

    sdf_agg_route = create_routes(
                      start_date, 
                      end_date, 
                      poly_sdf,
                      port_sdf, 
                      columns=ais_cols, 
                      stats=False,
                      return_sdf=False
                      )

    save_routes(sdf_agg_route.distinct(), 
                mode="append", 
                path=f"{path}ki/global_routes_all/")

'2021-03-01':'2021-03-31'
Saved
'2021-04-01':'2021-04-30'
Saved
'2021-05-01':'2021-05-31'
Saved
'2021-06-01':'2021-06-30'
Saved
'2021-07-01':'2021-07-31'
Saved
'2021-08-01':'2021-08-31'
Saved
'2021-09-01':'2021-09-30'
Saved
'2021-10-01':'2021-10-31'
Saved
'2021-11-01':'2021-11-30'
Saved
'2021-12-01':'2021-12-31'
Saved
'2022-01-01':'2022-01-31'
Saved
'2022-02-01':'2022-02-28'
Saved
'2022-03-01':'2022-03-31'
Saved
'2022-04-01':'2022-04-30'
Saved
'2022-05-01':'2022-05-31'
Saved


In [41]:
start_date_range = pd.date_range("2023-01-01","2023-02-28", freq="MS")
end_date_range = pd.date_range("2023-01-01","2023-02-28", freq="M")

In [42]:
for i in range(0,2):
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    sdf_agg_route = create_routes(
                  start_date, 
                  end_date, 
                  poly_sdf,
                  port_sdf, 
                  columns=ais_cols, 
                  stats=False,
                  return_sdf=False
                  )

    save_routes(sdf_agg_route.distinct(), 
        mode="append", 
        path=f"{path}ki/global_routes_all/")

'2023-01-01':'2023-01-31'
Saved
'2023-02-01':'2023-02-28'
Saved


In [43]:
start_date_range = pd.date_range("2019-01-01","2022-12-31", freq="MS")
end_date_range = pd.date_range("2019-01-01","2022-12-31", freq="M")

In [44]:
for i in range(41,45):
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    # print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

    sdf_agg_route = create_routes(
                      start_date, 
                      end_date, 
                      poly_sdf,
                      port_sdf, 
                      columns=ais_cols, 
                      stats=False,
                      return_sdf=False
                      )

    save_routes(sdf_agg_route.distinct(), 
                mode="append", 
                path=f"{path}ki/global_routes_all/")

'2022-06-01':'2022-06-30'
Saved
'2022-07-01':'2022-07-31'
Saved
'2022-08-01':'2022-08-31'
Saved
'2022-09-01':'2022-09-30'


[E 2023-03-16 06:20:51,895.895 root] KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [19]:
    i=44
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    # print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

    sdf_agg_route = create_routes(
                      start_date, 
                      end_date, 
                      poly_sdf,
                      port_sdf, 
                      columns=ais_cols, 
                      stats=False,
                      return_sdf=False
                      )

    save_routes(sdf_agg_route.distinct(), 
                mode="append", 
                path=f"{path}ki/global_routes_all/")

'2022-09-01':'2022-09-30'
Saved


In [None]:
for i in range(45,48):
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    # print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")

    sdf_agg_route = create_routes(
                      start_date, 
                      end_date, 
                      poly_sdf,
                      port_sdf, 
                      columns=ais_cols, 
                      stats=False,
                      return_sdf=False
                      )

    save_routes(sdf_agg_route.distinct(), 
                mode="append", 
                path=f"{path}ki/global_routes_all/")

'2022-10-01':'2022-10-31'
Saved
'2022-11-01':'2022-11-30'
Saved
'2022-12-01':'2022-12-31'
Saved


In [19]:
for i in range(21,24):
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    # print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")
    
    sdf_agg_route = create_routes(
                      start_date, 
                      end_date, 
                      poly_sdf,
                      port_sdf, 
                      columns=ais_cols, 
                      stats=False,
                      return_sdf=False
                      )

    save_routes(sdf_agg_route.distinct(), 
                mode="append", 
                path=f"{path}ki/global_routes_all/")

'2020-10-01':'2020-10-31'
Saved
'2020-11-01':'2020-11-30'
Saved
'2020-12-01':'2020-12-31'
Saved


In [20]:
    i=20
    start_date = start_date_range[i]
    end_date = end_date_range[i]
    # print(f"{start_date:'%Y-%m-%d'}:{end_date:'%Y-%m-%d'}")
    
    sdf_agg_route = create_routes(
                      start_date, 
                      end_date, 
                      poly_sdf,
                      port_sdf, 
                      columns=ais_cols, 
                      stats=False,
                      return_sdf=False
                      )

    save_routes(sdf_agg_route.distinct(), 
                mode="append", 
                path=f"{path}ki/global_routes_all/")

'2020-09-01':'2020-09-30'
Saved


In [21]:
spark.stop()