In [1]:
import dask
import coiled
from dask.distributed import Client, LocalCluster, Lock, wait
from dask.utils import SerializableLock
import dask.dataframe as dd

def config_env(env_name="ais"):
    """Create the Coiled software environment ``mrmaksimize/<env_name>``.

    Issues a single ``coiled.create_software_environment`` call with a fixed
    pip requirement list and the ``us-east-1`` backend region.

    Args:
        env_name: Name appended after the ``mrmaksimize/`` prefix.

    Returns:
        ``True`` once the Coiled call has returned; any exception raised by
        that call propagates to the caller.
    """
    # Only the packages needed by this notebook are requested. Other
    # geospatial pins (Fiona, rasterio, xarray, xarray-spatial, rioxarray,
    # scikit-image) and a prebuilt container image were tried earlier and
    # are intentionally left out.
    pip_requirements = [
        "s3fs==2021.5.0",
        "dask==2021.7.0",
        "distributed >= 2.23.0",
        "geopandas",
    ]
    qualified_name = f"mrmaksimize/{env_name}"

    coiled.create_software_environment(
        name=qualified_name,
        pip=pip_requirements,
        backend_options={"region": "us-east-1"},
    )
    return True



def get_dask_client(cluster_type = 'local', n_workers = 8, processes=True, threads_per_worker=1, scheduler_mem_gb = 8, worker_mem_gb = 8):
    """Return a Dask ``Client`` for a local or Coiled cluster, restarted before use.

    With ``cluster_type == 'local'`` the function first tries to connect to a
    scheduler at ``127.0.0.1:8786``; if that attempt raises, it starts a new
    ``LocalCluster`` on scheduler port 8786 and connects to it. Any other
    ``cluster_type`` value builds a ``coiled.Cluster`` named ``'ais-cluster'``.

    Args:
        cluster_type: ``'local'`` for the local path; anything else selects Coiled.
        n_workers: Number of workers requested from either cluster kind.
        processes: Passed to ``LocalCluster`` only.
        threads_per_worker: Threads per local worker; for Coiled it is used as
            both ``scheduler_cpu`` and ``worker_cpu``.
        scheduler_mem_gb: Coiled scheduler memory in GiB (unused for 'local').
        worker_mem_gb: Coiled worker memory in GiB (unused for 'local').

    Returns:
        The connected client, after ``client.restart()`` has been called on it.
    """
    if cluster_type == 'local':
        try:
            # Reuse a scheduler that is already running on the default port.
            client = Client('127.0.0.1:8786')
        except Exception:
            # Connection failed: start our own cluster on the same port.
            # `Exception` (not a bare except) so Ctrl-C / SystemExit during the
            # connection attempt is not silently turned into a cluster launch.
            cluster = LocalCluster(n_workers=n_workers,
                                   processes=processes,
                                   threads_per_worker=threads_per_worker,
                                   scheduler_port=8786)
            client = Client(cluster)
    else:
        # No `software=` argument is passed, so the custom environment built
        # by config_env is not selected here.
        cluster = coiled.Cluster(
            name='ais-cluster',
            n_workers=n_workers,
            scheduler_cpu=threads_per_worker,
            scheduler_memory=f"{scheduler_mem_gb} GiB",
            worker_cpu=threads_per_worker,
            worker_memory=f"{worker_mem_gb} GiB",
        )
        client = Client(cluster)

    # NOTE(review): this also restarts a pre-existing scheduler's workers when
    # one was reused above — confirm that is intended.
    client.restart()
    return client

# Create the notebook-wide Dask client. In the 'local' branch of
# get_dask_client the scheduler_mem_gb / worker_mem_gb values are not used.
# NOTE(review): the saved output below lists TLS comms and an EC2 dashboard,
# which looks like a remote cluster rather than cluster_type='local' —
# confirm how this cell was last run.
client = get_dask_client(cluster_type = 'local', n_workers = 8, processes=True, threads_per_worker=4, scheduler_mem_gb = 16, worker_mem_gb = 16)
client

Output()


