Skip to content

Commit

Permalink
adding Intercept Interface (#10)
Browse files Browse the repository at this point in the history
* adding Intercept Interface

* adding intercept classes

* addressing comments

* updating impl

* adding

* improving

* adding implementation

* fixes

* improving impl

* removing impl

Refactor intercept interface (#12)

* refactoring intercept interface

* cleaning up

* fix bug
  • Loading branch information
jerrypeng authored and merlimat committed Jan 24, 2020
1 parent 847c3de commit 2d79fd6
Show file tree
Hide file tree
Showing 39 changed files with 1,804 additions and 62 deletions.
Expand Up @@ -26,11 +26,16 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;

import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.pulsar.common.naming.TopicName;

/**
* Configuration class for a ManagedLedger.
*/
Expand Down Expand Up @@ -68,6 +73,9 @@ public class ManagedLedgerConfig {
private Map<String, Object> bookKeeperEnsemblePlacementPolicyProperties;
private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE;
private Clock clock = Clock.systemUTC();
@Getter
@Setter
private Runnable createFunctionInterceptFunc;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down
Expand Up @@ -338,7 +338,7 @@ public void operationFailed(MetaStoreException e) {
callback.initializeFailed(new ManagedLedgerException(e));
}
}
});
}, config.getCreateFunctionInterceptFunc());

scheduleTimeoutTask();
}
Expand Down
Expand Up @@ -19,9 +19,12 @@
package org.apache.bookkeeper.mledger.impl;

import java.util.List;
import java.util.function.Function;

import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.pulsar.common.naming.TopicName;

/**
* Interface that describes the operations that the ManagedLedger need to do on the metadata store.
Expand Down Expand Up @@ -56,8 +59,11 @@ interface MetaStoreCallback<T> {
* whether the managed ledger metadata should be created if it doesn't exist already
* @throws MetaStoreException
*/
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback);
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback, Runnable createTopicIntercept);

default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback) {
getManagedLedgerInfo(ledgerName, createIfMissing, callback, null);
}
/**
*
* @param ledgerName
Expand Down
Expand Up @@ -31,13 +31,15 @@
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -128,7 +130,7 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {

@Override
public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissing,
final MetaStoreCallback<ManagedLedgerInfo> callback) {
final MetaStoreCallback<ManagedLedgerInfo> callback, Runnable createTopicIntercept) {
// Try to get the content or create an empty node
zk.getData(prefix + ledgerName, false,
(rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> {
Expand All @@ -143,6 +145,16 @@ public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissin
} else if (rc == Code.NONODE.intValue()) {
// Z-node doesn't exist
if (createIfMissing) {
// intercept
if (createTopicIntercept != null) {
try {
createTopicIntercept.run();
} catch (Exception e) {
callback.operationFailed(new MetaStoreException(e));
return;
}
}

log.info("Creating '{}{}'", prefix, ledgerName);

StringCallback createcb = (rc1, path1, ctx1, name) -> {
Expand Down
6 changes: 6 additions & 0 deletions pulsar-broker-common/pom.xml
Expand Up @@ -45,6 +45,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin-original</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
Expand Down
Expand Up @@ -170,6 +170,9 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
private long delayedDeliveryTickTimeMillis = 1000;

@FieldContext(category = CATEGORY_SERVER, doc = "The class name of Intercept provider")
private String interceptProvider;

@FieldContext(
category = CATEGORY_WEBSOCKET,
doc = "Enable the WebSocket API service in broker"
Expand Down
@@ -0,0 +1,23 @@
package org.apache.pulsar.broker.intercept;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;

public interface FunctionsInterceptProvider {
/**
* Intercept call for create function
*
* @param functionConfig function config of the function to be created
* @param clientRole the role used to create function
*/
default void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {}

/**
* Intercept call for update function
* @param functionConfig function config of the function to be updated
* @param existingFunctionConfig
* @param clientRole the role used to update function
*/
default void updateFunction(FunctionConfig functionConfig, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {}
}
@@ -0,0 +1,34 @@
package org.apache.pulsar.broker.intercept;

import org.apache.pulsar.common.functions.FunctionConfig;

