In [None]:
import uuid, base64, arcpy, json
from datetime import datetime as dt, timedelta
from arcgis.geoanalytics.manage_data import run_python_script
from arcgis.features import Feature, GeoAccessor, FeatureSet, Table
from arcgis.gis import GIS
gis = GIS("home")

In [None]:
'''Parameters for Analytics'''
# Dwell detection thresholds: passed to find_dwell_locations as the time and
# distance tolerances (value + unit pairs).
DWELL_TIME = 4
DWELL_TIME_UNIT = 'HOURS'
DWELL_DIST = 5
DWELL_DIST_UNIT = 'KILOMETERS'
# Forwarded to the script as 'meeting_dist'; the script reads it but does not
# use it yet. NOTE(review): unit is not stated anywhere in this notebook.
MEETING_DIST = 500
# Minimum gap between consecutive reports of one vessel that counts as a lost track.
GAP_TIME = 300 #minutes
# Field names in the AIS layer.
UID_FIELD = 'mmsi'
TIME_FIELD = 'timestamp'
FLAG_FIELD = 'flag_short_code'
IMO_FIELD = 'imo'
IMO_field = IMO_FIELD  # legacy spelling kept so existing cells that reference it keep working
NAME_FIELD = 'name'
# Placeholders that must be filled in before running: the portal item the
# script writes motion events to, and the portal the script signs in to.
ME_GUID = '<PORTAL ITEM ID FOR MOTION EVENT LAYER>'
PORTAL_URL = '<URL FOR PORTAL>'

In [None]:
'''Load layers for analytics and outputs'''
def _get_portal_item(item_id):
    '''
    Fetch a portal item by ID, failing with a readable error.

    gis.content.get returns None when the item does not exist or is not
    shared with the signed-in user; without this check the failure would
    surface later as "'NoneType' object has no attribute 'layers'".

    @item_id: str portal item ID
    '''
    item = gis.content.get(item_id)
    if item is None:
        raise LookupError("Portal item '{}' was not found or is not accessible".format(item_id))
    return item

# Title: ShipMotionEvents | Type: Feature Service
motionEventLayer = _get_portal_item("f477a82e804840b7839c556631f06052").layers[0]

# Title: SPIRE_Last24hr | Type: Feature Service
# NOTE(review): the item title says 24 hours but the original note below says 6 hours — confirm which is intended.
ais = _get_portal_item("94d10cafdfb446d88312b68c8637f9f7").layers[0] #ais should contain the last 6 hours of data in AOI

# Title: World Exclusive Economic Zone Boundaries | Type: Feature Service | Owner: esri_livingatlas
eez = _get_portal_item("9c707fa7131b4462a08b8bf2e06bf4ad").layers[1]

# Title: World Port Index 10 nm Buffer | Type: Feature Service
ports_buffer = _get_portal_item("fe8503bda27949a8a76668d1d1365d8c").layers[0]

# Title: Combined_Maritime_AOIs | Type: Feature Service
global_aois = _get_portal_item("1b6e8066521b43cb93d8a82e262b7587").layers[0]

# Title: motion_event_nb_history | Type: Feature Service
me_history_table = _get_portal_item("3916a94ba928495bbea84454d673a8ee")
# The run-history table is addressed directly by its sub-layer index (2) in the service.
me_his_tbl = Table(me_history_table.url + '/2')

In [None]:
'''manually clean old rows from motion event data'''
# NOTE(review): dt.now() is the notebook server's local clock; if the layer
# stores starttime in UTC and the server is not on UTC, the cutoff is shifted
# by the UTC offset — confirm.
retain_time = dt.now() - timedelta(days=14) #retain data for 2 weeks
query = "starttime < TIMESTAMP '{}'".format(retain_time.strftime('%Y-%m-%d %H:%M:%S'))
try:
    result = motionEventLayer.delete_features(where=query)
    delete_results = result.get('deleteResults')
    if delete_results is None:
        # Some responses carry no per-feature list; show what came back
        # instead of failing on the missing key.
        print(result)
    else:
        failed = [r for r in delete_results if not r.get('success')]
        # Report only the rows that were actually removed, not every attempted row.
        print('Features Deleted:',len(delete_results) - len(failed))
        print('---------------------------------------------')
        print('Features Failed to Delete:')
        for r in failed:
            print(r)
