Skip to content

Commit

Permalink
Added transaction file adapter; added log4net, reimplementing any com…
Browse files Browse the repository at this point in the history
…mented out comments
  • Loading branch information
danielcrenna committed Aug 29, 2011
1 parent 14fdd64 commit b4c67c8
Show file tree
Hide file tree
Showing 26 changed files with 940 additions and 22 deletions.
3 changes: 3 additions & 0 deletions pvc.nuspec
Expand Up @@ -12,5 +12,8 @@
<licenseUrl>https://github.com/danielcrenna/pvc/blob/master/LICENSE</licenseUrl>
<iconUrl>http://apitize.com.s3.amazonaws.com/logo-pvc.png</iconUrl>
<tags>pipes distributed producer consumer</tags>
<dependencies>
<dependency id="log4net" version="1.2.10" />
</dependencies>
</metadata>
</package>
1 change: 1 addition & 0 deletions src/packages/repositories.config
Expand Up @@ -4,4 +4,5 @@
<repository path="..\pvc.Tests\packages.config" />
<repository path="..\pvc.Core\packages.config" />
<repository path="..\pvc.Adapters.MSMQ\packages.config" />
<repository path="..\pvc.Adapters.TransactionFile\packages.config" />
</repositories>
5 changes: 2 additions & 3 deletions src/pvc.Adapters.MSMQ/Properties/AssemblyInfo.cs
@@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
Expand All @@ -8,9 +7,9 @@
[assembly: AssemblyTitle("pvc.Adapters.MSMQ")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("CDIC")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("pvc.Adapters.MSMQ")]
[assembly: AssemblyCopyright("Copyright © CDIC 2011")]
[assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

Expand Down
5 changes: 2 additions & 3 deletions src/pvc.Adapters.RabbitMQ/Properties/AssemblyInfo.cs
@@ -1,5 +1,4 @@
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
Expand All @@ -8,9 +7,9 @@
[assembly: AssemblyTitle("pvc.Adapters.RabbitMQ")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("Microsoft")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("pvc.Adapters.RabbitMQ")]
[assembly: AssemblyCopyright("Copyright © Microsoft 2010")]
[assembly: AssemblyCopyright("")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

Expand Down
59 changes: 59 additions & 0 deletions src/pvc.Adapters.TransactionFile/Checksums/FileChecksum.cs
@@ -0,0 +1,59 @@
using System;
using System.IO;

namespace pvc.Adapters.TransactionFile.Checksums
{
public class FileChecksum : IChecksum
{
private readonly string _filename;
private readonly StreamReader _reader;
private readonly StreamWriter _writer;
private readonly FileStream _fileStream;

public FileChecksum(string filename)
{
_filename = filename;
var newFile = false;
if (File.Exists(filename))
{
_fileStream = File.Open(filename, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite);
}
else
{
_fileStream = File.Open(filename, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite);
newFile = true;
}
_reader = new StreamReader(_fileStream);
_writer = new StreamWriter(_fileStream);
if (newFile || _fileStream.Length == 0)
{
Reset();
}
}

#region IChecksum Members

public void SetValue(long value)
{
_fileStream.Seek(0, SeekOrigin.Begin);
_writer.Write(value.ToString());
_writer.Flush();
}

public long GetValue()
{
_fileStream.Seek(0, SeekOrigin.Begin);
var val = _reader.ReadToEnd();
return Convert.ToInt64(val);
}

public void Reset()
{
SetValue(0);
}

public string Name { get { return _filename; } }

#endregion
}
}
26 changes: 26 additions & 0 deletions src/pvc.Adapters.TransactionFile/IBlockingQueue.cs
@@ -0,0 +1,26 @@
namespace pvc.Adapters.TransactionFile
{
/// <summary>
/// Interface to represent a blocking queue. The Dequeue method will block until an item is available
/// </summary>
/// <typeparam name="T">The type of objects to be placed in the queue</typeparam>
public interface IBlockingQueue<T>
{
string Name { get; }

bool RecordAvailable
{
get;
}

void Enqueue(T data);

T Dequeue();

int Count
{
get;
}
}

}
13 changes: 13 additions & 0 deletions src/pvc.Adapters.TransactionFile/IChecksum.cs
@@ -0,0 +1,13 @@
namespace pvc.Adapters.TransactionFile
{
public interface IChecksum
{
void SetValue(long value);

long GetValue();

void Reset();

string Name { get; }
}
}
@@ -0,0 +1,46 @@
using System;
using System.Collections.Generic;

namespace pvc.Adapters.TransactionFile.Processing
{
public class HashedCommandProcessorFactory<T> : ICommandProcessorFactory<T>
{
private readonly Dictionary<Type, IList<ICommandProcessor<T>>> _table;

public void AddProcessorToType(Type type, ICommandProcessor<T> processor)
{
if (type == null)
{
throw new ArgumentNullException("type");
}
if (processor == null)
{
throw new ArgumentNullException("processor");
}
IList<ICommandProcessor<T>> tmp;
_table.TryGetValue(type, out tmp);
if (tmp == null)
{
tmp = new List<ICommandProcessor<T>>();
_table.Add(type, tmp);
}
tmp.Add(processor);
}

public HashedCommandProcessorFactory()
{
_table = new Dictionary<Type, IList<ICommandProcessor<T>>>();
}

#region ICommandProcessorFactory<T> Members

public IList<ICommandProcessor<T>> GetProcessorsForCommand(T command)
{
IList<ICommandProcessor<T>> ret;
_table.TryGetValue(command.GetType(), out ret);
return ret;
}

#endregion
}
}
@@ -0,0 +1,7 @@
namespace pvc.Adapters.TransactionFile.Processing
{
public interface ICommandProcessor<in T>
{
void ProcessCommand(T command);
}
}
@@ -0,0 +1,9 @@
using System.Collections.Generic;

namespace pvc.Adapters.TransactionFile.Processing
{
public interface ICommandProcessorFactory<T>
{
IList<ICommandProcessor<T>> GetProcessorsForCommand(T command);
}
}
@@ -0,0 +1,7 @@
namespace pvc.Adapters.TransactionFile.Processing.Instrumentation
{
public interface IQueueProcessorInstrumentation
{
void IncrementMessage();
}
}
@@ -0,0 +1,80 @@
using System;
using System.Diagnostics;

namespace pvc.Adapters.TransactionFile.Processing.Instrumentation
{
public class WMIQueueProcessorInstrumentation : IQueueProcessorInstrumentation
{
PerformanceCounter _totalMessages;
PerformanceCounter _messagesPerSecond;

public void IncrementMessage()
{
_totalMessages.Increment();
_messagesPerSecond.Increment();
}

public int TotalMessages
{
get
{
return Convert.ToInt32(_totalMessages.NextValue());
}
}

public decimal TotalMessagesPerSecond
{
get
{
return Convert.ToDecimal(_messagesPerSecond.NextValue());
}
}

private void InitCounters(string groupName)
{
_totalMessages = new PerformanceCounter();
_totalMessages.CategoryName = groupName;
_totalMessages.CounterName = "Messages Read";
_totalMessages.MachineName = ".";
_totalMessages.ReadOnly = false;
_totalMessages.RawValue = 0;

_messagesPerSecond = new PerformanceCounter();
_messagesPerSecond.CategoryName = groupName;
_messagesPerSecond.CounterName = "Messages Read / Sec";
_messagesPerSecond.MachineName = ".";
_messagesPerSecond.ReadOnly = false;
}

private static void CreateCounters(string groupName)
{
if (PerformanceCounterCategory.Exists(groupName))
{
PerformanceCounterCategory.Delete(groupName);
}

var counters = new CounterCreationDataCollection();

var totalOps = new CounterCreationData();
totalOps.CounterName = "Messages Read";
totalOps.CounterHelp = "Total number of messages read";
totalOps.CounterType = PerformanceCounterType.NumberOfItems32;
counters.Add(totalOps);

var opsPerSecond = new CounterCreationData();
opsPerSecond.CounterName = "Messages Read / Sec";
opsPerSecond.CounterHelp = "Messages read per second";
opsPerSecond.CounterType = PerformanceCounterType.RateOfCountsPerSecond32;
counters.Add(opsPerSecond);

PerformanceCounterCategory.Create(groupName, "PVC", counters);
}

public WMIQueueProcessorInstrumentation(string groupName)
{
CreateCounters(groupName);
InitCounters(groupName);
}
}

}

0 comments on commit b4c67c8

Please sign in to comment.