-
Notifications
You must be signed in to change notification settings - Fork 144
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix serialization tests & don't block thread #846
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
using Microsoft.SqlTools.Hosting.Protocol; | ||
using Microsoft.SqlTools.ServiceLayer.QueryExecution.Contracts; | ||
using Microsoft.SqlTools.ServiceLayer.QueryExecution.DataStorage; | ||
using Microsoft.SqlTools.ServiceLayer.Utility; | ||
using Microsoft.SqlTools.Utility; | ||
|
||
|
||
|
@@ -40,93 +41,111 @@ public override void InitializeService(IProtocolEndpoint serviceHost) | |
/// <summary> | ||
/// Begin to process request to save a resultSet to a file in CSV format | ||
/// </summary> | ||
internal async Task HandleSerializeStartRequest(SerializeDataStartRequestParams serializeParams, | ||
internal Task HandleSerializeStartRequest(SerializeDataStartRequestParams serializeParams, | ||
RequestContext<SerializeDataResult> requestContext) | ||
{ | ||
// Run in separate thread so that message thread isn't held up by a potentially time consuming file write | ||
Task.Run(async () => { | ||
await RunSerializeStartRequest(serializeParams, requestContext); | ||
}).ContinueWithOnFaulted(async t => await SendErrorAndCleanup(serializeParams?.FilePath, requestContext, t.Exception)); | ||
return Task.CompletedTask; | ||
} | ||
|
||
internal async Task RunSerializeStartRequest(SerializeDataStartRequestParams serializeParams, RequestContext<SerializeDataResult> requestContext) | ||
{ | ||
try | ||
{ | ||
// Verify we have sensible inputs and there isn't a task running for this file already | ||
Validate.IsNotNull(nameof(serializeParams), serializeParams); | ||
Validate.IsNotNullOrWhitespaceString("FilePath", serializeParams.FilePath); | ||
|
||
DataSerializer serializer = null; | ||
bool hasSerializer = inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer); | ||
if (hasSerializer) | ||
if (inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer)) | ||
{ | ||
// Cannot proceed as there is an in progress serialization happening | ||
throw new Exception(SR.SerializationServiceRequestInProgress(serializeParams.FilePath)); | ||
} | ||
|
||
|
||
// Create a new serializer, save for future calls if needed, and write the request out | ||
serializer = new DataSerializer(serializeParams); | ||
if (!serializeParams.IsLastBatch) | ||
{ | ||
inProgressSerializations.AddOrUpdate(serializer.FilePath, serializer, (key, old) => serializer); | ||
} | ||
Func<Task<SerializeDataResult>> writeData = () => | ||
{ | ||
return Task.Factory.StartNew(() => | ||
{ | ||
var result = serializer.ProcessRequest(serializeParams); | ||
return result; | ||
}); | ||
}; | ||
await HandleRequest(writeData, requestContext, "HandleSerializeStartRequest"); | ||
|
||
Logger.Write(TraceEventType.Verbose, "HandleSerializeStartRequest"); | ||
SerializeDataResult result = serializer.ProcessRequest(serializeParams); | ||
await requestContext.SendResult(result); | ||
} | ||
catch (Exception ex) | ||
{ | ||
await requestContext.SendError(ex.Message); | ||
await SendErrorAndCleanup(serializeParams.FilePath, requestContext, ex); | ||
} | ||
} | ||
|
||
private async Task SendErrorAndCleanup(string filePath, RequestContext<SerializeDataResult> requestContext, Exception ex) | ||
{ | ||
if (filePath != null) | ||
{ | ||
try | ||
{ | ||
DataSerializer removed; | ||
inProgressSerializations.TryRemove(filePath, out removed); | ||
if (removed != null) | ||
{ | ||
// Flush any contents to disk and remove the writer | ||
removed.CloseStreams(); | ||
} | ||
} | ||
catch | ||
{ | ||
// Do not care if there was an error removing this, must always delete if something failed | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
You're not actually deleting the file though - is that intentional? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great question. I have a gap here for sure. On consideration, I will ensure the write stream is closed out in this case and intentionally leave file contents that have been written as-is. The thinking is that for most formats (CSV, JSON) users are going to be happier with half the written contents on an unexpected error than none. |
||
} | ||
} | ||
await requestContext.SendError(ex.Message); | ||
} | ||
|
||
/// <summary> | ||
/// Process request to save a resultSet to a file in CSV format | ||
/// </summary> | ||
internal async Task HandleSerializeContinueRequest(SerializeDataContinueRequestParams serializeParams, | ||
internal Task HandleSerializeContinueRequest(SerializeDataContinueRequestParams serializeParams, | ||
RequestContext<SerializeDataResult> requestContext) | ||
{ | ||
// Run in separate thread so that message thread isn't held up by a potentially time consuming file write | ||
Task.Run(async () => | ||
{ | ||
await RunSerializeContinueRequest(serializeParams, requestContext); | ||
}).ContinueWithOnFaulted(async t => await SendErrorAndCleanup(serializeParams?.FilePath, requestContext, t.Exception)); | ||
return Task.CompletedTask; | ||
} | ||
|
||
internal async Task RunSerializeContinueRequest(SerializeDataContinueRequestParams serializeParams, RequestContext<SerializeDataResult> requestContext) | ||
{ | ||
try | ||
{ | ||
// Verify we have sensible inputs and some data has already been sent for the file | ||
Validate.IsNotNull(nameof(serializeParams), serializeParams); | ||
Validate.IsNotNullOrWhitespaceString("FilePath", serializeParams.FilePath); | ||
|
||
DataSerializer serializer = null; | ||
bool hasSerializer = inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer); | ||
if (!hasSerializer) | ||
if (!inProgressSerializations.TryGetValue(serializeParams.FilePath, out serializer)) | ||
{ | ||
throw new Exception(SR.SerializationServiceRequestNotFound(serializeParams.FilePath)); | ||
} | ||
|
||
Func<Task<SerializeDataResult>> writeData = () => | ||
{ | ||
return Task.Factory.StartNew(() => | ||
{ | ||
var result = serializer.ProcessRequest(serializeParams); | ||
if (serializeParams.IsLastBatch) | ||
{ | ||
// Cleanup the serializer | ||
this.inProgressSerializations.TryRemove(serializer.FilePath, out serializer); | ||
} | ||
return result; | ||
}); | ||
}; | ||
await HandleRequest(writeData, requestContext, "HandleSerializeContinueRequest"); | ||
} | ||
catch (Exception ex) | ||
{ | ||
await requestContext.SendError(ex.Message); | ||
} | ||
} | ||
|
||
private async Task HandleRequest<T>(Func<Task<T>> handler, RequestContext<T> requestContext, string requestType) | ||
{ | ||
Logger.Write(TraceEventType.Verbose, requestType); | ||
|
||
try | ||
{ | ||
T result = await handler(); | ||
// Write to file and cleanup if needed | ||
Logger.Write(TraceEventType.Verbose, "HandleSerializeContinueRequest"); | ||
SerializeDataResult result = serializer.ProcessRequest(serializeParams); | ||
if (serializeParams.IsLastBatch) | ||
{ | ||
// Cleanup the serializer | ||
this.inProgressSerializations.TryRemove(serializer.FilePath, out serializer); | ||
} | ||
await requestContext.SendResult(result); | ||
} | ||
catch (Exception ex) | ||
{ | ||
await requestContext.SendError(ex.Message); | ||
await SendErrorAndCleanup(serializeParams.FilePath, requestContext, ex); | ||
} | ||
} | ||
} | ||
|
@@ -242,9 +261,13 @@ private void EnsureWriterCreated() | |
this.writer = factory.GetWriter(requestParams.FilePath); | ||
} | ||
} | ||
private void CloseStreams() | ||
public void CloseStreams() | ||
{ | ||
this.writer.Dispose(); | ||
if (this.writer != null) | ||
{ | ||
this.writer.Dispose(); | ||
this.writer = null; | ||
} | ||
} | ||
|
||
private SaveResultsAsJsonRequestParams CreateJsonRequestParams() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So a potential problem is that now since this is truly async we'll immediately return - at which point the caller may then call to continue serialization. But the start request may not actually have started/finished yet and thus we have a race condition if the continue serialization call starts trying to do stuff assuming that the start request has finished.
Same issue with the continue calls - those could then get out of order or start causing problems trying to write at the same time
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So, this is dependent on the caller waiting on responses. That's what I've implemented in ADS - we wait on the result to come back, then send the next request. Since each one is awaited, we will get correct ordering. Right now I would treat this as "by design" because:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh right, yeah my mistake. Got confused about the return value here being the indication that the request was done. Nevermind!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suggest leaving a comment in the code about this stated 'by design' contract.