Unable to change Kafka producer partitioner class to RoundRobinPartitioner #4952

Closed
ilovechai opened this issue Mar 29, 2021 · 9 comments

Comments

@ilovechai
Contributor

We are currently using Egeria 2.4 and are trying to change the partitioner.class property in the event bus config, but the Kafka producer is unable to pick it up.

Property key and values tried:

  • "partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"
  • "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"

Below is the error from the OMAG log:

2021-03-25 20:31:03.406 ERROR [          pool-4-thread-1] o.a.e.t.k.KafkaOpenMetadataEventProducer : Bad exception from sending events Invalid value org.apache.kafka.clients.producer.internals.RoundRobinPartitioner for configuration partitioner.class: Class org.apache.kafka.clients.producer.internals.RoundRobinPartitioner could not be found.
Thu Mar 25 20:31:03 GMT 2021 igc_omrs Shutdown OCF-KAFKA-TOPIC-CONNECTOR-0011 The Apache Kafka producer for topic open-metadata.repository-services.cohort.one_catalog.OMRSTopic is shutting down after sending 0 messages and with 1 unsent messages

OMAG server config:

{
    "class": "OMAGServerConfig",
    "versionId": "V2.0",
    "localServerId": "9186d4b0-6a36-4484-8822-6db4e5347493",
    "localServerName": "igc_omrs",
    "localServerType": "OpenMetadataandGovernanceServer",
    "organizationName": "ibm",
    "localServerURL": "omag:8080",
    "localServerUserId": "OMAGServer",
    "maxPageSize": 1000,
    "eventBusConfig": {
        "class": "EventBusConfig",
        "topicURLRoot": "egeria.omag",
        "configurationProperties": {
            "producer": {
                "bootstrap.servers": "kafka:9093",
                "acks": "all",
                "retries": "0",
                "batch.size": "16384",
                "linger.ms": "1",
                "buffer.memory": "33554432",
                "max.request.size": "10485760",
                "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic",
                "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
            },
            "consumer": {
                "bootstrap.servers": "kafka:9093",
                "zookeeper.session.timeout.ms": "300000",
                "zookeeper.sync.time.ms": "2000",
                "fetch.message.max.bytes": "10485760",
                "max.partition.fetch.bytes": "10485760",
                "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic"
            }
        },
        "additionalProperties": {
            "producer": {
                "bootstrap.servers": "kafka:9093",
                "acks": "all",
                "retries": "0",
                "batch.size": "16384",
                "linger.ms": "1",
                "buffer.memory": "33554432",
                "max.request.size": "10485760",
                "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic",
                "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
            },
            "consumer": {
                "bootstrap.servers": "kafka:9093",
                "zookeeper.session.timeout.ms": "300000",
                "zookeeper.sync.time.ms": "2000",
                "fetch.message.max.bytes": "10485760",
                "max.partition.fetch.bytes": "10485760",
                "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                "kafka.omrs.topic.id": "kafka-omrs-topic"
            }
        }
    },
    "repositoryServicesConfig": {
        "class": "RepositoryServicesConfig",
        "auditLogConnections": [
            {
                "class": "Connection",
                "headerVersion": 0,
                "displayName": "Console",
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "type": {
                        "class": "ElementType",
                        "headerVersion": 0,
                        "elementOrigin": "LOCAL_COHORT",
                        "elementVersion": 0,
                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                        "elementTypeName": "ConnectorType",
                        "elementTypeVersion": 1,
                        "elementTypeDescription": "Asetofpropertiesdescribingatypeofconnector."
                    },
                    "guid": "4afac741-3dcc-4c60-a4ca-a6dede994e3f",
                    "qualifiedName": "ConsoleAuditLogStoreConnector",
                    "displayName": "ConsoleAuditLogStoreConnector",
                    "description": "Connectorsupportsloggingofauditlogmessagestostdout.",
                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.repositoryservices.auditlogstore.console.ConsoleAuditLogStoreProvider"
                },
                "configurationProperties": {
                    "supportedSeverities": [
                        "<Unknown>",
                        "Information",
                        "Event",
                        "Decision",
                        "Action",
                        "Error",
                        "Exception",
                        "Security",
                        "Startup",
                        "Shutdown",
                        "Asset",
                        "Types",
                        "Cohort"
                    ]
                }
            }
        ],
        "localRepositoryConfig": {
            "class": "LocalRepositoryConfig",
            "metadataCollectionId": "d6077e3d-15f4-43ef-80e2-e9760d22cd62",
            "localRepositoryMode": "REPOSITORY_PROXY",
            "localRepositoryLocalConnection": {
                "class": "Connection",
                "headerVersion": 0,
                "url": "https://internal-nginx-svc:12443/ibm/iis/igc/",
                "additionalProperties": {
                    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.producer.topic.id": "igcunifiedgovevents",
                    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.consumer.topic.id": "igc-kg-bridge-out",
                    "group.id": "IGCOMRSConsumerGroup",
                    "iis.rest.password": "V48PqjvYOd",
                    "bootstrap.servers": "kafka:9093",
                    "redis.key.expire": "86400",
                    "zookeeper.session.timeout.ms": "300000",
                    "redis.host": "redis-ha-master-svc:6380",
                    "zookeeper.sync.time.ms": "2000",
                    "iis.rest.address": "https://internal-nginx-svc:12443/",
                    "iis.rest.user": "isadmin"
                },
                "displayName": "IGCConnection",
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "connectorProviderClassName": "com.ibm.iis.openmetadata.adapters.igc.v2.repositoryconnector.IGCV2OMRSRepositoryConnectorProvider"
                }
            },
            "localRepositoryRemoteConnection": {
                "class": "Connection",
                "headerVersion": 0,
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "type": {
                        "class": "ElementType",
                        "headerVersion": 0,
                        "elementOrigin": "LOCAL_COHORT",
                        "elementVersion": 0,
                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                        "elementTypeName": "ConnectorType",
                        "elementTypeVersion": 1,
                        "elementTypeDescription": "Asetofpropertiesdescribingatypeofconnector."
                    },
                    "guid": "75ea56d1-656c-43fb-bc0c-9d35c5553b9e",
                    "qualifiedName": "OMRSRESTAPIRepositoryConnector",
                    "displayName": "OMRSRESTAPIRepositoryConnector",
                    "description": "OMRSRepositoryConnectorthatcallstherepositoryservicesRESTAPIofaremoteserver.",
                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.repositoryservices.rest.repositoryconnector.OMRSRESTRepositoryConnectorProvider"
                },
                "endpoint": {
                    "class": "Endpoint",
                    "headerVersion": 0,
                    "address": "omag:8080/servers/igc_omrs"
                }
            },
            "eventsToSaveRule": "ALL",
            "eventsToSendRule": "ALL",
            "eventMapperConnection": {
                "class": "Connection",
                "headerVersion": 0,
                "url": "https://internal-nginx-svc:12443/ibm/iis/igc/",
                "additionalProperties": {
                    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.producer.topic.id": "igcunifiedgovevents",
                    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                    "kafka.consumer.topic.id": "igc-kg-bridge-out",
                    "group.id": "IGCOMRSConsumerGroup",
                    "iis.rest.password": "V48PqjvYOd",
                    "bootstrap.servers": "kafka:9093",
                    "redis.key.expire": "86400",
                    "zookeeper.session.timeout.ms": "300000",
                    "redis.host": "redis-ha-master-svc:6380",
                    "zookeeper.sync.time.ms": "2000",
                    "iis.rest.address": "https://internal-nginx-svc:12443/",
                    "iis.rest.user": "isadmin"
                },
                "displayName": "IGCEventMapper",
                "connectorType": {
                    "class": "ConnectorType",
                    "headerVersion": 0,
                    "connectorProviderClassName": "com.ibm.iis.openmetadata.adapters.igc.v2.eventmapper.IGCV2OMRSRepositoryEventMapperProvider"
                }
            }
        },
        "cohortConfigList": [
            {
                "class": "CohortConfig",
                "cohortName": "one_catalog",
                "cohortRegistryConnection": {
                    "class": "Connection",
                    "headerVersion": 0,
                    "connectorType": {
                        "class": "ConnectorType",
                        "headerVersion": 0,
                        "type": {
                            "class": "ElementType",
                            "headerVersion": 0,
                            "elementOrigin": "LOCAL_COHORT",
                            "elementVersion": 0,
                            "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                            "elementTypeName": "ConnectorType",
                            "elementTypeVersion": 1,
                            "elementTypeDescription": "Asetofpropertiesdescribingatypeofconnector."
                        },
                        "guid": "108b85fe-d7a8-45c3-9f88-742ac4e4fd14",
                        "qualifiedName": "FileBasedCohortRegistryStoreConnector",
                        "displayName": "FileBasedCohortRegistryStoreConnector",
                        "description": "Connectorsupportsstoringoftheopenmetadatacohortregistryinafile.",
                        "connectorProviderClassName": "org.odpi.openmetadata.adapters.repositoryservices.cohortregistrystore.file.FileBasedRegistryStoreProvider"
                    },
                    "endpoint": {
                        "class": "Endpoint",
                        "headerVersion": 0,
                        "address": "igc_omrs.one_catalog.registrystore"
                    }
                },
                "cohortOMRSTopicConnection": {
                    "class": "VirtualConnection",
                    "headerVersion": 0,
                    "connectorType": {
                        "class": "ConnectorType",
                        "headerVersion": 0,
                        "connectorProviderClassName": "org.odpi.openmetadata.repositoryservices.connectors.omrstopic.OMRSTopicProvider"
                    },
                    "embeddedConnections": [
                        {
                            "class": "EmbeddedConnection",
                            "headerVersion": 0,
                            "position": 0,
                            "displayName": "one_catalogOMRSTopic",
                            "embeddedConnection": {
                                "class": "Connection",
                                "headerVersion": 0,
                                "connectorType": {
                                    "class": "ConnectorType",
                                    "headerVersion": 0,
                                    "type": {
                                        "class": "ElementType",
                                        "headerVersion": 0,
                                        "elementOrigin": "LOCAL_COHORT",
                                        "elementVersion": 0,
                                        "elementTypeId": "954421eb-33a6-462d-a8ca-b5709a1bd0d4",
                                        "elementTypeName": "ConnectorType",
                                        "elementTypeVersion": 1,
                                        "elementTypeDescription": "Asetofpropertiesdescribingatypeofconnector."
                                    },
                                    "guid": "3851e8d0-e343-400c-82cb-3918fed81da6",
                                    "qualifiedName": "KafkaOpenMetadataTopicConnector",
                                    "displayName": "KafkaOpenMetadataTopicConnector",
                                    "description": "KafkaOpenMetadataTopicConnectorsupportsstringbasedeventsoveranApacheKafkaeventbus.",
                                    "connectorProviderClassName": "org.odpi.openmetadata.adapters.eventbus.topic.kafka.KafkaOpenMetadataTopicProvider",
                                    "recognizedConfigurationProperties": [
                                        "producer",
                                        "consumer",
                                        "local.server.id",
                                        "sleepTime"
                                    ]
                                },
                                "endpoint": {
                                    "class": "Endpoint",
                                    "headerVersion": 0,
                                    "address": "open-metadata.repository-services.cohort.one_catalog.OMRSTopic"
                                },
                                "configurationProperties": {
                                    "producer": {
                                        "bootstrap.servers": "kafka:9093",
                                        "acks": "all",
                                        "retries": "0",
                                        "batch.size": "16384",
                                        "linger.ms": "1",
                                        "buffer.memory": "33554432",
                                        "max.request.size": "10485760",
                                        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                                        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
                                        "kafka.omrs.topic.id": "kafka-omrs-topic",
                                        "partitioner.class": "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
                                    },
                                    "local.server.id": "9186d4b0-6a36-4484-8822-6db4e5347493",
                                    "consumer": {
                                        "bootstrap.servers": "kafka:9093",
                                        "zookeeper.session.timeout.ms": "300000",
                                        "zookeeper.sync.time.ms": "2000",
                                        "fetch.message.max.bytes": "10485760",
                                        "max.partition.fetch.bytes": "10485760",
                                        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                                        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
                                        "kafka.omrs.topic.id": "kafka-omrs-topic"
                                    }
                                }
                            }
                        }
                    ]
                },
                "cohortOMRSTopicProtocolVersion": "V1",
                "eventsToProcessRule": "ALL"
            }
        ]
    },
    "auditTrail": [
        "FriMar1907:13:13GMT2021adminupdatedconfigurationforlocalserver'sURLroottoomag:8080.",
        "FriMar1907:13:14GMT2021adminupdatedconfigurationforlocalserver'sowningorganization'snametoibm.",
        "FriMar1907:13:14GMT2021adminupdatedconfigurationfordefaulteventbus.",
        "FriMar1907:13:14GMT2021adminupdatedconfigurationforthelocalrepository.",
        "FriMar1907:13:14GMT2021adminupdatedconfigurationforthelocalrepository.",
        "FriMar1907:13:14GMT2021adminpreservinglocalmetadatacollectionidd6077e3d-15f4-43ef-80e2-e9760d22cd62.",
        "FriMar1907:13:14GMT2021adminupdatedconfigurationforcohortone_catalog.",
        "FriMar1907:13:15GMT2021adminupdatedconfigurationforcohortone_catalog.",
        "WedMar2420:42:58GMT2021adminupdatedconfigurationfordefaulteventbus.",
        "WedMar2421:10:34GMT2021adminupdatedconfigurationfordefaulteventbus.",
        "WedMar2422:01:53GMT2021adminupdatedconfigurationfordefaulteventbus."
    ]
}

Does the Kafka client in Egeria 2.4 pick up and use the RoundRobinPartitioner set in partitioner.class?

@planetf1
Member

planetf1 commented Apr 2, 2021

@wbittles is this something you could look at?

@planetf1
Member

planetf1 commented Apr 2, 2021

I've been able to pass additional properties to the Kafka consumer/producer without any changes, for example for authentication/SSL-related configuration, as per https://github.com/planetf1/egeria/blob/master/open-metadata-implementation/adapters/open-connectors/event-bus-connectors/open-metadata-topic-connectors/kafka-open-metadata-topic-connector/README.md . I've not checked this for a few releases, but I do not believe anything has changed.

@ilovechai
Contributor Author

@planetf1 we are able to pass additional properties as well. This particular property is picked up, but the producer cannot find the RoundRobinPartitioner class in the Kafka client package.

@wbittles
Contributor

wbittles commented Apr 6, 2021

It's not obvious how I can help with this. Is the Kafka version being used 2.4+?

@mandy-chessell
Contributor

The error message is coming from Kafka. Egeria is just logging it (badly; raised issue #5028).
This means that Kafka is picking up the property, it just cannot use it.

Are these classes on the classpath?
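
One quick way to answer the classpath question is a small standalone check run with the same classpath as the OMAG server. The sketch below is only an illustration: the class name `ClasspathCheck` and the method `isLoadable` are hypothetical, and it assumes the producer resolves the partitioner through the same class loader as this check, which may not hold in every deployment.

```java
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // The two values tried for partitioner.class in this issue
        String[] candidates = {
            "org.apache.kafka.clients.producer.RoundRobinPartitioner",
            "org.apache.kafka.clients.producer.internals.RoundRobinPartitioner"
        };
        for (String name : candidates) {
            System.out.println(name + " -> " + (isLoadable(name) ? "found" : "NOT found"));
        }
    }
}
```

If both names report "NOT found", the Kafka client jar on the classpath most likely does not ship the class under either package.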

@mandy-chessell
Contributor

It seems org.apache.kafka.clients.producer.RoundRobinPartitioner was added in Kafka 2.4

https://issues.apache.org/jira/browse/KAFKA-3333

@ilovechai
Contributor Author

ilovechai commented Apr 7, 2021

@mandy-chessell Egeria 2.4 uses kafka-client:2.6.0: https://github.com/odpi/egeria/blob/egeria-release-2.4/build.gradle#L122 . We also use kafka-client:2.3.1 ourselves, and I'm not sure whether that is overriding the transitive dependency that Egeria brings in. I'll upgrade kafka-client, test the changes, and get back.
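
To see which jar actually wins when two versions of a dependency compete, one option is to ask the JVM where a class was loaded from. This is a minimal sketch, not something taken from Egeria: `ClassOrigin` and `describe` are hypothetical names, and the Kafka class name in `main` is only an example of a class that should exist in any kafka-client version.

```java
import java.security.CodeSource;

final class ClassOrigin {

    /** Describes where the named class was loaded from, if it can be loaded at all. */
    static String describe(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
            CodeSource source = type.getProtectionDomain().getCodeSource();
            // JDK bootstrap classes have no code source
            return source == null ? "(bootstrap or unknown)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // The jar file name in the printed location normally includes the version
        System.out.println(describe("org.apache.kafka.clients.producer.KafkaProducer"));
    }
}
```

If the printed location points at a 2.3.1 jar, the older client is overriding the 2.6.0 version that Egeria declares.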

@ilovechai
Contributor Author

ilovechai commented Apr 8, 2021

Okay, it looks like after upgrading to kafka-client:2.7 the RoundRobinPartitioner class is picked up.

Wed Apr 07 23:59:12 GMT 2021 igc_omrs Startup OMAG-ADMIN-0004 The igc_omrs server has successfully completed start up.  The following services are running: [Open Metadata Repository Services (OMRS)]
2021-04-07 23:59:12.458  INFO [          pool-4-thread-1] o.a.k.c.p.ProducerConfig                 : ProducerConfig values:
	acks = -1
	batch.size = 16384
	bootstrap.servers = [kafka:9093]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
..
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.RoundRobinPartitioner <-- 

Offsets before:

[root###]# kubectl exec -it kafka-0 -- /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic open-metadata.repository-services.cohort.one_catalog.OMRSTopic
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:0:7649
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:1:9307
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:2:0

Offsets after:

[root###]# kubectl exec -it kafka-0 -- /opt/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic open-metadata.repository-services.cohort.one_catalog.OMRSTopic
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:0:7700  <--- 
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:1:9358  <---
open-metadata.repository-services.cohort.one_catalog.OMRSTopic:2:52    <---
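
The per-partition difference between the two GetOffsetShell outputs above can be worked out with a short sketch like the following. `OffsetDelta`, `parse` and `delta` are hypothetical helper names; the code assumes each line has the form topic:partition:offset, with the trailing arrow annotations removed.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class OffsetDelta {

    /** Parses lines of the form topic:partition:offset into a partition -> offset map. */
    static Map<Integer, Long> parse(List<String> lines) {
        Map<Integer, Long> offsets = new LinkedHashMap<>();
        for (String line : lines) {
            String trimmed = line.trim();
            // Split from the right so the topic name itself is never parsed
            int lastColon = trimmed.lastIndexOf(':');
            int prevColon = trimmed.lastIndexOf(':', lastColon - 1);
            int partition = Integer.parseInt(trimmed.substring(prevColon + 1, lastColon));
            long offset = Long.parseLong(trimmed.substring(lastColon + 1));
            offsets.put(partition, offset);
        }
        return offsets;
    }

    /** Returns, per partition, how far the offset advanced between the two snapshots. */
    static Map<Integer, Long> delta(Map<Integer, Long> before, Map<Integer, Long> after) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> entry : after.entrySet()) {
            result.put(entry.getKey(), entry.getValue() - before.getOrDefault(entry.getKey(), 0L));
        }
        return result;
    }

    public static void main(String[] args) {
        String topic = "open-metadata.repository-services.cohort.one_catalog.OMRSTopic";
        Map<Integer, Long> before = parse(List.of(topic + ":0:7649", topic + ":1:9307", topic + ":2:0"));
        Map<Integer, Long> after = parse(List.of(topic + ":0:7700", topic + ":1:9358", topic + ":2:52"));
        System.out.println(delta(before, after)); // prints {0=51, 1=51, 2=52}
    }
}
```

The offsets advanced by 51, 51 and 52, so the new events were spread almost evenly across all three partitions, including partition 2, which had received nothing before. That is consistent with round-robin distribution.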

Property that worked:

"partitioner.class": "org.apache.kafka.clients.producer.RoundRobinPartitioner"

@mandy-chessell
Contributor

Looks like we can close this?