+-------------+---------------+----------------+----------------+
| Package     | client        | scheduler      | workers        |
+-------------+---------------+----------------+----------------+
| blosc       | None          | 1.10.2         | 1.10.2         |
| dask        | 2021.07.0     | 2021.07.2      | 2021.07.2      |
| distributed | 2021.07.0     | 2021.07.2      | 2021.07.2      |
| lz4         | None          | 3.1.3          | 3.1.3          |
| msgpack     | 0.6.2         | 1.0.2          | 1.0.2          |
| numpy       | 1.18.4        | 1.21.1         | 1.21.1         |
| pandas      | 1.3.0         | 1.3.1          | 1.3.1          |
| python      | 3.8.6.final.0 | 3.8.10.final.0 | 3.8.10.final.0 |
+-------------+---------------+----------------+----------------+
Notes: 
-  msgpack: Variation is ok, as long as everything is above 0.6


0,1
Connection method: Cluster object,Cluster type: Cluster
Dashboard: http://ec2-54-89-119-192.compute-1.amazonaws.com:8787,

0,1
Dashboard: http://ec2-54-89-119-192.compute-1.amazonaws.com:8787,Workers: 8
Total threads:  32,Total memory:  128.00 GiB

0,1
Comm: tls://10.3.191.68:8786,Workers: 8
Dashboard: http://10.3.191.68:8787/status,Total threads:  32
Started:  17 minutes ago,Total memory:  128.00 GiB

0,1
Comm: tls://10.3.113.148:45841,Total threads: 4
Dashboard: http://10.3.113.148:44185/status,Memory: 16.00 GiB
Nanny: tls://10.3.113.148:43057,
Local directory: /dask-worker-space/worker-smktm970,Local directory: /dask-worker-space/worker-smktm970

0,1
Comm: tls://10.3.190.102:37543,Total threads: 4
Dashboard: http://10.3.190.102:38223/status,Memory: 16.00 GiB
Nanny: tls://10.3.190.102:33791,
Local directory: /dask-worker-space/worker-u2ta5o4q,Local directory: /dask-worker-space/worker-u2ta5o4q

0,1
Comm: tls://10.3.147.5:35753,Total threads: 4
Dashboard: http://10.3.147.5:37575/status,Memory: 16.00 GiB
Nanny: tls://10.3.147.5:40881,
Local directory: /dask-worker-space/worker-4gdj6gze,Local directory: /dask-worker-space/worker-4gdj6gze

0,1
Comm: tls://10.3.204.64:39631,Total threads: 4
Dashboard: http://10.3.204.64:45495/status,Memory: 16.00 GiB
Nanny: tls://10.3.204.64:36253,
Local directory: /dask-worker-space/worker-_j6lceoy,Local directory: /dask-worker-space/worker-_j6lceoy

0,1
Comm: tls://10.3.133.12:39401,Total threads: 4
Dashboard: http://10.3.133.12:46831/status,Memory: 16.00 GiB
Nanny: tls://10.3.133.12:43389,
Local directory: /dask-worker-space/worker-vodo2f8p,Local directory: /dask-worker-space/worker-vodo2f8p

0,1
Comm: tls://10.3.160.147:41357,Total threads: 4
Dashboard: http://10.3.160.147:45173/status,Memory: 16.00 GiB
Nanny: tls://10.3.160.147:40075,
Local directory: /dask-worker-space/worker-5ahf4xq3,Local directory: /dask-worker-space/worker-5ahf4xq3

0,1
Comm: tls://10.3.202.252:36325,Total threads: 4
Dashboard: http://10.3.202.252:35069/status,Memory: 16.00 GiB
Nanny: tls://10.3.202.252:36331,
Local directory: /dask-worker-space/worker-6_xrwxta,Local directory: /dask-worker-space/worker-6_xrwxta

0,1
Comm: tls://10.3.167.180:36043,Total threads: 4
Dashboard: http://10.3.167.180:37849/status,Memory: 16.00 GiB
Nanny: tls://10.3.167.180:38745,
Local directory: /dask-worker-space/worker-p97mh2jk,Local directory: /dask-worker-space/worker-p97mh2jk


In [2]:
import os, sys

import numpy as np


from matplotlib import pyplot

import random
from datetime import datetime

import pandas as pd
import geopandas as gpd


# Set Full Path Here
# Base location of the AIS CSV inputs. The value below appears to be a
# placeholder; the read cells build paths as f"{PATH_BASE}/..." and
# f"{PATH_BASE}_TMP/...".
PATH_BASE = "/.."



# Credentials handed to dd.read_csv via storage_options. They are read from
# the environment, so a value is None when its variable is not set.
STORAGE_OPTS = {"secret": os.environ.get("AWS_SECRET_ACCESS_KEY"), 
                "key": os.environ.get("AWS_ACCESS_KEY_ID")}

