# Arbeidsbok: Nye stats

Du arbeider her for å videreføre arbeidet

Du har gjort:
- Beregnet steady-state course- og speed change
- Beregnet forholdet, ikke bare true/false når det kommer til dSOG og dCOG og hvor vidt de overstiger verdier for før-perioden (overshoot)
- Du har begrenset før- og etter-periodene til 60 sek før og etter ikke alle observasjoner før og etter
- Du har sørget for å bruke circmean istedenfor mean på sirkulære verdier (viktig)
- Du beregner avstanden mellom skipene idet de foretar sin manøver
- Du gjør utvelgelse ift lengde på skip, ikke med absolutt størrelse
- Du henter ut lats og lons for situasjonene, slik at du kan plotte dem

Du kan fremdeles:
- Ha en høyere begrensnning for hva det vil si å registreres som en speed/course maneuver (prøv deg frem med dette og plott i R)
- Leke deg rundt med om mean eller max er best for å bestemme course/speed maneuver
- NB: Husk å ta ut no action-ene, eventuelt bruke steady-state og total speed change / coruse change til å teste robustheten til disse
- Bruke mean rolling filter for steady course / speed change isteden
- Plotte dSOG, SOG osv for noen utvalgte situasjoner - se at ting stemmer

In [1]:
# Importing relevant libraries
import numpy as np
import pandas as pd 
import datetime as dt
import pyarrow.parquet as pq
import glob, os

from NearCollisionStatistics import *

# Setting internal options for speedup
pd.set_option('compute.use_bottleneck', True)
pd.set_option('compute.use_numexpr', True)

pd.set_option('display.max_columns', 500)



# Reading small dataset from CSV

Reading the small one-day sample from CSV file. This does not take increadibly much memory (but still quite some).

In [2]:
### Building a databank ###

# Re-reading full AIS original file
PATH_csv = "~/code/DATA_DNV/ais_small.csv"

incols = ["date_time_utc","mmsi","lat","lon","sog","cog","true_heading","length","nav_status","RISK_Norwegian_Main_Vessel_Category_ID"]

AIS_df = pd.io.parsers.read_csv(PATH_csv
                        ,engine="c"
                        ,sep=";"
                        ,usecols=incols        
                        ,compression=None
                        ,dtype={"mmsi": np.uint64,             
                                "lat": np.float32, 
                                "lon": np.float32,
                                "sog": np.float32,
                                "cog": np.float32,
                                "true_heading": np.float32,
                                "length": np.float32,
                                "nav_status": np.int32,
                                "RISK_Norwegian_Main_Vessel_Category_ID": np.float32}
                        )

# Renaming fields
AIS_df.rename(columns={"date_time_utc": "Time",
                        "mmsi": "ID",
                        "lon": "LON",
                        "lat": "LAT",
                        "nav_status": "Status",
                        "true_heading": "Heading",
                        "sog": "SOG",
                        "cog": "COG",
                        "length": "Length",
                        "RISK_Norwegian_Main_Vessel_Category_ID": "Category"
                         },inplace=True)

# Converting time column from string to datetime
AIS_df["Time"] = pd.to_datetime(AIS_df.Time,format="%Y-%m-%dT%H:%M:%S.%f",box=False)

# Storing datetimes in the dataframe
AIS_df["Time_datetime"] = AIS_df["Time"]

# Converting datetimes to ints for later conversion to intervals
AIS_df["Time"] = AIS_df.Time.values.astype(np.uint64) / 10**9

# Removing platforms
AIS_df = AIS_df[(AIS_df.ID > 99999999)&(AIS_df.ID <= 999999999)]

# Sorting by time
AIS_df.sort_values(by = "Time",inplace=True)

In [6]:
AIS_df["Day"] = 13
AIS_df["Interval"] = (AIS_df.Time - np.min(AIS_df.Time)) // 20
AIS_df["Interval"] = AIS_df.Interval.values.astype(np.uint32)
AIS_df = AIS_df.drop_duplicates(["ID","Interval"],keep="first")
AIS_df = AIS_df.sort_values(by=["Time","LAT","LON"], axis=0, inplace=False)


