Skip to content

Commit

Permalink
One more fix of code style in some classes
Browse files Browse the repository at this point in the history
  • Loading branch information
Kirill380 committed Sep 30, 2016
1 parent d4caa73 commit bb067db
Show file tree
Hide file tree
Showing 26 changed files with 539 additions and 461 deletions.
Expand Up @@ -51,7 +51,8 @@ public FileSystemLogAppender() {
} }


@Override @Override
public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDeliveryCallback listener) { public void doAppend(LogEventPack logEventPack, RecordHeader header,
LogDeliveryCallback listener) {
if (!closed) { if (!closed) {
try { try {
String path = logsRootPath + "/" + tenantDirName + "/" + applicationDirName; String path = logsRootPath + "/" + tenantDirName + "/" + applicationDirName;
Expand All @@ -63,8 +64,8 @@ public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDelivery
} }
LOG.debug("[{}] appended {} logs to directory", path, logEventPack.getEvents().size()); LOG.debug("[{}] appended {} logs to directory", path, logEventPack.getEvents().size());
listener.onSuccess(); listener.onSuccess();
} catch (Exception e) { } catch (Exception ex) {
LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed", getName()), e); LOG.error(MessageFormat.format("[{0}] Attempted to append logs failed", getName()), ex);
listener.onInternalError(); listener.onInternalError();
} }
} else { } else {
Expand All @@ -76,8 +77,11 @@ public void doAppend(LogEventPack logEventPack, RecordHeader header, LogDelivery
private List<String> eventsToStrings(List<LogEventDto> dtos) { private List<String> eventsToStrings(List<LogEventDto> dtos) {
List<String> events = new ArrayList<>(); List<String> events = new ArrayList<>();
for (LogEventDto logEventDto : dtos) { for (LogEventDto logEventDto : dtos) {
String event = new StringBuilder("{\"Log Header\": \"").append(logEventDto.getHeader()).append("\", \"Event\": ") String event = new StringBuilder("{\"Log Header\": \"")
.append(logEventDto.getEvent()).append("}").toString(); .append(logEventDto.getHeader())
.append("\", \"Event\": ")
.append(logEventDto.getEvent())
.append("}").toString();
events.add(event); events.add(event);
} }
return events; return events;
Expand All @@ -95,11 +99,12 @@ protected void initFromConfiguration(LogAppenderDto appenderDto, FileConfig conf
logger = new LogbackFileSystemLogger(); logger = new LogbackFileSystemLogger();
} }
initLogDirectories(appenderDto); initLogDirectories(appenderDto);
logger.init(appenderDto, configuration, Paths.get(logsRootPath, tenantDirName, applicationDirName, "application.log")); logger.init(appenderDto, configuration,
Paths.get(logsRootPath, tenantDirName, applicationDirName, "application.log"));
fileSystemLogEventService.createUserAndGroup(appenderDto, configuration, fileSystemLogEventService.createUserAndGroup(appenderDto, configuration,
Paths.get(logsRootPath, tenantDirName, applicationDirName).toAbsolutePath().toString()); Paths.get(logsRootPath, tenantDirName, applicationDirName).toAbsolutePath().toString());
} catch (Exception e) { } catch (Exception ex) {
LOG.error("Failed to init file system log appender: ", e); LOG.error("Failed to init file system log appender: ", ex);
} }
} }