except Exception as e:
    # Best effort: a failed cleanup must not stop the rest of the notebook.
    print(e)

In [None]:
def PrettyPrintGPResult(result,printSparkCodes=False):
    '''
    util function to print "printed" lines from python
    in a GP Spark Job

    Each entry of the result is expected to have a 'description' that is
    either a JSON object (with 'messageCode' and 'message') or plain text.

    @result: GP Job result object (iterable of dicts with a 'description' key)
    @printSparkCodes: when False (default) messages with code BD_101029 are
                      suppressed; when True every coded message is printed
    '''
    for r in result:
        row = r['description']
        try:
            jrow = json.loads(row)
        except (TypeError, ValueError):
            # Not JSON at all: show the raw description.
            print(row)
            continue

        if not isinstance(jrow, dict):
            # Valid JSON but not a message object (e.g. a bare number or list).
            print(row)
            continue
        if 'messageCode' not in jrow:
            # JSON objects without a message code are not job messages; skip them.
            continue
        if not printSparkCodes and jrow['messageCode'] == "BD_101029":
            continue
        # Fall back to the raw description if the object carries no message text.
        print(jrow['message'] if 'message' in jrow else row)

In [None]:
def GetConfig(path=r'/arcgis/home/notebook_config.txt'):
    '''
    util function to get config file from user's notebook files

    @path: location of the JSON config file; defaults to the notebook
           user's home directory file used by this notebook
    returns: the parsed JSON content (a dict for the expected file layout)
    raises: OSError if the file cannot be read, ValueError if it is not valid JSON
    '''
    import json
    # Context manager guarantees the handle is closed even if parsing fails.
    with open(path,'r') as config_file:
        return json.load(config_file)

