Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compress continuation tokens #2279

Merged
merged 2 commits into from
Oct 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1260,7 +1260,7 @@ public async Task GivenAGroupExportJobToResume_WhenExecuted_ThenAllPatientResour
{
// The ids aren't in the query parameters because of the reset
ids = new string[] { "1", "2", "3" };
continuationTokenIndex = int.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2)).Substring(2));
continuationTokenIndex = int.Parse(ContinuationTokenConverter.Decode(x.ArgAt<IReadOnlyList<Tuple<string, string>>>(1)[2].Item2).Substring(2));
}

return CreateSearchResult(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// -------------------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License (MIT). See LICENSE in the repo root for license information.
// -------------------------------------------------------------------------------------------------

using System;
using System.Text;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.Health.Fhir.Core.Features.Search;
using Xunit;

namespace Microsoft.Health.Fhir.Core.UnitTests.Features.Search
{
public class ContinuationTokenConverterTests
{
[Fact]
public void GivenAString_WhenEcodingAndDecoding_ThenOriginalStringIsPreserved()
{
var data = Guid.NewGuid().ToString();

var encoded = ContinuationTokenConverter.Encode(data);
var decoded = ContinuationTokenConverter.Decode(encoded);

Assert.Equal(data, decoded);
}

[Fact]
public void GivenAnOldStringInBase64_WhenDecoding_ThenOriginalStringIsPreserved()
{
var data = Guid.NewGuid().ToString();

var encodedPrevious = Convert.ToBase64String(Encoding.UTF8.GetBytes(data));

var decoded = ContinuationTokenConverter.Decode(encodedPrevious);

Assert.Equal(data, decoded);
}

[Fact]
public void GivenAnInvalidString_WhenDecoding_ThenAnErrorIsThrown()
{
var data = Guid.NewGuid().ToString();

var encodedPrevious = Convert.ToBase64String(Encoding.UTF8.GetBytes(data)).Insert(5, "aaaafffff");

Assert.Throws<BadRequestException>(() => ContinuationTokenConverter.Decode(encodedPrevious));
}

[Fact]
public void GivenShortBase64WhenDecoding_ThenCorrectValueIsReturned()
{
var data = "YWJj";

var decoded = ContinuationTokenConverter.Decode(data);

Assert.Equal("abc", decoded);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,45 @@
// -------------------------------------------------------------------------------------------------

using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using EnsureThat;
using Microsoft.Health.Fhir.Core.Features.Persistence;
using Microsoft.IO;

namespace Microsoft.Health.Fhir.Core.Features.Search
{
public sealed class ContinuationTokenConverter
{
private static readonly RecyclableMemoryStreamManager StreamManager = new();
private const string TokenVersion = "v2|";

public static string Decode(string encodedContinuationToken)
{
try
{
return System.Text.Encoding.UTF8.GetString(Convert.FromBase64String(encodedContinuationToken));
byte[] continuationTokenBytes = Convert.FromBase64String(encodedContinuationToken);

try
{
using MemoryStream memoryStream = StreamManager.GetStream(continuationTokenBytes);
using var deflate = new DeflateStream(memoryStream, CompressionMode.Decompress);
using var reader = new StreamReader(deflate, Encoding.UTF8);

var token = reader.ReadToEnd();
if (token?.StartsWith(TokenVersion, StringComparison.Ordinal) == true)
{
return token.Substring(TokenVersion.Length);
}

return Encoding.UTF8.GetString(continuationTokenBytes);
}
catch (InvalidDataException)
{
// Fall back to compatibility with non-compressed tokens
return Encoding.UTF8.GetString(continuationTokenBytes);
}
}
catch (FormatException)
{
Expand All @@ -26,7 +53,16 @@ public static string Decode(string encodedContinuationToken)
public static string Encode(string continuationToken)
{
EnsureArg.IsNotEmptyOrWhiteSpace(continuationToken);
return Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes(continuationToken));

using MemoryStream memoryStream = StreamManager.GetStream();
using var deflate = new DeflateStream(memoryStream, CompressionLevel.Fastest);
using var writer = new StreamWriter(deflate, Encoding.UTF8);

writer.Write(TokenVersion);
writer.Write(continuationToken);
writer.Flush();

return Convert.ToBase64String(memoryStream.ToArray());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace Microsoft.Health.Fhir.Core.UnitTests.Features.Operations.Reindex
[CollectionDefinition("ReindexTaskTests", DisableParallelization = true)]
public class ReindexJobTaskTests : IClassFixture<SearchParameterFixtureData>, IAsyncLifetime
{
private const string Base64EncodedToken = "dG9rZW4=";
private readonly string _base64EncodedToken = ContinuationTokenConverter.Encode("token");
private const int _mockedSearchCount = 5;

private static readonly WeakETag _weakETag = WeakETag.FromVersionId("0");
Expand Down Expand Up @@ -184,7 +184,7 @@ public async Task GivenContinuationToken_WhenExecuted_ThenAdditionalQueryAdded()
Assert.Collection<ReindexJobQueryStatus>(
job.QueryList.Keys.OrderBy(q => q.LastModified),
item => Assert.True(item.ContinuationToken == null && item.Status == OperationStatus.Completed),
item2 => Assert.True(item2.ContinuationToken == Base64EncodedToken && item2.Status == OperationStatus.Completed));
item2 => Assert.True(item2.ContinuationToken == _base64EncodedToken && item2.Status == OperationStatus.Completed));

param.IsSearchable = true;
}
Expand Down Expand Up @@ -253,15 +253,15 @@ public async Task GivenRunningJob_WhenExecuted_ThenQueuedQueryCompleted()
await _searchService.Received().SearchForReindexAsync(
Arg.Is<IReadOnlyList<Tuple<string, string>>>(
l => l.Any(t => t.Item1 == "_type" && t.Item2 == "Appointment") &&
l.Any(t => t.Item1 == KnownQueryParameterNames.ContinuationToken && t.Item2 == Base64EncodedToken)),
l.Any(t => t.Item1 == KnownQueryParameterNames.ContinuationToken && t.Item2 == _base64EncodedToken)),
Arg.Is<string>("appointmentHash"),
false,
Arg.Any<CancellationToken>());

await _searchService.Received().SearchForReindexAsync(
Arg.Is<IReadOnlyList<Tuple<string, string>>>(
l => l.Any(t => t.Item1 == "_type" && t.Item2 == "AppointmentResponse") &&
l.Any(t => t.Item1 == KnownQueryParameterNames.ContinuationToken && t.Item2 == Base64EncodedToken)),
l.Any(t => t.Item1 == KnownQueryParameterNames.ContinuationToken && t.Item2 == _base64EncodedToken)),
Arg.Is<string>("appointmentResponseHash"),
false,
Arg.Any<CancellationToken>());
Expand All @@ -279,8 +279,8 @@ public async Task GivenRunningJob_WhenExecuted_ThenQueuedQueryCompleted()
Assert.Equal(4, job.QueryList.Count);
Assert.Contains(job.QueryList.Keys, item => item.ContinuationToken == null && item.Status == OperationStatus.Completed && item.ResourceType == "AppointmentResponse");
Assert.Contains(job.QueryList.Keys, item => item.ContinuationToken == null && item.Status == OperationStatus.Completed && item.ResourceType == "Appointment");
Assert.Contains(job.QueryList.Keys, item => item.ContinuationToken == Base64EncodedToken && item.Status == OperationStatus.Completed && item.ResourceType == "AppointmentResponse");
Assert.Contains(job.QueryList.Keys, item => item.ContinuationToken == Base64EncodedToken && item.Status == OperationStatus.Completed && item.ResourceType == "Appointment");
Assert.Contains(job.QueryList.Keys, item => item.ContinuationToken == _base64EncodedToken && item.Status == OperationStatus.Completed && item.ResourceType == "AppointmentResponse");
Assert.Contains(job.QueryList.Keys, item => item.ContinuationToken == _base64EncodedToken && item.Status == OperationStatus.Completed && item.ResourceType == "Appointment");

await _reindexUtilities.Received().UpdateSearchParameterStatus(
Arg.Is<IReadOnlyCollection<string>>(r => r.Any(s => s.Contains("Appointment")) && r.Any(s => s.Contains("AppointmentResponse"))),
Expand Down