Kafka 2.0.0 (#30)
* KAFKA-2.0.0 WIP

* final touches

* relaxed constraint and made column change backwards compatible

* removed KsmService.yml
simplesteph committed Aug 26, 2018
1 parent 65a3cd2 commit da7420c
Showing 16 changed files with 172 additions and 196 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -5,7 +5,10 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

## [0.4-SNAPSHOT]

- Added S3 Acl Source (#27)
- Upgraded to Kafka 2.0
- New ACL format that supports patterns (like prefixes)
- Upgrades to Docker Compose file

## [0.3] - 13/06/2018
- Added gRPC endpoint to perform API calls on KSM (the goal is to build a UI on top of KSM)
38 changes: 19 additions & 19 deletions README.md
@@ -16,12 +16,14 @@ Your role is to ensure that Kafka Security Manager is never down, as it is now a

A sample CSV to manage ACL is:
```
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
User:alice,Topic,foo,Read,Allow,*
User:bob,Group,bar,Write,Deny,12.34.56.78
User:peter,Cluster,kafka-cluster,Create,Allow,*
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
User:alice,Topic,LITERAL,foo,Read,Allow,*
User:bob,Group,PREFIXED,bar,Write,Deny,12.34.56.78
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*
```

**Important Note**: As of KSM 0.4, a new `PatternType` column has been added to match the ACL changes introduced in Kafka 2.0. This enables KSM to manage `LITERAL` and `PREFIXED` ACLs. See #28
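
For illustration, here is a minimal Scala sketch of how one row of the new format maps onto Kafka's ACL model. It assumes Kafka 2.0 on the classpath and reuses the constructors visible in the `CsvAclParser` diff further down; note that a `PREFIXED` ACL on `baz` covers every topic whose name starts with `baz`:

```
import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.utils.SecurityUtils

// CSV row: User:alice,Topic,PREFIXED,baz,Read,Allow,*
val resource = Resource(ResourceType.fromString("Topic"), "baz", PatternType.PREFIXED)
val acl = Acl(
  SecurityUtils.parseKafkaPrincipal("User:alice"), // KafkaPrincipal
  PermissionType.fromString("Allow"),              // PermissionType
  "*",                                             // Host
  Operation.fromString("Read")                     // Operation
)
```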

# Building

```
@@ -136,11 +138,6 @@ docker-compose down

For full usage of the docker-compose file see [kafka-stack-docker-compose](https://github.com/simplesteph/kafka-stack-docker-compose)

Add the entry to your `/etc/hosts` file
```
127.0.0.1 kafka1
```

## Extracting ACLs

You can initially extract all your existing ACLs in Kafka by running the program with the config `extract=true` or `export EXTRACT=true`
@@ -151,10 +148,10 @@ Output should look like:
[2018-03-06 21:49:44,704] INFO Getting ACLs from Kafka (ExtractAcl)
[2018-03-06 21:49:44,704] INFO Closing Authorizer (ExtractAcl)
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
User:bob,Group,bar,Write,Deny,12.34.56.78
User:alice,Topic,foo,Read,Allow,*
User:peter,Cluster,kafka-cluster,Create,Allow,*
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
User:bob,Group,PREFIXED,bar,Write,Deny,12.34.56.78
User:alice,Topic,LITERAL,foo,Read,Allow,*
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*
```

You can then place this CSV anywhere and use it as your source of truth.
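
Conceptually, the extraction step just asks the Kafka authorizer for every ACL and prints it in the new seven-column format. A rough sketch of the idea (not the actual `ExtractAcl` code; it assumes a `SimpleAclAuthorizer` pointed at the same ZooKeeper as your brokers):

```
import kafka.security.auth.SimpleAclAuthorizer
import scala.collection.JavaConverters._

val authorizer = new SimpleAclAuthorizer()
authorizer.configure(Map("authorizer.zookeeper.connect" -> "zoo1:2181").asJava)

// print the CSV header, then one line per (resource, acl) pair
println("KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host")
authorizer.getAcls().foreach { case (resource, acls) =>
  acls.foreach { acl =>
    println(List(acl.principal, resource.resourceType, resource.patternType,
                 resource.name, acl.operation, acl.permissionType, acl.host).mkString(","))
  }
}
authorizer.close()
```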
@@ -176,14 +173,17 @@ This provides a REST API to consume data from KSM. Swagger definition is provide

The API is defined according to the proto file in [src/main/protobuf/](src/main/protobuf/)

# Upgrade Notes
TODO: mention checking the inter-broker protocol version before doing this

# Compatibility

KSM Version | Kafka Version
--- | ---
0.4-SNAPSHOT | 1.1.x
0.3 | 1.1.x
0.2 | 1.1.x (upgrade to 0.3 recommended)
0.1 | 1.0.x (might work for earlier versions)
KSM Version | Kafka Version | Notes
--- | --- | ---
0.4-SNAPSHOT | 2.0.0 | important change: added column 'PatternType' in CSV
0.3 | 1.1.x |
0.2 | 1.1.x | upgrade to 0.3 recommended
0.1 | 1.0.x | might work for earlier versions

# Contributing

7 changes: 5 additions & 2 deletions build.sbt
@@ -18,8 +18,11 @@ resolvers ++= Seq(

libraryDependencies ++= Seq(
// kafka
"org.apache.kafka" %% "kafka" % "1.1.0",
"net.manub" %% "scalatest-embedded-kafka" % "1.1.0-kafka1.1-nosr" % "test",
"org.apache.kafka" %% "kafka" % "2.0.0",
"net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % "test",

// test
"org.scalatest" %% "scalatest" % "3.0.5" % Test,

// logging
"org.slf4j" % "slf4j-api" % "1.7.25",
19 changes: 12 additions & 7 deletions docker-compose.yml
@@ -12,13 +12,14 @@ services:
ZOO_SERVERS: server.1=zoo1:2888:3888

kafka1:
image: confluentinc/cp-kafka:4.0.0
image: confluentinc/cp-kafka:5.0.0
hostname: kafka1
ports:
- "9092:9092"
environment:
# add the entry "127.0.0.1 kafka1" to your /etc/hosts file
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
@@ -31,10 +32,14 @@ services:
environment:
KSM_READONLY: false
AUTHORIZER_ZOOKEEPER_CONNECT: "zoo1:2181"
SOURCE_CLASS: "com.github.simplesteph.ksm.source.GitHubSourceAcl"
SOURCE_GITHUB_USER: "simplesteph"
SOURCE_GITHUB_REPO: "kafka-security-manager-example"
SOURCE_GITHUB_FILEPATH: "acls.csv"
# FILE:
SOURCE_CLASS: "com.github.simplesteph.ksm.source.FileSourceAcl"
SOURCE_FILE_FILENAME: "example/acls.csv"
# GITHUB:
# SOURCE_CLASS: "com.github.simplesteph.ksm.source.GitHubSourceAcl"
# SOURCE_GITHUB_USER: "simplesteph"
# SOURCE_GITHUB_REPO: "kafka-security-manager-example"
# SOURCE_GITHUB_FILEPATH: "acls.csv"
FEATURE_GRPC: true
volumes:
- "./example:/example"
10 changes: 6 additions & 4 deletions example/acls.csv
@@ -1,4 +1,6 @@
KafkaPrincipal,ResourceType,ResourceName,Operation,PermissionType,Host
User:alice,Topic,foo,Read,Allow,*
User:bob,Group,bar,Write,Deny,12.34.56.78
User:peter,Cluster,kafka-cluster,Create,Allow,*
KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
User:alice,Topic,LITERAL,foo,Read,Allow,*
User:alice,Topic,PREFIXED,baz,Read,Allow,*
User:bob,Group,LITERAL,bar,Write,Deny,12.34.56.78
User:alice,Topic,PREFIXED,my-kafka-streams-app,Create,Allow,*
User:peter,Cluster,LITERAL,kafka-cluster,Create,Allow,*
8 changes: 8 additions & 0 deletions src/main/protobuf/kafka/ksm.proto
@@ -18,6 +18,7 @@ message ResourceAndAclPb {
message ResourcePb {
string name = 1;
ResourceTypePb kafka_resource_type = 2;
PatternTypePb pattern_type = 3;
}

enum ResourceTypePb {
@@ -30,6 +31,13 @@ enum ResourceTypePb {
RESOURCE_TYPE_DELEGATIONTOKEN = 6;
}

enum PatternTypePb {
PATTERN_TYPE_INVALID = 0;
PATTERN_TYPE_UNSET = 1;
PATTERN_TYPE_LITERAL = 2;
PATTERN_TYPE_PREFIXED = 3;
}

message AclPb {
KafkaPrincipalPb principal = 1;
PermissionTypePb permission_type = 2;
97 changes: 0 additions & 97 deletions src/main/resources/specs/KsmService.yml

This file was deleted.

@@ -58,7 +58,14 @@ class AclSynchronizer(authorizer: Authorizer,

private var sourceAclsCache: SourceAclResult = _

if (readOnly) log.warn("READ-ONLY mode is activated")
if (readOnly) {
log.warn("""
|=======================================================
|========== READ-ONLY mode is activated =========
|========== To disable: KSM_READONLY=false =========
|=======================================================
""".stripMargin)
}

def run(): Unit = if (!readOnly) {

@@ -5,10 +5,14 @@ import java.io.Reader
import com.github.simplesteph.ksm.source.SourceAclResult
import com.github.tototoshi.csv.{CSVFormat, CSVReader, QUOTE_MINIMAL, Quoting}
import kafka.security.auth._
import org.apache.kafka.common.resource.PatternType
import org.apache.kafka.common.utils.SecurityUtils
import org.slf4j.LoggerFactory

import scala.collection.immutable
import scala.util.Try
import scala.util.{Failure, Success, Try}

class CsvAclParser

/**
* Parser that assumes that all ACLs are flattened
@@ -18,19 +22,24 @@
*/
object CsvAclParser extends AclParser {

private val log = LoggerFactory.getLogger(classOf[CsvAclParser])

final val KAFKA_PRINCIPAL_COL = "KafkaPrincipal"
final val RESOURCE_TYPE_COL = "ResourceType"
final val RESOURCE_NAME_COL = "ResourceName"
final val OPERATION_COL = "Operation"
final val PERMISSION_TYPE_COL = "PermissionType"
final val HOST_COL = "Host"
final val PATTERN_TYPE_COL = "PatternType"

final val EXPECTED_COLS = List(KAFKA_PRINCIPAL_COL,
RESOURCE_TYPE_COL,
PATTERN_TYPE_COL,
RESOURCE_NAME_COL,
OPERATION_COL,
PERMISSION_TYPE_COL,
HOST_COL)
HOST_COL,
)

// we treat empty lines as Nil hence the format override
implicit val csvFormat: CSVFormat = new CSVFormat {
@@ -44,6 +53,7 @@

/**
* parse a row to return an ACL
*
* @param row a map of column name to row value
* @return an ACL
*/
@@ -55,15 +65,30 @@
val operation = Operation.fromString(row(OPERATION_COL))
val permissionType = PermissionType.fromString(row(PERMISSION_TYPE_COL))
val host = row(HOST_COL)
val patternType = Try(
PatternType.fromString(row(PATTERN_TYPE_COL).toUpperCase)) match {
case Success(pt) => pt
case Failure(e: NoSuchElementException) =>
// column is missing
log.warn(s"""Since you upgraded to Kafka 2.0, your CSV needs to include an extra column '$PATTERN_TYPE_COL', after $RESOURCE_TYPE_COL and before $RESOURCE_NAME_COL.
|The CSV header should be: KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
|For a quick fix, you can run the application with KSM_EXTRACT=true and replace your current CSV with the output of the command
|For backwards compatibility, the default value $PATTERN_TYPE_COL=LITERAL has been chosen""".stripMargin)
// Default
PatternType.LITERAL
case Failure(e) =>
throw e
}

val resource = Resource(resourceType, resourceName)
val resource = Resource(resourceType, resourceName, patternType)
val acl = Acl(kafkaPrincipal, permissionType, host, operation)

(resource, acl)
}

/**
* Parses all the ACL as provided by the reader that wraps the CSV content
*
* @param reader we use the reader interface to use string and files interchangeably in the parser
* @return sourceAclResult
*/
@@ -86,6 +111,7 @@
def asCsv(r: Resource, a: Acl): String = {
List(a.principal.toString,
r.resourceType.toString,
r.patternType,
r.name,
a.operation.toString,
a.permissionType.toString,
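For completeness, a small hypothetical usage sketch of the parser end to end. The entry-point name `aclsFromReader` is not visible in the hunks above, so treat it as an assumption based on the scaladoc ("we use the reader interface to use string and files interchangeably"):

```
import java.io.StringReader

// a full CSV document: the new 7-column header plus one PREFIXED rule
val csv =
  """KafkaPrincipal,ResourceType,PatternType,ResourceName,Operation,PermissionType,Host
    |User:alice,Topic,PREFIXED,baz,Read,Allow,*
    |""".stripMargin

// hypothetical entry point wrapping the row parsing shown above;
// the returned SourceAclResult carries the parsed ACLs or per-row errors
val result = CsvAclParser.aclsFromReader(new StringReader(csv))
```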
