From a46a215b37d5fa8a550bd9235553f083bc7e8795 Mon Sep 17 00:00:00 2001 From: Fangbin Sun Date: Mon, 3 Aug 2020 23:59:55 +0800 Subject: [PATCH] Change some WebApplicationException log level to debug (#7725) ### Motivation Some user may face the following `Temporary Redirect` issue when the request topic not owned by the current broker: ``` 19:21:48.215 [pulsar-web-42-5] ERROR org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] Failed to get list of subscriptions for persistent://default_tenant/default_namespace/default_topic-partition-0 javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateReadOperationOnTopic(PersistentTopicsBase.java:245) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptionsForNonPartitionedTopic(PersistentTopicsBase.java:874) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalGetSubscriptions(PersistentTopicsBase.java:825) at org.apache.pulsar.broker.admin.v2.PersistentTopics.getSubscriptions(PersistentTopics.java:461) 09:41:45.485 [pulsar-web-42-7] WARN org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin] [persistent://public/default/test-partition-1] Failed to create subscription consumer-test at message id -1:-1:-1 javax.ws.rs.WebApplicationException: HTTP 307 Temporary Redirect at org.apache.pulsar.broker.web.PulsarWebResource.validateTopicOwnership(PulsarWebResource.java:599) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.validateAdminAccessForSubscriber(PersistentTopicsBase.java:283) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscriptionForNonPartitionedTopic(PersistentTopicsBase.java:1752) at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.internalCreateSubscription(PersistentTopicsBase.java:1673) at org.apache.pulsar.broker.admin.v2.PersistentTopics.createSubscription(PersistentTopics.java:816) ``` As discussed in https://github.com/apache/pulsar/issues/7189, it is better to use debug level when this happens. ### Modifications Change some `WebApplicationException` log level from error/warn to debug in `PersistentTopicsBase`. --- .../admin/impl/PersistentTopicsBase.java | 50 ++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 40663993e21c85..40d42680cbe5d7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -599,6 +599,13 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse, boolean authoritative, boolean force) { try { validateWriteOperationOnTopic(authoritative); + } catch (WebApplicationException wae) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to delete partitioned topic {}, redirecting to other brokers.", + clientAppId(), topicName, wae); + } + resumeAsyncResponseExceptionally(asyncResponse, wae); + return; } catch (Exception e) { log.error("[{}] Failed to delete partitioned topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); @@ -880,6 +887,13 @@ private void internalGetSubscriptionsForNonPartitionedTopic(AsyncResponse asyncR final List subscriptions = Lists.newArrayList(); topic.getSubscriptions().forEach((subName, sub) -> subscriptions.add(subName)); asyncResponse.resume(subscriptions); + } catch (WebApplicationException wae) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to get subscriptions for non-partitioned topic {}, redirecting to other brokers.", + clientAppId(), topicName, wae); + } + resumeAsyncResponseExceptionally(asyncResponse, wae); + return; } catch (Exception e) { log.error("[{}] Failed to get list of subscriptions for {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); @@ -1208,11 +1222,15 @@ private void internalDeleteSubscriptionForNonPartitionedTopic(AsyncResponse asyn log.info("[{}][{}] Deleted subscription {}", clientAppId(), topicName, subName); asyncResponse.resume(Response.noContent().build()); } catch (Exception e) { - log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e); if (e.getCause() instanceof SubscriptionBusyException) { + log.error("[{}] Failed to delete subscription {} from topic {}", clientAppId(), subName, topicName, e); asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers")); } else if (e instanceof WebApplicationException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to delete subscription from topic {}, redirecting to other brokers.", + clientAppId(), topicName, e); + } asyncResponse.resume(e); } else { log.error("[{}] Failed to delete subscription {} {}", clientAppId(), topicName, subName, e); @@ -1292,8 +1310,11 @@ private void internalDeleteSubscriptionForNonPartitionedTopicForcefully(AsyncRes log.info("[{}][{}] Deleted subscription forcefully {}", clientAppId(), topicName, subName); asyncResponse.resume(Response.noContent().build()); } catch (Exception e) { - log.error("[{}] Failed to delete subscription forcefully {} from topic {}", clientAppId(), subName, topicName, e); if (e instanceof WebApplicationException) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to delete subscription forcefully from topic {}, redirecting to other brokers.", + clientAppId(), topicName, e); + } asyncResponse.resume(e); } else { log.error("[{}] Failed to delete subscription forcefully {} {}", clientAppId(), topicName, subName, e); @@ -1388,6 +1409,12 @@ private void internalSkipAllMessagesForNonPartitionedTopic(AsyncResponse asyncRe } sub.clearBacklog().whenComplete(biConsumer); } + } catch (WebApplicationException wae) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to skip all messages for subscription on topic {}, redirecting to other brokers.", + clientAppId(), topicName, wae); + } + resumeAsyncResponseExceptionally(asyncResponse, wae); } catch (Exception e) { log.error("[{}] Failed to skip all messages for subscription {} on topic {}", clientAppId(), subName, topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); @@ -1489,6 +1516,13 @@ private void internalExpireMessagesForAllSubscriptionsForNonPartitionedTopic(Asy validateWriteOperationOnTopic(authoritative); topic = (PersistentTopic) getTopicReference(topicName); + } catch (WebApplicationException wae) { + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to expire messages for all subscription on topic {}, redirecting to other brokers.", + clientAppId(), topicName, wae); + } + resumeAsyncResponseExceptionally(asyncResponse, wae); + return; } catch (Exception e) { log.error("[{}] Failed to expire messages for all subscription on topic {}", clientAppId(), topicName, e); resumeAsyncResponseExceptionally(asyncResponse, e); @@ -1765,12 +1799,14 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn .get(); } catch (Throwable e) { Throwable t = e.getCause(); - log.warn("[{}] [{}] Failed to create subscription {} at message id {}", clientAppId(), topicName, - subscriptionName, targetMessageId, e); if (t instanceof SubscriptionInvalidCursorPosition) { asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, "Unable to find position for position specified: " + t.getMessage())); } else if (e instanceof WebApplicationException) { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Failed to create subscription {} at message id {}, redirecting to other brokers.", clientAppId(), topicName, + subscriptionName, targetMessageId, e); + } asyncResponse.resume(e); } else if (t instanceof SubscriptionBusyException) { asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED, @@ -2710,8 +2746,10 @@ protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean aut validateReadOperationOnTopic(authoritative); topic = getTopicReference(topicName); } catch (WebApplicationException wae) { - log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.", - clientAppId(), topicName, wae); + if (log.isDebugEnabled()) { + log.debug("[{}] Failed to get last messageId {}, redirecting to other brokers.", + clientAppId(), topicName, wae); + } resumeAsyncResponseExceptionally(asyncResponse, wae); return; } catch (Exception e) {