Permalink
Browse files

merging in mono fix

  • Loading branch information...
2 parents 11ec64b + d88f330 commit b8993d396d82990895280573f1b117931b805731 @sdether committed Jul 8, 2011
View
215 Firkin.Test/TFirkinDictionary.cs
@@ -21,6 +21,7 @@
using System.Linq;
using System.Threading;
using log4net;
+using log4net.Config;
using NUnit.Framework;
namespace Droog.Firkin.Test {
@@ -47,7 +48,9 @@ public class TFirkinDictionary {
if(_dictionary != null) {
_dictionary.Dispose();
}
- Directory.Delete(_path, true);
+ if(Directory.Exists(_path)) {
+ Directory.Delete(_path, true);
+ }
}
[Test]
@@ -151,12 +154,12 @@ public class TFirkinDictionary {
}
d[key] = v;
switch(r.Next(10)) {
- case 1:
- keys.Enqueue(key);
- break;
- case 4:
- AddItems(keys, 10);
- break;
+ case 1:
+ keys.Enqueue(key);
+ break;
+ case 4:
+ AddItems(keys, 10);
+ break;
}
if(n >= 3000) {
d.Merge();
@@ -178,6 +181,172 @@ public class TFirkinDictionary {
}
[Test, Explicit]
public void Continuous_read_write_delete_merge_with_parallel_merge() {

    // Stress test: the main thread continuously reads/overwrites keys while a
    // background thread periodically purges stale entries and merges the store.
    // Runs forever by design (marked Explicit); success == no crash/corruption.
    var r = new Random(1234);
    uint serial = 0;
    var keys = new Queue<string>();
    AddItems(keys, 1000);
    uint last = 0;
    var mergeCounter = 0;
    var inMerge = false;
    using(var d = new FirkinDictionary<string, uint>(_path)) {
        new Thread(() => {
            while(true) {

                // NOTE: use an interlocked read so this thread is guaranteed to
                // observe the writer's Interlocked.Increment updates — a plain
                // read of a captured local has no cross-thread visibility guarantee.
                if(Interlocked.CompareExchange(ref mergeCounter, 0, 0) > 100000) {
                    inMerge = true;
                    Interlocked.Exchange(ref mergeCounter, 0);
                    var s = serial;
                    Console.WriteLine("{0} iterations, merge time", s);

                    // drop every entry written more than 20k iterations ago
                    var limit = s - 20000;
                    var remove = (from entry in d where entry.Value < limit select entry.Key).ToArray();
                    Console.WriteLine("will remove {0} of {1}", remove.Length, d.Count);
                    foreach(var key in remove) {
                        d.Remove(key);
                    }
                    var preMerge = d.Count;
                    d.Merge();
                    var postMerge = d.Count;
                    inMerge = false;
                    Console.WriteLine("pre: {0} / post: {1}", preMerge, postMerge);
                    foreach(var file in Directory.GetFiles(_path)) {
                        Console.WriteLine(Path.GetFileName(file));
                    }
                }
                Thread.Sleep(1000);
            }
        }) { IsBackground = true }.Start();
        while(true) {
            Interlocked.Increment(ref mergeCounter);
            var nextKey = keys.Dequeue();
            if(d.ContainsKey(nextKey)) {
                last = d[nextKey];
            }
            d[nextKey] = ++serial;

            // 1-in-10 chance of re-queueing the same key, otherwise move on to a fresh one
            keys.Enqueue(r.Next(10) == 1 ? nextKey : Guid.NewGuid().ToString());
            if(inMerge) {
                // back off while the merge thread is working
                Thread.Sleep(10);
            }
        }
    }
}
+
[Test, Explicit]
public void Continuous_read_write_delete_merge_with_parallel_writes() {

    // Stress test: ten background worker threads hammer the dictionary while the
    // main thread periodically purges stale entries and merges. Runs forever by
    // design (marked Explicit); any worker exception is captured and rethrown
    // on the main thread so NUnit reports it.
    var r = new Random(1234);
    int serial = 0;
    var keys = new Queue<string>();
    AddItems(keys, 1000);
    uint last = 0;
    var mergeCounter = 0;
    var inMerge = false;
    var faults = new List<Exception>();
    using(var d = new FirkinDictionary<string, uint>(_path)) {
        for(var i = 0; i < 10; i++) {
            var workerId = i;
            var worker = new Thread(() => {
                try {
                    _log.DebugFormat("worker {0} started", workerId);
                    while(true) {
                        Interlocked.Increment(ref mergeCounter);
                        string nextKey;
                        lock(keys) {
                            nextKey = keys.Dequeue();
                        }
                        try {
                            if(d.ContainsKey(nextKey)) {
                                last = d[nextKey];
                            }
                        } catch(ObjectDisposedException) { } // dictionary may be torn down under us; ignore
                        var v = (uint)Interlocked.Increment(ref serial);
                        d[nextKey] = v;
                        lock(keys) {
                            keys.Enqueue(r.Next(10) == 1 ? nextKey : Guid.NewGuid().ToString());
                        }
                        if(inMerge) {
                            Thread.Sleep(1000);
                        }
                    }
                } catch(Exception e) {
                    Console.WriteLine("Worker {0} failed: {1}\r\n{2}", workerId, e.Message, e);

                    // List<T> is not thread-safe: serialize writes against the
                    // main thread's reads below.
                    lock(faults) {
                        faults.Add(e);
                    }
                }
            }) { IsBackground = true };
            worker.Start();
        }
        while(true) {

            // take the first captured worker fault (if any) under the lock,
            // then rethrow it outside the lock
            Exception fault = null;
            lock(faults) {
                if(faults.Any()) {
                    fault = faults.First();
                }
            }
            if(fault != null) {
                throw fault;
            }
            if(mergeCounter > 100000) {
                try {
                    inMerge = true;
                    Interlocked.Exchange(ref mergeCounter, 0);
                    var s = serial;
                    Console.WriteLine("{0} iterations, merge time", s);

                    // drop every entry written more than 20k iterations ago
                    var limit = s - 20000;
                    var remove = (from entry in d where entry.Value < limit select entry.Key).ToArray();
                    Console.WriteLine("will remove {0} of {1}", remove.Length, d.Count);
                    foreach(var key in remove) {
                        d.Remove(key);
                    }
                    var preMerge = d.Count;
                    d.Merge();
                    var postMerge = d.Count;
                    inMerge = false;
                    Console.WriteLine("pre: {0} / post: {1}", preMerge, postMerge);
                    foreach(var file in Directory.GetFiles(_path)) {
                        Console.WriteLine(Path.GetFileName(file));
                    }
                } catch(Exception e) {
                    Console.WriteLine("merger failed: {0}\r\n{1}", e.Message, e);

                    // 'throw;' preserves the original stack trace ('throw e;' resets it)
                    throw;
                }
            }
            Thread.Sleep(1000);
        }
    }
}
+
[Test, Explicit]
public void Continuous_read_write_delete_merge() {

    // Single-threaded endless stress loop: read and overwrite keys, and every
    // ~100k iterations purge stale entries then merge the store.
    // Marked Explicit because it never terminates on its own.
    var rand = new Random(1234);
    uint serial = 0;
    var keys = new Queue<string>();
    AddItems(keys, 1000);
    uint last = 0;
    var mergeCounter = 0;
    using(var d = new FirkinDictionary<string, uint>(_path)) {
        while(true) {
            Interlocked.Increment(ref mergeCounter);
            var currentKey = keys.Dequeue();
            if(d.ContainsKey(currentKey)) {
                last = d[currentKey];
            }
            d[currentKey] = ++serial;

            // 1-in-10 chance of revisiting the same key; otherwise queue a fresh one
            if(rand.Next(10) == 1) {
                keys.Enqueue(currentKey);
            } else {
                keys.Enqueue(Guid.NewGuid().ToString());
            }
            if(mergeCounter > 100000) {
                Interlocked.Exchange(ref mergeCounter, 0);
                var snapshot = serial;
                Console.WriteLine("{0} iterations, merge time", snapshot);

                // everything written more than 20k iterations ago is considered stale
                var limit = snapshot - 20000;
                var stale = d.Where(entry => entry.Value < limit).Select(entry => entry.Key).ToArray();
                Console.WriteLine("will remove {0} of {1}", stale.Length, d.Count);
                foreach(var staleKey in stale) {
                    d.Remove(staleKey);
                }
                var countBefore = d.Count;
                d.Merge();
                var countAfter = d.Count;
                Console.WriteLine("pre: {0} / post: {1}", countBefore, countAfter);
                foreach(var file in Directory.GetFiles(_path)) {
                    Console.WriteLine(Path.GetFileName(file));
                }
            }
        }
    }
}
+
+ [Test, Explicit]
public void Memory_consumption() {
var r = new Random(1234);
var keys = new Queue<string>();
@@ -199,14 +368,14 @@ public class TFirkinDictionary {
}
d[key] = v;
switch(r.Next(10)) {
- case 1:
- keys.Enqueue(key);
- break;
- case 4:
- if(keys.Count < 200) {
- AddItems(keys, 10);
- }
- break;
+ case 1:
+ keys.Enqueue(key);
+ break;
+ case 4:
+ if(keys.Count < 200) {
+ AddItems(keys, 10);
+ }
+ break;
}
if(n >= 5000) {
m++;
@@ -299,14 +468,14 @@ public class TFirkinDictionary {
}
d[key] = v;
switch(r.Next(10)) {
- case 1:
- keys.Enqueue(key);
- break;
- case 4:
- if(keys.Count < 200) {
- AddItems(keys, 10);
- }
- break;
+ case 1:
+ keys.Enqueue(key);
+ break;
+ case 4:
+ if(keys.Count < 200) {
+ AddItems(keys, 10);
+ }
+ break;
}
if(t >= 1000000) {
break;
View
23 Firkin.Test/TFirkinHash.cs
@@ -45,8 +45,12 @@ public class TFirkinHash {
[TearDown]
public void Teardown() {
- _hash.Dispose();
- Directory.Delete(_path, true);
+ if(_hash != null) {
+ _hash.Dispose();
+ }
+ if(Directory.Exists(_path)) {
+ Directory.Delete(_path, true);
+ }
}
[Test]
@@ -423,7 +427,7 @@ public class TFirkinHash {
}
}
- [Test]
+ [Test, Explicit]
public void Concurrent_read_write_delete_consistency_with_multiple_merges() {
var r = new Random(1234);
var id = 0;
@@ -444,7 +448,10 @@ public class TFirkinHash {
try {
_log.DebugFormat("worker {0} started", workerId);
while(iterations < maxIterations) {
- Interlocked.Increment(ref iterations);
+ var j = Interlocked.Increment(ref iterations);
+ if(j % 100 == 0) {
+ _log.DebugFormat("iteration {0}", j);
+ }
Interlocked.Increment(ref mergeCounter);
string k;
lock(keys) {
@@ -497,16 +504,16 @@ public class TFirkinHash {
}
var start = DateTime.UtcNow;
while(iterations < maxIterations) {
- if(DateTime.UtcNow > start.AddSeconds(30)) {
- throw new TimeoutException(string.Format("didn't finish, merges: {0}, items {1}, existing modified: {2}", merges, _hash.Count, modified.Count));
- }
if(faults.Any()) {
throw faults.First();
}
+ if(DateTime.UtcNow > start.AddMinutes(5)) {
+ throw new TimeoutException(string.Format("didn't finish, merges: {0}, items: {1}, queue: {2}, existing modified: {2}", merges, _hash.Count, keys.Count, modified.Count));
+ }
if(mergeCounter >= 2000) {
merges++;
- _hash.Merge();
mergeCounter = 0;
+ _hash.Merge();
_log.DebugFormat("merge {0} completed", merges);
}
}
View
6 Firkin/CorruptDictionaryException.cs
@@ -19,8 +19,6 @@
namespace Droog.Firkin {
public class CorruptDictionaryException : Exception {
- public CorruptDictionaryException(string error)
- : base(error) {
- }
+ public CorruptDictionaryException(string error) : base(error) { }
}
-}
+}
View
7 Firkin/CorruptKeyException.cs
@@ -0,0 +1,7 @@
using System;

namespace Droog.Firkin {

    /// <summary>
    /// Thrown when a key record read from a Firkin data or hint file reports a
    /// key size larger than the configured maximum, indicating on-disk corruption.
    /// </summary>
    public class CorruptKeyException : Exception {

        /// <summary>
        /// Create a new corruption exception.
        /// </summary>
        /// <param name="error">Description of the corrupt record (file and key position).</param>
        public CorruptKeyException(string error) : base(error) { }
    }
}
View
3 Firkin/Firkin.csproj
@@ -36,6 +36,7 @@
<HintPath>..\redist\log4net.dll</HintPath>
</Reference>
<Reference Include="System" />
+ <Reference Include="System.configuration" />
<Reference Include="System.Core">
<RequiredTargetFramework>3.5</RequiredTargetFramework>
</Reference>
@@ -50,10 +51,12 @@
</ItemGroup>
<ItemGroup>
<Compile Include="CorruptDictionaryException.cs" />
+ <Compile Include="CorruptKeyException.cs" />
<Compile Include="Data\HintRecord.cs" />
<Compile Include="FirkinHashChange.cs" />
<Compile Include="FirkinHashChangeAction.cs" />
<Compile Include="Data\LazyFirkinCollection.cs" />
+ <Compile Include="KeyTooLargeException.cs" />
<Compile Include="Serialization\BinaryFormatterGenerator.cs" />
<Compile Include="Serialization\ISerializerGenerator.cs" />
<Compile Include="Serialization\IStreamSerializer.cs" />
View
12 Firkin/FirkinDictionary.cs
@@ -28,8 +28,6 @@ namespace Droog.Firkin {
public class FirkinDictionary<TKey, TValue> : IDictionary<TKey, TValue>, IDisposable {
//--- Class Fields ---
- public static int MaxValueSize = 100 * 1024 * 1024;
- public static bool ThrowOnCorruptRecord = true;
protected static readonly ILog _log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
//--- Fields ---
@@ -128,14 +126,10 @@ public class FirkinDictionary<TKey, TValue> : IDictionary<TKey, TValue>, IDispos
}
private bool TryDeserialize(TKey key, FirkinStream stream, out TValue value) {
- if(stream.Length > MaxValueSize) {
+ if(stream.Length > _hash.MaxFileSize) {
var error = string.Format("Stream for key '{0}' was too large, length: {1}. Dictionary is likely corrupted!", key, stream.Length);
- if(ThrowOnCorruptRecord) {
- throw new CorruptDictionaryException(error);
- }
- _log.Error(error);
- value = default(TValue);
- return false;
+ _log.Warn(error);
+ throw new CorruptDictionaryException(error);
}
value = _valueSerializer.Deserialize(stream);
return true;
View
27 Firkin/FirkinHash.cs
@@ -18,6 +18,7 @@
using System;
using System.Collections;
using System.Collections.Generic;
+using System.Configuration;
using System.IO;
using System.Linq;
using System.Text;
@@ -31,6 +32,7 @@ public class FirkinHash<TKey> : IFirkinHash<TKey> {
//--- Constants ---
public const long DEFAULT_MAX_FILE_SIZE = 10 * 1024 * 1024;
+
private const string STORAGE_FILE_PREFIX = "store_";
private const string MERGE_FILE_PREFIX = "merge_";
private const string OLD_FILE_PREFIX = "old_";
@@ -44,8 +46,19 @@ private class MergePair {
}
//--- Class Fields ---
+ public static readonly uint MaxKeySize = 1024;
protected static readonly ILog _log = LogManager.GetLogger(System.Reflection.MethodBase.GetCurrentMethod().DeclaringType);
+ //--- Class Constructors ---
+ static FirkinHash() {
+ try {
+ var max = ConfigurationManager.AppSettings["firkin.maxkeysize"];
+ if(!string.IsNullOrEmpty(max)) {
+ MaxKeySize = uint.Parse(max);
+ }
+ } catch { }
+ }
+
//--- Fields ---
private readonly long _maxFileSize;
private readonly string _storeDirectory;
@@ -82,6 +95,7 @@ private class MergePair {
//--- Properties ---
public int Count { get { return _index.Count; } }
+ public long MaxFileSize { get { return _maxFileSize; } }
public IEnumerable<TKey> Keys { get { lock(_indexSyncRoot) { return _index.Keys.ToArray(); } } }
//--- Methods ---
@@ -99,8 +113,12 @@ private class MergePair {
}
var action = FirkinHashChangeAction.Add;
lock(_indexSyncRoot) {
+ var keyBytes = _serializer.Serialize(key);
+ if(keyBytes.LongLength > MaxKeySize) {
+ throw new KeyTooLargeException(keyBytes.LongLength, MaxKeySize);
+ }
var keyInfo = _head.Write(new KeyValuePair() {
- Key = _serializer.Serialize(key),
+ Key = keyBytes,
Value = stream,
ValueSize = length
});
@@ -140,11 +158,13 @@ private class MergePair {
lock(_mergeSyncRoot) {
IFirkinFile[] oldFiles;
IFirkinFile head;
+ int recordCount;
lock(_indexSyncRoot) {
head = _head;
oldFiles = _files.Values.Where(x => x != head).OrderBy(x => x.FileId).ToArray();
+ recordCount = Count;
}
- _log.DebugFormat("starting merge of {0} files (with head at id {1}) in '{2}' ", oldFiles.Length, head.FileId, _storeDirectory);
+ _log.DebugFormat("starting merge of {0} files, {1} records (with head at id {2}) in '{3}' ", oldFiles.Length, recordCount, head.FileId, _storeDirectory);
if(oldFiles.Length == 0) {
// not merging if there is only one archive file
@@ -242,7 +262,8 @@ private class MergePair {
}
_log.DebugFormat("added entries from file {0}: {1}", file.FileId, newIndex.Count);
}
-
+ _log.DebugFormat("total records in merged index: {0}", newIndex.Count);
+
// swap out index and file list
_index = newIndex;
_files = newFiles;
View
1 Firkin/IFirkinHash.cs
@@ -30,5 +30,6 @@ public interface IFirkinHash<TKey> : IEnumerable<KeyValuePair<TKey, FirkinStream
void Truncate();
int Count { get; }
IEnumerable<TKey> Keys { get; }
+ long MaxFileSize { get; }
}
}
View
14 Firkin/IO/FirkinFile.cs
@@ -99,6 +99,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
_stream.Write(BitConverter.GetBytes(data.ValueSize));
dataStream.Position = 0;
dataStream.CopyTo(_stream, dataStream.Length);
+ _stream.Flush();
valuePosition = (uint)_stream.Position - data.ValueSize;
}
return new KeyInfo() {
@@ -122,6 +123,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
_stream.Write(BitConverter.GetBytes(data.ValueSize));
_stream.Write(data.Key);
data.Value.CopyTo(_stream, data.ValueSize);
+ _stream.Flush();
return (uint)_stream.Position - data.ValueSize;
}
}
@@ -134,6 +136,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
lock(_streamSyncRoot) {
CheckObjectDisposed();
_stream.Position = 0;
+ var keyCounter = 0;
while(true) {
// TODO: combine head logic with GetKeys()
@@ -143,7 +146,12 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
// end of file
yield break;
}
+ keyCounter++;
var keySize = BitConverter.ToUInt32(header, KEY_SIZE_OFFSET);
+ if(keySize > FirkinHash<object>.MaxKeySize) {
+ var error = string.Format("GetRecords: key {0} in file '{1}' had key of size {2}", keyCounter, Filename, keySize);
+ throw new CorruptKeyException(error);
+ }
var valueSize = BitConverter.ToUInt32(header, VALUE_SIZE_OFFSET);
var key = _stream.ReadBytes(keySize);
var value = new MemoryStream();
@@ -195,6 +203,7 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
lock(_streamSyncRoot) {
CheckObjectDisposed();
_stream.Position = 0;
+ var keyCounter = 0;
while(true) {
var recordPosition = _stream.Position;
var header = _stream.ReadBytes(HEADER_SIZE);
@@ -203,7 +212,12 @@ public class FirkinFile : IFirkinArchiveFile, IFirkinActiveFile {
// end of file
yield break;
}
+ keyCounter++;
var keySize = BitConverter.ToUInt32(header, KEY_SIZE_OFFSET);
+ if(keySize > FirkinHash<object>.MaxKeySize) {
+ var error = string.Format("GetKeys: key {0} in file '{1}' had key of size {2}", keyCounter, Filename, keySize);
+ throw new CorruptKeyException(error);
+ }
var valueSize = BitConverter.ToUInt32(header, VALUE_SIZE_OFFSET);
var key = _stream.ReadBytes(keySize);
_stream.Seek(valueSize, SeekOrigin.Current);
View
6 Firkin/IO/FirkinHintFile.cs
@@ -60,15 +60,21 @@ public class FirkinHintFile : IFirkinHintFile {
public IEnumerator<HintRecord> GetEnumerator() {
lock(_stream) {
_stream.Position = 0;
+ var keyCounter = 0;
while(true) {
var header = _stream.ReadBytes(HEADER_SIZE);
if(header.Length == 0) {
// end of file
yield break;
}
+ keyCounter++;
var serial = BitConverter.ToUInt32(header, SERIAL_OFFSET);
var keySize = BitConverter.ToUInt32(header, KEY_SIZE_OFFSET);
+ if(keySize > FirkinHash<object>.MaxKeySize) {
+ var error = string.Format("Hint Enumerator: key {0} in file '{1}' had key of size {2}", keyCounter, _filename, keySize);
+ throw new CorruptKeyException(error);
+ }
var valueSize = BitConverter.ToUInt32(header, VALUE_SIZE_OFFSET);
var valuePosition = BitConverter.ToUInt32(header, VALUE_POSITION_OFFSET);
var key = _stream.ReadBytes(keySize);
View
8 Firkin/KeyTooLargeException.cs
@@ -0,0 +1,8 @@
using System;

namespace Droog.Firkin {

    /// <summary>
    /// Thrown when a serialized key exceeds the configured maximum key size.
    /// </summary>
    public class KeyTooLargeException : Exception {

        /// <summary>Actual size in bytes of the offending serialized key.</summary>
        public long ActualSize { get; private set; }

        /// <summary>Maximum allowed key size in bytes.</summary>
        public uint MaxKeySize { get; private set; }

        /// <summary>
        /// Create a new exception describing the oversized key.
        /// </summary>
        /// <param name="actual">Size in bytes of the serialized key.</param>
        /// <param name="maxKeySize">Configured maximum key size in bytes.</param>
        public KeyTooLargeException(long actual, uint maxKeySize)
            // original ctor silently discarded both values, leaving the exception
            // with a generic message and no diagnostic data
            : base(string.Format("Serialized key size {0} exceeds the maximum allowed key size {1}", actual, maxKeySize)) {
            ActualSize = actual;
            MaxKeySize = maxKeySize;
        }
    }
}

0 comments on commit b8993d3

Please sign in to comment.