In [1]:
def abi_processes():
    '''Overhead function to be passed to the geoanalytics.run_python_script function'''
    
    import os, uuid, base64, arcpy, json, sys, warnings, requests
    from heapq import heappush, heappop
    from pyspark.sql import functions as f
    from pyspark.sql import Window, SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
    from datetime import datetime, timedelta
    from arcgis.features import Feature, FeatureSet, FeatureLayer
    from arcgis.geoanalytics.find_locations import find_dwell_locations
    from arcgis.gis import GIS
    
    class ShipMotionEvent():
        '''a consolidated class for handling generic motion events for ships

        Params:
        @eventType : legal values are DWELL, ENTER-PORT, EXIT-PORT, MEETING, TRACK-LOST, TRACK-FOUND, TRIPWIRE
        @startTime : datetime obj
        @endTime : datetime obj
        @mmsi : str of the ship uid
        @eventID : guid obj; a new one is generated when omitted
        @aoi : str name of the aoi associated to the event
        @nearestPOI : str name of nearest POI to the event
        @algorithmParams : str the parameters of the detection algorithm
        @latitude : float latitude
        @longitude : float longitude
        @geometry : geometry of the event as delivered by the source row/feature
        @flag : str flag of ship
        @name : str name of ship
        @imo : str imo of ship
        '''

        # Attribute keys copied from a feature's 'attributes' by fromFeature.
        _ATTRIBUTE_FIELDS = ('eventtype','starttime','endtime','mmsi','eventid','aoi','nearestpoi',
                             'algorithmparams','latitude','longitude','flag','name','imo')

        def __init__(self, eventType=None, startTime=None,endTime=None,mmsi=None,eventID=None,aoi=None,nearestPOI=None,
                     algorithmParams=None,latitude=None,longitude=None,geometry=None,flag=None,name=None,imo=None):
            # Assignment order defines the order of toList(); keep it stable.
            self.eventtype = eventType
            self.starttime = startTime
            self.endtime = endTime
            self.mmsi = mmsi
            self.eventid = eventID if eventID is not None else ShipMotionEvent.generateUUID()
            self.aoi = aoi #use for name of tripwire, port, locations new event
            self.nearestpoi = nearestPOI
            self.algorithmparams = algorithmParams
            self.latitude = latitude
            self.longitude = longitude
            self.geometry = geometry
            self.flag = flag
            self.name = name
            self.imo = imo

        def toDict(self):
            '''returns attributes as a python dictionary (a copy, so edits do not alter the event)'''
            return dict(self.__dict__)

        def toList(self):
            '''returns attribute values as a python list, in the order they are set in __init__'''
            return list(self.__dict__.values())

        def toFeature(self):
            '''creates an arcgis.features.Feature of the motion event

            The geometry is a point built from longitude/latitude in spatial
            reference 4326; the attributes are all instance attributes.
            '''
            geom = json.loads(arcpy.PointGeometry(arcpy.Point(self.longitude,self.latitude),arcpy.SpatialReference(4326)).JSON)
            # Hand the Feature its own dict so it does not share state with this object.
            return Feature(geom,dict(self.__dict__))

        @classmethod
        def fromFeature(cls,feat):
            '''creates ship motion event obj from a feature given as a mapping

            @feat : mapping with an 'attributes' mapping and optionally 'geometry'.
                    Attributes that are absent become None (including eventid:
                    no new ID is generated). starttime/endtime are read as epoch
                    milliseconds and converted to naive UTC datetimes.
            '''
            data = cls()
            data.geometry = feat['geometry'] if 'geometry' in feat.keys() else None
            attrs = feat['attributes']

            for field in cls._ATTRIBUTE_FIELDS:
                setattr(data, field, attrs[field] if field in attrs else None)

            if data.starttime is not None:
                data.starttime = datetime.utcfromtimestamp(data.starttime/1000)
            if data.endtime is not None:
                data.endtime = datetime.utcfromtimestamp(data.endtime/1000)
            return data

        @staticmethod
        def generateUUID():
            '''
            Generates a base64 encoded ISO standard UUID. Suitable for generating
            new UIDs for tracks, detections, and Motion Events
            '''
            return base64.b64encode(uuid.uuid4().bytes).decode('utf-8')
        
    class ABI():
        '''Detects track-gap, dwell and port enter/exit events in ship track data
        and accumulates them as ShipMotionEvent objects in self.motion_events.

        The detection methods call `geoanalytics`, which is not imported in this
        function; it is presumably supplied by the run_python_script environment
        (TODO confirm). Every method prints and swallows its own errors so that one
        failing detector does not stop the others.

        Params:
        @uid_field : name of the field that identifies a vessel/track
        @flag_field : name of the flag field
        @name_field : name of the vessel-name field
        @imo_field : name of the imo field
        @gap_time : minutes between consecutive reports that count as a lost track
        @dwell_dist, @dwell_unit : distance tolerance for dwell detection
        @dwell_time, @dwell_time_unit : time tolerance for dwell detection
        '''
        def __init__(self,uid_field='mmsi',flag_field='flag_short_code',name_field='name',imo_field='imo',gap_time=60,dwell_dist=5,dwell_unit='KILOMETERS',dwell_time=2,dwell_time_unit='HOURS'):
            self.motion_events = []
            self.uid_field = uid_field
            self.flag_field = flag_field
            self.name_field = name_field
            self.imo_field = imo_field
            self.gap_time = gap_time
            self.dwell_dist = dwell_dist
            self.dwell_unit = dwell_unit
            self.dwell_time = dwell_time
            self.dwell_time_unit = dwell_time_unit

        def createGapMEs(self,row):
            '''convert detected time gaps to ShipMotionEvents

            @row : dict of one gap row. Produces a TRACK-LOST event at the previous
                   report and a TRACK-FOUND event at the current report; both share
                   the same event ID so they can be paired.
            '''
            try:
                me = ShipMotionEvent(eventType="TRACK-LOST",
                                     startTime=row['prevTime'],
                                     mmsi=row[self.uid_field],
                                     algorithmParams="Gap Time: {} Minutes".format(self.gap_time),
                                     latitude=row['$prevGeom'][1],
                                     longitude=row['$prevGeom'][0],
                                     aoi=row['ANY_aoi_name'],
                                     geometry=row['$prevGeom'],
                                     flag=row[self.flag_field],
                                     name=row[self.name_field],
                                     imo=row[self.imo_field])
                self.motion_events.append(me)

                meFound = ShipMotionEvent(eventType="TRACK-FOUND",
                                         startTime=row['$time'].start,
                                         mmsi=row[self.uid_field],
                                         algorithmParams="Gap Time: {} Minutes".format(self.gap_time),
                                         eventID=me.eventid,
                                         latitude=row['$geometry'][1],
                                         longitude=row['$geometry'][0],
                                         aoi=row['ANY_aoi_name'],
                                         geometry=row['$geometry'],
                                         flag=row[self.flag_field],
                                         name=row[self.name_field],
                                         imo=row[self.imo_field])
                self.motion_events.append(meFound)
            except Exception as e:
                print(e)

        def getTimeGapEvents(self,df,aoi_df):
            '''detect time gaps in input track data

            @df : track points
            @aoi_df : areas of interest joined onto the points

            For each vessel the points are ordered by time and each point is
            compared with the previous one; pairs further apart than gap_time
            minutes become TRACK-LOST/TRACK-FOUND events.
            '''
            try:
                win = Window.partitionBy(self.uid_field).orderBy("$time")

                df1 = geoanalytics.join_features(df,aoi_df,spatial_relationship='Intersects',summary_fields=[])
                # timedelta is the gap to the previous report in minutes (seconds / 60).
                df1 = df1.withColumn('prevTime',f.lag(f.col('$time.start')).over(win))\
                        .withColumn('$prevGeom',f.lag(f.col('$geometry')).over(win))\
                        .withColumn('timedelta',f.round((f.unix_timestamp(f.col("$time.start")) - f.unix_timestamp(f.col("prevTime")))/60,2))
                time_gap_df = df1.where("timedelta > {}".format(self.gap_time))

                # Collect once and count locally rather than running the plan twice.
                gap_rows = time_gap_df.collect()
                print("FOUND {} TRACK GAPS".format(len(gap_rows)))

                for row in gap_rows:
                    self.createGapMEs(row.asDict())

            except Exception as e:
                print(e)

        def createDwellMEs(self,row):
            '''convert detected dwell events to ShipMotionEvents

            @row : dict of one dwell row; position comes from MeanX/MeanY and the
                   vessel attributes from the ANY_/MIN_ prefixed summary columns.
            '''
            try:
                me = ShipMotionEvent(eventType='DWELL',
                                    mmsi=row[self.uid_field],
                                    startTime=row['$time'][0],
                                    endTime=row['$time'][1],
                                    algorithmParams="{} {} for {} {}".format(self.dwell_dist,self.dwell_unit,self.dwell_time,self.dwell_time_unit),
                                    aoi=row['ANY_aoi_name'],
                                    latitude=row['MeanY'],
                                    longitude=row['MeanX'],
                                    geometry=row['$geometry'],
                                    flag=row['ANY_{}'.format(self.flag_field)],
                                    name=row['ANY_{}'.format(self.name_field)],
                                    imo=row['MIN_{}'.format(self.imo_field)])
                self.motion_events.append(me)
            except Exception as e:
                print(e)

        def getDwellEvents(self,df,aoi_df):
            '''detect dwell events in input track data

            @df : track points
            @aoi_df : areas of interest joined onto the detected dwells
            '''
            try:
                dwellResult1 = geoanalytics.find_dwell_locations(input_layer=df,
                                                   track_fields=[self.uid_field],
                                                   distance_tolerance=self.dwell_dist,
                                                   distance_tolerance_unit=self.dwell_unit,
                                                   time_tolerance=self.dwell_time,
                                                   time_tolerance_unit=self.dwell_time_unit)

                dwellResult = geoanalytics.join_features(dwellResult1,aoi_df,spatial_relationship='Intersects',summary_fields=[])

                # Collect once and count locally rather than running the plan twice.
                dwell_rows = dwellResult.collect()
                print("FOUND {} DWELL EVENTS".format(len(dwell_rows)))

                for row in dwell_rows:
                    self.createDwellMEs(row.asDict())

            except Exception as e:
                print(e)


        def createPortMEs(self,row):
            '''convert detected port events to ShipMotionEvents

            @row : dict of one port-transition row with a 'motion_event' of
                   ENTER-PORT or EXIT-PORT.
            '''
            try:
                # The real port name is on the side of the transition that is
                # inside a port: the current point when entering, the previous
                # point when exiting.
                try:
                    if row['motion_event'] == 'ENTER-PORT':
                        port = row['ANY_port_name']
                    else:
                        port = row['Prev_Name_ID']
                except Exception:
                    port = ''

                # NOTE(review): the original computed `port` but never used it;
                # it is recorded as the nearest POI here — confirm that is the
                # intended field.
                me = ShipMotionEvent(eventType=row['motion_event'],
                                     mmsi=row[self.uid_field],
                                     startTime=row['$time'].start,
                                     aoi=row['ANY_aoi_name'],
                                     nearestPOI=port,
                                     latitude=row['$geometry'][1],
                                     longitude=row['$geometry'][0],
                                     geometry=row['$geometry'],
                                     flag=row[self.flag_field],
                                     name=row[self.name_field],
                                     imo=row[self.imo_field])
                self.motion_events.append(me)

            except Exception as e:
                print(e)

        def getPortEvents(self,df,ports_df,aoi_df):
            '''detect port events in input track data

            @df : track points
            @ports_df : port polygons
            @aoi_df : areas of interest joined onto the points

            Each point is tagged with the port it lies within ("no port" when
            none). Per vessel, in time order, a change from "no port" to a port
            is ENTER-PORT and a change from a port to "no port" is EXIT-PORT;
            direct port-to-port changes are ignored.
            '''
            try:
                join1_result = geoanalytics.join_features(df,
                                                        ports_df,
                                                        keep_all_target_features=True,
                                                        spatial_relationship='Within',
                                                        summary_fields=[])
                join_result = geoanalytics.join_features(join1_result,aoi_df,spatial_relationship='Intersects',summary_fields=[])

                join_result = join_result.select(self.uid_field,self.name_field,self.flag_field,self.imo_field,"$time","ANY_port_name","ANY_aoi_name","$geometry")
                join_result = join_result.na.fill("no port",subset=["ANY_port_name"])

                win = Window.partitionBy(self.uid_field).orderBy("$time")
                join_result = join_result.withColumn('Prev_Name_ID',f.lag(f.col('ANY_port_name')).over(win))
                # Keep only points whose port differs from the previous point's
                # (the first point of a track has no previous value and drops out).
                join_result = join_result.where(f.col('ANY_port_name') != f.col('Prev_Name_ID'))

                # No otherwise(): port-to-port changes yield null and are dropped below.
                event_type = (f.when(f.col('Prev_Name_ID') == 'no port', f.lit('ENTER-PORT'))
                               .when(f.col('ANY_port_name') == 'no port', f.lit('EXIT-PORT')))
                port_events = join_result.withColumn("motion_event",event_type)\
                                         .na.drop(subset=['motion_event'])

                # Collect the labelled rows (the ones that carry 'motion_event')
                # once, on the driver, where self.motion_events lives.
                port_rows = port_events.collect()
                print("FOUND {} PORT EVENTS".format(len(port_rows)))

                for row in port_rows:
                    self.createPortMEs(row.asDict())

            except Exception as e:
                print(e)
        
    # NOTE(review): this guard sits inside the function body, so the script only
    # does work when the executing module is "__main__" — presumably true in the
    # run_python_script environment; confirm.
    if __name__ == "__main__":
        try:
            # user_variables, layers and geoanalytics are not defined in this
            # function; they are presumably supplied by the run_python_script
            # environment (TODO confirm). Every key is read up front so that a
            # missing parameter fails before any processing starts.
            DWELL_TIME = user_variables['dwell_time']
            DWELL_TIME_UNIT = user_variables['dwell_time_unit']
            DWELL_DIST = user_variables['dwell_dist']
            DWELL_DIST_UNIT = user_variables['dwell_dist_unit']
            MEETING_DIST = user_variables['meeting_dist']
            GAP_TIME = user_variables['gap_time']
            UID_FIELD = user_variables['uid_field']
            TIME_FIELD = user_variables['time_field']
            FLAG_FIELD = user_variables['flag_field']
            IMO_FIELD = user_variables['imo_field']
            NAME_FIELD = user_variables['name_field']
            GIS_URL = user_variables['gis_url']
            GIS_USR = user_variables['gis_usr']
            GIS_PWD = user_variables['gis_pwd']

            gis = GIS(GIS_URL,GIS_USR,GIS_PWD)

            spark = SparkSession.builder.getOrCreate()
            # Index 0 (the motion event layer) is skipped here; the output layer
            # is fetched through the portal connection further down instead.
            df, eez_df, ports_df, aois = layers[1], layers[2], layers[3], layers[4]

            df = df.select(UID_FIELD,NAME_FIELD,FLAG_FIELD,IMO_FIELD,"$time","$geometry")

            #get points inside an eez
            eez_points = geoanalytics.join_features(df,eez_df,spatial_relationship='Intersects',spatial_near_distance=1000,
                                                    spatial_near_distance_unit='METERS',summary_fields=[])
            eez_points = eez_points.select(UID_FIELD,NAME_FIELD,FLAG_FIELD,IMO_FIELD,"$time","$geometry")

            #get points outside an eez
            international_points = df.subtract(eez_points)

            abi = ABI(uid_field=UID_FIELD,
                      name_field=NAME_FIELD,
                      flag_field=FLAG_FIELD,
                      imo_field=IMO_FIELD,
                      gap_time=GAP_TIME,
                      dwell_dist=DWELL_DIST,
                      dwell_unit=DWELL_DIST_UNIT,
                      dwell_time=DWELL_TIME,
                      dwell_time_unit=DWELL_TIME_UNIT)

            # Gaps and dwells are looked for outside EEZs, port events inside.
            print("---------CALCULATING TIME GAPS---------")
            abi.getTimeGapEvents(international_points,aois)
            print("---------CALCULATING DWELLS---------")
            abi.getDwellEvents(international_points,aois)
            print("---------CALCULATING PORT EVENTS---------")
            abi.getPortEvents(eez_points,ports_df,aois)

            motionEvents = gis.content.get(user_variables['me_guid']).layers[0]

            if len(abi.motion_events):
                # The loop variable must not be called `f`: that name is the
                # pyspark.sql.functions alias the ABI methods rely on.
                feats = [motion_event.toFeature() for motion_event in abi.motion_events]
                print("---------WRITING {} MOTION EVENTS---------".format(len(feats)))
                edit_result = motionEvents.edit_features(adds=feats)

                # Surface rejected rows instead of silently dropping them.
                add_results = edit_result.get('addResults', []) if isinstance(edit_result, dict) else []
                failed_adds = [r for r in add_results if not r.get('success')]
                if failed_adds:
                    print("---------{} MOTION EVENTS FAILED TO WRITE---------".format(len(failed_adds)))
                    for r in failed_adds:
                        print(r)
        except Exception as e:
            print(e)

