Skip to content

Commit

Permalink
Issue 6683: LTS - Fix thread visibility issues (#6687)
Browse files Browse the repository at this point in the history
Fix possible thread visibility issue with SystemJournal serialization.

Signed-off-by: Sachin Joshi <sachin.joshi@emc.com>
  • Loading branch information
sachin-j-joshi committed May 10, 2022
1 parent 0205da8 commit e72018d
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 58 deletions.
Expand Up @@ -30,10 +30,8 @@
import io.pravega.segmentstore.storage.metadata.StorageMetadataWritesFencedOutException;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -51,8 +49,8 @@ class ConcatOperation implements Callable<CompletableFuture<Void>> {
private final long offset;
private final String sourceSegment;
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final List<String> chunksToDelete = Collections.synchronizedList(new ArrayList<>());
private final List<ChunkNameOffsetPair> newReadIndexEntries = Collections.synchronizedList(new ArrayList<>());
private final List<String> chunksToDelete = new Vector<>();
private final List<ChunkNameOffsetPair> newReadIndexEntries = new Vector<>();
private final Timer timer;

private volatile SegmentMetadata targetSegmentMetadata;
Expand Down
Expand Up @@ -23,11 +23,9 @@
import io.pravega.segmentstore.storage.metadata.SegmentMetadata;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -113,7 +111,7 @@ class DefragmentOperation implements Callable<CompletableFuture<Void>> {
private final List<String> chunksToDelete;
private final ChunkedSegmentStorage chunkedSegmentStorage;

private volatile List<ChunkInfo> chunksToConcat = Collections.synchronizedList(new ArrayList<>());
private volatile List<ChunkInfo> chunksToConcat = new Vector<>();
private final List<ChunkNameOffsetPair> newReadIndexEntries;
private volatile ChunkMetadata target;
private volatile String targetChunkName;
Expand Down Expand Up @@ -259,7 +257,7 @@ private CompletableFuture<Void> concatChunks() {
segmentMetadata.setLastChunkStartOffset(segmentMetadata.getLength() - target.getLength());
}

final List<CompletableFuture<Void>> futures = Collections.synchronizedList(new ArrayList<>());
final List<CompletableFuture<Void>> futures = new Vector<>();
// Update metadata for affected chunks.
for (int i = 1; i < concatArgs.length; i++) {
final int n = i;
Expand All @@ -280,7 +278,7 @@ private CompletableFuture<Void> concatChunks() {
}

private CompletableFuture<Void> gatherChunks() {
chunksToConcat = Collections.synchronizedList(new ArrayList<>());
chunksToConcat = new Vector<>();

return txn.get(targetChunkName)
.thenComposeAsync(storageMetadata -> {
Expand Down
Expand Up @@ -29,10 +29,8 @@
import io.pravega.shared.NameUtils;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -153,7 +151,7 @@ private void logEnd() {
}

private CompletableFuture<Void> readData(MetadataTransaction txn) {
val chunkReadFutures = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());
val chunkReadFutures = new Vector<CompletableFuture<Void>>();
return Futures.loop(
() -> bytesRemaining.get() > 0 && null != currentChunkName,
() -> {
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -186,7 +187,7 @@ public class SystemJournal {
/**
* List of chunks (journals & snapshots) to delete after snapshot.
*/
final private List<String> pendingGarbageChunks = Collections.synchronizedList(new ArrayList<>());
final private List<String> pendingGarbageChunks = new Vector<>();

/**
* Configuration {@link ChunkedSegmentStorageConfig} for the {@link ChunkedSegmentStorage}.
Expand Down Expand Up @@ -689,28 +690,29 @@ private CompletableFuture<Void> validateSystemSnapshotExistsInTxn(MetadataTransa
if (null == systemSnapshot) {
return CompletableFuture.completedFuture(null);
}

val iterator = systemSnapshot.getSegmentSnapshotRecords().iterator();
// For each segment in snapshot
return Futures.loop(
systemSnapshot.getSegmentSnapshotRecords(),
segmentSnapshot ->
txn.get(segmentSnapshot.segmentMetadata.getKey())
.thenComposeAsync(m -> validateChunksInSegmentSnapshot(txn, segmentSnapshot), executor)
.thenComposeAsync(vv -> validateSegment(txn, segmentSnapshot.segmentMetadata.getKey()), executor)
.thenApplyAsync(v -> true, executor),
executor);
() -> iterator.hasNext(),
() -> {
val segmentSnapshot = iterator.next();
return txn.get(segmentSnapshot.segmentMetadata.getKey())
.thenComposeAsync(m -> validateChunksInSegmentSnapshot(txn, segmentSnapshot), executor)
.thenComposeAsync(vv -> validateSegment(txn, segmentSnapshot.segmentMetadata.getKey()), executor);

}, executor);
}

private CompletableFuture<Void> validateChunksInSegmentSnapshot(MetadataTransaction txn, SegmentSnapshotRecord segmentSnapshot) {
val iterator = segmentSnapshot.getChunkMetadataCollection().iterator();
// For each chunk in the segment
return Futures.loop(
segmentSnapshot.getChunkMetadataCollection(),
m -> txn.get(m.getKey())
.thenApplyAsync(mm -> {
Preconditions.checkState(null != mm, "Chunk metadata must not be null.");
return true;
}, executor),
executor);
() -> iterator.hasNext(),
() -> {
val m = iterator.next();
return txn.get(m.getKey())
.thenAcceptAsync(mm -> Preconditions.checkState(null != mm, "Chunk metadata must not be null."), executor);
}, executor);
}

/**
Expand Down Expand Up @@ -831,7 +833,7 @@ private CompletableFuture<Void> applySystemLogOperations(MetadataTransaction txn

val epochToStartScanning = new AtomicLong();
val fileIndexToRecover = new AtomicInteger(1);
val journalsProcessed = Collections.synchronizedList(new ArrayList<String>());
val journalsProcessed = new Vector<String>();
// Starting with journal file after last snapshot,
if (null != systemSnapshotRecord) {
epochToStartScanning.set(systemSnapshotRecord.epoch);
Expand Down Expand Up @@ -909,11 +911,13 @@ private CompletableFuture<Void> processJournalContents(MetadataTransaction txn,
try {
log.debug("SystemJournal[{}] Processing journal {}.", containerId, systemLogName);
val batch = BATCH_SERIALIZER.deserialize(input);

val iterator = batch.getSystemJournalRecords().iterator();
return Futures.loop(
batch.getSystemJournalRecords(),
record -> applyRecord(txn, state, record)
.thenApply(r -> true),
() -> iterator.hasNext(),
() -> {
val record = iterator.next();
return applyRecord(txn, state, record);
},
executor);
} catch (EOFException e) {
log.debug("SystemJournal[{}] Done processing journal {}.", containerId, systemLogName);
Expand All @@ -940,7 +944,7 @@ private CompletableFuture<Void> applyRecord(MetadataTransaction txn,
}
state.visitedRecords.add(record);
state.recordsProcessedCount.incrementAndGet();
CompletableFuture<Void> retValue = null;
final CompletableFuture<Void> retValue;
// ChunkAddedRecord.
if (record instanceof ChunkAddedRecord) {
val chunkAddedRecord = (ChunkAddedRecord) record;
Expand All @@ -958,8 +962,7 @@ private CompletableFuture<Void> applyRecord(MetadataTransaction txn,
} else if (record instanceof SystemSnapshotRecord) {
val snapshotRecord = (SystemSnapshotRecord) record;
retValue = Futures.toVoid(applySystemSnapshotRecord(txn, state, snapshotRecord));
}
if (null == retValue) {
} else {
// Unknown record.
retValue = CompletableFuture.failedFuture(new IllegalStateException(String.format("Unknown record type encountered. record = %s", record)));
}
Expand Down Expand Up @@ -1034,11 +1037,11 @@ private CompletableFuture<Void> applyChunkAddition(MetadataTransaction txn, Map<
Preconditions.checkState(null != oldChunkName, "oldChunkName must not be null");
Preconditions.checkState(null != newChunkName && !newChunkName.isEmpty(), "newChunkName must not be null or empty");

return txn.get(segmentName)
return validateSegment(txn, segmentName)
.thenComposeAsync(v -> txn.get(segmentName), executor)
.thenComposeAsync(m -> {
val segmentMetadata = (SegmentMetadata) m;
segmentMetadata.checkInvariants();
validateSegment(txn, segmentName);
// set length.
segmentMetadata.setLength(offset);

Expand Down Expand Up @@ -1166,10 +1169,10 @@ private CompletableFuture<SystemSnapshotRecord> createSystemSnapshotRecord(Metad
val systemSnapshot = SystemSnapshotRecord.builder()
.epoch(epoch)
.fileIndex(currentFileIndex.get())
.segmentSnapshotRecords(new ArrayList<>())
.segmentSnapshotRecords(new Vector<>())
.build();

val futures = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());
val futures = new ArrayList<CompletableFuture<Void>>();
for (val systemSegment : systemSegments) {
// Find segment metadata.
val future = txn.get(systemSegment)
Expand All @@ -1179,7 +1182,7 @@ private CompletableFuture<SystemSnapshotRecord> createSystemSnapshotRecord(Metad

val segmentSnapshot = SegmentSnapshotRecord.builder()
.segmentMetadata(segmentMetadata)
.chunkMetadataCollection(new ArrayList<>())
.chunkMetadataCollection(new Vector<>())
.build();

// Enumerate all chunks.
Expand Down Expand Up @@ -1223,9 +1226,7 @@ private CompletableFuture<SystemSnapshotRecord> createSystemSnapshotRecord(Metad
}

// Add to the system snapshot.
synchronized (systemSnapshot) {
systemSnapshot.segmentSnapshotRecords.add(segmentSnapshot);
}
systemSnapshot.segmentSnapshotRecords.add(segmentSnapshot);
}, executor);
}, executor);
futures.add(future);
Expand Down Expand Up @@ -1452,7 +1453,7 @@ protected void declareVersions() {
}

private void read00(RevisionDataInput input, SystemJournalRecordBatchBuilder b) throws IOException {
b.systemJournalRecords(input.readCollection(ELEMENT_DESERIALIZER));
b.systemJournalRecords(input.readCollection(ELEMENT_DESERIALIZER, () -> new Vector<>()));
}

private void write00(SystemJournalRecordBatch object, RevisionDataOutput output) throws IOException {
Expand Down Expand Up @@ -1694,7 +1695,7 @@ private void write00(SegmentSnapshotRecord object, RevisionDataOutput output) th

private void read00(RevisionDataInput input, SegmentSnapshotRecord.SegmentSnapshotRecordBuilder b) throws IOException {
b.segmentMetadata((SegmentMetadata) SEGMENT_METADATA_SERIALIZER.deserialize(input.getBaseStream()));
b.chunkMetadataCollection(input.readCollection(ELEMENT_DESERIALIZER));
b.chunkMetadataCollection(input.readCollection(ELEMENT_DESERIALIZER, () -> new Vector<>()));
}
}
}
Expand Down Expand Up @@ -1769,7 +1770,7 @@ private void write00(SystemSnapshotRecord object, RevisionDataOutput output) thr
private void read00(RevisionDataInput input, SystemSnapshotRecord.SystemSnapshotRecordBuilder b) throws IOException {
b.epoch(input.readCompactLong());
b.fileIndex(input.readCompactInt());
b.segmentSnapshotRecords(input.readCollection(ELEMENT_DESERIALIZER));
b.segmentSnapshotRecords(input.readCollection(ELEMENT_DESERIALIZER, () -> new Vector<>()));
}
}
}
Expand Down
Expand Up @@ -31,8 +31,8 @@

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -48,7 +48,7 @@ class TruncateOperation implements Callable<CompletableFuture<Void>> {
private final SegmentHandle handle;
private final long offset;
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final List<String> chunksToDelete = Collections.synchronizedList(new ArrayList<>());
private final List<String> chunksToDelete = new Vector<>();
private final long traceId;
private final Timer timer;
private volatile String currentChunkName;
Expand Down
Expand Up @@ -36,8 +36,8 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -216,7 +216,7 @@ public CompletableFuture<Void> overwriteChunk(String chunkName, InputStream inpu
*/
public CompletableFuture<List<ExtendedChunkInfo>> getExtendedChunkInfoList(String streamSegmentName, boolean checkStorage) {
Preconditions.checkNotNull(streamSegmentName, "streamSegmentName");
val infoList = Collections.synchronizedList(new ArrayList<ExtendedChunkInfo>());
val infoList = new Vector<ExtendedChunkInfo>();
return chunkedSegmentStorage.executeSerialized(() -> chunkedSegmentStorage.tryWith(
chunkedSegmentStorage.getMetadataStore().beginTransaction(true, streamSegmentName),
txn -> txn.get(streamSegmentName)
Expand Down
Expand Up @@ -33,9 +33,8 @@
import lombok.val;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -63,8 +62,8 @@ class WriteOperation implements Callable<CompletableFuture<Void>> {
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final long traceId;
private final Timer timer;
private final List<SystemJournal.SystemJournalRecord> systemLogRecords = Collections.synchronizedList(new ArrayList<>());
private final List<ChunkNameOffsetPair> newReadIndexEntries = Collections.synchronizedList(new ArrayList<>());
private final List<SystemJournal.SystemJournalRecord> systemLogRecords = new Vector<>();
private final List<ChunkNameOffsetPair> newReadIndexEntries = new Vector<>();
private final AtomicInteger chunksAddedCount = new AtomicInteger();

private volatile boolean isCommitted = false;
Expand Down

0 comments on commit e72018d

Please sign in to comment.