<span style="color:black">
    <h3>Start Redis Server before running:</h3>
    <p><b>&dollar; sudo systemctl status redis-server &nbsp; # To check if it's running</b></p>
    <p><b>&dollar; sudo systemctl start redis-server &nbsp; # To start it</b></p>
    <p><b>&dollar; redis-server /etc/redis/redis.conf &nbsp; # To start it manually</b></p>
</span>

In [None]:
import datetime as dt
import itertools
import os
from calendar import monthrange
from random import randint

# from google.cloud import bigquery
import geopandas as gpd
import ml2rt  # for loading and serializing models
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
import redis
from geopy.distance import distance
from ipyleaflet import Map, basemaps, basemap_to_tiles, Polyline, Polygon, Circle, Rectangle, LayerGroup, LayersControl
from palettable.lightbartlein.sequential import Blues10_4  # For Shipping Lanes
from palettable.cartocolors.qualitative import Bold_10, Pastel_10  # For ships & tracks
from redisearch import Client, TextField, NumericField, Query

# All three Redis modules have a constructor named "Client". 
# To avoid name clashes, import the modules just before you need them
# from redistimeseries.client import Client
# from redisai import Client, DType, Tensor, Backend, Device

# Project root and derived-data directory. Both can be overridden through
# environment variables so the notebook is not tied to one workstation; the
# defaults reproduce the original hard-coded layout. os.path.join(..., "")
# guarantees the trailing separator that the string concatenations below and in
# the loader functions rely on.
CODE_DIR = os.path.join(os.environ.get("AIS_CODE_DIR", "/media/rock/x/data/ais_20180921/"), "")
DATA_DIR = os.path.join(os.environ.get("AIS_DATA_DIR", CODE_DIR + "output/"), "")
# Make relative paths resolve against the project root.
os.chdir(CODE_DIR)

## os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "/media/rock/x/data/ais_20180921/ais-bigquery-sa.json"
## client = bigquery.Client()
## dataset_id = 'by_ymd_3m'  # for BigQuery
# https://googleapis.dev/python/bigquery/latest/index.html

# Connection used by every plain-Redis (geospatial / sorted-set) call in this notebook.
r = redis.Redis()
# r = redis.Redis(host='localhost', port=6379, db=0, password=None, socket_timeout=None)
# https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html

esri_satellite_layer = basemap_to_tiles(bm=basemaps.Esri.WorldImagery)
# Never commit a real key: supply it through the environment. The placeholder
# is kept as the fallback so the name is always defined.
GOOGLE_MAPS_API_KEY = os.environ.get("GOOGLE_MAPS_API_KEY", "xxxxxxxxxxxxxxx")
# jupyter nbextension enable --py --sys-prefix ipyleaflet  # For Jupyter Notebook
# jupyter labextension install @jupyter-widgets/jupyterlab-manager jupyter-leaflet  # for JupyterLab

<span style="color:blue">
    <h2><b>BigQuery</b></h2>
    <p><b>BigQuery can read partitioned parquet datasets that were produced in Spark</b></p>
</span>

In [None]:
# INCOMPLETE
# NOTE(review): this cell cannot run as-is. `bigquery`, `client` and
# `dataset_id` are only defined in commented-out lines of the setup cell, so a
# fresh "Run All" stops here with a NameError. Skip it or restore those lines.
# https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-parquet
dataset_ref = client.dataset(dataset_id)
job_config = bigquery.LoadJobConfig()
# The source files under `uri` are Parquet.
job_config.source_format = bigquery.SourceFormat.PARQUET
uri = "gs://ais-rmp/output/by_ymd_3m_pqt/"

# Starts a load of `uri` into the table "by_ymd_3m". The returned job object is
# stored but never waited on or checked in this cell.
load_job = client.load_table_from_uri(uri, dataset_ref.table("by_ymd_3m"), job_config=job_config)

<span style="color:blue">
    <h2><b>PyArrow, Pandas</b></h2>
    <p><b>PyArrow can read partitioned parquet datasets on the desktop</b></p>
    <p><b>The dataset is a directory named "by_ymd_3m", with nested subdirectories for year, month, and day.</b></p>
