/
avroProducer.php
102 lines (89 loc) · 3.28 KB
/
avroProducer.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
<?php
require_once('../vendor/autoload.php');
use FlixTech\AvroSerializer\Objects\RecordSerializer;
use FlixTech\SchemaRegistryApi\Registry\BlockingRegistry;
use FlixTech\SchemaRegistryApi\Registry\Cache\AvroObjectCacheAdapter;
use FlixTech\SchemaRegistryApi\Registry\CachedRegistry;
use FlixTech\SchemaRegistryApi\Registry\PromisingRegistry;
use GuzzleHttp\Client;
use Jobcloud\Kafka\Message\Encoder\AvroEncoder;
use Jobcloud\Kafka\Message\KafkaAvroSchema;
use Jobcloud\Kafka\Message\Registry\AvroSchemaRegistry;
use Jobcloud\Kafka\Producer\KafkaProducerBuilder;
use Jobcloud\Kafka\Message\KafkaProducerMessage;
use Ramsey\Uuid\Uuid;
// Build the flix schema-registry client: HTTP transport -> promising -> blocking,
// wrapped in a cache so repeated schema lookups stay in memory.
$httpClient = new Client(
    [
        'base_uri' => 'http://kafka-schema-registry:9083',
        //'auth' => ['user', 'pw']
    ]
);
$registry = new CachedRegistry(
    new BlockingRegistry(new PromisingRegistry($httpClient)),
    new AvroObjectCacheAdapter()
);
// Library-level schema registry wrapper (Note: in the future we will use our own cached registry)
$schemaRegistry = new AvroSchemaRegistry($registry);

$topicName = 'php-kafka-lib-test-topic-avro';

// Map the Avro value schema onto the topic.
// Optional second KafkaAvroSchema param - version: if not passed we will take latest.
$schemaRegistry->addBodySchemaMappingForTopic(
    $topicName,
    new KafkaAvroSchema('nickzh.php.kafka.examples.entity.product-value')
);

// Map the Avro key schema onto the same topic (latest version when none is given).
$schemaRegistry->addKeySchemaMappingForTopic(
    $topicName,
    new KafkaAvroSchema('nickzh.php.kafka.examples.entity.product-key')
);
// Avro record serializer (vendor: flix) feeding the library's encoder
// (Note: in the future, we will use our own record serializer).
$recordSerializer = new RecordSerializer($registry);
$encoder = new AvroEncoder($schemaRegistry, $recordSerializer);

// librdkafka producer tuning.
$producerConfig = [
    // will be visible in broker logs
    'client.id' => 'php-kafka-lib-producer-avro',
    // set compression (supported are: none,gzip,lz4,snappy,zstd)
    'compression.codec' => 'snappy',
    // Add additional output if you need to debug a problem
    // 'log_level' => (string) LOG_DEBUG,
    // 'debug' => 'all'
];

// Assemble the producer: config, broker and Avro encoder.
$producer = KafkaProducerBuilder::create()
    ->withAdditionalConfig($producerConfig)
    ->withAdditionalBroker('kafka:9096')
    ->withEncoder($encoder)
    ->build();
// Queue ten sample product messages on partition 0 of the test topic.
for ($index = 0; $index < 10; ++$index) {
    $body = [
        'id' => Uuid::uuid6()->toString(),
        'name' => sprintf('Product %d', $index),
        'description' => 'A random test product',
        'price' => 21.25
    ];

    $message = KafkaProducerMessage::create('php-kafka-lib-test-topic-avro', 0)
        ->withKey(sprintf('test-key-%d', $index))
        ->withBody($body)
        ->withHeaders(['some' => 'test header']);

    // produce() only enqueues; delivery happens asynchronously until flush().
    $producer->produce($message);

    echo sprintf('Queued message number: %d', $index) . PHP_EOL;
}
// Shutdown: flush queued messages, giving up after 20 seconds.
$flushResult = $producer->flush(20000);
if ($flushResult !== RD_KAFKA_RESP_ERR_NO_ERROR) {
    echo 'Was not able to shutdown within 20s. Messages might be lost!' . PHP_EOL;
}