Skip to content
This repository has been archived by the owner. It is now read-only.
Kafunk: F# Kafka client
Branch: master
Clone or download
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
.paket
docs
lib
src/kafunk upon coordinator failover, now it does refresh metadata and retry Apr 12, 2018
support improve producer logging Apr 28, 2017
tests/kafunk.Tests fix producer consumer test Mar 27, 2018
.gitattributes
.gitignore
.travis.yml
LICENSE
README.md Update README.md Aug 21, 2018
RELEASE_NOTES.md
appveyor.yml
build.cmd
build.fsx
build.sh
kafunk.sln
paket.dependencies lz4 doesn't work yet Mar 19, 2018
paket.lock

README.md

UPDATE

We found a bug in the implementation of the v0.11+ protocol wherein messages were skipped during consumption. The bug only manifests when using the newer protocol version (the default). Due to this bug and for long-term maintenance, we've started investing in the Confluent.Kafka client: https://github.com/jet/confluent-kafka-fsharp.

Kafunk

NuGet Status Build status Build status

Kafunk is a Kafka client written in F#.

See the home page for details.

Please also join the F# Open Source Group

Version Support

Version Status Notes
0.9.0 Complete
0.10.0 Complete
0.10.1 Complete
0.11+auto Protocol Protocol implementation bug found (skipped messages)

Feature Support

Feature Status
GZip Complete
Snappy Complete
LZ4 https://github.com/jet/kafunk/issues/125
TLS https://github.com/jet/kafunk/issues/66
SASL https://github.com/jet/kafunk/issues/139
ACL https://github.com/jet/kafunk/issues/140
TXNS https://github.com/jet/kafunk/issues/214

Hello World

#r "kafunk.dll"
#r "FSharp.Control.AsyncSeq.dll"

open Kafunk
open System

// Connection handle; it is passed to every Kafka call in the rest of this script.
let conn = Kafka.connHost "existential-host"

// metadata

// Request metadata for the single topic "absurd-topic" and block until the
// asynchronous call has produced its response.
let metadata =
  Async.RunSynchronously (Kafka.metadata conn (MetadataRequest([|"absurd-topic"|])))

// Print one line per broker listed in the response.
metadata.brokers
|> Seq.iter (fun broker ->
    printfn "broker|host=%s port=%i nodeId=%i" broker.host broker.port broker.nodeId)

// Print one line per topic, then one line for each partition of that topic.
metadata.topicMetadata
|> Seq.iter (fun topic ->
    printfn "topic|topic_name=%s topic_error_code=%i" topic.topicName topic.topicErrorCode
    topic.partitionMetadata
    |> Seq.iter (fun partition ->
        printfn "topic|topic_name=%s|partition|partition_id=%i" topic.topicName partition.partitionId))


// producer

// Producer settings for "absurd-topic", built from the three named arguments below.
// NOTE(review): presumably Partitioner.roundRobin spreads messages across partitions
// and RequiredAcks.Local waits only for the partition leader — confirm in the Kafunk docs.
let producerCfg =
  ProducerConfig.create (
    topic = "absurd-topic", 
    partition = Partitioner.roundRobin, 
    requiredAcks = RequiredAcks.Local)

// Create the producer on the existing connection. Creation is asynchronous,
// so the script blocks here until it has completed.
let producer =
  Producer.createAsync conn producerCfg
  |> Async.RunSynchronously

// produce single message

// Wrap the byte-array literal "hello world"B in a producer message, send it,
// and block until the produce call returns its result.
let prodRes =
  Producer.produce producer (ProducerMessage.ofBytes ("hello world"B))
  |> Async.RunSynchronously

// Report the partition and offset carried by the produce result.
printfn "partition=%i offset=%i" prodRes.partition prodRes.offset






// consumer

// Consumer settings built from two strings.
// NOTE(review): presumably (consumer group id, topic name) — confirm the argument order.
let consumerCfg = 
  ConsumerConfig.create ("consumer-group", "absurd-topic")

