23 changes: 10 additions & 13 deletions .travis.yml
@@ -9,30 +9,27 @@ cache:
matrix:
include:
- elixir: 1.9.4
otp_release: 21.1
- elixir: 1.8.1
otp_release: 21.1
# stick to OTP release 21.3 for now.
# We have problems with SSL and OTP 22
# https://github.com/kafkaex/kafka_ex/issues/389
otp_release: 21.3
env: COVERALLS=true CREDO=true
- elixir: 1.8.2
otp_release: 21.3
- elixir: 1.7.4
otp_release: 21.1
otp_release: 21.3
- elixir: 1.6.6
otp_release: 20.2
otp_release: 20.3
- elixir: 1.5.3
otp_release: 20.2
- elixir: 1.4.5
otp_release: 19.3
env: COVERALLS=true CREDO=true
- elixir: 1.3.4
otp_release: 19.3
dist: trusty
sudo: required
services:
- docker
env:
global:
- COVERALLS=false
- CREDO=false
before_script:
- IP_IFACE=eth0 ./scripts/docker_up.sh
- ./scripts/docker_up.sh
script:
- MIX_ENV=test mix deps.compile
- MIX_ENV=test mix compile --warnings-as-errors
16 changes: 5 additions & 11 deletions README.md
@@ -9,8 +9,8 @@ KafkaEx
[![API Docs](https://img.shields.io/badge/api-docs-yellow.svg?style=flat)](http://hexdocs.pm/kafka_ex/)

KafkaEx is an Elixir client for [Apache Kafka](http://kafka.apache.org/) with
support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.1.1+ and
Erlang OTP 18+.
support for Kafka versions 0.8.0 and newer. KafkaEx requires Elixir 1.5+ and
Erlang OTP 19+.

See [http://hexdocs.pm/kafka_ex/](http://hexdocs.pm/kafka_ex/) for
documentation,
@@ -386,15 +386,9 @@ To launch the included test cluster, run
```

The `docker_up.sh` script will attempt to determine an IP address for your
computer on an active network interface. If it has trouble with this, you can
try manually specifying a network interface in the `IP_IFACE` environment
variable:
computer on an active network interface.

```
IP_IFACE=eth0 ./scripts/docker_up.sh
```

The test cluster runs Kafka 0.10.1.0.
The test cluster runs Kafka 0.11.0.1.

### Running the KafkaEx Tests

@@ -452,7 +446,7 @@ mix test --include integration --include server_0_p_8_p_0

### Static analysis

This requires Elixir 1.3.2+.
This requires Elixir 1.5+.

```
mix dialyzer
5 changes: 0 additions & 5 deletions all_tests.sh

This file was deleted.

4 changes: 2 additions & 2 deletions config/config.exs
@@ -14,9 +14,9 @@ config :kafka_ex,
# server.properties file.
# In the case below you would set "advertised.host.name=localhost"
brokers: [
{"localhost", 9092},
{"localhost", 9093},
{"localhost", 9094}
{"localhost", 9094},
{"localhost", 9095}
],
#
# OR:
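The new broker list lines up with the advertised ports in the updated `docker-compose.yml` (9093–9095). A minimal sketch of pointing a worker at that list at runtime, assuming a standard KafkaEx setup (the worker name is illustrative):

```elixir
# Hypothetical runtime override matching the three advertised broker ports:
Application.put_env(:kafka_ex, :brokers, [
  {"localhost", 9093},
  {"localhost", 9094},
  {"localhost", 9095}
])

# Workers created after this point pick up the new broker list.
{:ok, _pid} = KafkaEx.create_worker(:pr_review_worker)
```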
3 changes: 3 additions & 0 deletions config/test.exs
@@ -1,3 +1,6 @@
use Mix.Config

config :ex_unit, capture_log: true

config :kafka_ex,
sync_timeout: 60_000
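For context, `capture_log: true` makes ExUnit capture `Logger` output during each test and print it only when the test fails, which keeps noisy integration runs readable. A minimal illustration (the test module is hypothetical):

```elixir
# With `config :ex_unit, capture_log: true` in config/test.exs,
# log output from passing tests is swallowed.
defmodule QuietLoggingTest do
  use ExUnit.Case, async: true
  require Logger

  test "log lines stay hidden unless this test fails" do
    Logger.info("only shown if the assertion below fails")
    assert 1 + 1 == 2
  end
end
```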
46 changes: 46 additions & 0 deletions docker-compose-kafka.env
@@ -0,0 +1,46 @@
#################################
# Common Kafka config
#################################

# Note: any property `something.bla` can be configured by setting `KAFKA_SOMETHING_BLA`.

######## topic creation

KAFKA_CREATE_TOPICS=consumer_group_implementation_test:4:2,test0p8p0:4:2

######## zookeeper

KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS=6000

######## advertised hosts names and protocols

# alternative to KAFKA_ADVERTISED_HOST_NAME is: HOSTNAME_COMMAND: ip addr | grep -o "inet [0-9.]*" | grep -v "127\.0\.0\.1" | grep -o "[0-9.]*"
KAFKA_ADVERTISED_HOST_NAME=localhost
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INSIDE:SSL,OUTSIDE:SSL
KAFKA_ADVERTISED_PROTOCOL_NAME=OUTSIDE
KAFKA_PROTOCOL_NAME=INSIDE

######## SSL

KAFKA_SSL_KEYSTORE_LOCATION=/ssl/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD=kafka_ex
KAFKA_SSL_KEY_PASSWORD=kafka_ex
KAFKA_SSL_TRUSTSTORE_LOCATION=/ssl/server.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD=kafka_ex
KAFKA_SSL_SECURE_RANDOM_IMPLEMENTATION=SHA1PRNG

######## Config

KAFKA_DELETE_TOPIC_ENABLE=true
# KAFKA_NUM_NETWORK_THREADS=3
# KAFKA_NUM_IO_THREADS=8
# KAFKA_SOCKET_SEND_BUFFER_BYTES=102400
# KAFKA_SOCKET_RECEIVE_BUFFER_BYTES=102400
# KAFKA_SOCKET_REQUEST_MAX_BYTES=104857600
KAFKA_LOG_DIRS=/tmp/kafka_log
KAFKA_NUM_PARTITIONS=1
KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR=1
KAFKA_LOG_RETENTION_HOURS=168
# KAFKA_LOG_SEGMENT_BYTES=1073741824
# KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS=300000
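Both listeners are mapped to SSL, so clients have to speak TLS. A sketch of the matching client-side settings, assuming KafkaEx's documented `use_ssl`/`ssl_options` configuration; the certificate paths are illustrative and would have to match the material mounted under `./ssl`:

```elixir
use Mix.Config

# Illustrative client TLS config; ssl_options takes Erlang :ssl options.
config :kafka_ex,
  use_ssl: true,
  ssl_options: [
    cacertfile: "ssl/ca-cert.pem",
    certfile: "ssl/client-cert.pem",
    keyfile: "ssl/client-key.pem"
  ]
```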
29 changes: 19 additions & 10 deletions docker-compose.yml
@@ -1,4 +1,5 @@
version: '2'
version: '3.2'

services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
@@ -7,30 +8,38 @@ services:
kafka1:
image: wurstmeister/kafka:0.11.0.1
ports:
- "9092:9092"
- "9093:9093"
depends_on:
- zookeeper
volumes:
- ./kafka1/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 1
KAFKA_ADVERTISED_PORT: 9093

kafka2:
image: wurstmeister/kafka:0.11.0.1
ports:
- "9093:9093"
- "9094:9094"
depends_on:
- zookeeper
volumes:
- ./kafka2/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 2
KAFKA_ADVERTISED_PORT: 9094

kafka3:
image: wurstmeister/kafka:0.11.0.1
ports:
- "9094:9094"
- "9095:9095"
depends_on:
- zookeeper
volumes:
- ./kafka3/server.properties.in:/opt/kafka/config/server.properties.in
- ./scripts/docker-start-kafka.sh:/usr/bin/start-kafka.sh
- ./ssl:/ssl
env_file: docker-compose-kafka.env
environment:
KAFKA_BROKER_ID: 3
KAFKA_ADVERTISED_PORT: 9095
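Once the cluster is up, one quick smoke test is to ask a worker for cluster metadata and check that all three brokers show up on their advertised ports; a sketch (the worker name is illustrative):

```elixir
# Assumes config :kafka_ex, :brokers points at ports 9093-9095 as above.
{:ok, _pid} = KafkaEx.create_worker(:smoke_test)

# Returns a KafkaEx.Protocol.Metadata.Response struct; expect the three
# brokers with ids 1, 2 and 3 in its broker list.
metadata = KafkaEx.metadata(worker_name: :smoke_test)
```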
4 changes: 2 additions & 2 deletions lib/kafka_ex/gen_consumer.ex
@@ -471,8 +471,8 @@ defmodule KafkaEx.GenConsumer do
"""
@spec partition(GenServer.server(), timeout) ::
{topic :: binary, partition_id :: non_neg_integer}
def partition(gen_consumer) do
GenServer.call(gen_consumer, :partition)
def partition(gen_consumer, timeout \\ 5000) do
GenServer.call(gen_consumer, :partition, timeout)
end

@doc """
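A usage sketch for the new optional timeout; `consumer` is assumed to be the pid of a running `KafkaEx.GenConsumer`:

```elixir
# Default 5_000 ms GenServer timeout, same behaviour as before:
{topic, partition_id} = KafkaEx.GenConsumer.partition(consumer)

# Give a busy consumer more time to answer:
{topic, partition_id} = KafkaEx.GenConsumer.partition(consumer, 10_000)
```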
23 changes: 18 additions & 5 deletions lib/kafka_ex/server.ex
@@ -236,6 +236,15 @@ defmodule KafkaEx.Server do
# Default from GenServer
@default_call_timeout 5_000

# the timeout parameter is also used for the network requests, so if
# we passed the same value to GenServer.call, the call itself would
# always time out first and the caller would get a GenServer timeout
# error instead of the expected behaviour: a GenServer.call that
# answers with the timeout reply. To avoid this, we pad the GenServer
# timeout with a "buffer" that covers the time needed to process the
# logic before the network request and on the way back from it.
@overhead_timeout 2_000

@doc false
@spec call(
GenServer.server(),
@@ -262,7 +271,7 @@ end
end

def call(server, request, timeout) when is_integer(timeout) do
GenServer.call(server, request, timeout)
GenServer.call(server, request, timeout + @overhead_timeout)
end

defmacro __using__(_) do
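To make the padding concrete: with the 5-second default, the network layer gets the full caller timeout while the surrounding `GenServer.call` waits 7 seconds, so a socket timeout comes back as a reply instead of crashing the caller. A sketch of the arithmetic using the module attributes above:

```elixir
# Values mirror @default_call_timeout and @overhead_timeout above.
network_timeout = 5_000
genserver_timeout = network_timeout + 2_000

# The socket layer times out first and the server can still reply with an
# error tuple; with equal timeouts, GenServer.call would exit with
# {:timeout, {GenServer, :call, ...}} before that reply arrived.
true = genserver_timeout > network_timeout
```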
@@ -556,7 +565,7 @@ defmodule KafkaEx.Server do
response -> Offset.parse_response(response)
end

state = %{state | correlation_id: state.correlation_id + 1}
state = increment_state_correlation_id(state)
{response, state}
end

@@ -796,9 +805,9 @@

defp check_brokers_sockets!(brokers) do
any_socket_opened = brokers
|> Enum.map(fn %Broker{socket: socket} -> !is_nil(socket) end)
|> Enum.reduce(&(&1 || &2))
if !any_socket_opened do
|> Enum.any?(fn %Broker{socket: socket} -> not is_nil(socket) end)

if not any_socket_opened do
sleep_for_reconnect()
raise "Brokers sockets are not opened"
end
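The refactor collapses a map-then-OR-fold into `Enum.any?/2`, which also short-circuits at the first open socket. A standalone comparison, with plain maps standing in for `%Broker{}` structs:

```elixir
brokers = [%{socket: nil}, %{socket: make_ref()}]

# Old shape: materialize a list of booleans, then OR-fold it.
brokers
|> Enum.map(fn %{socket: socket} -> !is_nil(socket) end)
|> Enum.reduce(&(&1 || &2))
#=> true

# New shape: stops at the first broker whose socket is open.
Enum.any?(brokers, fn %{socket: socket} -> not is_nil(socket) end)
#=> true
```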
@@ -994,6 +1003,10 @@
defp default_partitioner do
Application.get_env(:kafka_ex, :partitioner, KafkaEx.DefaultPartitioner)
end

defp increment_state_correlation_id(%_{correlation_id: correlation_id} = state) do
%{state | correlation_id: correlation_id + 1}
end
end
end
end
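The new helper leans on `%_{...}`, which matches any struct carrying the given field, so it works across the per-version server state structs. A self-contained sketch of the pattern (the `State` struct here is illustrative):

```elixir
defmodule State do
  defstruct correlation_id: 0
end

state = %State{correlation_id: 41}

# `%_{correlation_id: cid}` matches any struct with a :correlation_id field.
%_{correlation_id: correlation_id} = state
%{state | correlation_id: correlation_id + 1}
#=> %State{correlation_id: 42}
```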
74 changes: 35 additions & 39 deletions lib/kafka_ex/server_0_p_10_and_later.ex
@@ -190,7 +190,7 @@ defmodule KafkaEx.Server0P10AndLater do
|> first_broker_response(state)
|> ApiVersions.parse_response()

{:reply, response, %{state | correlation_id: state.correlation_id + 1}}
{:reply, response, increment_state_correlation_id(state)}
end

def kafka_server_delete_topics(topics, network_timeout, state) do
@@ -214,27 +214,13 @@
api_version
)

broker = state.brokers |> Enum.find(& &1.is_controller)

{response, state} =
case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}
case request_to_controller(main_request, state) do
{{:ok, response}, state} ->
{DeleteTopics.parse_response(response, api_version), state}

_ ->
response =
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} -> {:error, reason}
response -> DeleteTopics.parse_response(response, api_version)
end

{response, %{state | correlation_id: state.correlation_id + 1}}
other ->
other
end

state = update_metadata(state)
@@ -265,34 +251,44 @@
api_version
)

broker = state.brokers |> Enum.find(& &1.is_controller)

{response, state} =
case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}
case request_to_controller(main_request, state) do
{{:ok, response}, state} ->
{CreateTopics.parse_response(response, api_version), state}

_ ->
response =
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} -> {:error, reason}
response -> CreateTopics.parse_response(response, api_version)
end

{response, %{state | correlation_id: state.correlation_id + 1}}
other ->
other
end

state = update_metadata(state)

{:reply, response, state}
end

defp request_to_controller(main_request, state) do
broker = state.brokers |> Enum.find(& &1.is_controller)

case broker do
nil ->
Logger.log(:error, "Coordinator for topic is not available")
{:topic_not_found, state}

_ ->
broker
|> NetworkClient.send_sync_request(
main_request,
config_sync_timeout()
)
|> case do
{:error, reason} ->
{{:error, reason}, increment_state_correlation_id(state)}

response ->
{{:ok, response}, increment_state_correlation_id(state)}
end
end
end

defp update_consumer_metadata(state),
do: update_consumer_metadata(state, @retry_count, 0)

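With the duplicated controller lookup factored into `request_to_controller/2`, any future controller-bound request can reuse the same shape. A hypothetical call site; `some_request`, `SomeProtocol`, and `api_version` are placeholders, not part of this PR:

```elixir
{response, state} =
  case request_to_controller(some_request, state) do
    {{:ok, response}, state} ->
      {SomeProtocol.parse_response(response, api_version), state}

    # :topic_not_found and {:error, reason} results pass through unchanged.
    other ->
      other
  end
```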