Skip to content

Commit

Permalink
Kafka::Connection now strips response header
Browse files Browse the repository at this point in the history
  • Loading branch information
maiha committed Mar 6, 2018
1 parent b99b6f6 commit a2d447e
Show file tree
Hide file tree
Showing 12 changed files with 26 additions and 62 deletions.
4 changes: 2 additions & 2 deletions spec/units/protocol/fetch_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ describe Kafka::Protocol::FetchResponse do

describe "(3: UnknownTopicOrPartitionCode)" do
let(res) {
Kafka::Protocol::FetchResponse.from_kafka(bytes(0, 0, 0, 45, 0, 0, 0, 0, 0, 0, 0, 1, 0, 13, 115, 101, 118, 101, 110, 46, 99, 105, 46, 104, 116, 116, 112, 0, 0, 0, 1, 0, 0, 0, 0, 0, 3, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0))
Kafka::Protocol::FetchResponse.from_kafka(bytes(0, 0, 0, 0, 0, 0, 0, 1, 0, 13, 115, 101, 118, 101, 110, 46, 99, 105, 46, 104, 116, 116, 112, 0, 0, 0, 1, 0, 0, 0, 0, 0, 3, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, 0, 0))
}

it "creates object" do
Expand All @@ -51,7 +51,7 @@ describe Kafka::Protocol::FetchResponse do

describe "(1 data)" do
let(res) {
Kafka::Protocol::FetchResponse.from_kafka(bytes(0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 116, 49, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 30, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 18, 89, 42, 71, 135, 0, 0, 255, 255, 255, 255, 0, 0, 0, 4, 116, 101, 115, 116))
Kafka::Protocol::FetchResponse.from_kafka(bytes(0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 116, 49, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 30, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 18, 89, 42, 71, 135, 0, 0, 255, 255, 255, 255, 0, 0, 0, 4, 116, 101, 115, 116))
}

it "creates object" do
Expand Down
2 changes: 1 addition & 1 deletion spec/units/protocol/heartbeat_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ end

describe Kafka::Protocol::HeartbeatResponse do
it "from_kafka" do
res = Kafka::Protocol::HeartbeatResponse.from_kafka(bytes(0, 0, 0, 6, 0, 0, 0, 0, 0, 16))
res = Kafka::Protocol::HeartbeatResponse.from_kafka(bytes(0, 0, 0, 0, 0, 16))
expect(res.correlation_id).to eq(0)
expect(res.error_code).to eq(16)
end
Expand Down
4 changes: 2 additions & 2 deletions spec/units/protocol/metadata_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ end
describe Kafka::Protocol::MetadataResponse do
describe "(no topics)" do
it "from_kafka" do
res = Kafka::Protocol::MetadataResponse.from_kafka(bytes(0, 0, 0, 28, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 117, 98, 117, 110, 116, 117, 0, 0, 35, 132, 0, 0, 0, 0))
res = Kafka::Protocol::MetadataResponse.from_kafka(bytes(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 117, 98, 117, 110, 116, 117, 0, 0, 35, 132, 0, 0, 0, 0))
expect(res.correlation_id).to eq(0)
expect(res.brokers).to eq([Kafka::Protocol::Structure::Broker.new(1, "ubuntu", 9092)])
expect(res.topics).to eq([] of Kafka::Protocol::Structure::TopicMetadata)
Expand All @@ -25,7 +25,7 @@ describe Kafka::Protocol::MetadataResponse do
let(broker) { res.brokers.first.not_nil! }

let(res) {
Kafka::Protocol::MetadataResponse.from_kafka(bytes(0, 0, 0, 64, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 117, 98, 117, 110, 116, 117, 0, 0, 35, 132, 0, 0, 0, 1, 0, 0, 0, 2, 116, 49, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1))
Kafka::Protocol::MetadataResponse.from_kafka(bytes(0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 6, 117, 98, 117, 110, 116, 117, 0, 0, 35, 132, 0, 0, 0, 1, 0, 0, 0, 2, 116, 49, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1))
}

it "from_kafka" do
Expand Down
2 changes: 1 addition & 1 deletion spec/units/protocol/offset_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ end
describe Kafka::Protocol::OffsetResponse do
describe "(1 topic)" do
let(res) {
Kafka::Protocol::OffsetResponse.from_kafka(bytes(0, 0, 0, 42, 0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 116, 49, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0))
Kafka::Protocol::OffsetResponse.from_kafka(bytes(0, 0, 0, 0, 0, 0, 0, 1, 0, 2, 116, 49, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 5, 0, 0, 0, 0, 0, 0, 0, 0))
}

let(tpo) { res.topic_partition_offsets.first.not_nil! }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ describe Kafka::Protocol::Structure::Builder do

