Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 6683: LTS - Fix thread visibility issues #6687

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -30,10 +30,8 @@
import io.pravega.segmentstore.storage.metadata.StorageMetadataWritesFencedOutException;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -51,8 +49,8 @@ class ConcatOperation implements Callable<CompletableFuture<Void>> {
private final long offset;
private final String sourceSegment;
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final List<String> chunksToDelete = Collections.synchronizedList(new ArrayList<>());
private final List<ChunkNameOffsetPair> newReadIndexEntries = Collections.synchronizedList(new ArrayList<>());
private final List<String> chunksToDelete = new Vector<>();
private final List<ChunkNameOffsetPair> newReadIndexEntries = new Vector<>();
private final Timer timer;

private volatile SegmentMetadata targetSegmentMetadata;
Expand Down
Expand Up @@ -23,11 +23,9 @@
import io.pravega.segmentstore.storage.metadata.SegmentMetadata;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -113,7 +111,7 @@ class DefragmentOperation implements Callable<CompletableFuture<Void>> {
private final List<String> chunksToDelete;
private final ChunkedSegmentStorage chunkedSegmentStorage;

private volatile List<ChunkInfo> chunksToConcat = Collections.synchronizedList(new ArrayList<>());
private volatile List<ChunkInfo> chunksToConcat = new Vector<>();
private final List<ChunkNameOffsetPair> newReadIndexEntries;
private volatile ChunkMetadata target;
private volatile String targetChunkName;
Expand Down Expand Up @@ -259,7 +257,7 @@ private CompletableFuture<Void> concatChunks() {
segmentMetadata.setLastChunkStartOffset(segmentMetadata.getLength() - target.getLength());
}

final List<CompletableFuture<Void>> futures = Collections.synchronizedList(new ArrayList<>());
final List<CompletableFuture<Void>> futures = new Vector<>();
// Update metadata for affected chunks.
for (int i = 1; i < concatArgs.length; i++) {
final int n = i;
Expand All @@ -280,7 +278,7 @@ private CompletableFuture<Void> concatChunks() {
}

private CompletableFuture<Void> gatherChunks() {
chunksToConcat = Collections.synchronizedList(new ArrayList<>());
chunksToConcat = new Vector<>();

return txn.get(targetChunkName)
.thenComposeAsync(storageMetadata -> {
Expand Down
Expand Up @@ -29,10 +29,8 @@
import io.pravega.shared.NameUtils;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -153,7 +151,7 @@ private void logEnd() {
}

private CompletableFuture<Void> readData(MetadataTransaction txn) {
val chunkReadFutures = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());
val chunkReadFutures = new Vector<CompletableFuture<Void>>();
return Futures.loop(
() -> bytesRemaining.get() > 0 && null != currentChunkName,
() -> {
Expand Down
Expand Up @@ -41,6 +41,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -186,7 +187,7 @@ public class SystemJournal {
/**
* List of chunks (journals & snapshots) to delete after snapshot.
*/
final private List<String> pendingGarbageChunks = Collections.synchronizedList(new ArrayList<>());
final private List<String> pendingGarbageChunks = new Vector<>();

/**
* Configuration {@link ChunkedSegmentStorageConfig} for the {@link ChunkedSegmentStorage}.
Expand Down Expand Up @@ -689,28 +690,29 @@ private CompletableFuture<Void> validateSystemSnapshotExistsInTxn(MetadataTransa
if (null == systemSnapshot) {
return CompletableFuture.completedFuture(null);
}

val iterator = systemSnapshot.getSegmentSnapshotRecords().iterator();
// For each segment in snapshot
return Futures.loop(
systemSnapshot.getSegmentSnapshotRecords(),
segmentSnapshot ->
txn.get(segmentSnapshot.segmentMetadata.getKey())
.thenComposeAsync(m -> validateChunksInSegmentSnapshot(txn, segmentSnapshot), executor)
.thenComposeAsync(vv -> validateSegment(txn, segmentSnapshot.segmentMetadata.getKey()), executor)
.thenApplyAsync(v -> true, executor),
executor);
() -> iterator.hasNext(),
() -> {
val segmentSnapshot = iterator.next();
return txn.get(segmentSnapshot.segmentMetadata.getKey())
.thenComposeAsync(m -> validateChunksInSegmentSnapshot(txn, segmentSnapshot), executor)
.thenComposeAsync(vv -> validateSegment(txn, segmentSnapshot.segmentMetadata.getKey()), executor);

}, executor);
}

private CompletableFuture<Void> validateChunksInSegmentSnapshot(MetadataTransaction txn, SegmentSnapshotRecord segmentSnapshot) {
val iterator = segmentSnapshot.getChunkMetadataCollection().iterator();
// For each chunk in the segment
return Futures.loop(
segmentSnapshot.getChunkMetadataCollection(),
m -> txn.get(m.getKey())
.thenApplyAsync(mm -> {
Preconditions.checkState(null != mm, "Chunk metadata must not be null.");
return true;
}, executor),
executor);
() -> iterator.hasNext(),
() -> {
val m = iterator.next();
return txn.get(m.getKey())
.thenAcceptAsync(mm -> Preconditions.checkState(null != mm, "Chunk metadata must not be null."), executor);
}, executor);
}

/**
Expand Down Expand Up @@ -831,7 +833,7 @@ private CompletableFuture<Void> applySystemLogOperations(MetadataTransaction txn

val epochToStartScanning = new AtomicLong();
val fileIndexToRecover = new AtomicInteger(1);
val journalsProcessed = Collections.synchronizedList(new ArrayList<String>());
val journalsProcessed = new Vector<String>();
// Starting with journal file after last snapshot,
if (null != systemSnapshotRecord) {
epochToStartScanning.set(systemSnapshotRecord.epoch);
Expand Down Expand Up @@ -909,11 +911,13 @@ private CompletableFuture<Void> processJournalContents(MetadataTransaction txn,
try {
log.debug("SystemJournal[{}] Processing journal {}.", containerId, systemLogName);
val batch = BATCH_SERIALIZER.deserialize(input);

val iterator = batch.getSystemJournalRecords().iterator();
return Futures.loop(
batch.getSystemJournalRecords(),
record -> applyRecord(txn, state, record)
.thenApply(r -> true),
() -> iterator.hasNext(),
() -> {
val record = iterator.next();
return applyRecord(txn, state, record);
},
executor);
} catch (EOFException e) {
log.debug("SystemJournal[{}] Done processing journal {}.", containerId, systemLogName);
Expand All @@ -940,7 +944,7 @@ private CompletableFuture<Void> applyRecord(MetadataTransaction txn,
}
state.visitedRecords.add(record);
state.recordsProcessedCount.incrementAndGet();
CompletableFuture<Void> retValue = null;
final CompletableFuture<Void> retValue;
// ChunkAddedRecord.
if (record instanceof ChunkAddedRecord) {
val chunkAddedRecord = (ChunkAddedRecord) record;
Expand All @@ -958,8 +962,7 @@ private CompletableFuture<Void> applyRecord(MetadataTransaction txn,
} else if (record instanceof SystemSnapshotRecord) {
val snapshotRecord = (SystemSnapshotRecord) record;
retValue = Futures.toVoid(applySystemSnapshotRecord(txn, state, snapshotRecord));
}
if (null == retValue) {
} else {
// Unknown record.
retValue = CompletableFuture.failedFuture(new IllegalStateException(String.format("Unknown record type encountered. record = %s", record)));
}
Expand Down Expand Up @@ -1034,11 +1037,11 @@ private CompletableFuture<Void> applyChunkAddition(MetadataTransaction txn, Map<
Preconditions.checkState(null != oldChunkName, "oldChunkName must not be null");
Preconditions.checkState(null != newChunkName && !newChunkName.isEmpty(), "newChunkName must not be null or empty");

return txn.get(segmentName)
return validateSegment(txn, segmentName)
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
.thenComposeAsync(v -> txn.get(segmentName), executor)
.thenComposeAsync(m -> {
val segmentMetadata = (SegmentMetadata) m;
segmentMetadata.checkInvariants();
validateSegment(txn, segmentName);
// set length.
segmentMetadata.setLength(offset);

Expand Down Expand Up @@ -1166,10 +1169,10 @@ private CompletableFuture<SystemSnapshotRecord> createSystemSnapshotRecord(Metad
val systemSnapshot = SystemSnapshotRecord.builder()
.epoch(epoch)
.fileIndex(currentFileIndex.get())
.segmentSnapshotRecords(new ArrayList<>())
.segmentSnapshotRecords(new Vector<>())
.build();

val futures = Collections.synchronizedList(new ArrayList<CompletableFuture<Void>>());
val futures = new ArrayList<CompletableFuture<Void>>();
for (val systemSegment : systemSegments) {
// Find segment metadata.
val future = txn.get(systemSegment)
Expand All @@ -1179,7 +1182,7 @@ private CompletableFuture<SystemSnapshotRecord> createSystemSnapshotRecord(Metad

val segmentSnapshot = SegmentSnapshotRecord.builder()
.segmentMetadata(segmentMetadata)
.chunkMetadataCollection(new ArrayList<>())
.chunkMetadataCollection(new Vector<>())
.build();

// Enumerate all chunks.
Expand Down Expand Up @@ -1223,9 +1226,7 @@ private CompletableFuture<SystemSnapshotRecord> createSystemSnapshotRecord(Metad
}

// Add to the system snapshot.
synchronized (systemSnapshot) {
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
systemSnapshot.segmentSnapshotRecords.add(segmentSnapshot);
}
systemSnapshot.segmentSnapshotRecords.add(segmentSnapshot);
}, executor);
}, executor);
futures.add(future);
Expand Down Expand Up @@ -1452,7 +1453,7 @@ protected void declareVersions() {
}

private void read00(RevisionDataInput input, SystemJournalRecordBatchBuilder b) throws IOException {
b.systemJournalRecords(input.readCollection(ELEMENT_DESERIALIZER));
b.systemJournalRecords(input.readCollection(ELEMENT_DESERIALIZER, () -> new Vector<>()));
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
}

private void write00(SystemJournalRecordBatch object, RevisionDataOutput output) throws IOException {
Expand Down Expand Up @@ -1694,7 +1695,7 @@ private void write00(SegmentSnapshotRecord object, RevisionDataOutput output) th

private void read00(RevisionDataInput input, SegmentSnapshotRecord.SegmentSnapshotRecordBuilder b) throws IOException {
b.segmentMetadata((SegmentMetadata) SEGMENT_METADATA_SERIALIZER.deserialize(input.getBaseStream()));
b.chunkMetadataCollection(input.readCollection(ELEMENT_DESERIALIZER));
b.chunkMetadataCollection(input.readCollection(ELEMENT_DESERIALIZER, () -> new Vector<>()));
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -1769,7 +1770,7 @@ private void write00(SystemSnapshotRecord object, RevisionDataOutput output) thr
private void read00(RevisionDataInput input, SystemSnapshotRecord.SystemSnapshotRecordBuilder b) throws IOException {
b.epoch(input.readCompactLong());
b.fileIndex(input.readCompactInt());
b.segmentSnapshotRecords(input.readCollection(ELEMENT_DESERIALIZER));
b.segmentSnapshotRecords(input.readCollection(ELEMENT_DESERIALIZER, () -> new Vector<>()));
sachin-j-joshi marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
Expand Up @@ -31,8 +31,8 @@

import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -48,7 +48,7 @@ class TruncateOperation implements Callable<CompletableFuture<Void>> {
private final SegmentHandle handle;
private final long offset;
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final List<String> chunksToDelete = Collections.synchronizedList(new ArrayList<>());
private final List<String> chunksToDelete = new Vector<>();
private final long traceId;
private final Timer timer;
private volatile String currentChunkName;
Expand Down
Expand Up @@ -36,8 +36,8 @@
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -216,7 +216,7 @@ public CompletableFuture<Void> overwriteChunk(String chunkName, InputStream inpu
*/
public CompletableFuture<List<ExtendedChunkInfo>> getExtendedChunkInfoList(String streamSegmentName, boolean checkStorage) {
Preconditions.checkNotNull(streamSegmentName, "streamSegmentName");
val infoList = Collections.synchronizedList(new ArrayList<ExtendedChunkInfo>());
val infoList = new Vector<ExtendedChunkInfo>();
return chunkedSegmentStorage.executeSerialized(() -> chunkedSegmentStorage.tryWith(
chunkedSegmentStorage.getMetadataStore().beginTransaction(true, streamSegmentName),
txn -> txn.get(streamSegmentName)
Expand Down
Expand Up @@ -33,9 +33,8 @@
import lombok.val;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -63,8 +62,8 @@ class WriteOperation implements Callable<CompletableFuture<Void>> {
private final ChunkedSegmentStorage chunkedSegmentStorage;
private final long traceId;
private final Timer timer;
private final List<SystemJournal.SystemJournalRecord> systemLogRecords = Collections.synchronizedList(new ArrayList<>());
private final List<ChunkNameOffsetPair> newReadIndexEntries = Collections.synchronizedList(new ArrayList<>());
private final List<SystemJournal.SystemJournalRecord> systemLogRecords = new Vector<>();
private final List<ChunkNameOffsetPair> newReadIndexEntries = new Vector<>();
private final AtomicInteger chunksAddedCount = new AtomicInteger();

private volatile boolean isCommitted = false;
Expand Down