</span>

In [None]:
def vessels_one_minute(y, m, d, H, M, S):
    """
    Returns a dataframe of the vessel records from the one-minute window that
    ends at the specified time.

    y, m, d, H, M, S: year, month, day, hour, minute and second of the END of
    the window. Rows are kept when  end - 1 minute < localtime <= end.

    Only the parquet partition for day `d` is read, so when the window starts
    on the previous day (H == 0 and M == 0) the part before midnight is absent.
    """
    day_dir = DATA_DIR + "by_ymd_3m_pqt" + "/year=" + str(y) + "/month=" + str(m) + "/day=" + str(d)
    df = pq.read_table(day_dir).to_pandas()  # Create a pandas dataframe from the pyarrow table
    window_end = pd.Timestamp(year=y, month=m, day=d, hour=H, minute=M, second=S)
    # Subtract a Timedelta instead of building a Timestamp with minute=M-1,
    # which raised ValueError whenever M == 0.
    window_start = window_end - pd.Timedelta(minutes=1)
    in_window = (df['localtime'] > window_start) & (df['localtime'] <= window_end)
    return df.loc[in_window]


def vessels_one_hour(y, m, d, H, M, S):
    """
    Returns a dataframe of the vessel records from the one-hour window that
    ends at the specified time.
    This dataframe is used in the RedisTimeSeries section

    y, m, d, H, M, S: year, month, day, hour, minute and second of the END of
    the window. Rows are kept when  end - 1 hour < localtime <= end.

    Only the parquet partition for day `d` is read, so when H == 0 the part of
    the window that falls on the previous day is absent from the result.
    """
    day_dir = DATA_DIR + "by_ymd_3m_pqt" + "/year=" + str(y) + "/month=" + str(m) + "/day=" + str(d)
    df = pq.read_table(day_dir).to_pandas()
    window_end = pd.Timestamp(year=y, month=m, day=d, hour=H, minute=M, second=S)
    # Subtract a Timedelta instead of building a Timestamp with hour=H-1,
    # which raised ValueError whenever H == 0.
    window_start = window_end - pd.Timedelta(hours=1)
    in_window = (df['localtime'] > window_start) & (df['localtime'] <= window_end)
    return df.loc[in_window]

In [None]:
def create_geokey(keyname, df):
    """
    Creates a Geospatial key from a DataFrame, df.

    df must have 'lon', 'lat' and 'mmsi' columns; every row becomes one member
    of the key `keyname`, named str(mmsi) and placed at (lon, lat).
    An empty df is a no-op, because GEOADD needs at least one member.
    Returns nothing
    """
    if len(df.index) == 0:
        return
    # GEOADD takes a flat sequence: lon1, lat1, name1, lon2, lat2, name2, ...
    # Columns are walked once; .tolist() yields plain Python numbers.
    triples = zip(df['lon'].tolist(), df['lat'].tolist(), df['mmsi'].tolist())
    concat_list = [field for lon, lat, mmsi in triples for field in (lon, lat, str(mmsi))]
    r.geoadd(keyname, *concat_list)
    return


def get_vessel_metadata(mmsi):
    """
    Looks up a vessel in the module-level `vessels` metadata table.
    Returns the rows of `vessels` whose mmsi column equals `mmsi` (possibly none).
    """
    is_match = vessels.mmsi == mmsi
    return vessels.loc[is_match]

In [None]:
# Draw a random point in time inside the 3-month data set (October-December 2017).
# The loaders then return the 1 minute / 1 hour of data that ends at this point.
# (For the 3-year data set the year would have to be drawn as well.)
y = 2017
m = randint(10, 12)
days_in_month = monthrange(y, m)[1]  # monthrange -> (weekday of the 1st, number of days)
d = randint(1, days_in_month)
H = randint(1, 23)  # hour 0 is never drawn
M = randint(0, 59)
S = randint(0, 59)

