Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose status of chunks to see which was loaded and which are not #69

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dist/
.idea
.DS_Store
coverage.out
*.dat
*.output
1 change: 1 addition & 0 deletions chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ func (dst *OffsetWriter) Write(b []byte) (n int, err error) {
// Chunk represents the partial content range
type Chunk struct {
Start, End uint64
IsLoaded bool
}
16 changes: 8 additions & 8 deletions chunks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,23 @@ func TestChunksLength(t *testing.T) {
return
}

if d.chunks[0].Start != 0 {
if d.Chunks[0].Start != 0 {

t.Errorf("First chunk should start from 0, but got %d", d.chunks[0].Start)
t.Errorf("First chunk should start from 0, but got %d", d.Chunks[0].Start)
}

if chunk0.End != d.chunks[0].End {
if chunk0.End != d.Chunks[0].End {

t.Errorf("Chunk 0 expecting: %d but got: %d", chunk0.End, d.chunks[0].End)
t.Errorf("Chunk 0 expecting: %d but got: %d", chunk0.End, d.Chunks[0].End)
}

if d.chunks[1].Start != 5242871 {
if d.Chunks[1].Start != 5242871 {

t.Errorf("Second chunk should start from: 5242871, but got %d", d.chunks[1].Start)
t.Errorf("Second chunk should start from: 5242871, but got %d", d.Chunks[1].Start)
}

if chunk1.End != d.chunks[1].End {
if chunk1.End != d.Chunks[1].End {

t.Errorf("Chunk 1 expecting: %d but got: %d", chunk1.End, d.chunks[1].End)
t.Errorf("Chunk 1 expecting: %d but got: %d", chunk1.End, d.Chunks[1].End)
}
}
66 changes: 59 additions & 7 deletions download.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ type (

info *Info

chunks []*Chunk
Chunks []*Chunk

startedAt time.Time
// how many megabytes to download by one megabyte before switching to bigger chunks
HotStartMegabytes uint64
HotStartConcurrency uint
}

GotHeader struct {
Expand Down Expand Up @@ -149,22 +152,51 @@ func (d *Download) Init() (err error) {
if d.Concurrency == 0 {
d.Concurrency = getDefaultConcurrency()
}
if d.HotStartConcurrency == 0 && d.HotStartMegabytes > 0 {
d.HotStartConcurrency = 2
}

// Set default chunk size
if d.ChunkSize == 0 {
d.ChunkSize = getDefaultChunkSize(d.info.Size, d.MinChunkSize, d.MaxChunkSize, uint64(d.Concurrency))
}

d.Chunks = make([]*Chunk, 0)

Megabyte := uint64(1024 * 1024)

if d.info.Size < (1*Megabyte) && d.HotStartMegabytes > 0 {
chunk := new(Chunk)
d.Chunks = append(d.Chunks, chunk)
chunk.Start = 0
chunk.End = d.info.Size - 1
return nil
}

hotStartBytes := d.HotStartMegabytes * Megabyte

chunksLen := d.info.Size / d.ChunkSize
d.chunks = make([]*Chunk, 0, chunksLen)

if d.info.Size > hotStartBytes {

chunksLen = (d.info.Size - hotStartBytes) / d.ChunkSize
SmallChunkSize := Megabyte
for i := uint64(0); i < d.HotStartMegabytes; i++ {
chunk := new(Chunk)
d.Chunks = append(d.Chunks, chunk)

chunk.Start = (SmallChunkSize * i) + i
chunk.End = chunk.Start + SmallChunkSize
}
}

// Set chunk ranges.
for i := uint64(0); i < chunksLen; i++ {

chunk := new(Chunk)
d.chunks = append(d.chunks, chunk)
d.Chunks = append(d.Chunks, chunk)

chunk.Start = (d.ChunkSize * i) + i
chunk.Start = (d.ChunkSize * i) + i + hotStartBytes
chunk.End = chunk.Start + d.ChunkSize
if chunk.End >= d.info.Size || i == chunksLen-1 {
chunk.End = d.info.Size - 1
Expand Down Expand Up @@ -302,10 +334,29 @@ func (d *Download) dl(dest io.WriterAt, errC chan error) {
wg sync.WaitGroup

// Concurrency limit.
max = make(chan int, d.Concurrency)
max = make(chan int, d.Concurrency)
hotStartMax = make(chan int, d.HotStartConcurrency)
)

for i := 0; i < len(d.chunks); i++ {
for i := 0; i < int(d.HotStartMegabytes); i++ {
hotStartMax <- 1
wg.Add(1)

go func(i int) {
defer wg.Done()

// Concurrently download and write chunk
if err := d.DownloadChunk(d.Chunks[i], &OffsetWriter{dest, int64(d.Chunks[i].Start)}); err != nil {
errC <- err
return
}
d.Chunks[i].IsLoaded = true

<-hotStartMax
}(i)
}

for i := int(d.HotStartMegabytes); i < len(d.Chunks); i++ {

max <- 1
wg.Add(1)
Expand All @@ -314,10 +365,11 @@ func (d *Download) dl(dest io.WriterAt, errC chan error) {
defer wg.Done()

// Concurrently download and write chunk
if err := d.DownloadChunk(d.chunks[i], &OffsetWriter{dest, int64(d.chunks[i].Start)}); err != nil {
if err := d.DownloadChunk(d.Chunks[i], &OffsetWriter{dest, int64(d.Chunks[i].Start)}); err != nil {
errC <- err
return
}
d.Chunks[i].IsLoaded = true

<-max
}(i)
Expand Down