# NOTE(review): CLUSTER_TYPE is not referenced by any visible cell; the
# first cell passes cluster_type='local' as a literal.
CLUSTER_TYPE = 'remote'

# Do not truncate columns when displaying wide frames.
pd.set_option('display.max_columns', None)


#client = get_dask_client(cluster_type = 'remote', n_workers = 8, processes=True, threads_per_worker=4, scheduler_mem_gb = 8, worker_mem_gb = 8)
# Display the client created in the first cell.
client

0,1
Connection method: Cluster object,Cluster type: Cluster
Dashboard: http://ec2-54-89-119-192.compute-1.amazonaws.com:8787,

0,1
Dashboard: http://ec2-54-89-119-192.compute-1.amazonaws.com:8787,Workers: 8
Total threads:  32,Total memory:  128.00 GiB

0,1
Comm: tls://10.3.191.68:8786,Workers: 8
Dashboard: http://10.3.191.68:8787/status,Total threads:  32
Started:  17 minutes ago,Total memory:  128.00 GiB

0,1
Comm: tls://10.3.113.148:45841,Total threads: 4
Dashboard: http://10.3.113.148:44185/status,Memory: 16.00 GiB
Nanny: tls://10.3.113.148:43057,
Local directory: /dask-worker-space/worker-smktm970,Local directory: /dask-worker-space/worker-smktm970

0,1
Comm: tls://10.3.190.102:37543,Total threads: 4
Dashboard: http://10.3.190.102:38223/status,Memory: 16.00 GiB
Nanny: tls://10.3.190.102:33791,
Local directory: /dask-worker-space/worker-u2ta5o4q,Local directory: /dask-worker-space/worker-u2ta5o4q

0,1
Comm: tls://10.3.147.5:35753,Total threads: 4
Dashboard: http://10.3.147.5:37575/status,Memory: 16.00 GiB
Nanny: tls://10.3.147.5:40881,
Local directory: /dask-worker-space/worker-4gdj6gze,Local directory: /dask-worker-space/worker-4gdj6gze

0,1
Comm: tls://10.3.204.64:39631,Total threads: 4
Dashboard: http://10.3.204.64:45495/status,Memory: 16.00 GiB
Nanny: tls://10.3.204.64:36253,
Local directory: /dask-worker-space/worker-_j6lceoy,Local directory: /dask-worker-space/worker-_j6lceoy

0,1
Comm: tls://10.3.133.12:39401,Total threads: 4
Dashboard: http://10.3.133.12:46831/status,Memory: 16.00 GiB
Nanny: tls://10.3.133.12:43389,
Local directory: /dask-worker-space/worker-vodo2f8p,Local directory: /dask-worker-space/worker-vodo2f8p

0,1
Comm: tls://10.3.160.147:41357,Total threads: 4
Dashboard: http://10.3.160.147:45173/status,Memory: 16.00 GiB
Nanny: tls://10.3.160.147:40075,
Local directory: /dask-worker-space/worker-5ahf4xq3,Local directory: /dask-worker-space/worker-5ahf4xq3

0,1
Comm: tls://10.3.202.252:36325,Total threads: 4
Dashboard: http://10.3.202.252:35069/status,Memory: 16.00 GiB
Nanny: tls://10.3.202.252:36331,
Local directory: /dask-worker-space/worker-6_xrwxta,Local directory: /dask-worker-space/worker-6_xrwxta

0,1
Comm: tls://10.3.167.180:36043,Total threads: 4
Dashboard: http://10.3.167.180:37849/status,Memory: 16.00 GiB
Nanny: tls://10.3.167.180:38745,
Local directory: /dask-worker-space/worker-p97mh2jk,Local directory: /dask-worker-space/worker-p97mh2jk


In [4]:
# Lazily read the MAY2_2019_PART part files into a dask DataFrame.
# Column dtypes are grouped by type below. The saved preview for this extract
# shows no to_bow/to_stern/to_port/to_starboard columns, so those entries are
# unused here.
df1 = dd.read_csv(
    f"{PATH_BASE}_TMP/MAY2_2019_PART.csv/*",
    storage_options=STORAGE_OPTS,
    # Tuning knobs tried earlier and left at their defaults:
    # include_path_column=True, blocksize='100mb', blocksize=None.
    # Dates are deliberately not parsed (extra CPU); they stay as strings.
    parse_dates=[],
    dtype={
        # text columns (created_at / timestamp / eta kept as raw strings)
        **dict.fromkeys(
            ['created_at', 'timestamp', 'name', 'call_sign', 'flag',
             'eta', 'destination', 'collection_type'],
            str,
        ),
        # integer identifiers
        **dict.fromkeys(['mmsi', 'msg_type'], int),
        # everything else is numeric and read as float
        **dict.fromkeys(
            ['latitude', 'longitude', 'speed', 'course', 'heading', 'rot',
             'imo', 'draught', 'ship_and_cargo_type', 'length', 'width',
             'status', 'maneuver', 'accuracy',
             'to_bow', 'to_stern', 'to_port', 'to_starboard'],
            float,
        ),
    },
)
df1.head()