describe "(a: 3 partitions, b: 1 partition)" do
it "build" do
res = Kafka::Protocol::MetadataResponse.from_kafka(bytes(0, 0, 0, 196, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 9, 108, 111, 99, 97, 108, 104, 111, 115, 116, 0, 0, 35, 132, 0, 0, 0, 2, 0, 9, 108, 111, 99, 97, 108, 104, 111, 115, 116, 0, 0, 35, 133, 0, 0, 0, 2, 0, 0, 0, 1, 98, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 97, 0, 0, 0, 3, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1))
res = Kafka::Protocol::MetadataResponse.from_kafka(bytes(0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 9, 108, 111, 99, 97, 108, 104, 111, 115, 116, 0, 0, 35, 132, 0, 0, 0, 2, 0, 9, 108, 111, 99, 97, 108, 104, 111, 115, 116, 0, 0, 35, 133, 0, 0, 0, 2, 0, 0, 0, 1, 98, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 97, 0, 0, 0, 3, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 2, 0, 0, 0, 1))

# a#0 leader=2,replica=[2, 1],isr=[2, 1]
# a#1 leader=1,replica=[1, 2],isr=[1, 2]
Expand Down
1 change: 1 addition & 0 deletions src/kafka.cr
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require "./predef/*"
require "./utils/*"

require "./kafka/data"
require "./kafka/execution"
require "./kafka/*"
require "./prog/*"

Expand Down
11 changes: 0 additions & 11 deletions src/kafka/commands.cr
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,6 @@ class Kafka
# kafka.init_producer_id
# ```
include Kafka::Commands::InitProducerId

######################################################################
### general

def execute(request : Kafka::Request, handler : Kafka::Handler)
Kafka::Execution.execute(connection, request, handler)
end

def execute(request : Kafka::Request)
execute(request, handler)
end
end
end

11 changes: 2 additions & 9 deletions src/kafka/connection.cr
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,8 @@ class Kafka::Connection
raise err
end

def read : Slice
size = socket!.read_bytes(Int32, IO::ByteFormat::BigEndian)
body = Slice(UInt8).new(size)
socket!.read_fully(body)

out = IO::Memory.new(4 + size)
out.write_bytes(size, format = IO::ByteFormat::BigEndian)
out.write(body)
out.to_slice
def read : IO
Kafka::Protocol.from_kafka(socket!)
rescue err
close
raise err
Expand Down
14 changes: 12 additions & 2 deletions src/kafka/execution.cr
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ class Kafka::Execution

# recv
recv = connection.read
handler.recv(recv)
handler.recv(recv.to_slice)

# convert
response = request.class.response.from_kafka(IO::Memory.new(recv), handler.verbose)
response = request.class.response.from_kafka(recv, handler.verbose)
handler.respond(response)

handler.completed(request, response)
Expand All @@ -27,3 +27,13 @@ class Kafka::Execution
raise err
end
end

class Kafka
def execute(request : Kafka::Request, handler : Kafka::Handler)
Kafka::Execution.execute(connection, request, handler)
end

def execute(request : Kafka::Request)
execute(request, handler)
end
end
3 changes: 1 addition & 2 deletions src/kafka/protocol/macros.cr
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ module Kafka::Protocol::Structure
macro define_from_kafka
def self.from_kafka(io : IO, debug_level = -1, hint = "")
on_debug_head_padding
body = Kafka::Protocol.from_kafka(io)
super(body, debug_level_succ)
super(io, debug_level_succ)
end

def self.from_kafka(io : IO, verbose : Bool)
Expand Down
20 changes: 2 additions & 18 deletions src/prog/options.cr
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,8 @@ module Options
end

protected def execute(request, broker = build_broker)
connect(broker) do |socket|
bytes = request.to_slice
spawn do
socket.write bytes
socket.flush
sleep 0
end

recv = Kafka::Protocol.read(socket)

if verbose
logger.error "recv: #{recv}"
STDERR.flush
end

fake_io = IO::Memory.new(recv)
return request.class.response.from_kafka(fake_io, verbose)
end
kafka = Kafka.new(broker)
kafka.execute(request)
end

protected def fetch_topic_metadata(topics, app_name)
Expand Down
14 changes: 1 addition & 13 deletions src/prog/request_operations.cr
Original file line number Diff line number Diff line change
@@ -1,16 +1,4 @@
module RequestOperations
protected def execute(request)
bytes = request.to_slice
connect do |socket|
spawn do
socket.write bytes
socket.flush
sleep 0
end
return request.class.response.from_kafka(socket)
end
end

protected def build_offset_request(topics, partition, replica = -1)
po = Kafka::Protocol::Structure::Partition.new(partition, latest_offset = -1_i64, max_offsets = 999999999)
taps = topics.map { |t| Kafka::Protocol::Structure::TopicAndPartitions.new(t, [po]) }
Expand All @@ -33,7 +21,7 @@ module RequestOperations
reqs.each do |leader, req|
spawn {
req = Kafka::Protocol::OffsetRequest.new(0, "kafka-info", -1, req.topic_partitions)
res = execute(req, broker.call(leader))
res = execute(req.as(Kafka::Request), broker.call(leader))
chan.send(res)
}
end
Expand Down

0 comments on commit a2d447e

Please sign in to comment.