In [1]:
# necessary prolog
from pyspark import SparkContext
from pyspark import SQLContext
sc = SparkContext()
sql = SQLContext(sc)

In [2]:
#Epoch -> datetime
import time
from pyspark.sql.functions import udf
epochToDate = udf(lambda epoch: time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch)))

# Reading the file
full_data = sql.read.json('data/100000_regels_ais_data')
full_data = full_data.withColumn("date", epochToDate(full_data['ts']))
full_data.registerTempTable("full_data")

In [3]:
# Get static data:
# shipname, eta_hour, dimstarboard, draught, dimstern, mmsi, destination, dimport, ts, imo, eta_day, eta_minute, shiptype, callsign, eta_month, dimbow, type
static_data = sql.sql("SELECT shipname, eta_hour, dimstarboard, draught, dimstern, mmsi, destination, dimport, ts, date, imo, eta_day, eta_minute, shiptype, callsign, eta_month, dimbow, type FROM full_data WHERE shiptype >= 70 AND shiptype <= 89 ORDER BY mmsi DESC")
static_data.registerTempTable("static_data")

In [4]:
# Get dynamic data:
#sog, ts, timestamp, mmsi, lat, lat2, lon, lon2, rot_direction, rot_angle, nav_status, cog, type, heading
dynamic_data = sql.sql("SELECT sog, ts, date, timestamp, mmsi, lat, lat2, lon, lon2, rot_direction, rot_angle, nav_status, cog, type, heading FROM full_data WHERE lat IS NOT NULL")
dynamic_data.registerTempTable("dynamic_data")

In [48]:
# Make static data unique
unique_static_data = sql.sql("SELECT count(*) as count, shipname, max(dimstarboard) as dimstarboard, max(dimport) as dimport, max(dimstern) as dimstern, max(dimbow) as dimbow, draught, mmsi, destination, imo, avg(eta_minute) as eta_minute_avg, avg(eta_hour) as eta_hour_avg, avg(eta_day) as eta_day_avg, avg(eta_month) as eta_month_avg, shiptype, callsign, type FROM static_data GROUP BY shipname, draught, mmsi, destination, imo, shiptype, callsign, type ORDER BY mmsi DESC")
unique_static_data.registerTempTable("unique_static_data")
sql.sql("SELECT * FROM unique_static_data ORDER BY count DESC").show()

+-----+--------------------+------------+-------+--------+------+-------+---------+--------------------+-------+--------------+------------+-----------+-------------+--------+--------+----+
|count|            shipname|dimstarboard|dimport|dimstern|dimbow|draught|     mmsi|         destination|    imo|eta_minute_avg|eta_hour_avg|eta_day_avg|eta_month_avg|shiptype|callsign|type|
+-----+--------------------+------------+-------+--------+------+-------+---------+--------------------+-------+--------------+------------+-----------+-------------+--------+--------+----+
|   33|          MV SEATTLE|          14|     13|      26|   145|      5|209316000|            JIANGYIN|9220988|           0.0|        22.0|       22.0|          2.0|      70|   P3UX8|   5|
|   19|           ANL EUROA|          19|      8|      19|   156|      9|209830000|AUCKLAND NEW ZEALAND|9433066|           0.0|        18.0|       26.0|          2.0|      70|   5BTJ3|   5|
|   15|     TANJA DEYMANN 2|           5|      5| 

In [52]:
# Check if it is truely unique
sql.sql("SELECT COUNT(*) as cnt, first(shipname) as shipname, first(mmsi) as mmsi FROM unique_static_data GROUP BY mmsi HAVING COUNT(*)>1").show()

# Fails:
# - Names that are not exactly the same
# - The ETA is different #Fixed i think by taking the average eta for each ship
# - Destinations are not exactly the same but look like eachother
# - Bullshit destination (233 vs LIANYUNGANG)
# - Bullshit callsign (566 vs 309B / 5(A3328 vs 9HA3328)
# - Different draughts
# 

+---+--------------+---------+
|cnt|      shipname|     mmsi|
+---+--------------+---------+
|  8| NAVE UBHZ5RSE|538003852|
|  2|    SHOSEIMARU|431301658|
|  2|           123|413000000|
|  2|  AUDE AUDENDA|244660326|
|  2|HAFNIA SEAWAYS|235060989|
|  2|     DP GALYNA|229437000|
+---+--------------+---------+



In [65]:
# Join static and dynamic data
joined_data = sql.sql("SELECT * FROM dynamic_data INNER JOIN unique_static_data ON (dynamic_data.mmsi=unique_static_data.mmsi)")
joined_data.registerTempTable("joined")
joined_data.show()

+---+----------+-------------------+---------+---------+---------+----+--------+----+-------------+---------+----------+---+----+-------+-----+--------+------------+-------+--------+------+-------+---------+--------------------+---+--------------+------------+-----------+-------------+--------+--------+----+
|sog|        ts|               date|timestamp|     mmsi|      lat|lat2|     lon|lon2|rot_direction|rot_angle|nav_status|cog|type|heading|count|shipname|dimstarboard|dimport|dimstern|dimbow|draught|     mmsi|         destination|imo|eta_minute_avg|eta_hour_avg|eta_day_avg|eta_month_avg|shiptype|callsign|type|
+---+----------+-------------------+---------+---------+---------+----+--------+----+-------------+---------+----------+---+----+-------+-----+--------+------------+-------+--------+------+-------+---------+--------------------+---+--------------+------------+-----------+-------------+--------+--------+----+
|  0|1424573822|2015-02-22 02:57:02|       17|244010031|52.025958|   N