Skip to content

Commit

Permalink
Merge pull request #64 from moul/dev/moul/processing-worker
Browse files Browse the repository at this point in the history
feat: processing worker
  • Loading branch information
moul committed Nov 10, 2020
2 parents 1a2ed3c + 6761a55 commit ed2d3e2
Show file tree
Hide file tree
Showing 10 changed files with 389 additions and 178 deletions.
1 change: 1 addition & 0 deletions AUTHORS

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 6 additions & 1 deletion Makefile
Expand Up @@ -21,7 +21,7 @@ LDFLAGS ?= -X moul.io/sgtm/internal/sgtmversion.VcsRef=$(VCS_REF) -X moul.io/sgt
COMPILEDAEMON_OPTIONS ?= -exclude-dir=.git -color=true -build=go\ install -build-dir=./cmd/sgtm
run: generate
go install github.com/githubnemo/CompileDaemon
CompileDaemon $(COMPILEDAEMON_OPTIONS) -command="sgtm --dev-mode --enable-server --enable-discord run"
CompileDaemon $(COMPILEDAEMON_OPTIONS) -command="sgtm --dev-mode --enable-server --enable-discord --enable-processing-worker run"
.PHONY: run

run-discord: generate
Expand All @@ -34,6 +34,11 @@ run-server: generate
CompileDaemon $(COMPILEDAEMON_OPTIONS) -command="sgtm --dev-mode --enable-server run"
.PHONY: run-server

run-processing-worker: generate
go install github.com/githubnemo/CompileDaemon
CompileDaemon $(COMPILEDAEMON_OPTIONS) -command="sgtm --dev-mode --enable-processing-worker run"
.PHONY: run-processing-worker

