Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactored for Jetty 9.4 #166

Merged
merged 3 commits into from
Mar 10, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -377,8 +377,6 @@ project(':warp10') {
}

dependencies {
provided group: 'javax.servlet', name: 'servlet-api', version: '2.5'

//
// io.warp10 dependencies
//
Expand Down Expand Up @@ -444,10 +442,14 @@ project(':warp10') {
compile group: 'org.fusesource.jansi', name: 'jansi', version: '1.6'
//compile group: 'org.mvel', name: 'mvel2', version: '2.1.5.Final'
compile group: 'com.esotericsoftware.kryo', name: 'kryo', version: '2.21'
compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.0.5.v20130815'
compile group: 'org.eclipse.jetty', name: 'jetty-servlets', version: '9.0.5.v20130815'
compile group: 'org.eclipse.jetty.websocket', name: 'websocket-server', version: '9.0.5.v20130815'
compile group: 'org.eclipse.jetty.websocket', name: 'websocket-client', version: '9.0.5.v20130815'
//compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.0.5.v20130815'
//compile group: 'org.eclipse.jetty', name: 'jetty-servlets', version: '9.0.5.v20130815'
//compile group: 'org.eclipse.jetty.websocket', name: 'websocket-server', version: '9.0.5.v20130815'
//compile group: 'org.eclipse.jetty.websocket', name: 'websocket-client', version: '9.0.5.v20130815'
compile group: 'org.eclipse.jetty', name: 'jetty-server', version: '9.4.2.v20170220'
compile group: 'org.eclipse.jetty', name: 'jetty-servlets', version: '9.4.2.v20170220'
compile group: 'org.eclipse.jetty.websocket', name: 'websocket-server', version: '9.4.2.v20170220'
compile group: 'org.eclipse.jetty.websocket', name: 'websocket-client', version: '9.4.2.v20170220'
compile group: 'com.netflix.curator', name: 'curator-x-discovery', version: '1.3.3'
//compile group: 'com.google.code.gson', name: 'gson', version: '2.2.4'
compile group: 'io.fastjson', name: 'boon', version: '0.14'
Expand Down Expand Up @@ -527,7 +529,7 @@ project(':warp10') {
from {
configurations.compile.collect {
it.isDirectory() ? it : zipTree(it).matching {
exclude { it.toString().contains('servlet-api') }
exclude { it.toString().contains('/servlet-api') }
exclude { it.toString().contains('jmh') }
exclude { it.toString().contains('junit') }
exclude { it.toString().contains('parquet') }
Expand Down
7 changes: 1 addition & 6 deletions warp10/src/main/java/io/warp10/continuum/egress/Egress.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlets.gzip.GzipHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;

import com.google.common.base.Preconditions;

Expand Down Expand Up @@ -133,25 +133,21 @@ public Egress(KeyStore keystore, Properties props, boolean fetcher) throws Excep
GzipHandler gzip = new GzipHandler();
EgressExecHandler egressExecHandler = new EgressExecHandler(this.keystore, this.properties, directoryClient, geoDirectoryClient, storeClient);
gzip.setHandler(egressExecHandler);
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

gzip = new GzipHandler();
gzip.setHandler(new EgressFetchHandler(this.keystore, this.properties, directoryClient, storeClient));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

gzip = new GzipHandler();
gzip.setHandler(new EgressFindHandler(this.keystore, directoryClient));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

gzip = new GzipHandler();
gzip.setHandler(new EgressSplitsHandler(this.keystore, directoryClient, (HBaseStoreClient) storeClient));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

Expand All @@ -160,7 +156,6 @@ public Egress(KeyStore keystore, Properties props, boolean fetcher) throws Excep
} else {
GzipHandler gzip = new GzipHandler();
gzip.setHandler(new EgressFetchHandler(this.keystore, this.properties, null, storeClient));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

Expand Down Expand Up @@ -261,10 +263,7 @@ public EgressMobiusHandler(StoreClient storeClient, DirectoryClient directoryCli
if (properties.containsKey(Configuration.CONFIG_WARPSCRIPT_MOBIUS_POOL)) {
this.poolsize = Integer.parseInt(properties.getProperty(Configuration.CONFIG_WARPSCRIPT_MOBIUS_POOL));
}

configure(super.getWebSocketFactory());



Thread t = new Thread(this);
t.setDaemon(true);
t.setName("[MobiusHandler]");
Expand Down Expand Up @@ -292,7 +291,7 @@ public void configure(final WebSocketServletFactory factory) {

WebSocketCreator creator = new WebSocketCreator() {
@Override
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
MobiusWebSocket ws = (MobiusWebSocket) oldcreator.createWebSocket(req, resp);
ws.setMobiusHandler(self);
return ws;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

Expand All @@ -64,7 +66,7 @@ public class IngressStreamUpdateHandler extends WebSocketHandler.Simple {

private final Ingress ingress;

@WebSocket(maxMessageSize=1024 * 1024)
@WebSocket(maxTextMessageSize=1024 * 1024, maxBinaryMessageSize=1024 * 1024)
public static class StandaloneStreamUpdateWebSocket {

private IngressStreamUpdateHandler handler;
Expand Down Expand Up @@ -342,10 +344,8 @@ public IngressStreamUpdateHandler(Ingress ingress) {
super(StandaloneStreamUpdateWebSocket.class);

this.ingress = ingress;

configure(super.getWebSocketFactory());
}

}

@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException {
if (Constants.API_ENDPOINT_PLASMA_UPDATE.equals(target)) {
Expand All @@ -364,7 +364,7 @@ public void configure(final WebSocketServletFactory factory) {
WebSocketCreator creator = new WebSocketCreator() {

@Override
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
StandaloneStreamUpdateWebSocket ws = (StandaloneStreamUpdateWebSocket) oldcreator.createWebSocket(req, resp);
ws.setHandler(self);
return ws;
Expand All @@ -377,7 +377,8 @@ public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
// Update the maxMessageSize if need be
//
if (this.ingress.properties.containsKey(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)) {
factory.getPolicy().setMaxMessageSize(Long.parseLong(this.ingress.properties.getProperty(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)));
factory.getPolicy().setMaxTextMessageSize((int) Long.parseLong(this.ingress.properties.getProperty(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)));
factory.getPolicy().setMaxBinaryMessageSize((int) Long.parseLong(this.ingress.properties.getProperty(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)));
}
super.configure(factory);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class InfluxDBWarp10Plugin extends AbstractWarp10Plugin implements Runnab

@Override
public void run() {
Server server = new Server(new QueuedThreadPool(maxThreads, 8, (int) idleTimeout, queue));
Server server = new Server(new QueuedThreadPool(maxThreads, 9, (int) idleTimeout, queue));
ServerConnector connector = new ServerConnector(server, acceptors, selectors);
connector.setIdleTimeout(idleTimeout);
connector.setPort(port);
Expand Down Expand Up @@ -72,7 +72,7 @@ public void run() {
public void init(Properties properties) {
this.acceptors = Integer.parseInt(properties.getProperty(CONF_INFLUXDB_ACCEPTORS, "4"));
this.selectors = Integer.parseInt(properties.getProperty(CONF_INFLUXDB_SELECTORS, "2"));
this.maxThreads = Integer.parseInt(properties.getProperty(CONF_INFLUXDB_JETTY_THREADPOOL, "8"));
this.maxThreads = Integer.parseInt(properties.getProperty(CONF_INFLUXDB_JETTY_THREADPOOL, "9"));
this.idleTimeout = Integer.parseInt(properties.getProperty(CONF_INFLUXDB_IDLE_TIMEOUT, "30000"));
this.port = Integer.parseInt(properties.getProperty(CONF_INFLUXDB_PORT, "8086"));
this.host = properties.getProperty(CONF_INFLUXDB_HOST, "127.0.0.1");
Expand Down
2 changes: 0 additions & 2 deletions warp10/src/main/java/io/warp10/script/HyperLogLogPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@
import java.nio.ByteOrder;
import java.util.Arrays;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TCompactProtocol;
import org.eclipse.jetty.servlets.gzip.GzipOutputStream;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;

Expand Down Expand Up @@ -314,8 +316,6 @@ public StandalonePlasmaHandler(KeyStore keystore, Properties properties, Directo
}
this.metadataKey = keystore.getKey(KeyStore.AES_KAFKA_METADATA);

configure(super.getWebSocketFactory());

if (startThread) {
Thread t = new Thread(this);
t.setDaemon(true);
Expand Down Expand Up @@ -353,7 +353,7 @@ public void configure(final WebSocketServletFactory factory) {

WebSocketCreator creator = new WebSocketCreator() {
@Override
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
StandalonePlasmaWebSocket ws = (StandalonePlasmaWebSocket) oldcreator.createWebSocket(req, resp);
ws.setHandler(self);
return ws;
Expand All @@ -366,7 +366,8 @@ public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
// Update the maxMessageSize if need be
//
if (this.properties.containsKey(Configuration.PLASMA_FRONTEND_WEBSOCKET_MAXMESSAGESIZE)) {
factory.getPolicy().setMaxMessageSize(Long.parseLong(this.properties.getProperty(Configuration.PLASMA_FRONTEND_WEBSOCKET_MAXMESSAGESIZE)));
factory.getPolicy().setMaxTextMessageSize((int) Long.parseLong(this.properties.getProperty(Configuration.PLASMA_FRONTEND_WEBSOCKET_MAXMESSAGESIZE)));
factory.getPolicy().setMaxBinaryMessageSize((int) Long.parseLong(this.properties.getProperty(Configuration.PLASMA_FRONTEND_WEBSOCKET_MAXMESSAGESIZE)));
}

super.configure(factory);
Expand Down Expand Up @@ -552,7 +553,7 @@ protected void dispatch(GTSEncoder encoder) throws IOException {

if (refcount > 0) {

long maxmessagesize = this.getWebSocketFactory().getPolicy().getMaxMessageSize();
long maxmessagesize = Math.min(this.getWebSocketFactory().getPolicy().getMaxTextMessageSize(), this.getWebSocketFactory().getPolicy().getMaxBinaryMessageSize());

StringBuilder metasb = new StringBuilder();
StringBuilder sb = new StringBuilder();
Expand Down Expand Up @@ -727,6 +728,7 @@ protected void dispatch(GTSEncoder encoder) throws IOException {
sb.append(" ");
}
GTSHelper.encodeValue(sb, decoder.getValue());
sb.append("\n");
first = false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.server.WebSocketHandler;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
import org.eclipse.jetty.websocket.servlet.WebSocketCreator;
import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
import org.joda.time.format.DateTimeFormat;
Expand Down Expand Up @@ -95,7 +97,7 @@ public class StandaloneStreamUpdateHandler extends WebSocketHandler.Simple {

private final DateTimeFormatter dtf = DateTimeFormat.forPattern("yyyyMMdd'T'HHmmss.SSS").withZoneUTC();

@WebSocket(maxMessageSize=1024 * 1024)
@WebSocket(maxTextMessageSize=1024 * 1024, maxBinaryMessageSize=1024 * 1024)
public static class StandaloneStreamUpdateWebSocket {

private static final int METADATA_CACHE_SIZE = 1000;
Expand Down Expand Up @@ -505,11 +507,9 @@ public StandaloneStreamUpdateHandler(KeyStore keystore, Properties properties, S
this.datalogPSK = this.keyStore.decodeKey(properties.getProperty(Configuration.DATALOG_PSK));
} else {
this.datalogPSK = null;
}

configure(super.getWebSocketFactory());
}
}

public DirectoryClient getDirectoryClient() {
return this.directoryClient;
}
Expand All @@ -531,7 +531,7 @@ public void configure(final WebSocketServletFactory factory) {

WebSocketCreator creator = new WebSocketCreator() {
@Override
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
public Object createWebSocket(ServletUpgradeRequest req, ServletUpgradeResponse resp) {
StandaloneStreamUpdateWebSocket ws = (StandaloneStreamUpdateWebSocket) oldcreator.createWebSocket(req, resp);
ws.setHandler(self);
return ws;
Expand All @@ -544,7 +544,8 @@ public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp) {
// Update the maxMessageSize if need be
//
if (this.properties.containsKey(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)) {
factory.getPolicy().setMaxMessageSize(Long.parseLong(this.properties.getProperty(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)));
factory.getPolicy().setMaxTextMessageSize((int) Long.parseLong(this.properties.getProperty(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)));
factory.getPolicy().setMaxBinaryMessageSize((int) Long.parseLong(this.properties.getProperty(Configuration.INGRESS_WEBSOCKET_MAXMESSAGESIZE)));
}

super.configure(factory);
Expand Down
10 changes: 1 addition & 9 deletions warp10/src/main/java/io/warp10/standalone/Warp.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.servlets.gzip.GzipHandler;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.CompressionType;
import org.iq80.leveldb.DB;
Expand Down Expand Up @@ -341,44 +341,36 @@ public static void main(String[] args) throws Exception {
GzipHandler gzip = new GzipHandler();
EgressExecHandler egressExecHandler = new EgressExecHandler(keystore, properties, sdc, geodir.getClient(), scc);
gzip.setHandler(egressExecHandler);
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);
setEgress(true);

gzip = new GzipHandler();
gzip.setHandler(new StandaloneIngressHandler(keystore, sdc, scc));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

gzip = new GzipHandler();
gzip.setHandler(new EgressFetchHandler(keystore, properties, sdc, scc));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

gzip = new GzipHandler();
gzip.setHandler(new EgressFindHandler(keystore, sdc));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

gzip = new GzipHandler();
gzip.setHandler(new StandaloneDeleteHandler(keystore, sdc, scc));
gzip.setBufferSize(65536);
gzip.setMinGzipSize(0);
handlers.addHandler(gzip);

handlers.addHandler(geodir);

//ContextHandler context = new ContextHandler();
StandalonePlasmaHandler plasmaHandler = new StandalonePlasmaHandler(keystore, properties, sdc);
scc.addPlasmaHandler(plasmaHandler);
scc.addPlasmaHandler(geodir);

//context.setHandler(plasmaHandler);
//handlers.addHandler(context);
handlers.addHandler(plasmaHandler);

StandaloneStreamUpdateHandler streamUpdateHandler = new StandaloneStreamUpdateHandler(keystore, properties, sdc, scc);
Expand Down