In [None]:
# Read the credentials file once rather than once per key.
# NOTE(review): the password is forwarded as a plain job parameter — confirm
# this is acceptable for the target environment.
portal_config = GetConfig()

# The order of `layers` matters: the script addresses them by index
# (0 = motion events, 1 = ais, 2 = eez, 3 = port buffers, 4 = AOIs).
result = run_python_script(code=abi_processes, gis=gis, 
                           layers=[motionEventLayer,
                                   ais,
                                   eez,
                                   ports_buffer,
                                   global_aois],
                           parameters={'dwell_time':DWELL_TIME,
                                        'dwell_time_unit':DWELL_TIME_UNIT,
                                        'dwell_dist':DWELL_DIST,
                                        'dwell_dist_unit':DWELL_DIST_UNIT,
                                        'meeting_dist':MEETING_DIST,
                                        'gap_time':GAP_TIME,
                                        'uid_field':UID_FIELD,
                                        'time_field':TIME_FIELD,
                                        'flag_field':FLAG_FIELD,
                                        'imo_field':IMO_field,
                                        'name_field':NAME_FIELD,
                                        'me_guid':ME_GUID,
                                        'gis_url':PORTAL_URL,
                                        'gis_usr':portal_config['gis_usr'],
                                        'gis_pwd':portal_config['gis_pwd']})
PrettyPrintGPResult(result)

In [None]:
# Summarise the run: keep the script's "FOUND ..." lines and store them,
# joined with HTML line breaks, as one row in the history table.
ts = dt.now()
found_lines = []
for r in result:
    row = r['description']
    try:
        jrow = json.loads(row)
    except (TypeError, ValueError):
        # Plain-text descriptions are not script output; ignore them.
        continue

    message = jrow.get('message') if isinstance(jrow, dict) else None
    if not isinstance(message, str) or not message.startswith('['):
        continue

    # Drop the first 8 characters — presumably a bracketed source tag such as
    # "[Python]" (TODO confirm) — and keep lines whose text starts with 'F'.
    body = message[8:]
    if body.strip().startswith('F'):
        print(body)
        found_lines.append(body)

# join() puts the separator only between lines, so nothing has to be trimmed
# off the end afterwards.
text = '<br>'.join(found_lines)

update_result = me_his_tbl.edit_features(adds=[{'attributes': {'lastupdate':ts,'message':text}}])
update_result, text