Skip to content

Commit

Permalink
Merge branch 'master' into PIP-190-2.10.x
Browse files Browse the repository at this point in the history
  • Loading branch information
momo-jun committed Aug 8, 2022
2 parents fe33323 + 805eba7 commit eb12bb4
Show file tree
Hide file tree
Showing 80 changed files with 1,428 additions and 193 deletions.
7 changes: 7 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -794,6 +794,13 @@ athenzDomainNames=
# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=

## Configure the datasource of basic authenticate, supports the file and Base64 format.
# file:
# basicAuthConf=/path/my/.htpasswd
# use Base64 to encode the contents of .htpasswd:
# basicAuthConf=YOUR-BASE64-DATA
basicAuthConf=

### --- Token Authentication Provider --- ###

## Symmetric key
Expand Down
15 changes: 15 additions & 0 deletions conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ tlsEnableHostnameVerification=false
# fails, then the cert is untrusted and the connection is dropped.
tlsTrustCertsFilePath=

# Path for the TLS certificate file
tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Enable TLS with KeyStore type configuration in broker.
useKeyStoreTls=false

Expand All @@ -68,6 +74,15 @@ tlsTrustStorePath=
# TLS TrustStore password
tlsTrustStorePassword=

# TLS KeyStore type configuration: JKS, PKCS12
tlsKeyStoreType=JKS

# TLS TrustStore path
tlsKeyStorePath=

# TLS TrustStore password
tlsKeyStorePassword=

# Set up TLS provider for web service
# When TLS authentication with CACert is used, the valid value is either OPENSSL or JDK.
# When TLS authentication with KeyStore is used, available options can be SunJSSE, Conscrypt and so on.
Expand Down
7 changes: 7 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ maxHttpServerConnections=2048
# Max concurrent web requests
maxConcurrentHttpRequests=1024

## Configure the datasource of basic authenticate, supports the file and Base64 format.
# file:
# basicAuthConf=/path/my/.htpasswd
# use Base64 to encode the contents of .htpasswd:
# basicAuthConf=YOUR-BASE64-DATA
basicAuthConf=

### --- Token Authentication Provider --- ###

## Symmetric key
Expand Down
6 changes: 6 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,12 @@ athenzDomainNames=
# When this parameter is not empty, unauthenticated users perform as anonymousUserRole
anonymousUserRole=

## Configure the datasource of basic authenticate, supports the file and Base64 format.
# file:
# basicAuthConf=/path/my/.htpasswd
# use Base64 to encode the contents of .htpasswd:
# basicAuthConf=YOUR-BASE64-DATA
basicAuthConf=

### --- Token Authentication Provider --- ###

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) {
}

ledgerClosed(lh);
createLedgerAfterClosed();
}
}, System.nanoTime());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2367,13 +2367,15 @@ void testFindNewestMatchingAfterLedgerRollover() throws Exception {
ledger.addEntry("fourth".getBytes(Encoding));
Position last = ledger.addEntry("last-expired".getBytes(Encoding));

ledger.getConfig().setMaxEntriesPerLedger(1);
// roll a new ledger
int numLedgersBefore = ledger.getLedgersInfo().size();
ledger.getConfig().setMaxEntriesPerLedger(1);
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
});
Awaitility.await().atMost(20, TimeUnit.SECONDS)
.until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);

// the algorithm looks for "expired" messages
// starting from the first, then it moves to the last message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
Expand Down Expand Up @@ -69,7 +68,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
Expand Down Expand Up @@ -2106,8 +2104,7 @@ public void testMaximumRolloverTime() throws Exception {
ledger.addEntry("data".getBytes());

Awaitility.await().untilAsserted(() -> {
assertEquals(ledger.getLedgersInfoAsList().size(), 1);
assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
assertEquals(ledger.getLedgersInfoAsList().size(), 2);
});
}

Expand Down Expand Up @@ -2254,7 +2251,7 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(0);
config.setMaxEntriesPerLedger(2);
config.setMaxEntriesPerLedger(1);
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);

Expand All @@ -2264,25 +2261,19 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId();
ml.pendingAddEntries.add(OpAddEntry.
createNoRetainBuffer(ml, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
// let current ledger close
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
ml.rollCurrentLedgerIfFull();
AtomicLong currentLedgerId = new AtomicLong(-1);
// create a new ledger
Awaitility.await().untilAsserted(() -> {
currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId());
assertNotEquals(preLedgerId, currentLedgerId.get());
});
// let retention expire
Thread.sleep(1500);
// delete the expired ledger
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));

// the closed and expired ledger should be deleted
assertEquals(ml.getLedgersInfoAsList().size(), 1);
assertEquals(currentLedgerId.get(),
ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size() - 1).getLedgerId());
assertTrue(ml.getLedgersInfoAsList().size() <= 1);
assertEquals(ml.getTotalSize(), 0);
ml.close();
}

Expand Down Expand Up @@ -2547,12 +2538,10 @@ public void testGetPositionAfterN() throws Exception {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> {
assertEquals(managedLedger.getLedgersInfo().size(), 2);
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
});
Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries());
log.info("### ledgers {}", managedLedger.getLedgersInfo());