Unnamed: 0,created_at,timestamp,mmsi,msg_type,latitude,longitude,speed,course,heading,rot,imo,name,call_sign,flag,draught,ship_and_cargo_type,length,width,eta,destination,status,maneuver,accuracy,collection_type
0,2019-05-02 07:57:10.659228 UTC,2019-05-02 06:13:59 UTC,366767150,1,47.580923,-122.343117,0.0,107.2,179.0,0.0,,,,US,,,,,,,0.0,0.0,1.0,satellite
1,2019-05-02 08:30:26.684951 UTC,2019-05-02 07:34:24 UTC,622122005,1,27.943515,33.750965,0.0,84.7,135.0,0.0,,,,EG,,,,,,,0.0,0.0,0.0,satellite
2,2019-05-02 06:28:14.036435 UTC,2019-05-02 01:18:22 UTC,710026780,1,-25.501227,-48.533733,1.3,112.4,256.0,0.0,,,,BR,,,,,,,0.0,0.0,0.0,satellite
3,2019-05-02 16:15:03.708925 UTC,2019-05-02 15:28:28 UTC,636092864,1,-1.462267,-81.265733,17.6,3.0,5.0,0.0,,,,LR,,,,,,,0.0,0.0,0.0,satellite
4,2019-05-02 06:00:54.341805 UTC,2019-05-02 05:23:13 UTC,352553000,1,4.37344,99.32368,10.8,118.1,116.0,0.0,,,,PA,,,,,,,0.0,0.0,1.0,satellite


In [5]:
# Lazily read the MAY2_PART part files into a dask DataFrame
# (the saved preview shows 2020-05-02 records).
df2 = dd.read_csv(
    f"{PATH_BASE}_TMP/MAY2_PART.csv/*",
    storage_options=STORAGE_OPTS,
    # Tuning knobs tried earlier and left at their defaults:
    # include_path_column=True, blocksize='100mb', blocksize=None.
    # Dates are deliberately not parsed (extra CPU); they stay as strings.
    parse_dates=[],
    dtype={
        # text columns (created_at / timestamp / eta kept as raw strings)
        **dict.fromkeys(
            ['created_at', 'timestamp', 'name', 'call_sign', 'flag',
             'eta', 'destination', 'collection_type'],
            str,
        ),
        # integer identifiers
        **dict.fromkeys(['mmsi', 'msg_type'], int),
        # everything else is numeric and read as float
        **dict.fromkeys(
            ['latitude', 'longitude', 'speed', 'course', 'heading', 'rot',
             'imo', 'draught', 'ship_and_cargo_type', 'length', 'width',
             'status', 'maneuver', 'accuracy',
             'to_bow', 'to_stern', 'to_port', 'to_starboard'],
            float,
        ),
    },
)
df2.head()

Unnamed: 0,created_at,timestamp,mmsi,msg_type,latitude,longitude,speed,course,heading,rot,imo,name,call_sign,flag,draught,ship_and_cargo_type,length,width,eta,destination,status,maneuver,accuracy,collection_type,to_bow,to_stern,to_port,to_starboard
0,2020-05-02 11:39:34.657114 UTC,2020-05-02 10:31:08 UTC,310773000,1,19.985653,-111.613173,17.7,284.1,283.0,-6.0,,,,BM,,,,,,,0.0,0.0,0.0,satellite,,,,
1,2020-05-02 15:46:19.031011 UTC,2020-05-02 13:20:19 UTC,247188100,1,-37.8076,149.99064,18.4,48.0,49.0,-12.0,,,,IT,,,,,,,0.0,0.0,0.0,satellite,,,,
2,2020-05-02 21:58:16.164046 UTC,2020-05-02 21:02:54 UTC,253476000,1,-10.767813,40.54912,0.4,234.6,184.0,-14.0,,,,LU,,,,,,,0.0,0.0,0.0,satellite,,,,
3,2020-05-02 04:39:23.940515 UTC,2020-05-02 01:23:12 UTC,261536000,3,74.110747,34.322827,2.8,230.0,222.0,-13.0,,,,PL,,,,,,,8.0,0.0,0.0,satellite,,,,
4,2020-05-02 04:19:24.636776 UTC,2020-05-02 01:40:52 UTC,310558000,1,-2.083307,-44.11312,8.7,36.3,38.0,-6.0,,,,BM,,,,,,,0.0,0.0,1.0,satellite,,,,


