Skip to content

Commit

Permalink
Merge pull request #161 from microsoft/personal/namalu/improve-retry-…
Browse files Browse the repository at this point in the history
…logic

Improve retry logic to handle invalid cache and no changes to Observation
  • Loading branch information
namalu committed Jan 25, 2022
2 parents 6b160d7 + e16efcf commit ba5c327
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,10 @@ public enum ResourceOperation
/// FHIR resource updated
/// </summary>
Updated,

/// <summary>
/// FHIR resource no operation performed
/// </summary>
NoOperation,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,45 +72,70 @@ public virtual async Task<string> SaveObservationAsync(ILookupTemplate<IFhirTemp
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
}

Model.Observation result;
if (existingObservation == null)
{
var newObservation = GenerateObservation(config, observationGroup, identifier, ids);
result = await _client.CreateAsync(newObservation).ConfigureAwait(false);
_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created), 1);
}
else
{
var policyResult = await Policy<Model.Observation>
.Handle<FhirOperationException>(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed)
.RetryAsync(2, async (polyRes, attempt) =>
var policyResult = await Policy<(Model.Observation observation, ResourceOperation operationType)>
.Handle<FhirOperationException>(ex => ex.Status == System.Net.HttpStatusCode.Conflict || ex.Status == System.Net.HttpStatusCode.PreconditionFailed)
.RetryAsync(2, async (polyRes, attempt) =>
{
// 409 Conflict or 412 Precondition Failed can occur if the Observation.meta.versionId does not match the update request.
// This can happen if 2 independent processes are updating the same Observation simultaneously.
// or
// The update operation failed because the Observation no longer exists.
// This can happen if a cached Observation was deleted from the FHIR Server.
_logger.LogTrace("A conflict or precondition caused an Observation update to fail. Getting the most recent Observation.");
// Attempt to get the most recent version of the Observation.
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
// If the Observation no longer exists on the FHIR Server, it was most likely deleted.
if (existingObservation == null)
{
existingObservation = await GetObservationFromServerAsync(identifier).ConfigureAwait(false);
})
.ExecuteAndCaptureAsync(async () =>
_logger.LogTrace("A cached version of an Observation was deleted. Creating a new Observation.");
// Remove the Observation from the cache (this version no longer exists on the FHIR Server.
_observationCache.Remove(cacheKey);
}
})
.ExecuteAndCaptureAsync(async () =>
{
if (existingObservation == null)
{
var mergedObservation = MergeObservation(config, existingObservation, observationGroup);
return await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false);
}).ConfigureAwait(false);
var newObservation = GenerateObservation(config, observationGroup, identifier, ids);
return (await _client.CreateAsync(newObservation).ConfigureAwait(false), ResourceOperation.Created);
}
var exception = policyResult.FinalException;
// Merge the new data with the existing Observation.
var mergedObservation = MergeObservation(config, existingObservation, observationGroup);
if (exception != null)
{
throw exception;
}
// Check to see if there are any changes after merging.
if (mergedObservation.IsExactly(existingObservation))
{
// There are no changes to the Observation - Do not update.
return (existingObservation, ResourceOperation.NoOperation);
}
// Update the Observation. Some failures will be handled in the RetryAsync block above.
return (await _client.UpdateAsync(mergedObservation, versionAware: true).ConfigureAwait(false), ResourceOperation.Updated);
}).ConfigureAwait(false);

result = policyResult.Result;
_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Updated), 1);
var exception = policyResult.FinalException;

if (exception != null)
{
throw exception;
}

var observation = policyResult.Result.observation;

_logger.LogMetric(IomtMetrics.FhirResourceSaved(ResourceType.Observation, policyResult.Result.operationType), 1);

_observationCache.CreateEntry(cacheKey)
.SetAbsoluteExpiration(DateTimeOffset.UtcNow.AddHours(1))
.SetSize(1)
.SetValue(result)
.SetValue(observation)
.Dispose();

return result.Id;
return observation.Id;
}

public virtual Model.Observation GenerateObservation(ILookupTemplate<IFhirTemplate> config, IObservationGroup grp, Model.Identifier observationId, IDictionary<ResourceType, string> ids)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
using System.Threading.Tasks;
using Hl7.Fhir.Rest;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Health.Common.Telemetry;
using Microsoft.Health.Fhir.Ingest.Data;
using Microsoft.Health.Fhir.Ingest.Telemetry;
using Microsoft.Health.Fhir.Ingest.Template;
using Microsoft.Health.Logging.Telemetry;
using Microsoft.Health.Tests.Common;
Expand Down Expand Up @@ -87,6 +89,7 @@ public async void GivenNotFoundObservation_WhenSaveObservationAsync_ThenCreateIn

handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -121,7 +124,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation()))
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation));
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation() { Id = "2" }));

