Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
dd8f3b1
Remove stop of daemon thread option
robooo Aug 1, 2019
925ea0a
Make consumers and producers public to allow list topics for both of …
robooo Aug 7, 2019
227190e
Extend tests with List Topics keyword
robooo Aug 7, 2019
9e670c0
Temp possible fix of list topics
robooo Oct 7, 2019
48dbe35
Change return of reference in decode_data to new list
robooo Oct 21, 2019
63f5d01
Update test for debug
robooo Oct 21, 2019
b6004a9
Update producer flush
robooo Nov 10, 2019
8880b39
Usage of OFFSET vars in robot tests
robooo Nov 10, 2019
388d952
FIx return only copy of messages to not overwrite
robooo Nov 10, 2019
5559fe2
Remove unused args
robooo Nov 11, 2019
29614ad
Update get_messages_from_thread args
robooo Nov 11, 2019
0a22dc7
Update assign_to_topic_partition
robooo Nov 11, 2019
8d1122a
Add topic partition related keywords
robooo Nov 11, 2019
45b8cec
Add seek keyword
robooo Nov 11, 2019
1f071df
pep8 updates
robooo Nov 11, 2019
58cc39e
Make decode_data private
robooo Nov 12, 2019
d001f2f
Remove remove_zero_bytes from _decode_data
robooo Nov 12, 2019
6352e30
Allow poll full message object via only_value arg
robooo Nov 12, 2019
9107c4f
Temp tests for travis validation
robooo Nov 12, 2019
7eb90c6
Bump requirements versions
robooo Nov 12, 2019
4b2993d
Bump requirements versions
robooo Nov 12, 2019
a08b633
Revert Poll error handling
robooo Nov 12, 2019
797573e
Revert Poll error handling
robooo Nov 12, 2019
4a3f630
Add partition for produce
robooo Nov 17, 2019
7090664
Change only_value default to True
robooo Nov 18, 2019
7acbb60
Minor test updates
robooo Nov 18, 2019
8d5a11b
Raise poll error
robooo Nov 18, 2019
8caeb75
Refactor is_assigned
robooo Nov 18, 2019
3f5d414
Introduce unassign
robooo Nov 18, 2019
834cb85
Add get_position keyword
robooo Nov 20, 2019
f82a0bb
Fix indent
robooo Mar 31, 2020
3bdb013
Update confluent-kafka-library
robooo Mar 31, 2020
01d0dd2
Run enterprise kafka for testing
robooo Apr 8, 2020
6280a28
Fix arg order in GetMessagesThread
robooo Apr 8, 2020
cb72c02
Add Create Topic Partition keyword
robooo Apr 8, 2020
fd2be61
Refactoring and bugfixing
robooo Apr 11, 2020
0939880
Update examples
robooo Apr 11, 2020
f060fdd
Update readme
robooo Apr 11, 2020
b6045cd
Bump version
robooo Apr 11, 2020
af40c10
Merge branch 'master' into updates
robooo Apr 11, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ services:
- docker

before_install:
- docker pull johnnypark/kafka-zookeeper
- docker run -td -p 2181:2181 -p 9092:9092 -e ADVERTISED_HOST=127.0.0.1 -e NUM_PARTITIONS=10 johnnypark/kafka-zookeeper
- cd examples && docker-compose up -d && cd ..
- docker ps -a
- pip install -r requirements.txt
- pip install requests
Expand All @@ -14,4 +13,4 @@ before_install:
- pip install .

script:
- python3 -m robot examples/test.robot
- python3 -m robot examples/
18 changes: 1 addition & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,7 @@

