diff --git a/pvc.nuspec b/pvc.nuspec index 4fe97fd..1683d39 100644 --- a/pvc.nuspec +++ b/pvc.nuspec @@ -12,5 +12,8 @@ https://github.com/danielcrenna/pvc/blob/master/LICENSE http://apitize.com.s3.amazonaws.com/logo-pvc.png pipes distributed producer consumer + + + \ No newline at end of file diff --git a/src/packages/repositories.config b/src/packages/repositories.config index ef7cbda..c4fcab1 100644 --- a/src/packages/repositories.config +++ b/src/packages/repositories.config @@ -4,4 +4,5 @@ + \ No newline at end of file diff --git a/src/pvc.Adapters.MSMQ/Properties/AssemblyInfo.cs b/src/pvc.Adapters.MSMQ/Properties/AssemblyInfo.cs index 9727a79..17a8d5e 100644 --- a/src/pvc.Adapters.MSMQ/Properties/AssemblyInfo.cs +++ b/src/pvc.Adapters.MSMQ/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following @@ -8,9 +7,9 @@ [assembly: AssemblyTitle("pvc.Adapters.MSMQ")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("CDIC")] +[assembly: AssemblyCompany("")] [assembly: AssemblyProduct("pvc.Adapters.MSMQ")] -[assembly: AssemblyCopyright("Copyright © CDIC 2011")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/src/pvc.Adapters.RabbitMQ/Properties/AssemblyInfo.cs b/src/pvc.Adapters.RabbitMQ/Properties/AssemblyInfo.cs index f9790ff..95fb587 100644 --- a/src/pvc.Adapters.RabbitMQ/Properties/AssemblyInfo.cs +++ b/src/pvc.Adapters.RabbitMQ/Properties/AssemblyInfo.cs @@ -1,5 +1,4 @@ using System.Reflection; -using System.Runtime.CompilerServices; using System.Runtime.InteropServices; // General Information about an assembly is controlled through the following @@ -8,9 +7,9 @@ [assembly: AssemblyTitle("pvc.Adapters.RabbitMQ")] [assembly: AssemblyDescription("")] [assembly: AssemblyConfiguration("")] -[assembly: AssemblyCompany("Microsoft")] +[assembly: AssemblyCompany("")] [assembly: AssemblyProduct("pvc.Adapters.RabbitMQ")] -[assembly: AssemblyCopyright("Copyright © Microsoft 2010")] +[assembly: AssemblyCopyright("")] [assembly: AssemblyTrademark("")] [assembly: AssemblyCulture("")] diff --git a/src/pvc.Adapters.TransactionFile/Checksums/FileChecksum.cs b/src/pvc.Adapters.TransactionFile/Checksums/FileChecksum.cs new file mode 100644 index 0000000..900f03c --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Checksums/FileChecksum.cs @@ -0,0 +1,59 @@ +using System; +using System.IO; + +namespace pvc.Adapters.TransactionFile.Checksums +{ + public class FileChecksum : IChecksum + { + private readonly string _filename; + private readonly StreamReader _reader; + private readonly StreamWriter _writer; + private readonly FileStream _fileStream; + + public FileChecksum(string filename) + { + _filename = filename; + var newFile = false; + if (File.Exists(filename)) + { + _fileStream = File.Open(filename, FileMode.Open, FileAccess.ReadWrite, FileShare.ReadWrite); + } + else + { + _fileStream = File.Open(filename, FileMode.Create, FileAccess.ReadWrite, FileShare.ReadWrite); + newFile = true; + } + _reader = new StreamReader(_fileStream); + _writer = new StreamWriter(_fileStream); + if (newFile || _fileStream.Length == 0) + { + Reset(); + } + } + + #region IChecksum Members + + public void SetValue(long value) + { + _fileStream.Seek(0, SeekOrigin.Begin); + _writer.Write(value.ToString()); + _writer.Flush(); + } + + public long GetValue() + { + _fileStream.Seek(0, SeekOrigin.Begin); + var val = _reader.ReadToEnd(); + return Convert.ToInt64(val); + } + + public void Reset() + { + SetValue(0); + } + + public string Name { get { return _filename; } } + + #endregion + } +} diff --git a/src/pvc.Adapters.TransactionFile/IBlockingQueue.cs b/src/pvc.Adapters.TransactionFile/IBlockingQueue.cs new file mode 100644 index 0000000..efce0dc --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/IBlockingQueue.cs @@ -0,0 +1,26 @@ +namespace pvc.Adapters.TransactionFile +{ + /// + /// Interface to represent a blocking queue. The Dequeue method will block until an item is available + /// + /// The type of objects to be placed in the queue + public interface IBlockingQueue + { + string Name { get; } + + bool RecordAvailable + { + get; + } + + void Enqueue(T data); + + T Dequeue(); + + int Count + { + get; + } + } + +} diff --git a/src/pvc.Adapters.TransactionFile/IChecksum.cs b/src/pvc.Adapters.TransactionFile/IChecksum.cs new file mode 100644 index 0000000..a019137 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/IChecksum.cs @@ -0,0 +1,13 @@ +namespace pvc.Adapters.TransactionFile +{ + public interface IChecksum + { + void SetValue(long value); + + long GetValue(); + + void Reset(); + + string Name { get; } + } +} diff --git a/src/pvc.Adapters.TransactionFile/Processing/HashedProcessorFactory.cs b/src/pvc.Adapters.TransactionFile/Processing/HashedProcessorFactory.cs new file mode 100644 index 0000000..bae1d16 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Processing/HashedProcessorFactory.cs @@ -0,0 +1,46 @@ +using System; +using System.Collections.Generic; + +namespace pvc.Adapters.TransactionFile.Processing +{ + public class HashedCommandProcessorFactory : ICommandProcessorFactory + { + private readonly Dictionary>> _table; + + public void AddProcessorToType(Type type, ICommandProcessor processor) + { + if (type == null) + { + throw new ArgumentNullException("type"); + } + if (processor == null) + { + throw new ArgumentNullException("processor"); + } + IList> tmp; + _table.TryGetValue(type, out tmp); + if (tmp == null) + { + tmp = new List>(); + _table.Add(type, tmp); + } + tmp.Add(processor); + } + + public HashedCommandProcessorFactory() + { + _table = new Dictionary>>(); + } + + #region ICommandProcessorFactory Members + + public IList> GetProcessorsForCommand(T command) + { + IList> ret; + _table.TryGetValue(command.GetType(), out ret); + return ret; + } + + #endregion + } +} diff --git a/src/pvc.Adapters.TransactionFile/Processing/ICommandProcessor.cs b/src/pvc.Adapters.TransactionFile/Processing/ICommandProcessor.cs new file mode 100644 index 0000000..d2d806a --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Processing/ICommandProcessor.cs @@ -0,0 +1,7 @@ +namespace pvc.Adapters.TransactionFile.Processing +{ + public interface ICommandProcessor + { + void ProcessCommand(T command); + } +} diff --git a/src/pvc.Adapters.TransactionFile/Processing/ICommandProcessorFactory.cs b/src/pvc.Adapters.TransactionFile/Processing/ICommandProcessorFactory.cs new file mode 100644 index 0000000..1e0bd92 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Processing/ICommandProcessorFactory.cs @@ -0,0 +1,9 @@ +using System.Collections.Generic; + +namespace pvc.Adapters.TransactionFile.Processing +{ + public interface ICommandProcessorFactory + { + IList> GetProcessorsForCommand(T command); + } +} diff --git a/src/pvc.Adapters.TransactionFile/Processing/Instrumentation/IQueueProcessorInstrumentation.cs b/src/pvc.Adapters.TransactionFile/Processing/Instrumentation/IQueueProcessorInstrumentation.cs new file mode 100644 index 0000000..7d81ae4 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Processing/Instrumentation/IQueueProcessorInstrumentation.cs @@ -0,0 +1,7 @@ +namespace pvc.Adapters.TransactionFile.Processing.Instrumentation +{ + public interface IQueueProcessorInstrumentation + { + void IncrementMessage(); + } +} diff --git a/src/pvc.Adapters.TransactionFile/Processing/Instrumentation/WMIQueueProcessorInstrumentation.cs b/src/pvc.Adapters.TransactionFile/Processing/Instrumentation/WMIQueueProcessorInstrumentation.cs new file mode 100644 index 0000000..a63a651 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Processing/Instrumentation/WMIQueueProcessorInstrumentation.cs @@ -0,0 +1,80 @@ +using System; +using System.Diagnostics; + +namespace pvc.Adapters.TransactionFile.Processing.Instrumentation +{ + public class WMIQueueProcessorInstrumentation : IQueueProcessorInstrumentation + { + PerformanceCounter _totalMessages; + PerformanceCounter _messagesPerSecond; + + public void IncrementMessage() + { + _totalMessages.Increment(); + _messagesPerSecond.Increment(); + } + + public int TotalMessages + { + get + { + return Convert.ToInt32(_totalMessages.NextValue()); + } + } + + public decimal TotalMessagesPerSecond + { + get + { + return Convert.ToDecimal(_messagesPerSecond.NextValue()); + } + } + + private void InitCounters(string groupName) + { + _totalMessages = new PerformanceCounter(); + _totalMessages.CategoryName = groupName; + _totalMessages.CounterName = "Messages Read"; + _totalMessages.MachineName = "."; + _totalMessages.ReadOnly = false; + _totalMessages.RawValue = 0; + + _messagesPerSecond = new PerformanceCounter(); + _messagesPerSecond.CategoryName = groupName; + _messagesPerSecond.CounterName = "Messages Read / Sec"; + _messagesPerSecond.MachineName = "."; + _messagesPerSecond.ReadOnly = false; + } + + private static void CreateCounters(string groupName) + { + if (PerformanceCounterCategory.Exists(groupName)) + { + PerformanceCounterCategory.Delete(groupName); + } + + var counters = new CounterCreationDataCollection(); + + var totalOps = new CounterCreationData(); + totalOps.CounterName = "Messages Read"; + totalOps.CounterHelp = "Total number of messages read"; + totalOps.CounterType = PerformanceCounterType.NumberOfItems32; + counters.Add(totalOps); + + var opsPerSecond = new CounterCreationData(); + opsPerSecond.CounterName = "Messages Read / Sec"; + opsPerSecond.CounterHelp = "Messages read per second"; + opsPerSecond.CounterType = PerformanceCounterType.RateOfCountsPerSecond32; + counters.Add(opsPerSecond); + + PerformanceCounterCategory.Create(groupName, "PVC", counters); + } + + public WMIQueueProcessorInstrumentation(string groupName) + { + CreateCounters(groupName); + InitCounters(groupName); + } + } + +} diff --git a/src/pvc.Adapters.TransactionFile/Processing/QueueProcessor.cs b/src/pvc.Adapters.TransactionFile/Processing/QueueProcessor.cs new file mode 100644 index 0000000..7024aee --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Processing/QueueProcessor.cs @@ -0,0 +1,100 @@ +using System; +using System.Threading; +using log4net; +using pvc.Adapters.TransactionFile.Processing.Instrumentation; + +namespace pvc.Adapters.TransactionFile.Processing +{ + /// + /// Handles processing of a + /// + /// The type of commands to process + public class QueueProcessor + { + private readonly ICommandProcessorFactory _factory; + private readonly IBlockingQueue _queue; + private readonly Thread _thread; + private volatile bool _stop; + private readonly int _sleep; + readonly IQueueProcessorInstrumentation _instrumentation; + private readonly ILog _logger; + + /// + /// Run in another thread to process the queue. + /// + private void ProcessQueue() + { + while (!_stop) + { + try + { + var obj = _queue.Dequeue(); + if (_instrumentation != null) + { + _instrumentation.IncrementMessage(); + } + + var processors = _factory.GetProcessorsForCommand(obj); + if (processors != null) + { + foreach (var processor in processors) + { + try + { + processor.ProcessCommand(obj); + } + catch (Exception ex) + { + _logger.Error(string.Format("[{0}] Error occurred processing command of type {1} failed in processor {2}", _queue.Name, obj.GetType(), processor.GetType()), ex); + Console.WriteLine("[{0}] Error occurred processing command of type {2} failed in processor {3} {1}", _queue.Name, ex, obj.GetType(), processor.GetType()); + } + } + } + Thread.Sleep(_sleep); //in debug on single proc machine disk based thread starves dispatch thread + } + catch (Exception ex) + { + _logger.Error(string.Format("Error occurred in processing of queue {0}", _queue.Name), ex); + Console.WriteLine("Error occurred in processing of queue {0} {1}", _queue.Name, ex); + } + } + } + + public void Stop() + { + _stop = true; + if (!_thread.Join(10000)) + { + _thread.Abort(); + } + } + + public QueueProcessor(IBlockingQueue queue, ICommandProcessorFactory factory) : this(queue, factory, null, 0) { } + + public QueueProcessor(IBlockingQueue queue, ICommandProcessorFactory factory, IQueueProcessorInstrumentation instrumentation, int sleep) + { + if (queue == null) + { + throw new ArgumentNullException("queue"); + } + + if (factory == null) + { + throw new ArgumentNullException(""); + } + + _sleep = sleep; + _instrumentation = instrumentation; + _factory = factory; + _queue = queue; + + _logger = LogManager.GetLogger(GetType()); + _logger.Info(string.Format("Creating QueueProcessor connecting to queue {0}", _queue.Name)); + + var ts = new ThreadStart(ProcessQueue); + _thread = new Thread(ts); + _thread.IsBackground = true; + _thread.Start(); + } + } +} diff --git a/src/pvc.Adapters.TransactionFile/Properties/AssemblyInfo.cs b/src/pvc.Adapters.TransactionFile/Properties/AssemblyInfo.cs new file mode 100644 index 0000000..ac2d841 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Properties/AssemblyInfo.cs @@ -0,0 +1,35 @@ +using System.Reflection; +using System.Runtime.InteropServices; + +// General Information about an assembly is controlled through the following +// set of attributes. Change these attribute values to modify the information +// associated with an assembly. +[assembly: AssemblyTitle("pvc.Adapters.TransactionFile")] +[assembly: AssemblyDescription("")] +[assembly: AssemblyConfiguration("")] +[assembly: AssemblyCompany("")] +[assembly: AssemblyProduct("pvc.Adapters.TransactionFile")] +[assembly: AssemblyCopyright("")] +[assembly: AssemblyTrademark("")] +[assembly: AssemblyCulture("")] + +// Setting ComVisible to false makes the types in this assembly not visible +// to COM components. If you need to access a type in this assembly from +// COM, set the ComVisible attribute to true on that type. +[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("cfc010ba-39e3-41cd-81d9-0dc85ff25c9e")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] diff --git a/src/pvc.Adapters.TransactionFile/Queues/InMemoryBlockingQueue.cs b/src/pvc.Adapters.TransactionFile/Queues/InMemoryBlockingQueue.cs new file mode 100644 index 0000000..0dba508 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Queues/InMemoryBlockingQueue.cs @@ -0,0 +1,74 @@ +using System; +using System.Collections.Generic; +using System.Threading; + +namespace pvc.Adapters.TransactionFile.Queues +{ + public class InMemoryBlockingQueue : IBlockingQueue + { + private readonly Queue _queue; + private readonly AutoResetEvent _dataAddedEvent; + + public InMemoryBlockingQueue(int initialSize, string name) + { + if (name == null) + { + throw new ArgumentNullException("name"); + } + Name = name; + _queue = new Queue(initialSize); + _dataAddedEvent = new AutoResetEvent(false); + } + + public int Count + { + get + { + lock (_queue) + { + return _queue.Count; + } + } + } + + public InMemoryBlockingQueue(string name) : this(8, name) { } + + #region IBlockingQueue Members + + public string Name { get; private set; } + + public void Enqueue(T data) + { + lock (_queue) + { + _queue.Enqueue(data); + _dataAddedEvent.Set(); + } + } + + public bool RecordAvailable + { + get { return _queue.Count > 0; } + } + + public T Dequeue() + { + while (true) + { + if (_queue.Count == 0) + { + _dataAddedEvent.WaitOne(100000, true); + } + lock (_queue) + { + if (_queue.Count > 0) + { + return _queue.Dequeue(); + } + } + } + } + + #endregion + } +} diff --git a/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/ChecksummedTransactionFileReader.cs b/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/ChecksummedTransactionFileReader.cs new file mode 100644 index 0000000..b8d4160 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/ChecksummedTransactionFileReader.cs @@ -0,0 +1,72 @@ +using System; +using System.IO; +using System.Runtime.Serialization; +using System.Threading; +using pvc.Adapters.TransactionFile.Checksums; + +namespace pvc.Adapters.TransactionFile.Queues.TransactionFile +{ + /// + /// Reads transactions from the transaction file updating a persistent checksum to allow for recovery + /// from various problems such as application shutdown. + /// + /// The type of the objects which are to be read from the transaction file. + /// Usually this will be a shared base class or interface but for untyped access could just be object + class CheckSummedTransactionFileReader : TransactionFileReader + { + private readonly FileChecksum _checksum; + private readonly FileChecksum _writeChecksum; + + /// + /// Reads the next transaction from the transaction file and updates the checksum of the last known + /// position. + /// + /// + /// TODO: This method needs to be changed to support calling back and awaiting completion from a caller + /// before checksumming. Without this there are two possible issues which can come up. If the checksum is done + /// immediately after the read, it is possible that the application shuts down while the transaction + /// is being processed in which case you may miss a transaction on a shutdown. In the case of doing it + /// beforehand you may do a transaction twice. + /// + /// + public override T Dequeue() + { + while (true) + { + if (_writeChecksum.GetValue() > _fileStream.Position) + { + var tmp = (T)_formatter.Deserialize(_fileStream); + _checksum.SetValue(_fileStream.Position); + return tmp; + } + Thread.Sleep(1); + } + } + + public CheckSummedTransactionFileReader(string filename, string checksumName, IFormatter formatter) : base(filename, formatter) + { + if (checksumName == null) + { + throw new ArgumentNullException("ChecksumName"); + } + var fi = new FileInfo(filename); + var checksumfile = string.Format("{0}\\{1}.chk", fi.DirectoryName, checksumName); + var writeCheksumFilename = string.Format("{0}\\{1}.chk", fi.DirectoryName, fi.Name); + + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Using checksum file: {0}", checksumfile)); + } + + _checksum = new FileChecksum(checksumfile); + _writeChecksum = new FileChecksum(writeCheksumFilename); + + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Setting initial position to: {0}", _checksum.GetValue())); + } + + _fileStream.Seek(_checksum.GetValue(), SeekOrigin.Begin); + } + } +} diff --git a/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/TransactionFileReader.cs b/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/TransactionFileReader.cs new file mode 100644 index 0000000..91caaed --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/TransactionFileReader.cs @@ -0,0 +1,60 @@ +using System; +using System.IO; +using System.Runtime.Serialization; +using System.Threading; +using log4net; + +namespace pvc.Adapters.TransactionFile.Queues.TransactionFile +{ + /// + /// Reads transactions from the transaction file + /// + /// The type of transactions to read, generally this will be a shared base class or + /// interface but could also be 'object' in order to allow untyped access + class TransactionFileReader + { + protected FileStream _fileStream; + protected IFormatter _formatter; + protected ILog _logger; + + /// + /// Reads the next item off of the stream, blocking if one is not available. + /// + /// + public virtual T Dequeue() + { + while (true) + { + if (_fileStream.Position != _fileStream.Length) + { + var o = _formatter.Deserialize(_fileStream); + return (T)o; + } + Thread.Sleep(1); + } + } + + public TransactionFileReader(string filename, IFormatter formatter) + { + if (filename == null) + { + throw new ArgumentNullException("filename"); + } + if (formatter == null) + { + throw new ArgumentNullException("formatter"); + } + + _logger = LogManager.GetLogger(GetType()); + _formatter = formatter; + var fi = new FileInfo(filename); + + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Opening Transaction File: {0}", filename)); + } + + _fileStream = fi.Open(FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite); + } + } +} diff --git a/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/TransactionFileWriter.cs b/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/TransactionFileWriter.cs new file mode 100644 index 0000000..c18fcb2 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Queues/TransactionFile/TransactionFileWriter.cs @@ -0,0 +1,100 @@ +using System; +using System.IO; +using System.Runtime.Serialization; +using System.Threading; +using log4net; +using pvc.Adapters.TransactionFile.Checksums; + +namespace pvc.Adapters.TransactionFile.Queues.TransactionFile +{ + /// + /// Handles writing to the transaction file + /// + /// The type of transactions to read, generally this will be a shared base class or + /// interface but could also be 'object' in order to allow untyped access + public class TransactionFileWriter + { + private readonly IChecksum _writeChecksum; + private readonly IFormatter _formatter; + private readonly FileStream _fileStream; + private readonly Mutex _fileProtectMutex; + private readonly ILog _logger; + + public TransactionFileWriter(string filename, IFormatter formatter): this(filename, true, formatter) + { + + } + + public TransactionFileWriter(string filename, bool useMutex, IFormatter formatter) + { + if (filename == null) + { + throw new ArgumentNullException("filename"); + } + if (formatter == null) + { + throw new ArgumentNullException("Formatter"); + } + + _logger = LogManager.GetLogger(GetType()); + var fi = new FileInfo(filename); + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Opening file {0}", filename)); + } + + _fileStream = File.Open(filename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.ReadWrite); + + var filePath = string.Format("{0}\\{1}.chk", fi.DirectoryName, fi.Name); + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Opening Checksum: {0}", filename)); + } + + _writeChecksum = new FileChecksum(filePath); + _formatter = formatter; + if (useMutex) + { + bool creatednew; + _fileProtectMutex = new Mutex(false, fi.Name, out creatednew); + if (creatednew) + { + if (_logger.IsDebugEnabled) + { + _logger.Debug(string.Format("Created new mutex of name {0}", fi.Name)); + } + } + } + else + { + _fileProtectMutex = null; + } + } + + public void Enqueue(T value) + { + if (_fileProtectMutex != null) + { + var obtained = _fileProtectMutex.WaitOne(5000, false); + if (!obtained) + { + throw new Exception("Unable to acquire mutex"); + } + } + try + { + _fileStream.Seek(_writeChecksum.GetValue(), SeekOrigin.Begin); + _formatter.Serialize(_fileStream, value); + _fileStream.Flush(); + _writeChecksum.SetValue(_fileStream.Position); + } + finally + { + if (_fileProtectMutex != null) + { + _fileProtectMutex.ReleaseMutex(); + } + } + } + } +} diff --git a/src/pvc.Adapters.TransactionFile/Queues/TransactionFileBlockingQueue.cs b/src/pvc.Adapters.TransactionFile/Queues/TransactionFileBlockingQueue.cs new file mode 100644 index 0000000..42943e3 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/Queues/TransactionFileBlockingQueue.cs @@ -0,0 +1,69 @@ +using System; +using System.Runtime.Serialization; +using System.Runtime.Serialization.Formatters.Binary; +using pvc.Adapters.TransactionFile.Queues.TransactionFile; + +namespace pvc.Adapters.TransactionFile.Queues +{ + /// + /// An implementation of that uses a transaction + /// file for persistence and allows inter-process communication (IPC) + /// + /// + /// - Many processes can read and write to this blocking queue. All processes will see all writes. + /// + public class TransactionFileBlockingQueue : IBlockingQueue + { + private readonly TransactionFileWriter _writer; + private readonly TransactionFileReader _reader; + private readonly string _name; + + public bool RecordAvailable + { + get { return true; } // TODO: make look at checksums + } + + public TransactionFileBlockingQueue(string transactionFilename, string checkSumName, IFormatter formatter) + { + if(transactionFilename == null) + { + throw new ArgumentNullException("TransactionFileName"); + } + + if(formatter == null) + { + formatter = new BinaryFormatter(); + } + + _name = transactionFilename; + _writer = new TransactionFileWriter(transactionFilename, formatter); + + _reader = checkSumName != null + ? new CheckSummedTransactionFileReader(transactionFilename, checkSumName, formatter) + : new TransactionFileReader(transactionFilename, formatter); + } + + public TransactionFileBlockingQueue(string transactionFilename) : this(transactionFilename, null, null) { } + + #region IBlockingQueue Members + + public string Name + { + get { return _name; } + } + + public void Enqueue(T data) + { + _writer.Enqueue(data); + } + + public T Dequeue() + { + return _reader.Dequeue(); + } + + public int Count { get; private set; } + + #endregion + } +} diff --git a/src/pvc.Adapters.TransactionFile/TransactionFileConsumer.cs b/src/pvc.Adapters.TransactionFile/TransactionFileConsumer.cs new file mode 100644 index 0000000..8e275d6 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/TransactionFileConsumer.cs @@ -0,0 +1,21 @@ +using System.Runtime.Serialization.Formatters.Binary; +using pvc.Adapters.TransactionFile.Queues.TransactionFile; +using pvc.Core; + +namespace pvc.Adapters.TransactionFile +{ + public class TransactionFileConsumer : Consumes where T : Message + { + private readonly TransactionFileWriter _writer; + + public TransactionFileConsumer(string filename) + { + _writer = new TransactionFileWriter(filename, true, new BinaryFormatter()); + } + + public void Handle(T message) + { + _writer.Enqueue(message); + } + } +} diff --git a/src/pvc.Adapters.TransactionFile/TransactionFileProducer.cs b/src/pvc.Adapters.TransactionFile/TransactionFileProducer.cs new file mode 100644 index 0000000..579fd08 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/TransactionFileProducer.cs @@ -0,0 +1,62 @@ +using System.Collections.Generic; +using System.Runtime.Serialization.Formatters.Binary; +using pvc.Adapters.TransactionFile.Processing; +using pvc.Adapters.TransactionFile.Processing.Instrumentation; +using pvc.Adapters.TransactionFile.Queues; +using pvc.Core; + +namespace pvc.Adapters.TransactionFile +{ + public class TransactionFileProducer : Produces where T:Message + { + readonly TransactionFileBlockingQueue _reader; + private QueueProcessor _processor; + private Consumes _consumer; + + public TransactionFileProducer(string filename, string checksumFilename) + { + _reader = new TransactionFileBlockingQueue(filename, checksumFilename, new BinaryFormatter()); + _processor = new QueueProcessor(_reader, new AlwaysSendToMeFactory(new SendToMeCommandProcessor(this)), new WMIQueueProcessorInstrumentation(checksumFilename), 0); + } + + internal class AlwaysSendToMeFactory : ICommandProcessorFactory + { + private readonly List> _toSend = new List>(); + + public AlwaysSendToMeFactory(ICommandProcessor toSend) + { + _toSend.Add(toSend); + } + + public IList> GetProcessorsForCommand(T command) + { + return _toSend; + } + } + + internal class SendToMeCommandProcessor : ICommandProcessor + { + private readonly TransactionFileProducer _parent; + + public SendToMeCommandProcessor(TransactionFileProducer parent) + { + _parent = parent; + } + + public void ProcessCommand(T command) + { + _parent.Send(command); + } + } + + private void Send(T message) + { + _consumer.TryConsume(message); + } + + public void AttachConsumer(Consumes consumer) + { + _consumer = consumer; + } + } +} diff --git a/src/pvc.Adapters.TransactionFile/packages.config b/src/pvc.Adapters.TransactionFile/packages.config new file mode 100644 index 0000000..84d55fe --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/packages.config @@ -0,0 +1,5 @@ + + + + + \ No newline at end of file diff --git a/src/pvc.Adapters.TransactionFile/pvc.Adapters.TransactionFile.csproj b/src/pvc.Adapters.TransactionFile/pvc.Adapters.TransactionFile.csproj new file mode 100644 index 0000000..8d53746 --- /dev/null +++ b/src/pvc.Adapters.TransactionFile/pvc.Adapters.TransactionFile.csproj @@ -0,0 +1,80 @@ + + + + Debug + AnyCPU + 8.0.30703 + 2.0 + {FB05542F-9C7D-4360-9A6E-2A8E1630666F} + Library + Properties + pvc.Adapters.TransactionFile + pvc.Adapters.TransactionFile + v3.5 + 512 + + + true + full + false + ..\..\bin\lib\net35\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + ..\..\bin\lib\net35\ + TRACE + prompt + 4 + + + + ..\packages\log4net.1.2.10\lib\2.0\log4net.dll + + + ..\packages\pvc.0.0.1\lib\net35\pvc.Core.dll + + + ..\packages\pvc.0.0.1\lib\net35\pvc.Projections.dll + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/pvc.Core/pvc.Core.csproj b/src/pvc.Core/pvc.Core.csproj index ebc9023..94e5a1b 100644 --- a/src/pvc.Core/pvc.Core.csproj +++ b/src/pvc.Core/pvc.Core.csproj @@ -70,7 +70,6 @@ -