Skip to content

Commit

Permalink
Added async API to sessions, added first draft of async sample.
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Dec 10, 2019
1 parent 1eca340 commit 0237a0c
Show file tree
Hide file tree
Showing 12 changed files with 554 additions and 32 deletions.
9 changes: 9 additions & 0 deletions cs/FASTER.sln
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FASTER.devices.AzureStorage
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterLogSample", "playground\FasterLogSample\FasterLogSample.csproj", "{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "FasterKVAsyncSample", "playground\FasterKVAsyncSample\FasterKVAsyncSample.csproj", "{859F76F4-93D8-4D60-BF9A-363E217FA247}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -166,6 +168,12 @@ Global
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|Any CPU.Build.0 = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.ActiveCfg = Release|x64
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58}.Release|x64.Build.0 = Release|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Debug|Any CPU.ActiveCfg = Debug|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Debug|x64.ActiveCfg = Debug|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Debug|x64.Build.0 = Debug|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Release|Any CPU.ActiveCfg = Release|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Release|x64.ActiveCfg = Release|x64
{859F76F4-93D8-4D60-BF9A-363E217FA247}.Release|x64.Build.0 = Release|x64
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -187,6 +195,7 @@ Global
{A6B14415-D316-4955-BE5F-725BB2DEBEBE} = {28800357-C8CE-4CD0-A2AD-D4A910ABB496}
{E571E686-01A0-44D5-BFF5-B7678284258B} = {A6B14415-D316-4955-BE5F-725BB2DEBEBE}
{25C5C6B6-4A8A-46DD-88C1-EB247033FE58} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
{859F76F4-93D8-4D60-BF9A-363E217FA247} = {E6026D6A-01C5-4582-B2C1-64751490DABE}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {A0750637-2CCB-4139-B25E-F2CE740DCFAC}
Expand Down
6 changes: 6 additions & 0 deletions cs/playground/FasterKVAsyncSample/App.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="utf-8"?>
<configuration>
<startup>
<supportedRuntime version="v4.0" sku=".NETFramework,Version=v4.6" />
</startup>
</configuration>
39 changes: 39 additions & 0 deletions cs/playground/FasterKVAsyncSample/FasterKVAsyncSample.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>
<Platforms>x64</Platforms>
<LangVersion>preview</LangVersion>
<RuntimeIdentifiers>win7-x64;linux-x64</RuntimeIdentifiers>
</PropertyGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
<RootNamespace>FasterKVAsyncSample</RootNamespace>
<ErrorReport>prompt</ErrorReport>
<RestoreProjectStyle>PackageReference</RestoreProjectStyle>
<Prefer32Bit>true</Prefer32Bit>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)' == 'Debug'">
<DefineConstants>TRACE;DEBUG</DefineConstants>
<DebugType>full</DebugType>
<DebugSymbols>true</DebugSymbols>
<OutputPath>bin\x64\Debug\</OutputPath>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)' == 'Release'">
<DefineConstants>TRACE</DefineConstants>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>bin\x64\Release\</OutputPath>
</PropertyGroup>

<ItemGroup>
<None Include="App.config" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\core\FASTER.core.csproj" />
</ItemGroup>
</Project>
140 changes: 140 additions & 0 deletions cs/playground/FasterKVAsyncSample/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System;
using System.Diagnostics;
using System.IO;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using FASTER.core;

namespace FasterKVAsyncSample
{
public class Program
{
static FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions> faster;
static int numOps = 0;

/// <summary>
/// Main program entry point
/// </summary>
static void Main()
{
var path = "FasterKVAsyncSample";

var log = Devices.CreateLogDevice(path + "hlog.log", deleteOnClose: true);
var objlog = Devices.CreateLogDevice(path + "hlog.obj.log", deleteOnClose: true);

var logSettings = new LogSettings { LogDevice = log, ObjectLogDevice = objlog };
var checkpointSettings = new CheckpointSettings { CheckpointDir = path, CheckPointType = CheckpointType.FoldOver };
var serializerSettings = new SerializerSettings<CacheKey, CacheValue> { keySerializer = () => new CacheKeySerializer(), valueSerializer = () => new CacheValueSerializer() };

faster = new FasterKV<CacheKey, CacheValue, CacheInput, CacheOutput, CacheContext, CacheFunctions>
(1L << 20, new CacheFunctions(), logSettings, checkpointSettings, serializerSettings);

const int NumParallelTasks = 1;
ThreadPool.SetMinThreads(2 * Environment.ProcessorCount, 2 * Environment.ProcessorCount);
TaskScheduler.UnobservedTaskException += (object sender, UnobservedTaskExceptionEventArgs e) =>
{
Console.WriteLine($"Unobserved task exception: {e.Exception}");
e.SetObserved();
};

Task[] tasks = new Task[NumParallelTasks];
for (int i = 0; i < NumParallelTasks; i++)
{
int local = i;
tasks[i] = Task.Run(() => AsyncOperator(local));
}

// Threads for reporting, commit
new Thread(new ThreadStart(ReportThread)).Start();
new Thread(new ThreadStart(CommitThread)).Start();

Task.WaitAll(tasks);
}

/// <summary>
/// Async operations on FasterKV
/// </summary>
static async Task AsyncOperator(int id)
{
using var session = faster.NewSession(id.ToString());
Random rand = new Random(id);

bool batched = true;

await Task.Yield();

if (!batched)
{
// Single commit version - append each item and wait for commit
// Needs high parallelism (NumParallelTasks) for perf
// Needs separate commit thread to perform regular commit
// Otherwise we commit only at page boundaries
while (true)
{
try
{
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()), true);
Interlocked.Increment(ref numOps);
}
catch (Exception ex)
{
Console.WriteLine($"{nameof(AsyncOperator)}({id}): {ex}");
}
}
}
else
{
// Batched version - we enqueue many entries to memory,
// then wait for commit periodically
int count = 0;
while (true)
{
await session.UpsertAsync(new CacheKey(rand.Next()), new CacheValue(rand.Next()));
if (count++ % 100 == 0)
{
await session.WaitForCommitAsync();
Interlocked.Add(ref numOps, 100);
}
}
}
}

static void ReportThread()
{
long lastTime = 0;
long lastValue = numOps;

Stopwatch sw = new Stopwatch();
sw.Start();

while (true)
{
Thread.Sleep(5000);

var nowTime = sw.ElapsedMilliseconds;
var nowValue = numOps;

Console.WriteLine("Operation Throughput: {0} ops/sec, Tail: {1}",
(nowValue - lastValue) / (1000 * (nowTime - lastTime)), faster.Log.TailAddress);
lastValue = nowValue;
lastTime = nowTime;
}
}

static void CommitThread()
{
//Task<LinkedCommitInfo> prevCommitTask = null;
while (true)
{
Thread.Sleep(5000);

faster.TakeFullCheckpoint(out _);
faster.CompleteCheckpointAsync().GetAwaiter().GetResult();
}
}
}
}
22 changes: 22 additions & 0 deletions cs/playground/FasterKVAsyncSample/Properties/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license.

using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;

// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyDescription("")]
[assembly: AssemblyCopyright("Copyright © 2017")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]

// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]

// The following GUID is for the ID of the typelib if this project is exposed to COM
[assembly: Guid("17bdd0a5-98e5-464a-8a00-050d9ff4c562")]
Loading

0 comments on commit 0237a0c

Please sign in to comment.