In [None]:
#SPARK BINDINGS
# Python 2 only: execfile() does not exist in Python 3.
# Runs a cluster-provided bootstrap script, presumably so that `pyspark` is
# importable in the cells below -- TODO confirm what it sets up.
# NOTE(review): the file name reads "binings" (not "bindings"); confirm it
# matches the file actually present on the cluster.
execfile("/etc/spark/conf/spark_1.6.0_binings.py")

In [None]:
#PYTHON IMPORTS
import ais
from Geohash import geohash
import time
import os
import numpy as np
from datetime import datetime
import happybase

In [None]:
#SPARK CONF
# Set distribution mode, app name, and claim resources.
master='yarn-client' # Spark 1.x YARN client-mode master; "local" to run locally (Spark 2.x uses "yarn" + the dmode setting below)
#dmode='client' #spark2.0 only
AppName="AIS - streaming Kystverket"
num_executors=2
exec_memory=1 # in GB per executor. Total executor memory = num_executors*exec_memory
driver_memory=1 # in GB.
# NOTE(review): per the Spark configuration docs, spark.driver.memory set through
# SparkConf has no effect in client mode (the driver JVM is already running);
# use --driver-memory or spark-defaults.conf if the driver needs more memory.



#############--==DO NOT EDIT==--###############
# Builds a SparkConf from the settings above and creates the SparkContext `sc`.
# Re-running this cell without first calling sc.stop() fails, because PySpark
# allows only one active SparkContext per process.
from pyspark import SparkConf
sconf=SparkConf()

sconf.set('spark.master',master)
#sconf.set('spark.submit.deployMode',dmode) #spark2.0 only
sconf.set('spark.executor.instances',str(num_executors)) # Number of executors
#sconf.set('spark.shuffle.service.enabled',True)
#sconf.set('spark.dynamicAllocation.enabled',True)
sconf.set('spark.executor.memory',str(exec_memory)+'g')
sconf.set('spark.driver.memory',str(driver_memory)+'g')
#sconf.set('spark.executor.cores','4') # number of cores on same worker
sconf.set('spark.app.name',AppName) # Application name
# NOTE(review): Spark normally assigns spark.app.id itself; confirm that
# overriding it with the app name is intended.
sconf.set('spark.app.id',AppName)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext(conf=sconf)
###############################################

In [None]:
# Import Spark DataFrame APIs.
# HiveContext wraps the SparkContext `sc` created in the SPARK CONF cell, so
# that cell must have run first.
from pyspark import HiveContext
sqlContext = HiveContext(sc)
# NOTE(review): star import dumps every public name of pyspark.sql.types into
# the notebook namespace; prefer importing the specific types used.
from pyspark.sql.types import *
# NOTE(review): F, Window and Row are not referenced in the cells shown here.
import pyspark.sql.functions as F
from pyspark.sql.window import Window

from pyspark.sql import Row

In [None]:
# Build a fresh StreamingContext on the existing SparkContext and run the
# AIS pipeline.
# NOTE: `spark_stream` and its helpers are defined in a LATER cell; run that
# cell before this one (a plain Restart & Run All fails here otherwise).
if 'ssc' in globals():
    # `del` alone only drops the Python name: a previously started context
    # keeps running in the JVM, and the ssc.start() below would then fail
    # because Spark allows only one active StreamingContext per JVM.
    # Stop it first, keeping the SparkContext `sc` alive for reuse.
    ssc.stop(stopSparkContext=False)
    del ssc

batch_interval = 1  # seconds per micro-batch
ssc = StreamingContext(sc, batch_interval)

# Kystverket's open streaming connection:
streaming_host = "153.44.253.27"
streaming_port = 5631

spark_stream(sc, ssc, sqlContext, streaming_host, streaming_port)

In [None]:
# Shut down only the streaming job. The SparkContext `sc` is left running so
# that a new StreamingContext can be built on it later.
ssc.stop(stopSparkContext=False, stopGraceFully=False)

In [None]:
#Stop SparkContext
# NOTE(review): stop the StreamingContext (previous cell) before this one.
# After sc.stop(), the SPARK CONF cell must be re-run to get a usable `sc`.
sc.stop()