In [None]:
# Show the sampled timestamp components (year, month, day, hour, minute, second).
y,m,d,H,M,S
# vessels_one_hour(2017, 11, 3, 17, 58, 35)

<span style="color:black">
    <h3>Read Metadata</h3>
    <p><b>Attributes for each vessel</b></p>
</span>

In [None]:
# RUN: Load the per-vessel metadata CSV (the variant that carries the extra "vessel_group" column)
# Column name -> dtype, listed in file order.
col_dtypes = {
    'mmsi': 'int32',
    'vessel_code': 'category',
    'vessel_name': 'object',
    'imo': 'object',
    'call_sign': 'object',
    'l': 'float64',
    'w': 'float64',
    'draft': 'float64',
    'cargo': 'category',
    'vessel_group': 'category',
}
col_names = list(col_dtypes)
types = list(col_dtypes.values())

metadata_file = DATA_DIR + 'metadata_vg_3m.csv'  # !!! CHANGE for 3 months <==> 3 years
# metadata_file = DATA_DIR + 'metadata_vg_3y.csv'  # !!! CHANGE for 3 months <==> 3 years

# header=0 combined with names= replaces the file's own header row with col_names.
vessels = pd.read_csv(metadata_file, header=0, names=col_names, dtype=col_dtypes)

In [None]:
# Display colour for every vessel group; the keys double as the RediSearch
# query terms and as the list of known groups.
vgroup2color = dict(
    pleasure=Bold_10.hex_colors[7],
    publicService="red",
    passenger=Bold_10.hex_colors[1],
    cargo=Bold_10.hex_colors[8],
    tanker=Bold_10.hex_colors[3],
    fishing=Blues10_4.hex_colors[0],
    tugTow=Bold_10.hex_colors[6],
    military="olive",
    research="violet",
    unknown="white",
)

vgroups = [*vgroup2color]

<span style="color:black">
    <h3>Read Motion data from Parquet</h3>
</span>

In [None]:
vm = vessels_one_minute(y, m, d, H, M, S)
# vm = vessels_one_minute(2017, 11, 3, 17, 58, 35)
# Split on speed over ground at 4 knots. Rows whose sog is NaN satisfy neither
# comparison and therefore land in neither frame.
# .copy() makes both frames independent of `vm`: a 'dest' column is assigned to
# vm_moving later on, which on a bare slice triggers SettingWithCopyWarning.
vm_moving = vm.loc[vm['sog'] >= 4].copy()
vm_stationary = vm.loc[vm['sog'] < 4].copy()

<span style="color:black">
    <h3>Create Geospatial Keys in Redis</h3>
</span>

In [None]:
# DO NOT RUN if Keys already exist
# One geospatial key per subset of the sampled minute, created in this order.
for geo_keyname, geo_frame in (("moving", vm_moving), ("stationary", vm_stationary), ("all", vm)):
    create_geokey(geo_keyname, geo_frame)

<span style="color:black">
    <h3>Get the MMSIs for the moving vessels from the Sorted Set</h3>
</span>

In [None]:
# The members of the "moving" key come back from Redis as bytes; decode them to str.
moving_members = r.zrange("moving", 0, -1)
moving_mmsis = [raw_member.decode("utf-8") for raw_member in moving_members]
moving_mmsis[:10]

<span style="color:black">
    <h3>Geospatial Functions in Redis</h3>
    <p>GEOPOS, GEODIST, GEORADIUSBYMEMBER, GEOADD, GEOHASH, GEORADIUS</p>
</span>

In [None]:
# GEOPOS returns one entry per requested member; [0] picks the entry for this single vessel.
# NOTE(review): index 10 assumes at least 11 moving vessels were found, otherwise IndexError.
r.geopos("moving", moving_mmsis[10])[0]

In [None]:
# Distance between two moving vessels, requested in miles (unit="mi").
r.geodist("moving", moving_mmsis[10], moving_mmsis[11], unit="mi")