In [7]:
# Stack the two extracts (df1, then df2) into one dask DataFrame and preview
# it. The saved preview shows df1's rows first, with the to_* columns that
# only df2's preview contains left empty for them.
df3 = dd.concat([df1, df2], interleave_partitions=True)
df3.head()

Unnamed: 0,created_at,timestamp,mmsi,msg_type,latitude,longitude,speed,course,heading,rot,imo,name,call_sign,flag,draught,ship_and_cargo_type,length,width,eta,destination,status,maneuver,accuracy,collection_type,to_bow,to_stern,to_port,to_starboard
0,2019-05-02 07:57:10.659228 UTC,2019-05-02 06:13:59 UTC,366767150,1,47.580923,-122.343117,0.0,107.2,179.0,0.0,,,,US,,,,,,,0.0,0.0,1.0,satellite,,,,
1,2019-05-02 08:30:26.684951 UTC,2019-05-02 07:34:24 UTC,622122005,1,27.943515,33.750965,0.0,84.7,135.0,0.0,,,,EG,,,,,,,0.0,0.0,0.0,satellite,,,,
2,2019-05-02 06:28:14.036435 UTC,2019-05-02 01:18:22 UTC,710026780,1,-25.501227,-48.533733,1.3,112.4,256.0,0.0,,,,BR,,,,,,,0.0,0.0,0.0,satellite,,,,
3,2019-05-02 16:15:03.708925 UTC,2019-05-02 15:28:28 UTC,636092864,1,-1.462267,-81.265733,17.6,3.0,5.0,0.0,,,,LR,,,,,,,0.0,0.0,0.0,satellite,,,,
4,2019-05-02 06:00:54.341805 UTC,2019-05-02 05:23:13 UTC,352553000,1,4.37344,99.32368,10.8,118.1,116.0,0.0,,,,PA,,,,,,,0.0,0.0,1.0,satellite,,,,


In [10]:
# Count non-null created_at values per msg_type in df1; .compute() runs the
# aggregation and returns the result.
df1.groupby('msg_type')['created_at'].count().compute()

msg_type
1     7814744
2       27027
3     1233067
4        2465
5      166550
18     478758
24     150028
27     115804
Name: created_at, dtype: int64

In [11]:
# Same per-msg_type count of non-null created_at values, for df2.
df2.groupby('msg_type')['created_at'].count().compute()

msg_type
1     12892839
2        51049
3      2004343
5       266900
18      648627
24      217080
27      380124
4            1
Name: created_at, dtype: int64

In [12]:
# Same count on the concatenated frame. In the saved outputs every
# msg_type total here equals the df1 count plus the df2 count.
df3.groupby('msg_type')['created_at'].count().compute()

msg_type
1     20707583
2        78076
3      3237410
4         2466
5       433450
18     1127385
24      367108
27      495928
Name: created_at, dtype: int64

In [13]:
# Write df3 to S3 without the index. With single_file=False the ".csv" name
# is used as a prefix holding numbered part files (the returned list below
# shows .../0000.part, .../0001.part, ...).
df3.to_csv("s3://wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv", index=False, single_file=False)

['wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0000.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0001.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0002.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0003.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0004.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0005.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0006.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0007.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0008.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0009.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0010.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0011.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0012.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0013.part',
 'wbgdecinternal-ntl/AIS_TMP/MAY2_PART_2019_2020.csv/0014.part',
 'wbgdecinternal-ntl/AIS_

In [None]:
# Lazily read the 2019-04-30 .. 2019-05-07 AIS extract into a dask DataFrame.
# Full-range alternative path used earlier:
#   f"{PATH_BASE}/WorldBank_SAIS_globalAOI_20190101_20201231_*.csv"
df = dd.read_csv(
    f"{PATH_BASE}/WorldBank_SAIS_globalAOI_20190430_20190507.csv",
    storage_options=STORAGE_OPTS,
    # Tuning knobs tried earlier and left at their defaults:
    # include_path_column=True, blocksize='100mb', blocksize=None.
    # Dates are deliberately not parsed (extra CPU); they stay as strings.
    parse_dates=[],
    dtype={
        # text columns (created_at / timestamp / eta kept as raw strings)
        **dict.fromkeys(
            ['created_at', 'timestamp', 'name', 'call_sign', 'flag',
             'eta', 'destination', 'collection_type'],
            str,
        ),
        # integer identifiers
        **dict.fromkeys(['mmsi', 'msg_type'], int),
        # everything else is numeric and read as float
        **dict.fromkeys(
            ['latitude', 'longitude', 'speed', 'course', 'heading', 'rot',
             'imo', 'draught', 'ship_and_cargo_type', 'length', 'width',
             'status', 'maneuver', 'accuracy',
             'to_bow', 'to_stern', 'to_port', 'to_starboard'],
            float,
        ),
    },
)
df.head()

In [None]:
# Partition count of the source frame; later cells derive repartition sizes
# from it (df.npartitions // 6 and // 730).
df.npartitions

In [None]:
# Rows with '2019-05-01' <= timestamp < '2019-05-06'. timestamp is read as a
# string column, so this is a lexicographic string comparison, not a
# datetime one.
may = df.loc[(df.timestamp >= '2019-05-01') & (df.timestamp < '2019-05-06'), :]
may.head() 

In [None]:
#may = df.loc[(df.timestamp >= '2019-05-01') & (df.timestamp < '2019-05-03'), :]
# Rebinds `may` (replacing the range filter from the previous cell) to the
# rows whose timestamp string contains "2019-05-01".
may = df.loc[df.timestamp.str.contains("2019-05-01"), :]
may.head()

In [None]:
#df.loc[((df.longitude.notnull()) & (df.latitude.notnull()) & (df.mmsi == 247298800)), :].head(1000)

In [None]:
%time
df_ship = df.loc[df.mmsi == 247298800, :]
df_ship.to_csv("s3://wbgdecinternal-ntl/AIS_TMP/MMSI_247298800.csv", index=False, single_file=True)

In [None]:
# Keep rows whose timestamp string contains "-05-02". The pattern carries no
# year, so with a multi-year `df` it would match May 2nd of every year.
df_day = df.loc[df.timestamp.str.contains("-05-02"), :]
# Shrink to about a sixth of the source partition count, but never below
# one: `df.npartitions // 6` is 0 when df has fewer than 6 partitions, and
# repartition cannot target zero partitions.
df_day = df_day.repartition(npartitions=max(1, df.npartitions // 6))

df_day.head()

In [None]:
#df_day = df.loc[df.timestamp.str.contains("-05-02"), :]
# Write df_day to S3 without the index; single_file=False is passed, so one
# file per partition is expected under this name.
# NOTE(review): the cell that builds df1 reads f"{PATH_BASE}_TMP/MAY2_2019_PART.csv/*",
# presumably this output — confirm PATH_BASE points at the same bucket/prefix.
df_day.to_csv("s3://wbgdecinternal-ntl/AIS_TMP/MAY2_2019_PART.csv", index=False, single_file=False)

In [None]:
#df_day = df.loc[(df.timestamp >= '2020-12-15') & (df.timestamp < '2020-12-16'), :]

In [None]:
#df_day = df_day.set_index('timestamp')

In [None]:
#df.visualize()

In [None]:
# Reduce df_day to 1/730th of the source partition count, with a floor of
# one partition: `df.npartitions // 730` is 0 for any frame with fewer than
# 730 partitions, and repartition cannot target zero partitions.
# NOTE(review): 730 is presumably 2 years x 365 days (one day out of the
# 2019-2020 range) — confirm, and consider a named constant.
df_day = df_day.repartition(npartitions=max(1, df.npartitions // 730))

In [None]:
# Check the partition count of df_day after the repartition above.
df_day.npartitions

In [None]:
# Write `df` to S3 as one CSV file (single_file=True), without the index.
# NOTE(review): this writes the full source frame `df`, not the filtered
# `df_day` prepared in the preceding cells, to a file named MAY2.csv —
# confirm which frame was intended.
df.to_csv("s3://wbgdecinternal-ntl/AIS_TMP/MAY2.csv", index=False, single_file=True)