Skip to content

Commit

Permalink
feat: glue all changes
Browse files Browse the repository at this point in the history
  • Loading branch information
didasy committed Sep 25, 2020
1 parent 1aba1df commit 02d48f7
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 18 deletions.
7 changes: 4 additions & 3 deletions app/ytfeed/ytfeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {

var streamScheduler *streamschedule.StreamSchedule
if cfg.BoltDBPath != "" {
streamScheduler, err = streamschedule.New(logger, cfg.BoltDBPath, cfg.ResubCallbackAddr, cfg.StreamSchedulerRetryDelay, cfg.StreamSchedulerWorkerInterval, cfg.StreamSchedulerMaxRetries)
streamScheduler, err = streamschedule.New(logger, cfg.BoltDBPath, cfg.StreamSchedulerWorkerInterval)
if err != nil {
err = errors.Wrap(err, "failed to create stream scheduler service")
return
Expand Down Expand Up @@ -178,7 +178,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {
}

// run workers
subscriber := autosubscribefeed.New(logger, cfg.VerificationToken, cfg.ResubTargetAddr, cfg.ResubTopic, cfg.ResubCallbackAddr, cfg.ResubInterval)
subscriber := autosubscribefeed.New(logger, cfg.VerificationToken, cfg.VerificationSecret, cfg.ResubTargetAddr, cfg.ResubCallbackAddr, cfg.ResubTopics, cfg.ResubInterval)
go func(ctx context.Context, subscriber *autosubscribefeed.Subscriber) {
err := subscriber.Subscribe(ctx)
if err != nil {
Expand All @@ -190,6 +190,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {

if streamScheduler != nil {
go func(ctx context.Context, streamScheduler *streamschedule.StreamSchedule) {
streamScheduler.RegisterDataHandler(dataHandlers...)
err := streamScheduler.RunWorker(ctx)
if err != nil {
err = errors.Wrap(err, "stream scheduler worker exited with error")
Expand All @@ -201,7 +202,7 @@ func Run(ctx context.Context, logger mainytfeed.Logger) (err error) {

// declare handler functions
http.HandleFunc("/health", health.Handler)
http.HandleFunc("/", rss.Handler(ctx, logger, cfg.VerificationToken, dataHandlers...))
http.HandleFunc("/", rss.Handler(ctx, logger, cfg.VerificationToken, cfg.VerificationSecret, dataHandlers...))

// listen
errCh := make(chan error, 1)
Expand Down
23 changes: 8 additions & 15 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,14 @@ import (
)

const (
DefaultVersion = "v1.0.0"
DefaultVersion = "v0.2.0"
DefaultHost = ":8123"
DefaultResubTargetAddr = "https://pubsubhubbub.appspot.com/subscribe"
DefaultResubInterval = 24 * 3 * time.Hour
DefaultFileNameTemplate = "{{.ChannelID}}/{{.PublishedYear}}/{{.PublishedMonth}}/{{.PublishedDay}}/{{.PublishedTimeZone}}/{{.VideoID}}.{{.VideoExtension}}"
DefaultFormatQuality = "720"
DefaultFormatExtension = "webm"
DefaultRedisChannel = "ytfeed"
DefaultStreamSchedulerRetryDelay = 1 * time.Minute
DefaultStreamSchedulerMaxRetries = 5
DefaultStreamSchedulerWorkerInterval = 1 * time.Minute
DefaultVideoDownloadMaxRetries = 5
DefaultTemporaryFileDir = "./"
Expand Down Expand Up @@ -55,6 +53,7 @@ func init() {
handleError(viper.BindEnv("host"))
handleError(viper.BindEnv("youtube_api_key"))
handleError(viper.BindEnv("verification_token"))
handleError(viper.BindEnv("verification_secret"))

handleError(viper.BindEnv("resub_interval"))
handleError(viper.BindEnv("resub_target_addr"))
Expand Down Expand Up @@ -99,9 +98,7 @@ func init() {
handleError(viper.BindEnv("redis_idle_check_frequency"))

handleError(viper.BindEnv("boltdb_path"))
handleError(viper.BindEnv("stream_scheduler_retry_delay"))
handleError(viper.BindEnv("stream_scheduler_worker_interval"))
handleError(viper.BindEnv("stream_scheduler_max_retries"))

handleError(viper.BindEnv("amqp_dsn"))
handleError(viper.BindEnv("amqp_exchange"))
Expand All @@ -122,9 +119,7 @@ func init() {
viper.SetDefault("video_format_quality", DefaultFormatQuality)
viper.SetDefault("video_format_extension", DefaultFormatExtension)
viper.SetDefault("redis_channel", DefaultRedisChannel)
viper.SetDefault("stream_scheduler_retry_delay", DefaultStreamSchedulerRetryDelay)
viper.SetDefault("stream_scheduler_worker_interval", DefaultStreamSchedulerWorkerInterval)
viper.SetDefault("stream_scheduler_max_retries", DefaultStreamSchedulerMaxRetries)
viper.SetDefault("video_download_max_retries", DefaultVideoDownloadMaxRetries)
viper.SetDefault("temporary_file_dir", DefaultTemporaryFileDir)
viper.SetDefault("amqp_exchange", DefaultAMQPExchange)
Expand All @@ -151,11 +146,12 @@ type Validator interface {
type Configuration struct {
validator Validator `validate:"required"`

YoutubeAPIKey string `validate:"required"`
VerificationToken string `validate:"required"`
YoutubeAPIKey string `validate:"required"`
VerificationToken string `validate:"required"`
VerificationSecret string `validate:"required"`

ResubTargetAddr string `validate:"required"`
ResubTopic string `validate:"required"`
ResubTopics []string `validate:"required"`
ResubCallbackAddr string `validate:"required"`
ResubInterval time.Duration `validate:"required"`

Expand Down Expand Up @@ -197,9 +193,7 @@ type Configuration struct {
RedisIdleCheckFrequency time.Duration `validate:""`

BoltDBPath string `validate:"omitempty,file"`
StreamSchedulerRetryDelay time.Duration `validate:"required,min=0"`
StreamSchedulerWorkerInterval time.Duration `validate:"required,min=1000000000"`
StreamSchedulerMaxRetries int `validate:""`

AMQPDSN string `validate:""`
AMQPExchange string `validate:"required"`
Expand All @@ -219,10 +213,11 @@ func New() (c *Configuration) {

c.YoutubeAPIKey = viper.GetString("youtube_api_key")
c.VerificationToken = viper.GetString("verification_token")
c.VerificationSecret = viper.GetString("verification_secret")

c.ResubCallbackAddr = viper.GetString("resub_callback_addr")
c.ResubTargetAddr = viper.GetString("resub_target_addr")
c.ResubTopic = viper.GetString("resub_topic")
c.ResubTopics = viper.GetStringSlice("resub_topic")
c.ResubInterval = viper.GetDuration("resub_interval")

c.S3Endpoint = viper.GetString("s3_endpoint")
Expand Down Expand Up @@ -263,9 +258,7 @@ func New() (c *Configuration) {
c.RedisIdleCheckFrequency = viper.GetDuration("redis_idle_check_frequency")

c.BoltDBPath = viper.GetString("boltdb_path")
c.StreamSchedulerRetryDelay = viper.GetDuration("stream_scheduler_retry_delay")
c.StreamSchedulerWorkerInterval = viper.GetDuration("stream_scheduler_worker_interval")
c.StreamSchedulerMaxRetries = viper.GetInt("stream_scheduler_max_retries")

c.AMQPDSN = viper.GetString("amqp_dsn")
c.AMQPExchange = viper.GetString("amqp_exchange")
Expand Down
6 changes: 6 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"errors"
"os"
"testing"

"github.com/golang/mock/gomock"
Expand All @@ -18,6 +19,9 @@ func TestConfig(t *testing.T) {
mockValidator := mock.NewMockValidator(ctrl)

t.Run("Validate success", func(t *testing.T) {
version := "v100.0.0"
os.Setenv("YTFEED_VERSION", version)

cfg = New()
require.NotNil(t, cfg)
cfg.StorageBackend = StorageBackendDisk
Expand All @@ -28,6 +32,8 @@ func TestConfig(t *testing.T) {

err := cfg.Validate()
require.NoError(t, err)
require.Equal(t, version, cfg.Version)
require.Equal(t, DefaultHost, cfg.Host)
})

t.Run("Validate failed", func(t *testing.T) {
Expand Down

0 comments on commit 02d48f7

Please sign in to comment.