-
Notifications
You must be signed in to change notification settings - Fork 2.3k
/
BoltMessageRouter.java
152 lines (130 loc) · 5.05 KB
/
BoltMessageRouter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.bolt.v1.messaging;
import java.io.IOException;
import java.util.Map;
import org.neo4j.bolt.v1.runtime.BoltWorker;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.bolt.v1.runtime.spi.BoltResult;
import org.neo4j.logging.Log;
import org.neo4j.values.AnyValue;
import org.neo4j.values.result.QueryResult;
/**
* This class is responsible for routing incoming request messages to a worker
* as well as handling outgoing response messages via appropriate handlers.
*/
public class BoltMessageRouter implements BoltRequestMessageHandler<RuntimeException>
{
// Note that these callbacks can be used for multiple in-flight requests simultaneously, you cannot reset them
// while there are in-flight requests.
private final MessageProcessingHandler initHandler;
private final MessageProcessingHandler runHandler;
private final MessageProcessingHandler resultHandler;
private final MessageProcessingHandler defaultHandler;
private BoltWorker worker;
public BoltMessageRouter( Log log, BoltWorker worker, BoltResponseMessageHandler<IOException> output,
Runnable onEachCompletedRequest )
{
this.initHandler = new InitHandler( output, onEachCompletedRequest, worker, log );
this.runHandler = new RunHandler( output, onEachCompletedRequest, worker, log );
this.resultHandler = new ResultHandler( output, onEachCompletedRequest, worker, log );
this.defaultHandler = new MessageProcessingHandler( output, onEachCompletedRequest, worker, log );
this.worker = worker;
}
@Override
public void onInit( String userAgent, Map<String,Object> authToken ) throws RuntimeException
{
// TODO: make the client transmit the version for now it is hardcoded to -1 to ensure current behaviour
worker.enqueue( session -> session.init( userAgent, authToken, initHandler ) );
}
@Override
public void onAckFailure() throws RuntimeException
{
worker.enqueue( session -> session.ackFailure( defaultHandler ) );
}
@Override
public void onReset() throws RuntimeException
{
worker.interrupt();
worker.enqueue( session -> session.reset( defaultHandler ) );
}
@Override
public void onRun( String statement, Map<String,Object> params )
{
worker.enqueue( session -> session.run( statement, params, runHandler ) );
}
@Override
public void onExternalError( Neo4jError error )
{
worker.enqueue( session -> session.externalError( error, defaultHandler ) );
}
@Override
public void onDiscardAll()
{
worker.enqueue( session -> session.discardAll( resultHandler ) );
}
@Override
public void onPullAll()
{
worker.enqueue( session -> session.pullAll( resultHandler ) );
}
private static class InitHandler extends MessageProcessingHandler
{
InitHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, BoltWorker worker, Log log )
{
super( handler, onCompleted, worker, log );
}
}
private static class RunHandler extends MessageProcessingHandler
{
RunHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, BoltWorker worker, Log log )
{
super( handler, onCompleted, worker, log );
}
}
private static class ResultHandler extends MessageProcessingHandler
{
ResultHandler( BoltResponseMessageHandler<IOException> handler, Runnable onCompleted, BoltWorker worker,
Log log )
{
super( handler, onCompleted, worker, log );
}
@Override
public void onRecords( final BoltResult result, final boolean pull ) throws Exception
{
result.accept( new BoltResult.Visitor()
{
@Override
public void visit( QueryResult.Record record ) throws Exception
{
if ( pull )
{
handler.onRecord( record );
}
}
@Override
public void addMetadata( String key, AnyValue value )
{
metadata.put( key, value );
}
} );
}
}
}