## Computing on the small dataset

Computing happens here.

In [None]:
AIS_filtered = time_to_CPA_calculator(AIS_df)

In [None]:
# Collecting observations
DATABANK = observation_collector(AIS_df, AIS_filtered)

AIS_databank = DATABANK

# Synchronizing times
databank_mergetime = observation_synchronizer(AIS_databank)

In [None]:
### Calculating Statistics (CPA, tCPA, CPA_dist) for all observations ###

AIS_db = databank_mergetime

# Calculating stats
AIS_with_stats = Stat_calculator(AIS_db)

# Calculating dSOG and dCOGs
AIS_with_diffs = diffs_computer(AIS_with_stats)


In [None]:
AIS_with_diffs = pd.io.parsers.read_csv("AIS_with_stats.csv")

In [None]:
### Calculating summary statistics ###
AIS2_db = AIS_with_diffs

# Calculating summart statistics
AIS_summary_stats = statistics_aggregator(AIS2_db)

AIS_with_diffs.to_csv("AIS_with_stats.csv",index=False)

# Writing final output for further analysis
AIS_summary_stats.to_csv("AIS_summary_stats.csv",index=False,
                     )

# Reading from parquet and writing to HDF5

Reading and writing to queryable format

In [3]:
files = glob.glob("/users/arnsteinvestre/code/Parquet/*.parquet")
cols = ["Timestamp", "MMSI-nummer", "IMO", "Longitude", "Latitude", "Speed_over_ground", "Course_over_ground", "Navigational_status", "Risk_category","Width"]
pqdataset = pq.ParquetDataset(files)
AIS_pqds = pqdataset.read(columns = cols)
AIS_pd = AIS_pqds.to_pandas()

  labels, = index.labels


In [4]:
Lengths_df = pd.read_pickle("vessel_imo_length.pkl") 
Lengths_short_df = Lengths_df.drop_duplicates(["IMO"],keep="first")
Lengths_clean_df = Lengths_short_df[Lengths_short_df["LENGTHOVERALL"] != 0]

In [5]:
Lengths_clean_df = Lengths_clean_df[["IMO","LENGTHOVERALL"]]

In [6]:
AIS_pd = pd.merge(AIS_pd, Lengths_clean_df, on="IMO", how="left")

In [7]:
# Creating day labels
AIS_pd["Daystamp"] = AIS_pd["Timestamp"].dt.normalize()
AIS_pd["Day"] = AIS_pd["Daystamp"].factorize()[0]

In [8]:
# Renaming fields
AIS_pd.drop(labels="Daystamp",axis=1,inplace=True)

In [9]:
AIS_pd.rename(columns={"Timestamp": "Time",
                        "MMSI-nummer": "ID",
                        "Longitude": "LON",
                        "Latitude": "LAT",
                        "Navigational_status": "Status",
                        "Speed_over_ground": "SOG",
                        "Course_over_ground": "COG",
                        "Length": "Length",
                        "Risk_category": "Category",
                       "LENGTHOVERALL": "Length"
                         },inplace=True)

In [10]:
# Storing datetimes in the dataframe
AIS_pd["Time_datetime"] = AIS_pd["Time"]

# Converting datetimes to ints for later conversion to intervals
AIS_pd["Time"] = AIS_pd.Time.values.astype(np.uint64) / 10**9

In [16]:
AIS_pd["Length"] = AIS_pd["Length"].astype(np.float)

In [17]:
#AIS_pd.drop(columns = ["IMO","Risk_category_name","Ship_cargo_type_ID"], inplace = True)
# If not properly read (with cols restriction)

In [18]:
data_store = pd.HDFStore("AIS_from_parquet_large.h5")

