/
MultiDestinationPublisherAgent.java
143 lines (126 loc) · 4.69 KB
/
MultiDestinationPublisherAgent.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.aeroncookbook.aeron.mdc;
import io.aeron.Aeron;
import io.aeron.Publication;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import org.agrona.CloseHelper;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.SleepingMillisIdleStrategy;
import org.agrona.concurrent.SystemEpochClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
import java.util.Enumeration;
/**
 * Agent that owns an embedded media driver, an Aeron client and a single exclusive publication
 * on a UDP channel built with {@code control-mode=dynamic}, and periodically offers an
 * incrementing {@code long} sequence number on that publication.
 *
 * <p>The instance holds unsynchronised mutable state ({@code nextAppend}, {@code lastSeq}), so
 * {@link #doWork()} is expected to be driven from a single thread.
 */
public class MultiDestinationPublisherAgent implements Agent
{
    private static final EpochClock CLOCK = SystemEpochClock.INSTANCE;
    private static final int STREAM_ID = 100;
    /** Minimum time between two offers, in milliseconds. */
    private static final long APPEND_INTERVAL_MS = 2000;
    /** Name prefix of the network interface whose IPv4 address is used for the control endpoint. */
    private static final String PREFERRED_INTERFACE_PREFIX = "eth0";
    private static final Logger LOGGER = LoggerFactory.getLogger(MultiDestinationPublisherAgent.class);

    private final MediaDriver mediaDriver;
    private final Aeron aeron;
    private final MutableDirectBuffer mutableDirectBuffer;
    private final Publication publication;
    private long nextAppend = Long.MIN_VALUE;
    private long lastSeq = 0;

    /**
     * Launches the media driver, connects the Aeron client and creates the publication.
     *
     * <p>If any step fails, whatever was already started is closed before the failure is
     * rethrown, so a failed construction does not leave a running media driver behind.
     *
     * @param host               control address to fall back to when no IPv4 address is detected
     *                           by {@link #localHost(String)}
     * @param controlChannelPort port of the control endpoint placed in the channel URI
     */
    public MultiDestinationPublisherAgent(final String host, final int controlChannelPort)
    {
        this.mutableDirectBuffer = new UnsafeBuffer(ByteBuffer.allocateDirect(Long.BYTES));

        MediaDriver driver = null;
        Aeron client = null;
        final Publication exclusivePublication;
        try
        {
            driver = launchMediaDriver();
            client = launchAeron(driver);
            LOGGER.info("Media Driver directory is {}", driver.aeronDirectoryName());
            final var publicationChannel = "aeron:udp?control-mode=dynamic|control=" + localHost(host) +
                ":" + controlChannelPort;
            LOGGER.info("creating publication");
            exclusivePublication = client.addExclusivePublication(publicationChannel, STREAM_ID);
        }
        catch (final RuntimeException | Error e)
        {
            // Release in reverse order of creation; quietClose tolerates null.
            CloseHelper.quietClose(client);
            CloseHelper.quietClose(driver);
            throw e;
        }

        this.mediaDriver = driver;
        this.aeron = client;
        this.publication = exclusivePublication;
    }

    /**
     * Connects an Aeron client to the given media driver's directory.
     *
     * @param mediaDriver the already launched driver to connect to
     * @return the connected client
     */
    private Aeron launchAeron(final MediaDriver mediaDriver)
    {
        LOGGER.info("launching aeron");
        return Aeron.connect(new Aeron.Context()
            .aeronDirectoryName(mediaDriver.aeronDirectoryName())
            .errorHandler(this::errorHandler)
            .idleStrategy(new SleepingMillisIdleStrategy()));
    }

    /**
     * Launches an embedded media driver in shared threading mode with a sleeping idle strategy,
     * deleting any existing driver directory on start.
     *
     * @return the launched driver
     */
    private MediaDriver launchMediaDriver()
    {
        LOGGER.info("launching media driver");
        final var mediaDriverContext = new MediaDriver.Context()
            .spiesSimulateConnection(true)
            .errorHandler(this::errorHandler)
            .threadingMode(ThreadingMode.SHARED)
            .sharedIdleStrategy(new SleepingMillisIdleStrategy())
            .dirDeleteOnStart(true);
        return MediaDriver.launch(mediaDriverContext);
    }