In [None]:
# Members of "moving" within 3 miles of the reference vessel, each with its distance (withdist=True).
r.georadiusbymember("moving", moving_mmsis[10], 3, unit="mi", withdist=True)

In [None]:
# Same radius query against the "all" key, which was built from every vessel of the sampled minute.
r.georadiusbymember("all", moving_mmsis[10], 3, unit="mi", withdist=True)

<span style="color:blue">
    <h2><b>Map Vessel Locations</b></h2>
</span>

<span style="color:black">
    <h3>Read the shapefile for Puget Sound shipping lanes with GeoPandas</h3>
</span>

In [None]:
# Built from CODE_DIR so the project root is spelled out in one place only.
sss = gpd.read_file(CODE_DIR + 'shapefiles/shiplanes_puget/shiplanes_puget.shp')
# The US Shiplanes shapefile has been cut to (lat BETWEEN 47 AND 49.45) AND (lon BETWEEN -123.75 AND -122)

In [None]:
def _exterior_latlon_rings(geometries):
    """
    Returns, for each polygon in `geometries`, the vertices of its exterior
    ring with the first two coordinates swapped into the order the map layers
    below are given (second coordinate first).
    """
    return [[(point[1], point[0]) for point in geom.exterior.coords] for geom in geometries]


def map_shipping_lanes(mmsi, zoom=11):
    """
    Creates a leaflet map, with shipping lanes and stationary vessels (white circles)

    mmsi: member of the "moving" geospatial key; the map is centered on it.
    zoom: initial zoom level of the map.
    Returns the ipyleaflet Map.
    Raises ValueError when `mmsi` is not a member of the "moving" key.
    """
    vessel_location = r.geopos("moving", mmsi)[0]
    if vessel_location is None:
        # GEOPOS answers None for an unknown member; fail with a clear message
        # instead of an opaque "NoneType is not subscriptable".
        raise ValueError('vessel %s is not a member of the "moving" geospatial key' % mmsi)
    m = Map(center=(vessel_location[1], vessel_location[0]), zoom=zoom)
    m.layout.width = '65%'
    m.layout.height = '1000px'
    m.add_layer(esri_satellite_layer)
    m.add_control(LayersControl())

    # Feature classes are selected through the shapefile's OBJL code.
    # Precautionary Areas
    precAreas = _exterior_latlon_rings(sss[sss.OBJL == '96'].geometry)
    # Traffic Separation Schemes
    sepSchemes = _exterior_latlon_rings(sss[sss.OBJL == '150'].geometry)
    # Shipping Lanes
    shiplanes = _exterior_latlon_rings(sss[(sss.OBJL == '148') | (sss.OBJL == '152')].geometry)

    colors = Blues10_4.hex_colors
    precAreas_polygons_list = [Polygon(locations=ring, color='red', weight=5, fill_color='red') for ring in precAreas]
    sepSchemes_polygons_list = [Polygon(locations=ring, color='black', weight=1, fill_color='blue', fill_opacity=0.7) for ring in sepSchemes]
    shiplanes_polygons_list = [Polygon(locations=ring, color='black', weight=1, fill_color=colors[2], fill_opacity=0.7) for ring in shiplanes]  # fill_color=colors[i % 8]

    # One layer group per feature class, added in the same order as before.
    m.add_layer(LayerGroup(layers=tuple(precAreas_polygons_list)))
    m.add_layer(LayerGroup(layers=tuple(sepSchemes_polygons_list)))
    m.add_layer(LayerGroup(layers=tuple(shiplanes_polygons_list)))

    # Add stationary vessels
    stat_mmsis = [member.decode("utf-8") for member in r.zrange("stationary", 0, -1)]
    # GEOPOS needs at least one member; with no stationary vessels skip the call.
    stat_latlons = r.geopos("stationary", *stat_mmsis) if stat_mmsis else []
    stat_vessels = [Circle(location=(pos[1], pos[0]), radius=30, color='white', weight=2, fill_color='white', fill_opacity=0.6) for pos in stat_latlons]  # radius in meters, not pixels
    m.add_layer(LayerGroup(layers=tuple(stat_vessels)))

    return m

In [None]:
# Render the base map (lanes + stationary vessels) centered on the reference vessel.
map_shipping_lanes(moving_mmsis[10], 11)

<span style="color:black">
    <h3>Use GEOPOS to get the latitude and longitude of moving vessels</h3>
</span>

In [None]:
def map_moving_vessels(mmsi, zoom=11):
    """
    Builds the shipping-lane map centered on `mmsi` and overlays every member
    of the "moving" geospatial key as a magenta circle.
    Returns the ipyleaflet Map.
    """
    vessel_map = map_shipping_lanes(mmsi, zoom=zoom)

    # One GEOPOS round trip for all moving vessels.
    positions = r.geopos("moving", *moving_mmsis)
    markers = []
    for pos in positions:
        # radius in meters, not pixels
        markers.append(Circle(location=(pos[1], pos[0]), radius=120, color='magenta',
                              weight=2, fill_color='magenta', fill_opacity=0.6))
    vessel_map.add_layer(LayerGroup(layers=tuple(markers)))

    return vessel_map

In [None]:
# Render the map with the moving vessels overlaid, centered on the reference vessel.
map_moving_vessels(moving_mmsis[10], zoom=11)

<span style="color:darkred">
    <h1><b>Redis Modules</b></h1>
    <h3>RediSearch, RedisTimeSeries, RedisAI</h3>
</span>

### Loading Modules:

#### 1. From python:   
os.system("redis-server --loadmodule ./redisearch.so")  


#### 2. From the configuration file /etc/redis/redis.conf  
loadmodule /path/to/redisearch.so  


#### 3. From the CLI (redis-cli)  
127.0.0.1:6379> MODULE LOAD /path/to/redisearch.so  

https://oss.redislabs.com/redisearch/Commands.html#ftsearch  

https://oss.redislabs.com/redisearch/Query_Syntax.html

<span style="color:blue">
    <h2><b>RediSearch</b></h2>
</span>

In [None]:
from redisearch import Client, TextField, NumericField, Query

# Creating a client with a index name, metadataIndex
meta = Client('metadataIndex')

# Creating the index definition and schema
try:
    meta.create_index([TextField('vessel_group'), NumericField('length'), NumericField('width'), NumericField('draft')])  # DO NOT DELETE
except redis.ResponseError as exc:
    # Re-running the notebook against a server that already holds the index is
    # fine; any other server error is still raised.
    if "Index already exists" not in str(exc):
        raise

<span style="color:black">
    <h3>Add Documents to the Search Index from Metadata</h3>
</span>

In [None]:
# DO NOT RUN if the Index & Keys have been created
# One document per vessel, id "meta_<mmsi>". Missing values are replaced
# because the index fields need concrete values: 'unknown' for the group and
# -999 for the numeric dimensions.
# itertuples() walks the table once instead of doing ~10 positional lookups per
# row, and pd.isna() replaces the string comparison against 'nan'.
for vessel in vessels.itertuples(index=False):
    meta.add_document(doc_id="meta_" + str(vessel.mmsi), replace=True,
                      vessel_group='unknown' if pd.isna(vessel.vessel_group) else vessel.vessel_group,
                      length=-999 if pd.isna(vessel.l) else vessel.l,
                      width=-999 if pd.isna(vessel.w) else vessel.w,
                      draft=-999 if pd.isna(vessel.draft) else vessel.draft)

<span style="color:black">
    <h3>Query the Search Index</h3>
</span>

In [None]:
# Run one search per vessel group (ids only, first 5000 hits), then derive the
# group -> MMSI list and MMSI -> group lookups from the hits.
results = {}
vg2mmsiList = {}
mmsi2vg = {}
for group_name in vgroups:
    group_query = Query(group_name).no_content().paging(0, 5000)
    results[group_name] = meta.search(group_query).docs
    # Document ids look like "meta_<mmsi>"; drop the 5-character prefix.
    vg2mmsiList[group_name] = [doc.id[5:] for doc in results[group_name]]
    for group_mmsi in vg2mmsiList[group_name]:
        mmsi2vg[group_mmsi] = group_name

