Skip to content

Commit

Permalink
Parse json messages into intermediate format
Browse files Browse the repository at this point in the history
Significantly improves performance as now each json message is
parsed exactly/only once.

Specs updated + added to test new algorithm
  • Loading branch information
movitto committed Jun 13, 2014
1 parent 56330c7 commit 2bacb30
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 157 deletions.
1 change: 1 addition & 0 deletions lib/rjr/messages.rb
Expand Up @@ -6,3 +6,4 @@
require 'rjr/messages/request'
require 'rjr/messages/response'
require 'rjr/messages/notification'
require 'rjr/messages/intermediate'
50 changes: 50 additions & 0 deletions lib/rjr/messages/intermediate.rb
@@ -0,0 +1,50 @@
# RJR Request Message
#
# Copyright (C) 2012-2014 Mohammed Morsi <mo@morsi.org>
# Licensed under the Apache License, Version 2.0

require 'json'
require 'rjr/util/json_parser'

module RJR
module Messages

# Intermediate representation of a JSON-RPC data containing
# extracted/parsed data which has not been analysed.
class Intermediate
# JSON from which data is extracted from
attr_accessor :json

# Data extracted from message
attr_accessor :data

def initialize(args = {})
@json = args[:json] || nil
@data = args[:data] || {}
end

def keys
data.keys
end

def [](key)
data[key.to_s]
end

def has?(key)
data.key?(key)
end

def self.parse(json)
parsed = nil

#allow parsing errs to propagate up
parsed = JSONParser.parse(json)

self.new :json => json,
:data => parsed
end

end # class Intermediate
end # module Messages
end # module RJR
25 changes: 9 additions & 16 deletions lib/rjr/messages/notification.rb
Expand Up @@ -13,7 +13,7 @@ module Messages
# indicate the result should _not_ be returned
class Notification
# Message string received from the source
attr_accessor :json_message
attr_accessor :message

# Method source is invoking on the destination
attr_accessor :jr_method
Expand Down Expand Up @@ -48,18 +48,17 @@ def parse_args(args)
end

def parse_message(message)
@json_message = message
notification = JSONParser.parse(@json_message)
@jr_method = notification['method']
@jr_args = notification['params']
@message = message
@jr_method = message['method']
@jr_args = message['params']

parse_headers(notification)
parse_headers(message)
end

def parse_headers(notification)
notification.keys.select { |k|
def parse_headers(message)
message.keys.select { |k|
!['jsonrpc', 'method', 'params'].include?(k)
}.each { |k| @headers[k] = notification[k] }
}.each { |k| @headers[k] = message[k] }
end

public
Expand All @@ -70,13 +69,7 @@ def parse_headers(notification)
# @param [String] message string message to check
# @return [true,false] indicating if message is a notification message
def self.is_notification_message?(message)
begin
# FIXME log error
parsed = JSONParser.parse(message)
parsed.has_key?('method') && !parsed.has_key?('id')
rescue Exception => e
false
end
message.has?('method') && !message.has?('id')
end

# Convert notification message to json
Expand Down
34 changes: 14 additions & 20 deletions lib/rjr/messages/request.rb
Expand Up @@ -12,7 +12,7 @@ module Messages
# Message sent from client to server to invoke a JSON-RPC method
class Request
# Message string received from the source
attr_accessor :json_message
attr_accessor :message

# Method source is invoking on the destination
attr_accessor :jr_method
Expand Down Expand Up @@ -51,35 +51,29 @@ def parse_args(args)
end

def parse_message(message)
@json_message = message
request = JSONParser.parse(@json_message)
@jr_method = request['method']
@jr_args = request['params']
@msg_id = request['id']
@message = message
@jr_method = message['method']
@jr_args = message['params']
@msg_id = message['id']

parse_headers(request)
parse_headers(message)
end

def parse_headers(request)
request.keys.select { |k|
def parse_headers(message)
message.keys.select { |k|
!['jsonrpc', 'id', 'method', 'params'].include?(k)
}.each { |k| @headers[k] = request[k] }
}.each { |k| @headers[k] = message[k] }
end

public

# Class helper to determine if the specified string is a valid json-rpc
# method request
# @param [String] message string message to check
# Class helper to determine if the specified message is a valid
# json-rpc method request message.
#
# @param [Message] message to check
# @return [true,false] indicating if message is request message
def self.is_request_message?(message)
begin
# FIXME log error
parsed = JSONParser.parse(message)
parsed.has_key?('method') && parsed.has_key?('id')
rescue Exception => e
false
end
message.has?('method') && message.has?('id')
end

# Convert request message to json
Expand Down
45 changes: 19 additions & 26 deletions lib/rjr/messages/response.rb
Expand Up @@ -13,7 +13,7 @@ module Messages
# Message sent from server to client in response to a JSON-RPC request
class Response
# Message string received from the source
attr_accessor :json_message
attr_accessor :message

# ID of the message in accordance w/ json-rpc specification
attr_accessor :msg_id
Expand Down Expand Up @@ -49,54 +49,47 @@ def parse_args(args)
end

def parse_message(message)
@json_message = message
response = JSONParser.parse(@json_message)
@msg_id = response['id']
@message = message
@msg_id = message['id']

parse_result(response)
parse_headers(response)
parse_result(message)
parse_headers(message)
end

def parse_result(response)
def parse_result(message)
@result = Result.new
@result.success = response.has_key?('result')
@result.success = message.has?('result')
@result.failed = !@result.success

if @result.success
@result.result = response['result']
@result.result = message['result']

