Skip to content

Commit

Permalink
invalid transactions for indicating failed transactions on the server…
Browse files Browse the repository at this point in the history
… side
  • Loading branch information
dpisarewski committed Jan 6, 2015
1 parent aa69345 commit a24194f
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 122 deletions.
109 changes: 67 additions & 42 deletions lib/neo4j-server/cypher_response.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module Neo4j
module Server
class CypherResponse
attr_reader :data, :columns, :error_msg, :error_status, :error_code, :response
attr_reader :data, :columns, :response

class ResponseError < StandardError
attr_reader :status, :code
Expand All @@ -13,7 +13,6 @@ def initialize(msg, status, code)
end
end


class HashEnumeration
include Enumerable
extend Forwardable
Expand Down Expand Up @@ -89,12 +88,13 @@ def hash_value_as_object(value, session)
attr_reader :struct

def initialize(response, uncommited = false)
@response = response
@response = response
@uncommited = uncommited
set_data_from_request if response
end

def entity_data(id = nil)
if @uncommited
def entity_data(id=nil)
if uncommited?
data = @data.first['row'].first
data.is_a?(Hash) ? {'data' => data, 'id' => id} : data
else
Expand All @@ -104,7 +104,7 @@ def entity_data(id = nil)
end

def first_data(id = nil)
if @uncommited
if uncommited?
data = @data.first['row'].first
# data.is_a?(Hash) ? {'data' => data, 'id' => id} : data
else
Expand All @@ -121,8 +121,43 @@ def add_transaction_entity_id
mapped_rest_data.merge!('id' => mapped_rest_data['self'].split('/').last.to_i)
end

def errors
transaction_response? ? transaction_errors : non_transaction_errors
end

def transaction_errors
Array(response.body['errors']).map do |error|
ResponseError.new(error['message'], error['status'], error['code'])
end
end

def non_transaction_errors
return [] unless response.status == 400
Array(ResponseError.new(response.body['message'], response.body['exception'], response.body['fullname']))
end

def error
errors.first
end

def error_msg
error && error.message
end

def error_status
error && error.status
end

def error_code
error && error.code
end

def error?
!!@error
errors.any?
end

def uncommited?
@uncommited
end

def data?
Expand All @@ -147,55 +182,45 @@ def set_data(data, columns)
@struct = columns.empty? ? Object.new : Struct.new(*columns.map(&:to_sym))
self
end

def set_error(error_msg, error_status, error_core)
@error = true
@error_msg = error_msg
@error_status = error_status
@error_code = error_core
self

def set_data_from_request
return if error?
if transaction_response? && response.body['results']
set_data(response.body['results'][0]['data'], response.body['results'][0]['columns'])
else
set_data(response.body['data'], response.body['columns'])
end
end

def raise_error
fail 'Tried to raise error without an error' unless @error
fail ResponseError.new(@error_msg, @error_status, @error_code)
fail 'Tried to raise error without an error' unless error?
fail error
end

def raise_cypher_error
fail 'Tried to raise error without an error' unless @error
fail Neo4j::Session::CypherError.new(@error_msg, @error_code, @error_status)
fail 'Tried to raise error without an error' unless error?
fail Neo4j::Session::CypherError.new(error.message, error.code, error.status)
end



def self.create_with_no_tx(response)
case response.status
when 200
CypherResponse.new(response).set_data(response.body['data'], response.body['columns'])
when 400
CypherResponse.new(response).set_error(response.body['message'], response.body['exception'], response.body['fullname'])
else
fail "Unknown response code #{response.status} for #{response.env[:url]}"
end
CypherResponse.new(response)
end

def self.create_with_tx(response)
fail "Unknown response code #{response.status} for #{response.request_uri}" unless response.status == 200

first_result = response.body['results'][0]
cr = CypherResponse.new(response, true)

if response.body['errors'].empty?
cr.set_data(first_result['data'], first_result['columns'])
else
first_error = response.body['errors'].first
cr.set_error(first_error['message'], first_error['status'], first_error['code'])
end
cr
CypherResponse.new(response, true)
end

def transaction_response?
response.respond_to?('body') && !response.body['commit'].nil?
end

def transaction_failed?
errors.any? { |e| e.code =~ /Neo\.DatabaseError/ }
end

def transaction_not_found?
errors.any? { |e| e.code == 'Neo.ClientError.Transaction.UnknownId' }
end

def rest_data
@result_index = @row_index = 0
Expand Down
81 changes: 62 additions & 19 deletions lib/neo4j-server/cypher_transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,34 @@ def initialize(url, session_connection)

ROW_REST = %w(row REST)
def _query(cypher_query, params = nil)
fail 'Transaction expired, unable to perform query' if expired?
statement = {statement: cypher_query, parameters: params, resultDataContents: ROW_REST}
body = {statements: [statement]}

