In [1]:
from elasticsearch import Elasticsearch
from elasticsearch_dsl.connections import connections

import requests   # https://www.geeksforgeeks.org/get-post-requests-using-python/
import os
import datetime
import time       # https://docs.python.org/3/library/time.html (time related api) # https://realpython.com/python-time-module/
import dateutil

DATE_TIME_FORMAT = '%Y-%m-%dT%H:%M:%S'
GMAP_API = "GMAP_API"

# print(os.environ[GMAP_API])

# Define a default Elasticsearch client
pooledConnection = connections.create_connection(hosts=['localhost'])
# print(connections.get_connection().cluster.health())

In [2]:
# method to get taxi info based on a datetime
def getTaxiDataAtDatetime(_datetime):
    # from data.gov.sg api(s)
    URL = "https://api.data.gov.sg/v1/transport/taxi-availability";
    PARAMS = { 'date_time': _datetime }
    
    # response object
    response = {}
    
    payload = requests.get(url = URL, params = PARAMS).json()
    
    # validations
    if len(payload['features']) <= 0:
        response['err'] = 'features IS 0 length!'
        return response
    coords = payload['features'][0]['geometry']['coordinates']
    if len(coords) <= 0:
        response['err'] = 'no TAXI coordinates at all!'
        return response
    
    # print( coords[0][0], 'x', coords[0][1] )
    
    # a. check the lat lng for google map as well (as known the 1st coordinate element is lng and 2nd element is lat probably)
    # e.g. pasir panjang => 1.2761° N (lat N+ve S-ve), 103.7919° E (lng E+ve W-ve); get the lat lng from google by "pasir panjang lat lng"
    # google map api => https://maps.googleapis.com/maps/api/geocode/json?latlng=1.32386,103.63332&key={{API_KEY}}
    # -> re-format as latlng={lat},{lng} => latlng=1.2761,103.7919
    
    # b. ingest to ES in bulk
    # c. add a hashtable to keep track all unique locations (might be futile though...)
    # -> based on the entries, call google geocode api (reverse version) => point b. and update the geolocation data when necessary

    return _datetime, coords
    
    

In [4]:
from elasticsearch_dsl import Document, Date, Integer, Keyword, Text, GeoPoint, Object, InnerDoc, Nested

# entity / models for persistence

class SGTaxiGeo(InnerDoc):
    formatted = Text(analyzer="standard")
    postalCode = Keyword()
    route = Text(fields={ "raw": Keyword() })
    neighbourhood = Text(fields={ "raw": Keyword() })


class SGTaxi(Document):
    timestamp = Date()
    location = GeoPoint()
    geo = Object(SGTaxiGeo)
    
    class Index:
        name = 'sg_taxi_location'
    class Meta:
        docType = "_doc"


# create the index and settings + mappings
SGTaxi.init()

# dict for the latLng pairs (unique)
dictLatLng = {}

def prepareDataForES(datetimeStr, coords):
    # list of ES entity for bulk ingestion
    entityList = []
    
    URL = "https://maps.googleapis.com/maps/api/geocode/json?key=" + os.environ[GMAP_API]
    # latlng=1.32386,103.63332&key={API_KEY}
    
    for item in coords:
        lat = item[1]
        lng = item[0]
        
        # add the cache key for geocode api query later on
        latLngKey = str(lat) + ',' + str(lng)
        dictLatLng[latLngKey] = 1
        
        # geocode api lookup
        PARAMS = {}
        PARAMS['latlng']  = latLngKey
        gData = requests.get(url = URL, params = PARAMS).json()
        
        # sDate = time.strptime(datetimeStr, DATE_TIME_FORMAT) # parse to string? but not appropricate in this scenario
        inst = SGTaxi(
            timestamp=dateutil.parser.parse(datetimeStr), 
            location={ "lat": lat, "lon": lng })
        # add back the geoCode object
        geoInst = prepareGeoDataByJson(gData)
        if geoInst is not None:
            inst.geo = geoInst
        
        entityList.append(inst)
        # TESTING
        # break
        
    return entityList # return everything

    # print(len(entityList))
    # print(len(dictLatLng))

    
    #instance = SGTaxi(formatted="some addr", postalCode="676767")
    #dictObj = instance.to_dict()
    #print(dictObj['postalCode'])
    
def prepareGeoDataByJson(data):
    geoData = {}
    inst = SGTaxiGeo()
    
    # validation
    if len(data['results']) > 0:
        geoData = data['results'][0]
        inst['formatted']=geoData['formatted_address']
        # loop against the address_components
        for item in geoData['address_components']:
            if 'postal_code' in item['types']:
                inst['postalCode'] = item['long_name']
            elif 'route' in item['types']:
                inst['route'] = item['long_name']
            elif 'neighborhood' in item['types']:
                inst['neighbourhood'] = item['long_name']
        
        return inst
    
    # return nothing if no data was queried
    return None
        
def bulkInsert(entityList):
    # normally should use bulk...
    idx = 0
    for item in entityList:
        idx = idx+1
        print(str(idx), "entry...")
        
        item.save()
        time.sleep(1)
    


In [5]:
# main ...

# print(t)
# print(time.ctime(time.mktime(t)))
# import datetime
# print(datetime.datetime(2019, 12, 31, 23, 59, 59))

t = (2019, 12, 31, 23, 59, 59, 0, 0, 0)

dateInStr, coords = getTaxiDataAtDatetime(time.strftime(DATE_TIME_FORMAT, t))
entityList = prepareDataForES(dateInStr, coords)
bulkInsert(entityList)

print('done')


1 entry...
2 entry...
3 entry...
4 entry...
5 entry...
6 entry...
7 entry...
8 entry...
9 entry...
10 entry...
11 entry...
12 entry...
13 entry...
14 entry...
15 entry...
16 entry...
17 entry...
18 entry...
19 entry...
20 entry...
21 entry...
22 entry...
23 entry...
24 entry...
25 entry...
26 entry...
27 entry...
28 entry...
29 entry...
30 entry...
31 entry...
32 entry...
33 entry...
34 entry...
35 entry...
36 entry...
37 entry...
38 entry...
39 entry...
40 entry...
41 entry...
42 entry...
43 entry...
44 entry...
45 entry...
46 entry...
47 entry...
48 entry...
49 entry...
50 entry...
51 entry...
52 entry...
53 entry...
54 entry...
55 entry...
56 entry...
57 entry...
58 entry...
59 entry...
60 entry...
61 entry...
62 entry...
63 entry...
64 entry...
65 entry...
66 entry...
67 entry...
68 entry...
69 entry...
70 entry...
71 entry...
72 entry...
73 entry...
74 entry...
75 entry...
76 entry...
77 entry...
78 entry...
79 entry...
80 entry...
81 entry...
82 entry...
83 entry...
84 entry...
8