Skip to content

Commit

Permalink
add downloader and requester logic
Browse files Browse the repository at this point in the history
  • Loading branch information
ifilonenko committed Aug 4, 2020
1 parent 836f8c6 commit 738598d
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 18 deletions.
6 changes: 4 additions & 2 deletions cmd/puller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (

var (
modelDir = flag.String("model_dir", "/tmp","directory for multi-model config files")
numWorkers = flag.Int("num_workers", 1,"number of workers for parallel downloads")
)

func main() {
flag.Parse()
puller.InitiatePullers(*numWorkers)
puller.OnConfigChange(func(e puller.EventWrapper) {
log.Println("Send a request to:", e.LoadState)
log.Println("for model", e.ModelName)
log.Println("Send a request to:", e.LoadState, "for model", e.ModelName)
puller.AddModelToChannel(e)
})
puller.WatchConfig(*modelDir)
}
47 changes: 47 additions & 0 deletions pkg/puller/downloader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package puller

import "log"

func DownloadFunc(done chan struct{}, in chan EventWrapper) chan EventWrapper {
out := make(chan EventWrapper)
go func() {
for event := range in {
select {
default:
e := download(event)
if e == nil {
if event.DownloadRetries > 2 {
// LOOK AT THIS
close(done)
close(out)
} else {
log.Println("I am retrying", event.ModelName)
event.DownloadRetries += 1
in <- event
}
} else {
out <- event
}
case <-done:
return
}
}
close(out)
}()
return out
}

func download(event EventWrapper) *EventWrapper {
// TODO: Proper download logic
// We are testing the retry logic here
if event.ModelName == "my_model" || event.ModelName == "my_model2"{
log.Println("Downloading model,", event.ModelName)
if event.DownloadRetries == 0 {
log.Println("Need to retry download", event.ModelName)
return nil
}
log.Println("Success downloading my model", event.ModelName)
return &event
}
return nil
}
51 changes: 51 additions & 0 deletions pkg/puller/puller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package puller

import (
"log"
)

var (
p *Puller
)


type Puller struct {
NumWorkers int
Channel chan EventWrapper
}

func init() {
p = NewPuller()
}

func NewPuller() *Puller {
p := new(Puller)
return p
}

func worker(id int, events<-chan EventWrapper) {
log.Println("worker", id)
for event := range events {
log.Println("worker", id, "started job", event)
// TODO: Need to signal to close done and downloadChannel
// instead of closing in go-routines
done := make(chan struct{})
downloadChannel := make(chan EventWrapper, 1)
downloadChannel <- event
requestChannel := DownloadFunc(done, downloadChannel)
result := RequestFunc(done, requestChannel)
log.Println("worker", id, "finished job", event, "with:", result)
}
}

func AddModelToChannel(e EventWrapper) {
p.Channel <- e
}

func InitiatePullers(numWorkers int) {
p.NumWorkers = numWorkers
p.Channel = make(chan EventWrapper)
for workers := 1; workers <= p.NumWorkers; workers++ {
go worker(workers, p.Channel)
}
}
27 changes: 27 additions & 0 deletions pkg/puller/requester.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package puller

func RequestFunc(done chan struct{}, in chan EventWrapper) bool {
for event := range in {
select {
default:
e := request(event)
if e == nil {
return false
} else {
return true
}
case <-done:
return false
}
}
return false
}

func request(event EventWrapper) *EventWrapper {
// TODO: Write request logic for load / unload
// Currently just testing my_model
if event.ModelName == "my_model" {
return &event
}
return nil
}
35 changes: 19 additions & 16 deletions pkg/puller/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"strings"
)

var p *Puller
var w *Watcher

type Puller struct {
type Watcher struct {
modelDir string
onConfigChange func(EventWrapper)
fileExtension string
Expand All @@ -25,14 +25,14 @@ type ModelDefinition struct {
}

func init() {
p = New()
w = NewWatcher()
}

func New() *Puller {
p := new(Puller)
func NewWatcher() *Watcher {
w := new(Watcher)
// TODO: This should probably be overridable
p.fileExtension = ".json"
return p
w.fileExtension = ".json"
return w
}

type State string
Expand All @@ -49,19 +49,20 @@ type EventWrapper struct {
ModelDef *ModelDefinition
LoadState State
ModelName string
DownloadRetries int
}
func WatchConfig(modelDir string) {
log.Println("Entering watch")
p.modelDir = modelDir
p.WatchConfig()
w.modelDir = modelDir
w.WatchConfig()
}

func OnConfigChange(run func(in EventWrapper)) {
log.Println("Applying onConfigChange")
p.onConfigChange = run
w.onConfigChange = run
}

func (p *Puller) WatchConfig() {
func (w*Watcher) WatchConfig() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatal(err)
Expand All @@ -79,17 +80,18 @@ func (p *Puller) WatchConfig() {
// we only care about the model file:
// 1 - if the model file was modified or created
// 2 - if the model file was removed as a result of deletion or renaming
if p.onConfigChange != nil {
if w.onConfigChange != nil {
ext := filepath.Ext(event.Name)
isEdit := event.Op&writeOrCreateMask != 0
isRemove := event.Op&fsnotify.Remove != 0
isValidFile := ext == p.fileExtension
isValidFile := ext == w.fileExtension
if isValidFile && (isEdit || isRemove) {
fileName := strings.TrimSuffix(filepath.Base(event.Name), ext)
if isRemove {
p.onConfigChange(EventWrapper{
w.onConfigChange(EventWrapper{
LoadState: ShouldUnload,
ModelName: fileName,
DownloadRetries: 0,
})
} else {
file, _ := ioutil.ReadFile(filepath.Clean(event.Name))
Expand All @@ -98,10 +100,11 @@ func (p *Puller) WatchConfig() {
if err != nil {
log.Println("unable to marshall\n", err)
} else {
p.onConfigChange(EventWrapper{
w.onConfigChange(EventWrapper{
ModelDef: &modelDef,
LoadState: ShouldLoad,
ModelName: fileName,
DownloadRetries: 0,
})
}
}
Expand All @@ -118,7 +121,7 @@ func (p *Puller) WatchConfig() {
}
}
}()
err = watcher.Add(p.modelDir)
err = watcher.Add(w.modelDir)
if err != nil {
log.Fatal(err)
}
Expand Down

0 comments on commit 738598d

Please sign in to comment.