In [1]:
from elasticsearch import Elasticsearch
from kafka import KafkaConsumer
import os
import json
from datetime import datetime

# Shared Elasticsearch client used by the classes and helpers in this notebook;
# it targets a single node at localhost:9200.
es = Elasticsearch(hosts=[{'host': 'localhost', 'port': 9200}])

#consumer = KafkaConsumer('passenger', group_id = 1)
#for message in consumer:
#    message = json.loads(message.value)
#    print "{}".format(message)
#    incoming = passenger(message)
#    incoming.store()
#    consumer.commit()
#consumer.close()

In [2]:
# Mapping body for the 'driver' index (doc type 'rolling').
# Each geo_point field is paired with a string "<field>id" label field.
# The original literal declared 'altdest2id' twice, so Python kept only one
# entry and 'altdest1id' was never mapped; every key is now unique.
driver_mapping = {
  'mappings': {
    'rolling': {
      'properties': {
        'id': {'type': 'string'},
        'status': {'type': 'string'},
        'location': {'type': 'geo_point', 'lat_lon': 'true'},
        'ctime': {'type': 'date'},
        'p1': {'type': 'string'},
        'p2': {'type': 'string'},
        'destination': {'type': 'geo_point', 'lat_lon': 'true'},
        'destinationid': {'type': 'string'},
        'altdest1': {'type': 'geo_point', 'lat_lon': 'true'},
        'altdest1id': {'type': 'string'},
        'altdest2': {'type': 'geo_point', 'lat_lon': 'true'},
        'altdest2id': {'type': 'string'},
      }
    }
  }
}


# Mapping body for the 'passenger' index (doc type 'rolling').
# The alternative-destination label fields are named 'altdest1id' /
# 'altdest2id' so they match driver_mapping and the field names the
# passenger documents are actually serialised with; the previous
# 'altdestid1' / 'altdestid2' spelling mapped fields that never occur.
pass_mapping = {
  'mappings': {
    'rolling': {
      'properties': {
        'id': {'type': 'string'},
        'status': {'type': 'string'},
        'match': {'type': 'string'},
        'location': {'type': 'geo_point', 'lat_lon': 'true'},
        'ctime': {'type': 'date'},
        'driver': {'type': 'string'},
        'destination': {'type': 'geo_point', 'lat_lon': 'true'},
        'destinationid': {'type': 'string'},
        'altdest1': {'type': 'geo_point', 'lat_lon': 'true'},
        'altdest1id': {'type': 'string'},
        'altdest2': {'type': 'geo_point', 'lat_lon': 'true'},
        'altdest2id': {'type': 'string'},
      }
    }
  }
}


#es.indices.delete(index='driver', ignore=[400, 404])
#es.indices.delete(index='passenger', ignore=[400, 404])
#es.indices.create(index='driver', body=driver_mapping, ignore=400)
#es.indices.create(index='passenger', body=pass_mapping, ignore=400)

In [3]:
class driver(object):
    """A driver record stored in the 'driver' index (doc type 'rolling').

    Every key of each positional mapping and every keyword argument is
    copied onto the instance as an attribute. If a ``ctime`` attribute is
    present in ``'%Y-%m-%d %H:%M:%S.%f'`` form it is rewritten as
    ``'%Y-%m-%dT%H:%M:%S.%fZ'``; any other value is left untouched.
    """

    def __init__(self, *arg, **kwargs):
        for item in arg:
            for key in item:
                setattr(self, key, item[key])
        for key in kwargs:
            setattr(self, key, kwargs[key])
        try:
            # "{}".format() stringifies on both Python 2 and 3; the former
            # unicode() call only exists on Python 2.
            res = datetime.strptime("{}".format(self.ctime), '%Y-%m-%d %H:%M:%S.%f')
            self.ctime = res.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        except (AttributeError, TypeError, ValueError):
            # Best effort: no ctime at all, or one that is not in the
            # expected format (e.g. already converted) -- keep it as is.
            pass

    def jsonFormat(self):
        """Return the instance attributes serialised as a JSON string."""
        return(json.dumps(self.__dict__))

    def isKnown(self):
        """Return whether a document with this id exists in the 'driver' index."""
        res = es.get(index='driver', doc_type='rolling', id=self.id, ignore=[404, 400])
        # An ignored 400 response is an error body without a 'found' key.
        return(res.get('found', False))

    def store(self):
        """Create the document in the 'driver' index and return the 'created' flag.

        NOTE(review): this uses es.create, which presumably rejects an id
        that already exists -- confirm whether updates should use es.index.
        """
        res = es.create(index='driver', doc_type='rolling', id=self.id, body=self.jsonFormat())
        return(res['created'])

    def _nearbyQuery(self, distance="5km", size=3):
        """Build the search body used by nearbyPassengers()."""
        return {
            "from": 0, "size": size,
            "_source": {"include": ["_id"]},
            "query": {
                "filtered": {
                    "query": {
                        "match_all": {}
                    },
                    # 'filtered' only accepts 'query' and 'filter', so the
                    # status restriction and the distance restriction are
                    # combined in one bool filter.
                    "filter": {
                        "bool": {
                            "must": [
                                {"term": {"status": "wait"}},
                                {"geo_distance": {
                                    "distance": distance,
                                    "distance_type": "plane",
                                    "location": self.location
                                }}
                            ]
                        }
                    }
                }
            }
        }

    def nearbyPassengers(self, distance="5km", size=3):
        """Return the ids of passengers with status 'wait' near this driver.

        distance -- search radius around self.location (default "5km").
        size     -- maximum number of hits requested (default 3).
        """
        res = es.search(index='passenger', doc_type='rolling',
                        body=self._nearbyQuery(distance, size))
        return([hit['_id'] for hit in res['hits']['hits']])