for day in range(0,12):

    AIS_daytable = AIS_pd[AIS_pd["Day"] == day]
    
    print("Day",day,"separated")
    
    data_store["day{}_df".format(day)] = AIS_daytable
    
    print("Day",day,"written")

data_store.close()

Day 0 separated
Day 0 written
Day 1 separated
Day 1 written
Day 2 separated
Day 2 written
Day 3 separated
Day 3 written
Day 4 separated
Day 4 written
Day 5 separated
Day 5 written
Day 6 separated
Day 6 written
Day 7 separated
Day 7 written
Day 8 separated
Day 8 written
Day 9 separated
Day 9 written
Day 10 separated
Day 10 written
Day 11 separated
Day 11 written


# Collecting instances on the large set

We now transition to reading the large file and turning this into a larger databank.
This requires using a larger part of memory, thus be careful.

## First some testing

We need to check that it works on a small sample

In [14]:
data_store = pd.HDFStore("AIS_from_parquet_large.h5")
AIS_pd = data_store["day1_df".format(day)]
data_store.close()

In [22]:
AIS_pd_samp = AIS_pd[np.random.rand(len(AIS_pd.index)) < 0.05].copy()

In [23]:
# Creating intervals
AIS_pd_samp["Interval"] = (AIS_pd_samp.Time - np.min(AIS_pd_samp.Time)) // 20
AIS_pd_samp["Interval"] = AIS_pd_samp.Interval.values.astype(np.uint32)

In [24]:
AIS_shorter = AIS_pd_samp.drop_duplicates(["ID","Interval"],keep="first")
AIS_shorter = AIS_shorter.sort_values(by=["Time","LON","LAT"], axis=0, inplace=False)

In [38]:
AIS_filtered = time_to_CPA_calculator(AIS_shorter,namedict = {"DateTime": "Time_datetime", 
                            "ID": "ID",
                            "Time": "Time",
                            "COG": "COG",
                            "SOG": "SOG",
                            "Heading": "Heading",
                            "LON": "LON",
                            "LAT": "LAT",
                            "Status": "Status",
                            "Length": "Length",
                            "Interval": "Interval",
                            "Distances": "Distances",
                            "Day": "Day"})

In [None]:
AIS_collector = AIS_filtered

# Collecting observations
DATABANK_parq = observation_collector(AIS_pd_samp, AIS_filtered)

# Synchronizing times
databank_mergetime_l = observation_synchronizer(DATABANK_parq)
# Computing time to cpas and differentiations
AIS_large_with_stats = Stat_calculator(databank_mergetime_l)
AIS_large_with_diffs = diffs_computer(AIS_large_with_stats)

In [69]:
# Computing summary stats for each situation
AIS_large_summary_stats = statistics_aggregator(AIS_large_with_diffs)


## Now it starts for real
This is a loop that ideally computes for the whole set

In [4]:
### Read whole HDF5 table
data_store = pd.HDFStore("AIS_from_parquet_large.h5")
day1 = 0
AIS_collector_flag = False

data_store2 = pd.HDFStore("AIS_large_instances.h5")

for day in range(day1,12):
    
    AIS_pd = data_store["day{}_df".format(day)]
    
    AIS_pd["Time_Datetime"] = AIS_pd["Time_datetime"]
    AIS_pd.drop(columns=["Time_datetime"],inplace=True)
    # Removing platforms
    AIS_pd = AIS_pd[(AIS_pd.ID > 99999999)&(AIS_pd.ID <= 999999999)]

    # Creating intervals
    AIS_pd["Interval"] = (AIS_pd.Time - np.min(AIS_pd.Time)) // 20
    AIS_pd["Interval"] = AIS_pd.Interval.values.astype(np.uint32)
    
    AIS_shorter = AIS_pd.drop_duplicates(["ID","Interval"],keep="first")
    AIS_shorter = AIS_shorter.sort_values(by=["Time","LON","LAT"], axis=0, inplace=False)
    
    print("Starting calculations day",day)
    
    AIS_filtered = time_to_CPA_calculator(AIS_shorter)

    data_store2["Large_instances_day_{}".format(day)] = AIS_filtered
    
    if not AIS_collector_flag:
        AIS_collector = AIS_filtered
        AIS_collector_flag = True
    else:
        AIS_collector = pd.concat([AIS_collector, AIS_filtered])
    
    print("Finished day",day)