ConfluentKafkaLibrary library is wrapper for [confluent-kafka-python](https://github.com/confluentinc/confluent-kafka-python).

Still in development, right now supports:

* [ ] Consumer
* [X] Poll
* [X] Un/Subscribe
* [X] Create / Stop consumer
* [X] Assign
* [X] List topics
* [ ] commit
* [ ] offsets
* [X] Run in thread
* [X] Decode option of data from topic
* [X] Producer


ConfluentKafkaLibrary works with latest confluent-kafka-python 1.0.0.

ConfluentKafkaLibrary works with latest confluent-kafka-python, tags are 1:1 (ConfluentKafkaLibrary 1.3.0 == confluent-kafka-python 1.3.0 ). Bugfixes and updates are set after the '-' e.g. `1.3.0-1`.

## Documentation

Expand Down
4 changes: 2 additions & 2 deletions docs/index.html

Large diffs are not rendered by default.

56 changes: 56 additions & 0 deletions examples/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.3.1
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181

broker:
image: confluentinc/cp-enterprise-kafka:5.3.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

schema-registry:
image: confluentinc/cp-schema-registry:5.3.1
hostname: schema-registry
container_name: schema-registry
depends_on:
- zookeeper
- broker
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

rest-proxy:
image: confluentinc/cp-kafka-rest:5.3.1
depends_on:
- zookeeper
- broker
- schema-registry
ports:
- 8082:8082
hostname: rest-proxy
container_name: rest-proxy
environment:
KAFKA_REST_HOST_NAME: rest-proxy
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
11 changes: 11 additions & 0 deletions examples/schema/producer/KeySchema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string"
}
]
}
18 changes: 18 additions & 0 deletions examples/schema/producer/ValueSchema.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{
"name": "name",
"type": "string"
},
{
"name": "number",
"type": [
"int",
"null"
]
}
]
}
169 changes: 150 additions & 19 deletions examples/test.robot
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,163 @@ Suite Setup Starting Test


*** Test Cases ***
Verify Topics
${group_id}= Create Consumer auto_offset_reset=earliest
${topics}= List Topics ${group_id}
Dictionary Should Contain Key ${topics} ${TEST_TOPIC}
[Teardown] Close Consumer ${group_id}

Basic Consumer
${group_id}= Create Consumer auto_offset_reset=earliest
Subscribe Topic group_id=${group_id} topics=test
${messages}= Poll group_id=${group_id} max_records=3
${messages}= Decode Data ${messages} decode_format=utf8
Subscribe Topic group_id=${group_id} topics=${TEST_TOPIC}
${messages}= Poll group_id=${group_id} max_records=3 decode_format=utf8
${data}= Create List Hello World {'test': 1}
Lists Should Be Equal ${messages} ${data}
Unsubscribe ${group_id}
Close Consumer ${group_id}
[Teardown] Basic Teardown ${group_id}

Verify Threaded Consumer
${thread_messages}= Get Messages From Thread ${MAIN_THREAD}
Produce Without Value
${topic_name}= Set Variable topicwithoutvaluee
Produce group_id=${PRODUCER_ID} topic=${topic_name}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${PRODUCER_ID}
${group_id}= Create Consumer auto_offset_reset=earliest
Subscribe Topic group_id=${group_id} topics=test
${messages}= Poll group_id=${group_id} max_records=3
List Should Contain Sub List ${thread_messages} ${messages}
Unsubscribe ${group_id}
Close Consumer ${group_id}
Subscribe Topic group_id=${group_id} topics=${topic_name}
${messages}= Poll group_id=${group_id} max_records=1
Should Be Equal As Strings ${messages} [None]

Verify Position
${group_id}= Create Consumer
${tp}= Create Topic Partition ${TEST_TOPIC} ${P_ID} ${OFFSET_END}
Assign To Topic Partition ${group_id} ${tp}
Sleep 5sec # Need to wait for an assignment
${position}= Get Position group_id=${group_id} topic_partitions=${tp}
${position_before}= Set Variable ${position[0].offset}

Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=Dummy partition=${P_ID}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${PRODUCER_ID}
${position}= Get Position group_id=${group_id} topic_partitions=${tp}
${position_after_produce}= Set Variable ${position[0].offset}
Should Be Equal As Integers ${position_before} ${position_after_produce}

${messages}= Poll group_id=${group_id} max_records=1 decode_format=utf8
${position}= Get Position group_id=${group_id} topic_partitions=${tp}
${position_after_poll_1}= Set Variable ${position[0].offset}
Should Not Be Equal As Integers ${position_after_poll_1} ${position_after_produce}

Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=Dummy partition=${P_ID}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${PRODUCER_ID}
${messages}= Poll group_id=${group_id} max_records=1 decode_format=utf8
${position}= Get Position group_id=${group_id} topic_partitions=${tp}
${position_after_poll_2}= Set Variable ${position[0].offset}
Should Be Equal As Integers ${position_after_poll_1 + 1} ${position_after_poll_2}
[Teardown] Basic Teardown ${group_id}

Consumer With Assignment To Last Message After Get Of Watermark Offsets
${group_id}= Create Consumer
${tp}= Create Topic Partition ${TEST_TOPIC} ${P_ID}
${offset}= Get Watermark Offsets ${group_id} ${tp}
${tp}= Create Topic Partition ${TEST_TOPIC} ${P_ID} ${offset[1]}
Assign To Topic Partition ${group_id} ${tp}
Prepare Data
${messages}= Poll group_id=${group_id} max_records=6 decode_format=utf8
Lists Should Be Equal ${TEST_DATA} ${messages}
[Teardown] Basic Teardown ${group_id}

