Skip to content

Commit

Permalink
add basic downloader implementation [#31]
Browse files Browse the repository at this point in the history
  • Loading branch information
bdon committed Sep 2, 2023
1 parent 9ca361a commit 98576cc
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 0 deletions.
80 changes: 80 additions & 0 deletions pmtiles/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package pmtiles


type Task struct {
Index int
Rng Range
Result chan TaskResult
}

type TaskResult struct {
Index int
Blob []byte
}

// returns a channel of results that exactly match the requested ranges.
func DownloadParts(getter func (Range) []byte, ranges []Range, numThreads int) chan []byte {
intermediate := make(chan TaskResult, 8)
orderedOutput := make(chan []byte, 8)
tasks := make(chan Task, 100)

lastTask := len(ranges) - 1

worker := func (id int, tasks <-chan Task) {
for task := range tasks {
task.Result <- TaskResult{task.Index, getter(task.Rng)}
}
}

for i := 0; i < numThreads; i++ {
go worker(i, tasks)
}


// push into the queue on a separate goroutine
go func () {
for idx, r := range ranges {
tasks <- Task{Index: idx, Rng: r, Result: intermediate}
}
close(tasks)
}()

// a goroutine that listens on a channel
// and buffers the results, outputting them in exact sorted order
// once it has received all results, it closes the result channel
go func() {
buffer := make(map[int]TaskResult)
nextIndex := 0

for i := range intermediate {
buffer[i.Index] = i

for {
if next, ok := buffer[nextIndex]; ok {
orderedOutput <- next.Blob
delete(buffer, nextIndex)
nextIndex++

if (nextIndex == lastTask) {
close(intermediate)
}
} else {
break
}
}
}

close(orderedOutput)
}()

return orderedOutput
}

// an number for overhead: 0.2 is 20% overhead, 1.0 is 100% overhead
// a number of maximum chunk size: n chunks * threads is the max memory usage
// store the smallest gaps in a heap
func DownloadBatchedParts(getter func (Range) []byte, ranges []Range, overhead float32, maxSizeBytes int, numThreads int) chan []byte {
orderedOutput := make(chan []byte, 8)
return orderedOutput
}

33 changes: 33 additions & 0 deletions pmtiles/downloader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package pmtiles

import (
"github.com/stretchr/testify/assert"
"testing"
"encoding/binary"
"time"
)

func TestDownloadParts(t *testing.T) {
fakeGet := func(rng Range) []byte {
time.Sleep(time.Millisecond * time.Duration(3 - rng.Offset))
bytes := make([]byte, 8)
binary.LittleEndian.PutUint64(bytes, rng.Offset)
return bytes
}

wanted := make([]Range, 0)
wanted = append(wanted, Range{0,1})
wanted = append(wanted, Range{1,2})
wanted = append(wanted, Range{2,3})

result := DownloadParts(fakeGet, wanted, 3)

expected := uint64(0)

for x := range result {
assert.Equal(t, expected, binary.LittleEndian.Uint64(x))
expected += 1
}

assert.Equal(t, expected, uint64(3))
}

0 comments on commit 98576cc

Please sign in to comment.