class passenger(object):
    """A passenger record stored in the 'passenger' index (doc type 'rolling').

    Every key of each positional mapping and every keyword argument is
    copied onto the instance as an attribute. If a ``ctime`` attribute is
    present in ``'%Y-%m-%d %H:%M:%S.%f'`` form it is rewritten as
    ``'%Y-%m-%dT%H:%M:%S.%fZ'``; any other value is left untouched.
    """

    def __init__(self, *arg, **kwargs):
        for item in arg:
            for key in item:
                setattr(self, key, item[key])
        for key in kwargs:
            setattr(self, key, kwargs[key])
        try:
            # "{}".format() stringifies on both Python 2 and 3; the former
            # unicode() call only exists on Python 2.
            res = datetime.strptime("{}".format(self.ctime), '%Y-%m-%d %H:%M:%S.%f')
            self.ctime = res.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        except (AttributeError, TypeError, ValueError):
            # Best effort: no ctime at all, or one that is not in the
            # expected format (e.g. already converted) -- keep it as is.
            pass

    def jsonFormat(self):
        """Return the instance attributes serialised as a JSON string."""
        return(json.dumps(self.__dict__))

    def isKnown(self):
        """Return whether a document with this id exists in the 'passenger' index."""
        res = es.get(index='passenger', doc_type='rolling', id=self.id, ignore=[404, 400])
        # An ignored 400 response is an error body without a 'found' key.
        return(res.get('found', False))

    def store(self):
        """Create the document in the 'passenger' index and return the 'created' flag.

        NOTE(review): this uses es.create, which presumably rejects an id
        that already exists -- confirm whether updates should use es.index.
        """
        res = es.create(index='passenger', doc_type='rolling', id=self.id, body=self.jsonFormat())
        return(res['created'])

def loadPassenger(p_id):
    """Fetch passenger `p_id` from the 'passenger' index.

    Returns a passenger built from the document's '_source' when the lookup
    reports it as found; otherwise returns the falsy 'found' value from the
    response (404 responses are ignored rather than raised).
    """
    hit = es.get(index='passenger', doc_type='rolling', id=p_id, ignore=404)
    if not hit['found']:
        return hit['found']
    return passenger(hit['_source'])

def loadDriver(p_id):
    """Fetch driver `p_id` from the 'driver' index.

    Returns a driver built from the document's '_source' when the lookup
    reports it as found; otherwise returns the falsy 'found' value from the
    response (404 responses are ignored rather than raised).
    """
    hit = es.get(index='driver', doc_type='rolling', id=p_id, ignore=404)
    if not hit['found']:
        return hit['found']
    return driver(hit['_source'])
    
def sanityCheck(driver):
    """Return True when an incoming driver update is newer than the stored one.

    A driver that is not yet known to Elasticsearch always passes. For a
    known driver the incoming `ctime` must be strictly greater than the
    stored record's `ctime`.

    The comparison uses `ctime` because that is the timestamp attribute the
    driver constructor normalises and the driver mapping declares; the
    records in this notebook carry no `time` attribute.
    NOTE(review): both values are compared as-is -- presumably both are in
    the same normalised string format; confirm against the producer.
    """
    if not driver.isKnown():
        return(True)
    driverRecord = getDriverRecord(driver.id)
    return(driver.ctime > driverRecord.ctime)
    
def getDriverRecord(id):
    """Return the stored driver document with the given id as a driver object.

    Reads from index 'driver' / doc type 'rolling', the same location that
    driver.isKnown(), driver.store() and loadDriver() use; the previous
    'trip' / 'driver' location is not used anywhere else in this notebook.
    Unlike loadDriver(), a missing document is not ignored here, so the
    client error propagates to the caller.
    """
    res = es.get(index='driver', doc_type='rolling', id=id)['_source']
    return(driver(res))

In [4]:
'''
    Driver:
    1. Check if the timestamp makes sense
    2. Check if the driver exists in the DB; if it doesn't exist, create it
    3. If cab's capacity is not zero, look for nearby requests
        1. If passenger request matched:
            1. Change status from idle to on trip (if necessary)
            2. Re-route / Set destination to passenger
            3. Save information to ElasticSearch
        2. If no nearby passenger:
            1. Update current location
    4. If cab's capacity is zero:
        Check if current location matches the destination:
        1. If matched, empty the cab and mark it as idle
            1. Update number of trips
            2. Send trip info to kafka (for archive)
        2. If not matched, update current location
'''