In [None]:
def spark_stream(sc, ssc, sqlContext, streaming_host, streaming_port,
                 bbox=None, timeout=10):
    """Build and run the AIS streaming pipeline on ``ssc``.

    Reads raw NMEA lines from a TCP socket, decodes them with ``try_decode``,
    keeps only positions inside ``bbox``, appends vessel metadata from HBase,
    and writes each non-empty micro-batch to a local file via ``save_to_local``.

    Parameters
    ----------
    sc, sqlContext
        Not used by this function; kept so existing callers keep working.
    ssc : StreamingContext
        Context the DStream graph is built on and then started.
    streaming_host, streaming_port
        Address of the NMEA line feed.
    bbox : list, optional
        Region of interest as ``[lllat, lllon, urlat, urlon]`` (exclusive
        bounds). Defaults to ``[0, 0, 100, 100]``, the previous hard-coded box.
    timeout : int or float, optional
        Seconds to block in ``ssc.awaitTermination``. The streaming context
        is NOT stopped when this returns; stop it explicitly.
    """
    if bbox is None:
        # Example alternative, Oslofjorden:
        # [59.522784, 10.559235, 59.924356, 10.806427]
        bbox = [0, 0, 100, 100]

    # Connect to the raw NMEA line stream.
    nmea = ssc.socketTextStream(streaming_host, streaming_port)

    # Decode and clip to the region of interest; try_decode returns [] for
    # anything undecodable or outside the box, so drop those.
    decoded = nmea.map(lambda line: try_decode(line, bbox))
    decoded = decoded.filter(lambda rec: rec != [])

    # Enrich with vessel metadata: ONE HBase lookup per record (the previous
    # version made three identical lookups, i.e. three connections, per record).
    # NOTE(review): still one connection per record; mapPartitions with a
    # shared connection per partition would scale better.
    enriched = decoded.map(add_vessel_meta)

    def dump_batch(rdd):
        # Runs on the driver once per micro-batch. Batches are small, so they
        # are collected and written locally. Empty batches are skipped rather
        # than producing an empty file every batch interval.
        records = rdd.collect()
        if records:
            save_to_local(records)

    enriched.map(lambda rec: rdd_list_to_str(rec)).foreachRDD(dump_batch)

    # Run!
    ssc.start()
    ssc.awaitTermination(timeout=timeout)


def add_vessel_meta(record, lookup=None):
    """Return ``record`` extended with the vessel's IMO number, name and type.

    ``record[0]`` is the MMSI. ``lookup`` maps an MMSI string to a metadata
    dict with 'P:imo', 'P:name' and 'P:type' keys; it defaults to
    ``get_meta_from_mmsi`` and is called exactly once. The input list is not
    modified.
    """
    if lookup is None:
        lookup = get_meta_from_mmsi
    meta = lookup(str(record[0]))
    return record + [meta["P:imo"], meta["P:name"], meta["P:type"]]
    
def save_to_local(rdd_collected, dirpath="/STAGING/DATASETS/AIS/dump08112016/"):
    """Write collected records, one per line, to a per-second dump file.

    The file is ``<dirpath>/ais_<current unix time in whole seconds>`` and is
    made world-readable/writable (mode 0o777) afterwards.

    Parameters
    ----------
    rdd_collected : iterable of str
        Lines to write; a newline is appended to each.
    dirpath : str, optional
        Target directory. Defaults to the previously hard-coded staging path.
    """
    path = os.path.join(dirpath, 'ais_' + str(int(time.time())))
    # Append rather than overwrite: two micro-batches finishing within the
    # same wall-clock second map to the same file name, and mode 'w' would
    # silently discard the earlier batch.
    with open(path, 'a') as file_handler:
        for item in rdd_collected:
            file_handler.write(item + '\n')
    # Same permissions as the former `os.system("chmod 777 ...")`, without
    # spawning a shell or interpolating the path into a command line.
    os.chmod(path, 0o777)
    

def rdd_list_to_str(rdd_list):
    """Render one record as a comma-separated line.

    Each element is converted with ``str()`` and the results are joined with
    ',' (an empty record gives ''). Uses ``str.join`` instead of repeated
    string concatenation, which is quadratic in the record length.

    NOTE(review): values are not quoted or escaped, so a field that itself
    contains ',' (e.g. a vessel name) shifts the columns; consider the
    ``csv`` module if downstream readers can handle quoting.
    """
    return ','.join(str(el) for el in rdd_list)

def get_meta_from_mmsi(mmsi, host='2.sherpa.client.sysedata.no',
                       table_name="mmsiShipInfo", connection_factory=None):
    """Fetch vessel metadata for one MMSI from HBase.

    Parameters
    ----------
    mmsi : str
        Row key to look up (callers pass ``str(mmsi)``).
    host : str, optional
        HBase host; defaults to the previously hard-coded host.
    table_name : str, optional
        Table holding the vessel info rows.
    connection_factory : callable, optional
        ``factory(host) -> connection`` with ``open``/``table``/``close``.
        Defaults to ``happybase.Connection``; override for tests or pooling.

    Returns
    -------
    dict
        The row's columns, with defaults filled in for any of 'P:imo',
        'P:name', 'P:type' (-> 'not_found') and 'P:mmsi' (-> ``mmsi``) that
        the row lacks. A missing row therefore yields only the defaults, and
        callers indexing those keys never hit a KeyError.

    Raises
    ------
    Whatever the connection/lookup raises; the connection is closed first.
    """
    if connection_factory is None:
        connection_factory = happybase.Connection
    connection = connection_factory(host)
    try:
        connection.open()
        row = connection.table(table_name).row(mmsi)
    finally:
        # Always release the connection, even when the lookup fails; this
        # runs once per streamed record, so leaks add up quickly.
        connection.close()

    info_dict = {'P:imo': 'not_found', 'P:mmsi': mmsi,
                 'P:name': 'not_found', 'P:type': 'not_found'}
    info_dict.update(row)
    return info_dict

