Issue 5737: (SLTS) Improve metrics (pravega#5746)
Improve SLTS metrics by adding storage thread-pool gauges, a garbage-collector queue-size gauge, read-index size and miss-rate gauges, and separate read-index scan metrics for storage-system segments, all reported periodically through a new StatsReporter contract.

Signed-off-by: Sachin Joshi <sachin.joshi@emc.com>
sachin-j-joshi committed Feb 23, 2021
1 parent 0527199 · commit 6f90911
Showing 24 changed files with 198 additions and 39 deletions.
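The common thread across these files is a new StatsReporter contract (the new file at the end of this diff) whose report() method is invoked on a fixed schedule from a ScheduledExecutorService, publishing gauge values such as queue sizes and cache sizes. Below is a minimal, self-contained sketch of that pattern using only JDK types; the component name, the printed metric name, and the println stand-in for Pravega's metrics loggers are illustrative assumptions, not code from this commit.

// Sketch only: a println stands in for Pravega's StatsLogger/DynamicLogger calls.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

interface StatsReporter {
    void report();
}

class GarbageQueueStats implements StatsReporter {
    final AtomicLong queueSize = new AtomicLong();

    @Override
    public void report() {
        // The real code publishes a gauge here, e.g. reportGaugeValue(SLTS_GC_QUEUE_SIZE, queueSize.get()).
        System.out.println("gc.queue.size=" + queueSize.get());
    }
}

public class ReportingSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        StatsReporter stats = new GarbageQueueStats();
        // The commit uses a 1000 ms cadence in both ChunkedSegmentStorage and SegmentStoreMetrics.ThreadPool.
        ScheduledFuture<?> reporter =
                executor.scheduleAtFixedRate(stats::report, 1000, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(3500);
        reporter.cancel(true);   // mirrors close(): stop reporting before shutting down
        executor.shutdownNow();
    }
}

The diff below wires the same idea into existing components instead of a standalone class.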
ExtendedS3SimpleStorageFactory.java
@@ -21,7 +21,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* Factory for ExtendedS3 {@link Storage} implemented using {@link ChunkedSegmentStorage} and {@link ExtendedS3ChunkStorage}.
@@ -36,7 +36,7 @@ public class ExtendedS3SimpleStorageFactory implements SimpleStorageFactory {

@NonNull
@Getter
private final ExecutorService executor;
private final ScheduledExecutorService executor;

@Override
public Storage createStorageAdapter(int containerId, ChunkMetadataStore metadataStore) {
FileSystemSimpleStorageFactory.java
@@ -18,7 +18,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* Factory for FileSystem {@link Storage} implemented using {@link ChunkedSegmentStorage} and {@link FileSystemChunkStorage}.
@@ -33,7 +33,7 @@ public class FileSystemSimpleStorageFactory implements SimpleStorageFactory {

@NonNull
@Getter
private final ExecutorService executor;
private final ScheduledExecutorService executor;

@Override
public Storage createStorageAdapter(int containerId, ChunkMetadataStore metadataStore) {
HDFSSimpleStorageFactory.java
@@ -18,7 +18,7 @@
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;

/**
* Factory for HDFS {@link Storage} implemented using {@link ChunkedSegmentStorage} and {@link HDFSChunkStorage}.
@@ -34,7 +34,7 @@ public class HDFSSimpleStorageFactory implements SimpleStorageFactory {

@NonNull
@Getter
private final Executor executor;
private final ScheduledExecutorService executor;

@Override
public Storage createStorageAdapter(int containerId, ChunkMetadataStore metadataStore) {
SegmentStoreMetrics.java
@@ -91,13 +91,19 @@ public void close() {
public final static class ThreadPool implements AutoCloseable {
private final OpStatsLogger queueSize;
private final OpStatsLogger activeThreads;
private final OpStatsLogger storageQueueSize;
private final OpStatsLogger storageActiveThreads;
private final ScheduledExecutorService executor;
private final ScheduledExecutorService storageExecutor;
private final ScheduledFuture<?> reporter;

public ThreadPool(ScheduledExecutorService executor) {
public ThreadPool(ScheduledExecutorService executor, ScheduledExecutorService storageExecutor) {
this.executor = Preconditions.checkNotNull(executor, "executor");
this.storageExecutor = Preconditions.checkNotNull(storageExecutor, "storageExecutor");
this.queueSize = STATS_LOGGER.createStats(MetricsNames.THREAD_POOL_QUEUE_SIZE);
this.activeThreads = STATS_LOGGER.createStats(MetricsNames.THREAD_POOL_ACTIVE_THREADS);
this.storageQueueSize = STATS_LOGGER.createStats(MetricsNames.STORAGE_THREAD_POOL_QUEUE_SIZE);
this.storageActiveThreads = STATS_LOGGER.createStats(MetricsNames.STORAGE_THREAD_POOL_ACTIVE_THREADS);
this.reporter = executor.scheduleWithFixedDelay(this::report, 1000, 1000, TimeUnit.MILLISECONDS);
}

@@ -106,6 +112,8 @@ public void close() {
this.reporter.cancel(true);
this.queueSize.close();
this.activeThreads.close();
this.storageQueueSize.close();
this.storageActiveThreads.close();
}

private void report() {
@@ -114,6 +122,11 @@ private void report() {
this.queueSize.reportSuccessValue(s.getQueueSize());
this.activeThreads.reportSuccessValue(s.getActiveThreadCount());
}
ExecutorServiceHelpers.Snapshot ss = ExecutorServiceHelpers.getSnapshot(this.storageExecutor);
if (ss != null) {
this.storageQueueSize.reportSuccessValue(ss.getQueueSize());
this.storageActiveThreads.reportSuccessValue(ss.getActiveThreadCount());
}
}
}
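The new storage-pool gauges above are fed by ExecutorServiceHelpers.getSnapshot, a Pravega utility that samples the executor. A rough JDK-only approximation of what such a snapshot reports, assuming the pool is a ThreadPoolExecutor; the class and variable names here are illustrative only:

import java.util.concurrent.ScheduledThreadPoolExecutor;

public class ThreadPoolGaugeSketch {
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor storagePool = new ScheduledThreadPoolExecutor(4);

        // Roughly the two values report() publishes per pool.
        int queueSize = storagePool.getQueue().size();      // queued, not-yet-running tasks
        int activeThreads = storagePool.getActiveCount();   // threads currently executing tasks
        System.out.println("storage queueSize=" + queueSize + ", activeThreads=" + activeThreads);

        storagePool.shutdownNow();
    }
}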

ServiceBuilder.java
@@ -128,7 +128,7 @@ private ServiceBuilder(ServiceBuilderConfig serviceBuilderConfig, ServiceConfig
this.storageExecutor = executorBuilder.apply(serviceConfig.getStorageThreadPoolSize(), instancePrefix + "storage-io", Thread.NORM_PRIORITY);
this.lowPriorityExecutor = executorBuilder.apply(serviceConfig.getLowPriorityThreadPoolSize(),
instancePrefix + "low-priority-cleanup", Thread.MIN_PRIORITY);
this.threadPoolMetrics = new SegmentStoreMetrics.ThreadPool(this.coreExecutor);
this.threadPoolMetrics = new SegmentStoreMetrics.ThreadPool(this.coreExecutor, this.storageExecutor);

this.cacheManager = new CacheManager(serviceConfig.getCachePolicy(), this.coreExecutor);
}
(next changed file)
@@ -166,18 +166,20 @@ final public CompletableFuture<ChunkHandle> createWithContent(String chunkName,
Preconditions.checkArgument(null != data, "data must not be null");
Preconditions.checkArgument(length > 0, "length must be non-zero and non-negative");

val traceId = LoggerHelpers.traceEnter(log, "create", chunkName);
val traceId = LoggerHelpers.traceEnter(log, "CreateWithContent", chunkName);
val opContext = new OperationContext();

// Call concrete implementation.
val returnFuture = doCreateWithContentAsync(chunkName, length, data, opContext);
returnFuture.thenAcceptAsync(handle -> {
// Record metrics.
val elapsed = opContext.getInclusiveLatency();
ChunkStorageMetrics.CREATE_LATENCY.reportSuccessEvent(elapsed);
// Write metrics for create
ChunkStorageMetrics.WRITE_LATENCY.reportSuccessEvent(elapsed);
ChunkStorageMetrics.WRITE_BYTES.add(length);
ChunkStorageMetrics.CREATE_COUNT.inc();
log.debug("Create - chunk={}, latency={}.", chunkName, elapsed.toMillis());
LoggerHelpers.traceLeave(log, "create", traceId, chunkName);
log.debug("CreateWithContent - chunk={}, bytesWritten={}, latency={}.", handle.getChunkName(), length, elapsed.toMillis());
LoggerHelpers.traceLeave(log, "CreateWithContent", traceId, chunkName);
}, executor);
return returnFuture;
}
@@ -507,6 +509,11 @@ public void close() {
this.closed.set(true);
}

@Override
public void report() {
// Nothing to report yet.
}

/**
* Retrieves the ChunkInfo for given name.
*
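The createWithContent change in this file shows the recurring metrics pattern of this class: call the concrete asynchronous implementation, and once the returned future completes, record latency and byte counters on the executor without blocking the caller. A stripped-down, JDK-only sketch follows; the counter field and the log format are hypothetical stand-ins for ChunkStorageMetrics and the real logger.

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

public class AsyncMetricsSketch {
    static final AtomicLong writeBytes = new AtomicLong();   // stand-in for the WRITE_BYTES counter

    static CompletableFuture<String> createWithContent(String chunkName, int length, ExecutorService executor) {
        long start = System.nanoTime();
        // Stand-in for doCreateWithContentAsync: pretend to write `length` bytes and return a handle.
        CompletableFuture<String> returnFuture =
                CompletableFuture.supplyAsync(() -> chunkName, executor);
        returnFuture.thenAcceptAsync(handle -> {
            Duration elapsed = Duration.ofNanos(System.nanoTime() - start);
            writeBytes.addAndGet(length);   // WRITE_BYTES.add(length) in the real code
            // CREATE_LATENCY / WRITE_LATENCY.reportSuccessEvent(elapsed) in the real code.
            System.out.println("CreateWithContent - chunk=" + handle
                    + ", bytesWritten=" + length + ", latency=" + elapsed.toMillis() + " ms.");
        }, executor);
        return returnFuture;   // metrics are a side effect; the caller gets the original future
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        createWithContent("chunk-0", 128, executor).join();
        Thread.sleep(100);   // give the metrics callback time to run before shutdown
        executor.shutdownNow();
    }
}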
ChunkStorage.java
@@ -47,7 +47,7 @@
* It is recommended that the implementations should extend {@link BaseChunkStorage}.
*/
@Beta
public interface ChunkStorage extends AutoCloseable {
public interface ChunkStorage extends AutoCloseable, StatsReporter {
/**
* Gets a value indicating whether this Storage implementation supports {@link ChunkStorage#truncate(ChunkHandle, long)} operation on underlying storage object.
*
ChunkStorageMetrics.java
@@ -11,6 +11,7 @@

import io.pravega.shared.MetricsNames;
import io.pravega.shared.metrics.Counter;
import io.pravega.shared.metrics.DynamicLogger;
import io.pravega.shared.metrics.MetricsProvider;
import io.pravega.shared.metrics.OpStatsLogger;
import io.pravega.shared.metrics.StatsLogger;
@@ -19,6 +20,7 @@
* Defines all Metrics used by the {@link BaseChunkStorage} class.
*/
public class ChunkStorageMetrics {
static final DynamicLogger DYNAMIC_LOGGER = MetricsProvider.getDynamicLogger();
private static final StatsLogger STATS_LOGGER = MetricsProvider.createStatsLogger("ChunkStorage");

static final OpStatsLogger READ_LATENCY = STATS_LOGGER.createStats(MetricsNames.STORAGE_READ_LATENCY);
@@ -34,6 +36,10 @@ public class ChunkStorageMetrics {
static final OpStatsLogger SLTS_CONCAT_LATENCY = STATS_LOGGER.createStats(MetricsNames.SLTS_CONCAT_LATENCY);
static final OpStatsLogger SLTS_TRUNCATE_LATENCY = STATS_LOGGER.createStats(MetricsNames.SLTS_TRUNCATE_LATENCY);
static final OpStatsLogger SLTS_READ_INDEX_SCAN_LATENCY = STATS_LOGGER.createStats(MetricsNames.SLTS_READ_INDEX_SCAN_LATENCY);
static final OpStatsLogger SLTS_READ_INDEX_NUM_SCANNED = STATS_LOGGER.createStats(MetricsNames.SLTS_READ_INDEX_NUM_SCANNED);

static final OpStatsLogger SLTS_SYS_READ_INDEX_SCAN_LATENCY = STATS_LOGGER.createStats(MetricsNames.SLTS_SYS_READ_INDEX_SCAN_LATENCY);
static final OpStatsLogger SLTS_SYS_READ_INDEX_NUM_SCANNED = STATS_LOGGER.createStats(MetricsNames.SLTS_SYS_READ_INDEX_NUM_SCANNED);

static final Counter READ_BYTES = STATS_LOGGER.createCounter(MetricsNames.STORAGE_READ_BYTES);
static final Counter WRITE_BYTES = STATS_LOGGER.createCounter(MetricsNames.STORAGE_WRITE_BYTES);
ChunkedSegmentStorage.java
@@ -48,6 +48,8 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

@@ -65,7 +67,7 @@
*/
@Slf4j
@Beta
public class ChunkedSegmentStorage implements Storage {
public class ChunkedSegmentStorage implements Storage, StatsReporter {
/**
* Configuration options for this ChunkedSegmentStorage instance.
*/
@@ -139,16 +141,18 @@ public class ChunkedSegmentStorage implements Storage {
@Getter
private final GarbageCollector garbageCollector;

private final ScheduledFuture<?> reporter;

/**
* Creates a new instance of the ChunkedSegmentStorage class.
*
* @param containerId container id.
* @param chunkStorage ChunkStorage instance.
* @param metadataStore Metadata store.
* @param executor An Executor for async operations.
* @param executor A {@link ScheduledExecutorService} for async operations.
* @param config Configuration options for this ChunkedSegmentStorage instance.
*/
public ChunkedSegmentStorage(int containerId, ChunkStorage chunkStorage, ChunkMetadataStore metadataStore, Executor executor, ChunkedSegmentStorageConfig config) {
public ChunkedSegmentStorage(int containerId, ChunkStorage chunkStorage, ChunkMetadataStore metadataStore, ScheduledExecutorService executor, ChunkedSegmentStorageConfig config) {
this.containerId = containerId;
this.config = Preconditions.checkNotNull(config, "config");
this.chunkStorage = Preconditions.checkNotNull(chunkStorage, "chunkStorage");
@@ -165,8 +169,9 @@ public ChunkedSegmentStorage(int containerId, ChunkStorage chunkStorage, ChunkMe
chunkStorage,
metadataStore,
config,
(ScheduledExecutorService) executor);
executor);
this.closed = new AtomicBoolean(false);
this.reporter = executor.scheduleAtFixedRate(this::report, 1000, 1000, TimeUnit.MILLISECONDS);
}

/**
@@ -643,11 +648,19 @@ public CompletableFuture<Boolean> exists(String streamSegmentName, Duration time
}, streamSegmentName);
}

@Override
public void report() {
garbageCollector.report();
metadataStore.report();
chunkStorage.report();
}

@Override
public void close() {
close("metadataStore", this.metadataStore);
close("garbageCollector", this.garbageCollector);
close("chunkStorage", this.chunkStorage);
this.reporter.cancel(true);
this.closed.set(true);
}
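With these changes, ChunkedSegmentStorage also acts as a composite reporter: report() fans out to the garbage collector, metadata store, and chunk storage, the reporter runs at a fixed 1-second rate, and the scheduled future is cancelled during close(). A compact sketch of that wiring with JDK types; the child reporters and the class name are made up for illustration.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CompositeReporterSketch implements AutoCloseable {
    interface StatsReporter { void report(); }

    private final List<StatsReporter> children;
    private final ScheduledFuture<?> reporter;

    CompositeReporterSketch(ScheduledExecutorService executor, StatsReporter... children) {
        this.children = Arrays.asList(children);
        // Same shape as the constructor change above: schedule report() once the children are wired up.
        this.reporter = executor.scheduleAtFixedRate(this::report, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    public void report() {
        children.forEach(StatsReporter::report);   // delegate, as ChunkedSegmentStorage.report() does
    }

    @Override
    public void close() {
        reporter.cancel(true);   // cancel the periodic reporter as part of close()
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try (CompositeReporterSketch storage = new CompositeReporterSketch(executor,
                () -> System.out.println("gc stats"),
                () -> System.out.println("read index stats"))) {
            Thread.sleep(2500);
        }
        executor.shutdownNow();
    }
}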

GarbageCollector.java
@@ -38,6 +38,8 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

import static io.pravega.shared.MetricsNames.SLTS_GC_QUEUE_SIZE;

/**
* Implements a simple garbage collector for cleaning up deleted chunks.
* The garbage collector maintains an in-memory queue of chunks to delete, which is drained by a background task.
@@ -55,7 +57,7 @@
* </ol>
*/
@Slf4j
public class GarbageCollector extends AbstractThreadPoolService implements AutoCloseable {
public class GarbageCollector extends AbstractThreadPoolService implements AutoCloseable, StatsReporter {
private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
/**
* Set of garbage chunks.
@@ -364,6 +366,11 @@ public void close() {
}
}

@Override
public void report() {
ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue(SLTS_GC_QUEUE_SIZE, queueSize.get());
}

@RequiredArgsConstructor
@Data
class GarbageChunkInfo implements Delayed {
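The new SLTS_GC_QUEUE_SIZE gauge simply exposes the depth of the garbage collector's in-memory delete queue. A simplified sketch of the idea: the real collector uses a delayed queue of GarbageChunkInfo entries and publishes through Pravega's DynamicLogger, while this stand-in uses a plain queue, an AtomicLong, and a print statement, and every name besides the metric constant is invented.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;

public class GcQueueGaugeSketch {
    private final Queue<String> garbageChunks = new ConcurrentLinkedQueue<>();
    private final AtomicLong queueSize = new AtomicLong();   // the value the gauge publishes

    void addChunkToGarbage(String chunkName) {
        garbageChunks.add(chunkName);
        queueSize.incrementAndGet();
    }

    void deleteGarbage() {
        // Background drain task: delete queued chunks and keep the tracked size in sync.
        String chunkName;
        while ((chunkName = garbageChunks.poll()) != null) {
            queueSize.decrementAndGet();
            System.out.println("deleted " + chunkName);
        }
    }

    void report() {
        // Real code: ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue(SLTS_GC_QUEUE_SIZE, queueSize.get());
        System.out.println("gc.queue.size=" + queueSize.get());
    }

    public static void main(String[] args) {
        GcQueueGaugeSketch gc = new GcQueueGaugeSketch();
        gc.addChunkToGarbage("chunk-1");
        gc.addChunkToGarbage("chunk-2");
        gc.report();        // prints 2
        gc.deleteGarbage();
        gc.report();        // prints 0
    }
}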
ReadIndexCache.java
@@ -25,11 +25,15 @@
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;

import static io.pravega.shared.MetricsNames.SLTS_READ_INDEX_CHUNK_INDEX_SIZE;
import static io.pravega.shared.MetricsNames.SLTS_READ_INDEX_SEGMENT_INDEX_SIZE;
import static io.pravega.shared.MetricsNames.SLTS_READ_INDEX_SEGMENT_MISS_RATE;

/**
* An in-memory read-index cache that maps chunk start offsets to chunk names for recently used segments.
* The least recently accessed segments are evicted entirely, and the least recently used chunks are evicted as well.
*/
class ReadIndexCache {
class ReadIndexCache implements StatsReporter {
/**
* Keeps track of all per segment ReadIndex.
*/
@@ -227,6 +231,13 @@ public void cleanUp() {
indexEntryCache.cleanUp();
}

@Override
public void report() {
ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue(SLTS_READ_INDEX_SEGMENT_INDEX_SIZE, segmentsReadIndexCache.size());
ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue(SLTS_READ_INDEX_SEGMENT_MISS_RATE, segmentsReadIndexCache.stats().missRate());
ChunkStorageMetrics.DYNAMIC_LOGGER.reportGaugeValue(SLTS_READ_INDEX_CHUNK_INDEX_SIZE, indexEntryCache.size());
}

/**
* Per segment read index.
* The index contains mapping from start offset to entries containing chunk name.
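The three new read-index gauges come straight from the caches' own bookkeeping: the two index sizes plus the segment-cache miss rate. The stats() and missRate() calls suggest a Guava- or Caffeine-style cache; the sketch below assumes Guava's CacheBuilder, which this diff does not confirm, and the keys and values are placeholders.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

public class ReadIndexGaugeSketch {
    public static void main(String[] args) {
        // recordStats() must be enabled for missRate() to return meaningful values.
        Cache<String, String> segmentsReadIndexCache = CacheBuilder.newBuilder()
                .maximumSize(1000)
                .recordStats()
                .build();

        segmentsReadIndexCache.put("segment-1", "read-index-entry");
        segmentsReadIndexCache.getIfPresent("segment-1");   // hit
        segmentsReadIndexCache.getIfPresent("segment-2");   // miss

        // Roughly what report() publishes as gauges.
        System.out.println("segment index size = " + segmentsReadIndexCache.size());
        System.out.println("segment miss rate  = " + segmentsReadIndexCache.stats().missRate());
    }
}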
ReadOperation.java
@@ -32,8 +32,11 @@
import java.util.concurrent.atomic.AtomicLong;

import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_READ_BYTES;
import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_READ_LATENCY;
import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_READ_INDEX_NUM_SCANNED;
import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_READ_INDEX_SCAN_LATENCY;
import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_SYS_READ_INDEX_NUM_SCANNED;
import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_SYS_READ_INDEX_SCAN_LATENCY;
import static io.pravega.segmentstore.storage.chunklayer.ChunkStorageMetrics.SLTS_READ_LATENCY;

@Slf4j
class ReadOperation implements Callable<CompletableFuture<Integer>> {
@@ -248,7 +251,13 @@ private CompletableFuture<Void> findChunkForOffset(MetadataTransaction txn) {
chunkedSegmentStorage.getExecutor())
.thenAcceptAsync(v -> {
val elapsed = readIndexTimer.getElapsed();
SLTS_READ_INDEX_SCAN_LATENCY.reportSuccessEvent(elapsed);
if (segmentMetadata.isStorageSystemSegment()) {
SLTS_SYS_READ_INDEX_SCAN_LATENCY.reportSuccessEvent(elapsed);
SLTS_SYS_READ_INDEX_NUM_SCANNED.reportSuccessValue(cntScanned.get());
} else {
SLTS_READ_INDEX_SCAN_LATENCY.reportSuccessEvent(elapsed);
SLTS_READ_INDEX_NUM_SCANNED.reportSuccessValue(cntScanned.get());
}
log.debug("{} read - chunk lookup - op={}, segment={}, offset={}, scanned={}, latency={}.",
chunkedSegmentStorage.getLogPrefix(), System.identityHashCode(this),
handle.getSegmentName(), offset, cntScanned.get(), elapsed.toMillis());
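Splitting the scan metrics on isStorageSystemSegment() keeps internal storage-system segments from skewing the user-facing latency and scan-count distributions. The branch reduces to picking a different sink per segment class; the sketch below uses a stand-in recorder interface rather than Pravega's OpStatsLogger, and every name in it is illustrative.

import java.time.Duration;

public class SegmentMetricSplitSketch {
    interface LatencyRecorder {
        void reportSuccessEvent(Duration elapsed);
    }

    static final LatencyRecorder READ_INDEX_SCAN_LATENCY =
            elapsed -> System.out.println("user scan latency " + elapsed.toMillis() + " ms");
    static final LatencyRecorder SYS_READ_INDEX_SCAN_LATENCY =
            elapsed -> System.out.println("system scan latency " + elapsed.toMillis() + " ms");

    static void recordScan(boolean isStorageSystemSegment, Duration elapsed) {
        // Same shape as the branch above: system segments report to their own metric.
        if (isStorageSystemSegment) {
            SYS_READ_INDEX_SCAN_LATENCY.reportSuccessEvent(elapsed);
        } else {
            READ_INDEX_SCAN_LATENCY.reportSuccessEvent(elapsed);
        }
    }

    public static void main(String[] args) {
        recordScan(false, Duration.ofMillis(3));
        recordScan(true, Duration.ofMillis(1));
    }
}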
StatsReporter.java (new file)
@@ -0,0 +1,20 @@
/**
* Copyright (c) Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*/
package io.pravega.segmentstore.storage.chunklayer;

/**
* Contract for components that report stats.
*/
public interface StatsReporter {
/**
* Reports the Stats for the component.
*/
void report();
}
(The remaining changed files in this commit are not shown here.)