Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bot: modularize HTTP server #1149

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -27,12 +27,15 @@

import java.io.IOException;
import java.nio.file.Path;
import java.net.*;
import java.time.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.logging.*;
import java.util.stream.Collectors;

import com.sun.net.httpserver.*;

class BotRunnerError extends RuntimeException {
BotRunnerError(String msg) {
super(msg);
@@ -196,9 +199,10 @@ private void drain(Duration timeout) throws TimeoutException {
private final BotRunnerConfiguration config;
private final List<Bot> bots;
private final ScheduledThreadPoolExecutor executor;
private final Logger log;
private final BotWatchdog botWatchdog;

private static final Logger log = Logger.getLogger("org.openjdk.skara.bot");

public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {
this.config = config;
this.bots = bots;
@@ -214,7 +218,6 @@ public BotRunner(BotRunnerConfiguration config, List<Bot> bots) {

executor = new ScheduledThreadPoolExecutor(config.concurrency());
botWatchdog = new BotWatchdog(Duration.ofMinutes(10));
log = Logger.getLogger("org.openjdk.skara.bot");
}

private void checkPeriodicItems() {
@@ -251,7 +254,7 @@ private void itemWatchdog() {
}
}

private void processRestRequest(JSONValue request) {
void processWebhook(JSONValue request) {
try (var __ = new LogContext("work_id", String.valueOf(workIdCounter.incrementAndGet()))) {
log.log(Level.FINE, "Starting processing of incoming rest request", TaskPhases.BEGIN);
log.fine("Request: " + request);
@@ -278,13 +281,20 @@ public void run(Duration timeout) {
log.info("Periodic task interval: " + config.scheduledExecutionPeriod());
log.info("Concurrency: " + config.concurrency());

RestReceiver restReceiver = null;
if (config.restReceiverPort().isPresent()) {
log.info("Listening for webhooks on port: " + config.restReceiverPort().get());
HttpServer server = null;
var serverConfig = config.httpServer(this);
if (serverConfig.isPresent()) {
try {
restReceiver = new RestReceiver(config.restReceiverPort().get(), this::processRestRequest);
var port = serverConfig.get().port();
var address = new InetSocketAddress(port);
server = HttpServer.create(address, 0);
server.setExecutor(null);
for (var context : serverConfig.get().contexts()) {
server.createContext(context.path(), context.handler());
}
server.start();
} catch (IOException e) {
log.log(Level.WARNING, "Failed to create RestReceiver", e);
log.log(Level.WARNING, "Failed to create HTTP server", e);
}
}

@@ -299,8 +309,8 @@ public void run(Duration timeout) {
e.printStackTrace();
}

if (restReceiver != null) {
restReceiver.close();
if (server != null) {
server.stop(0);
}
executor.shutdown();
}
@@ -37,8 +37,11 @@
import java.nio.file.*;
import java.time.Duration;
import java.util.*;
import java.util.function.BiFunction;
import java.util.logging.Logger;

import com.sun.net.httpserver.HttpHandler;

public class BotRunnerConfiguration {
private final Logger log;
private final JSONObject config;
@@ -345,11 +348,65 @@ Path scratchFolder() {
return Paths.get(config.get("scratch").get("path").asString());
}

Optional<Integer> restReceiverPort() {
if (!config.contains("webhooks")) {
static class HttpContextConfiguration {
private final String path;
private final HttpHandler handler;

private HttpContextConfiguration(String path, HttpHandler handler) {
this.path = path;
this.handler = handler;
}

String path() {
return path;
}

HttpHandler handler() {
return handler;
}
}

static class HttpServerConfiguration {
private final int port;
private final List<HttpContextConfiguration> contexts;

private HttpServerConfiguration(int port, List<HttpContextConfiguration> contexts) {
this.port = port;
this.contexts = contexts;
}

int port() {
return port;
}

List<HttpContextConfiguration> contexts() {
return contexts;
}
}

Optional<HttpServerConfiguration> httpServer(BotRunner runner) {
if (!config.contains("http-server")) {
return Optional.empty();
}
return Optional.of(config.get("webhooks").get("port").asInt());

Map<String, BiFunction<BotRunner, JSONObject, HttpHandler>> factories = Map.of(
WebhookHandler.name(), WebhookHandler::create
);
var contexts = new ArrayList<HttpContextConfiguration>();
var port = config.get("http-server").get("port").asInt();
for (var field : config.get("http-server").fields()) {
if (field.name().startsWith("/")) {
var path = field.name();
var type = field.value().get("type").asString();
if (!factories.containsKey(type)) {
throw new RuntimeException("Unknown kind of HTTP handler: " + type);
}
var handler = factories.get(type).apply(runner, field.value().asObject());
contexts.add(new HttpContextConfiguration(path, handler));
}
}

return Optional.of(new HttpServerConfiguration(port, contexts));
}

Duration watchdogTimeout() {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2019 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2021, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@@ -26,50 +26,47 @@
import org.openjdk.skara.json.*;

import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;

class RestReceiver {
private final HttpServer server;
private final Consumer<JSONValue> consumer;
private final Logger log = Logger.getLogger("org.openjdk.skara.bot");
class WebhookHandler implements HttpHandler {
private final static Logger log = Logger.getLogger("org.openjdk.skara.bot");
private final BotRunner runner;

class Handler implements HttpHandler {

@Override
public void handle(HttpExchange exchange) throws IOException {
var input = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
private WebhookHandler(BotRunner runner) {
this.runner = runner;
}

// Reply immediately
var response = "{}";
exchange.sendResponseHeaders(200, response.length());
OutputStream outputStream = exchange.getResponseBody();
outputStream.write(response.getBytes());
outputStream.close();
@Override
public void handle(HttpExchange exchange) throws IOException {
var input = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);

try {
var parsedInput = JSON.parse(input);
consumer.accept(parsedInput);
} catch (RuntimeException e) {
log.log(Level.WARNING, "Failed to parse incoming request: " + input, e);
}
JSONValue json = null;
try {
json = JSON.parse(input);
} catch (Exception e) {
log.log(Level.WARNING, "Failed to parse incoming request: " + input, e);
exchange.sendResponseHeaders(400, 0);
exchange.getResponseBody().close();
return;
}

// Reply immediately
var response = "{}";
exchange.sendResponseHeaders(200, response.length());
var output = exchange.getResponseBody();
output.write(response.getBytes(StandardCharsets.UTF_8));
output.close();

runner.processWebhook(json);
}

RestReceiver(int port, Consumer<JSONValue> consumer) throws IOException
{
this.consumer = consumer;
InetSocketAddress address = new InetSocketAddress(port);
server = HttpServer.create(address, 0);
server.createContext("/", new Handler());
server.setExecutor(null);
server.start();
static String name() {
return "webhook";
}

void close() {
server.stop(0);
static WebhookHandler create(BotRunner runner, JSONObject configuration) {
return new WebhookHandler(runner);
}
}