Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[Pulsar Proxy] Add error log for pulsar proxy starter (apache#8451)
Browse files Browse the repository at this point in the history
### Motivation

Currently, the proxy service starter will throw all exceptions to the main method directly, it's hard to check the error log if there is an exception when proxy service start.

### Modifications

Add try-catch for proxy start method.

(cherry picked from commit 442a547)
  • Loading branch information
gaoran10 authored and codelipenghui committed Nov 10, 2020
1 parent b2f0837 commit bcb97d1
Showing 1 changed file with 84 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.slf4j.bridge.SLF4JBridgeHandler.install;
import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;

import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.proxy.server.plugin.servlet.ProxyAdditionalServletWithClassLoader;
Expand Down Expand Up @@ -76,97 +77,104 @@ public class ProxyServiceStarter {
private boolean help = false;

public ProxyServiceStarter(String[] args) throws Exception {
// setup handlers
removeHandlersForRootLogger();
install();

DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
});

JCommander jcommander = new JCommander();
try {
jcommander.addObject(this);
jcommander.parse(args);
if (help || isBlank(configFile)) {
jcommander.usage();
return;
}
} catch (Exception e) {
jcommander.usage();
System.exit(-1);
}

// load config file
final ProxyConfiguration config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);

if (!isBlank(zookeeperServers)) {
// Use zookeeperServers from command line
config.setZookeeperServers(zookeeperServers);
}

if (!isBlank(globalZookeeperServers)) {
// Use globalZookeeperServers from command line
config.setConfigurationStoreServers(globalZookeeperServers);
}
if (!isBlank(configurationStoreServers)) {
// Use configurationStoreServers from command line
config.setConfigurationStoreServers(configurationStoreServers);
}

if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
|| config.isAuthorizationEnabled()) {
checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
checkArgument(!isEmpty(config.getConfigurationStoreServers()),
"configurationStoreServers must be provided");
}

if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
|| (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
}
// setup handlers
removeHandlersForRootLogger();
install();

AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));
// create proxy service
ProxyService proxyService = new ProxyService(config, authenticationService);
// create a web-service
final WebServer server = new WebServer(config, authenticationService);
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
System.out.println(String.format("%s [%s] error Uncaught exception in thread %s: %s", dateFormat.format(new Date()), thread.getContextClassLoader(), thread.getName(), exception.getMessage()));
});

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
JCommander jcommander = new JCommander();
try {
proxyService.close();
server.stop();
jcommander.addObject(this);
jcommander.parse(args);
if (help || isBlank(configFile)) {
jcommander.usage();
return;
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
jcommander.usage();
System.exit(-1);
}
}));

proxyService.start();
// load config file
final ProxyConfiguration config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);

// Setup metrics
DefaultExports.initialize();
if (!isBlank(zookeeperServers)) {
// Use zookeeperServers from command line
config.setZookeeperServers(zookeeperServers);
}

// Report direct memory from Netty counters
Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
@Override
public double get() {
return getJvmDirectMemoryUsed();
if (!isBlank(globalZookeeperServers)) {
// Use globalZookeeperServers from command line
config.setConfigurationStoreServers(globalZookeeperServers);
}
if (!isBlank(configurationStoreServers)) {
// Use configurationStoreServers from command line
config.setConfigurationStoreServers(configurationStoreServers);
}

if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
|| config.isAuthorizationEnabled()) {
checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
checkArgument(!isEmpty(config.getConfigurationStoreServers()),
"configurationStoreServers must be provided");
}
}).register(CollectorRegistry.defaultRegistry);

Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
@Override
public double get() {
return PlatformDependent.maxDirectMemory();
if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
|| (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
checkArgument(!isEmpty(config.getZookeeperServers()), "zookeeperServers must be provided");
}
}).register(CollectorRegistry.defaultRegistry);

addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));
// create proxy service
ProxyService proxyService = new ProxyService(config, authenticationService);
// create a web-service
final WebServer server = new WebServer(config, authenticationService);

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
proxyService.close();
server.stop();
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
}));

proxyService.start();

// Setup metrics
DefaultExports.initialize();

// Report direct memory from Netty counters
Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
@Override
public double get() {
return getJvmDirectMemoryUsed();
}
}).register(CollectorRegistry.defaultRegistry);

Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
@Override
public double get() {
return PlatformDependent.maxDirectMemory();
}
}).register(CollectorRegistry.defaultRegistry);

addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());

// start web-service
server.start();

// start web-service
server.start();
} catch (Exception e) {
log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e);
throw new PulsarServerException(e);
}
}

public static void main(String[] args) throws Exception {
Expand Down

0 comments on commit bcb97d1

Please sign in to comment.