    /**
     * Error callback shared by the media driver and the Aeron client; logs the failure.
     *
     * @param throwable the reported error
     */
    private void errorHandler(final Throwable throwable)
    {
        LOGGER.error("unexpected failure {}", throwable.getMessage(), throwable);
    }

    @Override
    public void onStart()
    {
        LOGGER.info("Starting up");
        Agent.super.onStart();
    }

    /**
     * Offers the next sequence number once the append interval has elapsed.
     *
     * <p>The sequence advances on every attempt, whether or not the offer is accepted; the
     * outcome of the offer is logged so a rejected message is not reported as appended.
     *
     * @return 1 when an offer was attempted in this cycle, otherwise 0
     */
    @Override
    public int doWork()
    {
        final long now = CLOCK.time();
        if (now < nextAppend)
        {
            return 0;
        }

        lastSeq += 1;
        mutableDirectBuffer.putLong(0, lastSeq);
        final long result = publication.offer(mutableDirectBuffer, 0, Long.BYTES);
        nextAppend = now + APPEND_INTERVAL_MS;

        // offer() signals failure with a negative value.
        if (result < 0)
        {
            LOGGER.info("not appended {} ({})", lastSeq, describeOfferFailure(result));
        }
        else
        {
            LOGGER.info("appended {}", lastSeq);
        }
        return 1;
    }

    /**
     * Maps a negative {@code offer} result to a readable reason for logging.
     *
     * @param result the value returned by {@code Publication.offer}
     * @return a short description of the failure
     */
    private static String describeOfferFailure(final long result)
    {
        if (result == Publication.NOT_CONNECTED)
        {
            return "not connected";
        }
        if (result == Publication.BACK_PRESSURED)
        {
            return "back pressured";
        }
        if (result == Publication.ADMIN_ACTION)
        {
            return "admin action";
        }
        if (result == Publication.CLOSED)
        {
            return "publication closed";
        }
        if (result == Publication.MAX_POSITION_EXCEEDED)
        {
            return "max position exceeded";
        }
        return "unknown result " + result;
    }

    /**
     * Closes the publication, then the Aeron client, then the media driver.
     */
    @Override
    public void onClose()
    {
        Agent.super.onClose();
        LOGGER.info("Shutting down");
        CloseHelper.quietClose(publication);
        CloseHelper.quietClose(aeron);
        CloseHelper.quietClose(mediaDriver);
    }

    /**
     * Returns the first IPv4 address found on a network interface whose name starts with
     * {@code eth0}.
     *
     * @param fallback value returned when no such address is found or the lookup fails
     * @return the detected IPv4 host address, or {@code fallback}
     */
    public String localHost(final String fallback)
    {
        try
        {
            final Enumeration<NetworkInterface> interfaceEnumeration = NetworkInterface.getNetworkInterfaces();
            while (interfaceEnumeration.hasMoreElements())
            {
                final var networkInterface = interfaceEnumeration.nextElement();
                if (!networkInterface.getName().startsWith(PREFERRED_INTERFACE_PREFIX))
                {
                    continue;
                }

                final Enumeration<InetAddress> interfaceAddresses = networkInterface.getInetAddresses();
                while (interfaceAddresses.hasMoreElements())
                {
                    if (interfaceAddresses.nextElement() instanceof Inet4Address inet4Address)
                    {
                        final String hostAddress = inet4Address.getHostAddress();
                        LOGGER.info("detected ip4 address as {}", hostAddress);
                        return hostAddress;
                    }
                }
            }
        }
        catch (final Exception e)
        {
            // Best effort: detection failure is not fatal, but keep the cause visible in the log.
            LOGGER.warn("Failed to get address, using {}", fallback, e);
        }
        return fallback;
    }

    @Override
    public String roleName()
    {
        return "mdc-publisher";
    }
}