// Consumer bound to the shared connection and the settings above.
let consumer =
  Consumer.create conn consumerCfg

// commit on every message set

// The handler is given the consumer state and a message set. It logs the
// member id, topic and partition, then commits the offsets derived from that
// message set before returning.
// NOTE(review): RunSynchronously blocks the script for as long as consumption
// runs, so the snippets below look like alternatives rather than steps that
// execute in sequence — confirm.
Consumer.consume consumer 
  (fun (s:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "member_id=%s topic=%s partition=%i" s.memberId ms.topic ms.partition
    do! Consumer.commitOffsets consumer (ConsumerMessageSet.commitPartitionOffsets ms) })
|> Async.RunSynchronously


// commit periodically

// Same logging handler as above, but without an explicit commit in the handler;
// a 10-second TimeSpan is passed to consumePeriodicCommit instead.
// NOTE(review): presumably that TimeSpan is the commit interval — confirm.
Consumer.consumePeriodicCommit consumer
  (TimeSpan.FromSeconds 10.0) 
  (fun (s:ConsumerState) (ms:ConsumerMessageSet) -> async {
    printfn "member_id=%s topic=%s partition=%i" s.memberId ms.topic ms.partition })
|> Async.RunSynchronously


// commit consumer offsets explicitly

// Commits a single (int, int64) pair.
// NOTE(review): presumably (partition 0, offset 1L) — confirm the tuple order.
Consumer.commitOffsets consumer [| 0, 1L |]
|> Async.RunSynchronously

// commit consumer offsets explicitly to a relative time

// Commits using the Time.EarliestOffset marker instead of explicit offsets.
Consumer.commitOffsetsToTime consumer Time.EarliestOffset
|> Async.RunSynchronously


// get current consumer state

// Fetch the consumer's current state; the call is asynchronous, so block
// until the state is available.
let consumerState = 
  Consumer.state consumer
  |> Async.RunSynchronously

// Print the state's generation id, member id, leader id, assignment strategy
// and assignments. The "assignment_strategy" key was previously misspelled
// as "assignment_stratgey" in the output.
printfn "generation_id=%i member_id=%s leader_id=%s assignment_strategy=%s partitions=%A" 
  consumerState.generationId consumerState.memberId consumerState.leaderId 
  consumerState.assignmentStrategy consumerState.assignments 



// fetch offsets of a consumer group for all topics

// Ask for the offsets of "consumer-group", passing an empty array as the
// final argument, and block until the asynchronous call has finished.
let consumerOffsets =
  Async.RunSynchronously (Consumer.fetchOffsets conn "consumer-group" [||])

// The result is walked as (topic, entries) pairs, where each entry is itself
// a (partition, offset) pair; one line is printed per entry.
consumerOffsets
|> Seq.iter (fun (topic, partitionOffsets) ->
    partitionOffsets
    |> Seq.iter (fun (partition, offset) ->
        printfn "topic=%s partition=%i offset=%i" topic partition offset))


// fetch topic offset information

// Query "absurd-topic" for both the earliest and latest offset markers and
// block until the asynchronous call has finished.
let offsets =
  Async.RunSynchronously (
    Offsets.offsets conn "absurd-topic" [] [ Time.EarliestOffset ; Time.LatestOffset ] 1)

// Each entry of the result is a key/value pair: the key is printed as the
// time, and the value's topics are walked as (topic name, partition entries)
// pairs. One line is printed per partition entry.
offsets
|> Seq.iter (fun entry ->
    entry.Value.topics
    |> Seq.iter (fun (topicName, partitionOffsets) ->
        partitionOffsets
        |> Seq.iter (fun po ->
            printfn "time=%i topic=%s partition=%i offsets=%A" entry.Key topicName po.partition po.offsets)))

Maintainers

You can’t perform that action at this time.