Skip to content

Commit

Permalink
gotta flush to guarantee a good read, even on the same stream, it seems
Browse files Browse the repository at this point in the history
  • Loading branch information
sdether committed Jul 8, 2011
1 parent 3db96c3 commit cdb014e
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 42 deletions.
2 changes: 1 addition & 1 deletion Firkin.Test/App.config
Expand Up @@ -11,7 +11,7 @@
</layout>
</appender>
<root>
<level value="WARN" />
<level value="DEBUG" />
<appender-ref ref="ConsoleAppender" />
</root>
</log4net>
Expand Down
59 changes: 32 additions & 27 deletions Firkin.Test/TFirkinDictionary.cs
Expand Up @@ -21,6 +21,7 @@
using System.Linq;
using System.Threading;
using log4net;
using log4net.Config;
using NUnit.Framework;

namespace Droog.Firkin.Test {
Expand Down Expand Up @@ -153,12 +154,12 @@ public class TFirkinDictionary {
}
d[key] = v;
switch(r.Next(10)) {
case 1:
keys.Enqueue(key);
break;
case 4:
AddItems(keys, 10);
break;
case 1:
keys.Enqueue(key);
break;
case 4:
AddItems(keys, 10);
break;
}
if(n >= 3000) {
d.Merge();
Expand Down Expand Up @@ -231,6 +232,7 @@ public class TFirkinDictionary {

[Test, Explicit]
public void Run_till_crash_with_parallel_writes() {
BasicConfigurator.Configure();
var r = new Random(1234);
int serial = 0;
var keys = new Queue<string>();
Expand All @@ -251,9 +253,11 @@ public class TFirkinDictionary {
lock(keys) {
nextKey = keys.Dequeue();
}
if(d.ContainsKey(nextKey)) {
last = d[nextKey];
}
try {
if(d.ContainsKey(nextKey)) {
last = d[nextKey];
}
} catch(ObjectDisposedException) { }
var v = (uint)Interlocked.Increment(ref serial);
d[nextKey] = v;
lock(keys) {
Expand All @@ -264,7 +268,7 @@ public class TFirkinDictionary {
}
}
} catch(Exception e) {
Console.WriteLine("Worker {0} failed: {1}\r\n{2}", workerId, e.Message, e.StackTrace);
Console.WriteLine("Worker {0} failed: {1}\r\n{2}", workerId, e.Message, e);
faults.Add(e);
}
}) { IsBackground = true };
Expand Down Expand Up @@ -295,7 +299,8 @@ public class TFirkinDictionary {
Console.WriteLine(Path.GetFileName(file));
}
} catch(Exception e) {
Console.WriteLine("merger failed: {0}\r\n{1}", e.Message, e.StackTrace);
Console.WriteLine("merger failed: {0}\r\n{1}", e.Message, e);
throw e;
}
}
Thread.Sleep(1000);
Expand Down Expand Up @@ -364,14 +369,14 @@ public class TFirkinDictionary {
}
d[key] = v;
switch(r.Next(10)) {
case 1:
keys.Enqueue(key);
break;
case 4:
if(keys.Count < 200) {
AddItems(keys, 10);
}
break;
case 1:
keys.Enqueue(key);
break;
case 4:
if(keys.Count < 200) {
AddItems(keys, 10);
}
break;
}
if(n >= 5000) {
m++;
Expand Down Expand Up @@ -464,14 +469,14 @@ public class TFirkinDictionary {
}
d[key] = v;
switch(r.Next(10)) {
case 1:
keys.Enqueue(key);
break;
case 4:
if(keys.Count < 200) {
AddItems(keys, 10);
}
break;
case 1:
keys.Enqueue(key);
break;
case 4:
if(keys.Count < 200) {
AddItems(keys, 10);
}
break;
}
if(t >= 1000000) {
break;
Expand Down
23 changes: 15 additions & 8 deletions Firkin.Test/TFirkinHash.cs
Expand Up @@ -45,8 +45,12 @@ public class TFirkinHash {

[TearDown]
public void Teardown() {
_hash.Dispose();
Directory.Delete(_path, true);
if(_hash != null) {
_hash.Dispose();
}
if(Directory.Exists(_path)) {
Directory.Delete(_path, true);
}
}

[Test]
Expand Down Expand Up @@ -423,7 +427,7 @@ public class TFirkinHash {
}
}

[Test]
[Test, Explicit]
public void Concurrent_read_write_delete_consistency_with_multiple_merges() {
var r = new Random(1234);
var id = 0;
Expand All @@ -444,7 +448,10 @@ public class TFirkinHash {
try {
_log.DebugFormat("worker {0} started", workerId);
while(iterations < maxIterations) {
Interlocked.Increment(ref iterations);
var j = Interlocked.Increment(ref iterations);
if(j % 100 == 0) {
_log.DebugFormat("iteration {0}", j);
}
Interlocked.Increment(ref mergeCounter);
string k;
lock(keys) {
Expand Down Expand Up @@ -497,16 +504,16 @@ public class TFirkinHash {
}
var start = DateTime.UtcNow;
while(iterations < maxIterations) {
if(DateTime.UtcNow > start.AddSeconds(30)) {
throw new TimeoutException(string.Format("didn't finish, merges: {0}, items {1}, existing modified: {2}", merges, _hash.Count, modified.Count));
}
if(faults.Any()) {
throw faults.First();
}
if(DateTime.UtcNow > start.AddMinutes(5)) {
throw new TimeoutException(string.Format("didn't finish, merges: {0}, items: {1}, queue: {2}, existing modified: {2}", merges, _hash.Count, keys.Count, modified.Count));
}
if(mergeCounter >= 2000) {
merges++;
_hash.Merge();
mergeCounter = 0;
_hash.Merge();
_log.DebugFormat("merge {0} completed", merges);
}
}
Expand Down
4 changes: 1 addition & 3 deletions Firkin/CorruptDictionaryException.cs
Expand Up @@ -19,8 +19,6 @@

namespace Droog.Firkin {
public class CorruptDictionaryException : Exception {
public CorruptDictionaryException(string error)
: base(error) {
}
public CorruptDictionaryException(string error) : base(error) { }
}
}
7 changes: 7 additions & 0 deletions Firkin/CorruptKeyException.cs
@@ -0,0 +1,7 @@
using System;

namespace Droog.Firkin {
public class CorruptKeyException : Exception {
public CorruptKeyException(string error) : base(error) { }
}
}
3 changes: 3 additions & 0 deletions Firkin/Firkin.csproj
Expand Up @@ -36,6 +36,7 @@
<HintPath>..\redist\log4net.dll</HintPath>
</Reference>
<Reference Include="System" />
<Reference Include="System.configuration" />
<Reference Include="System.Core">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
Expand All @@ -50,10 +51,12 @@
</ItemGroup>
<ItemGroup>
<Compile Include="CorruptDictionaryException.cs" />
<Compile Include="CorruptKeyException.cs" />
<Compile Include="Data\HintRecord.cs" />
<Compile Include="FirkinHashChange.cs" />
<Compile Include="FirkinHashChangeAction.cs" />
<Compile Include="Data\LazyFirkinCollection.cs" />
<Compile Include="KeyTooLargeException.cs" />
<Compile Include="Serialization\BinaryFormatterGenerator.cs" />
<Compile Include="Serialization\ISerializerGenerator.cs" />
<Compile Include="Serialization\IStreamSerializer.cs" />
Expand Down
26 changes: 23 additions & 3 deletions Firkin/FirkinHash.cs
Expand Up @@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Text;
Expand All @@ -31,6 +32,7 @@ public class FirkinHash<TKey> : IFirkinHash<TKey> {

//--- Constants ---
public const long DEFAULT_MAX_FILE_SIZE = 10 * 1024 * 1024;

private const string STORAGE_FILE_PREFIX = "store_";
private const string MERGE_FILE_PREFIX = "merge_";
private const string OLD_FILE_PREFIX = "old_";
Expand All @@ -44,8 +46,19 @@ private class MergePair {
}

//--- Class Fields ---
public static readonly uint MaxKeySize = 1024;
protected static readonly ILog _log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);

//--- Class Constructors ---
static FirkinHash() {
try {
var max = ConfigurationManager.AppSettings["firkin.maxkeysize"];
if(!string.IsNullOrEmpty(max)) {
MaxKeySize = uint.Parse(max);
}
} catch { }
}

//--- Fields ---
private readonly long _maxFileSize;
private readonly string _storeDirectory;
Expand Down Expand Up @@ -100,8 +113,12 @@ private class MergePair {
}
var action = FirkinHashChangeAction.Add;
lock(_indexSyncRoot) {
var keyBytes = _serializer.Serialize(key);
if(keyBytes.LongLength > MaxKeySize) {
throw new KeyTooLargeException(keyBytes.LongLength, MaxKeySize);
}
var keyInfo = _head.Write(new KeyValuePair() {
Key = _serializer.Serialize(key),
Key = _keyBytes,
Value = stream,
ValueSize = length
});
Expand Down Expand Up @@ -141,11 +158,13 @@ private class MergePair {
lock(_mergeSyncRoot) {
IFirkinFile[] oldFiles;
IFirkinFile head;
int recordCount;
lock(_indexSyncRoot) {
head = _head;
oldFiles = _files.Values.Where(x => x != head).OrderBy(x => x.FileId).ToArray();
recordCount = Count;
}
_log.DebugFormat("starting merge of {0} files (with head at id {1}) in '{2}' ", oldFiles.Length, head.FileId, _storeDirectory);
_log.DebugFormat("starting merge of {0} files, {1} records (with head at id {2}) in '{3}' ", oldFiles.Length, recordCount, head.FileId, _storeDirectory);
if(oldFiles.Length == 0) {

// not merging if there is only one archive file
Expand Down Expand Up @@ -243,7 +262,8 @@ private class MergePair {
}
_log.DebugFormat("added entries from file {0}: {1}", file.FileId, newIndex.Count);
}

_log.DebugFormat("total records in merged index: {0}", newIndex.Count);

// swap out index and file list
_index = newIndex;
_files = newFiles;
Expand Down
14 changes: 14 additions & 0 deletions Firkin/IO/FirkinFile.cs
Expand Up @@ -99,6 +99,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
_stream.Write(BitConverter.GetBytes(data.ValueSize));
dataStream.Position = 0;
dataStream.CopyTo(_stream, dataStream.Length);
_stream.Flush();
valuePosition = (uint)_stream.Position - data.ValueSize;
}
return new KeyInfo() {
Expand All @@ -122,6 +123,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
_stream.Write(BitConverter.GetBytes(data.ValueSize));
_stream.Write(data.Key);
data.Value.CopyTo(_stream, data.ValueSize);
_stream.Flush();
return (uint)_stream.Position - data.ValueSize;
}
}
Expand All @@ -134,6 +136,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
lock(_streamSyncRoot) {
CheckObjectDisposed();
_stream.Position = 0;
var keyCounter = 0;
while(true) {

// TODO: combine head logic with GetKeys()
Expand All @@ -143,7 +146,12 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
// end of file
yield break;
}
keyCounter++;
var keySize = BitConverter.ToUInt32(header, KEY_SIZE_OFFSET);
if(keySize > FirkinHash<object>.MaxKeySize) {
var error = string.Format("GetRecords: key {0} in file '{1}' had key of size {2}", keyCounter, Filename, keySize);
throw new CorruptKeyException(error);
}
var valueSize = BitConverter.ToUInt32(header, VALUE_SIZE_OFFSET);
var key = _stream.ReadBytes(keySize);
var value = new MemoryStream();
Expand Down Expand Up @@ -195,6 +203,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
lock(_streamSyncRoot) {
CheckObjectDisposed();
_stream.Position = 0;
var keyCounter = 0;
while(true) {
var recordPosition = _stream.Position;
var header = _stream.ReadBytes(HEADER_SIZE);
Expand All @@ -203,7 +212,12 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
// end of file
yield break;
}
keyCounter++;
var keySize = BitConverter.ToUInt32(header, KEY_SIZE_OFFSET);
if(keySize > FirkinHash<object>.MaxKeySize) {
var error = string.Format("GetKeys: key {0} in file '{1}' had key of size {2}", keyCounter, Filename, keySize);
throw new CorruptKeyException(error);
}
var valueSize = BitConverter.ToUInt32(header, VALUE_SIZE_OFFSET);
var key = _stream.ReadBytes(keySize);
_stream.Seek(valueSize, SeekOrigin.Current);
Expand Down
6 changes: 6 additions & 0 deletions Firkin/IO/FirkinHintFile.cs
Expand Up @@ -60,15 +60,21 @@ public class FirkinHintFile : IFirkinHintFile {
public IEnumerator<HintRecord> GetEnumerator() {
lock(_stream) {
_stream.Position = 0;
var keyCounter = 0;
while(true) {
var header = _stream.ReadBytes(HEADER_SIZE);
if(header.Length == 0) {

// end of file
yield break;
}
keyCounter++;
var serial = BitConverter.ToUInt32(header, SERIAL_OFFSET);
var keySize = BitConverter.ToUInt32(header, KEY_SIZE_OFFSET);
if(keySize > FirkinHash<object>.MaxKeySize) {
var error = string.Format("Hint Enumerator: key {0} in file '{1}' had key of size {2}", keyCounter, _filename, keySize);
throw new CorruptKeyException(error);
}
var valueSize = BitConverter.ToUInt32(header, VALUE_SIZE_OFFSET);
var valuePosition = BitConverter.ToUInt32(header, VALUE_POSITION_OFFSET);
var key = _stream.ReadBytes(keySize);
Expand Down
8 changes: 8 additions & 0 deletions Firkin/KeyTooLargeException.cs
@@ -0,0 +1,8 @@
using System;

namespace Droog.Firkin {
public class KeyTooLargeException : Exception {
public KeyTooLargeException(long actual, uint maxKeySize) {
}
}
}

0 comments on commit cdb014e

Please sign in to comment.