def try_decode(nmea, bbox):
    """Decode one raw NMEA line and keep it only if it lies inside ``bbox``.

    Parameters
    ----------
    nmea : str
        One line from the feed, passed to ``decode_nmea_no_prefix``.
    bbox : sequence
        ``[lllat, lllon, urlat, urlon]``; all four bounds are exclusive.

    Returns
    -------
    list
        ``[mmsi, unixtime, lon, lat, geohash, sog, rot, cog]`` for an in-box
        message, otherwise ``[]``. ``[]`` is also returned for any line that
        fails to decode or lacks one of those fields, so a bad line never
        stops the stream.
    """
    try:
        msg = decode_nmea_no_prefix(nmea)
        lat = msg['y']  # y = latitude
        lon = msg['x']  # x = longitude
        if not (bbox[0] < lat < bbox[2] and bbox[1] < lon < bbox[3]):
            return []
        return [int(msg['mmsi']), msg['unixtime'], float(lon), float(lat),
                msg['geohash'], float(msg['sog']), float(msg['rot']),
                float(msg['cog'])]
    except Exception:
        # Best-effort by design: malformed lines and messages without a
        # position/speed (KeyError, ValueError, ...) are dropped. Catch
        # Exception rather than everything so KeyboardInterrupt/SystemExit
        # still propagate.
        return []

def decode_nmea_no_prefix(nmea):
    """Decode the AIS payload of one raw NMEA line into a dict.

    The line is split on commas and fields are taken by position:
    field 1 (after its last backslash) is the talker/sentence id, fields 3
    and 4 are the fragment number and sequential message id, the
    second-to-last field is the AIS payload, and the first character of the
    last field is the fill-bit count.
    NOTE(review): these positions assume exactly one comma before the
    sentence id, presumably inside a leading tag block -- TODO confirm
    against the feed format.

    Returns
    -------
    dict
        The dict from ``ais.decode``, or ``{'id': 30}`` when the payload
        cannot be decoded. Adds 'geohash' when both 'x' (longitude) and 'y'
        (latitude) are present, plus 'unixtime' and the 'n_*' bookkeeping keys.

    Raises
    ------
    IndexError / ValueError for lines too short or malformed to split;
    callers (``try_decode``) are expected to catch these.
    """
    commasplit = nmea.split(',')

    nmea_talkerid = commasplit[1].split('\\')[-1]
    fragment_no = commasplit[3]
    seq_message_id = commasplit[4]
    payload = commasplit[-2]
    fill_bits = int(commasplit[-1][0])

    # Decode the AIS payload. If the advertised fill-bit count fails, retry
    # once with 2 fill bits; if that fails too, tag the message as type 30.
    # NOTE(review): the retry value 2 and the fallback id 30 are hard-coded;
    # confirm their origin. n_fillbits below records the value finally tried.
    try:
        aisdata = ais.decode(payload, fill_bits)
        msg_type = int(aisdata['id'])
    except Exception:
        try:
            fill_bits = 2
            aisdata = ais.decode(payload, fill_bits)
            msg_type = int(aisdata['id'])
        except Exception:
            msg_type = 30
            aisdata = {'id': msg_type}

    if msg_type == 20:
        # Post-processed by unroll_msg20, which is defined elsewhere.
        aisdata = unroll_msg20(aisdata)

    # x = longitude, y = latitude. The commented-out "not available" check
    # (x != 181, y != 91) from the original is NOT applied, so such positions
    # still get a geohash.
    if 'x' in aisdata and 'y' in aisdata:
        try:
            aisdata[u'geohash'] = geohash.encode(aisdata['y'], aisdata['x'], 13)
        except Exception:
            aisdata[u'geohash'] = '0'

    # Append NMEA bookkeeping fields. unixtime is the time this line is
    # decoded (int(time.time())), not a timestamp parsed from the message.
    aisdata[u'unixtime'] = int(time.time())
    aisdata[u'n_talkerid'] = nmea_talkerid
    aisdata[u'n_fragmentno'] = fragment_no
    aisdata[u'n_seqmsg'] = seq_message_id
    aisdata[u'n_aispayload'] = payload
    aisdata[u'n_fillbits'] = fill_bits

    return aisdata