Commit: MemoryRecords

maiha committed Mar 16, 2018
1 parent e85b31e commit 4442839
Showing 16 changed files with 411 additions and 47 deletions.
21 changes: 18 additions & 3 deletions src/kafka/commands/produce.cr
@@ -2,6 +2,7 @@ class Kafka
  module Commands
    module Produce
      include Kafka::Protocol
      include ZigZag

      class ProduceOption
        var required_acks : Int16, -1_i16
@@ -62,19 +63,33 @@ class Kafka
      ### v5

      def produce_v5(entry : Kafka::Entry, datas : Array(Kafka::Data), opt : ProduceOption)
        raise "not implemented"
        # res = raw_produce_v5(entry, datas, opt)
        # return extract_produce_info!(res)
      end

      def raw_produce_v5(entry : Kafka::Entry, datas : Array(Kafka::Data), opt : ProduceOption)
        req = build_produce_request_v5(entry, datas, opt)
        # res = fetch_produce_response(req)
        # return res
      end

      private def build_produce_request_v5(entry : Kafka::Entry, datas : Array(Kafka::Data), opt : ProduceOption)
        tp = Structure::Record.new(entry, datas)

        # records = datas.map{|data|
        #   record = Structure::Record.new(
        #     length: Varint.new(-1),
        #     attributes: Int8.new(0),
        #     timestamp_delta: Varlong.new(0_i64),
        #     offset_delta: Varint.new(0),
        #     key: Varbytes.new,
        #     val: Varbytes.new,
        #     headers: VarArray(Header).new
        #   )
        # }

        # tp = Structure::ProduceRequestV5
        # tp = Structure::Record.new(entry, datas)
        # Kafka::Protocol::ProduceRequestV1.new(0, client_id, opt.required_acks, opt.timeout_ms.to_i32, [tp])
      end

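The `include ZigZag` added at the top of this file hints at what v5 needs: the v2 record format encodes its integer fields as zigzag varints. For reference, a minimal self-contained sketch of that encoding (not the library's actual ZigZag module, whose API this diff doesn't show):

# a sketch, assuming nothing beyond Crystal's stdlib
def zigzag_encode(n : Int32) : UInt32
  ((n << 1) ^ (n >> 31)).to_u32!  # maps 0, -1, 1, -2, 2 ... to 0, 1, 2, 3, 4 ...
end

def write_unsigned_varint(io : IO, value : UInt32)
  v = value
  while v >= 0x80
    io.write_byte(((v & 0x7f) | 0x80).to_u8)  # low 7 bits plus continuation bit
    v >>= 7
  end
  io.write_byte(v.to_u8)
end

io = IO::Memory.new
write_unsigned_varint(io, zigzag_encode(-1))  # writes the single byte 0x01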
14 changes: 14 additions & 0 deletions src/kafka/protocol/records/abstract_legacy_record_batch.cr
@@ -0,0 +1,14 @@
require "./record_batch"

module Kafka::Protocol::Structure
class AbstractLegacyRecordBatch
include RecordBatch

def initialize(@buffer : IO::Memory)
end

def each(&block)
raise "not implemented yet"
end
end
end
8 changes: 8 additions & 0 deletions src/kafka/protocol/records/compression_type.cr
@@ -0,0 +1,8 @@
module Kafka::Protocol::Structure
  enum CompressionType
    NONE   = 0
    GZIP   = 1
    SNAPPY = 2
    LZ4    = 3
  end
end
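A batch's compression codec lives in the low three bits of its attributes field; the other flag bits are the masks defined in DefaultRecordBatch below. A small sketch of decoding a raw attributes value (the example value is hypothetical):

attributes = 0x12_i16  # example: SNAPPY codec plus the transactional flag
codec = Kafka::Protocol::Structure::CompressionType.from_value(attributes & 0x07)
codec                     # => CompressionType::SNAPPY
(attributes & 0x10) != 0  # => true (transactional)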
64 changes: 64 additions & 0 deletions src/kafka/protocol/records/default_record.cr
@@ -0,0 +1,64 @@
require "./record"

module Kafka::Protocol::Structure
class DefaultRecord
property attributes : Int8
property timestamp_delta : Varlong
property offset_delta : Varint
property key : Varbytes
property val : Varbytes
property headers : VarArray(Header)

def initialize(@bytes : Bytes, @base_offset : Int64)
@buffer = IO::Memory.new(@bytes)
@max_message_size = Int32.new(@bytes.size)

@record_start = 0
@attributes = Int8.from_kafka(@buffer)
@timestamp_delta = Varlong.decode(@buffer)
#long timestamp = baseTimestamp + timestampDelta;

# if (logAppendTime != null)
# timestamp = logAppendTime;

@offset_delta = Varint.decode(@buffer)
@offset = Int64.new(@base_offset + @offset_delta.value)
# int sequence = baseSequence >= 0 ?
# DefaultRecordBatch.incrementSequence(baseSequence, offsetDelta) :
# RecordBatch.NO_SEQUENCE;

@key = Varbytes.from_kafka(@buffer)
@val = Varbytes.from_kafka(@buffer)

num_headers = Varint.decode(@buffer).value
if num_headers < 0
raise "InvalidRecordException: Found invalid number of record headers %s" % num_headers
end

@headers = VarArray(Header).new
num_headers.times do
header_key_size = Varint.decode(@buffer).value
if header_key_size < 0
raise "InvalidRecordException: Invalid negative header key size %s" % header_key_size
end

header_key = String.new(@buffer.read_at(@buffer.pos, header_key_size, &.to_slice))
@buffer.seek(@buffer.pos + header_key_size)
end
end

include Record

def offset : Int64
@base_offset + @offset_delta.value
end

def key : Bytes
@key.bytes
end

def value : Bytes
@val.bytes
end
end
end
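`Varint.decode` and `Varlong.decode` above are assumed to do zigzag varint decoding, as the v2 record format requires. A self-contained sketch of that decoding, independent of the library's Varint/Varlong classes:

# a sketch for 32-bit values; Varlong would be the same with wider types
def read_unsigned_varint(io : IO) : UInt32
  value = 0_u32
  shift = 0
  loop do
    byte = io.read_byte || raise "EOF while reading varint"
    value |= (byte.to_u32 & 0x7f) << shift
    break if (byte & 0x80) == 0     # no continuation bit: done
    shift += 7
    raise "varint too long" if shift > 28
  end
  value
end

def zigzag_decode(n : UInt32) : Int32
  (n >> 1).to_i32 ^ -(n & 1).to_i32
end

zigzag_decode(read_unsigned_varint(IO::Memory.new(Bytes[0x01])))  # => -1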
124 changes: 124 additions & 0 deletions src/kafka/protocol/records/default_record_batch.cr
@@ -0,0 +1,124 @@
require "./record_batch"

module Kafka::Protocol::Structure
class DefaultRecordBatch
include RecordBatch

BASE_OFFSET_OFFSET = 0
BASE_OFFSET_LENGTH = 8
LENGTH_OFFSET = BASE_OFFSET_OFFSET + BASE_OFFSET_LENGTH
LENGTH_LENGTH = 4
PARTITION_LEADER_EPOCH_OFFSET = LENGTH_OFFSET + LENGTH_LENGTH
PARTITION_LEADER_EPOCH_LENGTH = 4
MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH
MAGIC_LENGTH = 1
CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH
CRC_LENGTH = 4
ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH
ATTRIBUTE_LENGTH = 2
LAST_OFFSET_DELTA_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH
LAST_OFFSET_DELTA_LENGTH = 4
FIRST_TIMESTAMP_OFFSET = LAST_OFFSET_DELTA_OFFSET + LAST_OFFSET_DELTA_LENGTH
FIRST_TIMESTAMP_LENGTH = 8
MAX_TIMESTAMP_OFFSET = FIRST_TIMESTAMP_OFFSET + FIRST_TIMESTAMP_LENGTH
MAX_TIMESTAMP_LENGTH = 8
PRODUCER_ID_OFFSET = MAX_TIMESTAMP_OFFSET + MAX_TIMESTAMP_LENGTH
PRODUCER_ID_LENGTH = 8
PRODUCER_EPOCH_OFFSET = PRODUCER_ID_OFFSET + PRODUCER_ID_LENGTH
PRODUCER_EPOCH_LENGTH = 2
BASE_SEQUENCE_OFFSET = PRODUCER_EPOCH_OFFSET + PRODUCER_EPOCH_LENGTH
BASE_SEQUENCE_LENGTH = 4
RECORDS_COUNT_OFFSET = BASE_SEQUENCE_OFFSET + BASE_SEQUENCE_LENGTH
RECORDS_COUNT_LENGTH = 4
RECORDS_OFFSET = RECORDS_COUNT_OFFSET + RECORDS_COUNT_LENGTH
RECORD_BATCH_OVERHEAD = RECORDS_OFFSET

COMPRESSION_CODEC_MASK = 0x07
TRANSACTIONAL_FLAG_MASK = 0x10
CONTROL_FLAG_MASK = 0x20
TIMESTAMP_TYPE_MASK = 0x08

property count : Int32
property attributes : Int8
property magic : Int8
property base_offset : Int64
property last_offset_delta : Int32
property producer_id : Int64
property producer_epoch : Int8
property base_sequence : Int32

def initialize(@bytes : Bytes)
@buffer = IO::Memory.new(@bytes)
@max_message_size = Int32.new(@bytes.size)

@magic = get(MAGIC_OFFSET)
@count = getInt(RECORDS_COUNT_OFFSET)
@attributes = getShort(ATTRIBUTES_OFFSET)
@base_offset = getLong(BASE_OFFSET_OFFSET)
@last_offset_delta = getInt(LAST_OFFSET_DELTA_OFFSET)
@producer_id = getLong(PRODUCER_ID_OFFSET)
@producer_epoch = getShort(PRODUCER_EPOCH_OFFSET)
@base_sequence = getInt(BASE_SEQUENCE_OFFSET)
end

def last_offset : Int64
base_offset + last_offset_delta
end

def each(&block : Record -> _)
@buffer.seek(RECORD_BATCH_OVERHEAD)
return nil if count == 0

if compressed?
raise "not implemented yet"
else
if remaining > 0
begin
position = @buffer.pos
varint = Varint.from_kafka(@buffer)
record_offset = position + varint.read_bytes
record_length = varint.value
bytes = @buffer.read_at(record_offset, record_length){|io| io.to_slice}
@buffer.seek(record_offset + record_length)

record = DefaultRecord.new(bytes, base_offset)
block.call(record.as(Record))
rescue err
logger.error err.to_s
end
end
end
end

def compression_type : CompressionType
CompressionType.from_value(attributes & COMPRESSION_CODEC_MASK)
end

def compressed? : Bool
! compression_type.none?
end

private def remaining : Int32
@max_message_size - @buffer.pos
end

private def get(offset : Int32) : Int8
getShort(offset)
end

private def getShort(offset : Int32) : Int8
@buffer.seek(offset)
Int8.from_kafka(@buffer)
end

private def getInt(offset : Int32) : Int32
@buffer.seek(offset)
Int32.from_kafka(@buffer)
end

private def getLong(offset : Int32) : Int64
@buffer.seek(offset)
Int64.from_kafka(@buffer)
end
end
end
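Summing the fixed-length fields above: 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4 = 61, so RECORD_BATCH_OVERHEAD is 61 bytes and the first length-prefixed record starts at offset 61. A hypothetical usage sketch, assuming `bytes` holds one complete v2 batch:

batch = Kafka::Protocol::Structure::DefaultRecordBatch.new(bytes)
puts "count=#{batch.count} base_offset=#{batch.base_offset} codec=#{batch.compression_type}"
batch.each do |record|
  puts "offset=#{record.offset} key=#{record.key.size}B value=#{record.value.size}B"
end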
28 changes: 28 additions & 0 deletions src/kafka/protocol/records/legacy_record.cr
@@ -0,0 +1,28 @@
require "./records"

module Kafka::Protocol::Structure
class LegacyRecord
# The current offset and size for all the fixed-length fields
CRC_OFFSET = 0
CRC_LENGTH = 4
MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH
MAGIC_LENGTH = 1
ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH
ATTRIBUTES_LENGTH = 1
TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH
TIMESTAMP_LENGTH = 8
KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH
KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH
KEY_SIZE_LENGTH = 4
KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH
KEY_OFFSET_V1 = KEY_SIZE_OFFSET_V1 + KEY_SIZE_LENGTH
VALUE_SIZE_LENGTH = 4

# The size for the record header
HEADER_SIZE_V0 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH
HEADER_SIZE_V1 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + TIMESTAMP_LENGTH

# The amount of overhead bytes in a record
RECORD_OVERHEAD_V0 = HEADER_SIZE_V0 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH
end
end
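With these constants, HEADER_SIZE_V0 = 4 + 1 + 1 = 6 bytes, HEADER_SIZE_V1 = 6 + 8 = 14 bytes, and RECORD_OVERHEAD_V0 = 6 + 4 + 4 = 14 bytes. That 14-byte minimum is exactly the bound MemoryRecords#next_batch? checks record sizes against below.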
74 changes: 74 additions & 0 deletions src/kafka/protocol/records/memory_records.cr
@@ -0,0 +1,74 @@
require "./records"

module Kafka::Protocol::Structure
class MemoryRecords
include Records

def initialize(@buffer : IO::Memory, @max_message_size : Int32)
end

def each(&block : RecordBatch -> _)
@buffer.rewind
while batch = next_batch?
block.call(batch.as(RecordBatch))
end
end

private def next_batch? : RecordBatch?
position = @buffer.pos
return nil if (remaining < LOG_OVERHEAD)

@buffer.seek(position + SIZE_OFFSET)
record_size = Int32.from_kafka(@buffer)

if record_size < LegacyRecord::RECORD_OVERHEAD_V0
raise "CorruptRecordException: Record size is less than the minimum record overhead (%d)" % LegacyRecord::RECORD_OVERHEAD_V0
end

if record_size > @max_message_size
raise "CorruptRecordException: Record size exceeds the largest allowable message size (%d)." % @max_message_size
end

batch_size = record_size + LOG_OVERHEAD
return nil if remaining < batch_size

@buffer.seek(position + MAGIC_OFFSET)
magic = Int8.from_kafka(@buffer)

batch_bytes = @buffer.read_at(position, batch_size){|io| io.to_slice}
@buffer.seek(position + batch_size)

if magic < 0 || magic > RecordBatch::CURRENT_MAGIC_VALUE
raise "CorruptRecordException: Invalid magic found in record: %s" % magic
end

if magic > RecordBatch::MAGIC_VALUE_V1
return DefaultRecordBatch.new(batch_bytes)
else
return AbstractLegacyRecordBatch.new(IO::Memory.new(batch_bytes))
end
end

def to_kafka(io : IO)
raise "NotImplementedYet"
end

private def remaining : Int32
@max_message_size - @buffer.pos
end
end

def MemoryRecords.from_kafka(io : IO, debug_level = -1, hint = "")
on_debug_head_padding

byte_size = Int32.from_kafka(io, debug_level_succ, :size)
bytes = Bytes.new(byte_size)
io.read_fully(bytes)

on_debug "RECORDS[2] -> #{byte_size} bytes".colorize(:yellow)
on_debug_head_address

records = new(IO::Memory.new(bytes), bytes.size)
return records
end
end
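End to end, the fetch side of this API reads a size-prefixed record set, then walks batches and records. A hypothetical sketch, assuming `io` is positioned at a response's RECORDS field and every batch in it is magic v2:

records = Kafka::Protocol::Structure::MemoryRecords.from_kafka(io)
records.each do |batch|
  batch.each do |record|
    puts "#{record.offset}: #{String.new(record.value)}"
  end
end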
16 changes: 16 additions & 0 deletions src/kafka/protocol/records/record.cr
@@ -0,0 +1,16 @@
module Kafka::Protocol::Structure
  module Record
    abstract def offset : Int64
    abstract def key : Bytes
    abstract def value : Bytes
  end

  structure RecordImpl,
    length : Varint,
    attributes : Int8,
    timestamp_delta : Varlong,
    offset_delta : Varint,
    key : Varbytes,
    val : Varbytes,
    headers : VarArray(Header)
end
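For reference, RecordImpl mirrors the v2 on-wire record layout from the Kafka 1.0 message format:

length: varint
attributes: int8
timestampDelta: varlong
offsetDelta: varint
keyLength: varint
key: byte[]
valueLength: varint
value: byte[]
headers: varint count, then [headerKeyLength headerKey headerValueLength headerValue] per header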
16 changes: 16 additions & 0 deletions src/kafka/protocol/records/record_batch.cr
@@ -0,0 +1,16 @@
require "./record"

module Kafka::Protocol::Structure
# https://github.com/apache/kafka/blob/1.0/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java

module RecordBatch
include Enumerable(Record)

MAGIC_VALUE_V0 = 0
MAGIC_VALUE_V1 = 1
MAGIC_VALUE_V2 = 2

# The current "magic" value
CURRENT_MAGIC_VALUE = MAGIC_VALUE_V2
end
end