long firstLedger = managedLedger.getLedgersInfo().firstKey();
Expand Down Expand Up @@ -2597,8 +2586,8 @@ public void testGetNumberOfEntriesInStorage() throws Exception {
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> {
assertEquals(managedLedger.getLedgersInfo().size(), 2);
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
assertEquals(managedLedger.getLedgersInfo().size(), 3);
assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.LedgerOpened);
});
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
Expand Down Expand Up @@ -3446,10 +3435,12 @@ public void testManagedLedgerRollOverIfFull() throws Exception {

// all the messages have benn acknowledged
// and all the ledgers have been removed except the last ledger
Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
stateUpdater.setAccessible(true);
stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
Awaitility.await().untilAsserted(() ->
Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger));
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
package org.apache.pulsar.broker.authentication;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashMap;
Expand All @@ -32,30 +35,55 @@
import lombok.Cleanup;
import org.apache.commons.codec.digest.Crypt;
import org.apache.commons.codec.digest.Md5Crypt;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
import org.apache.pulsar.client.api.url.URL;

public class AuthenticationProviderBasic implements AuthenticationProvider {
private static final String HTTP_HEADER_NAME = "Authorization";
private static final String CONF_SYSTEM_PROPERTY_KEY = "pulsar.auth.basic.conf";
private static final String CONF_PULSAR_PROPERTY_KEY = "basicAuthConf";
private Map<String, String> users;

@Override
public void close() throws IOException {
// noop
}

public static byte[] readData(String data)
throws IOException, URISyntaxException, InstantiationException, IllegalAccessException {
if (data.startsWith("data:") || data.startsWith("file:")) {
return IOUtils.toByteArray(URL.createURL(data));
} else if (Files.exists(Paths.get(data))) {
return Files.readAllBytes(Paths.get(data));
} else if (org.apache.commons.codec.binary.Base64.isBase64(data)) {
return Base64.getDecoder().decode(data);
} else {
String msg = "Not supported config";
throw new IllegalArgumentException(msg);
}
}

@Override
public void initialize(ServiceConfiguration config) throws IOException {
File confFile = new File(System.getProperty(CONF_SYSTEM_PROPERTY_KEY));
if (!confFile.exists()) {
throw new IOException("The password auth conf file does not exist");
} else if (!confFile.isFile()) {
throw new IOException("The path is not a file");
String data = config.getProperties().getProperty(CONF_PULSAR_PROPERTY_KEY);
if (StringUtils.isEmpty(data)) {
data = System.getProperty(CONF_SYSTEM_PROPERTY_KEY);
}
if (StringUtils.isEmpty(data)) {
throw new IOException("No basic authentication config provided");
}

@Cleanup BufferedReader reader = null;
try {
byte[] bytes = readData(data);
reader = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(bytes)));
} catch (Exception e) {
throw new IllegalArgumentException(e);
}

@Cleanup BufferedReader reader = new BufferedReader(new FileReader(confFile));
users = new HashMap<>();
for (String line : reader.lines().toArray(s -> new String[s])) {
List<String> splitLine = Arrays.asList(line.split(":"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.authentication;

import static org.testng.Assert.assertEquals;
import com.google.common.io.Resources;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Base64;
import java.util.Properties;
import javax.naming.AuthenticationException;
import lombok.Cleanup;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.api.AuthData;
import org.testng.annotations.Test;

public class AuthenticationProviderBasicTest {
private final String basicAuthConf = Resources.getResource("authentication/basic/.htpasswd").getPath();
private final String basicAuthConfBase64 = Base64.getEncoder().encodeToString(Files.readAllBytes(Path.of(basicAuthConf)));

public AuthenticationProviderBasicTest() throws IOException {
}

private void testAuthenticate(AuthenticationProviderBasic provider) throws AuthenticationException {
AuthData authData = AuthData.of("superUser2:superpassword".getBytes(StandardCharsets.UTF_8));
provider.newAuthState(authData, null, null);
}

@Test
public void testLoadFileFromPulsarProperties() throws Exception {
@Cleanup
AuthenticationProviderBasic provider = new AuthenticationProviderBasic();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
Properties properties = new Properties();
properties.setProperty("basicAuthConf", basicAuthConf);
serviceConfiguration.setProperties(properties);
provider.initialize(serviceConfiguration);
testAuthenticate(provider);
}

@Test
public void testLoadBase64FromPulsarProperties() throws Exception {
@Cleanup
AuthenticationProviderBasic provider = new AuthenticationProviderBasic();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
Properties properties = new Properties();
properties.setProperty("basicAuthConf", basicAuthConfBase64);
serviceConfiguration.setProperties(properties);
provider.initialize(serviceConfiguration);
testAuthenticate(provider);
}

@Test
public void testLoadFileFromSystemProperties() throws Exception {
@Cleanup
AuthenticationProviderBasic provider = new AuthenticationProviderBasic();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
System.setProperty("pulsar.auth.basic.conf", basicAuthConf);
provider.initialize(serviceConfiguration);
testAuthenticate(provider);
}

@Test
public void testLoadBase64FromSystemProperties() throws Exception {
@Cleanup
AuthenticationProviderBasic provider = new AuthenticationProviderBasic();
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
System.setProperty("pulsar.auth.basic.conf", basicAuthConfBase64);
provider.initialize(serviceConfiguration);
testAuthenticate(provider);
}

@Test
public void testReadData() throws Exception {
byte[] data = Files.readAllBytes(Path.of(basicAuthConf));
String base64Data = Base64.getEncoder().encodeToString(data);

// base64 format
assertEquals(AuthenticationProviderBasic.readData("data:;base64," + base64Data), data);
assertEquals(AuthenticationProviderBasic.readData(base64Data), data);

// file format
assertEquals(AuthenticationProviderBasic.readData("file://" + basicAuthConf), data);
assertEquals(AuthenticationProviderBasic.readData(basicAuthConf), data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
superUser:mQQQIsyvvKRtU
superUser2:$apr1$foobarmq$kuSZlLgOITksCkRgl57ie/
Loading

0 comments on commit eb12bb4

Please sign in to comment.