Consumer With Assignment To OFFSET_END
${group_id}= Create Consumer
${tp}= Create Topic Partition ${TEST_TOPIC} ${P_ID} ${OFFSET_END}
Assign To Topic Partition ${group_id} ${tp}
# Need to wait for an async assignment, be aware the Is Assigned could return True but
# that doesn't mean assignment is completed
Sleep 5sec
Prepare Data
${messages}= Poll group_id=${group_id} poll_attempts=30 max_records=6 timeout=5 decode_format=utf8
Lists Should Be Equal ${TEST_DATA} ${messages}
[Teardown] Unassign Teardown ${group_id}

Verify Test And Threaded Consumer
[Setup] Clear Messages From Thread ${MAIN_THREAD}
${group_id}= Create Consumer
Subscribe Topic group_id=${group_id} topics=${TEST_TOPIC}
${messages}= Poll group_id=${group_id}
Prepare Data
${thread_messages}= Get Messages From Thread ${MAIN_THREAD} decode_format=utf-8
${messages}= Poll group_id=${group_id} max_records=6 decode_format=utf8
Lists Should Be Equal ${thread_messages} ${messages}
[Teardown] Run Keywords Basic Teardown ${group_id} AND
... Clear Messages From Thread ${MAIN_THREAD}

Verify Clean Of Threaded Consumer Messages
[Setup] Prepare Data
${thread_messages1}= Get Messages From Thread ${MAIN_THREAD} decode_format=utf-8
Clear Messages From Thread ${MAIN_THREAD}
${thread_messages2}= Get Messages From Thread ${MAIN_THREAD}
Lists Should Be Equal ${TEST_DATA} ${thread_messages1}
Should Be Empty ${thread_messages2}
[Teardown] Clear Messages From Thread ${MAIN_THREAD}

Remove And Publish New Messages From Threaded Consumer
[Setup] Prepare Data
${thread_messages1}= Get Messages From Thread ${MAIN_THREAD} decode_format=utf-8
Clear Messages From Thread ${MAIN_THREAD}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=After partition=${P_ID}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=Clear partition=${P_ID}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${PRODUCER_ID}
Sleep 1sec # if next command is polling messages in thread we need to wait a second

${thread_messages2}= Get Messages From Thread ${MAIN_THREAD} decode_format=utf-8
${data}= Create List After Clear
Should Be Equal ${data} ${thread_messages2}

Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=LAST partition=${P_ID}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${PRODUCER_ID}
Sleep 1sec
Append To List ${data} LAST
${thread_messages2}= Get Messages From Thread ${MAIN_THREAD} decode_format=utf-8
Should Be Equal ${TEST_DATA} ${thread_messages1}
Should Be Equal ${data} ${thread_messages2}
[Teardown] Clear Messages From Thread ${MAIN_THREAD}


*** Keywords ***
Starting Test
${thread}= Start Consumer Threaded topics=test auto_offset_reset=earliest
Set Global Variable ${MAIN_THREAD} ${thread}

Set Suite Variable ${TEST_TOPIC} test
${thread}= Start Consumer Threaded topics=${TEST_TOPIC}
Set Suite Variable ${MAIN_THREAD} ${thread}
${producer_group_id}= Create Producer
Produce group_id=${producer_group_id} topic=test value=Hello
Produce group_id=${producer_group_id} topic=test value=World
Produce group_id=${producer_group_id} topic=test value={'test': 1}
Flush ${producer_group_id}
Set Suite Variable ${PRODUCER_ID} ${producer_group_id}

${topics}= List Topics ${producer_group_id}
${partitions}= Get Topic Partitions ${topics['${TEST_TOPIC}']}
${partition_id}= Set Variable ${partitions[0].id}
Set Suite Variable ${P_ID} ${partition_id}
${tp}= Create Topic Partition ${TEST_TOPIC} ${partition_id} ${OFFSET_BEGINNING}

${data}= Create List Hello World {'test': 1} {'test': 2} {'test': 3} {'test': 4}
Set Suite Variable ${TEST_DATA} ${data}
Prepare Data

