From 9ab6d5d501767a114873ede0017c5ed9a5f9509c Mon Sep 17 00:00:00 2001 From: nick Date: Sun, 11 Apr 2021 13:49:39 +0200 Subject: [PATCH] update readme --- README.md | 243 +----------------------------------------------------- 1 file changed, 2 insertions(+), 241 deletions(-) diff --git a/README.md b/README.md index 33f0b90..f0a32b7 100644 --- a/README.md +++ b/README.md @@ -34,244 +34,5 @@ composer require flix-tech/avro-serde-php "~1.4" ## Credits This library was inspired by [jobcloud/php-kafka-lib](https://github.com/jobcloud/php-kafka-lib) :heart_eyes: -## Usage - -### Simple Producer -```php -withAdditionalBroker('localhost:9092') - ->build(); - -$message = KafkaProducerMessage::create('test-topic', 0) - ->withKey('asdf-asdf-asfd-asdf') - ->withBody('some test message payload') - ->withHeaders([ 'key' => 'value' ]); - -$producer->produce($message); - -// Shutdown producer, flush messages that are in queue. Give up after 20s -$result = $producer->flush(20000); -``` - -### Transactional producer -```php -withAdditionalBroker('localhost:9092') - ->build(); - -$message = KafkaProducerMessage::create('test-topic', 0) - ->withKey('asdf-asdf-asfd-asdf') - ->withBody('some test message payload') - ->withHeaders([ 'key' => 'value' ]); -try { - $producer->beginTransaction(10000); - $producer->produce($message); - $producer->commitTransaction(10000); -} catch (KafkaProducerTransactionRetryException $e) { - // something went wrong but you can retry the failed call (either beginTransaction or commitTransaction) -} catch (KafkaProducerTransactionAbortException $e) { - // you need to call $producer->abortTransaction(10000); and try again -} catch (KafkaProducerTransactionFatalException $e) { - // something went very wrong, re-create your producer, otherwise you could jeopardize the idempotency guarantees -} - -// Shutdown producer, flush messages that are in queue. Give up after 20s -$result = $producer->flush(20000); -``` - -### Avro Producer -To create an avro prodcuer add the avro encoder. - -```php - 'kafka-schema-registry:9081']) - ) - ), - new AvroObjectCacheAdapter() -); - -$registry = new AvroSchemaRegistry($cachedRegistry); -$recordSerializer = new RecordSerializer($cachedRegistry); - -//if no version is defined, latest version will be used -//if no schema definition is defined, the appropriate version will be fetched form the registry -$registry->addBodySchemaMappingForTopic( - 'test-topic', - new KafkaAvroSchema('bodySchemaName' /*, int $version, AvroSchema $definition */) -); -$registry->addKeySchemaMappingForTopic( - 'test-topic', - new KafkaAvroSchema('keySchemaName' /*, int $version, AvroSchema $definition */) -); - -// if you are only encoding key or value, you can pass that mode as additional third argument -// per default both key and body will get encoded -$encoder = new AvroEncoder($registry, $recordSerializer /*, AvroEncoderInterface::ENCODE_BODY */); - -$producer = KafkaProducerBuilder::create() - ->withAdditionalBroker('kafka:9092') - ->withEncoder($encoder) - ->build(); - -$schemaName = 'testSchema'; -$version = 1; -$message = KafkaProducerMessage::create('test-topic', 0) - ->withKey('asdf-asdf-asfd-asdf') - ->withBody(['name' => 'someName']) - ->withHeaders([ 'key' => 'value' ]); - -$producer->produce($message); - -// Shutdown producer, flush messages that are in queue. Give up after 20s -$result = $producer->flush(20000); -``` - -**NOTE:** To improve producer latency you can install the `pcntl` extension. -The php-simple-kafka-lib already has code in place, similarly described here: -https://github.com/arnaud-lb/php-rdkafka#performance--low-latency-settings - -### Simple Consumer - -```php -withAdditionalConfig( - [ - 'compression.codec' => 'lz4', - 'auto.commit.interval.ms' => 500 - ] - ) - ->withAdditionalBroker('kafka:9092') - ->withConsumerGroup('testGroup') - ->withAdditionalSubscription('test-topic') - ->build(); - -$consumer->subscribe(); - -while (true) { - try { - $message = $consumer->consume(); - // your business logic - $consumer->commit($message); - } catch (KafkaConsumerTimeoutException $e) { - //no messages were read in a given time - } catch (KafkaConsumerEndOfPartitionException $e) { - //only occurs if enable.partition.eof is true (default: false) - } catch (KafkaConsumerConsumeException $e) { - // Failed - } -} -``` - -### Avro Consumer -To create an avro consumer add the avro decoder. - -```php - 'kafka-schema-registry:9081']) - ) - ), - new AvroObjectCacheAdapter() -); - -$registry = new AvroSchemaRegistry($cachedRegistry); -$recordSerializer = new RecordSerializer($cachedRegistry); - -//if no version is defined, latest version will be used -//if no schema definition is defined, the appropriate version will be fetched form the registry -$registry->addBodySchemaMappingForTopic( - 'test-topic', - new KafkaAvroSchema('bodySchema' , 9 /* , AvroSchema $definition */) -); -$registry->addKeySchemaMappingForTopic( - 'test-topic', - new KafkaAvroSchema('keySchema' , 9 /* , AvroSchema $definition */) -); - -// if you are only decoding key or value, you can pass that mode as additional third argument -// per default both key and body will get decoded -$decoder = new AvroDecoder($registry, $recordSerializer /*, AvroDecoderInterface::DECODE_BODY */); - -$consumer = KafkaConsumerBuilder::create() - ->withAdditionalConfig( - [ - 'compression.codec' => 'lz4', - 'auto.commit.interval.ms' => 500 - ] - ) - ->withDecoder($decoder) - ->withAdditionalBroker('kafka:9092') - ->withConsumerGroup('testGroup') - ->withAdditionalSubscription('test-topic') - ->build(); - -$consumer->subscribe(); - -while (true) { - try { - $message = $consumer->consume(); - // your business logic - $consumer->commit($message); - } catch (KafkaConsumerTimeoutException $e) { - //no messages were read in a given time - } catch (KafkaConsumerEndOfPartitionException $e) { - //only occurs if enable.partition.eof is true (default: false) - } catch (KafkaConsumerConsumeException $e) { - // Failed - } -} -``` - +## Examples +Examples can be found [here](https://github.com/php-kafka/php-kafka-examples/tree/main/src/ext-php-simple-kafka-client/php-simple-kafka-lib)