Skip to content

Commit

Permalink
Refactor intercept interface (apache#12)
Browse files Browse the repository at this point in the history
* refactoring intercept interface

* cleaning up

* fix bug
  • Loading branch information
jerrypeng authored and merlimat committed Jul 16, 2019
1 parent 11f3053 commit 4189738
Show file tree
Hide file tree
Showing 20 changed files with 1,359 additions and 222 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.pulsar.broker.intercept;

import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;

public interface FunctionsInterceptProvider {
/**
* Intercept call for create function
*
* @param functionConfig function config of the function to be created
* @param clientRole the role used to create function
*/
default void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {}

/**
* Intercept call for update function
* @param functionConfig function config of the function to be updated
* @param existingFunctionConfig
* @param clientRole the role used to update function
*/
default void updateFunction(FunctionConfig functionConfig, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package org.apache.pulsar.broker.intercept;

import org.apache.pulsar.common.functions.FunctionConfig;

public class FunctionsInterceptService {

private final FunctionsInterceptProvider provider;

public FunctionsInterceptService(FunctionsInterceptProvider functionsInterceptProvider) {
this.provider = functionsInterceptProvider;
}

/**
* Intercept call for create function
*
* @param functionConfig function config of the function to be created
* @param clientRole the role used to create function
*/
public void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {
provider.createFunction(functionConfig, clientRole);
}

/**
* Intercept call for update source
*
* @param updates updates to this function's function config
* @param existingFunctionConfig the existing function config
* @param clientRole the role used to update function
*/
public void updateFunction(FunctionConfig updates, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {
provider.updateFunction(updates, existingFunctionConfig, clientRole);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,116 +20,40 @@

import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;

/**
* This class provides a mechanism to intercept various API calls
*/
public interface InterceptProvider {

/**
* Perform initialization for the intercept provider
*
* @param conf broker config object
*/
default void initialize(ServiceConfiguration conf, PulsarAdmin pulsarAdmin) throws InterceptException {}
default TenantsInterceptProvider getTenantInterceptProvider() {
return new TenantsInterceptProvider() {};
}

/**
* Intercept call for create tenant
*
* @param tenant tenant name
* @param tenantInfo tenant info
* @param clientRole the role used to create tenant
*/
default void createTenant(String tenant, TenantInfo tenantInfo, String clientRole) throws InterceptException {}
default NamespacesInterceptProvider getNamespaceInterceptProvider() {
return new NamespacesInterceptProvider() {};
}

/**
* Intercept call for creating namespace
*
* @param namespaceName the namespace name
* @param policies polices for this namespace
* @param clientRole the role used to create namespace
*/
default void createNamespace(NamespaceName namespaceName, Policies policies, String clientRole) throws InterceptException {}

/**
* Intercept call for create topic
*
* @param topicName the topic name
* @param clientRole the role used to create topic
*/
default void createTopic(TopicName topicName, String clientRole) throws InterceptException {}
default TopicsInterceptProvider getTopicInterceptProvider() {
return new TopicsInterceptProvider() {};
}

/**
* Intercept create partitioned topic
* @param topicName the topic name
* @param numPartitions number of partitions to create for this partitioned topic
* @param clientRole the role used to create partitioned topic
*/
default void createPartitionedTopic(TopicName topicName, PartitionedTopicMetadata numPartitions, String clientRole) throws InterceptException {}
default FunctionsInterceptProvider getFunctionsInterceptProvider() {
return new FunctionsInterceptProvider() {};
}

/**
* Intercept update partitioned topic
* @param topicName the topic name
* @param numPartitions number of partitions to update to
* @param clientRole the role used to update partitioned topic
*/
default void updatePartitionedTopic(TopicName topicName, PartitionedTopicMetadata numPartitions, String clientRole) throws InterceptException {}
default SourcesInterceptProvider getSourcesInterceptProvider() {
return new SourcesInterceptProvider() {};
}

default SinksInterceptProvider getSinksInterceptProvider() {
return new SinksInterceptProvider() {};
}

/**
* Intercept call for create function
*
* @param functionConfig function config of the function to be created
* @param clientRole the role used to create function
*/
default void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {}

/**
* Intercept call for update function
* @param functionConfig function config of the function to be updated
* @param existingFunctionConfig
* @param clientRole the role used to update function
*/
default void updateFunction(FunctionConfig functionConfig, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {}

/**
* Intercept call for create source
*
* @param sourceConfig the source config of the source to be created
* @param clientRole the role used to create source
*/
default void createSource(SourceConfig sourceConfig, String clientRole) throws InterceptException {}

/**
* Intercept call for update source
* @param sourceConfig the source config of the source to be updated
* @param existingSourceConfig
* @param clientRole the role used to update source
*/
default void updateSource(SourceConfig sourceConfig, SourceConfig existingSourceConfig, String clientRole) throws InterceptException {}

/**
* Intercept call for create sink
* Perform initialization for the intercept provider
*
* @param sinkConfig the sink config of the sink to be created
* @param clientRole the role used to create sink
*/
default void createSink(SinkConfig sinkConfig, String clientRole) throws InterceptException {} ;

/**
* Intercept call for update sink
* @param sinkConfig the sink config of the sink to be updated
* @param existingSinkConfig
* @param clientRole the role used to update sink
* @param conf broker config object
*/
default void updateSink(SinkConfig sinkConfig, SinkConfig existingSinkConfig, String clientRole) throws InterceptException {}

default void initialize(ServiceConfiguration conf, PulsarAdmin pulsarAdmin) throws InterceptException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,8 @@
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,6 +41,12 @@ public class InterceptService {

private InterceptProvider provider;
private final ServiceConfiguration conf;
private final TenantsInterceptService tenantInterceptService;
private final NamespacesInterceptService namespaceInterceptService;
private final TopicInterceptService topicInterceptService;
private final FunctionsInterceptService functionInterceptService;
private final SinksInterceptService sinkInterceptService;
private final SourcesInterceptService sourceInterceptService;

public InterceptService(ServiceConfiguration conf, PulsarAdmin pulsarAdmin)
throws PulsarServerException {
Expand All @@ -58,121 +61,40 @@ public InterceptService(ServiceConfiguration conf, PulsarAdmin pulsarAdmin)
} else {
provider = new InterceptProvider() {};
}

tenantInterceptService = new TenantsInterceptService(provider.getTenantInterceptProvider());
namespaceInterceptService = new NamespacesInterceptService(provider.getNamespaceInterceptProvider());
topicInterceptService = new TopicInterceptService(provider.getTopicInterceptProvider());
functionInterceptService = new FunctionsInterceptService(provider.getFunctionsInterceptProvider());
sourceInterceptService = new SourcesInterceptService(provider.getSourcesInterceptProvider());
sinkInterceptService = new SinksInterceptService(provider.getSinksInterceptProvider());

} catch (Throwable e) {
throw new PulsarServerException("Failed to load an intercept provider.", e);
}
}

/**
* Intercept call for create tenant
*
* @param tenant tenant name
* @param tenantInfo tenant info
* @param clientRole the role used to create tenant
*/
public void createTenant(String tenant, TenantInfo tenantInfo, String clientRole) throws InterceptException {
provider.createTenant(tenant, tenantInfo, clientRole);
}

/**
* Intercept call for creating namespace
*
* @param namespaceName the namespace name
* @param policies polices for this namespace
* @param clientRole the role used to create namespace
*/
public void createNamespace(NamespaceName namespaceName, Policies policies, String clientRole) throws InterceptException {
provider.createNamespace(namespaceName, policies, clientRole);
}

/**
* Intercept create partitioned topic
* @param topicName the topic name
* @param partitionedTopicMetadata metadata related to the partioned topic
* @param clientRole the role used to create partitioned topic
*/
public void createPartitionedTopic(TopicName topicName, PartitionedTopicMetadata partitionedTopicMetadata, String clientRole) throws InterceptException {
provider.createPartitionedTopic(topicName, partitionedTopicMetadata, clientRole);
}

/**
* Intercept call for create topic
*
* @param topicName the topic name
* @param clientRole the role used to create topic
*/
public void createTopic(TopicName topicName, String clientRole) throws InterceptException {
provider.createTopic(topicName, clientRole);
}

/**
* Intercept update partitioned topic
* @param topicName the topic name
* @param partitionedTopicMetadata metadata related to the partioned topic
* @param clientRole the role used to update partitioned topic
*/
public void updatePartitionedTopic(TopicName topicName, PartitionedTopicMetadata partitionedTopicMetadata, String clientRole) throws InterceptException {
provider.updatePartitionedTopic(topicName, partitionedTopicMetadata, clientRole);
}

/**
* Intercept call for create function
*
* @param functionConfig function config of the function to be created
* @param clientRole the role used to create function
*/
public void createFunction(FunctionConfig functionConfig, String clientRole) throws InterceptException {
provider.createFunction(functionConfig, clientRole);
public TenantsInterceptService tenants() {
return tenantInterceptService;
}

/**
* Intercept call for update source
*
* @param updates updates to this function's function config
* @param existingFunctionConfig the existing function config
* @param clientRole the role used to update function
*/
public void updateFunction(FunctionConfig updates, FunctionConfig existingFunctionConfig, String clientRole) throws InterceptException {
provider.updateFunction(updates, existingFunctionConfig, clientRole);
public NamespacesInterceptService namespaces() {
return namespaceInterceptService;
}

/**
* Intercept call for create source
*
* @param sourceConfig the source config of the source to be created
* @param clientRole the role used to create source
*/
public void createSource(SourceConfig sourceConfig, String clientRole) throws InterceptException {
provider.createSource(sourceConfig, clientRole);
public TopicInterceptService topics() {
return topicInterceptService;
}

/**
* Intercept call for update source
* @param updates updates to this source's source config
* @param existingSourceConfig the existing source config
* @param clientRole the role used to update source
*/
public void updateSource(SourceConfig updates, SourceConfig existingSourceConfig, String clientRole) throws InterceptException {
provider.updateSource(updates, existingSourceConfig, clientRole);
public FunctionsInterceptService functions() {
return functionInterceptService;
}

/**
* Intercept call for create sink
*
* @param sinkConfig the sink config of the sink to be created
* @param clientRole the role used to create sink
*/
public void createSink(SinkConfig sinkConfig, String clientRole) throws InterceptException {
provider.createSink(sinkConfig, clientRole);
public SourcesInterceptService sources() {
return sourceInterceptService;
}

/**
* Intercept call for update sink
* @param updates updates to this sink's source config
* @param existingSinkConfig the existing source config
* @param clientRole the role used to update sink
*/
public void updateSink(SinkConfig updates, SinkConfig existingSinkConfig, String clientRole) throws InterceptException {
provider.updateSink(updates, existingSinkConfig, clientRole);
public SinksInterceptService sinks() {
return sinkInterceptService;
}
}

0 comments on commit 4189738

Please sign in to comment.