Skip to content

Commit

Permalink
More cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
badrishc committed Aug 29, 2018
1 parent b681184 commit 002437f
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 190 deletions.
339 changes: 170 additions & 169 deletions cs/benchmark/FasterYcsbBenchmark.cs
Expand Up @@ -3,7 +3,7 @@

#pragma warning disable 0162

//#define DASHBOARD
#define DASHBOARD
//#define USE_CODEGEN

using FASTER.core;
Expand Down Expand Up @@ -32,7 +32,6 @@ public enum Op : ulong

const int kFileChunkSize = 4096;
const long kChunkSize = 640;


Key[] init_keys_;

Expand Down Expand Up @@ -96,63 +95,6 @@ public FASTER_YcsbBenchmark(int threadCount_, int numaStyle_, string distributio
(kMaxKey / 2, device);
}

private void SetupYcsb(int thread_idx)
{
if (numaStyle == 0)
Native32.AffinitizeThreadRoundRobin((uint)thread_idx);
else
Native32.AffinitizeThreadShardedTwoNuma((uint)thread_idx);

store.StartSession();

#if DASHBOARD
var tstart = HiResTimer.Rdtsc();
var tstop1 = tstart;
var lastWrittenValue = 0;
int count = 0;
#endif

Value value = default(Value);

for (long chunk_idx = Interlocked.Add(ref idx_, kChunkSize) - kChunkSize;
chunk_idx < kInitCount;
chunk_idx = Interlocked.Add(ref idx_, kChunkSize) - kChunkSize)
{
for (long idx = chunk_idx; idx < chunk_idx + kChunkSize; ++idx)
{
if (idx % 256 == 0)
{
store.Refresh();

if (idx % 65536 == 0)
{
store.CompletePending(false);
}
}

Key key = init_keys_[idx];
store.Upsert(&key, &value, null, 1);
}
#if DASHBOARD
count += (int)kChunkSize;

//Check if stats collector is requesting for statistics
if (writeStats[thread_idx])
{
var tstart1 = tstop1;
tstop1 = HiResTimer.Rdtsc();
threadThroughput[thread_idx] = (count - lastWrittenValue) / ((tstop1 - tstart1) / freq);
lastWrittenValue = count;
writeStats[thread_idx] = false;
statsWritten[thread_idx].Set();
}
#endif
}


store.CompletePending(true);
store.StopSession();
}
private void RunYcsb(int thread_idx)
{
RandomGenerator rng = new RandomGenerator((uint)(1 + thread_idx));
Expand Down Expand Up @@ -268,17 +210,176 @@ private void RunYcsb(int thread_idx)
Interlocked.Add(ref total_ops_done, reads_done + writes_done);
}

public unsafe void Run()
{
RandomGenerator rng = new RandomGenerator();

LoadData();

input_ = new Input[8];
for (int i = 0; i < 8; i++)
{
input_[i].value = i;
}
GCHandle handle = GCHandle.Alloc(input_, GCHandleType.Pinned);
input_ptr = (Input*)handle.AddrOfPinnedObject();

#if DASHBOARD
var dash = new Thread(() => DoContinuousMeasurements());
dash.Start();
#endif

Thread[] workers = new Thread[threadCount];

Console.WriteLine("Executing setup.");

// Setup the store for the YCSB benchmark.
for (int idx = 0; idx < threadCount; ++idx)
{
int x = idx;
workers[idx] = new Thread(() => SetupYcsb(x));
}
// Start threads.
foreach (Thread worker in workers)
{
worker.Start();
}
foreach (Thread worker in workers)
{
worker.Join();
}

long startTailAddress = store.Size;
Console.WriteLine("Start tail address = " + startTailAddress);


idx_ = 0;
store.DumpDistribution();

Console.WriteLine("Executing experiment.");

// Run the experiment.
for (int idx = 0; idx < threadCount; ++idx)
{
int x = idx;
workers[idx] = new Thread(() => RunYcsb(x));
}
// Start threads.
foreach (Thread worker in workers)
{
worker.Start();
}

Stopwatch swatch = new Stopwatch();
swatch.Start();

if (kCheckpointSeconds <= 0)
{
Thread.Sleep(TimeSpan.FromSeconds(kRunSeconds));
}
else
{
int runSeconds = 0;
while (runSeconds < kRunSeconds)
{
Thread.Sleep(TimeSpan.FromSeconds(kCheckpointSeconds));
store.TakeFullCheckpoint(out Guid token);
runSeconds += kCheckpointSeconds;
}
}

swatch.Stop();

done = true;

foreach (Thread worker in workers)
{
worker.Join();
}

#if DASHBOARD
dash.Abort();
#endif

double seconds = swatch.ElapsedMilliseconds / 1000.0;
long endTailAddress = store.Size;
Console.WriteLine("End tail address = " + endTailAddress);

Console.WriteLine("Total " + total_ops_done + " ops done " + " in " + seconds + " secs.");
Console.WriteLine("##, " + distribution + ", " + numaStyle + ", " + readPercent + ", "
+ threadCount + ", " + total_ops_done / seconds + ", "
+ (endTailAddress - startTailAddress));
}