public class FunctionsInterceptService {

private final FunctionsInterceptProvider provider;

public FunctionsInterceptService(FunctionsInterceptProvider functionsInterceptProvider) {
this.provider = functionsInterceptProvider;
}

/**
* Intercept call for create function
*
* @param functionConfig function config of the function to be created
* @param clientRole the role used to create function
*/
public void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {
provider.createFunction(functionConfig, clientRole);
}

/**
* Intercept call for update source
*
* @param updates updates to this function's function config
* @param existingFunctionConfig the existing function config
* @param clientRole the role used to update function
*/
public void updateFunction(FunctionConfig updates, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {
provider.updateFunction(updates, existingFunctionConfig, clientRole);
}

}
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import lombok.Getter;
import lombok.NoArgsConstructor;

import java.util.Optional;

@Getter
@NoArgsConstructor
public class InterceptException extends Exception {

private Optional<Integer> errorCode = Optional.empty();

public InterceptException(Integer errorCode, String errorMessage) {
super(errorMessage);
this.errorCode = Optional.of(errorCode);
}
}
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;

/**
* This class provides a mechanism to intercept various API calls
*/
public interface InterceptProvider {

default TenantsInterceptProvider getTenantInterceptProvider() {
return new TenantsInterceptProvider() {};
}

default NamespacesInterceptProvider getNamespaceInterceptProvider() {
return new NamespacesInterceptProvider() {};
}

default TopicsInterceptProvider getTopicInterceptProvider() {
return new TopicsInterceptProvider() {};
}

default FunctionsInterceptProvider getFunctionsInterceptProvider() {
return new FunctionsInterceptProvider() {};
}

default SourcesInterceptProvider getSourcesInterceptProvider() {
return new SourcesInterceptProvider() {};
}

default SinksInterceptProvider getSinksInterceptProvider() {
return new SinksInterceptProvider() {};
}

/**
* Perform initialization for the intercept provider
*
* @param conf broker config object
*/
default void initialize(ServiceConfiguration conf, PulsarAdmin pulsarAdmin) throws InterceptException {}
}
@@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.intercept;

import com.google.common.annotations.Beta;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* Service that manages intercepting API calls to a pulsar cluster
*/
@Beta
public class InterceptService {
private static final Logger log = LoggerFactory.getLogger(InterceptService.class);

private InterceptProvider provider;
private final ServiceConfiguration conf;
private final TenantsInterceptService tenantInterceptService;
private final NamespacesInterceptService namespaceInterceptService;
private final TopicInterceptService topicInterceptService;
private final FunctionsInterceptService functionInterceptService;
private final SinksInterceptService sinkInterceptService;
private final SourcesInterceptService sourceInterceptService;

public InterceptService(ServiceConfiguration conf, PulsarAdmin pulsarAdmin)
throws PulsarServerException {
this.conf = conf;

try {
final String providerClassname = conf.getInterceptProvider();
if (StringUtils.isNotBlank(providerClassname)) {
provider = (InterceptProvider) Class.forName(providerClassname).newInstance();
provider.initialize(conf, pulsarAdmin);
log.info("Interceptor {} has been loaded.", providerClassname);
} else {
provider = new InterceptProvider() {};
}

tenantInterceptService = new TenantsInterceptService(provider.getTenantInterceptProvider());
namespaceInterceptService = new NamespacesInterceptService(provider.getNamespaceInterceptProvider());
topicInterceptService = new TopicInterceptService(provider.getTopicInterceptProvider());
functionInterceptService = new FunctionsInterceptService(provider.getFunctionsInterceptProvider());
sourceInterceptService = new SourcesInterceptService(provider.getSourcesInterceptProvider());
sinkInterceptService = new SinksInterceptService(provider.getSinksInterceptProvider());

} catch (Throwable e) {
throw new PulsarServerException("Failed to load an intercept provider.", e);
}
}

public TenantsInterceptService tenants() {
return tenantInterceptService;
}

public NamespacesInterceptService namespaces() {
return namespaceInterceptService;
}

public TopicInterceptService topics() {
return topicInterceptService;
}

public FunctionsInterceptService functions() {
return functionInterceptService;
}

public SourcesInterceptService sources() {
return sourceInterceptService;
}

public SinksInterceptService sinks() {
return sinkInterceptService;
}
}

0 comments on commit 2d79fd6

Please sign in to comment.