From 45b321bdbb2358f86be97e068d4f28b2272f89ca Mon Sep 17 00:00:00 2001 From: Markus Olsson Date: Mon, 15 Feb 2016 20:13:35 +0100 Subject: [PATCH 1/2] Add failing test proving that filters crash when run concurrently --- LibGit2Sharp.Tests/FilterFixture.cs | 44 +++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/LibGit2Sharp.Tests/FilterFixture.cs b/LibGit2Sharp.Tests/FilterFixture.cs index b313981a9..0afc9f702 100644 --- a/LibGit2Sharp.Tests/FilterFixture.cs +++ b/LibGit2Sharp.Tests/FilterFixture.cs @@ -4,6 +4,7 @@ using System.IO; using LibGit2Sharp.Tests.TestHelpers; using Xunit; +using System.Threading.Tasks; namespace LibGit2Sharp.Tests { @@ -172,6 +173,49 @@ public void CleanFilterWritesOutputToObjectTree() } } + [Fact] + public void CanHandleMultipleSmudgesConcurrently() + { + const string decodedInput = "This is a substitution cipher"; + const string encodedInput = "Guvf vf n fhofgvghgvba pvcure"; + + const string branchName = "branch"; + + Action smudgeCallback = SubstitutionCipherFilter.RotateByThirteenPlaces; + + var filter = new FakeFilter(FilterName, attributes, null, smudgeCallback); + var registration = GlobalSettings.RegisterFilter(filter); + + try + { + int count = 30; + var tasks = new Task[count]; + + for (int i = 0; i < count; i++) + { + tasks[i] = Task.Factory.StartNew(() => + { + string repoPath = InitNewRepository(); + return CheckoutFileForSmudge(repoPath, branchName, encodedInput); + }); + } + + Task.WaitAll(tasks); + + foreach(var task in tasks) + { + FileInfo expectedFile = task.Result; + + string readAllText = File.ReadAllText(expectedFile.FullName); + Assert.Equal(decodedInput, readAllText); + } + } + finally + { + GlobalSettings.DeregisterFilter(registration); + } + } + [Fact] public void WhenCheckingOutAFileFileSmudgeWritesCorrectFileToWorkingDirectory() { From 1ba31dbafee755f589e419964735608f42456e50 Mon Sep 17 00:00:00 2001 From: Markus Olsson Date: Tue, 16 Feb 2016 17:46:48 +0100 Subject: [PATCH 2/2] Allow multiple concurrent filter streams The current implementation of filters in libgit2sharp only allows for one filter stream per type of filter at any given point in time. This switches the storage of the necessary state from instance variables on the Filter class to a dedicated state object which is tracked in a dictionary keyed on the libgit2 stream pointer. --- LibGit2Sharp/Filter.cs | 122 +++++++++++++++++++++++++++++------------ 1 file changed, 87 insertions(+), 35 deletions(-) diff --git a/LibGit2Sharp/Filter.cs b/LibGit2Sharp/Filter.cs index b9aab9c7b..b66889a12 100644 --- a/LibGit2Sharp/Filter.cs +++ b/LibGit2Sharp/Filter.cs @@ -1,5 +1,8 @@ using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; +using System.Globalization; using System.IO; using System.Linq; using System.Runtime.InteropServices; @@ -47,18 +50,34 @@ protected Filter(string name, IEnumerable attributes) ~Filter() { GlobalSettings.DeregisterFilter(this); + +#if LEAKS_IDENTIFYING + int activeStreamCount = activeStreams.Count; + if (activeStreamCount > 0) + { + Trace.WriteLine(string.Format(CultureInfo.InvariantCulture, "{0} leaked {1} stream handles at finalization", GetType().Name, activeStreamCount)); + } +#endif } private readonly string name; private readonly IEnumerable attributes; private readonly GitFilter gitFilter; + private readonly ConcurrentDictionary activeStreams = new ConcurrentDictionary(); - private GitWriteStream thisStream; - private GitWriteStream nextStream; - private IntPtr thisPtr; - private IntPtr nextPtr; - private FilterSource filterSource; - private Stream output; + /// + /// State bag used to keep necessary reference from being + /// garbage collected during filter processing. + /// + private class StreamState + { + public GitWriteStream thisStream; + public GitWriteStream nextStream; + public IntPtr thisPtr; + public IntPtr nextPtr; + public FilterSource filterSource; + public Stream output; + } /// /// The name that this filter was registered with @@ -226,33 +245,44 @@ int InitializeCallback(IntPtr filterPointer) int StreamCreateCallback(out IntPtr git_writestream_out, GitFilter self, IntPtr payload, IntPtr filterSourcePtr, IntPtr git_writestream_next) { int result = 0; + var state = new StreamState(); try { Ensure.ArgumentNotZeroIntPtr(filterSourcePtr, "filterSourcePtr"); Ensure.ArgumentNotZeroIntPtr(git_writestream_next, "git_writestream_next"); - thisStream = new GitWriteStream(); - thisStream.close = StreamCloseCallback; - thisStream.write = StreamWriteCallback; - thisStream.free = StreamFreeCallback; - thisPtr = Marshal.AllocHGlobal(Marshal.SizeOf(thisStream)); - Marshal.StructureToPtr(thisStream, thisPtr, false); - nextPtr = git_writestream_next; - nextStream = new GitWriteStream(); - Marshal.PtrToStructure(nextPtr, nextStream); - filterSource = FilterSource.FromNativePtr(filterSourcePtr); - output = new WriteStream(nextStream, nextPtr); - - Create(filterSource.Path, filterSource.Root, filterSource.SourceMode); + state.thisStream = new GitWriteStream(); + state.thisStream.close = StreamCloseCallback; + state.thisStream.write = StreamWriteCallback; + state.thisStream.free = StreamFreeCallback; + + state.thisPtr = Marshal.AllocHGlobal(Marshal.SizeOf(state.thisStream)); + Marshal.StructureToPtr(state.thisStream, state.thisPtr, false); + + state.nextPtr = git_writestream_next; + state.nextStream = new GitWriteStream(); + Marshal.PtrToStructure(state.nextPtr, state.nextStream); + + state.filterSource = FilterSource.FromNativePtr(filterSourcePtr); + state.output = new WriteStream(state.nextStream, state.nextPtr); + + Create(state.filterSource.Path, state.filterSource.Root, state.filterSource.SourceMode); + + if (!activeStreams.TryAdd(state.thisPtr, state)) + { + // AFAICT this is a theoretical error that could only happen if we manage + // to free the stream pointer but fail to remove the dictionary entry. + throw new InvalidOperationException("Overlapping stream pointers"); + } } catch (Exception exception) { // unexpected failures means memory clean up required - if (thisPtr != IntPtr.Zero) + if (state.thisPtr != IntPtr.Zero) { - Marshal.FreeHGlobal(thisPtr); - thisPtr = IntPtr.Zero; + Marshal.FreeHGlobal(state.thisPtr); + state.thisPtr = IntPtr.Zero; } Log.Write(LogLevel.Error, "Filter.StreamCreateCallback exception"); @@ -261,7 +291,7 @@ int StreamCreateCallback(out IntPtr git_writestream_out, GitFilter self, IntPtr result = (int)GitErrorCode.Error; } - git_writestream_out = thisPtr; + git_writestream_out = state.thisPtr; return result; } @@ -269,16 +299,25 @@ int StreamCreateCallback(out IntPtr git_writestream_out, GitFilter self, IntPtr int StreamCloseCallback(IntPtr stream) { int result = 0; + StreamState state; try { Ensure.ArgumentNotZeroIntPtr(stream, "stream"); - Ensure.ArgumentIsExpectedIntPtr(stream, thisPtr, "stream"); - using (BufferedStream outputBuffer = new BufferedStream(output, BufferSize)) + if(!activeStreams.TryGetValue(stream, out state)) + { + throw new ArgumentException("Unknown stream pointer", "stream"); + } + + Ensure.ArgumentIsExpectedIntPtr(stream, state.thisPtr, "stream"); + + using (BufferedStream outputBuffer = new BufferedStream(state.output, BufferSize)) { - Complete(filterSource.Path, filterSource.Root, outputBuffer); + Complete(state.filterSource.Path, state.filterSource.Root, outputBuffer); } + + result = state.nextStream.close(state.nextPtr); } catch (Exception exception) { @@ -288,19 +327,25 @@ int StreamCloseCallback(IntPtr stream) result = (int)GitErrorCode.Error; } - result = nextStream.close(nextPtr); - return result; } void StreamFreeCallback(IntPtr stream) { + StreamState state; + try { Ensure.ArgumentNotZeroIntPtr(stream, "stream"); - Ensure.ArgumentIsExpectedIntPtr(stream, thisPtr, "stream"); - Marshal.FreeHGlobal(thisPtr); + if (!activeStreams.TryRemove(stream, out state)) + { + throw new ArgumentException("Double free or invalid stream pointer", "stream"); + } + + Ensure.ArgumentIsExpectedIntPtr(stream, state.thisPtr, "stream"); + + Marshal.FreeHGlobal(state.thisPtr); } catch (Exception exception) { @@ -312,24 +357,31 @@ void StreamFreeCallback(IntPtr stream) unsafe int StreamWriteCallback(IntPtr stream, IntPtr buffer, UIntPtr len) { int result = 0; + StreamState state; try { Ensure.ArgumentNotZeroIntPtr(stream, "stream"); Ensure.ArgumentNotZeroIntPtr(buffer, "buffer"); - Ensure.ArgumentIsExpectedIntPtr(stream, thisPtr, "stream"); + + if (!activeStreams.TryGetValue(stream, out state)) + { + throw new ArgumentException("Invalid or already freed stream pointer", "stream"); + } + + Ensure.ArgumentIsExpectedIntPtr(stream, state.thisPtr, "stream"); using (UnmanagedMemoryStream input = new UnmanagedMemoryStream((byte*)buffer.ToPointer(), (long)len)) - using (BufferedStream outputBuffer = new BufferedStream(output, BufferSize)) + using (BufferedStream outputBuffer = new BufferedStream(state.output, BufferSize)) { - switch (filterSource.SourceMode) + switch (state.filterSource.SourceMode) { case FilterMode.Clean: - Clean(filterSource.Path, filterSource.Root, input, outputBuffer); + Clean(state.filterSource.Path, state.filterSource.Root, input, outputBuffer); break; case FilterMode.Smudge: - Smudge(filterSource.Path, filterSource.Root, input, outputBuffer); + Smudge(state.filterSource.Path, state.filterSource.Root, input, outputBuffer); break; default: