diff --git a/.gitignore b/.gitignore index 3df684f..2d757ac 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,8 @@ build_artifacts/* build_output/* +BenchmarkDotNet.Artifacts + .fake/**/* **/*.suo diff --git a/build.fsx b/build.fsx index d643212..6eee313 100644 --- a/build.fsx +++ b/build.fsx @@ -7,10 +7,9 @@ open Fake.SemVerHelper let buildArtifactPath = FullName "./build_artifacts" let packagesPath = FullName "./src/packages" -let keyFile = FullName "./GreenPipes.snk" -let assemblyVersion = "2.0.0.0" -let baseVersion = "2.0.0" +let assemblyVersion = "2.1.0.0" +let baseVersion = "2.1.0" let envVersion = (environVarOrDefault "APPVEYOR_BUILD_VERSION" (baseVersion + ".0")) let buildVersion = (envVersion.Substring(0, envVersion.LastIndexOf('.'))) diff --git a/src/GreenPipes.BenchmarkConsole/AgentBenchmarkConfig.cs b/src/GreenPipes.BenchmarkConsole/AgentBenchmarkConfig.cs new file mode 100644 index 0000000..af0405e --- /dev/null +++ b/src/GreenPipes.BenchmarkConsole/AgentBenchmarkConfig.cs @@ -0,0 +1,31 @@ +namespace GreenPipes.BenchmarkConsole +{ + using BenchmarkDotNet.Configs; + using BenchmarkDotNet.Diagnosers; + using BenchmarkDotNet.Engines; + using BenchmarkDotNet.Environments; + using BenchmarkDotNet.Jobs; + + + public class AgentBenchmarkConfig : + ManualConfig + { + public AgentBenchmarkConfig() + { + Add(MemoryDiagnoser.Default); + Add(new Job + { + Env = {Runtime = Runtime.Core}, + Run = + { + TargetCount = 2, + RunStrategy = RunStrategy.Throughput, + WarmupCount = 1, + LaunchCount = 1, + UnrollFactor = 1, + InvocationCount = 10_000 + } + }); + } + } +} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Analytics.cs b/src/GreenPipes.BenchmarkConsole/Analytics.cs deleted file mode 100644 index 093b3fb..0000000 --- a/src/GreenPipes.BenchmarkConsole/Analytics.cs +++ /dev/null @@ -1,64 +0,0 @@ -namespace GreenPipes.BenchmarkConsole -{ - using System; - using System.Collections.Generic; - using System.Linq; - - - public static class Analytics - { - public static double? Median( - this IEnumerable source, - Func selector) - where TValue : struct - { - return source.Select(selector).Median(); - } - - public static double? Percentile( - this IEnumerable source, - Func selector, int percentile = 95) - where TValue : struct - { - return source.Select(selector).Percentile(percentile); - } - - public static double? Median( - this IEnumerable source) - where T : struct - { - int count = source.Count(); - if (count == 0) - return null; - - source = source.OrderBy(n => n); - - int midpoint = count / 2; - if (count % 2 == 0) - { - return (Convert.ToDouble(source.ElementAt(midpoint - 1)) + Convert.ToDouble(source.ElementAt(midpoint))) - / 2.0; - } - return Convert.ToDouble(source.ElementAt(midpoint)); - } - - public static double? Percentile( - this IEnumerable source, int percentile) - where T : struct - { - int count = source.Count(); - if (count == 0) - return null; - - source = source.OrderBy(n => n); - - int point = count * percentile / 100; - if (count % 2 == 0) - { - return (Convert.ToDouble(source.ElementAt(point - 1)) + Convert.ToDouble(source.ElementAt(point))) - / 2.0; - } - return Convert.ToDouble(source.ElementAt(point)); - } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/DotNetCoreBenchmarkConfig.cs b/src/GreenPipes.BenchmarkConsole/DotNetCoreBenchmarkConfig.cs new file mode 100644 index 0000000..9683054 --- /dev/null +++ b/src/GreenPipes.BenchmarkConsole/DotNetCoreBenchmarkConfig.cs @@ -0,0 +1,31 @@ +namespace GreenPipes.BenchmarkConsole +{ + using BenchmarkDotNet.Configs; + using BenchmarkDotNet.Diagnosers; + using BenchmarkDotNet.Engines; + using BenchmarkDotNet.Environments; + using BenchmarkDotNet.Jobs; + + + public class DotNetCoreBenchmarkConfig : + ManualConfig + { + public DotNetCoreBenchmarkConfig() + { + Add(MemoryDiagnoser.Default); + Add(new Job + { + Env = {Runtime = Runtime.Core}, + Run = + { + TargetCount = 2, + RunStrategy = RunStrategy.Throughput, + WarmupCount = 1, + LaunchCount = 1, + UnrollFactor = 1, + InvocationCount = 1_000_000 + } + }); + } + } +} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/GreenPipes.BenchmarkConsole.csproj b/src/GreenPipes.BenchmarkConsole/GreenPipes.BenchmarkConsole.csproj index a807264..7789172 100644 --- a/src/GreenPipes.BenchmarkConsole/GreenPipes.BenchmarkConsole.csproj +++ b/src/GreenPipes.BenchmarkConsole/GreenPipes.BenchmarkConsole.csproj @@ -1,16 +1,20 @@  - - netcoreapp2.0 - true - GreenPipes.BenchmarkConsole - GreenPipes.BenchmarkConsole - Exe - - - - + + netcoreapp2.0 + true + GreenPipes.BenchmarkConsole + GreenPipes.BenchmarkConsole + Exe + + + + + + + + - + \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Options.cs b/src/GreenPipes.BenchmarkConsole/Options.cs deleted file mode 100644 index 4fee70b..0000000 --- a/src/GreenPipes.BenchmarkConsole/Options.cs +++ /dev/null @@ -1,1199 +0,0 @@ - -#if LINQ -using System.Linq; -#endif - -#if TEST -using NDesk.Options; -#endif - -namespace GreenPipes.BenchmarkConsole -{ - using System; - using System.Collections; - using System.Collections.Generic; - using System.Collections.ObjectModel; - using System.ComponentModel; - using System.IO; - using System.Runtime.Serialization; - using System.Security.Permissions; - using System.Text; - using System.Text.RegularExpressions; - - - public class OptionValueCollection : IList, - IList - { - OptionContext c; - - List values = new List(); - - internal OptionValueCollection(OptionContext c) - { - this.c = c; - } - - #region IEnumerable - - IEnumerator IEnumerable.GetEnumerator() - { - return values.GetEnumerator(); - } - - #endregion - - #region IEnumerable - - public IEnumerator GetEnumerator() - { - return values.GetEnumerator(); - } - - #endregion - - public List ToList() - { - return new List(values); - } - - public string[] ToArray() - { - return values.ToArray(); - } - - public override string ToString() - { - return string.Join(", ", values.ToArray()); - } - - #region ICollection - - void ICollection.CopyTo(Array array, int index) - { - (values as ICollection).CopyTo(array, index); - } - - bool ICollection.IsSynchronized - { - get { return (values as ICollection).IsSynchronized; } - } - - object ICollection.SyncRoot - { - get { return (values as ICollection).SyncRoot; } - } - - #endregion - - #region ICollection - - public void Add(string item) - { - values.Add(item); - } - - public void Clear() - { - values.Clear(); - } - - public bool Contains(string item) - { - return values.Contains(item); - } - - public void CopyTo(string[] array, int arrayIndex) - { - values.CopyTo(array, arrayIndex); - } - - public bool Remove(string item) - { - return values.Remove(item); - } - - public int Count - { - get { return values.Count; } - } - - public bool IsReadOnly - { - get { return false; } - } - - #endregion - - #region IList - - int IList.Add(object value) - { - return (values as IList).Add(value); - } - - bool IList.Contains(object value) - { - return (values as IList).Contains(value); - } - - int IList.IndexOf(object value) - { - return (values as IList).IndexOf(value); - } - - void IList.Insert(int index, object value) - { - (values as IList).Insert(index, value); - } - - void IList.Remove(object value) - { - (values as IList).Remove(value); - } - - void IList.RemoveAt(int index) - { - (values as IList).RemoveAt(index); - } - - bool IList.IsFixedSize - { - get { return false; } - } - - object IList.this[int index] - { - get { return this[index]; } - set { (values as IList)[index] = value; } - } - - #endregion - - #region IList - - public int IndexOf(string item) - { - return values.IndexOf(item); - } - - public void Insert(int index, string item) - { - values.Insert(index, item); - } - - public void RemoveAt(int index) - { - values.RemoveAt(index); - } - - void AssertValid(int index) - { - if (c.Option == null) - throw new InvalidOperationException("OptionContext.Option is null."); - if (index >= c.Option.MaxValueCount) - throw new ArgumentOutOfRangeException("index"); - if (c.Option.OptionValueType == OptionValueType.Required && - index >= values.Count) - throw new OptionException(string.Format( - c.OptionSet.MessageLocalizer("Missing required value for option '{0}'."), c.OptionName), - c.OptionName); - } - - public string this[int index] - { - get - { - AssertValid(index); - return index >= values.Count ? null : values[index]; - } - set { values[index] = value; } - } - - #endregion - } - - - public class OptionContext - { - OptionValueCollection c; - int index; - string name; - Option option; - OptionSet set; - - public OptionContext(OptionSet set) - { - this.set = set; - c = new OptionValueCollection(this); - } - - public Option Option - { - get { return option; } - set { option = value; } - } - - public string OptionName - { - get { return name; } - set { name = value; } - } - - public int OptionIndex - { - get { return index; } - set { index = value; } - } - - public OptionSet OptionSet - { - get { return set; } - } - - public OptionValueCollection OptionValues - { - get { return c; } - } - } - - - public enum OptionValueType - { - None, - Optional, - Required - } - - - public abstract class Option - { - static readonly char[] NameTerminator = {'=', ':'}; - int count; - string[] names; - - string prototype, - description; - - string[] separators; - OptionValueType type; - - protected Option(string prototype, string description) - : this(prototype, description, 1) - { - } - - protected Option(string prototype, string description, int maxValueCount) - { - if (prototype == null) - throw new ArgumentNullException("prototype"); - if (prototype.Length == 0) - throw new ArgumentException("Cannot be the empty string.", "prototype"); - if (maxValueCount < 0) - throw new ArgumentOutOfRangeException("maxValueCount"); - - this.prototype = prototype; - names = prototype.Split('|'); - this.description = description; - count = maxValueCount; - type = ParsePrototype(); - - if (count == 0 && type != OptionValueType.None) - throw new ArgumentException( - "Cannot provide maxValueCount of 0 for OptionValueType.Required or " + - "OptionValueType.Optional.", - "maxValueCount"); - if (type == OptionValueType.None && maxValueCount > 1) - throw new ArgumentException( - string.Format("Cannot provide maxValueCount of {0} for OptionValueType.None.", maxValueCount), - "maxValueCount"); - if (Array.IndexOf(names, "<>") >= 0 && - ((names.Length == 1 && type != OptionValueType.None) || - (names.Length > 1 && MaxValueCount > 1))) - throw new ArgumentException( - "The default option handler '<>' cannot require values.", - "prototype"); - } - - public string Prototype - { - get { return prototype; } - } - - public string Description - { - get { return description; } - } - - public OptionValueType OptionValueType - { - get { return type; } - } - - public int MaxValueCount - { - get { return count; } - } - - internal string[] Names - { - get { return names; } - } - - internal string[] ValueSeparators - { - get { return separators; } - } - - public string[] GetNames() - { - return (string[])names.Clone(); - } - - public string[] GetValueSeparators() - { - if (separators == null) - return new string[0]; - return (string[])separators.Clone(); - } - - protected static T Parse(string value, OptionContext c) - { - TypeConverter conv = TypeDescriptor.GetConverter(typeof(T)); - T t = default(T); - try - { - if (value != null) - t = (T)conv.ConvertFromString(value); - } - catch (Exception e) - { - throw new OptionException( - string.Format( - c.OptionSet.MessageLocalizer("Could not convert string `{0}' to type {1} for option `{2}'."), - value, typeof(T).Name, c.OptionName), - c.OptionName, e); - } - return t; - } - - OptionValueType ParsePrototype() - { - var type = '\0'; - var seps = new List(); - for (var i = 0; i < names.Length; ++i) - { - string name = names[i]; - if (name.Length == 0) - throw new ArgumentException("Empty option names are not supported.", "prototype"); - - int end = name.IndexOfAny(NameTerminator); - if (end == -1) - continue; - names[i] = name.Substring(0, end); - if (type == '\0' || type == name[end]) - type = name[end]; - else - throw new ArgumentException( - string.Format("Conflicting option types: '{0}' vs. '{1}'.", type, name[end]), - "prototype"); - AddSeparators(name, end, seps); - } - - if (type == '\0') - return OptionValueType.None; - - if (count <= 1 && seps.Count != 0) - throw new ArgumentException( - string.Format("Cannot provide key/value separators for Options taking {0} value(s).", count), - "prototype"); - if (count > 1) - { - if (seps.Count == 0) - separators = new[] {":", "="}; - else if (seps.Count == 1 && seps[0].Length == 0) - separators = null; - else - separators = seps.ToArray(); - } - - return type == '=' ? OptionValueType.Required : OptionValueType.Optional; - } - - static void AddSeparators(string name, int end, ICollection seps) - { - int start = -1; - for (int i = end + 1; i < name.Length; ++i) - { - switch (name[i]) - { - case '{': - if (start != -1) - throw new ArgumentException( - string.Format("Ill-formed name/value separator found in \"{0}\".", name), - "prototype"); - start = i + 1; - break; - case '}': - if (start == -1) - throw new ArgumentException( - string.Format("Ill-formed name/value separator found in \"{0}\".", name), - "prototype"); - seps.Add(name.Substring(start, i - start)); - start = -1; - break; - default: - if (start == -1) - seps.Add(name[i].ToString()); - break; - } - } - if (start != -1) - throw new ArgumentException( - string.Format("Ill-formed name/value separator found in \"{0}\".", name), - "prototype"); - } - - public void Invoke(OptionContext c) - { - OnParseComplete(c); - c.OptionName = null; - c.Option = null; - c.OptionValues.Clear(); - } - - protected abstract void OnParseComplete(OptionContext c); - - public override string ToString() - { - return Prototype; - } - } - - - [Serializable] - public class OptionException : Exception - { - string option; - - public OptionException() - { - } - - public OptionException(string message, string optionName) - : base(message) - { - option = optionName; - } - - public OptionException(string message, string optionName, Exception innerException) - : base(message, innerException) - { - option = optionName; - } - - protected OptionException(SerializationInfo info, StreamingContext context) - : base(info, context) - { - option = info.GetString("OptionName"); - } - - public string OptionName - { - get { return option; } - } - - [SecurityPermission(SecurityAction.LinkDemand, SerializationFormatter = true)] - public override void GetObjectData(SerializationInfo info, StreamingContext context) - { - base.GetObjectData(info, context); - info.AddValue("OptionName", option); - } - } - - - public delegate void OptionAction(TKey key, TValue value); - - - public class OptionSet : KeyedCollection - { - public OptionSet() - : this(delegate(string f) - { - return f; - }) - { - } - - public OptionSet(Converter localizer) - { - this.localizer = localizer; - } - - Converter localizer; - - public Converter MessageLocalizer - { - get { return localizer; } - } - - protected override string GetKeyForItem(Option item) - { - if (item == null) - throw new ArgumentNullException("option"); - if (item.Names != null && item.Names.Length > 0) - return item.Names[0]; - // This should never happen, as it's invalid for Option to be - // constructed w/o any names. - throw new InvalidOperationException("Option has no names!"); - } - - [Obsolete("Use KeyedCollection.this[string]")] - protected Option GetOptionForName(string option) - { - if (option == null) - throw new ArgumentNullException("option"); - try - { - return base[option]; - } - catch (KeyNotFoundException) - { - return null; - } - } - - protected override void InsertItem(int index, Option item) - { - base.InsertItem(index, item); - AddImpl(item); - } - - protected override void RemoveItem(int index) - { - base.RemoveItem(index); - Option p = Items[index]; - // KeyedCollection.RemoveItem() handles the 0th item - for (var i = 1; i < p.Names.Length; ++i) - { - Dictionary.Remove(p.Names[i]); - } - } - - protected override void SetItem(int index, Option item) - { - base.SetItem(index, item); - RemoveItem(index); - AddImpl(item); - } - - void AddImpl(Option option) - { - if (option == null) - throw new ArgumentNullException("option"); - var added = new List(option.Names.Length); - try - { - // KeyedCollection.InsertItem/SetItem handle the 0th name. - for (var i = 1; i < option.Names.Length; ++i) - { - Dictionary.Add(option.Names[i], option); - added.Add(option.Names[i]); - } - } - catch (Exception) - { - foreach (string name in added) - Dictionary.Remove(name); - throw; - } - } - - public new OptionSet Add(Option option) - { - base.Add(option); - return this; - } - - - sealed class ActionOption : Option - { - Action action; - - public ActionOption(string prototype, string description, int count, Action action) - : base(prototype, description, count) - { - if (action == null) - throw new ArgumentNullException("action"); - this.action = action; - } - - protected override void OnParseComplete(OptionContext c) - { - action(c.OptionValues); - } - } - - - public OptionSet Add(string prototype, Action action) - { - return Add(prototype, null, action); - } - - public OptionSet Add(string prototype, string description, Action action) - { - if (action == null) - throw new ArgumentNullException("action"); - Option p = new ActionOption(prototype, description, 1, - delegate(OptionValueCollection v) - { - action(v[0]); - }); - base.Add(p); - return this; - } - - public OptionSet Add(string prototype, OptionAction action) - { - return Add(prototype, null, action); - } - - public OptionSet Add(string prototype, string description, OptionAction action) - { - if (action == null) - throw new ArgumentNullException("action"); - Option p = new ActionOption(prototype, description, 2, - delegate(OptionValueCollection v) - { - action(v[0], v[1]); - }); - base.Add(p); - return this; - } - - - sealed class ActionOption : Option - { - Action action; - - public ActionOption(string prototype, string description, Action action) - : base(prototype, description, 1) - { - if (action == null) - throw new ArgumentNullException("action"); - this.action = action; - } - - protected override void OnParseComplete(OptionContext c) - { - action(Parse(c.OptionValues[0], c)); - } - } - - - sealed class ActionOption : Option - { - OptionAction action; - - public ActionOption(string prototype, string description, OptionAction action) - : base(prototype, description, 2) - { - if (action == null) - throw new ArgumentNullException("action"); - this.action = action; - } - - protected override void OnParseComplete(OptionContext c) - { - action( - Parse(c.OptionValues[0], c), - Parse(c.OptionValues[1], c)); - } - } - - - public OptionSet Add(string prototype, Action action) - { - return Add(prototype, null, action); - } - - public OptionSet Add(string prototype, string description, Action action) - { - return Add(new ActionOption(prototype, description, action)); - } - - public OptionSet Add(string prototype, OptionAction action) - { - return Add(prototype, null, action); - } - - public OptionSet Add(string prototype, string description, OptionAction action) - { - return Add(new ActionOption(prototype, description, action)); - } - - protected virtual OptionContext CreateOptionContext() - { - return new OptionContext(this); - } - -#if LINQ - public List Parse (IEnumerable arguments) - { - bool process = true; - OptionContext c = CreateOptionContext (); - c.OptionIndex = -1; - var def = GetOptionForName ("<>"); - var unprocessed = - from argument in arguments - where ++c.OptionIndex >= 0 && (process || def != null) - ? process - ? argument == "--" - ? (process = false) - : !Parse (argument, c) - ? def != null - ? Unprocessed (null, def, c, argument) - : true - : false - : def != null - ? Unprocessed (null, def, c, argument) - : true - : true - select argument; - List r = unprocessed.ToList (); - if (c.Option != null) - c.Option.Invoke (c); - return r; - } -#else - public List Parse(IEnumerable arguments) - { - OptionContext c = CreateOptionContext(); - c.OptionIndex = -1; - var process = true; - var unprocessed = new List(); - Option def = Contains("<>") ? this["<>"] : null; - foreach (string argument in arguments) - { - ++c.OptionIndex; - if (argument == "--") - { - process = false; - continue; - } - if (!process) - { - Unprocessed(unprocessed, def, c, argument); - continue; - } - if (!Parse(argument, c)) - Unprocessed(unprocessed, def, c, argument); - } - if (c.Option != null) - c.Option.Invoke(c); - return unprocessed; - } -#endif - - static bool Unprocessed(ICollection extra, Option def, OptionContext c, string argument) - { - if (def == null) - { - extra.Add(argument); - return false; - } - c.OptionValues.Add(argument); - c.Option = def; - c.Option.Invoke(c); - return false; - } - - readonly Regex ValueOption = new Regex( - @"^(?--|-|/)(?[^:=]+)((?[:=])(?.*))?$"); - - protected bool GetOptionParts(string argument, out string flag, out string name, out string sep, - out string value) - { - if (argument == null) - throw new ArgumentNullException("argument"); - - flag = name = sep = value = null; - Match m = ValueOption.Match(argument); - if (!m.Success) - { - return false; - } - flag = m.Groups["flag"].Value; - name = m.Groups["name"].Value; - if (m.Groups["sep"].Success && m.Groups["value"].Success) - { - sep = m.Groups["sep"].Value; - value = m.Groups["value"].Value; - } - return true; - } - - protected virtual bool Parse(string argument, OptionContext c) - { - if (c.Option != null) - { - ParseValue(argument, c); - return true; - } - - string f, - n, - s, - v; - if (!GetOptionParts(argument, out f, out n, out s, out v)) - return false; - - Option p; - if (Contains(n)) - { - p = this[n]; - c.OptionName = f + n; - c.Option = p; - switch (p.OptionValueType) - { - case OptionValueType.None: - c.OptionValues.Add(n); - c.Option.Invoke(c); - break; - case OptionValueType.Optional: - case OptionValueType.Required: - ParseValue(v, c); - break; - } - return true; - } - // no match; is it a bool option? - if (ParseBool(argument, n, c)) - return true; - // is it a bundled option? - if (ParseBundledValue(f, string.Concat(n + s + v), c)) - return true; - - return false; - } - - void ParseValue(string option, OptionContext c) - { - if (option != null) - foreach (string o in c.Option.ValueSeparators != null - ? option.Split(c.Option.ValueSeparators, StringSplitOptions.None) - : new[] {option}) - { - c.OptionValues.Add(o); - } - if (c.OptionValues.Count == c.Option.MaxValueCount || - c.Option.OptionValueType == OptionValueType.Optional) - c.Option.Invoke(c); - else if (c.OptionValues.Count > c.Option.MaxValueCount) - { - throw new OptionException(localizer(string.Format( - "Error: Found {0} option values when expecting {1}.", - c.OptionValues.Count, c.Option.MaxValueCount)), - c.OptionName); - } - } - - bool ParseBool(string option, string n, OptionContext c) - { - Option p; - string rn; - if (n.Length >= 1 && (n[n.Length - 1] == '+' || n[n.Length - 1] == '-') && - Contains(rn = n.Substring(0, n.Length - 1))) - { - p = this[rn]; - string v = n[n.Length - 1] == '+' ? option : null; - c.OptionName = option; - c.Option = p; - c.OptionValues.Add(v); - p.Invoke(c); - return true; - } - return false; - } - - bool ParseBundledValue(string f, string n, OptionContext c) - { - if (f != "-") - return false; - for (var i = 0; i < n.Length; ++i) - { - Option p; - string opt = f + n[i]; - string rn = n[i].ToString(); - if (!Contains(rn)) - { - if (i == 0) - return false; - throw new OptionException(string.Format(localizer( - "Cannot bundle unregistered option '{0}'."), opt), opt); - } - p = this[rn]; - switch (p.OptionValueType) - { - case OptionValueType.None: - Invoke(c, opt, n, p); - break; - case OptionValueType.Optional: - case OptionValueType.Required: - { - string v = n.Substring(i + 1); - c.Option = p; - c.OptionName = opt; - ParseValue(v.Length != 0 ? v : null, c); - return true; - } - default: - throw new InvalidOperationException("Unknown OptionValueType: " + p.OptionValueType); - } - } - return true; - } - - static void Invoke(OptionContext c, string name, string value, Option option) - { - c.OptionName = name; - c.Option = option; - c.OptionValues.Add(value); - option.Invoke(c); - } - - const int OptionWidth = 29; - - public void WriteOptionDescriptions(TextWriter o) - { - foreach (Option p in this) - { - var written = 0; - if (!WriteOptionPrototype(o, p, ref written)) - continue; - - if (written < OptionWidth) - o.Write(new string(' ', OptionWidth - written)); - else - { - o.WriteLine(); - o.Write(new string(' ', OptionWidth)); - } - - List lines = GetLines(localizer(GetDescription(p.Description))); - o.WriteLine(lines[0]); - var prefix = new string(' ', OptionWidth + 2); - for (var i = 1; i < lines.Count; ++i) - { - o.Write(prefix); - o.WriteLine(lines[i]); - } - } - } - - bool WriteOptionPrototype(TextWriter o, Option p, ref int written) - { - string[] names = p.Names; - - int i = GetNextOptionIndex(names, 0); - if (i == names.Length) - return false; - - if (names[i].Length == 1) - { - Write(o, ref written, " -"); - Write(o, ref written, names[0]); - } - else - { - Write(o, ref written, " --"); - Write(o, ref written, names[0]); - } - - for (i = GetNextOptionIndex(names, i + 1); - i < names.Length; - i = GetNextOptionIndex(names, i + 1)) - { - Write(o, ref written, ", "); - Write(o, ref written, names[i].Length == 1 ? "-" : "--"); - Write(o, ref written, names[i]); - } - - if (p.OptionValueType == OptionValueType.Optional || - p.OptionValueType == OptionValueType.Required) - { - if (p.OptionValueType == OptionValueType.Optional) - { - Write(o, ref written, localizer("[")); - } - Write(o, ref written, localizer("=" + GetArgumentName(0, p.MaxValueCount, p.Description))); - string sep = p.ValueSeparators != null && p.ValueSeparators.Length > 0 - ? p.ValueSeparators[0] - : " "; - for (var c = 1; c < p.MaxValueCount; ++c) - { - Write(o, ref written, localizer(sep + GetArgumentName(c, p.MaxValueCount, p.Description))); - } - if (p.OptionValueType == OptionValueType.Optional) - { - Write(o, ref written, localizer("]")); - } - } - return true; - } - - static int GetNextOptionIndex(string[] names, int i) - { - while (i < names.Length && names[i] == "<>") - { - ++i; - } - return i; - } - - static void Write(TextWriter o, ref int n, string s) - { - n += s.Length; - o.Write(s); - } - - static string GetArgumentName(int index, int maxIndex, string description) - { - if (description == null) - return maxIndex == 1 ? "VALUE" : "VALUE" + (index + 1); - string[] nameStart; - if (maxIndex == 1) - nameStart = new[] {"{0:", "{"}; - else - nameStart = new[] {"{" + index + ":"}; - for (var i = 0; i < nameStart.Length; ++i) - { - int start, - j = 0; - do - { - start = description.IndexOf(nameStart[i], j); - } - while (start >= 0 && j != 0 ? description[j++ - 1] == '{' : false); - if (start == -1) - continue; - int end = description.IndexOf("}", start); - if (end == -1) - continue; - return description.Substring(start + nameStart[i].Length, end - start - nameStart[i].Length); - } - return maxIndex == 1 ? "VALUE" : "VALUE" + (index + 1); - } - - static string GetDescription(string description) - { - if (description == null) - return string.Empty; - var sb = new StringBuilder(description.Length); - int start = -1; - for (var i = 0; i < description.Length; ++i) - { - switch (description[i]) - { - case '{': - if (i == start) - { - sb.Append('{'); - start = -1; - } - else if (start < 0) - start = i + 1; - break; - case '}': - if (start < 0) - { - if (i + 1 == description.Length || description[i + 1] != '}') - throw new InvalidOperationException("Invalid option description: " + description); - ++i; - sb.Append("}"); - } - else - { - sb.Append(description.Substring(start, i - start)); - start = -1; - } - break; - case ':': - if (start < 0) - goto default; - start = i + 1; - break; - default: - if (start < 0) - sb.Append(description[i]); - break; - } - } - return sb.ToString(); - } - - static List GetLines(string description) - { - var lines = new List(); - if (string.IsNullOrEmpty(description)) - { - lines.Add(string.Empty); - return lines; - } - int length = 80 - OptionWidth - 2; - int start = 0, - end; - do - { - end = GetLineEnd(start, length, description); - var cont = false; - if (end < description.Length) - { - char c = description[end]; - if (c == '-' || (char.IsWhiteSpace(c) && c != '\n')) - ++end; - else if (c != '\n') - { - cont = true; - --end; - } - } - lines.Add(description.Substring(start, end - start)); - if (cont) - { - lines[lines.Count - 1] += "-"; - } - start = end; - if (start < description.Length && description[start] == '\n') - ++start; - } - while (end < description.Length); - return lines; - } - - static int GetLineEnd(int start, int length, string description) - { - int end = Math.Min(start + length, description.Length); - int sep = -1; - for (int i = start; i < end; ++i) - { - switch (description[i]) - { - case ' ': - case '\t': - case '\v': - case '-': - case ',': - case '.': - case ';': - sep = i; - break; - case '\n': - return i; - } - } - if (sep == -1 || end == description.Length) - return end; - return sep; - } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Program.cs b/src/GreenPipes.BenchmarkConsole/Program.cs index 217e74d..742f104 100644 --- a/src/GreenPipes.BenchmarkConsole/Program.cs +++ b/src/GreenPipes.BenchmarkConsole/Program.cs @@ -1,79 +1,30 @@ -namespace GreenPipes.BenchmarkConsole +// Copyright 2012-2018 Chris Patterson +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +namespace GreenPipes.BenchmarkConsole { using System; - using System.Collections.Generic; - using System.Diagnostics; - using System.Threading; - using Throughput; + using BenchmarkDotNet.Running; class Program { - static List _remaining; - static void Main(string[] args) { Console.WriteLine("Green Pipes Benchmark"); Console.WriteLine(); - var optionSet = new ProgramOptionSet(); - - try - { - _remaining = optionSet.Parse(args); - - if (optionSet.Help) - { - ShowHelp(optionSet); - return; - } - - optionSet.ShowOptions(); - - if (optionSet.Benchmark.HasFlag(ProgramOptionSet.BenchmarkOptions.Latency)) - { - RunLatencyBenchmark(optionSet); - } - - if (Debugger.IsAttached) - { - Console.Write("Press any key to continue..."); - Console.ReadKey(); - } - } - catch (OptionException ex) - { - Console.Write("gpbench: "); - Console.WriteLine(ex.Message); - Console.WriteLine("Use 'gpbench --help' for detailed usage information."); - } - } - - static void RunLatencyBenchmark(ProgramOptionSet optionSet) - { - var messageLatencyOptionSet = new ThroughputOptionSet(); - - messageLatencyOptionSet.Parse(_remaining); - - IThroughputSettings settings = messageLatencyOptionSet; - - var benchmark = new ThroughputBenchmark(settings); - - benchmark.Run(CancellationToken.None); - } - - static void ShowHelp(OptionSet p) - { - Console.WriteLine("Usage: gpbench [OPTIONS]+"); - Console.WriteLine("Executes the benchmark using the specified options."); - Console.WriteLine("If no benchmark is specified, all benchmarks are executed."); - Console.WriteLine(); - Console.WriteLine("Options:"); - p.WriteOptionDescriptions(Console.Out); - - Console.WriteLine(); - Console.WriteLine("Benchmark Options:"); - new ThroughputOptionSet().WriteOptionDescriptions(Console.Out); + BenchmarkRunner.Run(); + BenchmarkRunner.Run(); } } } \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/ProgramOptionSet.cs b/src/GreenPipes.BenchmarkConsole/ProgramOptionSet.cs deleted file mode 100644 index 104a25a..0000000 --- a/src/GreenPipes.BenchmarkConsole/ProgramOptionSet.cs +++ /dev/null @@ -1,36 +0,0 @@ -namespace GreenPipes.BenchmarkConsole -{ - using System; - - - class ProgramOptionSet : - OptionSet - { - [Flags] - public enum BenchmarkOptions - { - Latency = 1, - RPC = 2, - } - - public ProgramOptionSet() - { - Add("v|verbose", "Verbose output", x => Verbose = x != null); - Add("h|help", "Display this help and exit", x => Help = x != null); - Add("run:", "Run benchmark (All, Latency, RPC)", value => Benchmark = value); - Add("rpc", "Run the RPC benchmark", x => Benchmark = BenchmarkOptions.RPC); - Add("latency", "Run the Latency benchmark", x => Benchmark = BenchmarkOptions.Latency); - - Benchmark = BenchmarkOptions.Latency | BenchmarkOptions.RPC; - } - - public BenchmarkOptions Benchmark { get; private set; } - - public bool Verbose { get; set; } - public bool Help { get; set; } - - public void ShowOptions() - { - } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/SendBenchmark.cs b/src/GreenPipes.BenchmarkConsole/SendBenchmark.cs new file mode 100644 index 0000000..910fda7 --- /dev/null +++ b/src/GreenPipes.BenchmarkConsole/SendBenchmark.cs @@ -0,0 +1,116 @@ +// Copyright 2012-2018 Chris Patterson +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +namespace GreenPipes.BenchmarkConsole +{ + using System; + using System.Threading.Tasks; + using BenchmarkDotNet.Attributes; + using Contracts; + using Pipes; + using Throughput; + + + [Config(typeof(DotNetCoreBenchmarkConfig))] + public class SendBenchmark + { + readonly IPipe _concurrencyPipe; + readonly TestContext _context; + readonly IPipe _doublePipe; + readonly IPipe _emptyPipe; + readonly IPipe _faultPipe; + readonly IPipe _retryPipe; + readonly IPipe _dispatchPipe; + + public SendBenchmark() + { + _context = new ThroughputTestContext(Guid.NewGuid(), "Payload"); + + _emptyPipe = Pipe.Empty(); + + _retryPipe = Pipe.New(x => + { + x.UseRetry(r => r.Immediate(1)); + + x.UseFilter(new BenchmarkFilter()); + }); + + _concurrencyPipe = Pipe.New(x => + { + x.UseConcurrencyLimit(Environment.ProcessorCount); + + x.UseFilter(new BenchmarkFilter()); + }); + + _doublePipe = Pipe.New(x => + { + x.UseConcurrencyLimit(Environment.ProcessorCount); + x.UseRetry(r => r.Immediate(1)); + + x.UseFilter(new BenchmarkFilter()); + }); + + _faultPipe = Pipe.New(x => + { + x.UseRetry(r => r.Immediate(1)); + + x.UseFilter(new FaultFilter()); + }); + + var dispatchPipe = new PipeRouter(); + _dispatchPipe = dispatchPipe; + + dispatchPipe.ConnectPipe(Pipe.Empty>()); + } + + [Benchmark] + public async Task EmptyPipe() + { + await _emptyPipe.Send(_context); + } + + [Benchmark] + public async Task RetryPipe() + { + await _retryPipe.Send(_context); + } + + [Benchmark] + public async Task ConcurrencyPipe() + { + await _concurrencyPipe.Send(_context); + } + + [Benchmark] + public async Task DoublePipe() + { + await _doublePipe.Send(_context); + } + + [Benchmark] + public async Task DispatchPipe() + { + await _dispatchPipe.SetConcurrencyLimit(32); + } + + public async Task FaultPipe() + { + try + { + await _faultPipe.Send(_context); + } + catch + { + } + } + } +} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/SupervisorBenchmark.cs b/src/GreenPipes.BenchmarkConsole/SupervisorBenchmark.cs new file mode 100644 index 0000000..6213d27 --- /dev/null +++ b/src/GreenPipes.BenchmarkConsole/SupervisorBenchmark.cs @@ -0,0 +1,52 @@ +namespace GreenPipes.BenchmarkConsole +{ + using System.Threading.Tasks; + using Agents; + using BenchmarkDotNet.Attributes; + + + [Config(typeof(AgentBenchmarkConfig))] + public class SupervisorBenchmark + { + [Benchmark] + public async Task AddAgentAndStop() + { + var supervisor = new Supervisor(); + + var provocateur = new Agent(); + + provocateur.SetReady(); + supervisor.SetReady(); + + supervisor.Add(provocateur); + + await supervisor.Ready; + + await supervisor.Stop(); + + await supervisor.Completed; + } + + [Benchmark] + public async Task AddAgentWithManagerAndStop() + { + var supervisor = new Supervisor(); + + var manager = new Supervisor(); + supervisor.Add(manager); + + var provocateur = new Agent(); + manager.Add(provocateur); + + manager.SetReady(); + supervisor.SetReady(); + provocateur.SetReady(); + + await supervisor.Ready; + + await supervisor.Stop(); + + await supervisor.Completed; + } + } +} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/IReportConsumerMetric.cs b/src/GreenPipes.BenchmarkConsole/Throughput/BenchmarkFilter.cs similarity index 55% rename from src/GreenPipes.BenchmarkConsole/Throughput/IReportConsumerMetric.cs rename to src/GreenPipes.BenchmarkConsole/Throughput/BenchmarkFilter.cs index b9e05c9..febc194 100644 --- a/src/GreenPipes.BenchmarkConsole/Throughput/IReportConsumerMetric.cs +++ b/src/GreenPipes.BenchmarkConsole/Throughput/BenchmarkFilter.cs @@ -1,26 +1,30 @@ -// Copyright 2012-2016 Chris Patterson -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use -// this file except in compliance with the License. You may obtain a copy of the -// License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissions and limitations under the License. -namespace GreenPipes.BenchmarkConsole.Throughput -{ - using System; - using System.Threading.Tasks; - - - public interface IReportConsumerMetric - { - Task Consumed(Guid messageId) - where T : class; - - void Sent(Guid messageId); - } +// Copyright 2012-2018 Chris Patterson +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +namespace GreenPipes.BenchmarkConsole.Throughput +{ + using System.Threading.Tasks; + + + public class BenchmarkFilter : + IFilter + { + public Task Send(TestContext context, IPipe next) + { + return next.Send(context); + } + + public void Probe(ProbeContext context) + { + } + } } \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/FaultFilter.cs b/src/GreenPipes.BenchmarkConsole/Throughput/FaultFilter.cs new file mode 100644 index 0000000..b7fafa5 --- /dev/null +++ b/src/GreenPipes.BenchmarkConsole/Throughput/FaultFilter.cs @@ -0,0 +1,19 @@ +namespace GreenPipes.BenchmarkConsole.Throughput +{ + using System; + using System.Threading.Tasks; + + + public class FaultFilter : + IFilter + { + public Task Send(TestContext context, IPipe next) + { + throw new InvalidOperationException(); + } + + public void Probe(ProbeContext context) + { + } + } +} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/IThroughputSettings.cs b/src/GreenPipes.BenchmarkConsole/Throughput/IThroughputSettings.cs deleted file mode 100644 index c850fb7..0000000 --- a/src/GreenPipes.BenchmarkConsole/Throughput/IThroughputSettings.cs +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2007-2016 Chris Patterson, Dru Sellers, Travis Smith, et. al. -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use -// this file except in compliance with the License. You may obtain a copy of the -// License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissions and limitations under the License. -namespace GreenPipes.BenchmarkConsole.Throughput -{ - public interface IThroughputSettings - { - long MessageCount { get; } - - int ConcurrencyLimit { get; } - - int Clients { get; } - - int PayloadSize { get; } - - int RetryCount { get; } - - int FaultCount { get; } - - bool Yield { get; } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/MessageMetric.cs b/src/GreenPipes.BenchmarkConsole/Throughput/MessageMetric.cs deleted file mode 100644 index 2c8c91a..0000000 --- a/src/GreenPipes.BenchmarkConsole/Throughput/MessageMetric.cs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2012-2016 Chris Patterson -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use -// this file except in compliance with the License. You may obtain a copy of the -// License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissions and limitations under the License. -namespace GreenPipes.BenchmarkConsole.Throughput -{ - using System; - - - public class MessageMetric - { - public MessageMetric(Guid messageId, long consumeLatency) - { - MessageId = messageId; - ConsumeLatency = consumeLatency; - } - - public Guid MessageId { get; } - public long ConsumeLatency { get; } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/MessageMetricCapture.cs b/src/GreenPipes.BenchmarkConsole/Throughput/MessageMetricCapture.cs deleted file mode 100644 index 2b8dfd3..0000000 --- a/src/GreenPipes.BenchmarkConsole/Throughput/MessageMetricCapture.cs +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2012-2016 Chris Patterson -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use -// this file except in compliance with the License. You may obtain a copy of the -// License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissions and limitations under the License. -namespace GreenPipes.BenchmarkConsole.Throughput -{ - using System; - using System.Collections.Concurrent; - using System.Diagnostics; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - using Util; - - - public class MessageMetricCapture : - IReportConsumerMetric - { - readonly TaskCompletionSource _consumeCompleted; - readonly ConcurrentBag _consumedMessages; - readonly long _messageCount; - readonly TaskCompletionSource _sendCompleted; - readonly ConcurrentBag _sentMessages; - readonly Stopwatch _stopwatch; - long _consumed; - long _sent; - - public MessageMetricCapture(long messageCount) - { - _messageCount = messageCount; - - _consumedMessages = new ConcurrentBag(); - _sentMessages = new ConcurrentBag(); - _sendCompleted = new TaskCompletionSource(); - _consumeCompleted = new TaskCompletionSource(); - - _stopwatch = Stopwatch.StartNew(); - } - - public Task SendCompleted => _sendCompleted.Task; - public Task ConsumeCompleted => _consumeCompleted.Task; - - Task IReportConsumerMetric.Consumed(Guid messageId) - { - _consumedMessages.Add(new ConsumedMessage(messageId, _stopwatch.ElapsedTicks)); - - var consumed = Interlocked.Increment(ref _consumed); - if (consumed == _messageCount) - _consumeCompleted.TrySetResult(_stopwatch.Elapsed); - - return TaskUtil.Completed; - } - - public void Sent(Guid messageId) - { - var sendTimestamp = _stopwatch.ElapsedTicks; - - _sentMessages.Add(new SentMessage(messageId, sendTimestamp)); - - var sent = Interlocked.Increment(ref _sent); - if (sent == _messageCount) - _sendCompleted.TrySetResult(_stopwatch.Elapsed); - } - - public MessageMetric[] GetMessageMetrics() - { - return _sentMessages.Join(_consumedMessages, x => x.MessageId, x => x.MessageId, - (sent, consumed) => new MessageMetric(sent.MessageId, Math.Max(0, consumed.Timestamp - sent.SendTimestamp))) - .ToArray(); - } - - - struct SentMessage - { - public readonly Guid MessageId; - public readonly long SendTimestamp; - - public SentMessage(Guid messageId, long sendTimestamp) - { - MessageId = messageId; - SendTimestamp = sendTimestamp; - } - } - - - struct ConsumedMessage - { - public readonly Guid MessageId; - public readonly long Timestamp; - - public ConsumedMessage(Guid messageId, long timestamp) - { - MessageId = messageId; - Timestamp = timestamp; - } - } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputBenchmark.cs b/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputBenchmark.cs deleted file mode 100644 index 4de6caf..0000000 --- a/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputBenchmark.cs +++ /dev/null @@ -1,160 +0,0 @@ -// Copyright 2012-2016 Chris Patterson -// -// Licensed under the Apache License, Version 2.0 (the "License"); you may not use -// this file except in compliance with the License. You may obtain a copy of the -// License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software distributed -// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR -// CONDITIONS OF ANY KIND, either express or implied. See the License for the -// specific language governing permissions and limitations under the License. -namespace GreenPipes.BenchmarkConsole.Throughput -{ - using System; - using System.Diagnostics; - using System.Linq; - using System.Threading; - using System.Threading.Tasks; - - - /// - /// Benchmark that determines the latency of messages between the time the message is published - /// to the broker until it is acked by RabbitMQ. And then consumed by the message consumer. - /// - public class ThroughputBenchmark - { - readonly string _payload; - readonly IThroughputSettings _settings; - MessageMetricCapture _capture; - TimeSpan _consumeDuration; - TimeSpan _sendDuration; - - public ThroughputBenchmark(IThroughputSettings settings) - { - _settings = settings; - - if (settings.MessageCount / settings.Clients * settings.Clients != settings.MessageCount) - throw new ArgumentException("The clients must be a factor of message count"); - - _payload = _settings.PayloadSize > 0 ? new string('*', _settings.PayloadSize) : null; - } - - public void Run(CancellationToken cancellationToken = default(CancellationToken)) - { - _capture = new MessageMetricCapture(_settings.MessageCount); - - IPipe pipe = Pipe.New(x => - { - if (_settings.RetryCount > 0) - x.UseRetry(r => r.Immediate(_settings.RetryCount)); - - if (_settings.ConcurrencyLimit > 0) - x.UseConcurrencyLimit(_settings.ConcurrencyLimit); - - x.UseExecuteAsync(async context => - { - if (_settings.Yield) - await Task.Yield(); - }); - - x.UseFilter(new ThroughputFilter(_capture, _settings.FaultCount)); - }); - - - Console.WriteLine("Running Throughput Benchmark"); - - RunBenchmark(pipe).Wait(cancellationToken); - - Console.WriteLine("Message Count: {0}", _settings.MessageCount); - Console.WriteLine("Clients: {0}", _settings.Clients); - Console.WriteLine("Payload Length: {0}", _payload?.Length ?? 0); - Console.WriteLine("Concurrency Limit: {0}", _settings.ConcurrencyLimit); - Console.WriteLine("Yield: {0}", _settings.Yield); - Console.WriteLine("Retry: {0}", _settings.RetryCount); - - Console.WriteLine("Total send duration: {0:g}", _sendDuration); - Console.WriteLine("Send message rate: {0:F2} (msg/s)", - _settings.MessageCount * 1000 / _sendDuration.TotalMilliseconds); - Console.WriteLine("Total consume duration: {0:g}", _consumeDuration); - Console.WriteLine("Consume message rate: {0:F2} (msg/s)", - _settings.MessageCount * 1000 / _consumeDuration.TotalMilliseconds); - - MessageMetric[] messageMetrics = _capture.GetMessageMetrics(); - - Console.WriteLine("Avg Consume Time: {0:F0}us", - messageMetrics.Average(x => x.ConsumeLatency) * 1000000 / Stopwatch.Frequency); - Console.WriteLine("Min Consume Time: {0:F0}us", - messageMetrics.Min(x => x.ConsumeLatency) * 1000000 / Stopwatch.Frequency); - Console.WriteLine("Max Consume Time: {0:F0}us", - messageMetrics.Max(x => x.ConsumeLatency) * 1000000 / Stopwatch.Frequency); - Console.WriteLine("Med Consume Time: {0:F0}us", - messageMetrics.Median(x => x.ConsumeLatency) * 1000000 / Stopwatch.Frequency); - Console.WriteLine("95t Consume Time: {0:F0}us", - messageMetrics.Percentile(x => x.ConsumeLatency) * 1000000 / Stopwatch.Frequency); - - Console.WriteLine(); - DrawResponseTimeGraph(messageMetrics, x => x.ConsumeLatency); - } - - void DrawResponseTimeGraph(MessageMetric[] metrics, Func selector) - { - var maxTime = metrics.Max(selector); - var minTime = metrics.Min(selector); - - const int segments = 10; - - var span = Math.Max(1, maxTime - minTime); - var increment = span / segments; - - var histogram = (from x in metrics.Select(selector) - let key = (x - minTime) * segments / span - where (key >= 0) && (key < segments) - let groupKey = key - group x by groupKey - into segment - orderby segment.Key - select new {Value = segment.Key, Count = segment.Count()}).ToList(); - - var maxCount = histogram.Max(x => x.Count); - - foreach (var item in histogram) - { - var barLength = item.Count * 60 / maxCount; - Console.WriteLine("{0,5}ms {2,-60} ({1,7})", (minTime + increment * item.Value) * 1000 / Stopwatch.Frequency, item.Count, - new string('*', barLength)); - } - } - - async Task RunBenchmark(IPipe pipe) - { - await Task.Yield(); - - var stripes = new Task[_settings.Clients]; - - for (var i = 0; i < _settings.Clients; i++) - stripes[i] = RunStripe(pipe, _settings.MessageCount / _settings.Clients); - - await Task.WhenAll(stripes).ConfigureAwait(false); - - _sendDuration = await _capture.SendCompleted.ConfigureAwait(false); - _consumeDuration = await _capture.ConsumeCompleted.ConfigureAwait(false); - } - - async Task RunStripe(IPipe pipe, long messageCount) - { - await Task.Yield(); - - for (long i = 0; i < messageCount; i++) - { - var messageId = Guid.NewGuid(); - var context = new ThroughputTestContext(messageId, _payload); - - _capture.Sent(messageId); - - await pipe.Send(context).ConfigureAwait(false); - } - } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputFilter.cs b/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputFilter.cs deleted file mode 100644 index b2e34d7..0000000 --- a/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputFilter.cs +++ /dev/null @@ -1,37 +0,0 @@ -namespace GreenPipes.BenchmarkConsole.Throughput -{ - using System; - using System.Threading.Tasks; - - - public class ThroughputFilter : - IFilter - { - readonly IReportConsumerMetric _report; - readonly int _faultCount; - - public ThroughputFilter(IReportConsumerMetric report, int faultCount) - { - _report = report; - _faultCount = faultCount; - } - - public void Probe(ProbeContext context) - { - } - - public async Task Send(TestContext context, IPipe next) - { - if (_faultCount > 0 && _faultCount < context.Attempts) - { - context.Attempts++; - - throw new NotSupportedException("Intentional"); - } - - await _report.Consumed(context.CorrelationId).ConfigureAwait(false); - - await next.Send(context).ConfigureAwait(false); - } - } -} \ No newline at end of file diff --git a/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputOptionSet.cs b/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputOptionSet.cs deleted file mode 100644 index 66bad90..0000000 --- a/src/GreenPipes.BenchmarkConsole/Throughput/ThroughputOptionSet.cs +++ /dev/null @@ -1,30 +0,0 @@ -namespace GreenPipes.BenchmarkConsole.Throughput -{ - public class ThroughputOptionSet : - OptionSet, - IThroughputSettings - { - public ThroughputOptionSet() - { - Add("count:", "The number of messages to send", value => MessageCount = value); - Add("concurrency:", "The number of concurrent consumers", value => ConcurrencyLimit = value); - Add("clients:", "The number of sending message clients", value => Clients = value); - Add("payload:", "The size of the additional payload for the message", value => PayloadSize = value); - Add("retry:", "The number of retries to allow", value => RetryCount = value); - Add("yield:", "If the task should yield or return", value => Yield = value); - Add("fault:", "The number of faults to throw", value => FaultCount = value); - - MessageCount = 1000000; - ConcurrencyLimit = 0; - Clients = 100; - } - - public int PayloadSize { get; set; } - public long MessageCount { get; set; } - public int Clients { get; set; } - public int ConcurrencyLimit { get; set; } - public int RetryCount { get; set; } - public bool Yield { get; set; } - public int FaultCount {get; set; } - } -} \ No newline at end of file diff --git a/src/GreenPipes.Tests/ConsumeContextRetryContext.cs b/src/GreenPipes.Tests/ConsumeContextRetryContext.cs index 87ab1bf..fedf1d6 100644 --- a/src/GreenPipes.Tests/ConsumeContextRetryContext.cs +++ b/src/GreenPipes.Tests/ConsumeContextRetryContext.cs @@ -35,6 +35,8 @@ public ConsumeContextRetryContext(RetryContext retryContext, Ret public CommandContext Context => _context; + public CancellationToken CancellationToken => _retryContext.CancellationToken; + public Exception Exception => _retryContext.Exception; public int RetryCount => _retryContext.RetryCount; @@ -84,6 +86,8 @@ public ConsumeContextRetryContext(RetryContext retryContext, TContext c public TFilter Context => _context; + public CancellationToken CancellationToken => _retryContext.CancellationToken; + public Exception Exception => _retryContext.Exception; public int RetryCount => _retryContext.RetryCount; diff --git a/src/GreenPipes.Tests/ConsumeContextRetryPolicyContext.cs b/src/GreenPipes.Tests/ConsumeContextRetryPolicyContext.cs index 1b756dd..293398a 100644 --- a/src/GreenPipes.Tests/ConsumeContextRetryPolicyContext.cs +++ b/src/GreenPipes.Tests/ConsumeContextRetryPolicyContext.cs @@ -45,6 +45,11 @@ public Task RetryFaulted(Exception exception) { return _policyContext.RetryFaulted(exception); } + + public void Dispose() + { + _policyContext?.Dispose(); + } } @@ -77,5 +82,10 @@ public Task RetryFaulted(Exception exception) { return _policyContext.RetryFaulted(exception); } + + public void Dispose() + { + _policyContext?.Dispose(); + } } } \ No newline at end of file diff --git a/src/GreenPipes.Tests/Retry_Specs.cs b/src/GreenPipes.Tests/Retry_Specs.cs index f97d5cb..21e3f2b 100644 --- a/src/GreenPipes.Tests/Retry_Specs.cs +++ b/src/GreenPipes.Tests/Retry_Specs.cs @@ -19,7 +19,6 @@ namespace GreenPipes.Tests using Contracts; using NUnit.Framework; using Policies; - using Policies.ExceptionFilters; using Util; @@ -56,6 +55,11 @@ public async Task Should_be_observable_at_each_retry() await observer.PostFault; Assert.That(observer.PostFaultCount, Is.EqualTo(4)); + Assert.That(observer.RetryFaultCount, Is.EqualTo(1)); + + var retryFault = await observer.RetryFault; + + Assert.That(retryFault.RetryCount, Is.EqualTo(4)); } [Test] @@ -428,19 +432,19 @@ class RetryObserver : IRetryObserver { readonly TaskCompletionSource _postFault; - readonly TaskCompletionSource _retryFault; + readonly TaskCompletionSource _retryFault; int _postFaultCount; int _retryFaultCount; public RetryObserver() { - _retryFault = new TaskCompletionSource(); + _retryFault = new TaskCompletionSource(); _postFault = new TaskCompletionSource(); } public int RetryFaultCount => _retryFaultCount; - public Task RetryFault => _retryFault.Task; + public Task RetryFault => _retryFault.Task; public int PostFaultCount => _postFaultCount; @@ -469,7 +473,7 @@ Task IRetryObserver.RetryFault(RetryContext context) { Interlocked.Increment(ref _retryFaultCount); - _retryFault.TrySetResult(true); + _retryFault.TrySetResult(context); return TaskUtil.Completed; } diff --git a/src/GreenPipes/Agents/Agent.cs b/src/GreenPipes/Agents/Agent.cs index 7847917..2ccd75d 100644 --- a/src/GreenPipes/Agents/Agent.cs +++ b/src/GreenPipes/Agents/Agent.cs @@ -24,19 +24,18 @@ namespace GreenPipes.Agents public class Agent : IAgent { - readonly TaskCompletionSource _completed; - readonly object _lock = new object(); - readonly TaskCompletionSource _ready; + readonly TaskCompletionSource _completed; + readonly TaskCompletionSource _ready; readonly Lazy _stopped; readonly Lazy _stopping; - bool _isStopped; + bool _isStopped; bool _isStopping; - TaskCompletionSource _setCompleted; + TaskCompletionSource _setCompleted; CancellationTokenSource _setCompletedCancel; - TaskCompletionSource _setReady; + TaskCompletionSource _setReady; CancellationTokenSource _setReadyCancel; /// @@ -44,8 +43,8 @@ public class Agent : /// public Agent() { - _ready = new TaskCompletionSource(); - _completed = new TaskCompletionSource(); + _ready = new TaskCompletionSource(); + _completed = new TaskCompletionSource(); _stopped = new Lazy(() => { @@ -112,7 +111,7 @@ public async Task Stop(StopContext context) /// protected virtual Task StopAgent(StopContext context) { - _completed.TrySetResult(DateTime.UtcNow); + _completed.TrySetResult(true); return TaskUtil.Completed; } @@ -122,7 +121,7 @@ protected virtual Task StopAgent(StopContext context) /// public virtual void SetReady() { - _ready.TrySetResult(DateTime.UtcNow); + _ready.TrySetResult(true); } /// @@ -140,7 +139,7 @@ public virtual void SetNotReady(Exception exception) /// protected void SetReady(Task readyTask) { - lock (_lock) + lock (_ready) { if (_setReady != null) { @@ -157,7 +156,7 @@ protected void SetReady(Task readyTask) if (_ready.Task.IsCompleted) return; - var setReady = _setReady = new TaskCompletionSource(); + var setReady = _setReady = new TaskCompletionSource(); setReady.Task.ContinueWith(SetReadyInternal, TaskScheduler.Default); var setReadyCancel = _setReadyCancel = new CancellationTokenSource(); @@ -174,7 +173,7 @@ void OnCompleted(Task task) // ReSharper disable once AssignNullToNotNullAttribute setReady.TrySetException(task.Exception); else - setReady.TrySetResult(DateTime.UtcNow); + setReady.TrySetResult(true); } readyTask.ContinueWith(OnCompleted, TaskScheduler.Default); @@ -187,7 +186,7 @@ void OnCompleted(Task task) /// protected void SetCompleted(Task completedTask) { - lock (_lock) + lock (_completed) { if (_setCompleted != null) { @@ -204,7 +203,7 @@ protected void SetCompleted(Task completedTask) if (_completed.Task.IsCompleted) return; - var setCompleted = _setCompleted = new TaskCompletionSource(); + var setCompleted = _setCompleted = new TaskCompletionSource(); setCompleted.Task.ContinueWith(SetCompletedInternal, TaskScheduler.Default); var setCompletedCancel = _setCompletedCancel = new CancellationTokenSource(); @@ -221,7 +220,7 @@ void OnCompleted(Task task) // ReSharper disable once AssignNullToNotNullAttribute setCompleted.TrySetException(task.Exception); else - setCompleted.TrySetResult(DateTime.UtcNow); + setCompleted.TrySetResult(true); } completedTask.ContinueWith(OnCompleted, TaskScheduler.Default); @@ -234,7 +233,7 @@ public override string ToString() return "Agent"; } - void SetReadyInternal(Task task) + void SetReadyInternal(Task task) { if (task.IsCanceled) _ready.TrySetCanceled(); @@ -246,7 +245,7 @@ void SetReadyInternal(Task task) _ready.TrySetResult(task.Result); } - void SetCompletedInternal(Task task) + void SetCompletedInternal(Task task) { if (task.IsCanceled) _completed.TrySetCanceled(); @@ -271,7 +270,7 @@ protected void SetFaulted(Task task) else _ready.TrySetException(new InvalidOperationException("The context faulted but no exception was present.")); - _completed.TrySetResult(DateTime.UtcNow); + _completed.TrySetResult(true); } } } \ No newline at end of file diff --git a/src/GreenPipes/Agents/Supervisor.cs b/src/GreenPipes/Agents/Supervisor.cs index 9520566..69723b9 100644 --- a/src/GreenPipes/Agents/Supervisor.cs +++ b/src/GreenPipes/Agents/Supervisor.cs @@ -14,7 +14,6 @@ namespace GreenPipes.Agents { using System; using System.Collections.Generic; - using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -96,7 +95,10 @@ protected override Task StopAgent(StopContext context) IAgent[] agents; lock (_agents) { - agents = _agents.Values.Where(x => !x.Completed.IsCompleted).ToArray(); + if (_agents.Count == 0) + agents = new IAgent[0]; + else + agents = _agents.Values.Where(x => !x.Completed.IsCompleted).ToArray(); } return StopSupervisor(new Context(context, agents)); @@ -104,9 +106,34 @@ protected override Task StopAgent(StopContext context) protected virtual async Task StopSupervisor(StopSupervisorContext context) { - SetCompleted(Task.WhenAll(context.Agents.Select(x => x.Completed))); + if (context.Agents.Length == 0) + { + SetCompleted(TaskUtil.Completed); + } + if (context.Agents.Length == 1) + { + SetCompleted(context.Agents[0].Completed); - await Task.WhenAll(context.Agents.Select(x => x.Stop(context))).UntilCompletedOrCanceled(context.CancellationToken).ConfigureAwait(false); + await context.Agents[0].Stop(context).UntilCompletedOrCanceled(context.CancellationToken).ConfigureAwait(false); + } + else if (context.Agents.Length > 1) + { + Task[] completedTasks = new Task[context.Agents.Length]; + for (int i = 0; i < context.Agents.Length; i++) + { + completedTasks[i] = context.Agents[i].Completed; + } + + SetCompleted(Task.WhenAll(completedTasks)); + + Task[] stopTasks = new Task[context.Agents.Length]; + for (int i = 0; i < context.Agents.Length; i++) + { + stopTasks[i] = context.Agents[i].Stop(context); + } + + await Task.WhenAll(stopTasks).UntilCompletedOrCanceled(context.CancellationToken).ConfigureAwait(false); + } await Completed.UntilCompletedOrCanceled(context.CancellationToken).ConfigureAwait(false); } @@ -119,63 +146,6 @@ void Remove(long id) } } - Task WhenAll(IAgent[] agents, string readyOrCompleted, Func selector) - { - if (Trace.Listeners.Count == 0) - return Task.WhenAll(agents.Select(selector).ToArray()); - - async Task WaitForAll() - { - await Task.Yield(); - - List faultedTasks = new List(); - do - { - var delayTask = Task.Delay(1000); - - var readyTask = await Task.WhenAny(agents.Select(selector).Concat(Enumerable.Repeat(delayTask, 1))).ConfigureAwait(false); - if (delayTask == readyTask) - { - Trace.WriteLine($"Waiting: {ToString()}"); - Trace.WriteLine(string.Join(Environment.NewLine, agents.Select(agent => $"{agent} - {selector(agent).Status}"))); - } - else - { - Trace.WriteLine($"{readyOrCompleted} Updated: {ToString()}"); - var completed = from agent in agents - let task = selector(agent) - where task.IsCompleted - select new {agent, task}; - - var completedAgents = completed.ToDictionary(x => x.agent); - - foreach (var item in completedAgents.Values) - if (item.task.IsCanceled) - { - Trace.WriteLine($"Canceled: {item.agent}"); - } - else if (item.task.IsFaulted) - { - Trace.WriteLine($"Faulted: {item.agent}"); - faultedTasks.Add(item.task); - } - else - { - Trace.WriteLine($"{readyOrCompleted}: {item.agent}"); - } - - agents = agents.Where(x => !completedAgents.ContainsKey(x)).ToArray(); - } - } - while (agents.Length > 0); - - if (faultedTasks.Count > 0) - await Task.WhenAll(faultedTasks).ConfigureAwait(false); - } - - return WaitForAll(); - } - /// public override string ToString() { diff --git a/src/GreenPipes/Configuration/PartitionerConfigurationExtensions.cs b/src/GreenPipes/Configuration/PartitionerConfigurationExtensions.cs index ed1d71c..a293384 100644 --- a/src/GreenPipes/Configuration/PartitionerConfigurationExtensions.cs +++ b/src/GreenPipes/Configuration/PartitionerConfigurationExtensions.cs @@ -49,9 +49,9 @@ public static void UsePartitioner(this IPipeConfigurator configurator, int if (keyProvider == null) throw new ArgumentNullException(nameof(keyProvider)); - PartitionKeyProvider provider = context => keyProvider(context).ToByteArray(); + byte[] Provider(T context) => keyProvider(context).ToByteArray(); - var specification = new PartitionerPipeSpecification(provider, partitionCount); + var specification = new PartitionerPipeSpecification(Provider, partitionCount); configurator.AddPipeSpecification(specification); } @@ -74,9 +74,9 @@ public static void UsePartitioner(this IPipeConfigurator configurator, IPa if (keyProvider == null) throw new ArgumentNullException(nameof(keyProvider)); - PartitionKeyProvider provider = context => keyProvider(context).ToByteArray(); + byte[] Provider(T context) => keyProvider(context).ToByteArray(); - var specification = new PartitionerPipeSpecification(provider, partitioner); + var specification = new PartitionerPipeSpecification(Provider, partitioner); configurator.AddPipeSpecification(specification); } @@ -100,13 +100,13 @@ public static void UsePartitioner(this IPipeConfigurator configurator, int var textEncoding = encoding ?? Encoding.UTF8; - PartitionKeyProvider provider = context => + byte[] Provider(T context) { var key = keyProvider(context) ?? ""; return textEncoding.GetBytes(key); - }; + } - var specification = new PartitionerPipeSpecification(provider, partitionCount); + var specification = new PartitionerPipeSpecification(Provider, partitionCount); configurator.AddPipeSpecification(specification); } @@ -133,13 +133,13 @@ public static void UsePartitioner(this IPipeConfigurator configurator, int var textEncoding = encoding ?? Encoding.UTF8; - PartitionKeyProvider provider = context => + byte[] Provider(T context) { var key = keyProvider(context) ?? ""; return textEncoding.GetBytes(key); - }; + } - var specification = new PartitionerPipeSpecification(provider, partitioner); + var specification = new PartitionerPipeSpecification(Provider, partitioner); configurator.AddPipeSpecification(specification); } @@ -160,13 +160,13 @@ public static void UsePartitioner(this IPipeConfigurator configurator, int if (keyProvider == null) throw new ArgumentNullException(nameof(keyProvider)); - PartitionKeyProvider provider = context => + byte[] Provider(T context) { var key = keyProvider(context); return BitConverter.GetBytes(key); - }; + } - var specification = new PartitionerPipeSpecification(provider, partitionCount); + var specification = new PartitionerPipeSpecification(Provider, partitionCount); configurator.AddPipeSpecification(specification); } @@ -189,13 +189,13 @@ public static void UsePartitioner(this IPipeConfigurator configurator, IPa if (keyProvider == null) throw new ArgumentNullException(nameof(keyProvider)); - PartitionKeyProvider provider = context => + byte[] Provider(T context) { var key = keyProvider(context); return BitConverter.GetBytes(key); - }; + } - var specification = new PartitionerPipeSpecification(provider, partitioner); + var specification = new PartitionerPipeSpecification(Provider, partitioner); configurator.AddPipeSpecification(specification); } @@ -216,9 +216,9 @@ public static void UsePartitioner(this IPipeConfigurator configurator, int if (keyProvider == null) throw new ArgumentNullException(nameof(keyProvider)); - PartitionKeyProvider provider = context => keyProvider(context); + byte[] Provider(T context) => keyProvider(context); - var specification = new PartitionerPipeSpecification(provider, partitionCount); + var specification = new PartitionerPipeSpecification(Provider, partitionCount); configurator.AddPipeSpecification(specification); } @@ -241,9 +241,9 @@ public static void UsePartitioner(this IPipeConfigurator configurator, IPa if (keyProvider == null) throw new ArgumentNullException(nameof(keyProvider)); - PartitionKeyProvider provider = context => keyProvider(context); + byte[] Provider(T context) => keyProvider(context); - var specification = new PartitionerPipeSpecification(provider, partitioner); + var specification = new PartitionerPipeSpecification(Provider, partitioner); configurator.AddPipeSpecification(specification); } diff --git a/src/GreenPipes/ContextAgentExtensions.cs b/src/GreenPipes/ContextAgentExtensions.cs index 144cc30..bcbec24 100644 --- a/src/GreenPipes/ContextAgentExtensions.cs +++ b/src/GreenPipes/ContextAgentExtensions.cs @@ -26,7 +26,7 @@ public static class ContextAgentExtensions /// /// /// - public static Task Stop(this IAgent agent, CancellationToken cancellationToken = default(CancellationToken)) + public static Task Stop(this IAgent agent, CancellationToken cancellationToken = default) { var stopContext = new DefaultStopContext(cancellationToken) { @@ -43,7 +43,7 @@ public static Task Stop(this IAgent agent, CancellationToken cancellationToken = /// The reason for stopping the agent /// /// - public static Task Stop(this IAgent agent, string reason, CancellationToken cancellationToken = default(CancellationToken)) + public static Task Stop(this IAgent agent, string reason, CancellationToken cancellationToken = default) { var stopContext = new DefaultStopContext(cancellationToken) { diff --git a/src/GreenPipes/Filters/DynamicFilter.cs b/src/GreenPipes/Filters/DynamicFilter.cs index 1e0e98d..d133040 100644 --- a/src/GreenPipes/Filters/DynamicFilter.cs +++ b/src/GreenPipes/Filters/DynamicFilter.cs @@ -13,7 +13,6 @@ namespace GreenPipes.Filters { using System; - using System.Collections.Concurrent; using System.Collections.Generic; using System.Diagnostics; using System.Linq; @@ -30,14 +29,18 @@ public class DynamicFilter : IDynamicFilter where TInput : class, PipeContext { + readonly Dictionary _outputPipes; protected readonly IPipeContextConverterFactory ConverterFactory; protected readonly FilterObservable Observers; - protected readonly ConcurrentDictionary OutputPipes; + IOutputPipe[] _outputPipeArray; public DynamicFilter(IPipeContextConverterFactory converterFactory) { ConverterFactory = converterFactory; - OutputPipes = new ConcurrentDictionary(); + + _outputPipes = new Dictionary(); + _outputPipeArray = _outputPipes.Values.ToArray(); + Observers = new FilterObservable(); } @@ -64,15 +67,30 @@ public ConnectHandle ConnectPipe(IPipe pipe) void IProbeSite.Probe(ProbeContext context) { - foreach (var pipe in OutputPipes.Values) - pipe.Filter.Probe(context); + foreach (var pipe in _outputPipes.Values) + pipe.Probe(context); } [DebuggerNonUserCode] [DebuggerStepThrough] - public Task Send(TInput context, IPipe next) + public async Task Send(TInput context, IPipe next) { - return Task.WhenAll(OutputPipes.Values.Select(x => x.Filter.Send(context, next))); + var outputPipes = _outputPipeArray; + + if (outputPipes.Length == 1) + { + await outputPipes[0].Send(context).ConfigureAwait(false); + } + else if (outputPipes.Length > 1) + { + var outputTasks = new Task[outputPipes.Length]; + for (int i = 0; i < outputPipes.Length; i++) + outputTasks[i] = outputPipes[i].Send(context); + + await Task.WhenAll(outputTasks).ConfigureAwait(false); + } + + await next.Send(context).ConfigureAwait(false); } protected TResult GetPipe() @@ -82,10 +100,28 @@ public Task Send(TInput context, IPipe next) return GetPipe().As(); } - protected virtual IOutputPipe GetPipe() + protected IOutputPipe GetPipe() where T : class, PipeContext { - return OutputPipes.GetOrAdd(typeof(T), x => new OutputPipe(Observers, ConverterFactory.GetConverter())); + lock (_outputPipes) + { + if (_outputPipes.TryGetValue(typeof(T), out var outputPipe)) + return outputPipe; + + outputPipe = CreateOutputPipe(); + + _outputPipes.Add(typeof(T), outputPipe); + + _outputPipeArray = _outputPipes.Values.ToArray(); + + return outputPipe; + } + } + + protected virtual IOutputPipe CreateOutputPipe() + where T : class, PipeContext + { + return new OutputPipe(Observers, ConverterFactory.GetConverter()); } public void AddFilter(IFilter filter) @@ -96,10 +132,9 @@ public void AddFilter(IFilter filter) protected interface IOutputPipe : + IPipe, IObserverConnector { - IFilter Filter { get; } - TResult As() where TResult : class; @@ -113,6 +148,7 @@ protected class OutputPipe : where TOutput : class, PipeContext { readonly Lazy> _filter; + readonly IPipe _nextPipe; protected readonly IPipeContextConverter ContextConverter; protected readonly FilterObservable Observers; @@ -123,12 +159,12 @@ public OutputPipe(FilterObservable observers, IPipeContextConverter>(); + + _nextPipe = Pipe.Empty(); } protected IList> PipeFilters { get; } - public IFilter Filter => _filter.Value; - TResult IOutputPipe.As() { return _filter.Value as TResult; @@ -160,6 +196,16 @@ public ConnectHandle ConnectObserver(IFilterObserver observer) return Observers.Connect(observer); } + public Task Send(TInput context) + { + return _filter.Value.Send(context, _nextPipe); + } + + public void Probe(ProbeContext context) + { + _filter.Value.Probe(context); + } + protected virtual IOutputPipeFilter CreateFilter() { IOutputPipeFilter filter = new OutputPipeFilter(PipeFilters, ContextConverter, new TeeFilter()); @@ -196,16 +242,13 @@ public ConnectHandle ConnectPipe(TKey key, IPipe pipe) return pipeConnector.ConnectPipe(key, pipe); } - protected override IOutputPipe GetPipe() + protected override IOutputPipe CreateOutputPipe() { - return OutputPipes.GetOrAdd(typeof(T), x => - { - var dynamicType = typeof(KeyOutputPipe<>).MakeGenericType(typeof(TInput), typeof(TKey), typeof(T)); + var dynamicType = typeof(KeyOutputPipe<>).MakeGenericType(typeof(TInput), typeof(TKey), typeof(T)); - var pipe = Activator.CreateInstance(dynamicType, Observers, ConverterFactory.GetConverter(), _keyAccessor); + var pipe = Activator.CreateInstance(dynamicType, Observers, ConverterFactory.GetConverter(), _keyAccessor); - return (IOutputPipe)pipe; - }); + return (IOutputPipe)pipe; } diff --git a/src/GreenPipes/Filters/KeyFilter.cs b/src/GreenPipes/Filters/KeyFilter.cs index 33dbdee..b814d2d 100644 --- a/src/GreenPipes/Filters/KeyFilter.cs +++ b/src/GreenPipes/Filters/KeyFilter.cs @@ -55,8 +55,7 @@ public async Task Send(TContext context, IPipe next) { var key = _keyAccessor(context); - IPipe pipe; - if (_pipes.TryGetValue(key, out pipe)) + if (_pipes.TryGetValue(key, out IPipe pipe)) await pipe.Send(context).ConfigureAwait(false); await next.Send(context).ConfigureAwait(false); @@ -81,8 +80,7 @@ public ConnectHandle ConnectPipe(TKey key, IPipe pipe) void RemovePipe(TKey key) { - IPipe pipe; - _pipes.TryRemove(key, out pipe); + _pipes.TryRemove(key, out IPipe _); } diff --git a/src/GreenPipes/Filters/RetryFilter.cs b/src/GreenPipes/Filters/RetryFilter.cs index b221a07..fbb7911 100644 --- a/src/GreenPipes/Filters/RetryFilter.cs +++ b/src/GreenPipes/Filters/RetryFilter.cs @@ -54,20 +54,24 @@ async Task IFilter.Send(TContext context, IPipe next) { await next.Send(policyContext.Context).ConfigureAwait(false); } - catch (OperationCanceledException exception) when (exception.CancellationToken == context.CancellationToken) + catch (OperationCanceledException exception) + when (exception.CancellationToken == policyContext.Context.CancellationToken || exception.CancellationToken == context.CancellationToken) { throw; } catch (Exception exception) { - if (context.CancellationToken.IsCancellationRequested) - context.CancellationToken.ThrowIfCancellationRequested(); + if (policyContext.Context.CancellationToken.IsCancellationRequested) + policyContext.Context.CancellationToken.ThrowIfCancellationRequested(); if (policyContext.Context.TryGetPayload(out RetryContext payloadRetryContext)) { - await policyContext.RetryFaulted(exception).ConfigureAwait(false); + if (_retryPolicy.IsHandled(exception)) + { + await policyContext.RetryFaulted(exception).ConfigureAwait(false); - await _observers.RetryFault(payloadRetryContext).ConfigureAwait(false); + await _observers.RetryFault(payloadRetryContext).ConfigureAwait(false); + } context.GetOrAddPayload(() => payloadRetryContext); @@ -76,9 +80,12 @@ async Task IFilter.Send(TContext context, IPipe next) if (policyContext.Context.TryGetPayload(out RetryContext genericRetryContext)) { - await policyContext.RetryFaulted(exception).ConfigureAwait(false); + if (_retryPolicy.IsHandled(exception)) + { + await policyContext.RetryFaulted(exception).ConfigureAwait(false); - await _observers.RetryFault(genericRetryContext).ConfigureAwait(false); + await _observers.RetryFault(genericRetryContext).ConfigureAwait(false); + } context.GetOrAddPayload(() => genericRetryContext); @@ -87,12 +94,14 @@ async Task IFilter.Send(TContext context, IPipe next) if (!policyContext.CanRetry(exception, out RetryContext retryContext)) { - await retryContext.RetryFaulted(exception).ConfigureAwait(false); + if (_retryPolicy.IsHandled(exception)) + { + await retryContext.RetryFaulted(exception).ConfigureAwait(false); - await _observers.RetryFault(retryContext).ConfigureAwait(false); + await _observers.RetryFault(retryContext).ConfigureAwait(false); - if (_retryPolicy.IsHandled(exception)) context.GetOrAddPayload(() => retryContext); + } throw; } @@ -107,10 +116,10 @@ async Task IFilter.Send(TContext context, IPipe next) [DebuggerStepThrough] async Task Attempt(TContext context, RetryContext retryContext, IPipe next) { - while (context.CancellationToken.IsCancellationRequested == false) + while (retryContext.CancellationToken.IsCancellationRequested == false) { if (retryContext.Delay.HasValue) - await Task.Delay(retryContext.Delay.Value, context.CancellationToken).ConfigureAwait(false); + await Task.Delay(retryContext.Delay.Value, retryContext.CancellationToken).ConfigureAwait(false); await retryContext.PreRetry().ConfigureAwait(false); @@ -122,7 +131,8 @@ async Task Attempt(TContext context, RetryContext retryContext, IPipe< return; } - catch (OperationCanceledException exception) when (exception.CancellationToken == context.CancellationToken) + catch (OperationCanceledException exception) + when (exception.CancellationToken == retryContext.CancellationToken || exception.CancellationToken == context.CancellationToken) { throw; } @@ -133,9 +143,12 @@ async Task Attempt(TContext context, RetryContext retryContext, IPipe< if (retryContext.Context.TryGetPayload(out RetryContext payloadRetryContext)) { - await retryContext.RetryFaulted(exception).ConfigureAwait(false); + if (_retryPolicy.IsHandled(exception)) + { + await retryContext.RetryFaulted(exception).ConfigureAwait(false); - await _observers.RetryFault(payloadRetryContext).ConfigureAwait(false); + await _observers.RetryFault(payloadRetryContext).ConfigureAwait(false); + } context.GetOrAddPayload(() => payloadRetryContext); @@ -144,9 +157,12 @@ async Task Attempt(TContext context, RetryContext retryContext, IPipe< if (retryContext.Context.TryGetPayload(out RetryContext genericRetryContext)) { - await retryContext.RetryFaulted(exception).ConfigureAwait(false); + if (_retryPolicy.IsHandled(exception)) + { + await retryContext.RetryFaulted(exception).ConfigureAwait(false); - await _observers.RetryFault(genericRetryContext).ConfigureAwait(false); + await _observers.RetryFault(genericRetryContext).ConfigureAwait(false); + } context.GetOrAddPayload(() => genericRetryContext); @@ -155,12 +171,14 @@ async Task Attempt(TContext context, RetryContext retryContext, IPipe< if (!retryContext.CanRetry(exception, out RetryContext nextRetryContext)) { - await nextRetryContext.RetryFaulted(exception).ConfigureAwait(false); + if (_retryPolicy.IsHandled(exception)) + { + await nextRetryContext.RetryFaulted(exception).ConfigureAwait(false); - await _observers.RetryFault(nextRetryContext).ConfigureAwait(false); + await _observers.RetryFault(nextRetryContext).ConfigureAwait(false); - if (_retryPolicy.IsHandled(exception)) context.GetOrAddPayload(() => nextRetryContext); + } throw; } diff --git a/src/GreenPipes/GreenPipes.csproj b/src/GreenPipes/GreenPipes.csproj index aa9f395..de0a77f 100644 --- a/src/GreenPipes/GreenPipes.csproj +++ b/src/GreenPipes/GreenPipes.csproj @@ -5,11 +5,14 @@ portable GreenPipes GreenPipes + 7.2 + 1591 + True + ../../GreenPipes.snk bin\$(Configuration)\$(TargetFramework)\GreenPipes.xml GreenPipes, a pipes and filters library for the Task Parallel Library - 1.0.0 https://github.com/MassTransit/GreenPipes https://github.com/MassTransit/GreenPipes/blob/master/LICENSE https://github.com/MassTransit/GreenPipes @@ -26,10 +29,6 @@ $(DefineConstants);NETCORE - - True - ../../GreenPipes.snk - diff --git a/src/GreenPipes/IAsyncDisposable.cs b/src/GreenPipes/IAsyncDisposable.cs index 256b2de..e4e0b9c 100644 --- a/src/GreenPipes/IAsyncDisposable.cs +++ b/src/GreenPipes/IAsyncDisposable.cs @@ -18,6 +18,6 @@ namespace GreenPipes public interface IAsyncDisposable { - Task DisposeAsync(CancellationToken cancellationToken = default(CancellationToken)); + Task DisposeAsync(CancellationToken cancellationToken = default); } } \ No newline at end of file diff --git a/src/GreenPipes/PipeExtensions.cs b/src/GreenPipes/PipeExtensions.cs index df2ad17..03353a5 100644 --- a/src/GreenPipes/PipeExtensions.cs +++ b/src/GreenPipes/PipeExtensions.cs @@ -28,8 +28,7 @@ public static class PipeExtensions public static TPayload GetPayload(this PipeContext context) where TPayload : class { - TPayload payload; - if (!context.TryGetPayload(out payload)) + if (!context.TryGetPayload(out TPayload payload)) throw new PayloadNotFoundException($"The payload was not found: {TypeCache.ShortName}"); return payload; diff --git a/src/GreenPipes/Policies/BaseRetryContext.cs b/src/GreenPipes/Policies/BaseRetryContext.cs index 5420c59..df1f623 100644 --- a/src/GreenPipes/Policies/BaseRetryContext.cs +++ b/src/GreenPipes/Policies/BaseRetryContext.cs @@ -13,19 +13,47 @@ namespace GreenPipes.Policies { using System; + using System.Threading; + using System.Threading.Tasks; + using Util; - public class BaseRetryContext : + public class BaseRetryContext : RetryContext + where TContext : class, PipeContext { - protected BaseRetryContext(Type contextType, int retryCount) + protected BaseRetryContext(TContext context, Exception exception, int retryCount, CancellationToken cancellationToken) { - RetryAttempt = retryCount; - ContextType = contextType; + Context = context; + Exception = exception; + RetryCount = retryCount; + CancellationToken = cancellationToken; + + RetryAttempt = retryCount + 1; } + public TContext Context { get; } + + public CancellationToken CancellationToken { get; } + + public Exception Exception { get; } + public int RetryAttempt { get; } - public Type ContextType { get; } + public int RetryCount { get; } + + public virtual TimeSpan? Delay => default; + + Type RetryContext.ContextType => typeof(TContext); + + public virtual Task PreRetry() + { + return TaskUtil.Completed; + } + + public virtual Task RetryFaulted(Exception exception) + { + return TaskUtil.Completed; + } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/BaseRetryPolicyContext.cs b/src/GreenPipes/Policies/BaseRetryPolicyContext.cs new file mode 100644 index 0000000..f19f810 --- /dev/null +++ b/src/GreenPipes/Policies/BaseRetryPolicyContext.cs @@ -0,0 +1,61 @@ +// Copyright 2012-2018 Chris Patterson +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use +// this file except in compliance with the License. You may obtain a copy of the +// License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed +// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR +// CONDITIONS OF ANY KIND, either express or implied. See the License for the +// specific language governing permissions and limitations under the License. +namespace GreenPipes.Policies +{ + using System; + using System.Threading; + using System.Threading.Tasks; + using Util; + + + public abstract class BaseRetryPolicyContext : + RetryPolicyContext + where TContext : class, PipeContext + { + readonly CancellationTokenSource _cancellationTokenSource; + readonly IRetryPolicy _policy; + CancellationTokenRegistration _registration; + + protected BaseRetryPolicyContext(IRetryPolicy policy, TContext context) + { + _policy = policy; + Context = context; + + _cancellationTokenSource = new CancellationTokenSource(); + _registration = context.CancellationToken.Register(_cancellationTokenSource.Cancel); + } + + protected CancellationToken CancellationToken => _cancellationTokenSource.Token; + + public TContext Context { get; } + + public virtual bool CanRetry(Exception exception, out RetryContext retryContext) + { + retryContext = CreateRetryContext(exception, _cancellationTokenSource.Token); + + return _policy.IsHandled(exception) && !_cancellationTokenSource.IsCancellationRequested; + } + + Task RetryPolicyContext.RetryFaulted(Exception exception) + { + return TaskUtil.Completed; + } + + void IDisposable.Dispose() + { + _registration.Dispose(); + } + + protected abstract RetryContext CreateRetryContext(Exception exception, CancellationToken cancellationToken); + } +} \ No newline at end of file diff --git a/src/GreenPipes/Policies/ExponentialRetryContext.cs b/src/GreenPipes/Policies/ExponentialRetryContext.cs index 44c4ce7..dfebb2d 100644 --- a/src/GreenPipes/Policies/ExponentialRetryContext.cs +++ b/src/GreenPipes/Policies/ExponentialRetryContext.cs @@ -13,50 +13,29 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class ExponentialRetryContext : - BaseRetryContext, + BaseRetryContext, RetryContext where TContext : class, PipeContext { readonly ExponentialRetryPolicy _policy; - readonly int _retryCount; - public ExponentialRetryContext(ExponentialRetryPolicy policy, TContext context, Exception exception, int retryCount) - : base(typeof(TContext), retryCount) + public ExponentialRetryContext(ExponentialRetryPolicy policy, TContext context, Exception exception, int retryCount, CancellationToken cancellationToken) + : base(context, exception, retryCount, cancellationToken) { _policy = policy; - _retryCount = retryCount; - Context = context; - Exception = exception; } - public TContext Context { get; } + public override TimeSpan? Delay => _policy.Intervals[RetryCount]; - public Exception Exception { get; } - - public int RetryCount => _retryCount; - - public TimeSpan? Delay => _policy.Intervals[_retryCount - 1]; - - public Task PreRetry() - { - return TaskUtil.Completed; - } - - public Task RetryFaulted(Exception exception) - { - return TaskUtil.Completed; - } - - bool RetryPolicyContext.CanRetry(Exception exception, out RetryContext retryContext) + bool RetryContext.CanRetry(Exception exception, out RetryContext retryContext) { - retryContext = new ExponentialRetryContext(_policy, Context, Exception, _retryCount + 1); + retryContext = new ExponentialRetryContext(_policy, Context, Exception, RetryCount + 1, CancellationToken); - return _retryCount < _policy.Intervals.Length && _policy.IsHandled(exception); + return RetryAttempt < _policy.Intervals.Length && _policy.IsHandled(exception); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/ExponentialRetryPolicyContext.cs b/src/GreenPipes/Policies/ExponentialRetryPolicyContext.cs index 04f36a2..5ccadbd 100644 --- a/src/GreenPipes/Policies/ExponentialRetryPolicyContext.cs +++ b/src/GreenPipes/Policies/ExponentialRetryPolicyContext.cs @@ -13,35 +13,31 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class ExponentialRetryPolicyContext : - RetryPolicyContext + BaseRetryPolicyContext where TContext : class, PipeContext { - readonly TContext _context; readonly ExponentialRetryPolicy _policy; public ExponentialRetryPolicyContext(ExponentialRetryPolicy policy, TContext context) + : base(policy, context) { _policy = policy; - _context = context; } - public TContext Context => _context; - - public bool CanRetry(Exception exception, out RetryContext retryContext) + public override bool CanRetry(Exception exception, out RetryContext retryContext) { - retryContext = new ExponentialRetryContext(_policy, _context, exception, 1); + retryContext = new NoRetryContext(Context, exception, CancellationToken); - return _policy.IsHandled(exception); + return false; } - public Task RetryFaulted(Exception exception) + protected override RetryContext CreateRetryContext(Exception exception, CancellationToken cancellationToken) { - return TaskUtil.Completed; + return new ExponentialRetryContext(_policy, Context, exception, 0, cancellationToken); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/ImmediateRetryContext.cs b/src/GreenPipes/Policies/ImmediateRetryContext.cs index f684d15..7c45229 100644 --- a/src/GreenPipes/Policies/ImmediateRetryContext.cs +++ b/src/GreenPipes/Policies/ImmediateRetryContext.cs @@ -13,49 +13,27 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class ImmediateRetryContext : - BaseRetryContext, + BaseRetryContext, RetryContext where TContext : class, PipeContext { readonly ImmediateRetryPolicy _policy; - public ImmediateRetryContext(ImmediateRetryPolicy policy, TContext context, Exception exception, int retryCount) - : base(typeof(TContext), retryCount) + public ImmediateRetryContext(ImmediateRetryPolicy policy, TContext context, Exception exception, int retryCount, CancellationToken cancellationToken) + : base(context, exception, retryCount, cancellationToken) { _policy = policy; - Context = context; - RetryCount = retryCount; - Exception = exception; } - public TContext Context { get; } - - public Exception Exception { get; } - - public int RetryCount { get; } - - public TimeSpan? Delay => default(TimeSpan?); - - public Task PreRetry() - { - return TaskUtil.Completed; - } - - public Task RetryFaulted(Exception exception) - { - return TaskUtil.Completed; - } - - public bool CanRetry(Exception exception, out RetryContext retryContext) + bool RetryContext.CanRetry(Exception exception, out RetryContext retryContext) { - retryContext = new ImmediateRetryContext(_policy, Context, Exception, RetryCount + 1); + retryContext = new ImmediateRetryContext(_policy, Context, Exception, RetryCount + 1, CancellationToken); - return RetryCount < _policy.RetryLimit && _policy.IsHandled(exception); + return RetryAttempt < _policy.RetryLimit && _policy.IsHandled(exception); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/ImmediateRetryPolicyContext.cs b/src/GreenPipes/Policies/ImmediateRetryPolicyContext.cs index 4546952..e44fb8a 100644 --- a/src/GreenPipes/Policies/ImmediateRetryPolicyContext.cs +++ b/src/GreenPipes/Policies/ImmediateRetryPolicyContext.cs @@ -13,35 +13,24 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class ImmediateRetryPolicyContext : - RetryPolicyContext + BaseRetryPolicyContext where TContext : class, PipeContext { - readonly TContext _context; readonly ImmediateRetryPolicy _policy; public ImmediateRetryPolicyContext(ImmediateRetryPolicy policy, TContext context) + : base(policy, context) { _policy = policy; - _context = context; } - public TContext Context => _context; - - public bool CanRetry(Exception exception, out RetryContext retryContext) - { - retryContext = new ImmediateRetryContext(_policy, _context, exception, 1); - - return _policy.IsHandled(exception); - } - - public Task RetryFaulted(Exception exception) + protected override RetryContext CreateRetryContext(Exception exception, CancellationToken cancellationToken) { - return TaskUtil.Completed; + return new ImmediateRetryContext(_policy, Context, exception, 0, cancellationToken); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/IncrementalRetryContext.cs b/src/GreenPipes/Policies/IncrementalRetryContext.cs index 168d5b4..f3e0434 100644 --- a/src/GreenPipes/Policies/IncrementalRetryContext.cs +++ b/src/GreenPipes/Policies/IncrementalRetryContext.cs @@ -13,55 +13,35 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class IncrementalRetryContext : - BaseRetryContext, + BaseRetryContext, RetryContext where TContext : class, PipeContext { readonly TimeSpan _delay; readonly TimeSpan _delayIncrement; readonly IncrementalRetryPolicy _policy; - readonly int _retryCount; public IncrementalRetryContext(IncrementalRetryPolicy policy, TContext context, Exception exception, int retryCount, TimeSpan delay, - TimeSpan delayIncrement) - : base(typeof(TContext), retryCount) + TimeSpan delayIncrement, CancellationToken cancellationToken) + : base(context, exception, retryCount, cancellationToken) { _policy = policy; - _retryCount = retryCount; _delay = delay; _delayIncrement = delayIncrement; - Context = context; - Exception = exception; } - public TContext Context { get; } + public override TimeSpan? Delay => _delay; - public Exception Exception { get; } - - public int RetryCount => _retryCount; - - public TimeSpan? Delay => _delay; - - public Task PreRetry() - { - return TaskUtil.Completed; - } - - public Task RetryFaulted(Exception exception) - { - return TaskUtil.Completed; - } - - public bool CanRetry(Exception exception, out RetryContext retryContext) + bool RetryContext.CanRetry(Exception exception, out RetryContext retryContext) { - retryContext = new IncrementalRetryContext(_policy, Context, Exception, _retryCount + 1, _delay + _delayIncrement, _delayIncrement); + retryContext = new IncrementalRetryContext(_policy, Context, Exception, RetryCount + 1, _delay + _delayIncrement, _delayIncrement, + CancellationToken); - return _retryCount < _policy.RetryLimit && _policy.IsHandled(exception); + return RetryAttempt < _policy.RetryLimit && _policy.IsHandled(exception); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/IncrementalRetryPolicyContext.cs b/src/GreenPipes/Policies/IncrementalRetryPolicyContext.cs index 5bccebc..4ac200d 100644 --- a/src/GreenPipes/Policies/IncrementalRetryPolicyContext.cs +++ b/src/GreenPipes/Policies/IncrementalRetryPolicyContext.cs @@ -13,35 +13,24 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class IncrementalRetryPolicyContext : - RetryPolicyContext + BaseRetryPolicyContext where TContext : class, PipeContext { - readonly TContext _context; readonly IncrementalRetryPolicy _policy; public IncrementalRetryPolicyContext(IncrementalRetryPolicy policy, TContext context) + : base(policy, context) { _policy = policy; - _context = context; } - public TContext Context => _context; - - public bool CanRetry(Exception exception, out RetryContext retryContext) - { - retryContext = new IncrementalRetryContext(_policy, _context, exception, 1, _policy.InitialInterval, _policy.IntervalIncrement); - - return _policy.IsHandled(exception); - } - - public Task RetryFaulted(Exception exception) + protected override RetryContext CreateRetryContext(Exception exception, CancellationToken cancellationToken) { - return TaskUtil.Completed; + return new IncrementalRetryContext(_policy, Context, exception, 0, _policy.InitialInterval, _policy.IntervalIncrement, cancellationToken); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/IntervalRetryContext.cs b/src/GreenPipes/Policies/IntervalRetryContext.cs index c2ba0bc..19e2fd9 100644 --- a/src/GreenPipes/Policies/IntervalRetryContext.cs +++ b/src/GreenPipes/Policies/IntervalRetryContext.cs @@ -13,50 +13,29 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class IntervalRetryContext : - BaseRetryContext, + BaseRetryContext, RetryContext where TContext : class, PipeContext { readonly IntervalRetryPolicy _policy; - readonly int _retryCount; - public IntervalRetryContext(IntervalRetryPolicy policy, TContext context, Exception exception, int retryCount) - : base(typeof(TContext), retryCount) + public IntervalRetryContext(IntervalRetryPolicy policy, TContext context, Exception exception, int retryCount, CancellationToken cancellationToken) + : base(context, exception, retryCount, cancellationToken) { _policy = policy; - _retryCount = retryCount; - Context = context; - Exception = exception; } - public TContext Context { get; } + public override TimeSpan? Delay => _policy.Intervals[RetryCount]; - public Exception Exception { get; } - - public int RetryCount => _retryCount; - - public TimeSpan? Delay => _policy.Intervals[_retryCount - 1]; - - public Task PreRetry() - { - return TaskUtil.Completed; - } - - public Task RetryFaulted(Exception exception) - { - return TaskUtil.Completed; - } - - public bool CanRetry(Exception exception, out RetryContext retryContext) + bool RetryContext.CanRetry(Exception exception, out RetryContext retryContext) { - retryContext = new IntervalRetryContext(_policy, Context, Exception, _retryCount + 1); + retryContext = new IntervalRetryContext(_policy, Context, Exception, RetryCount + 1, CancellationToken); - return _retryCount < _policy.Intervals.Length && _policy.IsHandled(exception); + return RetryAttempt < _policy.Intervals.Length && _policy.IsHandled(exception); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/IntervalRetryPolicyContext.cs b/src/GreenPipes/Policies/IntervalRetryPolicyContext.cs index 396ca22..7bb5509 100644 --- a/src/GreenPipes/Policies/IntervalRetryPolicyContext.cs +++ b/src/GreenPipes/Policies/IntervalRetryPolicyContext.cs @@ -13,35 +13,24 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class IntervalRetryPolicyContext : - RetryPolicyContext + BaseRetryPolicyContext where TContext : class, PipeContext { - readonly TContext _context; readonly IntervalRetryPolicy _policy; public IntervalRetryPolicyContext(IntervalRetryPolicy policy, TContext context) + : base(policy, context) { _policy = policy; - _context = context; } - public TContext Context => _context; - - public bool CanRetry(Exception exception, out RetryContext retryContext) - { - retryContext = new IntervalRetryContext(_policy, _context, exception, 1); - - return _policy.IsHandled(exception); - } - - public Task RetryFaulted(Exception exception) + protected override RetryContext CreateRetryContext(Exception exception, CancellationToken cancellationToken) { - return TaskUtil.Completed; + return new IntervalRetryContext(_policy, Context, exception, 0, cancellationToken); } } } \ No newline at end of file diff --git a/src/GreenPipes/Policies/NoRetryContext.cs b/src/GreenPipes/Policies/NoRetryContext.cs index 47b906d..efa6d06 100644 --- a/src/GreenPipes/Policies/NoRetryContext.cs +++ b/src/GreenPipes/Policies/NoRetryContext.cs @@ -13,40 +13,20 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class NoRetryContext : - BaseRetryContext, + BaseRetryContext, RetryContext where TContext : class, PipeContext { - public NoRetryContext(TContext context, Exception exception) - : base(typeof(TContext), 0) + public NoRetryContext(TContext context, Exception exception, CancellationToken cancellationToken) + : base(context, exception, 0, cancellationToken) { - Context = context; - Exception = exception; } - public TContext Context { get; } - public Exception Exception { get; } - - public int RetryCount => 1; - - public TimeSpan? Delay => default(TimeSpan?); - - public Task PreRetry() - { - return TaskUtil.Completed; - } - - public Task RetryFaulted(Exception exception) - { - return TaskUtil.Completed; - } - - public bool CanRetry(Exception exception, out RetryContext retryContext) + bool RetryContext.CanRetry(Exception exception, out RetryContext retryContext) { retryContext = this; diff --git a/src/GreenPipes/Policies/NoRetryPolicy.cs b/src/GreenPipes/Policies/NoRetryPolicy.cs index 15409b1..3ad9bee 100644 --- a/src/GreenPipes/Policies/NoRetryPolicy.cs +++ b/src/GreenPipes/Policies/NoRetryPolicy.cs @@ -35,10 +35,10 @@ void IProbeSite.Probe(ProbeContext context) RetryPolicyContext IRetryPolicy.CreatePolicyContext(T context) { - return new NoRetryPolicyContext(context); + return new NoRetryPolicyContext(this, context); } - public bool IsHandled(Exception exception) + bool IRetryPolicy.IsHandled(Exception exception) { return _filter.Match(exception); } diff --git a/src/GreenPipes/Policies/NoRetryPolicyContext.cs b/src/GreenPipes/Policies/NoRetryPolicyContext.cs index 7a6e7c4..b26d158 100644 --- a/src/GreenPipes/Policies/NoRetryPolicyContext.cs +++ b/src/GreenPipes/Policies/NoRetryPolicyContext.cs @@ -13,33 +13,28 @@ namespace GreenPipes.Policies { using System; - using System.Threading.Tasks; - using Util; + using System.Threading; public class NoRetryPolicyContext : - RetryPolicyContext + BaseRetryPolicyContext where TContext : class, PipeContext { - readonly TContext _context; - - public NoRetryPolicyContext(TContext context) + public NoRetryPolicyContext(IRetryPolicy retryPolicy, TContext context) + : base(retryPolicy, context) { - _context = context; } - public TContext Context => _context; - - public bool CanRetry(Exception exception, out RetryContext retryContext) + public override bool CanRetry(Exception exception, out RetryContext retryContext) { - retryContext = new NoRetryContext(_context, exception); + retryContext = new NoRetryContext(Context, exception, CancellationToken); return false; } - public Task RetryFaulted(Exception exception) + protected override RetryContext CreateRetryContext(Exception exception, CancellationToken cancellationToken) { - return TaskUtil.Completed; + return new NoRetryContext(Context, exception, cancellationToken); } } } \ No newline at end of file diff --git a/src/GreenPipes/RetryContext.cs b/src/GreenPipes/RetryContext.cs index 6067b84..4e68d35 100644 --- a/src/GreenPipes/RetryContext.cs +++ b/src/GreenPipes/RetryContext.cs @@ -13,32 +13,32 @@ namespace GreenPipes { using System; + using System.Threading; using System.Threading.Tasks; + using Filters; + /// + /// The base context of a retry, used by the + /// public interface RetryContext { /// - /// The retry attempt currently being attempted + /// Canceled when the retry should be canceled (not the same as if the underlying context + /// is canceled, which is different). This can be used to cancel retry, but not the operation + /// itself. /// - int RetryAttempt { get; } + CancellationToken CancellationToken { get; } /// - /// The context type of the retry context + /// The exception that originally caused the retry to be initiated /// - Type ContextType { get; } - } - + Exception Exception { get; } - public interface RetryContext : - RetryPolicyContext, - RetryContext - where TContext : class - { /// - /// The exception that originally caused the retry to be initiated + /// The retry attempt currently being attempted (should be 1 > than RetryCount) /// - Exception Exception { get; } + int RetryAttempt { get; } /// /// The number of retries which were attempted beyond the initial attempt @@ -50,10 +50,45 @@ public interface RetryContext : /// TimeSpan? Delay { get; } + /// + /// The context type of the retry context + /// + Type ContextType { get; } + + /// + /// Called after the retry attempt has failed + /// + /// + /// + Task RetryFaulted(Exception exception); + /// /// Called before the retry attempt is performed /// /// Task PreRetry(); } + + + /// + /// The retry context, with the specified context type + /// + /// The context type + public interface RetryContext : + RetryContext + where TContext : class + { + /// + /// The context being managed by the retry policy + /// + TContext Context { get; } + + /// + /// Determines if the exception can be retried + /// + /// The exception that occurred + /// The retry context for the retry + /// True if the task should be retried + bool CanRetry(Exception exception, out RetryContext retryContext); + } } \ No newline at end of file diff --git a/src/GreenPipes/RetryPolicyContext.cs b/src/GreenPipes/RetryPolicyContext.cs index b0f5806..1230596 100644 --- a/src/GreenPipes/RetryPolicyContext.cs +++ b/src/GreenPipes/RetryPolicyContext.cs @@ -20,7 +20,8 @@ namespace GreenPipes /// An initial context acquired to begin a retry filter /// /// - public interface RetryPolicyContext + public interface RetryPolicyContext : + IDisposable where TContext : class { /// diff --git a/src/GreenPipes/Util/Connectable.cs b/src/GreenPipes/Util/Connectable.cs index f7dbcf0..80131ef 100644 --- a/src/GreenPipes/Util/Connectable.cs +++ b/src/GreenPipes/Util/Connectable.cs @@ -84,7 +84,11 @@ public Task ForEachAsync(Func callback) if (connected.Length == 1) return callback(connected[0]); - return Task.WhenAll(connected.Select(callback)); + var outputTasks = new Task[connected.Length]; + for (int i = 0; i < connected.Length; i++) + outputTasks[i] = callback(connected[i]); + + return Task.WhenAll(outputTasks); } public bool All(Func callback) @@ -101,7 +105,13 @@ public bool All(Func callback) if (connected.Length == 1) return callback(connected[0]); - return connected.All(callback); + for (int i = 0; i < connected.Length; i++) + { + if (callback(connected[i]) == false) + return false; + } + + return true; } void Disconnect(long id)