In [None]:
# Number of MMSIs returned per vessel group (each search was paged to at most 5000 hits).
{vg:len(vg2mmsiList[vg]) for vg in vgroups}

<span style="color:black">
    <h3>Color-code moving vessels (passenger, cargo, fishing etc) using RediSearch query results</h3>
</span>

In [None]:
moving_mmsis = [mmsi.decode("utf-8") for mmsi in r.zrange("moving", 0, -1)]
# A moving vessel may be absent from the search results (no metadata row, or
# cut off by the 5000-hit paging limit). Fall back to the 'unknown' group so
# the colour lookup below cannot fail with KeyError: None.
moving_mmsi_vgroups = [mmsi2vg.get(mmsi, 'unknown') for mmsi in moving_mmsis]
moving_mmsi_colors = [vgroup2color[vg] for vg in moving_mmsi_vgroups]

In [None]:
def map_moving_vessels_by_color(mmsi, zoom=11):
    """
    Builds the shipping-lane map centered on `mmsi` and overlays every member
    of the "moving" geospatial key as a circle in its vessel-group colour
    (moving_mmsi_colors, index-aligned with moving_mmsis).
    Returns the ipyleaflet Map.
    """
    vessel_map = map_shipping_lanes(mmsi, zoom=zoom)

    # Add moving vessels
    positions = r.geopos("moving", *moving_mmsis)
    markers = []
    for idx, pos in enumerate(positions):
        center = (pos[1], pos[0])
        shade = moving_mmsi_colors[idx]
        markers.append(Circle(location=center, radius=200, color=shade, weight=2,
                              fill_color=shade, fill_opacity=0.8))
    vessel_map.add_layer(LayerGroup(layers=tuple(markers)))

    return vessel_map

In [None]:
# Render the colour-coded vessel map centered on the reference vessel.
map_moving_vessels_by_color(moving_mmsis[10], zoom=11)

<span style="color:blue">
    <h2><b>Redis Time Series</b></h2>
</span>

In [None]:
# Imported here rather than in the first cell on purpose: this rebinds the name
# `Client` (until now the RediSearch client class) to the RedisTimeSeries one.
from redistimeseries.client import Client
# NOTE(review): no connection arguments are given, so the library defaults apply.
rts = Client()

In [None]:
v1h = vessels_one_hour(y, m, d, H, M, S)
# v1h = vessels_one_hour(2017, 11, 3, 17, 58, 35)
# For every moving vessel: (latitudes, longitudes) of its records in that hour, as numpy arrays.
mmsi2latlon = {}
for track_mmsi in moving_mmsis:
    track_rows = v1h.loc[v1h['mmsi'] == int(track_mmsi)]
    mmsi2latlon[str(track_mmsi)] = (track_rows['lat'].values, track_rows['lon'].values)

<span style="color:black">
    <h3>Create Keys to hold the Time Series data with 1 hour retention</h3>
</span>

In [None]:
# DO NOT RUN if Keys exist
# Two series per moving vessel (latitude, longitude), each retaining 3600000 ms = 1 hour.
for mmsi in moving_mmsis:
    for key_prefix, label_value in (("ts_lat_", 'Lat'), ("ts_lon_", 'Lon')):
        rts.create(key_prefix + mmsi, retention_msecs=3600000, labels={'Time': label_value})
    
# r.keys(pattern=u'ts_lat*')
# rts.get('ts_lat_' + mmsis[10])

In [None]:
# DO NOT RUN if Keys exist
# Push every record of the hour into the vessel's latitude and longitude series.
for mmsi in moving_mmsis:
    v1h_subset = v1h.loc[v1h['mmsi'] == int(mmsi)]
    lats = v1h_subset['lat'].values
    lons = v1h_subset['lon'].values
    time = v1h_subset['localtime']
    for i, stamp in enumerate(time):
        # Sample time in milliseconds: whole seconds since the epoch, times 1000.
        stamp_ms = int(stamp.timestamp())*1000
        rts.add("ts_lat_" + mmsi, stamp_ms, lats[i])
        rts.add("ts_lon_" + mmsi, stamp_ms, lons[i])
        
