Skip to content

Commit

Permalink
Merge branch 'up/master' into website/branch-2.7.2-chapter-6
Browse files Browse the repository at this point in the history
* up/master: (55 commits)
  [broker] remove useless method "PersistentTopic#getPersistentTopic" (apache#12655)
  [Python Schema] Python schema support custom Avro configurations for Enum type (apache#12642)
  Allow to configure different implementations for Pulsar functions state store (apache#12646)
  Remove replicator global test from the quarantine group (apache#12648)
  [Java Client] Remove invalid call to Thread.currentThread().interrupt(); (apache#12652)
  k8s runtime: force deletion to avoid hung function worker during connector restart (apache#12504)
  [Broker] Optimize exception information for schemas (apache#12647)
  Close Zk database on unit tests (apache#12649)
  Fix call sync method in an async callback when enabling geo replicator. (apache#12590)
  [pulsar-broker] Add git branch information for PulsarVersion (apache#12541)
  PulsarAdmin: Fix last exit code storage (apache#12581)
  Add @test annotation to test methods (apache#12640)
  Upgrade debezium to 1.7.1 (apache#12644)
  [ML] Avoid passing OpAddEntry across a thread boundary in asyncAddEntry (apache#12606)
  [Functions] Prevent NPE while stopping a non started Pulsar LogAppender (apache#12643)
  Update io-debezium-source.md (apache#12638)
  Add missing cmds on pulsar-admin document page (apache#12634)
  Clean up the metadata of the non-persistent partitioned topics. (apache#12550)
  modify check waitingForPingResponse with volatile (apache#12615)
  [pulsar-admin] Check backlog quota policy for namespace (apache#12512)
  ...
  • Loading branch information
Yan Zhang committed Nov 7, 2021
2 parents caf9085 + 40356eb commit 497e20e
Show file tree
Hide file tree
Showing 234 changed files with 9,929 additions and 938 deletions.
6 changes: 1 addition & 5 deletions build/run_unit_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,8 @@ function broker_flaky() {
echo "::endgroup::"
echo "::group::Running quarantined tests"
$MVN_COMMAND test -pl pulsar-broker -Dgroups='quarantine' -DexcludedGroups='' -DfailIfNoTests=false \
-DtestForkCount=2 -Dexclude='**/Replicator*Test.java' ||
-DtestForkCount=2 ||
print_testng_failures pulsar-broker/target/surefire-reports/testng-failed.xml "Quarantined test failure in" "Quarantined test failures"
# run quarantined Replicator tests separately
$MVN_COMMAND test -pl pulsar-broker -Dgroups='quarantine' -DexcludedGroups='' -DfailIfNoTests=false \
-DtestForkCount=2 -Dinclude='**/Replicator*Test.java' || \
print_testng_failures pulsar-broker/target/surefire-reports/testng-failed.xml "Quarantined test failure in" "Quarantined Replicator*Test failures"
echo "::endgroup::"
echo "::group::Running flaky tests"
$MVN_TEST_COMMAND -pl pulsar-broker -Dgroups='flaky' -DtestForkCount=2
Expand Down
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ zookeeperSessionExpiredPolicy=shutdown
# Enable or disable system topic
systemTopicEnabled=false

# The schema compatibility strategy to use for system topics
systemTopicSchemaCompatibilityStrategy=ALWAYS_COMPATIBLE

# Enable or disable topic level policies, topic level policies depends on the system topic
# Please enable the system topic first.
topicLevelPoliciesEnabled=false
Expand Down
4 changes: 4 additions & 0 deletions conf/functions_worker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ functionRuntimeFactoryConfigs:
# extraFunctionDependenciesDir:
# # Additional memory padding added on top of the memory requested by the function per on a per instance basis
# percentMemoryPadding: 10
# # The duration in seconds before the StatefulSet deleted on function stop/restart.
# # Value must be non-negative integer. The value zero indicates delete immediately.
# # Default is 5 seconds.
# gracePeriodSeconds: 5

## A set of the minimum amount of resources functions must request.
## Support for this depends on function runtime.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -699,10 +699,14 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
// retain buffer in this thread
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
executor.executeOrdered(name, safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
}

@Override
Expand All @@ -711,10 +715,14 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx);
// retain buffer in this thread
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation)));
executor.executeOrdered(name, safeRun(() -> {
OpAddEntry addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx);
internalAsyncAddEntry(addOperation);
}));
}

private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
Expand Down Expand Up @@ -1501,9 +1509,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
// If op is used by another ledger handle, we need to close it and create a new one
if (existsOp.ledger != null) {
existsOp.close();
existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
// release the extra retain
ReferenceCountUtil.release(existsOp.data);
existsOp = OpAddEntry.createNoRetainBuffer(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx);
}
existsOp.setLedger(currentLedger);
pendingAddEntries.add(existsOp);
Expand Down Expand Up @@ -2543,7 +2549,8 @@ private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
// move the mark delete position to the highestPositionToDelete only if it is smaller than the add confirmed
// to prevent the edge case where the cursor is caught up to the latest and highestPositionToDelete may be larger than the last add confirmed
if (highestPositionToDelete.compareTo((PositionImpl) cursor.getMarkDeletedPosition()) > 0
&& highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0 ) {
&& highestPositionToDelete.compareTo((PositionImpl) cursor.getManagedLedger().getLastConfirmedEntry()) <= 0
&& !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) {
cursor.asyncMarkDelete(highestPositionToDelete, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

public class NonDurableCursorImpl extends ManagedCursorImpl {

private volatile boolean readCompacted;

NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
super(bookkeeper, config, ledger, cursorName);
Expand Down Expand Up @@ -116,6 +118,14 @@ public void asyncDeleteCursor(final String consumerName, final DeleteCursorCallb
callback.deleteCursorComplete(ctx);
}

public void setReadCompacted(boolean readCompacted) {
this.readCompacted = readCompacted;
}

public boolean isReadCompacted() {
return readCompacted;
}

@Override
public synchronized String toString() {
return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,28 +75,28 @@ enum State {
CLOSED
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntry(ml, data, callback, ctx);
public static OpAddEntry createNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) {
OpAddEntry op = createOpAddEntryNoRetainBuffer(ml, data, callback, ctx);
op.numberOfMessages = numberOfMessages;
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
}
return op;
}

private static OpAddEntry createOpAddEntry(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
private static OpAddEntry createOpAddEntryNoRetainBuffer(ManagedLedgerImpl ml, ByteBuf data, AddEntryCallback callback, Object ctx) {
OpAddEntry op = RECYCLER.get();
op.ml = ml;
op.ledger = null;
op.data = data.retain();
op.data = data;
op.dataLength = data.readableBytes();
op.callback = callback;
op.ctx = ctx;
Expand Down Expand Up @@ -155,7 +155,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx)
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
lh.getId());

if (!checkAndCompleteOp(ctx)) {
// means callback might have been completed by different thread (timeout task thread).. so do nothing
return;
Expand Down Expand Up @@ -255,7 +255,7 @@ private void updateLatency() {

/**
* Checks if add-operation is completed
*
*
* @return true if task is not already completed else returns false.
*/
private boolean checkAndCompleteOp(Object ctx) {
Expand All @@ -276,7 +276,7 @@ void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {

/**
* It handles add failure on the given ledger. it can be triggered when add-entry fails or times out.
*
*
* @param lh
*/
void handleAddFailure(final LedgerHandle lh) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2830,7 +2830,7 @@ public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {

List<OpAddEntry> oldOps = new ArrayList<>();
for (int i = 0; i < 10; i++) {
OpAddEntry op = OpAddEntry.create(ledger, ByteBufAllocator.DEFAULT.buffer(128), null, null);
OpAddEntry op = OpAddEntry.createNoRetainBuffer(ledger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
if (i > 4) {
op.setLedger(mock(LedgerHandle.class));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ protected ServerConfiguration newServerConfiguration(int port, String zkServers,
conf.setBookiePort(port);
if (!"".equals(ledgerRootPath)) {
conf.setMetadataServiceUri("zk://" + zkUtil.getZooKeeperConnectString() + ledgerRootPath);
}else {
} else {
conf.setZkServers(zkServers);
}
conf.setJournalDirName(journalDir.getPath());
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ flexible messaging model and an intuitive client API.</description>
<presto.version>332</presto.version>
<scala.binary.version>2.13</scala.binary.version>
<scala-library.version>2.13.6</scala-library.version>
<debezium.version>1.7.0.Final</debezium.version>
<debezium.version>1.7.1.Final</debezium.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<hbase.version>2.3.0</hbase.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Enable or disable system topic.")
private boolean systemTopicEnabled = false;

@FieldContext(
category = CATEGORY_SCHEMA,
doc = "The schema compatibility strategy to use for system topics"
)
private SchemaCompatibilityStrategy systemTopicSchemaCompatibilityStrategy =
SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Enable or disable topic level policies, topic level policies depends on the system topic, " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
static final String HTTP_HEADER_VALUE_PREFIX = "Bearer ";

// When symmetric key is configured
static final String CONF_TOKEN_SETTING_PREFIX = "";
static final String CONF_TOKEN_SETTING_PREFIX = "tokenSettingPrefix";

// When symmetric key is configured
static final String CONF_TOKEN_SECRET_KEY = "tokenSecretKey";
Expand Down Expand Up @@ -253,38 +253,35 @@ private String getPrincipal(Jwt<?, Claims> jwt) {
* Try to get the validation key for tokens from several possible config options.
*/
private Key getValidationKey(ServiceConfiguration conf) throws IOException {
if (conf.getProperty(confTokenSecretKeySettingName) != null
&& StringUtils.isNotBlank((String) conf.getProperty(confTokenSecretKeySettingName))) {
final String validationKeyConfig = (String) conf.getProperty(confTokenSecretKeySettingName);
final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(validationKeyConfig);
String tokenSecretKey = (String) conf.getProperty(confTokenSecretKeySettingName);
String tokenPublicKey = (String) conf.getProperty(confTokenPublicKeySettingName);
if (StringUtils.isNotBlank(tokenSecretKey)) {
final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(tokenSecretKey);
return AuthTokenUtils.decodeSecretKey(validationKey);
} else if (conf.getProperty(confTokenPublicKeySettingName) != null
&& StringUtils.isNotBlank((String) conf.getProperty(confTokenPublicKeySettingName))) {
final String validationKeyConfig = (String) conf.getProperty(confTokenPublicKeySettingName);
final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(validationKeyConfig);
} else if (StringUtils.isNotBlank(tokenPublicKey)) {
final byte[] validationKey = AuthTokenUtils.readKeyFromUrl(tokenPublicKey);
return AuthTokenUtils.decodePublicKey(validationKey, publicKeyAlg);
} else {
throw new IOException("No secret key was provided for token authentication");
}
}

private String getTokenRoleClaim(ServiceConfiguration conf) throws IOException {
if (conf.getProperty(confTokenAuthClaimSettingName) != null
&& StringUtils.isNotBlank((String) conf.getProperty(confTokenAuthClaimSettingName))) {
return (String) conf.getProperty(confTokenAuthClaimSettingName);
String tokenAuthClaim = (String) conf.getProperty(confTokenAuthClaimSettingName);
if (StringUtils.isNotBlank(tokenAuthClaim)) {
return tokenAuthClaim;
} else {
return Claims.SUBJECT;
}
}

private SignatureAlgorithm getPublicKeyAlgType(ServiceConfiguration conf) throws IllegalArgumentException {
if (conf.getProperty(confTokenPublicAlgSettingName) != null
&& StringUtils.isNotBlank((String) conf.getProperty(confTokenPublicAlgSettingName))) {
String alg = (String) conf.getProperty(confTokenPublicAlgSettingName);
String tokenPublicAlg = (String) conf.getProperty(confTokenPublicAlgSettingName);
if (StringUtils.isNotBlank(tokenPublicAlg)) {
try {
return SignatureAlgorithm.forName(alg);
return SignatureAlgorithm.forName(tokenPublicAlg);
} catch (SignatureException ex) {
throw new IllegalArgumentException("invalid algorithm provided " + alg, ex);
throw new IllegalArgumentException("invalid algorithm provided " + tokenPublicAlg, ex);
}
} else {
return SignatureAlgorithm.RS256;
Expand All @@ -293,19 +290,19 @@ private SignatureAlgorithm getPublicKeyAlgType(ServiceConfiguration conf) throws

// get Token Audience Claim from configuration, if not configured return null.
private String getTokenAudienceClaim(ServiceConfiguration conf) throws IllegalArgumentException {
if (conf.getProperty(confTokenAudienceClaimSettingName) != null
&& StringUtils.isNotBlank((String) conf.getProperty(confTokenAudienceClaimSettingName))) {
return (String) conf.getProperty(confTokenAudienceClaimSettingName);
String tokenAudienceClaim = (String) conf.getProperty(confTokenAudienceClaimSettingName);
if (StringUtils.isNotBlank(tokenAudienceClaim)) {
return tokenAudienceClaim;
} else {
return null;
}
}

// get Token Audience that stands for this broker from configuration, if not configured return null.
private String getTokenAudience(ServiceConfiguration conf) throws IllegalArgumentException {
if (conf.getProperty(confTokenAudienceSettingName) != null
&& StringUtils.isNotBlank((String) conf.getProperty(confTokenAudienceSettingName))) {
return (String) conf.getProperty(confTokenAudienceSettingName);
String tokenAudience = (String) conf.getProperty(confTokenAudienceSettingName);
if (StringUtils.isNotBlank(tokenAudience)) {
return tokenAudience;
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ CompletableFuture<Boolean> allowSourceOpsAsync(NamespaceName namespaceName, Stri
CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
* Allow consume operations with in this namespace
* @param namespaceName The namespace that the consume operations can be executed in
* @param role The role to check
* @param authenticationData authentication data related to the role
* @return a boolean to determine whether authorized or not
*/
CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData);

/**
*
* Grant authorization-action permission on a namespace to the given client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return authorize(authenticationData, r -> super.allowSinkOpsAsync(namespaceName, r, authenticationData));
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return authorize(authenticationData, r -> super.allowConsumeOpsAsync(namespaceName, r, authenticationData));
}

@Override
public CompletableFuture<Boolean> allowTenantOperationAsync(String tenantName,
String role,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,11 @@ public CompletableFuture<Boolean> allowSinkOpsAsync(NamespaceName namespaceName,
return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.sinks);
}

@Override
public CompletableFuture<Boolean> allowConsumeOpsAsync(NamespaceName namespaceName, String role, AuthenticationDataSource authenticationData) {
return allowTheSpecifiedActionOpsAsync(namespaceName, role, authenticationData, AuthAction.consume);
}

private CompletableFuture<Boolean> allowTheSpecifiedActionOpsAsync(NamespaceName namespaceName, String role,
AuthenticationDataSource authenticationData,
AuthAction authAction) {
Expand Down Expand Up @@ -525,6 +530,9 @@ public CompletableFuture<Boolean> allowNamespaceOperationAsync(NamespaceName nam
case PACKAGES:
isAuthorizedFuture = allowTheSpecifiedActionOpsAsync(namespaceName, role, authData, AuthAction.packages);
break;
case GET_TOPICS:
isAuthorizedFuture = allowConsumeOpsAsync(namespaceName, role, authData);
break;
default:
isAuthorizedFuture = CompletableFuture.completedFuture(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ protected boolean exists(String path) throws MetadataStoreException {
}
}

protected CompletableFuture<Boolean> existsAsync(String path) {
return cache.exists(path);
}

public int getOperationTimeoutSec() {
return operationTimeoutSec;
}
Expand Down
Loading

0 comments on commit 497e20e

Please sign in to comment.