Skip to content

Commit

Permalink
Merge 5a99d66 into 0778814
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuawscott committed Apr 1, 2020
2 parents 0778814 + 5a99d66 commit f88d102
Show file tree
Hide file tree
Showing 27 changed files with 252 additions and 409 deletions.
20 changes: 6 additions & 14 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,23 @@ cache:
- _build
matrix:
include:
- elixir: 1.9.4
otp_release: 21.1
- elixir: 1.8.1
otp_release: 21.1
- elixir: 1.8.2
otp_release: 21.3
env: COVERALLS=true CREDO=true
- elixir: 1.7.4
otp_release: 21.1
otp_release: 21.3
- elixir: 1.6.6
otp_release: 20.2
otp_release: 20.3
- elixir: 1.5.3
otp_release: 20.2
- elixir: 1.4.5
otp_release: 19.3
env: COVERALLS=true CREDO=true
- elixir: 1.3.4
otp_release: 19.3
dist: trusty
sudo: required
services:
- docker
env:
global:
- COVERALLS=false
- CREDO=false
before_script:
- IP_IFACE=eth0 ./scripts/docker_up.sh
- ./scripts/docker_up.sh
script:
- MIX_ENV=test mix deps.compile
- MIX_ENV=test mix compile --warnings-as-errors
Expand Down
16 changes: 5 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ KafkaEx
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](http://hexdocs.pm/kafka_ex/)

KafkaEx is an Elixir client for [Apache Kafka](http://kafka.apache.org/) with
support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.1.1+ and
Erlang OTP 18+.
support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.5+ and
Erlang OTP 19+.

See [http://hexdocs.pm/kafka_ex/](http://hexdocs.pm/kafka_ex/) for
documentation,
Expand Down Expand Up @@ -386,15 +386,9 @@ To launch the included test cluster, run
```

The `docker_up.sh` script will attempt to determine an IP address for your
computer on an active network interface. If it has trouble with this, you can
try manually specifying a network interface in the `IP_IFACE` environment
variable:
computer on an active network interface.

```
IP_IFACE=eth0 ./scripts/docker_up.sh
```

The test cluster runs Kafka 0.10.1.0.
The test cluster runs Kafka 0.11.0.1.

### Running the KafkaEx Tests

Expand Down Expand Up @@ -452,7 +446,7 @@ mix test --include integration --include server_0_p_8_p_0

### Static analysis

This requires Elixir 1.3.2+.
This requires Elixir 1.5+.

```
mix dialyzer
Expand Down
5 changes: 0 additions & 5 deletions all_tests.sh

This file was deleted.

4 changes: 2 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ config :kafka_ex,
# server.properties file.
# In the case below you would set "advertised.host.name=localhost"
brokers: [
{"localhost", 9092},
{"localhost", 9093},
{"localhost", 9094}
{"localhost", 9094},
{"localhost", 9095}
],
#
# OR:
Expand Down
3 changes: 3 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use Mix.Config

config :ex_unit, capture_log: true

config :kafka_ex,
sync_timeout: 60_000
46 changes: 46 additions & 0 deletions docker-compose-kafka.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#################################
# Common Kafka config
#################################

# Note: any property `something.bla` can be configure by setting `KAFKA_SOMETHING_BLA`.

######## topic creation

KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2,test0p8p0:4:2

######## zookeeper

KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS=6000

######## advertised hosts names and protocols

# alternative to KAFKA_ADVERTISED_HOST_NAME is: HOSTNAME_COMMAND: ip addr | grep -o "inet [0-9.]*" | grep -v "127\.0\.0\.1" | grep -o "[0-9.]*"
KAFKA_ADVERTISED_HOST_NAME=localhost
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:SSL,OUTSIDE:SSL
KAFKA_ADVERTISED_PROTOCOL_NAME=OUTSIDE
KAFKA_PROTOCOL_NAME=INSIDE

######## SSL

KAFKA_SSL_KEYSTORE_LOCATION=/ssl/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD=kafka_ex
KAFKA_SSL_KEY_PASSWORD=kafka_ex
KAFKA_SSL_TRUSTSTORE_LOCATION=/ssl/server.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD=kafka_ex
KAFKA_SSL_SECURE_RANDOM_IMPLEMENTATION=SHA1PRNG

######## Config

KAFKA_DELETE_TOPIC_ENABLE=true
# KAFKA_NUM_NETWORK_THREADS=3
# KAFKA_NUM_IO_THREADS=8
# KAFKA_SOCKET_SEND_BUFFER_BYTES=102400
# KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400
# KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600
KAFKA_LOG_DIRS=/tmp/kafka_log
KAFKA_NUM_PARTITIONS=1
KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1
KAFKA_LOG_RETENTION_HOURS=168
# KAFKA_LOG_SEGMENT_BYTES=1073741824
# KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000
29 changes: 19 additions & 10 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
version: '2'
version: '3.2'

services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
Expand All @@ -7,30 +8,38 @@ services:
kafka1:
image: wurstmeister/kafka:0.11.0.1
ports:
- "9092:9092"
- "9093:9093"
depends_on:
- zookeeper
volumes:
- ./kafka1/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_PORT: 9093

kafka2:
image: wurstmeister/kafka:0.11.0.1
ports:
- "9093:9093"
- "9094:9094"
depends_on:
- zookeeper
volumes:
- ./kafka2/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_PORT: 9094

kafka3:
image: wurstmeister/kafka:0.11.0.1
ports:
- "9094:9094"
- "9095:9095"
depends_on:
- zookeeper
volumes:
- ./kafka3/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_PORT: 9095
4 changes: 2 additions & 2 deletions lib/kafka_ex/gen_consumer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -471,8 +471,8 @@ defmodule KafkaEx.GenConsumer do
"""
@spec partition(GenServer.server()) ::
{topic :: binary, partition_id :: non_neg_integer}
def partition(gen_consumer) do
GenServer.call(gen_consumer, :partition)
def partition(gen_consumer, timeout \\ 5000) do
GenServer.call(gen_consumer, :partition, timeout)
end

@doc """
Expand Down
23 changes: 18 additions & 5 deletions lib/kafka_ex/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,15 @@ defmodule KafkaEx.Server do
# Default from GenServer
@default_call_timeout 5_000

# the timeout parameter is used also for network requests, which
# means that if we set the same value for the GenServer timeout,
# it will always timeout first => we won't get the expected behaviour,
# which is to have the GenServer.call answer with the timeout reply.
# Instead, we get a GenServer timeout error.
# To avoid this, we add here an "buffer" time that covers the time
# needed to process the logic until the network request, and back from it.
@overhead_timeout 2_000

@doc false
@spec call(
GenServer.server(),
Expand All @@ -262,7 +271,7 @@ defmodule KafkaEx.Server do
end

def call(server, request, timeout) when is_integer(timeout) do
GenServer.call(server, request, timeout)
GenServer.call(server, request, timeout + @overhead_timeout)
end

defmacro __using__(_) do
Expand Down Expand Up @@ -556,7 +565,7 @@ defmodule KafkaEx.Server do
response -> Offset.parse_response(response)
end

state = %{state | correlation_id: state.correlation_id + 1}
state = increment_state_correlation_id(state)
{response, state}
end

Expand Down Expand Up @@ -796,9 +805,9 @@ defmodule KafkaEx.Server do

defp check_brokers_sockets!(brokers) do
any_socket_opened = brokers
|> Enum.map(fn %Broker{socket: socket} -> !is_nil(socket) end)
|> Enum.reduce(&(&1 || &2))
if !any_socket_opened do
|> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end)

if not any_socket_opened do
sleep_for_reconnect()
raise "Brokers sockets are not opened"
end
Expand Down Expand Up @@ -994,6 +1003,10 @@ defmodule KafkaEx.Server do
defp default_partitioner do
Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner)
end

defp increment_state_correlation_id(%_{correlation_id: correlation_id} = state) do
%{state | correlation_id: correlation_id + 1}
end
end
end
end
74 changes: 35 additions & 39 deletions lib/kafka_ex/server_0_p_10_and_later.ex
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ defmodule KafkaEx.Server0P10AndLater do
|> first_broker_response(state)
|> ApiVersions.parse_response()

{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
{:reply, response, increment_state_correlation_id(state)}
end

def kafka_server_delete_topics(topics, network_timeout, state) do
Expand All @@ -214,27 +214,13 @@ defmodule KafkaEx.Server0P10AndLater do
api_version
)

broker = state.brokers |> Enum.find(& &1.is_controller)

{response, state} =
case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}
case request_to_controller(main_request, state) do
{{:ok, response}, state} ->
{DeleteTopics.parse_response(response, api_version), state}

_ ->
response =
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} -> {:error, reason}
response -> DeleteTopics.parse_response(response, api_version)
end

{response, %{state | correlation_id: state.correlation_id + 1}}
other ->
other
end

state = update_metadata(state)
Expand Down Expand Up @@ -265,34 +251,44 @@ defmodule KafkaEx.Server0P10AndLater do
api_version
)

broker = state.brokers |> Enum.find(& &1.is_controller)

{response, state} =
case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}
case request_to_controller(main_request, state) do
{{:ok, response}, state} ->
{CreateTopics.parse_response(response, api_version), state}

_ ->
response =
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} -> {:error, reason}
response -> CreateTopics.parse_response(response, api_version)
end

{response, %{state | correlation_id: state.correlation_id + 1}}
other ->
other
end

state = update_metadata(state)

{:reply, response, state}
end

defp request_to_controller(main_request, state) do
broker = state.brokers |> Enum.find(& &1.is_controller)

case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}

_ ->
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} ->
{{:error, reason}, increment_state_correlation_id(state)}

response ->
{{:ok, response}, increment_state_correlation_id(state)}
end
end
end

defp update_consumer_metadata(state),
do: update_consumer_metadata(state, @retry_count, 0)

Expand Down

0 comments on commit f88d102

Please sign in to comment.