Commit

Fixed some warns.
Acarus committed Oct 1, 2016
1 parent fb7d67a commit d1982fd
Showing 50 changed files with 1,803 additions and 1,044 deletions.

@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.kaaproject.kaa.server.flume.sink.hdfs;
 
 import com.google.common.collect.Lists;
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.kaaproject.kaa.server.flume.sink.hdfs;
 
 import com.google.common.base.Preconditions;
@@ -68,9 +69,11 @@ public class KaaHdfsSink extends AbstractSink implements Configurable, Configura
   private static final Logger LOG = LoggerFactory.getLogger(KaaHdfsSink.class);
   /**
    * Singleton credential manager that manages static credentials for the
-   * entire JVM
+   * entire JVM.
    */
-  private static final AtomicReference<KerberosUser> staticLogin = new AtomicReference<>(); //NOSONAR
+  private static final AtomicReference<KerberosUser> staticLogin =
+      new AtomicReference<>(); //NOSONAR
+
   private boolean started = false;
   private KaaEventFactory eventFactory;
   private Context context;
@@ -136,10 +139,10 @@ private static synchronized UserGroupInformation kerberosLogin(
     UserGroupInformation curUser = null;
     try {
       curUser = UserGroupInformation.getLoginUser();
-    } catch (IOException e) {
+    } catch (IOException ex) {
       // not a big deal but this shouldn't typically happen because it will
       // generally fall back to the UNIX user
-      LOG.debug("Unable to get login user before Kerberos auth attempt.", e);
+      LOG.debug("Unable to get login user before Kerberos auth attempt.", ex);
     }
 
     // we already have logged in successfully
@@ -150,8 +153,8 @@
       // no principal found
     } else {
 
-      LOG.info("{}: Attempting kerberos login as principal ({}) from keytab " +
-          "file ({})", new Object[]{sink, principal, keytab});
+      LOG.info("{}: Attempting kerberos login as principal ({}) from keytab file ({})",
+          new Object[]{sink, principal, keytab});
 
       // attempt static kerberos login
       UserGroupInformation.loginUserFromKeytab(principal, keytab);
@@ -170,18 +173,24 @@ public void configure(Context context) {
     statisticsInterval = context.getLong(CONFIG_STATISTICS_INTERVAL, DEFAULT_STATISTICS_INTERVAL);
 
     // writers
-    threadsPoolSize = context.getInteger(CONFIG_HDFS_THREAD_POOL_SIZE, DEFAULT_HDFS_THREAD_POOL_SIZE);
-    rollTimerPoolSize = context.getInteger(CONFIG_HDFS_ROLL_TIMER_POOL_SIZE, DEFAULT_HDFS_ROLL_TIMER_POOL_SIZE);
+    threadsPoolSize = context.getInteger(
+        CONFIG_HDFS_THREAD_POOL_SIZE, DEFAULT_HDFS_THREAD_POOL_SIZE);
+
+    rollTimerPoolSize = context.getInteger(
+        CONFIG_HDFS_ROLL_TIMER_POOL_SIZE, DEFAULT_HDFS_ROLL_TIMER_POOL_SIZE);
     maxOpenFiles = context.getInteger(CONFIG_HDFS_MAX_OPEN_FILES, DEFAULT_HDFS_MAX_OPEN_FILES);
-    cacheCleanupInterval = context.getInteger(CONFIG_HDFS_CACHE_CLEANUP_INTERVAL, DEFAULT_HDFS_CACHE_CLEANUP_INTERVAL) * 1000;
-    writerExpirationInterval = context.getInteger(CONFIG_HDFS_WRITER_EXPIRATION_INTERVAL, DEFAULT_HDFS_WRITER_EXPIRATION_INTERVAL);
+    cacheCleanupInterval = context.getInteger(
+        CONFIG_HDFS_CACHE_CLEANUP_INTERVAL, DEFAULT_HDFS_CACHE_CLEANUP_INTERVAL) * 1000;
+    writerExpirationInterval = context.getInteger(
+        CONFIG_HDFS_WRITER_EXPIRATION_INTERVAL, DEFAULT_HDFS_WRITER_EXPIRATION_INTERVAL);
     callTimeout = context.getLong(CONFIG_HDFS_CALL_TIMEOUT, DEFAULT_HDFS_CALL_TIMEOUT);
 
     rollInterval = context.getLong(CONFIG_HDFS_ROLL_INTERVAL, DEFAULT_HDFS_ROLL_INTERVAL);
     rollSize = context.getLong(CONFIG_HDFS_ROLL_SIZE, DEFAULT_HDFS_ROLL_SIZE);
     rollCount = context.getLong(CONFIG_HDFS_ROLL_COUNT, DEFAULT_HDFS_ROLL_COUNT);
     batchSize = context.getLong(CONFIG_HDFS_BATCH_SIZE, DEFAULT_HDFS_BATCH_SIZE);
-    defaultBlockSize = context.getLong(CONFIG_HDFS_DEFAULT_BLOCK_SIZE, DEFAULT_HDFS_DEFAULT_BLOCK_SIZE);
+    defaultBlockSize = context.getLong(
+        CONFIG_HDFS_DEFAULT_BLOCK_SIZE, DEFAULT_HDFS_DEFAULT_BLOCK_SIZE);
 
     filePrefix = context.getString(CONFIG_HDFS_FILE_PREFIX, DEFAULT_HDFS_FILE_PREFIX);
 
@@ -217,11 +226,11 @@ private <T> T callWithTimeout(Callable<T> callable)
       } else {
         return future.get();
       }
-    } catch (TimeoutException eT) {
+    } catch (TimeoutException ex) {
       future.cancel(true);
       sinkCounter.incrementConnectionFailedCount();
       throw new IOException("Callable timed out after " + callTimeout + " ms",
-          eT);
+          ex);
     } catch (ExecutionException e1) {
       sinkCounter.incrementConnectionFailedCount();
       Throwable cause = e1.getCause();
@@ -270,7 +279,8 @@ public Status process() throws EventDeliveryException {
       // cacheCleanupStartInterval = System.currentTimeMillis();
       //}
 
-      Map<KaaSinkKey, List<KaaRecordEvent>> incomingEventsMap = eventFactory.processIncomingFlumeEvent(event);
+      Map<KaaSinkKey, List<KaaRecordEvent>> incomingEventsMap =
+          eventFactory.processIncomingFlumeEvent(event);
       if (incomingEventsMap == null || incomingEventsMap.isEmpty()) {
         if (LOG.isWarnEnabled()) {
           LOG.warn("Unable to parse incoming event: " + event);
@@ -315,9 +325,9 @@ public Status process() throws EventDeliveryException {
         return Status.BACKOFF;
       }
       return Status.READY;
-    } catch (IOException eIO) {
+    } catch (IOException eIo) {
       transaction.rollback();
-      LOG.warn("HDFS IO error", eIO);
+      LOG.warn("HDFS IO error", eIo);
       return Status.BACKOFF;
     } catch (Throwable th) { //NOSONAR
       transaction.rollback();
@@ -343,8 +353,8 @@ public void start() {
         new ThreadFactoryBuilder().setNameFormat(timeoutName).build());
 
     String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
-    timedRollerPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(rollTimerPoolSize,
-        new ThreadFactoryBuilder().setNameFormat(rollerName).build());
+    timedRollerPool = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(
+        rollTimerPoolSize, new ThreadFactoryBuilder().setNameFormat(rollerName).build());
 
     if (statisticsInterval > 0) {
       String statisticsName = "hdfs-" + getName() + "-statistics-%d";
@@ -354,11 +364,12 @@ public void start() {
       Runnable action = new Runnable() {
         @Override
         public void run() {
-          LOG.info("Statistics: Drain attempt events: " + sinkCounter.getEventDrainAttemptCount() + "; " +
-              "Drain success events: " + sinkCounter.getEventDrainSuccessCount());
+          LOG.info("Statistics: Drain attempt events: " + sinkCounter.getEventDrainAttemptCount()
+              + "; " + "Drain success events: " + sinkCounter.getEventDrainSuccessCount());
         }
       };
-      statisticsFuture = statisticsPool.scheduleWithFixedDelay(action, 0, statisticsInterval, TimeUnit.SECONDS);
+      statisticsFuture = statisticsPool.scheduleWithFixedDelay(
+          action, 0, statisticsInterval, TimeUnit.SECONDS);
     }
 
     cacheCleanupStartInterval = System.currentTimeMillis();
@@ -374,11 +385,11 @@ public void run() {
         proxyTicket,
         sinkCounter);
 
-    writerCache = CacheBuilder.newBuilder().
-        maximumSize(maxOpenFiles).
-        expireAfterWrite(writerExpirationInterval, TimeUnit.SECONDS).
-        removalListener(this).
-        build(bucketWriterLoader);
+    writerCache = CacheBuilder.newBuilder()
+        .maximumSize(maxOpenFiles)
+        .expireAfterWrite(writerExpirationInterval, TimeUnit.SECONDS)
+        .removalListener(this)
+        .build(bucketWriterLoader);
 
     writerFlushMap = new HashMap<HdfsSinkKey, BucketWriter>();
 
@@ -401,8 +412,8 @@ public void stop() {
       try {
         close(entry.getValue());
       } catch (Exception ex) {
-        LOG.warn("Exception while closing " + entry.getKey() + ". " +
-            "Exception follows.", ex);
+        LOG.warn("Exception while closing " + entry.getKey() + ". "
+            + "Exception follows.", ex);
         if (ex instanceof InterruptedException) {
           Thread.currentThread().interrupt();
         }
@@ -420,7 +431,9 @@ public void stop() {
       try {
         while (execService.isTerminated() == false) {
           execService.awaitTermination(
-              Math.max(ConfigurationConstants.DEFAULT_HDFS_CALL_TIMEOUT, callTimeout), TimeUnit.MILLISECONDS);
+              Math.max(
+                  ConfigurationConstants.DEFAULT_HDFS_CALL_TIMEOUT, callTimeout),
+              TimeUnit.MILLISECONDS);
         }
       } catch (InterruptedException ex) {
         LOG.warn("shutdown interrupted on " + execService, ex);
@@ -440,8 +453,8 @@ public void stop() {
 
   @Override
   public String toString() {
-    return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName() +
-        " }";
+    return "{ Sink type:" + getClass().getSimpleName() + ", name:" + getName()
+        + " }";
   }
 
   public long getEventDrainSuccessCount() {
@@ -459,10 +472,10 @@ public void onRemoval(
     try {
       writerFlushMap.remove(key);
       writer.close();
-    } catch (IOException e) {
-      LOG.warn(entry.getKey().toString(), e);
-    } catch (InterruptedException e) {
-      LOG.warn(entry.getKey().toString(), e);
+    } catch (IOException ex) {
+      LOG.warn(entry.getKey().toString(), ex);
+    } catch (InterruptedException ex) {
+      LOG.warn(entry.getKey().toString(), ex);
       Thread.currentThread().interrupt();
     }
   }
@@ -503,9 +516,9 @@ private boolean authenticate() {
       // resolves _HOST pattern using standard Hadoop search/replace
       // via DNS lookup when 2nd argument is empty
       principal = SecurityUtil.getServerPrincipal(kerbConfPrincipal, "");
-    } catch (IOException e) {
+    } catch (IOException ex) {
       LOG.error("Host lookup error resolving kerberos principal ("
-          + kerbConfPrincipal + "). Exception follows.", e);
+          + kerbConfPrincipal + "). Exception follows.", ex);
       return false;
     }
 
@@ -520,30 +533,30 @@ private boolean authenticate() {
     // since we don't have to be unnecessarily protective if they switch all
     // HDFS sinks to use a different principal all at once.
     Preconditions.checkState(prevUser == null || prevUser.equals(newUser),
-        "Cannot use multiple kerberos principals in the same agent. " +
-            " Must restart agent to use new principal or keytab. " +
-            "Previous = %s, New = %s", prevUser, newUser);
+        "Cannot use multiple kerberos principals in the same agent. "
+            + " Must restart agent to use new principal or keytab. "
+            + "Previous = %s, New = %s", prevUser, newUser);
 
     // attempt to use cached credential if the user is the same
     // this is polite and should avoid flooding the KDC with auth requests
     UserGroupInformation curUser = null;
     if (prevUser != null && prevUser.equals(newUser)) {
       try {
         curUser = UserGroupInformation.getLoginUser();
-      } catch (IOException e) {
-        LOG.warn("User unexpectedly had no active login. Continuing with " +
-            "authentication", e);
+      } catch (IOException ex) {
+        LOG.warn("User unexpectedly had no active login. Continuing with "
+            + "authentication", ex);
       }
     }
 
     if (curUser == null || !curUser.getUserName().equals(principal)) {
       try {
         // static login
         kerberosLogin(this, principal, kerbKeytab);
-      } catch (IOException e) {
+      } catch (IOException ex) {
         LOG.error("Authentication or file read error while attempting to "
             + "login as kerberos principal (" + principal + ") using "
-            + "keytab (" + kerbKeytab + "). Exception follows.", e);
+            + "keytab (" + kerbKeytab + "). Exception follows.", ex);
         return false;
       }
     } else {
@@ -560,8 +573,8 @@ private boolean authenticate() {
       try {
         proxyTicket = UserGroupInformation.createProxyUser(
             proxyUserName, UserGroupInformation.getLoginUser());
-      } catch (IOException e) {
-        LOG.error("Unable to login as proxy user. Exception follows.", e);
+      } catch (IOException ex) {
+        LOG.error("Unable to login as proxy user. Exception follows.", ex);
         return false;
       }
     }
@@ -572,9 +585,9 @@
     } else if (useSecurity) {
       try {
         ugi = UserGroupInformation.getLoginUser();
-      } catch (IOException e) {
-        LOG.error("Unexpected error: Unable to get authenticated user after " +
-            "apparent successful login! Exception follows.", e);
+      } catch (IOException ex) {
+        LOG.error("Unexpected error: Unable to get authenticated user after "
+            + "apparent successful login! Exception follows.", ex);
         return false;
       }
     }
@@ -592,9 +605,8 @@
         LOG.info(" Superuser auth: {}", superUser.getAuthenticationMethod());
         LOG.info(" Superuser name: {}", superUser.getUserName());
         LOG.info(" Superuser using keytab: {}", superUser.isFromKeytab());
-      } catch (IOException e) {
-        LOG.error("Unexpected error: unknown superuser impersonating proxy.",
-            e);
+      } catch (IOException ex) {
+        LOG.error("Unexpected error: unknown superuser impersonating proxy.", ex);
         return false;
       }
     }
@@ -608,7 +620,7 @@
   }
 
   /**
-   * Append to bucket writer with timeout enforced
+   * Append to bucket writer with timeout enforced.
    */
   private void appendBatch(final BucketWriter bucketWriter, final List<KaaRecordEvent> events)
       throws IOException, InterruptedException {
@@ -623,7 +635,7 @@ public Void call() throws Exception {
   }
 
   /**
-   * Flush bucket writer with timeout enforced
+   * Flush bucket writer with timeout enforced.
    */
   private void flush(final BucketWriter bucketWriter)
       throws IOException, InterruptedException {
@@ -638,7 +650,7 @@ public Void call() throws Exception {
   }
 
   /**
-   * Close bucket writer with timeout enforced
+   * Close bucket writer with timeout enforced.
    */
   private void close(final BucketWriter bucketWriter)
       throws IOException, InterruptedException {
@@ -22,7 +22,7 @@
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 
-import org.kaaproject.kaa.common.dto.ctl.CtlSchemaDto;
+import org.kaaproject.kaa.common.dto.ctl.CTLSchemaDto;
 import org.kaaproject.kaa.common.dto.ctl.CTLSchemaExportMethod;
 import org.kaaproject.kaa.common.dto.ctl.CTLSchemaMetaInfoDto;
 import org.kaaproject.kaa.common.dto.file.FileData;
@@ -76,7 +76,7 @@ public class CtlController extends AbstractAdminController {
       @ApiResponse(code = 500, message = "An unexpected error occurred on the server side")})
   @RequestMapping(value = "CTL/saveSchema", params = {"body"}, method = RequestMethod.POST)
   @ResponseBody
-  public CtlSchemaDto saveCtlSchemaWithAppToken(
+  public CTLSchemaDto saveCtlSchemaWithAppToken(
       @ApiParam(name = "body", value = "The CTL schema structure", required = true)
       @RequestParam String body,
       @ApiParam(name = "tenantId", value = "A unique tenant identifier", required = false)
@@ -156,7 +156,7 @@ public void deleteCtlSchemaByFqnVersionTenantIdAndApplicationToken(
       @ApiResponse(code = 500, message = "An unexpected error occurred on the server side")})
   @RequestMapping(value = "CTL/getSchema", params = {"fqn", "version"}, method = RequestMethod.GET)
   @ResponseBody
-  public CtlSchemaDto getCtlSchemaByFqnVersionTenantIdAndApplicationToken(
+  public CTLSchemaDto getCtlSchemaByFqnVersionTenantIdAndApplicationToken(
       @ApiParam(name = "fqn",
           value = "The fully qualified name of the CTL schema", required = true)
       @RequestParam String fqn,
@@ -192,7 +192,7 @@ public CtlSchemaDto getCtlSchemaByFqnVersionTenantIdAndApplicationToken(
       @ApiResponse(code = 500, message = "An unexpected error occurred on the server side")})
   @RequestMapping(value = "CTL/getSchemaById", params = {"id"}, method = RequestMethod.GET)
   @ResponseBody
-  public CtlSchemaDto getCtlSchemaById(
+  public CTLSchemaDto getCtlSchemaById(
       @ApiParam(name = "id", value = "A unique CTL schema identifier", required = true)
       @RequestParam String id) throws KaaAdminServiceException {
     return ctlService.getCtlSchemaById(id);
