/
RskWebSocketJsonRpcHandler.java
141 lines (112 loc) · 5.66 KB
/
RskWebSocketJsonRpcHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
* This file is part of RskJ
* Copyright (C) 2018 RSK Labs Ltd.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package co.rsk.rpc.netty;
import java.io.IOException;
import co.rsk.util.JacksonParserUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import co.rsk.jsonrpc.JsonRpcBooleanResult;
import co.rsk.jsonrpc.JsonRpcError;
import co.rsk.jsonrpc.JsonRpcIdentifiableMessage;
import co.rsk.jsonrpc.JsonRpcResultOrError;
import co.rsk.rpc.EthSubscriptionNotificationEmitter;
import co.rsk.rpc.modules.RskJsonRpcRequest;
import co.rsk.rpc.modules.RskJsonRpcRequestVisitor;
import co.rsk.rpc.modules.eth.subscribe.EthSubscribeRequest;
import co.rsk.rpc.modules.eth.subscribe.EthUnsubscribeRequest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* This handler decodes inbound messages and dispatches valid JSON-RPC requests.
*
* Note that we split JSON-RPC handling in two because jsonrpc4j wasn't able to handle the PUB-SUB model.
* Eventually, we might want to implement all methods in this style and remove jsonrpc4j.
*
* We make this object Sharable so it can be instantiated once and reused in the netty pipeline;
* this is safe since all objects used by this object are thread safe.
*/
@Sharable
public class RskWebSocketJsonRpcHandler extends SimpleChannelInboundHandler<ByteBufHolder> implements RskJsonRpcRequestVisitor {
    private static final Logger LOGGER = LoggerFactory.getLogger(RskWebSocketJsonRpcHandler.class);

    /** Name of the JSON field that carries the request/response identifier. */
    private static final String ID = "id";

    private final EthSubscriptionNotificationEmitter emitter;
    private final ObjectMapper mapper = new ObjectMapper();
    private final RskWebSocketJsonParameterValidator parameterValidator = new RskWebSocketJsonParameterValidator();

    /**
     * @param emitter the emitter that subscribe/unsubscribe requests and channel
     *                shutdown notifications are forwarded to
     */
    public RskWebSocketJsonRpcHandler(EthSubscriptionNotificationEmitter emitter) {
        this.emitter = emitter;
    }

    /**
     * Tries to decode the inbound message as a {@link RskJsonRpcRequest}.
     *
     * <p>When decoding succeeds, the request is validated and dispatched through this visitor
     * (or answered with an {@code INVALID_PARAMS} error) and the response is written back to
     * the channel. When decoding fails with an {@link IOException}, the untouched message is
     * handed over to the next handler in the pipeline.
     *
     * @param ctx the channel context used to write the response or to delegate the message
     * @param msg the inbound message holding the raw request payload
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBufHolder msg) {
        // Parse from a private copy so that 'msg' itself is left unread in case it has to be
        // delegated to the next handler.
        ByteBuf content = msg.copy().content();

        try (ByteBufInputStream source = new ByteBufInputStream(content)) {
            final JsonNode jsonNodeRequest = JacksonParserUtil.readTree(mapper, source);

            RskWebSocketJsonParameterValidator.Result validationResult = parameterValidator.validate(jsonNodeRequest);

            RskJsonRpcRequest request = JacksonParserUtil.treeToValue(mapper, jsonNodeRequest, RskJsonRpcRequest.class);

            final JsonRpcResultOrError resultOrError;
            if (validationResult.isValid()) {
                // TODO(mc) we should support the ModuleDescription method filters
                resultOrError = request.accept(this, ctx);
            } else {
                resultOrError = new JsonRpcError(JsonRpcError.INVALID_PARAMS, validationResult.getMessage());
            }

            JsonRpcIdentifiableMessage response = resultOrError.responseFor(request.getId());
            ctx.writeAndFlush(new TextWebSocketFrame(getJsonWithTypedId(jsonNodeRequest, response)));
            return;
        } catch (IOException e) {
            LOGGER.trace("Not a known or valid JsonRpcRequest", e);
        } finally {
            // We need to release this resource on every exit path (success, parse failure or
            // unexpected exception): netty only takes care about 'ByteBufHolder msg', and the
            // stream above does not release the buffer it wraps when it is closed.
            content.release();
        }

        // delegate to the next handler if the message can't be matched to a known JSON-RPC request
        ctx.fireChannelRead(msg);
    }

    /**
     * Serializes the response, reusing the ID node of the request so that the ID keeps
     * the same JSON type in the response payload.
     *
     * @param jsonNodeRequest the parsed request the ID is taken from
     * @param response        the response object to serialize
     * @return the JSON payload of the response
     * @throws JsonProcessingException if the response can't be written as a JSON string
     */
    private String getJsonWithTypedId(JsonNode jsonNodeRequest, JsonRpcIdentifiableMessage response) throws JsonProcessingException {
        // get the json representation of the response object
        JsonNode jsonNodeResponse = mapper.valueToTree(response);

        // set its ID with the one that was provided in the request
        ((ObjectNode) jsonNodeResponse).set(ID, jsonNodeRequest.get(ID));

        // creates the string json payload
        return mapper.writeValueAsString(jsonNodeResponse);
    }

    /**
     * Unsubscribes the channel from the emitter once it becomes inactive, then
     * lets the event continue through the pipeline.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        emitter.unsubscribe(ctx.channel());
        super.channelInactive(ctx);
    }

    /**
     * Handles an unsubscribe request by asking the emitter to drop the given subscription ID.
     *
     * @return a boolean result carrying whatever the emitter reported for the unsubscription
     */
    @Override
    public JsonRpcResultOrError visit(EthUnsubscribeRequest request, ChannelHandlerContext ctx) {
        boolean unsubscribed = emitter.unsubscribe(request.getParams().getSubscriptionId());
        return new JsonRpcBooleanResult(unsubscribed);
    }

    /**
     * Handles a subscribe request by dispatching its parameters to the emitter
     * together with the channel the request arrived on.
     */
    @Override
    public JsonRpcResultOrError visit(EthSubscribeRequest request, ChannelHandlerContext ctx) {
        return request.getParams().accept(emitter, ctx.channel());
    }
}