packr:
(cd static; git clean -fxd)
cd pkg/sgtm && packr2
Expand Down
4 changes: 4 additions & 0 deletions api/sgtm.proto
Expand Up @@ -112,6 +112,8 @@ message User {
string goals = 27;
string soundcloud_username = 28;
string role = 29;
int64 processing_version = 30;
string processing_error = 31;

repeated Post recent_posts = 50 [(go.field) = {tags: 'gorm:"foreignkey:AuthorID;PRELOAD:false"'}];
// timezone
Expand All @@ -138,6 +140,8 @@ message Post {
Provider provider = 17;
string body = 18;
int64 sort_date = 19;
int64 processing_version = 20;
string processing_error = 21;

/// comment

Expand Down
6 changes: 5 additions & 1 deletion cmd/sgtm/main.go
Expand Up @@ -61,6 +61,7 @@ func app(args []string) error {
rootFlags.StringVar(&svcOpts.SoundCloudClientID, "soundcloud-client-id", svcOpts.SoundCloudClientID, "SoundCloud client ID")
rootFlags.StringVar(&svcOpts.BearerToken, "bearer-token", svcOpts.BearerToken, "Bearer.sh token")
rootFlags.StringVar(&svcOpts.IPFSAPI, "ipfs-api", svcOpts.IPFSAPI, "IPFS API multiaddress, if not provided or empry, will use the ipfs cli without an '--api' arg")
rootFlags.BoolVar(&svcOpts.EnableProcessingWorker, "enable-processing-worker", svcOpts.EnableProcessingWorker, "enable processing worker")

root := &ffcli.Command{
FlagSet: rootFlags,
Expand Down Expand Up @@ -159,12 +160,15 @@ func runCmd(ctx context.Context, _ []string) error {
// run.Group
var gr run.Group
{
if svcOpts.EnableDiscord || svcOpts.EnableServer {
if svcOpts.EnableDiscord || svcOpts.EnableServer || svcOpts.EnableProcessingWorker {
gr.Add(run.SignalHandler(ctx, syscall.SIGTERM, syscall.SIGINT, os.Interrupt, os.Kill))
}
if svcOpts.EnableDiscord {
gr.Add(svc.StartDiscord, svc.CloseDiscord)
}
if svcOpts.EnableProcessingWorker {
gr.Add(svc.StartProcessingWorker, svc.CloseProcessingWorker)
}
if svcOpts.EnableServer {
gr.Add(svc.StartServer, svc.CloseServer)
}
Expand Down
12 changes: 12 additions & 0 deletions deployments/sgtm.club/docker-compose.yml
Expand Up @@ -37,6 +37,18 @@ services:
networks:
- default

sgtm-processing-worker:
image: moul/sgtm:latest
restart: on-failure
volumes:
- .:/app
working_dir: /app
labels:
com.centurylinklabs.watchtower.enable: "true"
command: --enable-processing-worker run
networks:
- default

ipfs-daemon:
image: ipfs/go-ipfs:latest
restart: on-failure
Expand Down
4 changes: 2 additions & 2 deletions gen.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/sgtm/opts.go
Expand Up @@ -26,6 +26,10 @@ type Opts struct {
DiscordClientID string
DiscordClientSecret string

// Processing Worker

EnableProcessingWorker bool

// SoundCloud

SoundCloudClientID string
Expand Down
136 changes: 136 additions & 0 deletions pkg/sgtm/processing_worker.go
@@ -0,0 +1,136 @@
package sgtm

import (
"fmt"
"os"
"sync"
"time"

"go.uber.org/zap"
"gorm.io/gorm"
"moul.io/banner"
"moul.io/sgtm/pkg/sgtmpb"
)

type processingWorkerDriver struct {
started bool
wg *sync.WaitGroup

trackMigrations []func(*sgtmpb.Post) error
}

func (svc *Service) StartProcessingWorker() error {
// init
{
fmt.Fprintln(os.Stderr, banner.Inline("processing-worker"))
svc.logger.Debug("starting processing-worker")
svc.setupMigrations()
svc.processingWorker.wg = &sync.WaitGroup{}
svc.processingWorker.wg.Add(1)
defer svc.processingWorker.wg.Done()
svc.processingWorker.started = true
}

// loop
for i := 0; ; i++ {
if err := svc.processingLoop(i); err != nil {
return err
}

select {
// FIXME: add a channel to get instant worker task
case <-time.After(30 * time.Second):
case <-svc.ctx.Done():
return nil
}
}
}

func (svc *Service) CloseProcessingWorker(err error) {
svc.logger.Debug("closing processingWorker", zap.Bool("was-started", svc.processingWorker.started), zap.Error(err))
svc.cancel()
if svc.processingWorker.started {
svc.processingWorker.wg.Wait()
svc.logger.Debug("processing-worker closed")
}
}

func (svc *Service) processingLoop(i int) error {
before := time.Now()

// track migrations
{
var outdated []*sgtmpb.Post
err := svc.rodb().
Where(sgtmpb.Post{Kind: sgtmpb.Post_TrackKind}).
Where("processing_error IS NULL OR processing_error == ''").
Where("processing_version IS NULL OR processing_version < ?", len(svc.processingWorker.trackMigrations)).
Preload("Author").
Find(&outdated).
Error
if err != nil {
return fmt.Errorf("failed to fetch tracks that need to be processed: %w", err)
}

err = svc.rwdb().Transaction(func(db *gorm.DB) error {
for _, entryPtr := range outdated {
entry := entryPtr
version := 1
for _, migration := range svc.processingWorker.trackMigrations {
err := migration(entry)
if err != nil {
entry.ProcessingError = err.Error()
break
}
entry.ProcessingVersion = int64(version)
version++
}
if err := db.
Model(&entry).
Updates(map[string]interface{}{
"processing_version": entry.ProcessingVersion,
"processing_error": entry.ProcessingError,
}).
Error; err != nil {
return fmt.Errorf("failed to save processing state: %w", err)
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to run migration: %w", err)
}
}

// TODO: other type migrations
// TODO: track maintenance (i.e., daily check if the track still exists on SoundCloud)

svc.logger.Debug("processing loop ended",
zap.Duration("duration", time.Since(before)),
zap.Int("loop", i),
)
return nil
}

func (svc *Service) setupMigrations() {
svc.processingWorker.trackMigrations = []func(*sgtmpb.Post) error{
/*
// FIXME: try downloading the mp3 locally
func(post *sgtmpb.Post) error { return fmt.Errorf("not implemented") },
// FIXME: compute BPM
func(post *sgtmpb.Post) error { return fmt.Errorf("not implemented") },
// FIXME: extract thumbnail from file metadata
func(post *sgtmpb.Post) error { return fmt.Errorf("not implemented") },
// FIXME: compute other info with analysis tools
func(post *sgtmpb.Post) error { return fmt.Errorf("not implemented") },
// FIXME: create MP3 version for uploaded WAV
func(post *sgtmpb.Post) error {
if post.Provider != sgtmpb.Provider_IPFS {
return nil
}
// if post.mp3_192_cid == "" && format != mp3 { download; compress; upload }
return nil
},
*/
}
}
7 changes: 4 additions & 3 deletions pkg/sgtm/service.go
Expand Up @@ -26,9 +26,10 @@ type Service struct {

// drivers

discord discordDriver
server serverDriver
ipfs ipfsWrapper
discord discordDriver
server serverDriver
processingWorker processingWorkerDriver
ipfs ipfsWrapper
}

func New(db *gorm.DB, opts Opts) (Service, error) {
Expand Down

0 comments on commit ed2d3e2

Please sign in to comment.