Added poll endpoint for requests that take longer than the threshold
This commit adds a new `/poll` endpoint that accepts a `poll_id` returned from `/feed` and lets you
query for the result at a later time. This is useful for **stop** requests or any other request from
the redis monitor that takes longer than the default timeout.

Since we use redis to cache the result, you can even put the rest service behind a load balancer and
not have to worry about which application instance actually made the request: when it is finished, the
result will be available to the other nodes as well.

Also adds a schema validator for future endpoints, so we can be sure that the data coming into the
rest endpoint is exactly what we expect.
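
A minimal sketch of the default-filling validator this commit introduces (this mirrors the
`_extend_with_default` helper in the diff below, adapted from the jsonschema docs; the example
schema here is made up):

from jsonschema import Draft4Validator, validators

def extend_with_default(validator_class):
    validate_properties = validator_class.VALIDATORS["properties"]

    def set_defaults(validator, properties, instance, schema):
        for error in validate_properties(validator, properties, instance, schema):
            yield error
        for prop, subschema in properties.items():
            if "default" in subschema:
                instance.setdefault(prop, subschema["default"])

    return validators.extend(validator_class, {"properties": set_defaults})

DefaultValidator = extend_with_default(Draft4Validator)
data = {}
DefaultValidator({"properties": {"appid": {"default": "rest"}}}).validate(data)
print(data)  # {'appid': 'rest'} -- missing fields are filled with schema defaults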
Madison Bahmer committed Oct 27, 2016
1 parent 73d28a9 commit 5341ec5
Showing 3 changed files with 164 additions and 15 deletions.
163 changes: 148 additions & 15 deletions rest/rest_service.py
@@ -5,6 +5,7 @@
from copy import deepcopy
import sys
import signal
import os
from retrying import retry
from threading import Thread
import time
@@ -29,6 +30,9 @@
from scutils.settings_wrapper import SettingsWrapper
from scutils.method_timer import MethodTimer

from jsonschema import ValidationError
from jsonschema import Draft4Validator, validators


class RestService(object):

@@ -38,6 +42,7 @@ class RestService(object):
UNKNOWN_ERROR = "An error occurred while processing your request."
MUST_JSON = "The payload must be valid JSON."
DOES_NOT_EXIST = "The desired endpoint does not exist"
BAD_SCHEMA = "JSON did not validate against schema."

consumer = None
producer = None
@@ -57,6 +62,7 @@ def __init__(self, settings_name):
self.my_uuid = str(uuid.uuid4()).split('-')[4]
self.uuids = {}
self.uuids_lock = threading.Lock()
self.validator = self._extend_with_default(Draft4Validator)

def setup(self, level=None, log_file=None, json=None):
"""
@@ -97,6 +103,39 @@ def setup(self, level=None, log_file=None, json=None):
log = logging.getLogger('werkzeug')
log.disabled = True

self._load_schemas()

def _load_schemas(self):
"""Loads any schemas for JSON validation"""
self.schemas = {}
for filename in os.listdir(self.settings['SCHEMA_DIR']):
if filename.endswith('.json'):
name = filename[:-5]
with open(self.settings['SCHEMA_DIR'] + filename) as the_file:
self.schemas[name] = json.load(the_file)
self.logger.debug("Successfully loaded " + filename + " schema")

def _extend_with_default(self, validator_class):
'''
Method to add default fields to our schema validation
( From the docs )
'''
validate_properties = validator_class.VALIDATORS["properties"]

def set_defaults(validator, properties, instance, schema):
for error in validate_properties(
validator, properties, instance, schema,
):
yield error

for property, subschema in list(properties.items()):
if "default" in subschema:
instance.setdefault(property, subschema["default"])

return validators.extend(
validator_class, {"properties": set_defaults},
)

def _spawn_redis_connection_thread(self):
"""Spawns a redis connection thread"""
self.logger.debug("Spawn redis connection thread")
@@ -137,28 +176,50 @@ def _process_messages(self):
loaded_dict = json.loads(message.value)
self.logger.debug("got valid kafka message")

if 'uuid' in loaded_dict and loaded_dict['uuid'] in self.uuids:
self.logger.debug("Found Kafka message from request")
with self.uuids_lock:
self.uuids[loaded_dict['uuid']] = loaded_dict
with self.uuids_lock:
if 'uuid' in loaded_dict:
if loaded_dict['uuid'] in self.uuids and \
self.uuids[loaded_dict['uuid']] != 'poll':
self.logger.debug("Found Kafka message from request")
self.uuids[loaded_dict['uuid']] = loaded_dict
else:
self.logger.debug("Got poll result")
self._send_result_to_redis(loaded_dict)
except ValueError:
extras = {}
if message is not None:
extras["data"] = message.value
self.logger.warning('Unparseable JSON Received from kafka',
extra=extras)
# check for kafka disconnection
for node_id in self.consumer._client._conns:
conn = self.consumer._client._conns[node_id]
if conn.state == ConnectionStates.DISCONNECTED or \
conn.state == ConnectionStates.DISCONNECTING:
self._spawn_kafka_connection_thread()
break

self._check_kafka_disconnect()

except OffsetOutOfRangeError:
# consumer has no idea where they are
self.consumer.seek_to_end()
self.logger.error("Kafka offset out of range error")

def _send_result_to_redis(self, result):
"""Sends the result of a poll to redis to be used potentially by
another process"""
if self.redis_connected:
self.logger.debug("Sending result to redis")
try:
key = "rest:poll:{u}".format(u=result['uuid'])
self.redis_conn.set(key, json.dumps(result))
except ConnectionError:
self.logger.error("Lost connection to Redis")
self._spawn_redis_connection_thread()
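# Note: the cached result is stored under rest:poll:<uuid> with no TTL;
# the poll() endpoint below deletes the key once it has been retrieved.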

def _check_kafka_disconnect(self):
"""Checks the kafka connection is still valid"""
for node_id in self.consumer._client._conns:
conn = self.consumer._client._conns[node_id]
if conn.state == ConnectionStates.DISCONNECTED or \
conn.state == ConnectionStates.DISCONNECTING:
self._spawn_kafka_connection_thread()
break

def _heartbeat_loop(self):
"""A main run loop thread to do work"""
self.logger.debug("running main heartbeat thread")
@@ -220,6 +281,7 @@ def _setup_kafka(self):
self.producer = None

# create new connections
self._consumer_thread = None
self.logger.debug("Creating kafka connections")
self.consumer = self._create_consumer()
if not self.closed:
@@ -323,7 +385,7 @@ def _close_thread(self, thread, thread_name):
@param thread: the thread to close
@param thread_name: a human readable name of the thread
"""
if thread.isAlive():
if thread is not None and thread.isAlive():
self.logger.debug("Waiting for {} thread to close".format(thread_name))
thread.join(timeout=self.settings['DAEMON_THREAD_JOIN_TIMEOUT'])
if thread.isAlive():
@@ -395,6 +457,7 @@ def wrapper(*args, **kw):
log_dict = deepcopy(ret_dict)
log_dict['error']['cause'] = e.message
log_dict['error']['exception'] = str(e)
log_dict['error']['ex'] = traceback.format_exc()
instance.logger.error("Uncaught Exception Thrown", log_dict)
return jsonify(ret_dict), 500
return wrapper
@@ -421,6 +484,26 @@ def wrapper(*args, **kw):
return f(*args, **kw)
return wrapper

def validate_schema(schema_name):
"""Validate the JSON against a required schema_name."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kw):
instance = args[0]
try:
instance.validator(instance.schemas[schema_name]).validate(request.get_json())
except ValidationError as e:
ret_dict = instance._create_ret_object(instance.FAILURE,
None, True,
instance.BAD_SCHEMA,
e.message)
instance.logger.error("Invalid Schema", ret_dict)
return jsonify(ret_dict), 400
instance.logger.debug("Schema is valid")
return f(*args, **kw)
return wrapper
return decorator
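# Note: on the poll() route below, @validate_json runs before
# @validate_schema, so request.get_json() is already known to parse
# by the time the schema check executes.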

# Routes --------------------

def _decorate_routes(self):
@@ -436,6 +519,8 @@ def _decorate_routes(self):
methods=['POST', 'GET'])
self.app.add_url_rule('/feed', 'feed', self.feed,
methods=['POST'])
self.app.add_url_rule('/poll', 'poll', self.poll,
methods=['POST'])

@log_call('Non-existent route called')
@error_catch
@@ -491,20 +576,29 @@ def _feed(json_item):
self.logger.debug("expecting kafka response for request")
the_time = time.time()
found_item = False
while int(time.time() - the_time) < self.settings['WAIT_FOR_RESPONSE_TIME']:
while not found_item and int(time.time() - the_time) < self.settings['WAIT_FOR_RESPONSE_TIME']:
if self.uuids[json_item['uuid']] is not None:
found_item = True
true_response = self.uuids[json_item['uuid']]
with self.uuids_lock:
del self.uuids[json_item['uuid']]
break
else:
with self.uuids_lock:
# key still exists, meaning we didn't get our
# response in time
if json_item['uuid'] in self.uuids:
self.uuids[json_item['uuid']] = 'poll'
self.logger.debug("Did not find response, "
"adding to poll")
if found_item:
self.logger.debug("Got successful reponse back from kafka")
else:
self.logger.warn("Did not get response within timeout "
"from kafka. If the request is still "
"running, use the `/poll` API")
true_response = {"poll_id": json_item['uuid']}
true_response = {
"poll_id": json_item['uuid']
}
else:
self.logger.debug("Not expecting response from kafka")

@@ -513,6 +607,45 @@ def _feed(json_item):
return self._create_ret_object(self.FAILURE, None, True,
"Unable to connect to Kafka"), 500

@validate_json
@validate_schema('poll')
@log_call('\'poll\' endpoint called')
@error_catch
def poll(self):
"""Retrieves older requests that may not make it back quick
enough"""
if self.redis_connected:
json_item = request.get_json()
try:
key = "rest:poll:{u}".format(u=json_item['poll_id'])
result = self.redis_conn.get(key)

if result is not None:
result = json.loads(result)
self.logger.debug("Found previous poll")
self.redis_conn.delete(key)
return self._create_ret_object(self.SUCCESS, result)
else:
self.logger.debug("poll key does not exist")
return self._create_ret_object(self.FAILURE, None, True,
"Could not find matching poll_id"), 404
except ConnectionError:
self.logger.error("Lost connection to Redis")
self._spawn_redis_connection_thread()
except ValueError:
extras = {
"value": result
}
self.logger.warning('Unparseable JSON Received from redis',
extra=extras)
self.redis_conn.delete(key)
return self._create_ret_object(self.FAILURE, None, True,
"Unparseable JSON Received "
"from redis"), 500
return self._create_ret_object(self.FAILURE, None, True,
"Unable to connect to Redis"), 500


if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Rest Service: Used for interacting and feeding Kafka'
15 changes: 15 additions & 0 deletions rest/schemas/poll.json
@@ -0,0 +1,15 @@
{
"type": "object",
"properties": {
"poll_id": {
"type": "string",
"minLength": 1,
"maxLength": 100,
"description": "The poll id to retrieve"
}
},
"required": [
"poll_id"
],
"additionalProperties": false
}
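
For example, {"poll_id": "abc123"} validates against this schema, while an empty body, a poll_id
over 100 characters, or any extra key (additionalProperties is false) is rejected by
validate_schema with the 400 BAD_SCHEMA response.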
1 change: 1 addition & 0 deletions rest/settings.py
@@ -38,3 +38,4 @@
HEARTBEAT_TIMEOUT = 120
DAEMON_THREAD_JOIN_TIMEOUT = 10
WAIT_FOR_RESPONSE_TIME = 5
SCHEMA_DIR = 'schemas/'
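# _load_schemas() registers each file by name: schemas/poll.json becomes the
# 'poll' schema used by @validate_schema('poll'). The path is relative,
# resolved against the rest service's working directory.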
