Skip to content

Commit

Permalink
KAFKA-14881: Rework UserScramCredentialRecord (apache#13513)
Browse files Browse the repository at this point in the history
Rework UserScramCredentialRecord to store serverKey and StoredKey rather than saltedPassword. This
is necessary to support migration from ZK, since those are the fields we stored in ZK.  Update
latest MetadataVersion to IBP_3_5_IV2 and make SCRAM support conditional on this version.  Moved
ScramCredentialData.java from org.apache.kafka.image to org.apache.kafka.metadata, which seems more
appropriate.

Reviewers: Colin P. McCabe <cmccabe@apache.org>
  • Loading branch information
pprovenzano committed Apr 18, 2023
1 parent 61530d6 commit abca865
Show file tree
Hide file tree
Showing 15 changed files with 223 additions and 76 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Expand Up @@ -308,7 +308,7 @@
<suppress checks="CyclomaticComplexity"
files="(ClientQuotasImage|KafkaEventQueue|MetadataDelta|QuorumController|ReplicationControlManager).java"/>
<suppress checks="NPathComplexity"
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver).java"/>
files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|KRaftMigrationDriver|ScramControlManager).java"/>
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="metadata[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="BooleanExpressionComplexity"
Expand Down
22 changes: 15 additions & 7 deletions core/src/main/scala/kafka/tools/StorageTool.scala
Expand Up @@ -65,7 +65,7 @@ object StorageTool extends Logging {
val metadataRecords : ArrayBuffer[ApiMessageAndVersion] = ArrayBuffer()
getUserScramCredentialRecords(namespace).foreach(userScramCredentialRecords => {
if (!metadataVersion.isScramSupported()) {
throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.");
throw new TerseFailure(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.");
}
for (record <- userScramCredentialRecords) {
metadataRecords.append(new ApiMessageAndVersion(record, 0.toShort))
Expand Down Expand Up @@ -227,12 +227,20 @@ object StorageTool extends Logging {
val iterations = getIterations(argMap, scramMechanism)
val saltedPassword = getSaltedPassword(argMap, scramMechanism, salt, iterations)

val myrecord = new UserScramCredentialRecord()
.setName(name)
.setMechanism(scramMechanism.`type`)
.setSalt(salt)
.setIterations(iterations)
.setSaltedPassword(saltedPassword)
val myrecord = try {
val formatter = new ScramFormatter(scramMechanism);

new UserScramCredentialRecord()
.setName(name)
.setMechanism(scramMechanism.`type`)
.setSalt(salt)
.setStoredKey(formatter.storedKey(formatter.clientKey(saltedPassword)))
.setServerKey(formatter.serverKey(saltedPassword))
.setIterations(iterations)
} catch {
case e: Throwable =>
throw new TerseFailure(s"Error attempting to create UserScramCredentialRecord: ${e.getMessage}")
}
myrecord
}

Expand Down
Expand Up @@ -117,6 +117,6 @@ public void testNoAutoStart() {

@ClusterTest
public void testDefaults(ClusterConfig config) {
Assertions.assertEquals(MetadataVersion.IBP_3_5_IV1, config.metadataVersion());
Assertions.assertEquals(MetadataVersion.IBP_3_5_IV2, config.metadataVersion());
}
}
2 changes: 1 addition & 1 deletion core/src/test/java/kafka/test/annotation/ClusterTest.java
Expand Up @@ -41,6 +41,6 @@
String name() default "";
SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
String listener() default "";
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV1;
MetadataVersion metadataVersion() default MetadataVersion.IBP_3_5_IV2;
ClusterConfigProperty[] serverProperties() default {};
}
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
Expand Up @@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
assertEquals(String.format(
"Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
"SupportedMaxVersion: 3.5-IV1\tFinalizedVersionLevel: 3.3-IV1\t"),
"SupportedMaxVersion: 3.5-IV2\tFinalizedVersionLevel: 3.3-IV1\t"),
env.outputWithoutEpoch())
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
"disable", "--feature", "metadata.version"), env.out))
assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
"metadata.version. Local controller 1000 only supports versions 1-10", env.outputWithoutEpoch())
"metadata.version. Local controller 1000 only supports versions 1-11", env.outputWithoutEpoch())
}
TestUtils.resource(FeatureCommandTestEnv()) { env =>
assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/tools/StorageToolTest.scala
Expand Up @@ -325,7 +325,7 @@ Found problem:
try {
assertEquals(1, StorageTool.main(args))
} catch {
case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV0 or later.", exitString)
case e: StorageToolTestException => assertEquals(s"SCRAM is only supported in metadataVersion IBP_3_5_IV2 or later.", exitString)
} finally {
Exit.resetExitProcedure()
}
Expand Down
Expand Up @@ -26,11 +26,13 @@
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;

import org.slf4j.Logger;

import java.util.ArrayList;
Expand Down Expand Up @@ -111,22 +113,25 @@ public String toString() {

static class ScramCredentialValue {
private final byte[] salt;
private final byte[] saltedPassword;
private final byte[] storedKey;
private final byte[] serverKey;
private final int iterations;

ScramCredentialValue(
byte[] salt,
byte[] saltedPassword,
byte[] storedKey,
byte[] serverKey,
int iterations
) {
this.salt = salt;
this.saltedPassword = saltedPassword;
this.storedKey = storedKey;
this.serverKey = serverKey;
this.iterations = iterations;
}

@Override
public int hashCode() {
return Objects.hash(salt, saltedPassword, iterations);
return Objects.hash(salt, storedKey, serverKey, iterations);
}

@Override
Expand All @@ -135,15 +140,17 @@ public boolean equals(Object o) {
if (!(o.getClass() == this.getClass())) return false;
ScramCredentialValue other = (ScramCredentialValue) o;
return Arrays.equals(salt, other.salt) &&
Arrays.equals(saltedPassword, other.saltedPassword) &&
Arrays.equals(storedKey, other.storedKey) &&
Arrays.equals(serverKey, other.serverKey) &&
iterations == other.iterations;
}

@Override
public String toString() {
return "ScramCredentialValue" +
"(salt=" + "[hidden]" +
", saltedPassword=" + "[hidden]" +
", storedKey=" + "[hidden]" +
", serverKey=" + "[hidden]" +
", iterations=" + "[hidden]" +
")";
}
Expand Down Expand Up @@ -226,16 +233,15 @@ public ControllerResult<AlterUserScramCredentialsResponseData> alterCredentials(
setMechanism(deletion.mechanism()), (short) 0));
}
for (ScramCredentialUpsertion upsertion : userToUpsert.values()) {
response.results().add(new AlterUserScramCredentialsResult().
setUser(upsertion.name()).
setErrorCode(NONE.code()).
setErrorMessage(null));
records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName(upsertion.name()).
setMechanism(upsertion.mechanism()).
setSalt(upsertion.salt()).
setSaltedPassword(upsertion.saltedPassword()).
setIterations(upsertion.iterations()), (short) 0));
ApiError error = finishUpsertion(records, upsertion);
if (!error.isFailure()) {
response.results().add(new AlterUserScramCredentialsResult().
setUser(upsertion.name()).
setErrorCode(NONE.code()).
setErrorMessage(null));
} else {
userToError.put(upsertion.name(), error);
}
}
for (Entry<String, ApiError> entry : userToError.entrySet()) {
response.results().add(new AlterUserScramCredentialsResult().
Expand All @@ -246,6 +252,30 @@ public ControllerResult<AlterUserScramCredentialsResponseData> alterCredentials(
return ControllerResult.atomicOf(records, response);
}

static ApiError finishUpsertion(List<ApiMessageAndVersion> records, ScramCredentialUpsertion upsertion) {
org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(
ScramMechanism.fromType(upsertion.mechanism()).mechanismName());

try { // Convert from saltedPassword to storedKey and serverKey
ScramFormatter formatter = new ScramFormatter(internalMechanism);

records.add(new ApiMessageAndVersion(new UserScramCredentialRecord().
setName(upsertion.name()).
setMechanism(upsertion.mechanism()).
setSalt(upsertion.salt()).

// Convert from saltedPassword to storedKey and serverKey
setStoredKey(formatter.storedKey(formatter.clientKey(upsertion.saltedPassword()))).
setServerKey(formatter.serverKey(upsertion.saltedPassword())).
setIterations(upsertion.iterations()), (short) 0));

} catch (Throwable e) {
return ApiError.fromThrowable(e);
}
return ApiError.NONE;
}

static ApiError validateUpsertion(ScramCredentialUpsertion upsertion) {
ScramMechanism mechanism = ScramMechanism.fromType(upsertion.mechanism());
ApiError error = validateScramUsernameAndMechanism(upsertion.name(), mechanism);
Expand Down Expand Up @@ -300,7 +330,8 @@ public void replay(UserScramCredentialRecord record) {
ScramCredentialKey key = new ScramCredentialKey(record.name(),
ScramMechanism.fromType(record.mechanism()));
ScramCredentialValue value = new ScramCredentialValue(record.salt(),
record.saltedPassword(),
record.storedKey(),
record.serverKey(),
record.iterations());
if (credentials.put(key, value) == null) {
log.info("Created new SCRAM credential for {} with mechanism {}.",
Expand All @@ -311,14 +342,4 @@ public void replay(UserScramCredentialRecord record) {
}
}

ApiMessageAndVersion toRecord(ScramCredentialKey key, ScramCredentialValue value) {
return new ApiMessageAndVersion(new UserScramCredentialRecord().
setName(key.username).
setMechanism(key.mechanism.type()).
setSalt(value.salt).
setSaltedPassword(value.saltedPassword).
setIterations(value.iterations),
(short) 0);
}

}
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.metadata.RemoveUserScramCredentialRecord;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.metadata.ScramCredentialData;

import java.util.HashMap;
import java.util.Map.Entry;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.CredentialInfo;
import org.apache.kafka.common.message.DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.metadata.ScramCredentialData;

import java.util.Collections;
import java.util.List;
Expand Down
Expand Up @@ -15,53 +15,58 @@
* limitations under the License.
*/

package org.apache.kafka.image;
package org.apache.kafka.metadata;

import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.metadata.UserScramCredentialRecord;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;

import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.Objects;


/**
* Represents the ACLs in the metadata image.
*
* This class is thread-safe.
*/
public final class ScramCredentialData {
private final byte[] salt;
private final byte[] saltedPassword;
private final byte[] storedKey;
private final byte[] serverKey;
private final int iterations;

static ScramCredentialData fromRecord(
public static ScramCredentialData fromRecord(
UserScramCredentialRecord record
) {
return new ScramCredentialData(
record.salt(),
record.saltedPassword(),
record.storedKey(),
record.serverKey(),
record.iterations());
}

public ScramCredentialData(
byte[] salt,
byte[] saltedPassword,
byte[] storedKey,
byte[] serverKey,
int iterations
) {
this.salt = salt;
this.saltedPassword = saltedPassword;
this.storedKey = storedKey;
this.serverKey = serverKey;
this.iterations = iterations;
}

public byte[] salt() {
return salt;
}

public byte[] saltedPassword() {
return saltedPassword;
public byte[] storedKey() {
return storedKey;
}

public byte[] serverKey() {
return serverKey;
}

public int iterations() {
Expand All @@ -76,25 +81,18 @@ public UserScramCredentialRecord toRecord(
setName(userName).
setMechanism(mechanism.type()).
setSalt(salt).
setSaltedPassword(saltedPassword).
setStoredKey(storedKey).
setServerKey(serverKey).
setIterations(iterations);
}

public ScramCredential toCredential(
ScramMechanism mechanism
) throws GeneralSecurityException {
org.apache.kafka.common.security.scram.internals.ScramMechanism internalMechanism =
org.apache.kafka.common.security.scram.internals.ScramMechanism.forMechanismName(mechanism.mechanismName());
ScramFormatter formatter = new ScramFormatter(internalMechanism);
return new ScramCredential(salt,
formatter.storedKey(formatter.clientKey(saltedPassword)),
formatter.serverKey(saltedPassword),
iterations);
public ScramCredential toCredential(ScramMechanism mechanism) {
return new ScramCredential(salt, storedKey, serverKey, iterations);
}

@Override
public int hashCode() {
return Objects.hash(salt, saltedPassword, iterations);
return Objects.hash(salt, storedKey, serverKey, iterations);
}

@Override
Expand All @@ -103,15 +101,17 @@ public boolean equals(Object o) {
if (!o.getClass().equals(ScramCredentialData.class)) return false;
ScramCredentialData other = (ScramCredentialData) o;
return Arrays.equals(salt, other.salt) &&
Arrays.equals(saltedPassword, other.saltedPassword) &&
Arrays.equals(storedKey, other.storedKey) &&
Arrays.equals(serverKey, other.serverKey) &&
iterations == other.iterations;
}

@Override
public String toString() {
return "ScramCredentialData" +
"(salt=" + "[hidden]" +
", saltedPassword=" + "[hidden]" +
", storedKey=" + "[hidden]" +
", serverKey=" + "[hidden]" +
", iterations=" + "[hidden]" +
")";
}
Expand Down
Expand Up @@ -26,8 +26,10 @@
"about": "The SCRAM mechanism." },
{ "name": "Salt", "type": "bytes", "versions": "0+",
"about": "A random salt generated by the client." },
{ "name": "SaltedPassword", "type": "bytes", "versions": "0+",
"about": "The salted password." },
{ "name": "StoredKey", "type": "bytes", "versions": "0+",
"about": "The key the Server uses to authenticate the Client." },
{ "name": "ServerKey", "type": "bytes", "versions": "0+",
"about": "The key the Client uses to validate the Servers identity." },
{ "name": "Iterations", "type": "int32", "versions": "0+",
"about": "The number of iterations used in the SCRAM credential." }
]
Expand Down

0 comments on commit abca865

Please sign in to comment.