Skip to content

Commit

Permalink
Well shit, I guess we have a working write ahead log
Browse files Browse the repository at this point in the history
  • Loading branch information
joshbooks committed Apr 1, 2019
1 parent 170e250 commit 5f314eb
Showing 1 changed file with 168 additions and 30 deletions.
198 changes: 168 additions & 30 deletions test/src/org/josh/JoshDB/FileTrie/ConsistencyTest.java
Expand Up @@ -12,10 +12,9 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class ConsistencyTest
{
Expand Down Expand Up @@ -84,7 +83,7 @@ static void writeThreadFunction()
@Test
public void testWriteConsistency() throws InterruptedException
{
int numThreads = 0x20;
int numThreads = 0x7f;
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++)
{
Expand All @@ -97,39 +96,21 @@ public void testWriteConsistency() throws InterruptedException
threads[i].join();
}

InputStream testReader;

try
{
testReader = Files.newInputStream(testLocus);
}
catch (IOException e)
{
e.printStackTrace();
return;
}

byte[] testBuffer = new byte[MergeFile.PIPE_BUF];
byte[] testBuffer;
boolean[] didFindBufferForThread = new boolean[numThreads];
Arrays.fill(didFindBufferForThread, false);

for (int i = 0; i < numThreads; i++)
{
try
{
int retVal = testReader.read(testBuffer, 0, testBuffer.length);
if (retVal < MergeFile.PIPE_BUF)
{
System.out.println("Failed to read a full buffer. WTF?");
}
}
catch (IOException e)
{
e.printStackTrace();
return;
}
testBuffer =
MergeFile
.mergeFileForPath(testLocus)
.nextPageRetNullOnError();

byte valueForArray = testBuffer[0];

System.out.println("value for array was " + valueForArray);

boolean allCorrect = true;

for (int j = 0; j < MergeFile.PIPE_BUF; j++)
Expand Down Expand Up @@ -164,6 +145,163 @@ public void testWriteConsistency() throws InterruptedException
assert !missedAtLeastOne;
}

// for use with testReadWriteObjectConsistency and associated methods
private AtomicReferenceArray<Boolean> readSuccesses;

private static final byte[] readWriteTestObject = new byte[MergeFile.PIPE_BUF * 7];
static
{
Arrays.fill(readWriteTestObject, (byte) 69);
}

private void writeObjectThreadFunction()
{
// might want to compute these before starting threads to increase contention
List<byte[]> delimitedPages =
MergeFile
.delimitedObject
(
readWriteTestObject,
MergeFile.getObjectCount(testLocus).getAndIncrement()
);

for (byte[] page : delimitedPages)
{
try
{
MergeFile.mergeFileForPath(testLocus).appendToFileHelper(page);
}
catch (IOException e)
{
e.printStackTrace();
assert false;
}
}
}

private void readObjectThreadFunction(int numThreads)
{
byte[] nextPage = null;

HashMap<Long, List<byte[]>> sequenceNumberToPageList =
new HashMap<>();

while (true)
{
nextPage =
MergeFile.mergeFileForPath(testLocus).nextPageRetNullOnError();

if (nextPage == null)
{
break;
}

long sequenceNumberForPage =
MergeFile.sequenceNumberOfPage(nextPage);
if (sequenceNumberForPage > numThreads || sequenceNumberForPage < 0)
{
System.out.println("Got an invalid sequence number " + sequenceNumberForPage);
}

List<byte[]> pageList = new ArrayList<>();
List<byte[]> temp =
sequenceNumberToPageList.putIfAbsent(sequenceNumberForPage, pageList);

if (temp != null)
{
pageList = temp;
}

pageList.add(nextPage);

}

for (int i = 0 ; i < numThreads; i++)
{
List<byte[]> pageList =
sequenceNumberToPageList.get((long) i);

byte[] object = MergeFile.undelimitedObject(pageList);

if (object.length != readWriteTestObject.length)
{
System.out.println
(
"we got " + object.length + " bytes back out of "
+ readWriteTestObject.length + " total"
);
}

if (Arrays.equals(object, readWriteTestObject))
{
// initialize value to true provided it doesn't exist yet

while (readSuccesses.get(i) == null)
{
readSuccesses.compareAndSet(i, null, true);
}
}
else
{
System.out.println("Encountered a mismatch for sequence number " + i);
// set read success value to false
readSuccesses.set(i, false);
}
}
}

void startAndJoinThreads(int numThreads, Runnable target)
{
Thread[] threads = new Thread[numThreads];

for (int i = 0; i < numThreads; i++)
{
threads[i] = new Thread(target);
}

for (Thread thread: threads)
{
thread.start();
}

for (Thread thread: threads)
{
try
{
thread.join();
}
catch (InterruptedException e)
{
e.printStackTrace();
assert false;
}
}
}

@Test
public void testReadWriteObjectConsistency()
{
int numThreads = 0x20;

readSuccesses = new AtomicReferenceArray<>(numThreads);

startAndJoinThreads(numThreads, this::writeObjectThreadFunction);

startAndJoinThreads(numThreads, () -> readObjectThreadFunction(numThreads));

boolean globalSuccess = true;
for (int i = 0; i < numThreads; i++)
{
if (!readSuccesses.get(i))
{
System.out.println("got a mismatch on " + i);
globalSuccess = false;
}
}

assert globalSuccess;
}

@Test
public void testSequenceNumberParsing()
{
Expand Down

0 comments on commit 5f314eb

Please sign in to comment.