Skip to content

Commit

Permalink
Use thread safe datetime formatter
Browse files Browse the repository at this point in the history
This closes apache#62
  • Loading branch information
Masakazu Kitajo committed Oct 14, 2016
1 parent 1304d4b commit 229aabe
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 13 deletions.
Expand Up @@ -21,6 +21,9 @@
import java.io.OutputStream;
import java.sql.Date;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -102,7 +105,7 @@
public class PersistentTopics extends AdminResource {
private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);

private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

private static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
Expand Down Expand Up @@ -917,7 +920,7 @@ public Response peekNthMessage(@PathParam("property") String property, @PathPara
}
if (metadata.hasPublishTime()) {
responseBuilder.header("X-Pulsar-publish-time",
DATE_FORMAT.format(new Date(metadata.getPublishTime())));
DATE_FORMAT.format(Instant.ofEpochMilli(metadata.getPublishTime())));
}

// Decode if needed
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static com.yahoo.pulsar.broker.service.persistent.PersistentTopic.DATE_FORMAT;
import static com.yahoo.pulsar.common.api.Commands.readChecksum;

import java.time.Instant;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -88,7 +89,7 @@ public Consumer(Subscription subscription, SubType subType, long consumerId, Str
stats = new ConsumerStats();
stats.address = cnx.clientAddress().toString();
stats.consumerName = consumerName;
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
stats.connectedSince = DATE_FORMAT.format(Instant.now());

if (subType == SubType.Shared) {
this.pendingAcks = new ConcurrentOpenHashMap<PositionImpl, Integer>(256, 2);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import static com.yahoo.pulsar.common.api.Commands.readChecksum;
import static com.yahoo.pulsar.common.api.Commands.hasChecksum;

import java.time.Instant;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -74,7 +75,7 @@ public Producer(Topic topic, ServerCnx cnx, long producerId, String producerName

this.stats = new PublisherStats();
stats.address = cnx.clientAddress().toString();
stats.connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
stats.connectedSince = DATE_FORMAT.format(Instant.now());
stats.producerName = producerName;
stats.producerId = producerId;

Expand Down
Expand Up @@ -18,6 +18,9 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -118,7 +121,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {

private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;

public static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
public static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

// Timestamp of when this topic was last seen active
private volatile long lastActive;
Expand Down Expand Up @@ -956,10 +959,10 @@ public PersistentTopicInternalStats getInternalStats() {
stats.totalSize = ml.getTotalSize();
stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
stats.currentLedgerSize = ml.getCurrentLedgerSize();
stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(new Date(ml.getLastLedgerCreatedTimestamp()));
stats.lastLedgerCreatedTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(ml.getLastLedgerCreatedTimestamp()));
if (ml.getLastLedgerCreationFailureTimestamp() != 0) {
stats.lastLedgerCreationFailureTimestamp = DATE_FORMAT
.format(new Date(ml.getLastLedgerCreationFailureTimestamp()));
.format(Instant.ofEpochMilli(ml.getLastLedgerCreationFailureTimestamp()));
}

stats.waitingCursorsCount = ml.getWaitingCursorsCount();
Expand Down Expand Up @@ -989,7 +992,7 @@ public PersistentTopicInternalStats getInternalStats() {
cs.cursorLedger = cursor.getCursorLedger();
cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(new Date(cursor.getLastLedgerSwitchTimestamp()));
cs.lastLedgerSwitchTimestamp = DATE_FORMAT.format(Instant.ofEpochMilli(cursor.getLastLedgerSwitchTimestamp()));
cs.state = cursor.getState();
stats.cursors.put(cursor.getName(), cs);
});
Expand Down
Expand Up @@ -21,6 +21,9 @@

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.concurrent.BlockingQueue;
Expand Down Expand Up @@ -693,7 +696,7 @@ void connectionOpened(final ClientCnx cnx) {

log.info("[{}] [{}] Created producer on cnx {}", topic, producerName, cnx.ctx().channel());
connectionId = cnx.ctx().channel().toString();
connectedSince = DATE_FORMAT.format(new Date(System.currentTimeMillis()));
connectedSince = DATE_FORMAT.format(Instant.now());

if (this.producerName == null) {
this.producerName = producerName;
Expand Down Expand Up @@ -1091,7 +1094,7 @@ public int getPendingQueueSize() {
return pendingMessages.size();
}

private static SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

private PulsarApi.CompressionType convertCompressionType(CompressionType compressionType) {
switch (compressionType) {
Expand Down
Expand Up @@ -19,7 +19,9 @@
import static com.google.common.base.Preconditions.checkArgument;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -98,7 +100,7 @@ private void receiveMessage() {
dm.messageId = Base64.getEncoder().encodeToString(msg.getMessageId().toByteArray());
dm.payload = Base64.getEncoder().encodeToString(msg.getData());
dm.properties = msg.getProperties();
dm.publishTime = DATE_FORMAT.format(msg.getPublishTime());
dm.publishTime = DATE_FORMAT.format(Instant.ofEpochMilli(msg.getPublishTime()));
if (msg.hasKey()) {
dm.key = msg.getKey();
}
Expand Down Expand Up @@ -192,7 +194,7 @@ private static String extractSubscription(HttpServletRequest request) {
return parts.get(8);
}

private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
private static final DateTimeFormatter DATE_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.systemDefault());

private static final Logger log = LoggerFactory.getLogger(ConsumerHandler.class);

Expand Down

0 comments on commit 229aabe

Please sign in to comment.