Skip to content

Commit

Permalink
rebuild index (#2759)
Browse files Browse the repository at this point in the history
* rebuild index

* Modify the length of Ind and the Generator setting

* Modify 39.diff.sql

* update

* update sql

* modify 40.diff.sql

* modify the code for table IndexProperties

* Modfiy and add tests

* restore the global.json

* Modify E2E test for rebuild indexes

* Remove rebuild index progress

* Delete lists not used

* Delete unused files

* Remove useless assignment and change token

* fix error in test

* Remove delay

* Remove useless

* Combine flag in importtaskconfiguration

* Change DefaultSqlLongRunningOperationTimeoutInSec to 0

* Additional fix for Failed Export due to RequestEntityToolarge (#2803)

* Added actionable error message for customer when hitting a RequestEntityTooLargeException during export.

* Added in missing logging.

* Now clearing the output for the export job if it runs into the exceeded size so we can save the failed state. The output information is really necessary if it failed since it would be incomplete.

* Fix long job info crash (#2802)

* Bump System.Configuration.ConfigurationManager from 6.0.0 to 6.0.1 (#2801)

Bumps [System.Configuration.ConfigurationManager](https://github.com/dotnet/runtime) from 6.0.0 to 6.0.1.
- [Release notes](https://github.com/dotnet/runtime/releases)
- [Commits](dotnet/runtime@v6.0.0...v6.0.1)

---
updated-dependencies:
- dependency-name: System.Configuration.ConfigurationManager
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Standalone resource parser (#2751)

* Adds SDL analysis tools, cleaup PR builds (#2797)

+semver: skip

* [Bugfix] [SQL-Search] Fix SQL code generation when :not operator is included. (#2785)

* Changes in NotExpressionRewriter.
* Adding categories to tests.
* Adding tests to fix bug when using :not operator combined with other search criteria.

* Bump FluentValidation from 11.2.1 to 11.2.2 (#2800)

Bumps [FluentValidation](https://github.com/JeremySkinner/fluentvalidation) from 11.2.1 to 11.2.2.
- [Release notes](https://github.com/JeremySkinner/fluentvalidation/releases)
- [Changelog](https://github.com/FluentValidation/FluentValidation/blob/main/Changelog.txt)
- [Commits](FluentValidation/FluentValidation@11.2.1...11.2.2)

---
updated-dependencies:
- dependency-name: FluentValidation
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

* Package integration tests (#2806)

* restore 40

* add 41.diff

* change taskconfiguration

* remove incorrect set

* modify stored procedures

* fix stored procedures

* Add IndexRebuilder

* add sqlserver.database

* update sqlserver.database.sqlproj

* update suppress warning

* add log

* remove unnecessary reference

* modify sql

* modify encoding

* update BuildTimeCodeGenerator and Tasks version to 6.1.129

* update

* add attribute for test

* remove set and restore lines

* modify nuget version

* restore encoding

* restore encoding

* remove lines

* modify pros

* modify props

* modify props

* modify

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Paul Taladay <PTaladay@users.noreply.github.com>
Co-authored-by: Robert Johnson <rbrucej@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Brendan Kowitz <bkowitz@microsoft.com>
Co-authored-by: Fernando Henrique Inocêncio Borba Ferreira <fernfe@microsoft.com>
  • Loading branch information
6 people committed Oct 21, 2022
1 parent f86a54d commit b37dba3
Show file tree
Hide file tree
Showing 37 changed files with 6,669 additions and 56 deletions.
6 changes: 6 additions & 0 deletions Microsoft.Health.Fhir.sln
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4B.T
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Health.Fhir.R4B.Api.UnitTests", "src\Microsoft.Health.Fhir.R4B.Api.UnitTests\Microsoft.Health.Fhir.R4B.Api.UnitTests.csproj", "{D6C90E8C-50AF-45D8-B2D3-1B9B07E65F3E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "IndexRebuilder", "tools\IndexRebuilder\IndexRebuilder.csproj", "{F4DE2945-80C5-48FE-B58A-4AD1264C9FEA}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -402,6 +404,10 @@ Global
{D6C90E8C-50AF-45D8-B2D3-1B9B07E65F3E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D6C90E8C-50AF-45D8-B2D3-1B9B07E65F3E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D6C90E8C-50AF-45D8-B2D3-1B9B07E65F3E}.Release|Any CPU.Build.0 = Release|Any CPU
{F4DE2945-80C5-48FE-B58A-4AD1264C9FEA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F4DE2945-80C5-48FE-B58A-4AD1264C9FEA}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F4DE2945-80C5-48FE-B58A-4AD1264C9FEA}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F4DE2945-80C5-48FE-B58A-4AD1264C9FEA}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ public class ImportTaskConfiguration
private const int DefaultSqlMaxDeleteDuplicateOperationConcurrentCount = 3;
private const int DefaultSqlMaxDatatableProcessConcurrentCount = 3;
private const int DefaultSqlLongRunningOperationTimeoutInSec = 60 * 60 * 2;
private const int DefaultInfinitySqlLongRunningOperationTimeoutInSec = 0;
private const int DefaultSqlBulkOperationTimeoutInSec = 60 * 10;
private const int DefaultPollingFrequencyInSeconds = 60;
private const bool DefaultSqlRebuildClustered = false;

/// <summary>
/// Determines whether bulk import is enabled or not.
Expand All @@ -45,6 +47,8 @@ public class ImportTaskConfiguration
/// </summary>
public int SqlLongRunningOperationTimeoutInSec { get; set; } = DefaultSqlLongRunningOperationTimeoutInSec;

public int InfinitySqlLongRunningOperationTimeoutInSec { get; set; } = DefaultInfinitySqlLongRunningOperationTimeoutInSec;

/// <summary>
/// SQL bulk operation timeout in seconds
/// </summary>
Expand Down Expand Up @@ -101,8 +105,8 @@ public class ImportTaskConfiguration
public bool DisableOptionalIndexesForImport { get; set; }

/// <summary>
/// Disable unique optional index during import data.
/// Default not rebuild clustered.
/// </summary>
public bool DisableUniqueOptionalIndexesForImport { get; set; }
public bool RebuildClustered { get; } = DefaultSqlRebuildClustered;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,24 +228,16 @@ public async Task PreprocessAsync(CancellationToken cancellationToken)
{
try
{
await InitializeIndexProperties(cancellationToken);
OptionalIndexesForImport = IndexesList();
OptionalUniqueIndexesForImport = UniqueIndexesList();

// Not disable index by default
if (_importTaskConfiguration.DisableOptionalIndexesForImport || _importTaskConfiguration.DisableUniqueOptionalIndexesForImport)
if (_importTaskConfiguration.DisableOptionalIndexesForImport)
{
List<(string tableName, string indexName)> indexesNeedDisable = new List<(string tableName, string indexName)>();

if (_importTaskConfiguration.DisableOptionalIndexesForImport)
{
indexesNeedDisable.AddRange(OptionalIndexesForImport.Select(indexRecord => (indexRecord.table.TableName, indexRecord.index.IndexName)));
}

if (_importTaskConfiguration.DisableUniqueOptionalIndexesForImport)
{
indexesNeedDisable.AddRange(OptionalUniqueIndexesForImport.Select(indexRecord => (indexRecord.table.TableName, indexRecord.index.IndexName)));
}

indexesNeedDisable.AddRange(OptionalIndexesForImport.Select(indexRecord => (indexRecord.table.TableName, indexRecord.index.IndexName)));
indexesNeedDisable.AddRange(OptionalUniqueIndexesForImport.Select(indexRecord => (indexRecord.table.TableName, indexRecord.index.IndexName)));
foreach (var index in indexesNeedDisable)
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
Expand Down Expand Up @@ -274,48 +266,95 @@ public async Task PostprocessAsync(CancellationToken cancellationToken)
try
{
// Not rerebuild index by default
if (_importTaskConfiguration.DisableOptionalIndexesForImport || _importTaskConfiguration.DisableUniqueOptionalIndexesForImport)
if (_importTaskConfiguration.DisableOptionalIndexesForImport)
{
List<(string tableName, string indexName, bool pageCompression)> indexesNeedRebuild = new List<(string tableName, string indexName, bool pageCompression)>();

if (_importTaskConfiguration.DisableOptionalIndexesForImport)
await SwitchPartitionsOutAllTables(_importTaskConfiguration.RebuildClustered, cancellationToken);
var commandsForRebuildIndexes = await GetCommandsForRebuildIndexes(_importTaskConfiguration.RebuildClustered, cancellationToken);
if (_importTaskConfiguration.RebuildClustered)
{
indexesNeedRebuild.AddRange(OptionalIndexesForImport.Select(indexRecord => (indexRecord.table.TableName, indexRecord.index.IndexName, indexRecord.pageCompression)));
commandsForRebuildIndexes = await GetCommandsForRebuildIndexes(false, cancellationToken);
}

if (_importTaskConfiguration.DisableUniqueOptionalIndexesForImport)
{
indexesNeedRebuild.AddRange(OptionalUniqueIndexesForImport.Select(indexRecord => (indexRecord.table.TableName, indexRecord.index.IndexName, indexRecord.pageCompression)));
}
await RunCommandForRebuildIndexes(commandsForRebuildIndexes, cancellationToken);
await SwitchPartitionsInAllTables(cancellationToken);
}
}
catch (Exception ex)
{
_logger.LogInformation(ex, "PostprocessAsync failed.");
if (ex.IsRetryable())
{
throw new RetriableJobException(ex.Message, ex);
}

List<Task<(string tableName, string indexName)>> runningTasks = new List<Task<(string tableName, string indexName)>>();
HashSet<string> runningRebuildTables = new HashSet<string>();
throw;
}
}

// rebuild index operation on same table would be blocked, try to parallel run rebuild operation on different table.
while (indexesNeedRebuild.Count > 0)
{
// if all remine indexes' table has some running rebuild operation, need to wait until at least one operation completed.
while (indexesNeedRebuild.All(ix => runningRebuildTables.Contains(ix.tableName)) || runningTasks.Count >= _importTaskConfiguration.SqlMaxRebuildIndexOperationConcurrentCount)
{
Task<(string tableName, string indexName)> completedTask = await Task.WhenAny(runningTasks.ToArray());
(string tableName, string indexName) indexRebuilt = await completedTask;
private async Task InitializeIndexProperties(CancellationToken cancellationToken)
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand())
{
sqlCommandWrapper.CommandTimeout = _importTaskConfiguration.SqlLongRunningOperationTimeoutInSec;

runningRebuildTables.Remove(indexRebuilt.tableName);
runningTasks.Remove(completedTask);
}
VLatest.InitializeIndexProperties.PopulateCommand(sqlCommandWrapper);
await sqlCommandWrapper.ExecuteNonQueryAsync(cancellationToken);
}
}

private async Task<IList<(string tableName, string indexName, string command)>> GetCommandsForRebuildIndexes(bool rebuildClustered, CancellationToken cancellationToken)
{
var indexes = new List<(string tableName, string indexName, string command)>();
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand())
{
sqlCommandWrapper.CommandTimeout = _importTaskConfiguration.SqlLongRunningOperationTimeoutInSec;

VLatest.GetCommandsForRebuildIndexes.PopulateCommand(sqlCommandWrapper, rebuildClustered);
using SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(cancellationToken);
while (await sqlDataReader.ReadAsync(cancellationToken))
{
var tableName = sqlDataReader.GetString(0);
var indexName = sqlDataReader.GetString(1);
var command = sqlDataReader.GetString(2);
indexes.Add((tableName, indexName, command));
}
}

(string tableName, string indexName, bool pageCompression) nextIndex = indexesNeedRebuild.Where(ix => !runningRebuildTables.Contains(ix.tableName)).First();
indexesNeedRebuild.Remove(nextIndex);
runningRebuildTables.Add(nextIndex.tableName);
runningTasks.Add(ExecuteRebuildIndexTaskAsync(nextIndex.tableName, nextIndex.indexName, nextIndex.pageCompression, cancellationToken));
return indexes;
}

private async Task RunCommandForRebuildIndexes(IList<(string tableName, string indexName, string command)> commands, CancellationToken cancellationToken)
{
var tasks = new Queue<Task<string>>();
try
{
foreach (var sqlCommand in commands)
{
if (cancellationToken.IsCancellationRequested)
{
throw new OperationCanceledException("Operation Cancel");
}

await Task.WhenAll(runningTasks.ToArray());
while (tasks.Count >= _importTaskConfiguration.SqlMaxRebuildIndexOperationConcurrentCount)
{
await tasks.First();
_ = tasks.Dequeue();
}

tasks.Enqueue(ExecuteRebuildIndexSqlCommand(sqlCommand.tableName, sqlCommand.indexName, sqlCommand.command, cancellationToken));
}

while (tasks.Count > 0)
{
await tasks.First();
_ = tasks.Dequeue();
}
}
catch (Exception ex)
{
_logger.LogInformation(ex, "PostprocessAsync failed.");
_logger.LogError(ex, "Failed to rebuild indexes");
if (ex.IsRetryable())
{
throw new RetriableJobException(ex.Message, ex);
Expand All @@ -325,17 +364,56 @@ public async Task PostprocessAsync(CancellationToken cancellationToken)
}
}

private async Task<(string tableName, string indexName)> ExecuteRebuildIndexTaskAsync(string tableName, string indexName, bool pageCompression, CancellationToken cancellationToken)
private async Task<string> ExecuteRebuildIndexSqlCommand(string tableName, string indexName, string command, CancellationToken cancellationToken)
{
_logger.LogInformation(string.Format("table: {0}, index: {1} rebuild index start at {2}", tableName, indexName, DateTime.Now.ToString("hh:mm:ss tt")));
try
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand())
{
sqlCommandWrapper.CommandTimeout = _importTaskConfiguration.InfinitySqlLongRunningOperationTimeoutInSec;

VLatest.ExecuteCommandForRebuildIndexes.PopulateCommand(sqlCommandWrapper, tableName, indexName, command);
using SqlDataReader sqlDataReader = await sqlCommandWrapper.ExecuteReaderAsync(cancellationToken);
while (await sqlDataReader.ReadAsync(cancellationToken))
{
indexName = sqlDataReader.GetString(0);
}
}
}
catch (SqlException ex)
{
_logger.LogError(ex, "rebuild execute failed");
throw;
}

_logger.LogInformation(string.Format("table: {0}, index: {1} rebuild index complete at {2}", tableName, indexName, DateTime.Now.ToString("hh:mm:ss tt")));

return indexName;
}

private async Task SwitchPartitionsOutAllTables(bool rebuildClustered, CancellationToken cancellationToken)
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand())
{
sqlCommandWrapper.CommandTimeout = _importTaskConfiguration.SqlLongRunningOperationTimeoutInSec;

VLatest.RebuildIndex.PopulateCommand(sqlCommandWrapper, tableName, indexName, pageCompression);
VLatest.SwitchPartitionsOutAllTables.PopulateCommand(sqlCommandWrapper, rebuildClustered);
await sqlCommandWrapper.ExecuteNonQueryAsync(cancellationToken);
}
}

private async Task SwitchPartitionsInAllTables(CancellationToken cancellationToken)
{
using (SqlConnectionWrapper sqlConnectionWrapper = await _sqlConnectionWrapperFactory.ObtainSqlConnectionWrapperAsync(cancellationToken, true))
using (SqlCommandWrapper sqlCommandWrapper = sqlConnectionWrapper.CreateRetrySqlCommand())
{
sqlCommandWrapper.CommandTimeout = _importTaskConfiguration.SqlLongRunningOperationTimeoutInSec;

return (tableName, indexName);
VLatest.SwitchPartitionsInAllTables.PopulateCommand(sqlCommandWrapper);
await sqlCommandWrapper.ExecuteNonQueryAsync(cancellationToken);
}
}

Expand Down
Loading

0 comments on commit b37dba3

Please sign in to comment.