private void SetupYcsb(int thread_idx)
{
if (numaStyle == 0)
Native32.AffinitizeThreadRoundRobin((uint)thread_idx);
else
Native32.AffinitizeThreadShardedTwoNuma((uint)thread_idx);

store.StartSession();

#if DASHBOARD
var tstart = HiResTimer.Rdtsc();
var tstop1 = tstart;
var lastWrittenValue = 0;
int count = 0;
#endif

Value value = default(Value);

for (long chunk_idx = Interlocked.Add(ref idx_, kChunkSize) - kChunkSize;
chunk_idx < kInitCount;
chunk_idx = Interlocked.Add(ref idx_, kChunkSize) - kChunkSize)
{
for (long idx = chunk_idx; idx < chunk_idx + kChunkSize; ++idx)
{
if (idx % 256 == 0)
{
store.Refresh();

if (idx % 65536 == 0)
{
store.CompletePending(false);
}
}

Key key = init_keys_[idx];
store.Upsert(&key, &value, null, 1);
}
#if DASHBOARD
readonly int measurementInterval = 2000;
readonly bool allDone;
readonly bool measureLatency;
readonly bool[] writeStats;
readonly private EventWaitHandle[] statsWritten;
readonly double[] threadThroughput;
readonly double[] threadAverageLatency;
readonly double[] threadMaximumLatency;
readonly long[] threadProgress;
readonly double freq;
count += (int)kChunkSize;

//Check if stats collector is requesting for statistics
if (writeStats[thread_idx])
{
var tstart1 = tstop1;
tstop1 = HiResTimer.Rdtsc();
threadThroughput[thread_idx] = (count - lastWrittenValue) / ((tstop1 - tstart1) / freq);
lastWrittenValue = count;
writeStats[thread_idx] = false;
statsWritten[thread_idx].Set();
}
#endif
}


store.CompletePending(true);
store.StopSession();
}

#if DASHBOARD
int measurementInterval = 2000;
bool allDone;
bool measureLatency;
bool[] writeStats;
private EventWaitHandle[] statsWritten;
double[] threadThroughput;
double[] threadAverageLatency;
double[] threadMaximumLatency;
long[] threadProgress;
double freq;

void DoContinuousMeasurements()
{
Expand Down Expand Up @@ -487,106 +588,6 @@ private void LoadSyntheticData()
}
#endregion

public unsafe void Run()
{
RandomGenerator rng = new RandomGenerator();

LoadData();

input_ = new Input[8];
for (int i = 0; i < 8; i++)
{
input_[i].value = i;
}
GCHandle handle = GCHandle.Alloc(input_, GCHandleType.Pinned);
input_ptr = (Input*)handle.AddrOfPinnedObject();


#if DASHBOARD
var dash = new Thread(() => DoContinuousMeasurements());
dash.Start();
#endif

Thread[] workers = new Thread[threadCount];

Console.WriteLine("Executing setup.");

// Setup the store for the YCSB benchmark.
for (int idx = 0; idx < threadCount; ++idx)
{
int x = idx;
workers[idx] = new Thread(() => SetupYcsb(x));
}
// Start threads.
foreach (Thread worker in workers)
{
worker.Start();
}
foreach (Thread worker in workers)
{
worker.Join();
}

long startTailAddress = store.Size;
Console.WriteLine("Start tail address = " + startTailAddress);


idx_ = 0;
store.DumpDistribution();

Console.WriteLine("Executing experiment.");

// Run the experiment.
for (int idx = 0; idx < threadCount; ++idx)
{
int x = idx;
workers[idx] = new Thread(() => RunYcsb(x));
}
// Start threads.
foreach (Thread worker in workers)
{
worker.Start();
}

Stopwatch swatch = new Stopwatch();
swatch.Start();

if (kCheckpointSeconds <= 0)
{
Thread.Sleep(TimeSpan.FromSeconds(kRunSeconds));
}
else
{
int runSeconds = 0;
while (runSeconds < kRunSeconds)
{
Thread.Sleep(TimeSpan.FromSeconds(kCheckpointSeconds));
store.TakeFullCheckpoint(out Guid token);
runSeconds += kCheckpointSeconds;
}
}

swatch.Stop();

done = true;

foreach (Thread worker in workers)
{
worker.Join();
}

#if DASHBOARD
dash.Abort();
#endif

double seconds = swatch.ElapsedMilliseconds / 1000.0;
long endTailAddress = store.Size;
Console.WriteLine("End tail address = " + endTailAddress);

Console.WriteLine("Total " + total_ops_done + " ops done " + " in " + seconds + " secs.");
Console.WriteLine("##, " + distribution + ", " + numaStyle + ", " + readPercent + ", "
+ threadCount + ", " + total_ops_done / seconds + ", "
+ (endTailAddress - startTailAddress));
}
}
}

0 comments on commit 002437f

Please sign in to comment.