Skip to content

Commit

Permalink
some more testing plus corruption protection on dictionary deserialization
Browse files Browse the repository at this point in the history
  • Loading branch information
sdether committed Jul 3, 2011
1 parent f9e4cdb commit 040b714
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Firkin.Test/App.config
Expand Up @@ -11,7 +11,7 @@
</layout>
</appender>
<root>
<level value="WARN" />
<level value="DEBUG" />
<appender-ref ref="ConsoleAppender" />
</root>
</log4net>
Expand Down
100 changes: 97 additions & 3 deletions Firkin.Test/TFirkinDictionary.cs
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using log4net;
using NUnit.Framework;

Expand Down Expand Up @@ -176,12 +177,13 @@ public class TFirkinDictionary {
}
}

[Test, Ignore]
[Test, Explicit]
public void Memory_consumption() {
var r = new Random(1234);
var keys = new Queue<string>();
AddItems(keys, 200);
var baseline = GC.GetTotalMemory(true);
string capture = "";
using(var d = new FirkinDictionary<string, string>(_path)) {
var n = 0;
var t = 0;
Expand All @@ -193,14 +195,17 @@ public class TFirkinDictionary {
var v = TestUtil.GetRandomString(r);
if(d.ContainsKey(key)) {
var x = d[key];
capture = ".." + x;
}
d[key] = v;
switch(r.Next(10)) {
case 1:
keys.Enqueue(key);
break;
case 4:
AddItems(keys, 10);
if(keys.Count < 200) {
AddItems(keys, 10);
}
break;
}
if(n >= 5000) {
Expand All @@ -223,8 +228,97 @@ public class TFirkinDictionary {
if(t >= 200000) {
break;
}
if(keys.Count < 50) {
AddItems(keys, 100);
}
}
_log.DebugFormat("total items {0} after {1} iterations with {2} left in queue", d.Count, t, keys.Count);
Console.WriteLine("total items {0} after {1} iterations with {2} left in queue", d.Count, t, keys.Count);
_log.Debug(capture.Substring(0, 10));
}
}

/// <summary>
/// Manual memory probe: the test thread writes random values into a FirkinDictionary while a
/// second thread, once at least 5000 writes have accumulated, removes a random subset of
/// entries, calls Merge() and prints GC-reported memory relative to the starting baseline.
/// Marked Explicit because it performs up to one million writes.
/// </summary>
[Test, Explicit]
public void Memory_consumption_parallel_writes_and_merges() {
    var r = new Random(1234);
    // System.Random is not thread-safe, so the merger thread gets its own instance
    // instead of sharing the writer's.
    var mergeRandom = new Random(4321);
    var keys = new Queue<string>();
    AddItems(keys, 200);
    var baseline = GC.GetTotalMemory(true);
    const double megabyte = 1024.0 * 1024.0;
    using(var d = new FirkinDictionary<string, string>(_path)) {
        var t = 0;
        var capture = "";
        // Cross-thread signals: int (not bool) so they can go through
        // Thread.VolatileRead/VolatileWrite and Interlocked.
        var done = 0;
        var n = 0;
        Exception mergerFault = null;
        var merger = new Thread(() => {
            try {
                var m = 0;
                while(Thread.VolatileRead(ref done) == 0) {
                    if(Thread.VolatileRead(ref n) < 5000) {
                        // Not enough writes yet: yield the CPU rather than spinning.
                        Thread.Sleep(1);
                        continue;
                    }
                    m++;
                    var before = GC.GetTotalMemory(true);
                    Console.WriteLine(
                        "merge {0}, before {1:0.00}MB)",
                        m,
                        (before - baseline) / megabyte
                    );
                    var expiredKeys = (from entry in d
                                       where entry.Value.Length != 0 && mergeRandom.Next(4) == 1
                                       select entry.Key).ToArray();
                    foreach(var key in expiredKeys) {
                        d.Remove(key);
                    }
                    var during = GC.GetTotalMemory(true);
                    Console.WriteLine(
                        "merge {0}, during {1:0.00}MB)",
                        m,
                        (during - baseline) / megabyte
                    );
                    d.Merge();
                    var after = GC.GetTotalMemory(true);
                    var c = d.Count;
                    // t is owned by the writer thread; the value printed here is only approximate.
                    Console.WriteLine(
                        "merge {0}, iteration {1}, items: {2}, after {3:0.00}MB, storage {4:0.00}bytes/item)",
                        m,
                        t,
                        c,
                        (after - baseline) / megabyte,
                        // Guard against an empty dictionary; floating-point division keeps the fraction.
                        c == 0 ? 0.0 : (after - baseline) / (double)c
                    );
                    Interlocked.Exchange(ref n, 0);
                }
            } catch(Exception e) {
                // An unhandled exception on a non-test thread would tear down the process;
                // hand it to the test thread instead.
                mergerFault = e;
            }
        }) { IsBackground = true };
        merger.Start();
        try {
            while(keys.Any()) {
                if(mergerFault != null) {
                    break;
                }
                Interlocked.Increment(ref n);
                t++;
                var key = keys.Dequeue();
                var v = TestUtil.GetRandomString(r);
                if(d.ContainsKey(key)) {
                    var x = d[key];
                    capture = ".." + x;
                }
                d[key] = v;
                switch(r.Next(10)) {
                case 1:
                    keys.Enqueue(key);
                    break;
                case 4:
                    if(keys.Count < 200) {
                        AddItems(keys, 10);
                    }
                    break;
                }
                if(t >= 1000000) {
                    break;
                }
                if(keys.Count < 50) {
                    AddItems(keys, 100);
                }
            }
        } finally {
            // Always stop and wait for the merger so it cannot touch d after it is disposed.
            Thread.VolatileWrite(ref done, 1);
            merger.Join();
        }
        if(mergerFault != null) {
            throw new InvalidOperationException("merger thread faulted: " + mergerFault.Message, mergerFault);
        }
        Console.WriteLine("total items {0} after {1} iterations with {2} left in queue", d.Count, t, keys.Count);
        // capture may be shorter than 10 characters (it starts out empty).
        _log.Debug(capture.Substring(0, Math.Min(10, capture.Length)));
    }
}

Expand Down
112 changes: 112 additions & 0 deletions Firkin.Test/TFirkinHash.cs
Expand Up @@ -19,6 +19,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using log4net;
using NUnit.Framework;
using Droog.Firkin.Util;
Expand Down Expand Up @@ -422,6 +423,117 @@ public class TFirkinHash {
}
}

/// <summary>
/// Runs ten worker threads that put, overwrite and delete keys in a FirkinHash while the test
/// thread calls Merge() after every 2000 worker iterations. Every change is mirrored into an
/// in-memory Dictionary, and after the workers finish the hash must contain exactly the
/// mirrored keys with byte-identical values.
/// </summary>
[Test]
public void Concurrent_read_write_delete_consistency_with_multiple_merges() {
    var r = new Random(1234);
    // System.Random is not thread-safe; every use of r from a worker goes through this gate.
    var randomGate = new object();
    var id = 0;
    var keys = new Queue<string>();
    AddKeys(keys, 200, ref id);
    var mergeCounter = 0;
    var merges = 0;
    _hash = new FirkinHash<string>(_path, 100 * 2048);
    var dictionary = new Dictionary<string, byte[]>();
    // Guarded by lock(keys), like the queue itself.
    var modified = new HashSet<string>();
    var workers = new List<Thread>();
    // Guarded by lock(faults): written by workers, read by the test thread.
    var faults = new List<Exception>();
    var iterations = 0;
    var maxIterations = 10000;

    // Rethrows the first worker failure on the test thread, wrapped so the worker's
    // original stack trace survives as the inner exception.
    Action throwIfFaulted = () => {
        Exception fault;
        lock(faults) {
            fault = faults.FirstOrDefault();
        }
        if(fault != null) {
            throw new InvalidOperationException("worker thread faulted: " + fault.Message, fault);
        }
    };

    for(var i = 0; i < 10; i++) {
        var workerId = i;
        var worker = new Thread(() => {
            try {
                _log.DebugFormat("worker {0} started", workerId);
                while(Thread.VolatileRead(ref iterations) < maxIterations) {
                    Interlocked.Increment(ref iterations);
                    Interlocked.Increment(ref mergeCounter);
                    string k;
                    lock(keys) {
                        if(keys.Count < 10) {
                            AddKeys(keys, 100, ref id);
                        }
                        k = keys.Dequeue();
                    }
                    var entry = _hash.Get(k);
                    byte[] v;
                    lock(randomGate) {
                        v = TestUtil.GetRandomBytes(r);
                    }
                    if(entry != null) {
                        lock(keys) {
                            // A key that already exists is modified at most once.
                            if(modified.Contains(k)) {
                                continue;
                            }
                            modified.Add(k);
                        }
                        // Result is unused; the read is kept so the existing entry is still read
                        // alongside the concurrent writes and merges.
                        entry.ReadBytes();
                        bool delete;
                        lock(randomGate) {
                            delete = r.Next(4) == 3;
                        }
                        if(delete) {
                            lock(dictionary) {
                                dictionary.Remove(k);
                            }
                            _hash.Delete(k);
                        } else {
                            lock(dictionary) {
                                dictionary[k] = v;
                            }
                            _hash.Put(k, v.ToStream(), v.Length);
                        }
                    } else {
                        lock(dictionary) {
                            dictionary[k] = v;
                        }
                        _hash.Put(k, v.ToStream(), v.Length);
                    }
                    lock(keys) {
                        if(!modified.Contains(k)) {
                            bool requeue;
                            // Lock order is always keys -> randomGate; randomGate is never held
                            // while acquiring keys, so this nesting cannot deadlock.
                            lock(randomGate) {
                                requeue = r.Next(3) == 1;
                            }
                            if(requeue) {
                                keys.Enqueue(k);
                            }
                        }
                    }
                    Thread.Sleep(10);
                }
                _log.DebugFormat("worker {0} finished", workerId);
            } catch(Exception e) {
                lock(faults) {
                    faults.Add(e);
                }
            }
        }) { IsBackground = true };
        worker.Start();
        workers.Add(worker);
    }
    var start = DateTime.UtcNow;
    while(Thread.VolatileRead(ref iterations) < maxIterations) {
        if(DateTime.UtcNow > start.AddSeconds(30)) {
            int modifiedCount;
            lock(keys) {
                modifiedCount = modified.Count;
            }
            throw new TimeoutException(string.Format("didn't finish, merges: {0}, items {1}, existing modified: {2}", merges, _hash.Count, modifiedCount));
        }
        throwIfFaulted();
        if(Thread.VolatileRead(ref mergeCounter) >= 2000) {
            merges++;
            _hash.Merge();
            // Atomic reset so it cannot tear against the workers' Interlocked.Increment.
            Interlocked.Exchange(ref mergeCounter, 0);
            _log.DebugFormat("merge {0} completed", merges);
        }
    }
    foreach(var worker in workers) {
        worker.Join();
    }
    // A worker may have failed during its final iteration, after the loop above exited.
    throwIfFaulted();
    var files = 0;
    foreach(var file in Directory.GetFiles(_path)) {
        // Debug, not DebugFormat: a file name is data, not a format string.
        _log.Debug(Path.GetFileName(file));
        if(Path.GetExtension(file) == ".data") {
            files++;
        }
    }
    _log.DebugFormat("merges: {0}, items {1}, existing modified: {2}, files: {3}", merges, _hash.Count, modified.Count, files);
    Assert.AreEqual(dictionary.Count, _hash.Count);
    foreach(var pair in dictionary) {
        Assert.AreEqual(0, pair.Value.Compare(_hash.Get(pair.Key).ReadBytes()));
    }
}

/// <summary>
/// Enqueues <paramref name="n"/> fresh keys named "key_" followed by the next id value,
/// advancing <paramref name="id"/> by one for each key added.
/// </summary>
/// <param name="keys">Queue that receives the new keys.</param>
/// <param name="n">Number of keys to add; zero or negative adds nothing.</param>
/// <param name="id">Running key counter, updated in place.</param>
private void AddKeys(Queue<string> keys, int n, ref int id) {
    var remaining = n;
    while(remaining > 0) {
        id += 1;
        var name = "key_" + id;
        keys.Enqueue(name);
        remaining--;
    }
}

[Test]
public void Can_truncate_hash() {
var r = new Random(1234);
Expand Down
27 changes: 27 additions & 0 deletions Firkin/CorruptDictionaryException.cs
@@ -0,0 +1,27 @@
/*
* Firkin
* Copyright (C) 2010 Arne F. Claassen
* http://www.claassen.net/geek/blog geekblog [at] claassen [dot] net
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;

namespace Droog.Firkin {

    /// <summary>
    /// Exception carrying a description of a corruption problem in a Firkin dictionary.
    /// </summary>
    public class CorruptDictionaryException : Exception {

        /// <summary>Creates the exception with the given error description.</summary>
        /// <param name="error">Text exposed through <see cref="Exception.Message"/>.</param>
        public CorruptDictionaryException(string error)
            : base(error) {
            // Intentionally empty. The body used to throw NotImplementedException, which made
            // every attempt to raise this exception surface as NotImplementedException instead.
        }

        /// <summary>Creates the exception with an error description and the underlying cause.</summary>
        /// <param name="error">Text exposed through <see cref="Exception.Message"/>.</param>
        /// <param name="innerException">The exception that led to this one.</param>
        public CorruptDictionaryException(string error, Exception innerException)
            : base(error, innerException) {
        }
    }
}
11 changes: 8 additions & 3 deletions Firkin/Data/LazyFirkinCollection.cs
Expand Up @@ -24,9 +24,9 @@ namespace Droog.Firkin.Data {
public class LazyFirkinCollection<TKey, TValue> : ICollection<TValue> {
private readonly ICollection<TKey> _keys;
private readonly Func<TKey, FirkinStream> _getStream;
private readonly Func<FirkinStream, TValue> _getValue;
private readonly TryDeserializeDelegate<TKey, TValue> _getValue;

public LazyFirkinCollection(ICollection<TKey> keys, Func<TKey, FirkinStream> getStream, Func<FirkinStream, TValue> getValue) {
public LazyFirkinCollection(ICollection<TKey> keys, Func<TKey, FirkinStream> getStream, TryDeserializeDelegate<TKey, TValue> getValue) {
_keys = keys;
_getStream = getStream;
_getValue = getValue;
Expand All @@ -38,7 +38,10 @@ public class LazyFirkinCollection<TKey, TValue> : ICollection<TValue> {
if(s == null) {
continue;
}
yield return _getValue(s);
TValue value;
if(_getValue(key, s, out value)) {
yield return value;
}
}
}

Expand Down Expand Up @@ -67,4 +70,6 @@ public class LazyFirkinCollection<TKey, TValue> : ICollection<TValue> {

public bool IsReadOnly { get { return true; } }
}

/// <summary>
/// Signature for an attempt to produce a <typeparamref name="TValue"/> for a key from its stream.
/// LazyFirkinCollection's enumerator yields <paramref name="value"/> only when the call
/// returns <c>true</c> and skips the key otherwise.
/// </summary>
/// <param name="key">Key whose value is being read.</param>
/// <param name="stream">Stream obtained for <paramref name="key"/>.</param>
/// <param name="value">Receives the resulting value; only consumed when the return value is <c>true</c>.</param>
/// <returns><c>true</c> if <paramref name="value"/> should be used, <c>false</c> to have the entry skipped.</returns>
public delegate bool TryDeserializeDelegate<TKey, TValue>(TKey key, FirkinStream stream, out TValue value);
}
1 change: 1 addition & 0 deletions Firkin/Firkin.csproj
Expand Up @@ -49,6 +49,7 @@
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="CorruptDictionaryException.cs" />
<Compile Include="Data\HintRecord.cs" />
<Compile Include="FirkinHashChange.cs" />
<Compile Include="FirkinHashChangeAction.cs" />
Expand Down

0 comments on commit 040b714

Please sign in to comment.