# time is UNIX timestamp in milliseconds
# YES, time = v1h_subset['localtime'] is a pandas series of type pd.Timestamp. Then int(time.iloc[i].timestamp())
# NO, time = v1h_subset['localtime'].apply(lambda lt: int(lt.timestamp())) seems to be the same as above, but we get an int64 type, which is not accepted
# NO, time = v1h_subset['localtime'].apply(lambda lt: int(lt.timestamp())).values. The array has type np.datetime64, which is not accepted

<span style="color:black">
    <h3>Map Vessel Tracks with RedisTimeSeries</h3>
</span>

In [None]:
def map_vessel_tracks_redis_ts(mmsi, zoom=11):
    """
    Draws the stored track of every moving vessel on top of the colour-coded
    vessel map centered on `mmsi`.

    Track points are read back from the RedisTimeSeries keys "ts_lat_<mmsi>"
    and "ts_lon_<mmsi>" (range from '0' to '+'). Returns the ipyleaflet Map.
    """
    m = map_moving_vessels_by_color(mmsi, zoom=zoom)

    def read_series(prefix):
        # Maps each moving MMSI to the float values of its series under `prefix`.
        values_by_mmsi = {}
        for vessel_id in moving_mmsis:
            samples = rts.range(key=prefix + vessel_id, from_time='0', to_time='+')
            values_by_mmsi[vessel_id] = [float(sample[1]) for sample in samples]
        return values_by_mmsi

    mmsi2lats = read_series('ts_lat_')
    mmsi2lons = read_series('ts_lon_')

    vessel_tracks = []
    for j in range(len(moving_mmsis)):
        lats = mmsi2lats[moving_mmsis[j]]
        lons = mmsi2lons[moving_mmsis[j]]
        # Pair the i-th latitude with the i-th longitude.
        path = [[lats[i], lons[i]] for i in range(len(lats))]
        vessel_tracks.append(Polyline(locations=path, color=moving_mmsi_colors[j],
                                      fill_color=moving_mmsi_colors[j], fill_opacity=0.0))
    m.add_layer(LayerGroup(layers=tuple(vessel_tracks)))
    return m

In [None]:
# Render the map with the one-hour tracks, centered on the reference vessel.
map_vessel_tracks_redis_ts(moving_mmsis[10], zoom=11)

In [None]:
# MMSIs read from Redis are str while the metadata table's mmsi column is int32, hence int().
get_vessel_metadata(int(moving_mmsis[10]))

<span style="color:blue">
    <h2><b>RedisAI</b></h2>
</span>

In [None]:
# Rebinds the name `Client` once more, this time to the RedisAI client class.
from redisai import Client, DType, Tensor, Backend, Device
# import ml2rt
# NOTE(review): no connection arguments are given, so the library defaults apply.
client = Client()
# https://oss.redislabs.com/redisai/  # docs
# https://github.com/RedisAI/redisai-py  # good
# https://github.com/RedisAI/RedisAI
# https://github.com/RedisAI/redisai-examples

# https://pypi.org/project/ml2rt/
# ML utilities: Converting models (sparkml, sklearn, xgboost to ONNX), serializing models to disk, loading it back to redisai-py

<span style="color:black">
    <h3>Load the Backend Runtime</h3>
    <h4>using redisai-py or redis-cli</h4>
</span>

<span style="color:black">
    <ul>
    <li>client.loadbackend('TORCH', '/home/rock/git/RedisAI/install-cpu/backends/redisai_torch/redisai_torch.so')</li>
    <li>client.loadbackend('TF', '/home/rock/git/RedisAI/install-cpu/backends/redisai_tensorflow/redisai_tensorflow.so')</li>
    <li>client.loadbackend('ONNX', '/home/rock/git/RedisAI/install-cpu/backends/redisai_onnxruntime/redisai_onnxruntime.so')</li>
    </ul>
    <p>&dollar; redis-cli AI.CONFIG LOADBACKEND TF /home/rock/git/RedisAI/install-cpu/backends/redisai_tensorflow/redisai_tensorflow.so</p>
    <p>&dollar; redis-cli -x AI.MODELSET foo TF CPU INPUTS a b OUTPUTS c &lt; test/test_data/graph.pb</p>
