-
Notifications
You must be signed in to change notification settings - Fork 0
/
FLUME
42 lines (32 loc) · 1.39 KB
/
FLUME
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
flume:
bin/flume-ng agent -c conf -f conf/$conf.properties -n $name -Dflume.monitoring.type=http -Dflume.monitoring.port=$port
org.apache.flume.node.Application
EventBus eb = new EventBus(agentName + "-event-bus");
application = new Application(components);
eb.register(application);
//life cycle component to run
new PollingPropertiesFileConfigurationProvider(agentName, configurationFile, eventBus, 30);
loadChannels(agentConf, channelComponentMap);
loadSources(agentConf, channelComponentMap, sourceRunnerMap);
loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
conf example:
$name.sources=source1
$name.channels=channel1
$name.sinks=sink1
$name.sources.source1.type=org.apache.flume.source.kafka.KafkaSource
$name.sources.source1.batchSize=50
$name.sources.source1.batchDurationMillis=2000
$name.sources.source1.kafka.bootstrap.servers=$servers
$name.sources.source1.kafka.topics=$topic
$name.sources.source1.kafka.consumer.group.id=$groupid
$name.sources.source1.channels=channel1
$name.channels.channel1.type=memory
$name.channels.channel1.capacity=1000
$name.channels.channel1.transactionCapacity=100
$name.sinks.sink1.channel=channel1
$name.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
$name.sinks.sink1.kafka.topic=$topic
$name.sinks.sink1.kafka.bootstrap.servers=$server
$name.sinks.sink1.kafka.flumeBatchSize=20
$name.sinks.sink1.kafka.producer.acks=1
$name.sinks.sink1.kafka.producer.linger.ms=1