Skip to content
This repository has been archived by the owner on Oct 3, 2018. It is now read-only.

Use optimistic locking to reduce lock contention in metric store #6

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions src/main/java/com/pinterest/yuvi/bitstream/BitStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ public void write(int n, long v) {
}

/**
 * Returns a read-only snapshot of this stream's size statistics.
 * Keys: "dataLength" (current write index), "dataSize" (serialized byte size),
 * and "capacity" (allocated capacity), all widened to double.
 */
public Map<String, Double> getStats() {
  // Primitive-to-double casts instead of the deprecated new Double(...) boxing.
  Map<String, Double> stats = new HashMap<>();
  stats.put("dataLength", (double) index);
  stats.put("dataSize", (double) getSerializedByteSize());
  stats.put("capacity", (double) capacity);
  return Collections.unmodifiableMap(stats);
}

Expand All @@ -117,10 +117,9 @@ public BitStreamIterator read() {
/**
* Construct a new BitReader using the data in the given ByteBuffer.
* @param buffer a buffer containing the data
* @throws Exception if the parsing failed.
* @return a new BitReader
*/
public static BitStream deserialize(ByteBuffer buffer) throws Exception {
public static BitStream deserialize(ByteBuffer buffer) {
int validDataSize = buffer.getInt();
byte shift = buffer.get();
long[] data = new long[validDataSize];
Expand All @@ -134,9 +133,8 @@ public static BitStream deserialize(ByteBuffer buffer) throws Exception {
/**
* Write the data to a pre-allocated ByteBuffer.
* @param buffer must have capacity greater or equal to serializedSize
* @throws Exception if buffer is invalid
*/
public void serialize(ByteBuffer buffer) throws Exception {
public void serialize(ByteBuffer buffer) {
int validDataSize = getLastDataIndex();
buffer.putInt(validDataSize);
buffer.put(shift);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,8 @@ public void addPoint(Metric metric, long ts, double val) {
@Override
public Map<String, Object> getStats() {
  // Merge stats from both underlying stores, prefixing keys to avoid collisions.
  Map<String, Object> stats = new HashMap<>();
  metricStore.getStats().forEach((key, value) -> stats.put("metricStore_" + key, value));
  tagStore.getStats().forEach((key, value) -> stats.put("tagStore_" + key, value));
  return stats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class CachingVarBitTimeSeriesIterator implements TimeSeriesIterator {
private int prevNumberOfTrailingZeros;
private long prevValue;
private int idx;
private List<Point> ts;

/**
* Create an iterator to read a given delta time series store.
Expand All @@ -43,20 +42,20 @@ public CachingVarBitTimeSeriesIterator(int count, BitStreamIterator timestamps,
this.values = values;
}

private void readFirst() {
private void readFirst(List<Point> pointList) {
long t2 = timestamps.read(32);
previousDelta = timestamps.read(14);
prevTimestamp = t2 + previousDelta;

prevValue = values.read(64);
double v = Double.longBitsToDouble(prevValue);

ts.add(new Point(prevTimestamp, v));
pointList.add(new Point(prevTimestamp, v));

idx++;
}

private void readNext() {
private void readNext(List<Point> pointList) {
long deltaOfDelta;
if (timestamps.tryRead(1, 0)) {
deltaOfDelta = 0;
Expand Down Expand Up @@ -94,25 +93,24 @@ private void readNext() {
double val = Double.longBitsToDouble(v);
prevValue = v;

ts.add(new Point(timeStamp, val));
pointList.add(new Point(timeStamp, val));

idx++;
}

/**
 * Perform the decompression. May only be called once.
 * @return the decompressed time series.
 */
public List<Point> getPoints() {
  // Local accumulator instead of an instance field keeps the decode state
  // confined to this call.
  List<Point> pointList = new ArrayList<>(count);
  if (count == 0) {
    return pointList;
  }
  readFirst(pointList);
  while (idx < count) {
    readNext(pointList);
  }
  return pointList;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@
*/
public interface TimeSeriesIterator {

public List<Point> getPoints();
List<Point> getPoints();
}
57 changes: 12 additions & 45 deletions src/main/java/com/pinterest/yuvi/metricstore/VarBitMetricStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -25,8 +25,7 @@ public class VarBitMetricStore implements MetricStore {
// TODO: Tune the default metrics size
private static final int DEFAULT_METRIC_STORE_SIZE = 10_000;

private HashMap<Long, VarBitTimeSeries> series;
private final ReentrantReadWriteLock mu;
private Map<Long, VarBitTimeSeries> series;

public VarBitMetricStore() {
this(DEFAULT_METRIC_STORE_SIZE);
Expand All @@ -36,68 +35,36 @@ public VarBitMetricStore() {
* Create an empty metric store.
*/
/**
 * Create an empty metric store with the given initial capacity hint.
 * Backed by a ConcurrentHashMap so reads and writes need no external locking.
 * @param initialSize initial capacity for the backing series map
 */
public VarBitMetricStore(int initialSize) {
  series = new ConcurrentHashMap<>(initialSize);

  LOG.info("Created a var bit metric store with size {}.", initialSize);
}

@Override
public List<Point> getSeries(long uuid) {
mu.readLock().lock();
try {
VarBitTimeSeries s = series.get(uuid);
if (s == null) {
return Collections.emptyList();
}
return s.read().getPoints();
} finally {
mu.readLock().unlock();
VarBitTimeSeries s = series.get(uuid);
if (s == null) {
return Collections.emptyList();
}
return s.read().getPoints();
}

@Override
public void addPoint(long uuid, long ts, double val) {
  // computeIfAbsent atomically creates the series on first use, replacing the
  // former read-lock fast path plus write-lock double-checked retry.
  VarBitTimeSeries s = series.computeIfAbsent(uuid, k -> new VarBitTimeSeries());
  s.append(ts, val);
}

private ArrayList<Long> getUuids() throws Exception {
// Copy the keys so that we don't hold the readLock for too long.
mu.readLock().lock();
try {
return new ArrayList<Long>(series.keySet());
} finally {
mu.readLock().unlock();
}
private List<Long> getUuids() {
return new ArrayList<>(series.keySet());
}

@Override
public Map<String, Object> getStats() {
Map<String, Object> stats = new HashMap<>();
stats.put("MetricCount", new Double(series.size()));
stats.put("MetricCount", (double) series.size());
List<Map<String, Double>> tsStats =
series.values().stream().map(ts -> ts.getStats()).collect(Collectors.toList());
series.values().stream().map(VarBitTimeSeries::getStats).collect(Collectors.toList());

stats.put("TimeStampSizeDistribution",
tsStats.stream().map(ts -> ts.get("timestamps_dataLength"))
Expand Down
Loading