Expand All @@ -110,7 +115,8 @@ private void createTenantLogDirectory(String tenantId) {


private void createApplicationLogDirectory(String applicationToken) { private void createApplicationLogDirectory(String applicationToken) {
applicationDirName = "application_" + applicationToken; applicationDirName = "application_" + applicationToken;
fileSystemLogEventService.createDirectory(logsRootPath + "/" + tenantDirName + "/" + applicationDirName); fileSystemLogEventService
.createDirectory(logsRootPath + "/" + tenantDirName + "/" + applicationDirName);
} }


private void initLogDirectories(LogAppenderDto appender) { private void initLogDirectories(LogAppenderDto appender) {
Expand All @@ -126,8 +132,8 @@ public void close() {
if (logger != null) { if (logger != null) {
try { try {
logger.close(); logger.close();
} catch (IOException e) { } catch (IOException ex) {
LOG.warn("IO Exception catched: ", e); LOG.warn("IO Exception catched: ", ex);
} }
} }
fileSystemLogEventService = null; fileSystemLogEventService = null;
Expand Down
Expand Up @@ -25,15 +25,15 @@
public interface FileSystemLogEventService { public interface FileSystemLogEventService {


/** /**
* Create Directory with specific path * Create Directory with specific path.
* *
* @param path the path to directory * @param path the path to directory
*/ */
void createDirectory(String path); void createDirectory(String path);


/** /**
* Create log user and log group and give them permissions to access * Create log user and log group and give them permissions to access
* logs of application with specific id * logs of application with specific id.
* *
* @param applicationId the application id * @param applicationId the application id
* @param config the File log appender config * @param config the File log appender config
Expand Down
Expand Up @@ -67,7 +67,8 @@ public void createDirectory(String path) {


@Override @Override
public void createUserAndGroup(LogAppenderDto appender, FileConfig config, String path) { public void createUserAndGroup(LogAppenderDto appender, FileConfig config, String path) {
LOG.debug("Starting create user and group for application with id: {}", appender.getApplicationId()); LOG.debug("Starting create user and group for application with id: {}",
appender.getApplicationId());
String userName = "kaa_log_user_" + appender.getApplicationToken(); String userName = "kaa_log_user_" + appender.getApplicationToken();
String groupName = "kaa_log_group_" + appender.getApplicationToken(); String groupName = "kaa_log_group_" + appender.getApplicationToken();
String publicKey = config.getPublicKey(); String publicKey = config.getPublicKey();
Expand All @@ -85,8 +86,8 @@ public void createUserAndGroup(LogAppenderDto appender, FileConfig config, Strin
executeCommand(null, "sudo", createUserScript.getAbsolutePath(), executeCommand(null, "sudo", createUserScript.getAbsolutePath(),
userName, groupName, path, tmpKeyFile.getAbsolutePath()); userName, groupName, path, tmpKeyFile.getAbsolutePath());


} catch (IOException e) { } catch (IOException ex) {
LOG.error("Unexpected exception occurred while creating user", e); LOG.error("Unexpected exception occurred while creating user", ex);
} finally { } finally {
if (tmpKeyFile != null) { if (tmpKeyFile != null) {
tmpKeyFile.delete(); tmpKeyFile.delete();
Expand All @@ -105,8 +106,8 @@ public void createRootLogDirCommand(String logsRootPath) {
createRootLogDirScript = prepareScriptFile(CREATE_ROOT_LOG_DIR); createRootLogDirScript = prepareScriptFile(CREATE_ROOT_LOG_DIR);
executeCommand(null, "sudo", createRootLogDirScript.getAbsolutePath(), executeCommand(null, "sudo", createRootLogDirScript.getAbsolutePath(),
logsRootPath, DEFAULT_SYSTEM_USER); logsRootPath, DEFAULT_SYSTEM_USER);
} catch (IOException e) { } catch (IOException ex) {
LOG.error("Can't create root log dir: " + logsRootPath, e); LOG.error("Can't create root log dir: " + logsRootPath, ex);
} finally { } finally {
if (createRootLogDirScript != null) { if (createRootLogDirScript != null) {
createRootLogDirScript.delete(); createRootLogDirScript.delete();
Expand All @@ -121,8 +122,8 @@ public void removeAll(String path) {
try { try {
FileUtils.deleteDirectory(directory); FileUtils.deleteDirectory(directory);
LOG.debug("Directory was successfully deleted"); LOG.debug("Directory was successfully deleted");
} catch (IOException e) { } catch (IOException ex) {
LOG.error("Unable to delete directory with path: {}, exception catched: {}", path, e); LOG.error("Unable to delete directory with path: {}, exception catched: {}", path, ex);
} }
} }


Expand Down
Expand Up @@ -60,12 +60,15 @@ public void init(FlumeConfig configuration) {


@Override @Override
public List<Event> generateEvents(String appToken, LogSchema schema, List<LogEvent> logEvents, public List<Event> generateEvents(String appToken, LogSchema schema, List<LogEvent> logEvents,
ProfileInfo clientProfile, ProfileInfo serverProfile, RecordHeader header) { ProfileInfo clientProfile, ProfileInfo serverProfile,
LOG.debug("Build flume events with appToken [{}], schema version [{}], events: [{}] and header [{}].", appToken, schema.getVersion(), logEvents, header); RecordHeader header) {
LOG.debug("Build flume events with appToken [{}], schema version [{}], events: [{}]"
+ " and header [{}].", appToken, schema.getVersion(), logEvents, header);
List<Event> events = null; List<Event> events = null;
switch (flumeEventFormat) { switch (flumeEventFormat) {
case RECORDS_CONTAINER: case RECORDS_CONTAINER:
Event event = generateRecordsContainerEvent(appToken, schema, logEvents, clientProfile, serverProfile, header); Event event = generateRecordsContainerEvent(appToken, schema, logEvents, clientProfile,
serverProfile, header);
if (event != null) { if (event != null) {
events = Collections.singletonList(event); events = Collections.singletonList(event);
} }
Expand All @@ -79,8 +82,9 @@ public List<Event> generateEvents(String appToken, LogSchema schema, List<LogEve
return events; return events;
} }


private Event generateRecordsContainerEvent(String appToken, LogSchema schema, List<LogEvent> logEvents, private Event generateRecordsContainerEvent(String appToken, LogSchema schema,
ProfileInfo clientProfile, ProfileInfo serverProfile, RecordHeader header) { List<LogEvent> logEvents, ProfileInfo clientProfile,
ProfileInfo serverProfile, RecordHeader header) {
if (clientProfile == null && includeClientProfile) { if (clientProfile == null && includeClientProfile) {
LOG.error("Can't generate records container event. " + CLIENT_PROFILE_NOT_SET); LOG.error("Can't generate records container event. " + CLIENT_PROFILE_NOT_SET);
throw new RuntimeException(CLIENT_PROFILE_NOT_SET); throw new RuntimeException(CLIENT_PROFILE_NOT_SET);
Expand Down Expand Up @@ -130,8 +134,8 @@ private Event generateRecordsContainerEvent(String appToken, LogSchema schema, L
writer.write(logData, encoder); writer.write(logData, encoder);
encoder.flush(); encoder.flush();
event = EventBuilder.withBody(baos.toByteArray()); event = EventBuilder.withBody(baos.toByteArray());
} catch (IOException e) { } catch (IOException ex) {
LOG.warn("Can't convert avro object {} to binary. Exception catched: {}", logData, e); LOG.warn("Can't convert avro object {} to binary. Exception catched: {}", logData, ex);
} }
LOG.trace("Build flume event with array body [{}]", baos); LOG.trace("Build flume event with array body [{}]", baos);
return event; return event;
Expand Down
Expand Up @@ -68,15 +68,17 @@ public FlumeLogAppender() {
public void run() { public void run() {
long second = System.currentTimeMillis() / 1000; long second = System.currentTimeMillis() / 1000;
LOG.info( LOG.info(
"[{}] Received {} log record count, {} success flume callbacks, {} failure flume callbacks / second.", "[{}] Received {} log record count, {} success flume callbacks, "
+ "{} failure flume callbacks / second.",
second, inputLogCount.getAndSet(0), flumeSuccessLogCount.getAndSet(0), second, inputLogCount.getAndSet(0), flumeSuccessLogCount.getAndSet(0),
flumeFailureLogCount.getAndSet(0)); flumeFailureLogCount.getAndSet(0));
} }
}, 0L, 1L, TimeUnit.SECONDS); }, 0L, 1L, TimeUnit.SECONDS);
} }


@Override @Override
public void doAppend(final LogEventPack logEventPack, final RecordHeader header, final LogDeliveryCallback listener) { public void doAppend(final LogEventPack logEventPack, final RecordHeader header,
final LogDeliveryCallback listener) {
if (!closed) { if (!closed) {
if (executor == null || callbackExecutor == null || flumeClientManager == null) { if (executor == null || callbackExecutor == null || flumeClientManager == null) {
reinit(); reinit();
Expand Down Expand Up @@ -110,12 +112,12 @@ public void run() {
LOG.warn("Unable to generate Flume events from log event pack!"); LOG.warn("Unable to generate Flume events from log event pack!");
listener.onInternalError(); listener.onInternalError();
} }
} catch (EventDeliveryException e) { } catch (EventDeliveryException ex) {
LOG.warn("Can't send flume event. ", e); LOG.warn("Can't send flume event. ", ex);
listener.onConnectionError(); listener.onConnectionError();
} }
} catch (Exception e) { } catch (Exception ex) {
LOG.warn("Got exception. Can't process log events", e); LOG.warn("Got exception. Can't process log events", ex);
listener.onInternalError(); listener.onInternalError();
} }
} }
Expand All @@ -133,31 +135,40 @@ protected void initFromConfiguration(LogAppenderDto appender, FlumeConfig config
this.configuration = configuration; this.configuration = configuration;
flumeEventBuilder = new FlumeAvroEventBuilder(); flumeEventBuilder = new FlumeAvroEventBuilder();
flumeEventBuilder.init(configuration); flumeEventBuilder.init(configuration);
int executorPoolSize = Math.min(configuration.getExecutorThreadPoolSize(), MAX_CALLBACK_THREAD_POOL_SIZE); int executorPoolSize = Math.min(configuration.getExecutorThreadPoolSize(),
int callbackPoolSize = Math.min(configuration.getCallbackThreadPoolSize(), MAX_CALLBACK_THREAD_POOL_SIZE); MAX_CALLBACK_THREAD_POOL_SIZE);

int callbackPoolSize = Math.min(configuration.getCallbackThreadPoolSize(),
MAX_CALLBACK_THREAD_POOL_SIZE);

executor = Executors.newFixedThreadPool(executorPoolSize); executor = Executors.newFixedThreadPool(executorPoolSize);
callbackExecutor = Executors.newFixedThreadPool(callbackPoolSize); callbackExecutor = Executors.newFixedThreadPool(callbackPoolSize);
flumeClientManager = FlumeClientManager.getInstance(configuration); flumeClientManager = FlumeClientManager.getInstance(configuration);
} catch (Exception e) { } catch (Exception ex) {
LOG.error("Failed to init Flume log appender: ", e); LOG.error("Failed to init Flume log appender: ", ex);
} }
} }


public void reinit() { public void reinit() {
if (configuration == null) { if (configuration == null) {
LOG.warn("Flume configuration wasn't initialized. Invoke method init with configuration before."); LOG.warn("Flume configuration wasn't initialized. "
+ "Invoke method init with configuration before.");
return; return;
} }
if (flumeEventBuilder == null) { if (flumeEventBuilder == null) {
flumeEventBuilder = new FlumeAvroEventBuilder(); flumeEventBuilder = new FlumeAvroEventBuilder();
flumeEventBuilder.init(configuration); flumeEventBuilder.init(configuration);
} }
if (executor == null) { if (executor == null) {
int executorPoolSize = Math.min(configuration.getExecutorThreadPoolSize(), MAX_CALLBACK_THREAD_POOL_SIZE); int executorPoolSize = Math.min(configuration.getExecutorThreadPoolSize(),
MAX_CALLBACK_THREAD_POOL_SIZE);

executor = Executors.newFixedThreadPool(executorPoolSize); executor = Executors.newFixedThreadPool(executorPoolSize);
} }
if (callbackExecutor == null) { if (callbackExecutor == null) {
int callbackPoolSize = Math.min(configuration.getCallbackThreadPoolSize(), MAX_CALLBACK_THREAD_POOL_SIZE); int callbackPoolSize = Math.min(configuration.getCallbackThreadPoolSize(),
MAX_CALLBACK_THREAD_POOL_SIZE);

callbackExecutor = Executors.newFixedThreadPool(callbackPoolSize); callbackExecutor = Executors.newFixedThreadPool(callbackPoolSize);
} }
if (flumeClientManager == null) { if (flumeClientManager == null) {
Expand Down Expand Up @@ -208,12 +219,12 @@ public void onSuccess(AppendBatchAsyncResultPojo result) {
} }


@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable throwable) {
flumeFailureLogCount.getAndAdd(size); flumeFailureLogCount.getAndAdd(size);
LOG.warn("Failed to store record", t); LOG.warn("Failed to store record", throwable);
if (t instanceof IOException) { if (throwable instanceof IOException) {
callback.onConnectionError(); callback.onConnectionError();
} else if (t instanceof EventDeliveryException) { } else if (throwable instanceof EventDeliveryException) {
callback.onRemoteError(); callback.onRemoteError();
} else { } else {
callback.onInternalError(); callback.onInternalError();
Expand Down

0 comments on commit bb067db

Please sign in to comment.