data_store.close()

print("Storing")
# Lagre mellomlagring
data_store2["Large_instances_all"] = AIS_collector
data_store2.close()



Starting calculations day 8
Finished day 8
Starting calculations day 9
Finished day 9
Starting calculations day 10
Finished day 10
Starting calculations day 11
Finished day 11


# Using instances to collect databank from large set

The databank is collected here

In [10]:
data_storeA = pd.HDFStore("AIS_large_istances_day0to7.h5")
data_storeB = pd.HDFStore("AIS_large_istances_8to11.h5")

AIS_large_filtered1 = pd.concat([data_storeA["Large_instances_day0to7"], data_storeB["Large_instances_8to11"]])
AIS_databank_flag = False

data_storeA.close()
data_storeB.close()

data_store = pd.HDFStore("AIS_from_parquet_large.h5")

data_storeSTAT = pd.HDFStore("AIS_summary_stats_large_backup.h5")

for day in range(0,12):
    AIS_pd = data_store["day{}_df".format(day)]
    
    # Collecting observations
    DATABANK_parq = observation_collector(AIS_pd, AIS_large_filtered1[AIS_large_filtered1["Day"] == day])
    
    # Synchronizing times
    databank_mergetime_l = observation_synchronizer(DATABANK_parq)
    
    # Computing time to cpas and differentiations
    AIS_large_with_stats = Stat_calculator(databank_mergetime_l)
    AIS_large_with_diffs = diffs_computer(AIS_large_with_stats)
    
    # Saving stat-ed file
    AIS_large_with_diffs.to_csv("AIS_large_with_stats_day{}.csv".format(day),
                                index=False)
    
    # Computing summary stats for each situation
    AIS_large_summary_stats = statistics_aggregator(AIS_large_with_diffs)

    data_storeSTAT["Stats_day_{}".format(day)] = AIS_large_summary_stats
    
    if not AIS_databank_flag:
        AIS_stats_databank_large = AIS_large_summary_stats
        AIS_databank_flag = True
    else:
        AIS_stats_databank_large = pd.concat([AIS_stats_databank_large,AIS_large_summary_stats])
    
    print("Finished day",day)

# Writing final output for further analysis
data_store.close()

AIS_stats_databank_large = AIS_stats_databank_large.reset_index(drop = True)
AIS_stats_databank_large["Situation_glob"] = AIS_stats_databank_large.index + 1

AIS_stats_databank_large.to_csv("AIS_large_summary_stats.csv",index=False,
                     )

Removing 1226 erroneous records
Removing 2887 inf entitites
Removing 0 NaN entitites
Finished day 0
Removing 1516 erroneous records
Removing 6637 inf entitites
Removing 0 NaN entitites
Finished day 1
Removing 1595 erroneous records
Removing 3591 inf entitites
Removing 0 NaN entitites
Finished day 2
Removing 120175 erroneous records
Removing 10407 inf entitites
Removing 0 NaN entitites
Finished day 3
Removing 724 erroneous records
Removing 3078 inf entitites
Removing 0 NaN entitites
Finished day 4
Removing 5619 erroneous records
Removing 4813 inf entitites
Removing 0 NaN entitites
Finished day 5
Removing 706 erroneous records
Removing 3366 inf entitites
Removing 0 NaN entitites
Finished day 6
Removing 1353 erroneous records
Removing 3948 inf entitites
Removing 0 NaN entitites
Finished day 7
Removing 4848 erroneous records
Removing 21607 inf entitites
Removing 0 NaN entitites
Finished day 8
Removing 863 erroneous records
Removing 2129 inf entitites
Removing 0 NaN entitites
Finished day 9