// Connection.fs
namespace Algorithm
open System
open Nata.IO
open Nata.IO.Channel
open Nata.Core
/// A single connected log — one EventStore stream or one Kafka topic
/// partition — exposing its display name plus the two read capabilities
/// the algorithm needs: indexing and positional reads.
type LogConnection =
    { name : string                                // display name, e.g. a stream name or "topic/partition"
      indexOf : Indexer<int64>                     // resolves a Position to an int64 index
      readFrom : ReaderFrom<Target option,int64> } // reads decoded events starting at an int64 index
/// Describes a log-store backend: its name, what it calls a log
/// ("Stream" or "Topic"), and how to connect to one.
type LogStore =
    { name : string    // backend name, e.g. "EventStore", "KafkaNet", "Kafunk"
      channel : string // backend's term for a log, e.g. "Stream" or "Topic"
      // connect host channelName codec -> one LogConnection per stream/partition
      connect : string->string->Codec<string,Target option>->LogConnection[] }
/// EventStore backend: one stream yields exactly one LogConnection.
module EventStore =

    open Nata.IO.EventStore

    let [<Literal>] Name = "EventStore"
    let [<Literal>] Channel = "Stream"

    /// Connects to [stream] on the EventStore at [host], decoding event
    /// data through [codec]; returns a single-element connection array.
    let connect (host:string) (stream:string) (codec:Codec<string,Target option>) : LogConnection[] =
        // Default TCP port and the stock admin credentials.
        let settings =
            { Settings.Server = { Server.Host = host; Port = 1113 }
              Settings.User = { User.Name = "admin"; Password = "changeit" }
              Settings.Options = { Options.BatchSize = 1 } }
        // Map the raw byte source through bytes->string, then string->Target option.
        let source =
            Stream.connect settings
            |> Source.mapData (Codec.BytesToString |> Codec.concatenate codec)
        let connected = source stream
        [| { name = stream
             indexOf = indexer connected
             readFrom = readerFrom connected } |]

    /// Store descriptor for registration in Connections.
    let store =
        { LogStore.name = Name
          LogStore.channel = Channel
          LogStore.connect = connect }
/// Kafka backend via the KafkaNet client: one LogConnection per partition.
module KafkaNet =

    open Nata.IO.KafkaNet

    let [<Literal>] Name = "KafkaNet"
    let [<Literal>] Channel = "Topic"

    /// Connects to [topic] on the Kafka cluster at [host], decoding event
    /// data through [codec]; returns one connection per topic partition.
    let connect (host:string) (topic:string) (codec:Codec<string,Target option>) : LogConnection[] =
        let cluster = [ host ]
        // Discover the topic's partitions by indexing from the start position.
        let partitions =
            let index = Topic.connect cluster topic |> indexer
            index Position.Start |> Offsets.partitions
        [|
            for partition in partitions ->
                // Per-partition source: bytes->string->Target option data,
                // partition offsets surfaced as int64 indexes.
                let source =
                    TopicPartition.connect cluster
                    |> Source.mapData (Codec.BytesToString |> Codec.concatenate codec)
                    |> Source.mapIndex (Offset.Codec.OffsetToInt64 partition)
                let connected = source (topic, partition)
                { name = sprintf "%s/%d" topic partition
                  indexOf = indexer connected
                  readFrom = readerFrom connected }
        |]

    /// Store descriptor for registration in Connections.
    let store =
        { LogStore.name = Name
          LogStore.channel = Channel
          LogStore.connect = connect }
/// Kafka backend via the Kafunk client: one LogConnection per partition.
module Kafunk =

    open Nata.IO.Kafunk

    let [<Literal>] Name = "Kafunk"
    let [<Literal>] Channel = "Topic"

    /// Connects to [topic] on the Kafka cluster at [host], decoding event
    /// data through [codec]; returns one connection per topic partition.
    let connect (host:string) (topic:string) (codec:Codec<string,Target option>) : LogConnection[] =
        let cluster =
            Kafunk.Connection.create { Settings.defaultSettings with Hosts = [ host ] }
        // Discover the topic's partitions by indexing from the start position.
        let partitions =
            let index = Topic.connect cluster topic |> indexer
            index Position.Start |> Offsets.partitions
        [|
            for partition in partitions ->
                // Per-partition source: bytes->string->Target option data,
                // partition offsets surfaced as int64 indexes.
                let source =
                    TopicPartition.connect cluster
                    |> Source.mapData (Codec.BytesToString |> Codec.concatenate codec)
                    |> Source.mapIndex (Offset.Codec.OffsetToInt64 partition)
                let connected = source (topic, partition)
                { name = sprintf "%s/%d" topic partition
                  indexOf = indexer connected
                  readFrom = readerFrom connected }
        |]

    /// Store descriptor for registration in Connections.
    let store =
        { LogStore.name = Name
          LogStore.channel = Channel
          LogStore.connect = connect }
/// Registry of every configured log-store backend.
module Connections =

    let eventstore = EventStore.store
    let kafunk = Kafunk.store
    let kafkanet = KafkaNet.store

    /// Every backend, in a fixed order.
    let all = [| kafkanet; kafunk; eventstore |]