diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 8e1a53a09062f..08dad103e3e29 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -18,211 +18,37 @@ */ package org.apache.pulsar.functions.worker; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.AbstractService; -import java.io.IOException; -import java.net.URI; - import lombok.extern.slf4j.Slf4j; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.api.namespace.Namespace; -import org.apache.distributedlog.api.namespace.NamespaceBuilder; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.common.conf.InternalConfigurationData; -import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.functions.worker.rest.WorkerServer; -import javax.ws.rs.core.Response; - @Slf4j public class Worker extends AbstractService { private final WorkerConfig workerConfig; - private PulsarClient client; - private FunctionRuntimeManager functionRuntimeManager; - private FunctionMetaDataManager functionMetaDataManager; - private ClusterServiceCoordinator clusterServiceCoordinator; + private final WorkerService workerService; private Thread serverThread; - private Namespace dlogNamespace; - private MembershipManager membershipManager; - private SchedulerManager schedulerManager; public Worker(WorkerConfig workerConfig) { this.workerConfig = workerConfig; + this.workerService = new WorkerService(workerConfig); } @Override protected void doStart() { try { doStartImpl(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("Interrupted at starting worker", ie); } catch (Throwable t) { - t.printStackTrace(); + log.error("Failed to start worker", t); } } protected void doStartImpl() throws InterruptedException { - log.info("Starting worker {}...", workerConfig.getWorkerId()); - try { - log.info("Worker Configs: {}",new ObjectMapper().writerWithDefaultPrettyPrinter() - .writeValueAsString(workerConfig)); - } catch (JsonProcessingException e) { - log.warn("Failed to print worker configs with error {}", e.getMessage(), e); - } - - // initializing pulsar functions namespace - PulsarAdmin admin = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl()); - InternalConfigurationData internalConf; - // make sure pulsar broker is up - log.info("Checking if broker at {} is up...", this.workerConfig.getPulsarWebServiceUrl()); - int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); - int retries = 0; - while (true) { - try { - admin.clusters().getClusters(); - break; - } catch (PulsarAdminException e) { - log.warn("Retry to connect to Pulsar broker at {}", this.workerConfig.getPulsarWebServiceUrl()); - if (retries >= maxRetries) { - log.error("Failed to connect to Pulsar broker at {} after {} attempts", - this.workerConfig.getPulsarFunctionsNamespace(), maxRetries); - throw new RuntimeException("Failed to connect to Pulsar broker"); - } - retries ++; - Thread.sleep(1000); - } - } - - // getting namespace policy - log.info("Initializing Pulsar Functions namespace..."); - try { - try { - admin.namespaces().getPolicies(this.workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e) { - if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { - // if not found than create - try { - admin.namespaces().createNamespace(this.workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e1) { - // prevent race condition with other workers starting up - if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { - log.error("Failed to create namespace {} for pulsar functions", this.workerConfig - .getPulsarFunctionsNamespace(), e1); - throw new RuntimeException(e1); - } - } - try { - admin.namespaces().setRetention( - this.workerConfig.getPulsarFunctionsNamespace(), - new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE)); - } catch (PulsarAdminException e1) { - log.error("Failed to set retention policy for pulsar functions namespace", e); - throw new RuntimeException(e1); - } - } else { - log.error("Failed to get retention policy for pulsar function namespace {}", - this.workerConfig.getPulsarFunctionsNamespace(), e); - throw new RuntimeException(e); - } - } - try { - internalConf = admin.brokers().getInternalConfigurationData(); - } catch (PulsarAdminException e) { - log.error("Failed to retrieve broker internal configuration", e); - throw new RuntimeException(e); - } - } finally { - admin.close(); - } - - // initialize the dlog namespace - // TODO: move this as part of pulsar cluster initialization later - URI dlogUri; - try { - dlogUri = Utils.initializeDlogNamespace( - internalConf.getZookeeperServers(), - internalConf.getLedgersRootPath()); - } catch (IOException ioe) { - log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", - internalConf.getZookeeperServers(), ioe); - throw new RuntimeException(ioe); - } - - // create the dlog namespace for storing function packages - DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig); - try { - this.dlogNamespace = NamespaceBuilder.newBuilder() - .conf(dlogConf) - .clientId("function-worker-" + workerConfig.getWorkerId()) - .uri(dlogUri) - .build(); - } catch (Exception e) { - log.error("Failed to initialize dlog namespace {} for storing function packages", - dlogUri, e); - throw new RuntimeException(e); - } - - // initialize the function metadata manager - try { - - this.client = PulsarClient.create(this.workerConfig.getPulsarServiceUrl()); - log.info("Created Pulsar client"); - - //create scheduler manager - this.schedulerManager = new SchedulerManager(this.workerConfig, this.client); - - //create function meta data manager - this.functionMetaDataManager = new FunctionMetaDataManager( - this.workerConfig, this.schedulerManager, this.client); - - //create membership manager - this.membershipManager = new MembershipManager(this.workerConfig, this.client); - - // create function runtime manager - this.functionRuntimeManager = new FunctionRuntimeManager( - this.workerConfig, this.client, this.dlogNamespace, this.membershipManager); - - // Setting references to managers in scheduler - this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); - this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager); - this.schedulerManager.setMembershipManager(this.membershipManager); - - // initialize function metadata manager - this.functionMetaDataManager.initialize(); - - // Starting cluster services - log.info("Start cluster services..."); - this.clusterServiceCoordinator = new ClusterServiceCoordinator( - this.workerConfig.getWorkerId(), - membershipManager); - // start periodic snapshot routine - this.clusterServiceCoordinator.addTask( - "snapshot", - this.workerConfig.getSnapshotFreqMs(), - () -> functionMetaDataManager.snapshot()); - - this.clusterServiceCoordinator.addTask("membership-monitor", - this.workerConfig.getFailureCheckFreqMs(), - () -> membershipManager.checkFailures( - functionMetaDataManager, functionRuntimeManager, schedulerManager)); - - this.clusterServiceCoordinator.start(); - - // Start function runtime manager - this.functionRuntimeManager.start(); - - } catch (Exception e) { - log.error("Error Starting up in worker", e); - throw new RuntimeException(e); - } - - WorkerServer server = new WorkerServer( - this.workerConfig, this.functionMetaDataManager, this.functionRuntimeManager, - this.membershipManager, this.dlogNamespace); + WorkerServer server = new WorkerServer(workerService); this.serverThread = new Thread(server, server.getThreadName()); log.info("Start worker server on port {}...", this.workerConfig.getWorkerPort()); @@ -239,42 +65,6 @@ protected void doStop() { log.warn("Worker server thread is interrupted", e); } } - if (null != functionMetaDataManager) { - try { - functionMetaDataManager.close(); - } catch (Exception e) { - log.warn("Failed to close function metadata manager", e); - } - } - if (null != functionRuntimeManager) { - try { - functionRuntimeManager.close(); - } catch (Exception e) { - log.warn("Failed to close function runtime manager", e); - } - } - if (null != client) { - try { - client.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close pulsar client", e); - } - } - - if (null != clusterServiceCoordinator) { - clusterServiceCoordinator.close(); - } - - if (null != membershipManager) { - try { - membershipManager.close(); - } catch (PulsarClientException e) { - log.warn("Failed to close membership manager", e); - } - } - - if (null != schedulerManager) { - schedulerManager.close(); - } + workerService.stop(); } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java new file mode 100644 index 0000000000000..2c4adbf2a6163 --- /dev/null +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.worker; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.net.URI; +import javax.ws.rs.core.Response; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.api.namespace.Namespace; +import org.apache.distributedlog.api.namespace.NamespaceBuilder; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.functions.worker.rest.BaseApiResource; +import org.apache.pulsar.functions.worker.rest.Resources; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.glassfish.jersey.server.ResourceConfig; +import org.glassfish.jersey.servlet.ServletContainer; + +/** + * A service component contains everything to run a worker except rest server. + */ +@Slf4j +@Getter +public class WorkerService { + + private final WorkerConfig workerConfig; + + private PulsarClient client; + private FunctionRuntimeManager functionRuntimeManager; + private FunctionMetaDataManager functionMetaDataManager; + private ClusterServiceCoordinator clusterServiceCoordinator; + private Namespace dlogNamespace; + private MembershipManager membershipManager; + private SchedulerManager schedulerManager; + + public WorkerService(WorkerConfig workerConfig) { + this.workerConfig = workerConfig; + } + + public ServletContextHandler newServletContextHandler(String contextPath) { + final ResourceConfig config = new ResourceConfig(Resources.get()); + final ServletContextHandler contextHandler = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_CONFIG, this.workerConfig); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionMetaDataManager); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_RUNTIME_MANAGER, this.functionRuntimeManager); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_MEMBERSHIP_MANAGER, this.membershipManager); + contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_DLOG_NAMESPACE, this.dlogNamespace); + contextHandler.setContextPath(contextPath); + + final ServletHolder apiServlet = + new ServletHolder(new ServletContainer(config)); + contextHandler.addServlet(apiServlet, "/*"); + + return contextHandler; + } + + public void start() throws InterruptedException { + log.info("Starting worker {}...", workerConfig.getWorkerId()); + try { + log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() + .writeValueAsString(workerConfig)); + } catch (JsonProcessingException e) { + log.warn("Failed to print worker configs with error {}", e.getMessage(), e); + } + + // initializing pulsar functions namespace + PulsarAdmin admin = Utils.getPulsarAdminClient(this.workerConfig.getPulsarWebServiceUrl()); + InternalConfigurationData internalConf; + // make sure pulsar broker is up + log.info("Checking if broker at {} is up...", this.workerConfig.getPulsarWebServiceUrl()); + int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); + int retries = 0; + while (true) { + try { + admin.clusters().getClusters(); + break; + } catch (PulsarAdminException e) { + log.warn("Retry to connect to Pulsar broker at {}", this.workerConfig.getPulsarWebServiceUrl()); + if (retries >= maxRetries) { + log.error("Failed to connect to Pulsar broker at {} after {} attempts", + this.workerConfig.getPulsarFunctionsNamespace(), maxRetries); + throw new RuntimeException("Failed to connect to Pulsar broker"); + } + retries ++; + Thread.sleep(1000); + } + } + + // getting namespace policy + log.info("Initializing Pulsar Functions namespace..."); + try { + try { + admin.namespaces().getPolicies(this.workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + // if not found than create + try { + admin.namespaces().createNamespace(this.workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e1) { + // prevent race condition with other workers starting up + if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { + log.error("Failed to create namespace {} for pulsar functions", this.workerConfig + .getPulsarFunctionsNamespace(), e1); + throw new RuntimeException(e1); + } + } + try { + admin.namespaces().setRetention( + this.workerConfig.getPulsarFunctionsNamespace(), + new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } catch (PulsarAdminException e1) { + log.error("Failed to set retention policy for pulsar functions namespace", e); + throw new RuntimeException(e1); + } + } else { + log.error("Failed to get retention policy for pulsar function namespace {}", + this.workerConfig.getPulsarFunctionsNamespace(), e); + throw new RuntimeException(e); + } + } + try { + internalConf = admin.brokers().getInternalConfigurationData(); + } catch (PulsarAdminException e) { + log.error("Failed to retrieve broker internal configuration", e); + throw new RuntimeException(e); + } + } finally { + admin.close(); + } + + // initialize the dlog namespace + // TODO: move this as part of pulsar cluster initialization later + URI dlogUri; + try { + dlogUri = Utils.initializeDlogNamespace( + internalConf.getZookeeperServers(), + internalConf.getLedgersRootPath()); + } catch (IOException ioe) { + log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", + internalConf.getZookeeperServers(), ioe); + throw new RuntimeException(ioe); + } + + // create the dlog namespace for storing function packages + DistributedLogConfiguration dlogConf = Utils.getDlogConf(workerConfig); + try { + this.dlogNamespace = NamespaceBuilder.newBuilder() + .conf(dlogConf) + .clientId("function-worker-" + workerConfig.getWorkerId()) + .uri(dlogUri) + .build(); + } catch (Exception e) { + log.error("Failed to initialize dlog namespace {} for storing function packages", + dlogUri, e); + throw new RuntimeException(e); + } + + // initialize the function metadata manager + try { + + this.client = PulsarClient.create(this.workerConfig.getPulsarServiceUrl()); + log.info("Created Pulsar client"); + + //create scheduler manager + this.schedulerManager = new SchedulerManager(this.workerConfig, this.client); + + //create function meta data manager + this.functionMetaDataManager = new FunctionMetaDataManager( + this.workerConfig, this.schedulerManager, this.client); + + //create membership manager + this.membershipManager = new MembershipManager(this.workerConfig, this.client); + + // create function runtime manager + this.functionRuntimeManager = new FunctionRuntimeManager( + this.workerConfig, this.client, this.dlogNamespace, this.membershipManager); + + // Setting references to managers in scheduler + this.schedulerManager.setFunctionMetaDataManager(this.functionMetaDataManager); + this.schedulerManager.setFunctionRuntimeManager(this.functionRuntimeManager); + this.schedulerManager.setMembershipManager(this.membershipManager); + + // initialize function metadata manager + this.functionMetaDataManager.initialize(); + + // Starting cluster services + log.info("Start cluster services..."); + this.clusterServiceCoordinator = new ClusterServiceCoordinator( + this.workerConfig.getWorkerId(), + membershipManager); + // start periodic snapshot routine + this.clusterServiceCoordinator.addTask( + "snapshot", + this.workerConfig.getSnapshotFreqMs(), + () -> functionMetaDataManager.snapshot()); + + this.clusterServiceCoordinator.addTask("membership-monitor", + this.workerConfig.getFailureCheckFreqMs(), + () -> membershipManager.checkFailures( + functionMetaDataManager, functionRuntimeManager, schedulerManager)); + + this.clusterServiceCoordinator.start(); + + // Start function runtime manager + this.functionRuntimeManager.start(); + + } catch (Exception e) { + log.error("Error Starting up in worker", e); + throw new RuntimeException(e); + } + } + + public void stop() { + if (null != functionMetaDataManager) { + try { + functionMetaDataManager.close(); + } catch (Exception e) { + log.warn("Failed to close function metadata manager", e); + } + } + if (null != functionRuntimeManager) { + try { + functionRuntimeManager.close(); + } catch (Exception e) { + log.warn("Failed to close function runtime manager", e); + } + } + if (null != client) { + try { + client.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close pulsar client", e); + } + } + + if (null != clusterServiceCoordinator) { + clusterServiceCoordinator.close(); + } + + if (null != membershipManager) { + try { + membershipManager.close(); + } catch (PulsarClientException e) { + log.warn("Failed to close membership manager", e); + } + } + + if (null != schedulerManager) { + schedulerManager.close(); + } + } + +} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java index ec83975b4bbf2..e1a840347f295 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java @@ -31,7 +31,7 @@ public final class Resources { private Resources() { } - static Set> get() { + public static Set> get() { return new HashSet<>(getClasses()); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java index d031fd2572afe..96e898e1a92b6 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java @@ -18,31 +18,19 @@ */ package org.apache.pulsar.functions.worker.rest; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.distributedlog.api.namespace.Namespace; -import org.apache.pulsar.functions.worker.FunctionMetaDataManager; -import org.apache.pulsar.functions.worker.FunctionRuntimeManager; -import org.apache.pulsar.functions.worker.MembershipManager; import org.apache.pulsar.functions.worker.WorkerConfig; +import org.apache.pulsar.functions.worker.WorkerService; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.glassfish.jersey.server.ResourceConfig; -import org.glassfish.jersey.servlet.ServletContainer; import java.net.BindException; import java.net.URI; @Slf4j -@RequiredArgsConstructor public class WorkerServer implements Runnable { private final WorkerConfig workerConfig; - private final FunctionMetaDataManager functionMetaDataManager; - private final FunctionRuntimeManager functionRuntimeManager; - private final MembershipManager membershipManager; - private final Namespace dlogNamespace; + private final WorkerService workerService; private static String getErrorMessage(Server server, int port, Exception ex) { if (ex instanceof BindException) { @@ -53,28 +41,16 @@ private static String getErrorMessage(Server server, int port, Exception ex) { return ex.getMessage(); } + public WorkerServer(WorkerService workerService) { + this.workerConfig = workerService.getWorkerConfig(); + this.workerService = workerService; + } + @Override public void run() { final Server server = new Server(this.workerConfig.getWorkerPort()); + server.setHandler(workerService.newServletContextHandler("/")); - final ResourceConfig config = new ResourceConfig(Resources.get()); - - final ServletContextHandler contextHandler = - new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_CONFIG, this.workerConfig); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_STATE_MANAGER, this.functionMetaDataManager); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_FUNCTION_RUNTIME_MANAGER, this.functionRuntimeManager); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_MEMBERSHIP_MANAGER, this.membershipManager); - contextHandler.setAttribute(BaseApiResource.ATTRIBUTE_WORKER_DLOG_NAMESPACE, this.dlogNamespace); - contextHandler.setContextPath("/"); - - server.setHandler(contextHandler); - - final ServletHolder apiServlet = - new ServletHolder(new ServletContainer(config)); - - contextHandler.addServlet(apiServlet, "/*"); try { server.start();