Prepare Data
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=Hello partition=${P_ID}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value=World partition=${P_ID}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value={'test': 1} partition=${P_ID}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value={'test': 2} partition=${P_ID}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value={'test': 3} partition=${P_ID}
Produce group_id=${PRODUCER_ID} topic=${TEST_TOPIC} value={'test': 4} partition=${P_ID}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${PRODUCER_ID}
Sleep 1sec # if next command is polling messages in thread we need to wait a second

All Messages Are Delivered
[Arguments] ${producer_id}
${count}= Flush ${producer_id}
Log Reaming messages to be delivered: ${count}
Should Be Equal As Integers ${count} 0

Basic Teardown
[Arguments] ${group_id}
Unsubscribe ${group_id}
Close Consumer ${group_id}

Unassign Teardown
[Arguments] ${group_id}
Unassign ${group_id}
Close Consumer ${group_id}
74 changes: 74 additions & 0 deletions examples/test_avro.robot
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
*** Settings ***
Library ConfluentKafkaLibrary
Library Collections
Library String

Suite Setup Starting Test


*** Test Cases ***
Avro Producer With Schemas As String Argument
[Setup] Clear Messages From Thread ${MAIN_THREAD}
${value_schema}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"},{"name": "number","type": ["int","null"]}]}
${key_schema}= Set Variable {"namespace": "example.avro","type": "record","name": "User","fields": [{"name": "name","type": "string"}]}
${producer_id}= Create Producer schema_registry_url=http://127.0.0.1:8081
... value_schema=${value_schema} key_schema=${key_schema}
${value}= Create Dictionary name=Robot number=${10}
Produce group_id=${producer_id} topic=avro_testing1 partition=${0} value=${value} key=${KEY}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
Sleep 1s

${consumer_group_id}= Create Consumer auto_offset_reset=earliest schema_registry_url=http://127.0.0.1:8081
Subscribe Topic group_id=${consumer_group_id} topics=avro_testing1
${messages}= Poll group_id=${consumer_group_id}
Should Be Equal ${TEST_DATA} ${messages}
${thread_messages}= Get Messages From Thread ${MAIN_THREAD}
Should Be Equal ${TEST_DATA} ${thread_messages}
[Teardown] Basic Teardown ${consumer_group_id}

Avro Producer With Path To Schemas
[Setup] Clear Messages From Thread ${MAIN_THREAD}
${value_schema_file_path}= Set Variable examples/schema/producer/ValueSchema.avsc
${key_schema_file_path}= Set Variable examples/schema/producer/KeySchema.avsc
${producer_id}= Create Producer schema_registry_url=http://127.0.0.1:8081
... value_schema=${value_schema_file_path} key_schema=${key_schema_file_path}
${value}= Create Dictionary name=Robot number=${10}
Produce group_id=${producer_id} topic=avro_testing2 partition=${0} value=${value} key=${KEY}
Wait Until Keyword Succeeds 10x 0.5s All Messages Are Delivered ${producer_id}
Sleep 1s

${consumer_group_id}= Create Consumer auto_offset_reset=earliest schema_registry_url=http://127.0.0.1:8081
Subscribe Topic group_id=${consumer_group_id} topics=avro_testing2
${messages}= Poll group_id=${consumer_group_id}
Should Be Equal ${TEST_DATA} ${messages}
${thread_messages}= Get Messages From Thread ${MAIN_THREAD}
Should Be Equal ${TEST_DATA} ${thread_messages}
[Teardown] Basic Teardown ${consumer_group_id}


*** Keywords ***
Starting Test
Set Suite Variable @{TEST_TOPIC} avro_testing1 avro_testing2
Set Suite Variable &{KEY} name=testkey
${value}= Create Dictionary name=Robot number=${10}
${data}= Create List ${value}
Set Suite Variable ${TEST_DATA} ${data}

${thread}= Start Consumer Threaded topics=${TEST_TOPIC} schema_registry_url=http://127.0.0.1:8081 auto_offset_reset=latest
Set Suite Variable ${MAIN_THREAD} ${thread}

All Messages Are Delivered
[Arguments] ${producer_id}
${count}= Flush ${producer_id}
Log Reaming messages to be delivered: ${count}
Should Be Equal As Integers ${count} 0

Basic Teardown
[Arguments] ${group_id}
Unsubscribe ${group_id}
Close Consumer ${group_id}

Unassign Teardown
[Arguments] ${group_id}
Unassign ${group_id}
Close Consumer ${group_id}
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
robotframework==3.1.1
confluent-kafka==1.0.0
robotframework==3.1.2
confluent-kafka==1.2.0
uuid==1.30
Loading