Speeding up magic queue and surf miner starting
fireduck64 committed Jan 28, 2019
1 parent ab0ddb5 commit d98800a
Showing 3 changed files with 168 additions and 22 deletions.
miner/src/surf/MagicQueue.java (24 additions, 15 deletions)
@@ -4,6 +4,7 @@
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.TreeMap;
 import java.util.LinkedList;


@@ -24,7 +25,8 @@ public class MagicQueue
    * Each thread accumulates data in this map before it is saved
    * to the global buckets.
    */
-  private final ThreadLocal<Map<Integer, ByteBuffer> > local_buff;
+  //private final ThreadLocal<Map<Integer, ByteBuffer> > local_buff;
+  private final ThreadLocal<ByteBuffer[] > local_buff;


   public MagicQueue(int max_chunk_size, int bucket_count)
@@ -38,9 +40,12 @@ public MagicQueue(int max_chunk_size, int bucket_count)
       global_buckets[i] = new LinkedList<>();
     }

-    local_buff = new ThreadLocal<Map<Integer, ByteBuffer>>() {
-      @Override protected Map<Integer,ByteBuffer> initialValue() {
-        return new HashMap<Integer, ByteBuffer>(bucket_count*2+1, 0.5f);
+    local_buff = new ThreadLocal<ByteBuffer[]>() {
+      @Override protected ByteBuffer[] initialValue() {
+        //return new HashMap<Integer, ByteBuffer>(bucket_count*2+1, 0.5f);
+        //return new TreeMap<Integer, ByteBuffer>();
+        return new ByteBuffer[bucket_count];
+
       }
     };

@@ -54,16 +59,17 @@ public MagicQueue(int max_chunk_size, int bucket_count)
    */
   public ByteBuffer openWrite(int bucket, int data_size)
   {
-    Map<Integer, ByteBuffer> local = local_buff.get();
-    if (local.containsKey(bucket))
+    ByteBuffer[] local = local_buff.get();
+    if (local[bucket] != null)
     {
-      if (local.get(bucket).remaining() >= data_size) return local.get(bucket);
+      if (local[bucket].remaining() >= data_size) return local[bucket];

-      writeToBucket( bucket, local.get(bucket) );
+      writeToBucket( bucket, local[bucket] );
     }

-    local.put(bucket, ByteBuffer.allocate(max_chunk_size));
-    return local.get(bucket);
+    local[bucket] = ByteBuffer.allocate(max_chunk_size);
+
+    return local[bucket];

   }
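Worth spelling out from this hunk: openWrite keeps returning the same thread-local chunk until fewer than data_size bytes remain in it, then pushes the full chunk to its global bucket and allocates a fresh one. A minimal sketch of that behavior (my example, not part of the commit; it assumes the snowblossom.miner.surf package seen in the load test below):

// Sketch (not in the commit): openWrite reuses one chunk until another record
// cannot fit, then rolls the full chunk into the global bucket.
package snowblossom.miner.surf;

import java.nio.ByteBuffer;

public class OpenWriteDemo
{
  public static void main(String[] args)
  {
    MagicQueue mq = new MagicQueue(128, 4); // tiny 128-byte chunks, 4 buckets
    byte[] rec = new byte[57];

    ByteBuffer first = mq.openWrite(0, rec.length);
    first.put(rec);                       // 71 bytes left, still fits a record
    ByteBuffer second = mq.openWrite(0, rec.length);
    second.put(rec);                      // 14 bytes left, next record cannot fit

    System.out.println(first == second);  // true: same thread-local chunk reused

    ByteBuffer third = mq.openWrite(0, rec.length);
    System.out.println(second == third);  // false: full chunk went to the global
                                          // bucket and a fresh one was allocated
  }
}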

@@ -107,13 +113,16 @@ public ByteBuffer readBucket(int bucket)

   public void flushFromLocal()
   {
-    for(Map.Entry<Integer,ByteBuffer> me : local_buff.get().entrySet())
+
+    ByteBuffer[] local = local_buff.get();
+    for(int b=0; b<bucket_count; b++)
     {
-      int b = me.getKey();
-      ByteBuffer bb = me.getValue();
-      writeToBucket(b, bb);
+      if (local[b] != null)
+      {
+        writeToBucket(b, local[b]);
+        local[b]=null;
+      }
     }
-    local_buff.get().clear();

   }

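The heart of this file's change is swapping the per-thread HashMap<Integer, ByteBuffer> for a plain ByteBuffer[] indexed by bucket number: every openWrite and flush now costs an array load instead of an Integer box plus hash lookup, and flushFromLocal walks a fixed-size array instead of materializing an entry set. A standalone micro-benchmark sketch of that access-pattern difference (illustrative only, not part of the commit; class name and counts are mine):

// Micro-benchmark sketch: HashMap<Integer, ByteBuffer> lookup vs ByteBuffer[]
// indexing, the substitution this commit makes in MagicQueue.
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

public class BucketLookupBench
{
  public static void main(String[] args)
  {
    final int buckets = 256;
    final int rounds = 50_000_000;

    Map<Integer, ByteBuffer> map = new HashMap<>();
    ByteBuffer[] arr = new ByteBuffer[buckets];
    for (int b = 0; b < buckets; b++)
    {
      ByteBuffer bb = ByteBuffer.allocate(64);
      map.put(b, bb); // get(b) must box the int key and hash it
      arr[b] = bb;    // arr[b] is a single bounds-checked load
    }

    long sink = 0;
    long t0 = System.currentTimeMillis();
    for (int i = 0; i < rounds; i++) { sink += map.get(i & 255).remaining(); }
    long t1 = System.currentTimeMillis();
    for (int i = 0; i < rounds; i++) { sink += arr[i & 255].remaining(); }
    long t2 = System.currentTimeMillis();

    System.out.println("map: " + (t1 - t0) + " ms, array: " + (t2 - t1)
      + " ms, sink=" + sink);
  }
}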
miner/src/surf/MagicQueueLoadTest.java (new file: 124 additions, 0 deletions)
@@ -0,0 +1,124 @@
+package snowblossom.miner.surf;
+
+import java.util.Random;
+import java.nio.ByteBuffer;
+import duckutil.FusionInitiator;
+import java.util.concurrent.Semaphore;
+import java.text.DecimalFormat;
+
+public class MagicQueueLoadTest
+{
+  public static void main(String args[]) throws Exception
+  {
+    new MagicQueueLoadTest();
+  }
+
+  final MagicQueue mq;
+  final int write_size=57;
+  final int writes_per_thread=10000000;
+  final int write_threads=16;
+  final int read_threads=4;
+  final FusionInitiator fi;
+  final Semaphore read_sem = new Semaphore(0);
+
+  public MagicQueueLoadTest() throws Exception
+  {
+    mq = new MagicQueue(50000,256);
+
+    fi = new FusionInitiator(read_threads);
+    fi.start();
+
+    double start_tm = System.currentTimeMillis();
+
+    for(int i=0; i<read_threads; i++)
+    {
+      new QueueReader(i, i * 256 / read_threads).start();
+    }
+    for(int i=0; i<write_threads; i++)
+    {
+      new QueueWriter().start();
+    }
+
+    read_sem.acquire(write_threads * writes_per_thread);
+
+    double end_tm = System.currentTimeMillis();
+    double sec = (end_tm - start_tm) / 1000.0;
+    double items = write_threads * writes_per_thread;
+    double rate = items / sec;
+
+    DecimalFormat df = new DecimalFormat("0.00");
+
+    System.out.println(String.format("%d items took %s seconds (%s/sec)", write_threads * writes_per_thread, df.format(sec), df.format(rate)));
+  }
+
+  public class QueueWriter extends Thread
+  {
+    public QueueWriter()
+    {
+    }
+
+    public void run()
+    {
+      byte[] buff = new byte[write_size];
+      Random rnd = new Random();
+      for(long x=0; x<writes_per_thread; x++)
+      {
+        rnd.nextBytes(buff);
+        int bucket = rnd.nextInt(256);
+        ByteBuffer bb = mq.openWrite(bucket, write_size);
+        bb.put(buff);
+      }
+      mq.flushFromLocal();
+    }
+  }
+
+  public class QueueReader extends Thread
+  {
+    int task_number;
+    int start;
+
+    public QueueReader(int task_number, int start)
+    {
+      this.task_number = task_number;
+      this.start = start;
+      setDaemon(true);
+    }
+
+    public void run()
+    {
+      int b = start;
+      while(true)
+      {
+        fi.taskWait(task_number);
+
+        ByteBuffer bb = null;
+        while((bb = mq.readBucket(b)) != null)
+        {
+          int items = bb.remaining() / write_size;
+          read_sem.release(items);
+        }
+
+        fi.taskComplete(task_number);
+        b = (b + 1) % 256;
+
+        try
+        {
+          if (b == 0) {Thread.sleep(100);}
+        }
+        catch(Throwable t){}
+      }
+    }
+  }
+}
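For a quicker sanity check than the full run above (write_threads * writes_per_thread = 160,000,000 records), a single-threaded smoke test can exercise the same three calls the load test relies on: openWrite, flushFromLocal, and readBucket. This sketch is mine, not part of the commit, and assumes only the behavior shown in the MagicQueue diff:

// Single-threaded smoke test sketch: write 1000 records, flush the
// thread-local chunks, then drain every bucket and count records.
package snowblossom.miner.surf;

import java.nio.ByteBuffer;

public class MagicQueueSmokeTest
{
  public static void main(String[] args)
  {
    MagicQueue mq = new MagicQueue(50000, 256); // same sizing as the load test
    int write_size = 57;
    byte[] rec = new byte[write_size];

    for (int i = 0; i < 1000; i++)
    {
      ByteBuffer bb = mq.openWrite(i % 256, write_size);
      bb.put(rec);
    }
    mq.flushFromLocal(); // push partially filled local chunks to global buckets

    int total = 0;
    for (int b = 0; b < 256; b++)
    {
      ByteBuffer bb;
      while ((bb = mq.readBucket(b)) != null)
      {
        total += bb.remaining() / write_size;
      }
    }
    System.out.println("records read back: " + total); // expect 1000
  }
}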
miner/src/surf/SurfMiner.java (20 additions, 7 deletions)
@@ -342,28 +342,41 @@ private void runPass() throws Exception
       last_work_unit = null;
     }

+    int to_start = 0;
+
     if (!start_work_sem.tryAcquire())
     {
       // We might be at the end of the work, so flush any in queues
       magic_queue.flushFromLocal();
       start_work_sem.acquire();
+      to_start++;
+    }
+    else
+    {
+      to_start++;
+      if (start_work_sem.tryAcquire(1000))
+      {
+        to_start+=1000;
+      }
     }

     wu = last_work_unit;

-    try (TimeRecordAuto tra = TimeRecord.openAuto("MinerThread.rndNonce"))
+    for(int s =0 ; s<to_start; s++)
     {
       rnd.nextBytes(nonce);
       wu.getHeader().getNonce().copyTo(nonce, 0);
-    }

-    byte[] context = PowUtil.hashHeaderBits(wu.getHeader(), nonce, md);
+      byte[] context = PowUtil.hashHeaderBits(wu.getHeader(), nonce, md);

-    long word_idx = PowUtil.getNextSnowFieldIndex(context, field.getTotalWords(), md, tmp_buff);
+      long word_idx = PowUtil.getNextSnowFieldIndex(context, field.getTotalWords(), md, tmp_buff);

-    int block = (int)(word_idx / WORDS_PER_CHUNK);
+      int block = (int)(word_idx / WORDS_PER_CHUNK);

-    ByteBuffer bucket_buff = magic_queue.openWrite(block, getRecordSize());
+      ByteBuffer bucket_buff = magic_queue.openWrite(block, getRecordSize());

-    writeRecord(bucket_buff, wu.getWorkId(), (byte)0, word_idx, nonce, context);
+      writeRecord(bucket_buff, wu.getWorkId(), (byte)0, word_idx, nonce, context);
+    }

   }

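The SurfMiner change batches semaphore acquisition: where the old pass hashed a single nonce per permit, the new code takes one permit (blocking, after flushing the local queues, only when none is immediately free) and then tries to grab 1000 more in a single call, running the hashing loop to_start times. Note that java.util.concurrent.Semaphore.tryAcquire(int) is all-or-nothing: the bulk grab succeeds only when all 1000 permits are available at once, otherwise the miner keeps draining one permit per pass. A stripped-down sketch of the pattern (my naming, not the project's):

// Permit-batching sketch (illustrative, not the project's code): amortizes
// per-item semaphore overhead by draining a large batch after the first
// acquire succeeds.
import java.util.concurrent.Semaphore;

class PermitBatcher
{
  // Returns how many work items the caller should process this pass.
  static int acquireBatch(Semaphore sem, int batch) throws InterruptedException
  {
    int to_start = 0;
    if (!sem.tryAcquire())
    {
      // No queued work right now: a good moment to flush buffers, then block.
      sem.acquire();
      to_start++;
    }
    else
    {
      to_start++;
      if (sem.tryAcquire(batch)) // all-or-nothing grab of `batch` more permits
      {
        to_start += batch;
      }
    }
    return to_start;
  }
}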
