/
RollingByTypeAndDayFileSink.java
274 lines (238 loc) · 10.5 KB
/
RollingByTypeAndDayFileSink.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
package com.baidu.unbiz.flume.sink;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.serialization.EventSerializer;
import org.apache.flume.serialization.EventSerializerFactory;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
/**
 * A sink that writes each {@code Event} into a local file chosen by the event's module name and day.
 * <p/>
 * Two conventions apply to incoming events:<br/>
 * 1) The event header must contain {@code timestamp} (epoch milliseconds). Events whose timestamp is missing or
 * cannot be parsed are dropped and logged; internally this is signalled by {@link InputNotSpecifiedException}.<br/>
 * 2) If the event body has the form {@code module##$$##payload}, the part before {@code ##$$##} is used as the
 * module name and the body is replaced by the payload (when the payload is non-empty). Otherwise the module name
 * is {@code default}.<br/>
 * <p/>
 * Files are written under the root directory configured by {@code sink.directory}. The module name from rule #2
 * is the file-name prefix and the event's day is the suffix, e.g. {@code portal.20150606} or
 * {@code default.20150703}. Module names that are empty or contain a path separator fall back to
 * {@code default}, so an event body can never direct output outside {@code sink.directory}.
 * <p/>
 * NOTE: files roll by day. Each module keeps one open file, which is closed and replaced when an event for a
 * different day arrives. Existing files are appended to, never truncated, so a restart or an out-of-order event
 * does not erase data already written for that day.
 * <p/>
 * Configuration keys read in {@link #configure(Context)}: {@code sink.directory} (required),
 * {@code sink.serializer} (default {@code TEXT}), {@code sink.batchSize} (default 100), and serializer
 * sub-properties under {@code "sink." + EventSerializer.CTX_PREFIX}.
 *
 * @author zhangxu
 */
public class RollingByTypeAndDayFileSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(RollingByTypeAndDayFileSink.class);

    private static final int defaultBatchSize = 100;

    /** Module name used when the event body carries no usable module prefix. */
    private static final String DEFAULT_MODULE_NAME = "default";

    /** Separator between module name and payload in an event body; compiled once instead of per event. */
    private static final Pattern MODULE_SEPARATOR = Pattern.compile("##\\$\\$##");

    private int batchSize = defaultBatchSize;

    private String directory;

    /** The currently open output for each module name. */
    private final ConcurrentMap<String, OutputStreamWrapper> fileName2OutputStream = Maps.newConcurrentMap();

    private String serializerType;

    private Context serializerContext;

    private SinkCounter sinkCounter;

    public RollingByTypeAndDayFileSink() {
    }

    @Override
    public void configure(Context context) {
        String directory = context.getString("sink.directory");
        serializerType = context.getString("sink.serializer", "TEXT");
        serializerContext =
                new Context(context.getSubProperties("sink." +
                        EventSerializer.CTX_PREFIX));
        Preconditions.checkArgument(directory != null, "Directory may not be null");
        Preconditions.checkNotNull(serializerType, "Serializer type is undefined");
        batchSize = context.getInteger("sink.batchSize", defaultBatchSize);
        // A non-positive batch size would make process() take nothing and report READY forever (busy spin).
        Preconditions.checkArgument(batchSize > 0, "Batch size must be positive");
        this.directory = directory;
        if (sinkCounter == null) {
            sinkCounter = new SinkCounter(getName());
        }
    }

    @Override
    public void start() {
        logger.info("Starting {}...", this);
        sinkCounter.start();
        super.start();
        logger.info("RollingByTypeAndDaySink {} started.", getName());
    }

    /**
     * Takes up to {@code batchSize} events from the channel in one transaction. Each event is written to the file
     * of its module and day, and a module's file is rotated when the day changes.
     * <p/>
     * Events without a usable {@code timestamp} header are dropped (and logged) instead of rolling back the whole
     * transaction. A rollback would put the same event back into the channel, where it would fail again on every
     * later call while the events taken before it were written again each time.
     *
     * @return {@link Status#BACKOFF} if the channel ran out of events during this batch, otherwise
     *         {@link Status#READY}
     * @throws EventDeliveryException if a file cannot be opened, written, flushed or rotated. The transaction is
     *         rolled back; events already handed to buffered streams may still reach disk and be written again
     *         on redelivery.
     */
    @Override
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Status result = Status.READY;
        try {
            transaction.begin();
            int eventsWritten = 0;
            // Every module written in this batch must be flushed before commit, not only the last one used.
            Map<String, OutputStreamWrapper> touched = Maps.newHashMap();
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    // No events found, request back-off semantics from runner
                    result = Status.BACKOFF;
                    break;
                }
                sinkCounter.incrementEventDrainAttemptCount();
                Date date;
                try {
                    date = getDate(event.getHeaders());
                } catch (InputNotSpecifiedException ex) {
                    logger.warn("Dropping event without a usable timestamp header: {}", ex.getMessage());
                    continue;
                }
                String moduleName = getModuleName(event);
                OutputStreamWrapper outputStreamWrapper = getOrRotateOutputStreamWrapper(moduleName, date);
                outputStreamWrapper.getSerializer().write(event);
                // Put after the write: a rotation replaces the module's wrapper, and the replaced one was
                // already flushed and closed by the rotation.
                touched.put(moduleName, outputStreamWrapper);
                eventsWritten++;
            }
            for (OutputStreamWrapper outputStreamWrapper : touched.values()) {
                outputStreamWrapper.getSerializer().flush();
                outputStreamWrapper.getOutputStream().flush();
            }
            transaction.commit();
            sinkCounter.addToEventDrainSuccessCount(eventsWritten);
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to process transaction", ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    /**
     * Closes every open file and forgets them, so that a later {@link #start()} opens fresh streams.
     * A failure to close one file is logged and does not prevent closing the others.
     */
    @Override
    public void stop() {
        logger.info("RollingByTypeAndDay sink {} stopping...", getName());
        sinkCounter.stop();
        super.stop();
        for (Map.Entry<String, OutputStreamWrapper> entry : fileName2OutputStream.entrySet()) {
            OutputStreamWrapper outputStreamWrapper = entry.getValue();
            try {
                destroyOutputStreamWrapper(outputStreamWrapper, entry.getKey(), dateOrNow(outputStreamWrapper));
            } catch (EventDeliveryException e) {
                logger.warn("Failed to close output for module " + entry.getKey(), e);
            }
        }
        // The wrappers are closed (stream/serializer nulled); keeping them would make a restart write to them.
        fileName2OutputStream.clear();
        logger.info("RollingByTypeAndDay sink {} stopped. Event metrics: {}", getName(), sinkCounter);
    }

    /**
     * Returns the open output for {@code moduleName} on the day of {@code date}. If the current output for the
     * module is for a different day, or has already been closed, it is replaced by a newly opened one.
     * <p/>
     * Events that arrive out of day order make the module's file switch back and forth. This is correct because
     * files are opened in append mode, but each switch costs a close and an open.
     */
    private OutputStreamWrapper getOrRotateOutputStreamWrapper(String moduleName, Date date)
            throws EventDeliveryException {
        OutputStreamWrapper current = fileName2OutputStream.get(moduleName);
        if (current != null && current.getOutputStream() != null
                && DateUtil.isSameDay(current.getDate(), date)) {
            return current;
        }
        if (current != null) {
            Date currentDate = dateOrNow(current);
            logger.debug("Time to rotate {}", getFileByModuleName(moduleName, currentDate));
            // Remove first so a failed close never leaves a dead wrapper registered for this module.
            fileName2OutputStream.remove(moduleName);
            destroyOutputStreamWrapper(current, moduleName, currentDate);
        }
        OutputStreamWrapper fresh = createOutputStreamWrapper(moduleName, date);
        fileName2OutputStream.put(moduleName, fresh);
        return fresh;
    }

    /**
     * Extracts the module name from the event body and, when a module prefix is present, strips it from the body.
     * <p/>
     * A body that splits into exactly two parts on {@code ##$$##} yields module = part 0. The body is replaced by
     * part 1 if that part is non-empty. Any other body yields {@code default} and is left unchanged.
     */
    private String getModuleName(Event event) {
        try {
            String line = new String(event.getBody(), "UTF-8");
            String[] parts = MODULE_SEPARATOR.split(line);
            if (parts.length == 2) {
                if (StringUtils.isNotEmpty(parts[1])) {
                    event.setBody(parts[1].getBytes("UTF-8"));
                }
                return toSafeModuleName(parts[0]);
            }
        } catch (UnsupportedEncodingException e) {
            logger.error(e.getMessage());
        }
        return DEFAULT_MODULE_NAME;
    }

    /**
     * Returns {@code candidate} if it can be used as a file-name prefix, otherwise {@link #DEFAULT_MODULE_NAME}.
     * The module name comes from the event body and is joined directly onto {@code sink.directory}, so these are
     * rejected:
     * <ul>
     * <li>empty names;</li>
     * <li>names containing a path separator, which would escape the directory;</li>
     * <li>names containing a NUL character, which cannot be opened and would fail every retry.</li>
     * </ul>
     */
    private String toSafeModuleName(String candidate) {
        if (StringUtils.isEmpty(candidate)
                || candidate.indexOf('/') >= 0
                || candidate.indexOf('\\') >= 0
                || candidate.indexOf('\0') >= 0) {
            logger.warn("Unusable module name '{}', writing to '{}' instead", candidate, DEFAULT_MODULE_NAME);
            return DEFAULT_MODULE_NAME;
        }
        return candidate;
    }

    /**
     * Parses the {@code timestamp} header (epoch milliseconds) into a {@link Date}.
     *
     * @throws InputNotSpecifiedException if the header is missing, empty, or not a valid {@code long}
     */
    private Date getDate(Map<String, String> eventHeaders) {
        String timestamp = eventHeaders.get("timestamp");
        if (StringUtils.isEmpty(timestamp)) {
            throw new InputNotSpecifiedException("timestamp cannot be found in event header");
        }
        try {
            return new Date(Long.parseLong(timestamp));
        } catch (NumberFormatException e) {
            throw new InputNotSpecifiedException("timestamp cannot be parsed in event header");
        }
    }

    /**
     * Opens the file for {@code moduleName} on the day of {@code date} and wraps it with a serializer.
     * <p/>
     * The file is opened in append mode. If it already has content (an earlier run, or an earlier rotation away
     * from this day), {@code afterReopen()} is called when the serializer supports it. Otherwise
     * {@code afterCreate()} is called. If anything fails, the opened stream is closed before the error propagates.
     *
     * @throws EventDeliveryException if the file cannot be opened or the serializer fails to initialise
     */
    public OutputStreamWrapper createOutputStreamWrapper(String moduleName, Date date) throws EventDeliveryException {
        File currentFile = getFileByModuleName(moduleName, date);
        logger.debug("Opening output stream for file {}", currentFile);
        // File.length() is 0 for a file that does not exist yet.
        boolean reopening = currentFile.length() > 0;
        OutputStream outputStream = null;
        try {
            outputStream = new BufferedOutputStream(new FileOutputStream(currentFile, true));
            EventSerializer serializer = EventSerializerFactory.getInstance(
                    serializerType, serializerContext, outputStream);
            if (reopening && serializer.supportsReopen()) {
                serializer.afterReopen();
            } else {
                if (reopening) {
                    // NOTE(review): this serializer cannot reopen; appending after afterCreate() may duplicate
                    // any per-file preamble it writes. Chosen over truncating the day's existing data.
                    logger.warn("Serializer {} does not support reopen; appending to existing file {}",
                            serializerType, currentFile);
                }
                serializer.afterCreate();
            }
            OutputStreamWrapper outputStreamWrapper = new OutputStreamWrapper();
            outputStreamWrapper.setOutputStream(outputStream);
            outputStreamWrapper.setSerializer(serializer);
            outputStreamWrapper.setDate(date);
            sinkCounter.incrementConnectionCreatedCount();
            return outputStreamWrapper;
        } catch (IOException e) {
            sinkCounter.incrementConnectionFailedCount();
            closeQuietly(outputStream);
            throw new EventDeliveryException("Failed to open file "
                    + currentFile + " while delivering event", e);
        } catch (RuntimeException e) {
            sinkCounter.incrementConnectionFailedCount();
            closeQuietly(outputStream);
            throw e;
        }
    }

    /**
     * Best-effort variant of {@link #destroyOutputStreamWrapper(OutputStreamWrapper, String, Date)}: failures are
     * logged instead of thrown. The module name is unknown here, so log messages show an empty module prefix.
     */
    public void destroyOutputStreamWrapper(OutputStreamWrapper outputStreamWrapper) {
        try {
            destroyOutputStreamWrapper(outputStreamWrapper, "", dateOrNow(outputStreamWrapper));
        } catch (EventDeliveryException e) {
            logger.warn("Failed to close output stream", e);
        }
    }

    /**
     * Flushes and closes the wrapper's serializer and stream, then clears both fields on the wrapper.
     * Does nothing if the stream is already {@code null}. The stream is closed even when the serializer fails.
     *
     * @param moduleName module name, used only to name the file in log and error messages
     * @param date       day of the file, used only to name the file in log and error messages
     * @throws EventDeliveryException if flushing or closing fails
     */
    public void destroyOutputStreamWrapper(OutputStreamWrapper outputStreamWrapper, String moduleName, Date date)
            throws EventDeliveryException {
        OutputStream outputStream = outputStreamWrapper.getOutputStream();
        if (outputStream == null) {
            return;
        }
        logger.debug("Closing file {}", getFileByModuleName(moduleName, date));
        EventSerializer serializer = outputStreamWrapper.getSerializer();
        try {
            try {
                if (serializer != null) {
                    serializer.flush();
                    serializer.beforeClose();
                }
            } finally {
                // Release the file descriptor even if the serializer failed.
                outputStream.close();
            }
            sinkCounter.incrementConnectionClosedCount();
        } catch (IOException e) {
            sinkCounter.incrementConnectionFailedCount();
            throw new EventDeliveryException("Unable to rotate file "
                    + getFileByModuleName(moduleName, date) + " while delivering event", e);
        } finally {
            outputStreamWrapper.setOutputStream(null);
            outputStreamWrapper.setSerializer(null);
        }
    }

    /** Returns the wrapper's day, or the current time when it has none (only used to build file names for logs). */
    private static Date dateOrNow(OutputStreamWrapper outputStreamWrapper) {
        Date date = outputStreamWrapper.getDate();
        return date != null ? date : new Date();
    }

    /** Closes {@code outputStream} if non-null, logging rather than throwing on failure. */
    private static void closeQuietly(OutputStream outputStream) {
        if (outputStream == null) {
            return;
        }
        try {
            outputStream.close();
        } catch (IOException ignored) {
            logger.debug("Ignoring failure to close stream after an earlier error", ignored);
        }
    }

    /**
     * Returns the file that should be written, named {@code ${module}.YYYYMMDD}.
     *
     * @param moduleName module name
     * @param date       date
     *
     * @return the file
     */
    public File getFileByModuleName(String moduleName, Date date) {
        return new File(this.directory, moduleName + "." + DateUtil.formatDate(date));
    }

    public String getDirectory() {
        return directory;
    }

    public void setDirectory(String directory) {
        this.directory = directory;
    }
}