Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix shared write buffers 2 #14

Merged
merged 8 commits into from May 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 26 additions & 43 deletions block_amd64.go
Expand Up @@ -8,8 +8,8 @@ package md5simd

import (
"fmt"
"math"
"sync"
"sync/atomic"
"unsafe"

"github.com/klauspost/cpuid"
Expand All @@ -23,20 +23,6 @@ func block8(state *uint32, base uintptr, bufs *int32, cache *byte, n int)
//go:noescape
func block16(state *uint32, ptrs *int64, mask uint64, n int)

// NewHash - initialize instance for Md5 implementation.
func (s *md5Server) NewHash() Hasher {
uid := atomic.AddUint64(&s.uidCounter, 1)
blockCh := make(chan blockInput, 5)
s.newInput <- newClient{
uid: uid,
input: blockCh,
}
return &md5Digest{
uid: uid, blocksCh: blockCh,
cycleServer: s.cycle,
}
}

// 8-way 4x uint32 digests in 4 ymm registers
// (ymm0, ymm1, ymm2, ymm3)
type digest8 struct {
Expand Down Expand Up @@ -109,47 +95,43 @@ func (s *md5Server) blockMd5_x16(d *digest16, input [16][]byte, half bool) {
for i := range d8a.v0 {
j := i + 8
d8a.v0[i], d8a.v1[i], d8a.v2[i], d8a.v3[i] = d.v0[i], d.v1[i], d.v2[i], d.v3[i]
d8b.v0[i], d8b.v1[i], d8b.v2[i], d8b.v3[i] = d.v0[j], d.v1[j], d.v2[j], d.v3[j]
if !half {
d8b.v0[i], d8b.v1[i], d8b.v2[i], d8b.v3[i] = d.v0[j], d.v1[j], d.v2[j], d.v3[j]
}
}

i8 := [2][8][]byte{}
for i := range i8[0] {
i8[0][i], i8[1][i] = input[i], input[8+i]
}
if half {
blockMd5_avx2(&d8a, i8[0], s.bases[0], &s.maskRounds8a)
blockMd5_avx2(&d8a, i8[0], s.allBufs, &s.maskRounds8a)
} else {
wg := sync.WaitGroup{}
wg.Add(2)
go func() { blockMd5_avx2(&d8a, i8[0], s.bases[0], &s.maskRounds8a); wg.Done() }()
go func() { blockMd5_avx2(&d8b, i8[1], s.bases[1], &s.maskRounds8b); wg.Done() }()
go func() { blockMd5_avx2(&d8a, i8[0], s.allBufs, &s.maskRounds8a); wg.Done() }()
go func() { blockMd5_avx2(&d8b, i8[1], s.allBufs, &s.maskRounds8b); wg.Done() }()
wg.Wait()
}

for i := range d8a.v0 {
j := i + 8
d.v0[i], d.v1[i], d.v2[i], d.v3[i] = d8a.v0[i], d8a.v1[i], d8a.v2[i], d8a.v3[i]
d.v0[j], d.v1[j], d.v2[j], d.v3[j] = d8b.v0[i], d8b.v1[i], d8b.v2[i], d8b.v3[i]
if !half {
d.v0[j], d.v1[j], d.v2[j], d.v3[j] = d8b.v0[i], d8b.v1[i], d8b.v2[i], d8b.v3[i]
}
}
}
}

// Interface function to AVX512 assembly code
func blockMd5_avx512(s *digest16, input [16][]byte, maskRounds *[16]maskRounds) {

// Sanity check to make sure we're not passing in more data than internalBlockSize
{
for i := 1; i < len(input); i++ {
if len(input[i]) > internalBlockSize {
panic(fmt.Sprintf("Sanity check fails for lane %d: maximum input length cannot exceed internalBlockSize", i))
}
}
}

ptrs := [16]int64{}

for i := range ptrs {
if input[i] != nil {
if len(input[i]) > internalBlockSize {
panic(fmt.Sprintf("Sanity check fails for lane %d: maximum input length cannot exceed internalBlockSize", i))
}
ptrs[i] = int64(uintptr(unsafe.Pointer(&(input[i][0]))))
}
}
Expand All @@ -174,20 +156,21 @@ func blockMd5_avx512(s *digest16, input [16][]byte, maskRounds *[16]maskRounds)

// Interface function to AVX2 assembly code
func blockMd5_avx2(s *digest8, input [8][]byte, base []byte, maskRounds *[8]maskRounds) {
baseMin := uint64(uintptr(unsafe.Pointer(&(base[0])))) - 4
ptrs := [8]int32{}

// Sanity check to make sure we're not passing in more data than internalBlockSize
{
for i := 1; i < len(input); i++ {
for i := range ptrs {
if len(input[i]) > 0 {
if len(input[i]) > internalBlockSize {
panic(fmt.Sprintf("Sanity check fails for lane %d: maximum input length cannot exceed internalBlockSize", i))
}
}
}

bufs := [8]int32{4, 4 + internalBlockSize, 4 + internalBlockSize*2, 4 + internalBlockSize*3, 4 + internalBlockSize*4, 4 + internalBlockSize*5, 4 + internalBlockSize*6, 4 + internalBlockSize*7}

for i := 0; i < len(input); i++ {
copy(base[bufs[i]:], input[i])
off := uint64(uintptr(unsafe.Pointer(&(input[i][0])))) - baseMin
if off > math.MaxUint32 {
panic(fmt.Sprintf("invalid buffer sent with offset %x", off))
}
ptrs[i] = int32(off)
}
}

sdup := *s // create copy of initial states to receive intermediate updates
Expand All @@ -197,10 +180,10 @@ func blockMd5_avx2(s *digest8, input [8][]byte, base []byte, maskRounds *[8]mask
for r := 0; r < rounds; r++ {
m := maskRounds[r]
var cache cache8 // stack storage for block8 tmp state
block8(&sdup.v0[0], uintptr(unsafe.Pointer(&(base[0]))), &bufs[0], &cache[0], int(64*m.rounds))
block8(&sdup.v0[0], uintptr(baseMin), &ptrs[0], &cache[0], int(64*m.rounds))

for j := 0; j < len(bufs); j++ {
bufs[j] += int32(64 * m.rounds) // update pointers for next round
for j := 0; j < len(ptrs); j++ {
ptrs[j] += int32(64 * m.rounds) // update pointers for next round
if m.mask&(1<<j) != 0 { // update digest if still masked as active
(*s).v0[j], (*s).v1[j], (*s).v2[j], (*s).v3[j] = sdup.v0[j], sdup.v1[j], sdup.v2[j], sdup.v3[j]
}
Expand Down
34 changes: 28 additions & 6 deletions md5-digest_amd64.go
Expand Up @@ -10,6 +10,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync/atomic"
)

// md5Digest - Type for computing MD5 using either AVX2 or AVX512
Expand All @@ -20,6 +21,23 @@ type md5Digest struct {
x [BlockSize]byte
nx int
len uint64
buffers <-chan []byte
}

// NewHash - initialize instance for Md5 implementation.
func (s *md5Server) NewHash() Hasher {
uid := atomic.AddUint64(&s.uidCounter, 1)
blockCh := make(chan blockInput, buffersPerLane)
s.newInput <- newClient{
uid: uid,
input: blockCh,
}
return &md5Digest{
uid: uid,
buffers: s.buffers,
blocksCh: blockCh,
cycleServer: s.cycle,
}
}

// Size - Return size of checksum
Expand Down Expand Up @@ -74,16 +92,20 @@ func (d *md5Digest) write(p []byte) (nn int, err error) {
if d.nx == BlockSize {
// Create a copy of the overflow buffer in order to send it async over the channel
// (since we will modify the overflow buffer down below with any access beyond multiples of 64)
tmp := [BlockSize]byte{}
copy(tmp[:], d.x[:])
d.sendBlock(blockInput{uid: d.uid, msg: tmp[:]}, len(p)-n < BlockSize)
tmp := <-d.buffers
tmp = tmp[:BlockSize]
copy(tmp, d.x[:])
d.sendBlock(blockInput{uid: d.uid, msg: tmp}, len(p)-n < BlockSize)
d.nx = 0
}
p = p[n:]
}
if len(p) >= BlockSize {
n := len(p) &^ (BlockSize - 1)
d.sendBlock(blockInput{uid: d.uid, msg: p[:n]}, len(p)-n < BlockSize)
buf := <-d.buffers
buf = buf[:n]
copy(buf, p)
d.sendBlock(blockInput{uid: d.uid, msg: buf}, len(p)-n < BlockSize)
p = p[n:]
}
if len(p) > 0 {
Expand All @@ -105,8 +127,8 @@ func (d *md5Digest) Sum(in []byte) (result []byte) {
panic("sum after close")
}

trail := make([]byte, 0, 128)
trail = append(trail, d.x[:d.nx]...)
trail := <-d.buffers
trail = append(trail[:0], d.x[:d.nx]...)

length := d.len
// Padding. Add a 1 bit and 0 bits until 56 bytes mod 64.
Expand Down
21 changes: 16 additions & 5 deletions md5-server_amd64.go
Expand Up @@ -29,6 +29,8 @@ const (
// differentiate with default initialisation value of 0
const md5ServerUID = Lanes

const buffersPerLane = 3

// Message to send across input channel
type blockInput struct {
uid uint64
Expand All @@ -52,7 +54,8 @@ type md5Server struct {
maskRounds16 [16]maskRounds // Pre-allocated static array for max 16 rounds
maskRounds8a [8]maskRounds // Pre-allocated static array for max 8 rounds (1st AVX2 core)
maskRounds8b [8]maskRounds // Pre-allocated static array for max 8 rounds (2nd AVX2 core)
bases [2][]byte // base memory (only for non-AVX512 mode)
allBufs []byte // Preallocated buffer.
buffers chan []byte // Preallocated buffers, sliced from allBufs.
}

// NewServer - Create new object for parallel processing handling
Expand All @@ -65,10 +68,12 @@ func NewServer() Server {
md5srv.newInput = make(chan newClient, Lanes)
md5srv.cycle = make(chan uint64, Lanes*10)
md5srv.uidCounter = md5ServerUID - 1
if !hasAVX512 {
// only reserve memory when not on AVX512
md5srv.bases[0] = make([]byte, 4+8*internalBlockSize)
md5srv.bases[1] = make([]byte, 4+8*internalBlockSize)
md5srv.allBufs = make([]byte, 32+buffersPerLane*Lanes*internalBlockSize)
md5srv.buffers = make(chan []byte, buffersPerLane*Lanes)
// Fill buffers.
for i := 0; i < buffersPerLane*Lanes; i++ {
s := 32 + i*internalBlockSize
md5srv.buffers <- md5srv.allBufs[s : s+internalBlockSize : s+internalBlockSize]
}

// Start a single thread for reading from the input channel
Expand Down Expand Up @@ -154,6 +159,9 @@ func (s *md5Server) process(newClients chan newClient) {
binary.LittleEndian.PutUint32(sum.digest[8:], dig.s[2])
binary.LittleEndian.PutUint32(sum.digest[12:], dig.s[3])
block.sumCh <- sum
if block.msg != nil {
s.buffers <- block.msg
}
continue
}
if len(block.msg) == 0 {
Expand Down Expand Up @@ -273,6 +281,9 @@ func (s *md5Server) blocks(lanes []blockInput) {
binary.LittleEndian.PutUint32(dig[12:], state.v3[i])

s.digests[uid] = dig
if lane.msg != nil {
s.buffers <- lane.msg
}
lanes[i] = blockInput{}
}
}
Expand Down
74 changes: 44 additions & 30 deletions md5_test.go
Expand Up @@ -130,8 +130,9 @@ func TestGolangGolden16(t *testing.T) {
}

func testMultipleSums(t *testing.T, incr, incr2 int) {

server := NewServer()
defer server.Close()

h := server.NewHash()
var tmp [Size]byte

Expand Down Expand Up @@ -218,65 +219,78 @@ func testMd5Simulator(t *testing.T, concurrency, iterations, maxSize int, server
}

func TestMd5Simulator(t *testing.T) {
iterations := 200
iterations := 400
if testing.Short() {
iterations = 20
iterations = 40
}

server := NewServer()
t.Run("c16", func(t *testing.T) {
server := NewServer()
t.Cleanup(server.Close)
t.Parallel()
testMd5Simulator(t, 16, iterations/10, 20<<20, server)
})
t.Run("c1", func(t *testing.T) {
server := NewServer()
t.Cleanup(server.Close)
t.Parallel()
testMd5Simulator(t, 1, iterations, 5<<20, server)
})
t.Run("c19", func(t *testing.T) {
server := NewServer()
t.Cleanup(server.Close)
t.Parallel()
testMd5Simulator(t, 19, iterations*10, 100<<10, server)
testMd5Simulator(t, 19, iterations*2, 100<<10, server)
})
t.Cleanup(server.Close)
}

// TestRandomInput tests a number of random inputs.
func TestRandomInput(t *testing.T) {
n := 10000
n := 500
if testing.Short() {
n = 100
}
conc := runtime.GOMAXPROCS(0)
server := NewServer()
for c := 0; c < conc; c++ {
t.Run(fmt.Sprint("routine-", c), func(t *testing.T) {
rng := rand.New(rand.NewSource(0xabad1dea + int64(c)))
testBuffer := make([]byte, 0, 10<<20)
t.Parallel()
server := NewServer()
t.Cleanup(server.Close)
for i := 0; i < n; i++ {
testBuffer = testBuffer[:rng.Intn(cap(testBuffer))]
rng.Read(testBuffer)
wantMD5 := md5.Sum(testBuffer)
h := server.NewHash()
for len(testBuffer) > 0 {
wrLen := rng.Intn(len(testBuffer) + 1)
n, err := h.Write(testBuffer[:wrLen])
if err != nil {
t.Fatal(err)
rng := rand.New(rand.NewSource(0xabad1dea + int64(c*n+i)))
// Up to 1 MB
length := rng.Intn(1 << 20)
baseBuf := make([]byte, length)

t.Run(fmt.Sprint("hash-", i), func(t *testing.T) {
t.Parallel()
testBuffer := baseBuf
rng.Read(testBuffer)
wantMD5 := md5.Sum(testBuffer)
h := server.NewHash()
for len(testBuffer) > 0 {
wrLen := rng.Intn(len(testBuffer) + 1)
n, err := h.Write(testBuffer[:wrLen])
if err != nil {
t.Fatal(err)
}
if n != wrLen {
t.Fatalf("write mismatch, want %d, got %d", wrLen, n)
}
testBuffer = testBuffer[n:]
if len(testBuffer) == 0 {
// Test if we can use the buffer without races.
rng.Read(baseBuf)
}
}
if n != wrLen {
t.Fatalf("write mismatch, want %d, got %d", wrLen, n)
got := h.Sum(nil)
if !bytes.Equal(wantMD5[:], got) {
t.Fatalf("mismatch, want %v, got %v", wantMD5[:], got)
}
testBuffer = testBuffer[n:]
}
got := h.Sum(nil)
if !bytes.Equal(wantMD5[:], got) {
t.Fatalf("mismatch, want %v, got %v", wantMD5[:], got)
}
h.Close()
h.Close()
})
}
})
}
t.Cleanup(server.Close)
}

func benchmarkCryptoMd5(b *testing.B, blockSize int) {
Expand Down