var cache = Substitute.For<IMemoryCache>();
var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();
Expand All @@ -135,6 +138,7 @@ public async void GivenFoundObservation_WhenSaveObservationAsync_ThenUpdateInvok
templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -168,7 +172,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn

// Mock search and update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle2))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle1, foundBundle1))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException(), x => savedObservation));

var fhirClient = Utilities.CreateMockFhirClient(handler);
Expand All @@ -195,6 +199,7 @@ public async void GivenFoundObservationAndConflictOnSave_WhenSaveObservationAsyn
templateProcessor.ReceivedWithAnyArgs(2).MergeObservation(default, default, default);
handler.Received(2).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(2).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationUpdated", x.Dimensions[DimensionNames.Name])), 1);
}

[Fact]
Expand Down Expand Up @@ -233,6 +238,143 @@ public void GivenValidTemplate_WhenGenerateObservation_ExpectedReferencesSet_Tes
});
}

[Fact]
public async Task GivenCachedObservationDeleted_WhenGenerateObservation_ThenCacheIsUpdatedAndCreateInvoked_Test()
{
var savedObservation = new Model.Observation { Id = "1" };

// Mock the cached Observation
var cache = Substitute.For<IMemoryCache>();
cache.TryGetValue(Arg.Any<object>(), out Model.Observation observation)
.Returns(x =>
{
x[1] = new Model.Observation();
return true;
});

// Mock update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put)).Returns(x => ThrowConflictException()))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(new Model.Bundle()))
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post)).Returns(savedObservation));

var fhirClient = Utilities.CreateMockFhirClient(handler);

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.CreateObservation(default, default).ReturnsForAnyArgs(new Model.Observation()))
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(new Model.Observation { Id = "2" }));

var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
cache.Received(1).Remove(Arg.Any<string>());
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Put));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Post));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationCreated", x.Dimensions[DimensionNames.Name])), 1);
cache.Received(1).Set(Arg.Any<object>(), savedObservation);
}

[Fact]
public async Task GivenCachedObservationUnchanged_WhenGenerateObservation_ThenCacheNoOperation_Test()
{
var cachedObservation = new Model.Observation { Id = "1" };

// Mock the cached Observation
var cache = Substitute.For<IMemoryCache>();
cache.TryGetValue(Arg.Any<object>(), out Model.Observation observation)
.Returns(x =>
{
x[1] = cachedObservation;
return true;
});

var fhirClient = Utilities.CreateMockFhirClient();

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(cachedObservation));

var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

var test = IomtMetrics.FhirResourceSaved(ResourceType.Observation, ResourceOperation.Created);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1);
cache.Received(1).Set(Arg.Any<object>(), cachedObservation);
}

[Fact]
public async void GivenFoundObservationUnchanged_WhenSaveObservationAsync_ThenUpdateInvoked_Test()
{
var foundObservation = new Model.Observation { Id = "1" };
var foundBundle = new Model.Bundle
{
Entry = new List<Model.Bundle.EntryComponent>
{
new Model.Bundle.EntryComponent
{
Resource = foundObservation,
},
},
};

var savedObservation = new Model.Observation();

// Mock search and update request
var handler = Utilities.CreateMockMessageHandler()
.Mock(m => m.GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get)).Returns(foundBundle));

var fhirClient = Utilities.CreateMockFhirClient(handler);

var ids = BuildIdCollection();
var identityService = Substitute.For<IResourceIdentityService>()
.Mock(m => m.ResolveResourceIdentitiesAsync(default).ReturnsForAnyArgs(Task.FromResult(ids)));

var observationGroup = Substitute.For<IObservationGroup>();

var templateProcessor = Substitute.For<IFhirTemplateProcessor<ILookupTemplate<IFhirTemplate>, Model.Observation>>()
.Mock(m => m.MergeObservation(default, default, default).ReturnsForAnyArgs(foundObservation));

var cache = Substitute.For<IMemoryCache>();
var config = Substitute.For<ILookupTemplate<IFhirTemplate>>();

var logger = Substitute.For<ITelemetryLogger>();

var service = new R4FhirImportService(identityService, fhirClient, templateProcessor, cache, logger);

var result = await service.SaveObservationAsync(config, observationGroup, ids);

templateProcessor.ReceivedWithAnyArgs(1).MergeObservation(default, default, default);
handler.Received(1).GetReturnContent(Arg.Is<HttpRequestMessage>(msg => msg.Method == HttpMethod.Get));
logger.Received(1).LogMetric(Arg.Is<Metric>(x => Equals("ObservationNoOperation", x.Dimensions[DimensionNames.Name])), 1);
}

private static IDictionary<Data.ResourceType, string> BuildIdCollection()
{
var lookup = IdentityLookupFactory.Instance.Create();
Expand Down

0 comments on commit ba5c327

Please sign in to comment.