response = exec_url && commit_url ? connection.post(exec_url, body) : register_urls(body)
_create_cypher_response(response)
_create_cypher_response(response).tap do |cypher_response|
handle_transaction_errors(cypher_response)
end
end

def _create_cypher_response(response)
CypherResponse.create_with_tx(response)
end

# Replaces current transaction with invalid transaction indicating it was rolled back or expired on the server side. http://neo4j.com/docs/stable/status-codes.html#_classifications
def handle_transaction_errors(response)
tx_class = if response.transaction_not_found?
ExpiredCypherTransaction
elsif response.transaction_failed?
InvalidCypherTransaction
end

register_invalid_transaction(tx_class) if tx_class
end

def register_invalid_transaction(tx_class)
tx = tx_class.new(Neo4j::Transaction.current)
Neo4j::Transaction.unregister_current
tx.register_instance
end

def _delete_tx
Expand All @@ -36,7 +58,7 @@ def _delete_tx

def _commit_tx
_tx_query(:post, commit_url, nil)
end
end

private

Expand All @@ -57,23 +79,44 @@ def register_urls(body)
response
end

def _create_cypher_response(response)
first_result = response.body['results'][0]

cr = CypherResponse.new(response, true)
if response.body['errors'].empty?
cr.set_data(first_result['data'], first_result['columns'])
else
first_error = response.body['errors'].first
expired if first_error['message'].match(/Unrecognized transaction id/)
cr.set_error(first_error['message'], first_error['code'], first_error['code'])
end
cr
end

def empty_response
OpenStruct.new(status: 200, body: '')
end

def valid?
!invalid?
end

def expired?
is_a? ExpiredCypherTransaction
end

def invalid?
is_a? InvalidCypherTransaction
end
end

class InvalidCypherTransaction < CypherTransaction
attr_accessor :original_transaction

def initialize(transaction)
self.original_transaction = transaction
mark_failed
end

def close
Neo4j::Transaction.unregister(self)
end

def _query(cypher_query, params=nil)
fail 'Transaction invalid, unable to perform query'
end
end

class ExpiredCypherTransaction < InvalidCypherTransaction
def _query(cypher_query, params=nil)
fail 'Transaction expired, unable to perform query'
end
end
end
end
end
end
11 changes: 1 addition & 10 deletions lib/neo4j/transaction.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def register_instance
Neo4j::Transaction.register(self)
end

# Marks this transaction as failed, which means that it will unconditionally be rolled back when close() is called. Aliased for legacy purposes.
# Marks this transaction as failed on the client side, which means that it will unconditionally be rolled back when close() is called. Aliased for legacy purposes.
def mark_failed
@failure = true
end
Expand All @@ -21,15 +21,6 @@ def failed?
end
alias_method :failure?, :failed?

def mark_expired
@expired = true
end
alias_method :expired, :mark_expired

def expired?
!!@expired
end

# @private
def push_nested!
@pushed_nested += 1
Expand Down
4 changes: 2 additions & 2 deletions spec/neo4j-server/e2e/cypher_transaction_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ module Server
expect(r.error?).to be true

expect(r.error_msg).to match(/Invalid input/)
expect(r.error_status).to match(/Syntax/)
expect(r.error_code).to match(/Syntax/)
end

it 'can rollback' do
Expand All @@ -73,7 +73,7 @@ module Server
it 'cannot continue operations if a transaction is expired' do
node = Neo4j::Node.create(name: 'andreas')
Neo4j::Transaction.run do |tx|
tx.expired
tx.register_invalid_transaction(Neo4j::Server::ExpiredCypherTransaction)
expect { node[:name] = 'foo' }.to raise_error 'Transaction expired, unable to perform query'
end
end
Expand Down
24 changes: 24 additions & 0 deletions spec/neo4j-server/unit/cypher_response_unit_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,30 @@ def successful_response(response)
end

skip 'returns hydrated CypherPath objects?'

describe '#errors' do
let(:cypher_response) { CypherResponse.new(response, true) }

context 'without transaction' do
let(:response) do
double('tx_response', status: 400, body: {'message' => 'Some error', 'exception' => 'SomeError', 'fullname' => 'SomeError'})
end

it 'returns an array of errors' do
expect(cypher_response.errors).to be_a(Array)
end
end

context 'using transaction' do
let(:response) do
double('non-tx_response', status: 200, body: {'errors' => ['message' => 'Some error', 'status' => 'SomeError', 'code' => 'SomeError'], 'commit' => 'commit_uri'})
end

it 'returns an array of errors' do
expect(cypher_response.errors).to be_a(Array)
end
end
end
end
end
end
Expand Down
Loading

0 comments on commit a24194f

Please sign in to comment.