elsif response.has_key?('error')
@result.error_code = response['error']['code']
@result.error_msg = response['error']['message']
elsif message.has?('error')
@result.error_code = message['error']['code']
@result.error_msg = message['error']['message']

# TODO can we safely constantize this ?
@result.error_class = response['error']['class']
@result.error_class = message['error']['class']
end

@result
end

def parse_headers(request)
request.keys.select { |k|
def parse_headers(message)
message.keys.select { |k|
!['jsonrpc', 'id', 'method', 'result', 'error'].include?(k)
}.each { |k| @headers[k] = request[k] }
}.each { |k| @headers[k] = message[k] }
end

public

# Class helper to determine if the specified string is a valid json-rpc
# method response
# Class helper to determine if the specified string is a
# valid json-rpc method response
#
# @param [String] message string message to check
# @return [true,false] indicating if message is response message
def self.is_response_message?(message)
begin
# FIXME log error
json = JSONParser.parse(message)
json.has_key?('result') || json.has_key?('error')
rescue Exception => e
puts e.to_s
false
end
message.has?('result') || message.has?('error')
end

def success_json
Expand Down
30 changes: 19 additions & 11 deletions lib/rjr/node.rb
Expand Up @@ -183,28 +183,36 @@ def client_for(connection)

# Internal helper, handle message received
def handle_message(msg, connection = {})
if Messages::Request.is_request_message?(msg)
tp << ThreadPoolJob.new(msg) { |m| handle_request(m, false, connection) }
intermediate = Messages::Intermediate.parse(msg)

elsif Messages::Notification.is_notification_message?(msg)
tp << ThreadPoolJob.new(msg) { |m| handle_request(m, true, connection) }
if Messages::Request.is_request_message?(intermediate)
tp << ThreadPoolJob.new(intermediate) { |i|
handle_request(i, false, connection)
}

elsif Messages::Response.is_response_message?(msg)
handle_response(msg)
elsif Messages::Notification.is_notification_message?(intermediate)
tp << ThreadPoolJob.new(intermediate) { |i|
handle_request(i, true, connection)
}

elsif Messages::Response.is_response_message?(intermediate)
handle_response(intermediate)

end

intermediate
end

# Internal helper, handle request message received
def handle_request(data, notification=false, connection={})
def handle_request(message, notification=false, connection={})
# get client for the specified connection
# TODO should grap port/ip immediately on connection and use that
client_port,client_ip = client_for(connection)

msg = notification ?
Messages::Notification.new(:message => data,
Messages::Notification.new(:message => message,
:headers => @message_headers) :
Messages::Request.new(:message => data,
Messages::Request.new(:message => message,
:headers => @message_headers)

callback = NodeCallback.new(:node => self,
Expand Down Expand Up @@ -233,8 +241,8 @@ def handle_request(data, notification=false, connection={})
end

# Internal helper, handle response message received
def handle_response(data)
msg = Messages::Response.new(:message => data,
def handle_response(message)
msg = Messages::Response.new(:message => message,
:headers => self.message_headers)
res = err = nil
begin
Expand Down
17 changes: 12 additions & 5 deletions lib/rjr/nodes/local.rb
Expand Up @@ -61,8 +61,9 @@ def to_s
# Implementation of RJR::Node#send_msg
def send_msg(msg, connection)
# ignore response message
unless Messages::Response.is_response_message?(msg)
launch_request(msg, true) # .join?
inter = Messages::Intermediate.parse(msg)
unless Messages::Response.is_response_message?(inter)
launch_request(inter, true) # .join?
end
end

Expand All @@ -80,9 +81,15 @@ def listen
# (or close to it, globals will still be available, but locks will
# not be locally held, etc)
def launch_request(req, notification)
Thread.new(req,notification) { |req,notification|
res = handle_request(req, notification, nil)
handle_response(res.to_s) unless res.nil?
Thread.new(req, notification) { |req, notification|
inter = req.is_a?(Messages::Intermediate) ? req :
Messages::Intermediate.parse(req)
res = handle_request(inter, notification, nil)

unless res.nil?
inter = Messages::Intermediate.parse(res.to_s)
handle_response(inter)
end
}
end

Expand Down
5 changes: 3 additions & 2 deletions lib/rjr/nodes/web.rb
Expand Up @@ -52,12 +52,13 @@ def initialize(args = {})
def process_http_request
# TODO support http protocols other than POST
msg = @http_post_content.nil? ? '' : @http_post_content
@rjr_node.send(:handle_message, msg, self) # XXX private method
inter = @rjr_node.send(:handle_message, msg, self) # XXX private method

# XXX we still have to send a response back to client to satisfy
# the http standard, even if this is a notification. handle_message
# does not do this.
@rjr_node.send_msg "", self if Messages::Notification.is_notification_message?(msg)
notification = Messages::Notification.is_notification_message?(inter)
@rjr_node.send_msg "", self if notification
end
end

Expand Down
10 changes: 5 additions & 5 deletions lib/rjr/util/json_parser.rb
Expand Up @@ -16,11 +16,11 @@ class JSONParser
# Returns the message and remaining portion of the data string,
# if message is found, else nil
#
# TODO efficiency can probably be optimized in the case closing '}'
# hasn't arrived yet
#
# FIXME if uneven brackets appears in string data (such as in params)
# this will break, detect when in string and ignore in counts
# TODO efficiency can probably be optimized in the case closing '}'
# hasn't arrived yet
#
# FIXME if uneven brackets appears in string data (such as in params)
# this will break, detect when in string and ignore in counts
def self.extract_json_from(data)
return nil if data.nil? || data.empty?
start = 0
Expand Down

0 comments on commit 2bacb30

Please sign in to comment.