Skip to content

Commit

Permalink
Modularize IoManager (#1646)
Browse files Browse the repository at this point in the history
Modularize IoManager
  • Loading branch information
nopara73 committed Jul 2, 2019
2 parents 29e7088 + 6f06e83 commit efe8a78
Show file tree
Hide file tree
Showing 7 changed files with 541 additions and 404 deletions.
29 changes: 15 additions & 14 deletions WalletWasabi.Tests/StoreTests.cs
Expand Up @@ -9,6 +9,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using WalletWasabi.Io;
using WalletWasabi.Stores;
using Xunit;
using Xunit.Sdk;
Expand Down Expand Up @@ -356,7 +357,7 @@ public async Task IoManagerTestsAsync()

// Single thread file operations.

IoManager ioman1 = new IoManager(file1);
DigestableSafeMutexIoManager ioman1 = new DigestableSafeMutexIoManager(file1);

// Delete the file if Exist.

Expand Down Expand Up @@ -403,18 +404,18 @@ bool IsStringArraysEqual(string[] lines1, string[] lines2)
// Check digest file, and write only differ logic.

// Write the same content, file should not be written.
var currentDate = File.GetLastWriteTimeUtc(ioman1.OriginalFilePath);
var currentDate = File.GetLastWriteTimeUtc(ioman1.FilePath);
await Task.Delay(500);
await ioman1.WriteAllLinesAsync(lines);
var noChangeDate = File.GetLastWriteTimeUtc(ioman1.OriginalFilePath);
var noChangeDate = File.GetLastWriteTimeUtc(ioman1.FilePath);
Assert.Equal(currentDate, noChangeDate);

// Write different content, file should be written.
currentDate = File.GetLastWriteTimeUtc(ioman1.OriginalFilePath);
currentDate = File.GetLastWriteTimeUtc(ioman1.FilePath);
await Task.Delay(500);
lines.Add("Lorem ipsum dolor sit amet, consectetur adipiscing elit.");
await ioman1.WriteAllLinesAsync(lines);
var newContentDate = File.GetLastWriteTimeUtc(ioman1.OriginalFilePath);
var newContentDate = File.GetLastWriteTimeUtc(ioman1.FilePath);
Assert.NotEqual(currentDate, newContentDate);

/* The next test is commented out because on mac and on linux File.Open does not lock the file
Expand All @@ -427,12 +428,12 @@ bool IsStringArraysEqual(string[] lines1, string[] lines2)
* https://github.com/dotnet/corefx/issues/5964
*/

//using (File.OpenWrite(ioman1.OriginalFilePath))
//using (File.OpenWrite(ioman1.FilePath))
//{
// // Should be OK because the same data is written.
// await ioman1.WriteAllLinesAsync(lines);
//}
//using (File.OpenWrite(ioman1.OriginalFilePath))
//using (File.OpenWrite(ioman1.FilePath))
//{
// // Should fail because different data is written.
// await Assert.ThrowsAsync<IOException>(async () => await ioman1.WriteAllLinesAsync(lines));
Expand Down Expand Up @@ -472,8 +473,8 @@ bool IsStringArraysEqual(string[] lines1, string[] lines2)
// Simulate file write error and recovery logic.

// We have only *.new and *.old files.
File.Copy(ioman1.OriginalFilePath, ioman1.OldFilePath);
File.Move(ioman1.OriginalFilePath, ioman1.NewFilePath);
File.Copy(ioman1.FilePath, ioman1.OldFilePath);
File.Move(ioman1.FilePath, ioman1.NewFilePath);

// At this point there is now OriginalFile.

Expand All @@ -489,7 +490,7 @@ bool IsStringArraysEqual(string[] lines1, string[] lines2)
// Check recovery mechanism.

Assert.True(
File.Exists(ioman1.OriginalFilePath) &&
File.Exists(ioman1.FilePath) &&
!File.Exists(ioman1.OldFilePath) &&
!File.Exists(ioman1.NewFilePath));

Expand All @@ -499,12 +500,12 @@ bool IsStringArraysEqual(string[] lines1, string[] lines2)

// Check if directory is empty.

var fileCount = Directory.EnumerateFiles(Path.GetDirectoryName(ioman1.OriginalFilePath)).Count();
var fileCount = Directory.EnumerateFiles(Path.GetDirectoryName(ioman1.FilePath)).Count();
Assert.Equal(0, fileCount);

// Check Mutex usage on simultaneous file writes.

IoManager ioman2 = new IoManager(file2);
DigestableSafeMutexIoManager ioman2 = new DigestableSafeMutexIoManager(file2);

await Task.Run(async () =>
{
Expand All @@ -522,7 +523,7 @@ bool IsStringArraysEqual(string[] lines1, string[] lines2)
});

// TryReplace test.
var dummyFilePath = $"{ioman1.OriginalFilePath}dummy";
var dummyFilePath = $"{ioman1.FilePath}dummy";
var dummyContent = new string[]
{
"banana",
Expand All @@ -548,7 +549,7 @@ public async Task IoTestsAsync()
{
var file = Path.Combine(Global.Instance.DataDir, nameof(IoTestsAsync), $"file.dat");

IoManager ioman = new IoManager(file);
DigestableSafeMutexIoManager ioman = new DigestableSafeMutexIoManager(file);
ioman.DeleteMe();
await ioman.WriteAllLinesAsync(new string[0], dismissNullOrEmptyContent: false);

Expand Down
239 changes: 239 additions & 0 deletions WalletWasabi/Io/DigestableSafeMutexIoManager.cs
@@ -0,0 +1,239 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Nito.AsyncEx;
using WalletWasabi.Helpers;
using WalletWasabi.Logging;

namespace WalletWasabi.Io
{
/// <summary>
/// Safely manager file operations.
/// </summary>
public class DigestableSafeMutexIoManager : SafeMutexIoManager
{
public string DigestFilePath { get; }

/// <summary>
/// Use the random index of the line to create digest faster. -1 is special value, it means the last character. If null then hash whole file.
/// </summary>
private int? DigestRandomIndex { get; }

private const string DigestExtension = ".dig";

/// <param name="digestRandomIndex">Use the random index of the line to create digest faster. -1 is special value, it means the last character. If null then hash whole file.</param>
public DigestableSafeMutexIoManager(string filePath, int? digestRandomIndex = null) : base(filePath)

{
DigestRandomIndex = digestRandomIndex;

DigestFilePath = $"{FilePath}{DigestExtension}";
}

#region IoOperations

public new void DeleteMe()
{
base.DeleteMe();

if (File.Exists(DigestFilePath))
{
File.Delete(DigestFilePath);
}
}

public new async Task WriteAllLinesAsync(IEnumerable<string> lines, CancellationToken cancellationToken = default, bool dismissNullOrEmptyContent = true)
{
if (dismissNullOrEmptyContent)
{
if (lines is null || !lines.Any())
{
return;
}
}

var byteArrayBuilder = new ByteArrayBuilder();

foreach (var line in lines)

{
ContinueBuildHash(byteArrayBuilder, line);
}

var res = await WorkWithHashAsync(byteArrayBuilder, cancellationToken);
if (res.same)
{
return;
}

await base.WriteAllLinesAsync(lines, cancellationToken, dismissNullOrEmptyContent);

await WriteOutHashAsync(res.hash);
}

public async Task AppendAllLinesAsync(IEnumerable<string> lines, CancellationToken cancellationToken = default)
{
if (lines is null || !lines.Any())
{
return;
}

IoHelpers.EnsureContainingDirectoryExists(NewFilePath);
if (File.Exists(NewFilePath))
{
File.Delete(NewFilePath);
}

var byteArrayBuilder = new ByteArrayBuilder();

var linesArray = lines.ToArray();
var linesIndex = 0;

using (var sr = OpenText())
using (var fs = File.OpenWrite(NewFilePath))
using (var sw = new StreamWriter(fs, Encoding.ASCII, Constants.BigFileReadWriteBufferSize))
{
// 1. First copy.
if (!sr.EndOfStream)
{
var lineTask = sr.ReadLineAsync();
Task wTask = Task.CompletedTask;
string line = null;
while (lineTask != null)
{
if (line is null)
{
line = await lineTask;
}

lineTask = sr.EndOfStream ? null : sr.ReadLineAsync();

if (linesArray[linesIndex] == line) // If the line is a line we want to write, then we know that someone else have worked into the file.
{
linesIndex++;
continue;
}

await wTask;
wTask = sw.WriteLineAsync(line);

ContinueBuildHash(byteArrayBuilder, line);

cancellationToken.ThrowIfCancellationRequested();

line = null;
}
await wTask;
}
await sw.FlushAsync();

// 2. Then append.
foreach (var line in linesArray)
{
await sw.WriteLineAsync(line);

ContinueBuildHash(byteArrayBuilder, line);

cancellationToken.ThrowIfCancellationRequested();
}

await sw.FlushAsync();
}

var res = await WorkWithHashAsync(byteArrayBuilder, cancellationToken);
if (res.same)
{
return;
}

SafeMoveNewToOriginal();
await WriteOutHashAsync(res.hash);
}

#endregion IoOperations

#region Hashing

private async Task WriteOutHashAsync(byte[] hash)
{
try
{
IoHelpers.EnsureContainingDirectoryExists(DigestFilePath);

await File.WriteAllBytesAsync(DigestFilePath, hash);
}
catch (Exception ex)
{
Logger.LogWarning<DigestableSafeMutexIoManager>("Failed to create digest.");
Logger.LogInfo<DigestableSafeMutexIoManager>(ex);
}
}

private async Task<(bool same, byte[] hash)> WorkWithHashAsync(ByteArrayBuilder byteArrayBuilder, CancellationToken cancellationToken)
{
byte[] hash = null;
try
{
var bytes = byteArrayBuilder.ToArray();
hash = HashHelpers.GenerateSha256Hash(bytes);
if (File.Exists(DigestFilePath))
{
var digest = await File.ReadAllBytesAsync(DigestFilePath, cancellationToken);
if (ByteHelpers.CompareFastUnsafe(hash, digest))
{
if (File.Exists(NewFilePath))
{
File.Delete(NewFilePath);
}
return (true, hash);
}
}
}
catch (Exception ex)
{
Logger.LogWarning<DigestableSafeMutexIoManager>("Failed to read digest.");
Logger.LogInfo<DigestableSafeMutexIoManager>(ex);
}

return (false, hash);
}

private void ContinueBuildHash(ByteArrayBuilder byteArrayBuilder, string line)
{
if (string.IsNullOrWhiteSpace(line))
{
byteArrayBuilder.Append(0);
}
else
{
if (DigestRandomIndex.HasValue)
{
int index;
if (DigestRandomIndex == -1 || DigestRandomIndex >= line.Length) // Last char.
{
index = line.Length - 1;
}
else
{
index = DigestRandomIndex.Value;
}

var c = line[index];
var b = (byte)c;
byteArrayBuilder.Append(b);
}
else
{
var b = Encoding.ASCII.GetBytes(line);
byteArrayBuilder.Append(b);
}
}
}

#endregion Hashing
}
}

0 comments on commit efe8a78

Please sign in to comment.