</span>

<span style="color:black">
    <h3>(Convert) and Load the Model with ml2rt</h3>
</span>

<span style="color:black">
<p>model = ml2rt.load_model('/home/rock/git/RedisAI/test/test_data/graph.pb')</p>
<p>client.modelset('m', Backend.tf, Device.cpu, inputs=['input_1', 'input_2'], outputs='output', data=model)</p>
<p># ?client.modelset</p>
</span>

<span style="color:black">
    <h3>Get the (naive) predicted tracks for the next 5 minutes</h3>
</span>

In [None]:
# This does not use RedisAI yet. It estimates the 5-minute position using the geodesic distance in geopy
# from geopy.distance import distance
# Distance travelled = sog [knots] * 5/60 [hours] * 1.15078 [miles per nautical mile],
# projected from (lat, lon) along the course over ground (cog, in degrees).
# .assign() builds a new frame instead of writing a column into vm_moving, which
# was taken as a slice of vm (that write triggers SettingWithCopyWarning).
vm_moving = vm_moving.assign(
    dest=vm_moving.apply(lambda row: distance(miles=row.sog * 5/60 * 1.15078).destination((row.lat, row.lon), row.cog), axis=1)
)

# lat2,lon2,_ = distance(miles=distMiles).destination((lat1, lon1), bearing)
# 1 knot = 1.15078 miles/hour
# ~/anaconda3/lib/python3.7/site-packages/geopy/distance.py  # bearing is in degrees, not radians

In [None]:
def map_predicted_track(mmsi, zoom=11):
    """
    Builds the shipping-lane map centered on `mmsi`, marks every moving vessel
    with a rectangle in its vessel-group colour and draws a line from the
    vessel's first record in vm_moving to its estimated position ('dest').

    mmsi: member of the "moving" geospatial key used as the map center.
    zoom: initial zoom level (now honoured; it used to be ignored in favour of 11).
    Returns the ipyleaflet Map.
    """
    m = map_shipping_lanes(mmsi, zoom=zoom)
    # Add moving vessels
    moving_latlons = r.geopos("moving", *moving_mmsis)
    moving_vessels = [Rectangle(bounds=((moving_latlons[i][1]-0.0010, moving_latlons[i][0]-0.0015), (moving_latlons[i][1]+0.0010, moving_latlons[i][0]+0.0015)),
                               color=moving_mmsi_colors[i], weight=2, fill_color=moving_mmsi_colors[i], fill_opacity=0.8) for i in range(len(moving_latlons))]
    moving_layer_group = LayerGroup(layers=tuple(moving_vessels))
    m.add_layer(moving_layer_group)

    predicted_tracks = []
    for j in range(len(moving_mmsis)):
        # Filter the frame once per vessel instead of four times.
        vessel_rows = vm_moving[vm_moving['mmsi'] == int(moving_mmsis[j])]
        start = [vessel_rows['lat'].iloc[0], vessel_rows['lon'].iloc[0]]
        dest = vessel_rows['dest'].iloc[0]  # indexable: [0] = latitude, [1] = longitude
        predicted_tracks.append(Polyline(locations=[start, [dest[0], dest[1]]],
                                         color=moving_mmsi_colors[j], fill_color=moving_mmsi_colors[j], fill_opacity=0.0))
    pred_tracks_layer_group = LayerGroup(layers=tuple(predicted_tracks))
    m.add_layer(pred_tracks_layer_group)

    return m

In [None]:
# Render the map with the 5-minute predicted tracks, centered on the reference vessel.
map_predicted_track(moving_mmsis[10], zoom=11)