"\n    Driver:\n    1. Check if the timestamp makes sense\n    2. Check if the driver exists in the DB; if it doesn't exist, create it\n    3. If cab's capacity is not zero, look for nearby requests\n        1. If passenger request matched:\n            1. Change status from idle to on trip (if necessary)\n            2. Re-route / Set destination to passenger\n            3. Save information to ElasticSearch\n        2. If no nearby passenger:\n            1. Update current location\n    4. If cab's capacity is zero:\n        Check if current location matches the destination:\n        1. If matched, empty the cab and mark it as idle\n            1. Update number of trips\n            2. Send trip info to kafka (for archive)\n        2. If not matched, update current location\n"

In [5]:
def driverIdle(d):
    """Assign the first nearby waiting passenger to driver `d`.

    Mirrors the "If IDLE" step of the matching cell in this notebook: the
    passenger is marked 'assigned', the driver is marked 'pickup' and its
    destination is pointed at the passenger's location. Nothing is written
    to Elasticsearch here.

    Returns the matched passenger, or None when no passenger is nearby or
    the passenger document cannot be loaded.
    """
    nearby_ids = d.nearbyPassengers()
    if not nearby_ids:
        return None
    p = loadPassenger(nearby_ids[0])
    if not p:
        # loadPassenger returns a falsy value when the document is missing.
        return None
    p.status = 'assigned'
    d.status = 'pickup'
    d.destination = p.location
    d.destinationid = "pick_{}".format(p.id)
    return p

In [6]:
d = loadDriver(1)
# loadDriver returns a falsy value when the document is missing, so check
# it before calling methods on the result.
if d and d.isKnown(): d_ = loadDriver(d.id)

In [41]:
# If IDLE

# Query Elasticsearch once and reuse the result: two separate searches can
# return different hits, and the second one may even come back empty.
nearby_ids = d.nearbyPassengers()
if len(nearby_ids)>0:
    p = loadPassenger(nearby_ids[0])
    p.status = 'assigned'
    d.status = 'pickup'
    d.destination = p.location
    d.destinationid = "pick_{}".format(p.id)
    #d.store()
    #p.store()
    
# if PickUP
# NOTE(review): `p` is only bound by the step above when a nearby passenger
# was found in this run; otherwise this relies on `p` left over from an
# earlier execution -- confirm that is intended.
if d.status == "pickup":
    if d.location == p.location: 
        d.p1 = p.id
        p.driver = d.id
        d.destination = p.destination
        d.destinationid = p.destinationid
        d.status = "ontrip"
        d.store()
        p.store()

if d.status == "ontrip":
    if d.location == d.destination:
        # TODO: arrival handling was never written; `pass` keeps the cell
        # syntactically valid until it is.
        pass
        
    

In [30]:
p

<__main__.passenger at 0x7ffa44bc8d10>

In [34]:
p.jsonFormat()

'{"status": "wait", "altdest2id": "American Museum of Natural History", "altdest2": [40.780829, -73.974004], "altdest1": [40.748817, -73.985428], "altdest1id": "Empire Building", "destination": [40.758896, -73.98513], "driver": null, "name": "passenger_5", "destinationid": "Times Square", "location": [40.7904368518007, -73.949250506704], "id": 5, "match": null, "ctime": "2016-09-20T18:59:19.117448Z"}'

In [33]:
d.jsonFormat()

'{"status": "idle", "p2": null, "p1": null, "altdest2id": null, "altdest2": null, "altdest1": null, "altdest1id": null, "destination": null, "name": "driver_1", "destinationid": null, "location": [40.79110336781974, -73.92626153649626], "id": 1, "ctime": "2016-09-20T18:32:43.152777Z"}'

In [39]:
p.jsonFormat()

'{"status": "assigned", "altdest2id": "American Museum of Natural History", "altdest2": [40.780829, -73.974004], "altdest1": [40.748817, -73.985428], "altdest1id": "Empire Building", "destination": [40.758896, -73.98513], "driver": 1, "name": "passenger_5", "destinationid": "Times Square", "location": [40.7904368518007, -73.949250506704], "id": 5, "match": null, "ctime": "2016-09-20T18:59:19.117448Z"}'

In [42]:
d.jsonFormat()

'{"status": "pickup", "p2": null, "p1": 5, "altdest2id": null, "altdest2": null, "altdest1": null, "altdest1id": null, "destination": [40.758896, -73.98513], "name": "driver_1", "destinationid": "pick_5", "location": [40.79110336781974, -73.92626153649626], "id": 1, "ctime": "2016-09-20T18:32:43.152777Z"}'

In [43]:
# Debug probe: `here` was an undefined name, so the marker is printed as text.
if d.status == "ontrip" and d.p2 is None: print('here')

'pickup'