diff --git a/Dockerfile b/.docker/Dockerfile similarity index 100% rename from Dockerfile rename to .docker/Dockerfile diff --git a/Dockerfile.nvidia b/.docker/Dockerfile.nvidia similarity index 100% rename from Dockerfile.nvidia rename to .docker/Dockerfile.nvidia diff --git a/.docker/README.md b/.docker/README.md new file mode 100644 index 0000000..9dca83f --- /dev/null +++ b/.docker/README.md @@ -0,0 +1,64 @@ +# Docker + +## Build + +```sh +./build go-transcode:latest +``` + +## Run + +```sh +docker run --rm -d \ + --name="go-transcode" \ + -p "8080:8080" \ + -v "${PWD}/config.yaml:/app/config.yaml" go-transcode:latest +``` + +# VAAPI Support + +```sh +docker run --rm -d \ + --name="go-transcode" \ + --device=/dev/dri:/dev/dri \ + -p "8080:8080" \ + -v "${PWD}/config.yaml:/app/config.yaml" go-transcode:latest +``` + +# Nvidia GPU support + +You will need to have [nvidia-docker](https://github.com/NVIDIA/nvidia-docker) installed. + +## Build + +First, you need to build previous container. Then, build Nvidia container. + +```sh +docker build --build-arg "TRANSCODE_IMAGE=go-transcode:latest" -t go-transcode-nvidia:latest -f Dockerfile.nvidia .. +``` + +## Run + +```sh +docker run --rm -d \ + --gpus=all \ + --name="go-transcode-nvidia" \ + -p "8080:8080" \ + -v "${PWD}/config.yaml:/app/config.yaml" go-transcode-nvidia:latest +``` + +## Supported inputs + +Input codec will be automatically determined from given stream. Please check your graphic card's supported codec and maximum concurrent sessions [here](https://developer.nvidia.com/video-encode-decode-gpu-support-matrix). + +| Codec | CUVID | Codec Name | +| ---------- | ----------- | ----------------------------------------- | +| h264 | h264_cuvid | H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10 | +| hevc | hevc_cuvid | H.265 / HEVC | +| mjpeg | mjpeg_cuvid | Motion JPEG | +| mpeg1video | mpeg1_cuvid | MPEG-1 video | +| mpeg2video | mpeg2_cuvid | MPEG-2 video | +| mpeg4 | mpeg4_cuvid | MPEG-4 part 2 | +| vc1 | vc1_cuvid | SMPTE VC-1 | +| vp8 | vp8_cuvid | On2 VP8 | +| vp9 | vp9_cuvid | Google VP9 | diff --git a/.docker/build b/.docker/build new file mode 100755 index 0000000..055c8f4 --- /dev/null +++ b/.docker/build @@ -0,0 +1,5 @@ +#!/bin/sh +cd "$(dirname "$0")" + +BUILD_TAG="${1:-transcode_server_img}" +docker build -t "${BUILD_TAG}" -f Dockerfile .. diff --git a/docker-compose.yaml b/.docker/docker-compose.yaml similarity index 52% rename from docker-compose.yaml rename to .docker/docker-compose.yaml index 68bc717..4aa78f4 100644 --- a/docker-compose.yaml +++ b/.docker/docker-compose.yaml @@ -2,13 +2,13 @@ version: "3" services: go-transcode: - build: . - container_name: go-transcode - restart: unless-stopped + build: "./" + container_name: "go-transcode" + restart: "unless-stopped" ports: - "8080:8080" volumes: - - ./config.yaml:/app/config.yaml + - "./config.yaml:/app/config.yaml" # Mount your VOD here, for hlsvod module # - ./media:/app/media - command: serve -d + command: "serve -d" diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 36a1378..ee26363 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -14,7 +14,7 @@ build: variables: BASE_IMAGE: $CI_REGISTRY_IMAGE:latest script: - - docker build -t $BASE_IMAGE . + - docker build -t $BASE_IMAGE -f ./docker/Dockerfile . - docker push $BASE_IMAGE only: - master @@ -25,7 +25,7 @@ build nivdia: GO_TRANSCODE_IMAGE: $CI_REGISTRY_IMAGE:latest IMAGE_TAG: $CI_REGISTRY_IMAGE/nivdia:latest script: - - docker build --build-arg "GO_TRANSCODE_IMAGE=${GO_TRANSCODE_IMAGE}" --build-arg "BASE_IMAGE=${NVIDIA_FFMPEG_IMAGE}" -t $IMAGE_TAG -f Dockerfile.nvidia . + - docker build --build-arg "GO_TRANSCODE_IMAGE=${GO_TRANSCODE_IMAGE}" --build-arg "BASE_IMAGE=${NVIDIA_FFMPEG_IMAGE}" -t $IMAGE_TAG -f ./docker/Dockerfile.nvidia . - docker push $IMAGE_TAG only: - master diff --git a/README.md b/README.md index 6fb7ac5..bd65b03 100644 --- a/README.md +++ b/README.md @@ -46,21 +46,18 @@ streams: Full configuration example: ```yaml -# allow debug outputs -debug: true - -# mount debug pprof endpoint at /debug/pprof/ -pprof: true - # bind server to IP:PORT (use :8888 for all connections) bind: localhost:8888 # serve static files from this directory (optional) static: /var/www/html -# TODO: issue #4 +# trust reverse proxies proxy: true +# mount debug pprof endpoint at /debug/pprof/ +pprof: true + # For live streaming streams: cam: rtmp://localhost/live/cam @@ -136,70 +133,7 @@ $ ./go-transcode serve First line is warning and "serving streams" line says empty list (`map[]`) because we don't have config.yaml so there no stream configured. Make your config.yaml and try again. -## Docker - -### Build - -```sh -docker build -t go-transcode:latest . -``` - -### Run - -```sh -docker run --rm -d \ - --name="go-transcode" \ - -p "8080:8080" \ - -v "${PWD}/config.yaml:/app/config.yaml" go-transcode:latest -``` - -## VAAPI Support (docker) - -```sh -docker run --rm -d \ - --name="go-transcode" \ - --device=/dev/dri:/dev/dri \ - -p "8080:8080" \ - -v "${PWD}/config.yaml:/app/config.yaml" go-transcode:latest -``` - -## Nvidia GPU support (docker) - -You will need to have [nvidia-docker](https://github.com/NVIDIA/nvidia-docker) installed. - -### Build - -First, you need to build previous container. Then, build Nvidia container. - -```sh -docker build --build-arg "TRANSCODE_IMAGE=go-transcode:latest" -t go-transcode-nvidia:latest -f Dockerfile.nvidia . -``` - -### Run - -```sh -docker run --rm -d \ - --gpus=all \ - --name="go-transcode-nvidia" \ - -p "8080:8080" \ - -v "${PWD}/config.yaml:/app/config.yaml" go-transcode-nvidia:latest -``` - -### Supported inputs - -Input codec will be automatically determined from given stream. Please check your graphic card's supported codec and maximum concurrent sessions [here](https://developer.nvidia.com/video-encode-decode-gpu-support-matrix). - -| Codec | CUVID | Codec Name | -| ---------- | ----------- | ----------------------------------------- | -| h264 | h264_cuvid | H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10 | -| hevc | hevc_cuvid | H.265 / HEVC | -| mjpeg | mjpeg_cuvid | Motion JPEG | -| mpeg1video | mpeg1_cuvid | MPEG-1 video | -| mpeg2video | mpeg2_cuvid | MPEG-2 video | -| mpeg4 | mpeg4_cuvid | MPEG-4 part 2 | -| vc1 | vc1_cuvid | SMPTE VC-1 | -| vp8 | vp8_cuvid | On2 VP8 | -| vp9 | vp9_cuvid | Google VP9 | +See [.docker](.docker) folder for docker support. ## Alternatives @@ -216,19 +150,18 @@ Join us in the [Matrix space](https://matrix.to/#/#go-transcode:proxychat.net) ( The source code is in the following files/folders: -- `cmd/` and `main.go`: source for the command-line interface -- `hls/`: process runner for HLS transcoding -- `hlsvod/`: process runner for HLS VOD transcoding (for static files) -- `internal/`: actual source code logic +- `.docker`: for docker support. +- `cmd/` and `main.go`: source for the command-line interface. +- `internal/`: internal source code. +- `modules/`: standalone plug'n'play modules. +- `pkg/`: external packages ready to be reused. *TODO: document different modules/packages and dependencies* Other files/folders in the repositories are: -- `data/`: files used/served by go-transcode -- `dev/`: some docker helper scripts -- `profiles/`: the ffmpeg profiles for transcoding -- `tests/`: some tests for the project -- `Dockerfile`, `Dockerfile.nvidia` and `docker-compose.yaml`: for the docker lovers -- `god.mod` and `go.sum`: golang dependencies/modules tracking -- `LICENSE`: licensing information (Apache 2.0) +- `docs/`: documentation and usage examples. +- `profiles/`: the ffmpeg profiles for transcoding. +- `tests/`: some tests for the project. +- `god.mod` and `go.sum`: golang dependencies/modules tracking. +- `LICENSE`: licensing information (Apache 2.0). diff --git a/cmd/root.go b/cmd/root.go index 1e4ff7c..5aa6181 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,100 +1,242 @@ package cmd import ( + "fmt" + "io" "os" + "os/signal" "runtime" "strings" + "syscall" "github.com/fsnotify/fsnotify" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/spf13/viper" - - transcode "github.com/m1k1o/go-transcode/internal" + "gopkg.in/natefinch/lumberjack.v2" ) -func Execute() error { - return root.Execute() -} +// Default configuration path +const defCfgPath = "/etc/transcode/" -var root = &cobra.Command{ - Use: "transcode", - Short: "transcode server", - Long: `transcode server`, +// ENV prefix for configuration +const envPrefix = "TRANSCODE" + +var rootCmd = &cobra.Command{ + Use: "transcode", + Short: "Transcode server CLI.", + Long: `Transcode HTTP on-demand transcoding API.`, + Version: "1.0.0", } +var onConfigLoad []func() + func init() { + var cfgFile string + var logConfig logConfig + cobra.OnInitialize(func() { - ////// - // logs - ////// - zerolog.TimeFieldFormat = zerolog.TimeFormatUnix - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}) - - ////// - // configs - ////// - - // at this point we did not read any config data, so we need to tell - // explicitly how to get this value - cfgFile := viper.GetString("config") // Use config file from the flag - if cfgFile == "" { - cfgFile = os.Getenv("TRANSCODE_CONFIG") // Use config file from the env - } + initConfiguration(cfgFile, defCfgPath, envPrefix) + logConfig.Set() + initLogging(logConfig) + + // display used configuration file + file := viper.ConfigFileUsed() + if file != "" { + viper.OnConfigChange(func(e fsnotify.Event) { + log.Info().Msg("config file reloaded") + + // call load config + for _, loadConfig := range onConfigLoad { + loadConfig() + } + }) - if cfgFile != "" { - viper.SetConfigFile(cfgFile) // use config file from the flag + viper.WatchConfig() + + log.Info().Str("config", file).Msg("preflight complete with config file") } else { - if runtime.GOOS == "linux" { - viper.AddConfigPath("/etc/transcode/") - } + log.Warn().Msg("preflight complete without config file") + } - viper.AddConfigPath(".") - viper.SetConfigName("config") + // call load config + for _, loadConfig := range onConfigLoad { + loadConfig() } + }) + + // config file + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "configuration file path") + _ = viper.BindPFlag("config", rootCmd.PersistentFlags().Lookup("config")) - viper.SetEnvPrefix("TRANSCODE") - viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) - viper.AutomaticEnv() // read in environment variables that match + // log configuration + _ = logConfig.Init(rootCmd) +} + +type Config interface { + Init(cmd *cobra.Command) error + Set() +} - err := viper.ReadInConfig() - if err != nil && cfgFile != "" { - log.Err(err).Msg("unable to read in config") +func Execute() error { + return rootCmd.Execute() +} + +// +// Configuration initialization +// + +func initConfiguration(cfgFile string, defCfgPath string, envPrefix string) { + // use configuration file if provided + if cfgFile != "" { + viper.SetConfigFile(cfgFile) + } else { + // confguratino file name + viper.SetConfigName("config") + + // search for configuration file + if runtime.GOOS == "linux" && defCfgPath != "" { + viper.AddConfigPath(defCfgPath) } - // all configs (from file, env and flags) are loaded now, - // we can set them - config := transcode.Service.RootConfig - config.Set() + // seatch for configuration file in ./ + viper.AddConfigPath(".") + } + + if envPrefix != "" { + // env prefix is uppercase progname + viper.SetEnvPrefix(envPrefix) - if config.Debug { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } else { - zerolog.SetGlobalLevel(zerolog.InfoLevel) + // replace . and - with _ + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) + + // read in environment variables that match + viper.AutomaticEnv() + } + + // read config file + err := viper.ReadInConfig() + if err != nil && cfgFile != "" { + panic(fmt.Errorf("fatal error config file: %w", err)) + } +} + +// +// Logging initialization +// + +type logConfig struct { + // Set log level + Level string `yaml:"level"` + // Enable console logging + Console bool `yaml:"console"` + // Enable file logging and specify its path + File string `yaml:"file"` + // MaxAge the max age in days to keep a logfile + MaxAge int `yaml:"maxage"` + // MaxSize the max size in MB of the logfile before it's rolled + MaxSize int `yaml:"maxsize"` + // MaxBackups the max number of rolled files to keep + MaxBackups int `yaml:"maxbackups"` +} + +func (logConfig) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().String("log.level", "", "Set log level") + if err := viper.BindPFlag("log.level", cmd.PersistentFlags().Lookup("log.level")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("log.console", true, "Enable console logging") + if err := viper.BindPFlag("log.console", cmd.PersistentFlags().Lookup("log.console")); err != nil { + return err + } + + cmd.PersistentFlags().String("log.file", "", "Enable file logging and specify its path") + if err := viper.BindPFlag("log.file", cmd.PersistentFlags().Lookup("log.file")); err != nil { + return err + } + + cmd.PersistentFlags().Int("log.maxage", 0, "MaxAge the max age in days to keep a logfile") + if err := viper.BindPFlag("log.maxage", cmd.PersistentFlags().Lookup("log.maxage")); err != nil { + return err + } + + cmd.PersistentFlags().Int("log.maxsize", 100, "MaxSize the max size in MB of the logfile before it's rolled") + if err := viper.BindPFlag("log.maxsize", cmd.PersistentFlags().Lookup("log.maxsize")); err != nil { + return err + } + + cmd.PersistentFlags().Int("log.maxbackups", 0, "MaxBackups the max number of rolled files to keep") + if err := viper.BindPFlag("log.maxbackups", cmd.PersistentFlags().Lookup("log.maxbackups")); err != nil { + return err + } + + return nil +} + +func (c *logConfig) Set() { + c.Level = viper.GetString("log.level") + c.Console = viper.GetBool("log.console") + c.File = viper.GetString("log.file") + c.MaxAge = viper.GetInt("log.maxage") + c.MaxSize = viper.GetInt("log.maxsize") + c.MaxBackups = viper.GetInt("log.maxbackups") +} + +func initLogging(config logConfig) { + var writers []io.Writer + + if config.Console { + writers = append(writers, zerolog.ConsoleWriter{ + Out: os.Stderr, + }) + } + + if config.File != "" { + logger := &lumberjack.Logger{ + Filename: config.File, + MaxAge: config.MaxAge, // days + MaxSize: config.MaxSize, // megabytes + MaxBackups: config.MaxBackups, // files } - file := viper.ConfigFileUsed() - if file != "" { - viper.OnConfigChange(func(e fsnotify.Event) { - log.Info().Msg("config file reloaded") - transcode.Service.ConfigReload() - }) + // rotate in response to SIGHUP + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) - viper.WatchConfig() + go func() { + for { + <-c + logger.Rotate() + } + }() + + writers = append(writers, logger) + } + + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + log.Logger = log.Output(io.MultiWriter(writers...)) - log.Info(). - Bool("debug", config.Debug). - Str("config", file). - Msg("preflight complete with config file") + if config.Level == "" { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + log.Info().Msg("using default log level") + } else { + // set custom log level + level, err := zerolog.ParseLevel(config.Level) + if err != nil { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + log.Warn().Str("log-level", config.Level).Msg("unknown log level") } else { - log.Warn(). - Bool("debug", config.Debug). - Msg("preflight complete without config file") + zerolog.SetGlobalLevel(level) } - }) - - if err := transcode.Service.RootConfig.Init(root); err != nil { - log.Panic().Err(err).Msg("unable to run root command") } + + log.Info(). + Bool("console", config.Console). + Str("file", config.File). + Int("maxage", config.MaxAge). + Int("maxsize", config.MaxSize). + Int("maxbackups", config.MaxBackups). + Msg("logging configured") } diff --git a/cmd/serve.go b/cmd/serve.go index 75530af..1a4ad3e 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -4,25 +4,27 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "github.com/m1k1o/go-transcode/internal" + "github.com/m1k1o/go-transcode/internal/serve" ) func init() { + serve := serve.NewCommand() + command := &cobra.Command{ Use: "serve", Short: "serve transcode server", Long: `serve transcode server`, - Run: transcode.Service.ServeCommand, + Run: serve.Run, } - cobra.OnInitialize(func() { - transcode.Service.ServerConfig.Set() - transcode.Service.Preflight() + onConfigLoad = append(onConfigLoad, func() { + serve.Config.Set() + serve.Preflight() }) - if err := transcode.Service.ServerConfig.Init(command); err != nil { + if err := serve.Config.Init(command); err != nil { log.Panic().Err(err).Msg("unable to run serve command") } - root.AddCommand(command) + rootCmd.AddCommand(command) } diff --git a/dev/build b/dev/build deleted file mode 100755 index a9a9aca..0000000 --- a/dev/build +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker build -t transcode_server_img .. diff --git a/dev/exec b/dev/exec deleted file mode 100755 index 29ef203..0000000 --- a/dev/exec +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker exec -it transcode_server_dev /bin/bash; diff --git a/dev/go b/dev/go deleted file mode 100755 index da31564..0000000 --- a/dev/go +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker run -it --rm \ - -v "${PWD}/../:/app" \ - --entrypoint="go" \ - --volume "${PWD}/../:/app" \ - transcode_server_img "$@"; diff --git a/dev/start b/dev/start deleted file mode 100755 index 65d4ac1..0000000 --- a/dev/start +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker run --rm -it \ - --name="transcode_server_dev" \ - -p "3005:8080" \ - -v "${PWD}/../:/app" \ - --entrypoint="/bin/bash" \ - --workdir="/app" \ - transcode_server_img -c 'go build && ./go-transcode serve --bind :8080'; diff --git a/go.mod b/go.mod index 6c08c69..e5b4915 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( golang.org/x/text v0.3.7 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/ini.v1 v1.63.2 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) require ( diff --git a/go.sum b/go.sum index 09cf700..6e42e10 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -686,6 +687,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c= gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/hls/types.go b/hls/types.go deleted file mode 100644 index eb6c94f..0000000 --- a/hls/types.go +++ /dev/null @@ -1,16 +0,0 @@ -package hls - -import "net/http" - -type Manager interface { - Start() error - Stop() - Cleanup() - - ServePlaylist(w http.ResponseWriter, r *http.Request) - ServeMedia(w http.ResponseWriter, r *http.Request) - - OnStart(event func()) - OnCmdLog(event func(message string)) - OnStop(event func(err error)) -} diff --git a/hlsproxy/types.go b/hlsproxy/types.go deleted file mode 100644 index 79bec46..0000000 --- a/hlsproxy/types.go +++ /dev/null @@ -1,10 +0,0 @@ -package hlsproxy - -import "net/http" - -type Manager interface { - Shutdown() - - ServePlaylist(w http.ResponseWriter, r *http.Request) - ServeMedia(w http.ResponseWriter, r *http.Request) -} diff --git a/hlsvod/types.go b/hlsvod/types.go deleted file mode 100644 index 7c4e651..0000000 --- a/hlsvod/types.go +++ /dev/null @@ -1,31 +0,0 @@ -package hlsvod - -import ( - "context" - "net/http" -) - -type Config struct { - MediaPath string // Transcoded video input. - TranscodeDir string // Temporary directory to store transcoded elements. - SegmentPrefix string - - VideoProfile *VideoProfile - VideoKeyframes bool - AudioProfile *AudioProfile - - Cache bool - CacheDir string // If not empty, cache will folder will be used instead of media path - - FFmpegBinary string - FFprobeBinary string -} - -type Manager interface { - Start() error - Stop() - Preload(ctx context.Context) (*ProbeMediaData, error) - - ServePlaylist(w http.ResponseWriter, r *http.Request) - ServeMedia(w http.ResponseWriter, r *http.Request) -} diff --git a/internal/api/hls.go b/internal/api/hls.go deleted file mode 100644 index 6bd4074..0000000 --- a/internal/api/hls.go +++ /dev/null @@ -1,93 +0,0 @@ -package api - -import ( - _ "embed" - "fmt" - "net/http" - "os/exec" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/hls" -) - -var hlsManagers map[string]hls.Manager = make(map[string]hls.Manager) - -//go:embed play.html -var playHTML string - -func (a *ApiManagerCtx) HLS(r chi.Router) { - r.Get("/{profile}/{input}/index.m3u8", func(w http.ResponseWriter, r *http.Request) { - logger := log.With().Str("module", "hls").Logger() - - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - - if !resourceRegex.MatchString(profile) || !resourceRegex.MatchString(input) { - http.Error(w, "400 invalid parameters", http.StatusBadRequest) - return - } - - // check if stream exists - _, ok := a.config.Streams[input] - if !ok { - http.Error(w, "404 stream not found", http.StatusNotFound) - return - } - - // check if profile exists - profilePath, err := a.ProfilePath("hls", profile) - if err != nil { - logger.Warn().Err(err).Msg("profile path could not be found") - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - ID := fmt.Sprintf("%s/%s", profile, input) - - manager, ok := hlsManagers[ID] - if !ok { - // create new manager - manager = hls.New(func() *exec.Cmd { - // get transcode cmd - cmd, err := a.transcodeStart(profilePath, input) - if err != nil { - logger.Error().Err(err).Msg("transcode could not be started") - } - - return cmd - }) - - hlsManagers[ID] = manager - } - - manager.ServePlaylist(w, r) - }) - - r.Get("/{profile}/{input}/{file}.ts", func(w http.ResponseWriter, r *http.Request) { - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - file := chi.URLParam(r, "file") - - if !resourceRegex.MatchString(profile) || !resourceRegex.MatchString(input) || !resourceRegex.MatchString(file) { - http.Error(w, "400 invalid parameters", http.StatusBadRequest) - return - } - - ID := fmt.Sprintf("%s/%s", profile, input) - - manager, ok := hlsManagers[ID] - if !ok { - http.Error(w, "404 transcode not found", http.StatusNotFound) - return - } - - manager.ServeMedia(w, r) - }) - - r.Get("/{profile}/{input}/play.html", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/html") - _, _ = w.Write([]byte(playHTML)) - }) -} diff --git a/internal/api/hlsproxy.go b/internal/api/hlsproxy.go deleted file mode 100644 index 9752624..0000000 --- a/internal/api/hlsproxy.go +++ /dev/null @@ -1,41 +0,0 @@ -package api - -import ( - "net/http" - "strings" - - "github.com/go-chi/chi" - - "github.com/m1k1o/go-transcode/hlsproxy" -) - -const hlsProxyPerfix = "/hlsproxy/" - -var hlsProxyManagers map[string]hlsproxy.Manager = make(map[string]hlsproxy.Manager) - -func (a *ApiManagerCtx) HLSProxy(r chi.Router) { - r.Get(hlsProxyPerfix+"{sourceId}/*", func(w http.ResponseWriter, r *http.Request) { - ID := chi.URLParam(r, "sourceId") - - // check if stream exists - baseUrl, ok := a.config.HlsProxy[ID] - if !ok { - http.Error(w, "404 hls proxy source not found", http.StatusNotFound) - return - } - - manager, ok := hlsProxyManagers[ID] - if !ok { - // create new manager - manager = hlsproxy.New(baseUrl, hlsProxyPerfix+ID+"/") - hlsProxyManagers[ID] = manager - } - - // if this is playlist request - if strings.HasSuffix(r.URL.String(), ".m3u8") { - manager.ServePlaylist(w, r) - } else { - manager.ServeMedia(w, r) - } - }) -} diff --git a/internal/api/hlsvod.go b/internal/api/hlsvod.go deleted file mode 100644 index 7f86f32..0000000 --- a/internal/api/hlsvod.go +++ /dev/null @@ -1,160 +0,0 @@ -package api - -import ( - _ "embed" - "fmt" - "net/http" - "os" - "path" - "path/filepath" - "strings" - - "github.com/go-chi/chi" - "github.com/m1k1o/go-transcode/hlsvod" - "github.com/rs/zerolog/log" -) - -var hlsVodManagers map[string]hlsvod.Manager = make(map[string]hlsvod.Manager) - -func (a *ApiManagerCtx) HlsVod(r chi.Router) { - r.Get("/vod/*", func(w http.ResponseWriter, r *http.Request) { - logger := log.With().Str("module", "hlsvod").Logger() - - // remove /vod/ from path - urlPath := r.URL.Path[5:] - - // get index of last slash from path - lastSlashIndex := strings.LastIndex(urlPath, "/") - if lastSlashIndex == -1 { - http.Error(w, "400 invalid parameters", http.StatusBadRequest) - return - } - - // everything after last slash is hls resource (playlist or segment) - hlsResource := urlPath[lastSlashIndex+1:] - // everything before last slash is vod media path - vodMediaPath := urlPath[:lastSlashIndex] - // use clean path - vodMediaPath = filepath.Clean(vodMediaPath) - vodMediaPath = path.Join(a.config.Vod.MediaDir, vodMediaPath) - - // serve master profile - if hlsResource == "index.m3u8" { - data, err := hlsvod.New(hlsvod.Config{ - MediaPath: vodMediaPath, - VideoKeyframes: a.config.Vod.VideoKeyframes, - - Cache: a.config.Vod.Cache, - CacheDir: a.config.Vod.CacheDir, - - FFmpegBinary: a.config.Vod.FFmpegBinary, - FFprobeBinary: a.config.Vod.FFprobeBinary, - }).Preload(r.Context()) - - if err != nil { - logger.Warn().Err(err).Msg("unable to preload metadata") - http.Error(w, "500 unable to preload metadata", http.StatusInternalServerError) - return - } - - width, height := 0, 0 - if data.Video != nil { - width, height = data.Video.Width, data.Video.Height - } - - profiles := map[string]hlsvod.VideoProfile{} - for name, profile := range a.config.Vod.VideoProfiles { - if width != 0 && width < profile.Width && - height != 0 && height < profile.Height { - continue - } - - profiles[name] = hlsvod.VideoProfile{ - Width: profile.Width, - Height: profile.Height, - Bitrate: (profile.Bitrate + a.config.Vod.AudioProfile.Bitrate) / 100 * 105000, - } - } - - playlist := hlsvod.StreamsPlaylist(profiles, "%s.m3u8") - _, _ = w.Write([]byte(playlist)) - return - } - - // get profile name (everythinb before . or -) - profileID := strings.FieldsFunc(hlsResource, func(r rune) bool { - return r == '.' || r == '-' - })[0] - - // check if exists profile and fetch - profile, ok := a.config.Vod.VideoProfiles[profileID] - if !ok { - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - ID := fmt.Sprintf("%s/%s", profileID, vodMediaPath) - manager, ok := hlsVodManagers[ID] - - logger.Info(). - Str("path", urlPath). - Str("hlsResource", hlsResource). - Str("vodMediaPath", vodMediaPath). - Msg("new hls vod request") - - // if manager was not found - if !ok { - // check if vod media path exists - if _, err := os.Stat(vodMediaPath); os.IsNotExist(err) { - http.Error(w, "404 vod not found", http.StatusNotFound) - return - } - - // create own transcoding directory - transcodeDir, err := os.MkdirTemp(a.config.Vod.TranscodeDir, fmt.Sprintf("vod-%s-*", profileID)) - if err != nil { - logger.Warn().Err(err).Msg("could not create temp dir") - http.Error(w, "500 could not create temp dir", http.StatusInternalServerError) - return - } - - // create new manager - manager = hlsvod.New(hlsvod.Config{ - MediaPath: vodMediaPath, - TranscodeDir: transcodeDir, - SegmentPrefix: profileID, - - VideoProfile: &hlsvod.VideoProfile{ - Width: profile.Width, - Height: profile.Height, - Bitrate: profile.Bitrate, - }, - VideoKeyframes: a.config.Vod.VideoKeyframes, - AudioProfile: &hlsvod.AudioProfile{ - Bitrate: a.config.Vod.AudioProfile.Bitrate, - }, - - Cache: a.config.Vod.Cache, - CacheDir: a.config.Vod.CacheDir, - - FFmpegBinary: a.config.Vod.FFmpegBinary, - FFprobeBinary: a.config.Vod.FFprobeBinary, - }) - - hlsVodManagers[ID] = manager - - if err := manager.Start(); err != nil { - logger.Warn().Err(err).Msg("hls vod manager could not be started") - http.Error(w, "500 hls vod manager could not be started", http.StatusInternalServerError) - return - } - } - - // server playlist or segment - if hlsResource == profileID+".m3u8" { - manager.ServePlaylist(w, r) - } else { - manager.ServeMedia(w, r) - } - }) -} diff --git a/internal/api/http.go b/internal/api/http.go deleted file mode 100644 index f190dab..0000000 --- a/internal/api/http.go +++ /dev/null @@ -1,139 +0,0 @@ -package api - -import ( - "io" - "net/http" - "os/exec" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/internal/utils" -) - -func (a *ApiManagerCtx) Http(r chi.Router) { - r.Get("/test", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "video/mp2t") - logger := log.With(). - Str("path", r.URL.Path). - Str("module", "ffmpeg"). - Logger() - - // dummy input for testing purposes - file := a.config.AbsPath("profiles", "http-test.sh") - cmd := exec.Command(file) - logger.Info().Msg("command startred") - - read, write := io.Pipe() - cmd.Stdout = write - cmd.Stderr = utils.LogWriter(logger) - - defer func() { - logger.Info().Msg("command stopped") - - read.Close() - write.Close() - }() - - go func() { - _ = cmd.Run() - }() - _, _ = io.Copy(w, read) - }) - - r.Get("/{profile}/{input}", func(w http.ResponseWriter, r *http.Request) { - logger := log.With(). - Str("path", r.URL.Path). - Str("module", "ffmpeg"). - Logger() - - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - - // check if stream exists - _, ok := a.config.Streams[input] - if !ok { - http.Error(w, "404 stream not found", http.StatusNotFound) - return - } - - // check if profile exists - profilePath, err := a.ProfilePath("hls", profile) - if err != nil { - logger.Warn().Err(err).Msg("profile path could not be found") - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - cmd, err := a.transcodeStart(profilePath, input) - if err != nil { - logger.Warn().Err(err).Msg("transcode could not be started") - http.Error(w, "500 not available", http.StatusInternalServerError) - return - } - - logger.Info().Msg("command started") - w.Header().Set("Content-Type", "video/mp2t") - - read, write := io.Pipe() - cmd.Stdout = write - cmd.Stderr = utils.LogWriter(logger) - - defer func() { - logger.Info().Msg("command stopped") - - read.Close() - write.Close() - }() - - go func() { - _ = cmd.Run() - }() - _, _ = io.Copy(w, read) - }) - - // buffered http streaming (alternative to prervious type) - r.Get("/{profile}/{input}/buf", func(w http.ResponseWriter, r *http.Request) { - logger := log.With(). - Str("path", r.URL.Path). - Str("module", "ffmpeg"). - Logger() - - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - - // check if stream exists - _, ok := a.config.Streams[input] - if !ok { - http.Error(w, "404 stream not found", http.StatusNotFound) - return - } - - // check if profile exists - profilePath, err := a.ProfilePath("hls", profile) - if err != nil { - logger.Warn().Err(err).Msg("profile path could not be found") - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - cmd, err := a.transcodeStart(profilePath, input) - if err != nil { - logger.Warn().Err(err).Msg("transcode could not be started") - http.Error(w, "500 not available", http.StatusInternalServerError) - return - } - - logger.Info().Msg("command started") - w.Header().Set("Content-Type", "video/mp2t") - - read, write := io.Pipe() - cmd.Stdout = write - cmd.Stderr = utils.LogWriter(logger) - - go utils.IOPipeToHTTP(w, read) - _ = cmd.Run() - write.Close() - logger.Info().Msg("command stopped") - }) -} diff --git a/internal/api/router.go b/internal/api/router.go deleted file mode 100644 index aad132f..0000000 --- a/internal/api/router.go +++ /dev/null @@ -1,95 +0,0 @@ -package api - -import ( - "fmt" - "net/http" - "os" - "os/exec" - "path" - "regexp" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/internal/config" -) - -var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) - -type ApiManagerCtx struct { - config *config.Server -} - -func New(config *config.Server) *ApiManagerCtx { - return &ApiManagerCtx{ - config: config, - } -} - -func (manager *ApiManagerCtx) Start() { -} - -func (manager *ApiManagerCtx) Shutdown() error { - // stop all hls managers - for _, hls := range hlsManagers { - hls.Stop() - } - - // stop all hls vod managers - for _, hls := range hlsVodManagers { - hls.Stop() - } - - // shutdown all hls proxy managers - for _, hls := range hlsProxyManagers { - hls.Shutdown() - } - - return nil -} - -func (a *ApiManagerCtx) Mount(r *chi.Mux) { - r.Get("/ping", func(w http.ResponseWriter, r *http.Request) { - //nolint - _, _ = w.Write([]byte("pong")) - }) - - if a.config.Vod.MediaDir != "" { - r.Group(a.HlsVod) - log.Info().Str("vod-dir", a.config.Vod.MediaDir).Msg("static file transcoding is active") - } - - if len(a.config.HlsProxy) > 0 { - r.Group(a.HLSProxy) - log.Info().Interface("hls-proxy", a.config.HlsProxy).Msg("hls proxy is active") - } - - r.Group(a.HLS) - r.Group(a.Http) -} - -func (a *ApiManagerCtx) ProfilePath(folder string, profile string) (string, error) { - // [profiles]/hls,http/[profile].sh - // [profiles] defaults to [basedir]/profiles - - if !resourceRegex.MatchString(profile) { - return "", fmt.Errorf("invalid profile path") - } - - profilePath := path.Join(a.config.Profiles, folder, fmt.Sprintf("%s.sh", profile)) - if _, err := os.Stat(profilePath); os.IsNotExist(err) { - return "", err - } - return profilePath, nil -} - -// Call ProfilePath before -func (a *ApiManagerCtx) transcodeStart(profilePath string, input string) (*exec.Cmd, error) { - url, ok := a.config.Streams[input] - if !ok { - return nil, fmt.Errorf("stream not found") - } - - log.Info().Str("profilePath", profilePath).Str("url", url).Msg("command startred") - return exec.Command(profilePath, url), nil -} diff --git a/internal/main.go b/internal/main.go deleted file mode 100644 index 16a361a..0000000 --- a/internal/main.go +++ /dev/null @@ -1,84 +0,0 @@ -package transcode - -import ( - "os" - "os/signal" - - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - "github.com/spf13/cobra" - - "github.com/m1k1o/go-transcode/internal/api" - "github.com/m1k1o/go-transcode/internal/config" - "github.com/m1k1o/go-transcode/internal/http" -) - -var Service *Main - -func init() { - Service = &Main{ - RootConfig: &config.Root{}, - ServerConfig: &config.Server{}, - } -} - -type Main struct { - RootConfig *config.Root - ServerConfig *config.Server - - logger zerolog.Logger - apiManager *api.ApiManagerCtx - httpManager *http.HttpManagerCtx -} - -func (main *Main) Preflight() { - main.logger = log.With().Str("service", "main").Logger() -} - -func (main *Main) Start() { - config := main.ServerConfig - - main.apiManager = api.New(config) - main.apiManager.Start() - - main.httpManager = http.New(config) - main.httpManager.Mount(main.apiManager.Mount) - main.httpManager.Start() - - if main.RootConfig.PProf { - pathPrefix := "/debug/pprof/" - main.httpManager.WithDebugPProf(pathPrefix) - main.logger.Info().Msgf("mounted debug pprof endpoint at %s", pathPrefix) - } - - main.logger.Info().Msgf("serving streams from basedir %s: %s", config.BaseDir, config.Streams) -} - -func (main *Main) Shutdown() { - var err error - - err = main.httpManager.Shutdown() - main.logger.Err(err).Msg("http manager shutdown") - - err = main.apiManager.Shutdown() - main.logger.Err(err).Msg("api manager shutdown") -} - -func (main *Main) ServeCommand(cmd *cobra.Command, args []string) { - main.logger.Info().Msg("starting main server") - main.Start() - main.logger.Info().Msg("main ready") - - quit := make(chan os.Signal, 1) - signal.Notify(quit, os.Interrupt) - sig := <-quit - - main.logger.Warn().Msgf("received %s, attempting graceful shutdown", sig) - main.Shutdown() - main.logger.Info().Msg("shutdown complete") -} - -func (main *Main) ConfigReload() { - main.RootConfig.Set() - main.ServerConfig.Set() -} diff --git a/internal/config/config.go b/internal/serve/config.go similarity index 88% rename from internal/config/config.go rename to internal/serve/config.go index eac056c..fc58b01 100644 --- a/internal/config/config.go +++ b/internal/serve/config.go @@ -1,7 +1,6 @@ -package config +package serve import ( - "fmt" "os" "path" @@ -9,37 +8,6 @@ import ( "github.com/spf13/viper" ) -type Root struct { - Debug bool - PProf bool - CfgFile string -} - -func (Root) Init(cmd *cobra.Command) error { - cmd.PersistentFlags().BoolP("debug", "d", false, "enable debug mode") - if err := viper.BindPFlag("debug", cmd.PersistentFlags().Lookup("debug")); err != nil { - return err - } - - cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") - if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { - return err - } - - cmd.PersistentFlags().String("config", "", "configuration file path") - if err := viper.BindPFlag("config", cmd.PersistentFlags().Lookup("config")); err != nil { - return err - } - - return nil -} - -func (s *Root) Set() { - s.Debug = viper.GetBool("debug") - s.PProf = viper.GetBool("pprof") - s.CfgFile = viper.GetString("config") -} - type VideoProfile struct { Width int `mapstructure:"width"` Height int `mapstructure:"height"` @@ -62,7 +30,10 @@ type VOD struct { FFprobeBinary string `mapstructure:"ffprobe-binary"` } -type Server struct { +type Config struct { + Debug bool + PProf bool + Cert string Key string Bind string @@ -77,7 +48,17 @@ type Server struct { HlsProxy map[string]string } -func (Server) Init(cmd *cobra.Command) error { +func (Config) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().BoolP("debug", "d", false, "enable debug mode") + if err := viper.BindPFlag("debug", cmd.PersistentFlags().Lookup("debug")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") + if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { + return err + } + cmd.PersistentFlags().String("bind", "127.0.0.1:8080", "address/port/socket to serve neko") if err := viper.BindPFlag("bind", cmd.PersistentFlags().Lookup("bind")); err != nil { return err @@ -116,7 +97,10 @@ func (Server) Init(cmd *cobra.Command) error { return nil } -func (s *Server) Set() { +func (s *Config) Set() { + s.Debug = viper.GetBool("debug") + s.PProf = viper.GetBool("pprof") + s.Cert = viper.GetString("cert") s.Key = viper.GetString("key") s.Bind = viper.GetString("bind") @@ -135,8 +119,7 @@ func (s *Server) Set() { s.Profiles = viper.GetString("profiles") if s.Profiles == "" { - // TODO: issue #5 - s.Profiles = fmt.Sprintf("%s/profiles", s.BaseDir) + s.Profiles = path.Join(s.BaseDir, "profiles") } s.Streams = viper.GetStringMapString("streams") @@ -187,7 +170,7 @@ func (s *Server) Set() { s.HlsProxy = viper.GetStringMapString("hls-proxy") } -func (s *Server) AbsPath(elem ...string) string { +func (s *Config) AbsPath(elem ...string) string { // prepend base path elem = append([]string{s.BaseDir}, elem...) return path.Join(elem...) diff --git a/internal/serve/serve.go b/internal/serve/serve.go new file mode 100644 index 0000000..1662ab6 --- /dev/null +++ b/internal/serve/serve.go @@ -0,0 +1,173 @@ +package serve + +import ( + "os" + "os/signal" + "path" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/m1k1o/go-transcode/internal/server" + "github.com/m1k1o/go-transcode/modules/hlslive" + "github.com/m1k1o/go-transcode/modules/hlsproxy" + "github.com/m1k1o/go-transcode/modules/hlsvod" + "github.com/m1k1o/go-transcode/modules/httpstream" + "github.com/m1k1o/go-transcode/modules/player" + hlsVodPkg "github.com/m1k1o/go-transcode/pkg/hlsvod" +) + +func NewCommand() *Main { + return &Main{ + Config: &Config{}, + } +} + +type Main struct { + Config *Config + + logger zerolog.Logger + server *server.ServerManagerCtx + hlsLive *hlslive.ModuleCtx + hlsProxy *hlsproxy.ModuleCtx + hlsVod *hlsvod.ModuleCtx + httpStream *httpstream.ModuleCtx + player *player.ModuleCtx +} + +func (main *Main) Preflight() { + main.logger = log.With().Str("service", "main").Logger() +} + +func (main *Main) start() { + config := main.Config + + main.server = server.New(&server.Config{ + Bind: config.Bind, + Static: config.Static, + SSLCert: config.Cert, + SSLKey: config.Key, + Proxy: config.Proxy, + PProf: config.PProf, + }) + + /* + r.Get("/ping", func(w http.ResponseWriter, r *http.Request) { + //nolint + _, _ = w.Write([]byte("pong")) + }) + */ + + if config.Vod.MediaDir != "" { + videoProfiles := map[string]hlsVodPkg.VideoProfile{} + for key, prof := range config.Vod.VideoProfiles { + videoProfiles[key] = hlsVodPkg.VideoProfile{ + Width: prof.Width, + Height: prof.Height, + Bitrate: prof.Bitrate, + } + } + + main.hlsVod = hlsvod.New("/vod/", &hlsvod.Config{ + MediaBasePath: config.Vod.MediaDir, + TranscodeDir: config.Vod.TranscodeDir, + VideoProfiles: videoProfiles, + MasterPlaylistName: "index.m3u8", + + Config: hlsVodPkg.Config{ + VideoKeyframes: config.Vod.VideoKeyframes, + AudioProfile: &hlsVodPkg.AudioProfile{ + Bitrate: config.Vod.AudioProfile.Bitrate, + }, + + Cache: config.Vod.Cache, + CacheDir: config.Vod.CacheDir, + + FFmpegBinary: config.Vod.FFmpegBinary, + FFprobeBinary: config.Vod.FFprobeBinary, + }, + }) + + main.server.Handle("/vod/", main.hlsVod) + main.logger.Info().Str("vod-dir", config.Vod.MediaDir).Msg("static file transcoding is active") + } + + if len(config.HlsProxy) > 0 { + main.hlsProxy = hlsproxy.New("/hlsproxy/", &hlsproxy.Config{ + Sources: config.HlsProxy, + }) + main.server.Handle("/hlsproxy/", main.hlsProxy) + log.Info().Interface("hls-proxy", config.HlsProxy).Msg("hls proxy is active") + } + + main.hlsLive = hlslive.New("/", &hlslive.Config{ + Sources: config.Streams, + ProfilesPath: path.Join(config.Profiles, "hls"), + PlaylistName: "index.m3u8", + // TOOD: Profile ends with .sh + }) + main.server.Handle("/", main.hlsLive) + main.logger.Info().Msg("hlsLive registered") + + // TODO: Match correct URLs. + main.httpStream = httpstream.New("/", &httpstream.Config{ + Sources: config.Streams, + ProfilesPath: path.Join(config.Profiles, "http"), + UseBufCopy: false, + }) + main.server.Handle("/", main.httpStream) + main.logger.Info().Msg("httpStream registered") + + // TODO: Match correct URLs. + main.player = player.New("/player/", &player.Config{}) + main.server.Handle("/player/", main.player) + main.logger.Info().Msg("player registered") + + main.server.Start() + main.logger.Info().Msgf("serving streams from basedir %s: %s", config.BaseDir, config.Streams) +} + +func (main *Main) shutdown() { + err := main.server.Shutdown() + main.logger.Err(err).Msg("http manager shutdown") + + if main.hlsVod != nil { + main.hlsVod.Shutdown() + main.logger.Info().Msg("hlsVod shutdown") + } + + if main.hlsProxy != nil { + main.hlsProxy.Shutdown() + main.logger.Info().Msg("hlsProxy shutdown") + } + + if main.hlsLive != nil { + main.hlsLive.Shutdown() + main.logger.Info().Msg("hlsLive shutdown") + } + + if main.httpStream != nil { + main.httpStream.Shutdown() + main.logger.Info().Msg("httpStream shutdown") + } + + if main.player != nil { + main.player.Shutdown() + main.logger.Info().Msg("player shutdown") + } +} + +func (main *Main) Run(cmd *cobra.Command, args []string) { + main.logger.Info().Msg("starting main server") + main.start() + main.logger.Info().Msg("main ready") + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt) + sig := <-quit + + main.logger.Warn().Msgf("received %s, attempting graceful shutdown", sig) + main.shutdown() + main.logger.Info().Msg("shutdown complete") +} diff --git a/internal/server/config.go b/internal/server/config.go new file mode 100644 index 0000000..ef34706 --- /dev/null +++ b/internal/server/config.go @@ -0,0 +1,57 @@ +package server + +import ( + "github.com/rs/zerolog/log" + + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +type Config struct { + Bind string `mapstructure:"bind"` + Static string `mapstructure:"static"` + SSLCert string `mapstructure:"sslcert"` + SSLKey string `mapstructure:"sslkey"` + Proxy bool `mapstructure:"proxy"` + PProf bool `mapstructure:"pprof"` +} + +func (Config) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().String("bind", "127.0.0.1:8080", "address/port/socket to serve http") + if err := viper.BindPFlag("bind", cmd.PersistentFlags().Lookup("bind")); err != nil { + return err + } + + cmd.PersistentFlags().String("static", "", "path to client files to serve") + if err := viper.BindPFlag("static", cmd.PersistentFlags().Lookup("static")); err != nil { + return err + } + + cmd.PersistentFlags().String("sslcert", "", "path to the SSL cert") + if err := viper.BindPFlag("sslcert", cmd.PersistentFlags().Lookup("sslcert")); err != nil { + return err + } + + cmd.PersistentFlags().String("sslkey", "", "path to the SSL key") + if err := viper.BindPFlag("sslkey", cmd.PersistentFlags().Lookup("sslkey")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("proxy", false, "allow reverse proxies") + if err := viper.BindPFlag("proxy", cmd.PersistentFlags().Lookup("proxy")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") + if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { + return err + } + + return nil +} + +func (c *Config) Set() { + if err := viper.Unmarshal(c); err != nil { + log.Panic().Msg("unable to unmarshal config structure") + } +} diff --git a/internal/http/debug.go b/internal/server/debug.go similarity index 77% rename from internal/http/debug.go rename to internal/server/debug.go index 32565fd..7e474c9 100644 --- a/internal/http/debug.go +++ b/internal/server/debug.go @@ -1,4 +1,4 @@ -package http +package server import ( "net/http" @@ -7,8 +7,10 @@ import ( "github.com/go-chi/chi" ) -func (s *HttpManagerCtx) WithDebugPProf(pathPrefix string) { - s.router.Route(pathPrefix, func(r chi.Router) { +const pprofPath = "/debug/pprof/" + +func withPProf(router *chi.Mux) { + router.Route(pprofPath, func(r chi.Router) { r.Get("/", pprof.Index) r.Get("/{action}", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/http/http.go b/internal/server/http.go similarity index 54% rename from internal/http/http.go rename to internal/server/http.go index f58a8ef..4c80b0e 100644 --- a/internal/http/http.go +++ b/internal/server/http.go @@ -1,4 +1,4 @@ -package http +package server import ( "context" @@ -10,22 +10,27 @@ import ( "github.com/go-chi/chi/middleware" "github.com/rs/zerolog" "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/internal/config" ) -type HttpManagerCtx struct { +type ServerManagerCtx struct { logger zerolog.Logger - config *config.Server + config *Config router *chi.Mux - http *http.Server + server *http.Server } -func New(config *config.Server) *HttpManagerCtx { - logger := log.With().Str("module", "http").Logger() +func New(config *Config) *ServerManagerCtx { + logger := log.With().Str("module", "server").Logger() router := chi.NewRouter() router.Use(middleware.RequestID) // Create a request ID for each request + + // get real users ip + if config.Proxy { + router.Use(middleware.RealIP) + } + + // add http logger router.Use(middleware.RequestLogger(&logformatter{logger})) router.Use(middleware.Recoverer) // Recover from panics without crashing server @@ -41,48 +46,56 @@ func New(config *config.Server) *HttpManagerCtx { }) } - router.NotFound(func(w http.ResponseWriter, r *http.Request) { - //nolint - _, _ = w.Write([]byte("404")) - }) + // mount pprof endpoint + if config.PProf { + withPProf(router) + logger.Info().Msgf("with pprof endpoint at %s", pprofPath) + } + + // we could use custom 404 + router.NotFound(http.NotFound) - return &HttpManagerCtx{ + return &ServerManagerCtx{ logger: logger, config: config, router: router, - http: &http.Server{ + server: &http.Server{ Addr: config.Bind, Handler: router, }, } } -func (s *HttpManagerCtx) Start() { - if s.config.Cert != "" && s.config.Key != "" { +func (s *ServerManagerCtx) Start() { + if s.config.SSLCert != "" && s.config.SSLKey != "" { s.logger.Warn().Msg("TLS support is provided for convenience, but you should never use it in production. Use a reverse proxy (apache nginx caddy) instead!") go func() { - if err := s.http.ListenAndServeTLS(s.config.Cert, s.config.Key); err != http.ErrServerClosed { + if err := s.server.ListenAndServeTLS(s.config.SSLCert, s.config.SSLKey); err != http.ErrServerClosed { s.logger.Panic().Err(err).Msg("unable to start https server") } }() - s.logger.Info().Msgf("https listening on %s", s.http.Addr) + s.logger.Info().Msgf("https server listening on %s", s.server.Addr) } else { go func() { - if err := s.http.ListenAndServe(); err != http.ErrServerClosed { + if err := s.server.ListenAndServe(); err != http.ErrServerClosed { s.logger.Panic().Err(err).Msg("unable to start http server") } }() - s.logger.Info().Msgf("http listening on %s", s.http.Addr) + s.logger.Info().Msgf("http server listening on %s", s.server.Addr) } } -func (s *HttpManagerCtx) Shutdown() error { +func (s *ServerManagerCtx) Shutdown() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return s.http.Shutdown(ctx) + return s.server.Shutdown(ctx) } -func (s *HttpManagerCtx) Mount(fn func(r *chi.Mux)) { +func (s *ServerManagerCtx) Mount(fn func(r *chi.Mux)) { fn(s.router) } + +func (s *ServerManagerCtx) Handle(pattern string, fn http.Handler) { + s.router.Handle(pattern, fn) +} diff --git a/internal/http/logger.go b/internal/server/logger.go similarity index 99% rename from internal/http/logger.go rename to internal/server/logger.go index acc20ef..1b5d5ac 100644 --- a/internal/http/logger.go +++ b/internal/server/logger.go @@ -1,4 +1,4 @@ -package http +package server import ( "fmt" diff --git a/internal/utils/cache.go b/internal/utils/cache.go index 141a94b..637af74 100644 --- a/internal/utils/cache.go +++ b/internal/utils/cache.go @@ -2,7 +2,6 @@ package utils import ( "io" - "net/http" "sync" "time" ) @@ -17,13 +16,13 @@ type Cache struct { listeners []func([]byte) (int, error) listenersMu sync.RWMutex - Expires time.Time + expires time.Time } func NewCache(expires time.Time) *Cache { return &Cache{ closeCh: make(chan struct{}), - Expires: expires, + expires: expires, } } @@ -64,9 +63,8 @@ func (c *Cache) Close() error { return nil } -func (c *Cache) ServeHTTP(w http.ResponseWriter) { - offset := 0 - index := 0 +func (c *Cache) CopyTo(w io.Writer) error { + offset, index := 0, 0 for { c.mu.RLock() @@ -79,7 +77,11 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) { chunk := c.chunks[index] c.mu.RUnlock() - i, _ := w.Write(chunk) + i, err := w.Write(chunk) + if err != nil { + return err + } + offset += i index++ continue @@ -87,7 +89,11 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) { // if stream is already closed if closed { - return + var err error + if closer, ok := w.(io.WriteCloser); ok { + err = closer.Close() + } + return err } // we don't have enough data but stream is not closed @@ -101,4 +107,9 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) { // wait until it finishes <-c.closeCh + return nil +} + +func (c *Cache) Expired() bool { + return time.Now().After(c.expires) } diff --git a/main.go b/main.go index b7ab534..43e6b46 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,9 @@ package main import ( - "github.com/rs/zerolog/log" - "github.com/m1k1o/go-transcode/cmd" ) func main() { - if err := cmd.Execute(); err != nil { - log.Panic().Err(err).Msg("failed to execute command") - } + _ = cmd.Execute() } diff --git a/modules/hlslive/module.go b/modules/hlslive/module.go new file mode 100644 index 0000000..764cf55 --- /dev/null +++ b/modules/hlslive/module.go @@ -0,0 +1,115 @@ +package hlslive + +import ( + "fmt" + "net/http" + "os" + "os/exec" + "path" + "regexp" + "strings" + + "github.com/m1k1o/go-transcode/pkg/hlslive" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config + + managers map[string]hlslive.Manager +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "hlslive").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + + managers: make(map[string]hlslive.Manager), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + for _, manager := range m.managers { + manager.Stop() + } +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading and ending / + p = strings.Trim(p, "/") + // split path to parts + s := strings.Split(p, "/") + + // we need exactly three parts of the url + if len(s) != 3 { + http.NotFound(w, r) + return + } + + // {source}/{profile}/{resource} + sourceName, profileName, resource := s[0], s[1], s[2] + + // check if parameters match regex + if !resourceRegex.MatchString(sourceName) || + !resourceRegex.MatchString(profileName) { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + ID := fmt.Sprintf("%s/%s", sourceName, profileName) + manager, ok := m.managers[ID] + if !ok { + // find relevant source + source, ok := m.config.Sources[sourceName] + if !ok { + http.Error(w, "404 source not found", http.StatusNotFound) + return + } + + // check if exists profile path + profilePath := path.Join(m.config.ProfilesPath, profileName) + if _, err := os.Stat(profilePath); err != nil { + http.Error(w, "404 profile not found", http.StatusNotFound) + return + } + + // create new manager + manager = hlslive.New(func() *exec.Cmd { + return exec.Command(profilePath, source) + }, &m.config.Config) + + m.managers[ID] = manager + } + + if resource == m.config.PlaylistName { + manager.ServePlaylist(w, r) + } else { + manager.ServeSegment(w, r) + } +} diff --git a/modules/hlslive/types.go b/modules/hlslive/types.go new file mode 100644 index 0000000..041872c --- /dev/null +++ b/modules/hlslive/types.go @@ -0,0 +1,18 @@ +package hlslive + +import "github.com/m1k1o/go-transcode/pkg/hlslive" + +type Config struct { + hlslive.Config + + Sources map[string]string + ProfilesPath string + PlaylistName string +} + +func (c Config) withDefaultValues() Config { + if c.PlaylistName == "" { + c.PlaylistName = "index.m3u8" + } + return c +} diff --git a/modules/hlsproxy/module.go b/modules/hlsproxy/module.go new file mode 100644 index 0000000..79e17bf --- /dev/null +++ b/modules/hlsproxy/module.go @@ -0,0 +1,102 @@ +package hlsproxy + +import ( + "net/http" + "regexp" + "strings" + + "github.com/m1k1o/go-transcode/pkg/hlsproxy" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config + + managers map[string]hlsproxy.Manager +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "hlsproxy").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + + managers: make(map[string]hlsproxy.Manager), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + for _, manager := range m.managers { + manager.Shutdown() + } +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading / + p = strings.TrimLeft(p, "/") + // split path to parts + s := strings.Split(p, "/") + + // we need at least first part of the url + if len(s) == 0 { + http.NotFound(w, r) + return + } + + sourceName := s[0] + + // check if parameters match regex + if !resourceRegex.MatchString(sourceName) { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + manager, ok := m.managers[sourceName] + if !ok { + // find relevant source + source, ok := m.config.Sources[sourceName] + if !ok { + http.Error(w, "404 source not found", http.StatusNotFound) + return + } + + config := m.config.Config + config.PlaylistBaseUrl = source + config.PlaylistPathPrefix = strings.TrimRight(m.pathPrefix, "/") + sourceName + + // create new manager + manager = hlsproxy.New(&config) + m.managers[sourceName] = manager + } + + if strings.HasSuffix(r.URL.Path, ".m3u8") { + manager.ServePlaylist(w, r) + } else { + manager.ServeSegment(w, r) + } +} diff --git a/modules/hlsproxy/types.go b/modules/hlsproxy/types.go new file mode 100644 index 0000000..69cb1d0 --- /dev/null +++ b/modules/hlsproxy/types.go @@ -0,0 +1,19 @@ +package hlsproxy + +import "github.com/m1k1o/go-transcode/pkg/hlsproxy" + +type Config struct { + hlsproxy.Config + + // overwritten properties + PlaylistBaseUrl string `mapstructure:"-"` + PlaylistPathPrefix string `mapstructure:"-"` + SegmentBaseUrl string `mapstructure:"-"` + SegmentPathPrefix string `mapstructure:"-"` + + Sources map[string]string +} + +func (c Config) withDefaultValues() Config { + return c +} diff --git a/modules/hlsvod/module.go b/modules/hlsvod/module.go new file mode 100644 index 0000000..bc11f42 --- /dev/null +++ b/modules/hlsvod/module.go @@ -0,0 +1,181 @@ +package hlsvod + +import ( + "fmt" + "net/http" + "os" + "path" + "path/filepath" + "strings" + + "github.com/m1k1o/go-transcode/pkg/hlsvod" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config + + managers map[string]hlsvod.Manager +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "hlsvod").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + + managers: make(map[string]hlsvod.Manager), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + for _, manager := range m.managers { + manager.Stop() + } +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading and ending / + p = strings.Trim(p, "/") + + // get index of last slash from path + lastSlashIndex := strings.LastIndex(p, "/") + if lastSlashIndex == -1 { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + // everything after last slash is hls resource (playlist or segment) + hlsResource := p[lastSlashIndex+1:] + // everything before last slash is vod media path + vodMediaPath := p[:lastSlashIndex] + // use clean path + vodMediaPath = filepath.Clean(vodMediaPath) + vodMediaPath = path.Join(m.config.MediaBasePath, vodMediaPath) + + // serve master profile + if hlsResource == m.config.MasterPlaylistName { + // modify default config + config := m.config.Config + config.MediaPath = vodMediaPath + + data, err := hlsvod.New(&config).Preload(r.Context()) + if err != nil { + m.logger.Warn().Err(err).Msg("unable to preload metadata") + http.Error(w, "500 unable to preload metadata", http.StatusInternalServerError) + return + } + + width, height := 0, 0 + if data.Video != nil { + width, height = data.Video.Width, data.Video.Height + } + + profiles := map[string]hlsvod.VideoProfile{} + for name, profile := range m.config.VideoProfiles { + if width != 0 && width < profile.Width && + height != 0 && height < profile.Height { + continue + } + + profiles[name] = hlsvod.VideoProfile{ + Width: profile.Width, + Height: profile.Height, + Bitrate: (profile.Bitrate + m.config.AudioProfile.Bitrate) / 100 * 105000, + } + } + + playlist := hlsvod.StreamsPlaylist(profiles, "%s.m3u8") + _, _ = w.Write([]byte(playlist)) + return + } + + // get profile name (everythinb before . or -) + profileID := strings.FieldsFunc(hlsResource, func(r rune) bool { + return r == '.' || r == '-' + })[0] + + // check if exists profile and fetch + profile, ok := m.config.VideoProfiles[profileID] + if !ok { + http.Error(w, "404 profile not found", http.StatusNotFound) + return + } + + ID := fmt.Sprintf("%s/%s", profileID, vodMediaPath) + manager, ok := m.managers[ID] + + m.logger.Info(). + Str("path", p). + Str("hlsResource", hlsResource). + Str("vodMediaPath", vodMediaPath). + Msg("new hls vod request") + + // if manager was not found + if !ok { + // check if vod media path exists + if _, err := os.Stat(vodMediaPath); os.IsNotExist(err) { + http.Error(w, "404 vod not found", http.StatusNotFound) + return + } + + // create own transcoding directory + transcodeDir, err := os.MkdirTemp(m.config.TranscodeDir, fmt.Sprintf("vod-%s-*", profileID)) + if err != nil { + m.logger.Warn().Err(err).Msg("could not create temp dir") + http.Error(w, "500 could not create temp dir", http.StatusInternalServerError) + return + } + + // modify default config + config := m.config.Config + config.MediaPath = vodMediaPath + config.TranscodeDir = transcodeDir // with current medias subfolder + config.SegmentNamePrefix = profileID + config.VideoProfile = &hlsvod.VideoProfile{ + Width: profile.Width, + Height: profile.Height, + Bitrate: profile.Bitrate, + } + + // create new manager + manager = hlsvod.New(&config) + if err := manager.Start(); err != nil { + m.logger.Warn().Err(err).Msg("hls vod manager could not be started") + http.Error(w, "500 hls vod manager could not be started", http.StatusInternalServerError) + return + } + + m.managers[ID] = manager + } + + // server playlist or segment + if hlsResource == profileID+".m3u8" { + manager.ServePlaylist(w, r) + } else { + manager.ServeSegment(w, r) + } +} diff --git a/modules/hlsvod/types.go b/modules/hlsvod/types.go new file mode 100644 index 0000000..779e296 --- /dev/null +++ b/modules/hlsvod/types.go @@ -0,0 +1,26 @@ +package hlsvod + +import "github.com/m1k1o/go-transcode/pkg/hlsvod" + +type Config struct { + hlsvod.Config + + // overwritten properties + MediaPath string `mapstructure:"-"` + SegmentNamePrefix string `mapstructure:"-"` + VideoProfile *hlsvod.VideoProfile `mapstructure:"-"` + + // modified properties + MediaBasePath string + TranscodeDir string + + VideoProfiles map[string]hlsvod.VideoProfile + MasterPlaylistName string +} + +func (c Config) withDefaultValues() Config { + if c.MasterPlaylistName == "" { + c.MasterPlaylistName = "index.m3u8" + } + return c +} diff --git a/modules/httpstream/module.go b/modules/httpstream/module.go new file mode 100644 index 0000000..a68dac5 --- /dev/null +++ b/modules/httpstream/module.go @@ -0,0 +1,145 @@ +package httpstream + +import ( + "io" + "net/http" + "os" + "os/exec" + "path" + "regexp" + "strings" + + "github.com/m1k1o/go-transcode/internal/utils" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "httpstream").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading and ending / + p = strings.Trim(p, "/") + // split path to parts + s := strings.Split(p, "/") + + // we need exactly two parts of the url + if len(s) != 2 { + http.NotFound(w, r) + return + } + + // {source}/{profile}/{resource} + sourceName, profileName := s[0], s[1] + + // check if parameters match regex + if !resourceRegex.MatchString(sourceName) || + !resourceRegex.MatchString(profileName) { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + // find relevant source + source, ok := m.config.Sources[sourceName] + if !ok { + http.Error(w, "404 source not found", http.StatusNotFound) + return + } + + // check if exists profile path + profilePath := path.Join(m.config.ProfilesPath, profileName) + if _, err := os.Stat(profilePath); err != nil { + http.Error(w, "404 profile not found", http.StatusNotFound) + return + } + + cmd := exec.CommandContext(r.Context(), profilePath, source) + cmd.Stderr = utils.LogWriter(m.logger) + + if m.config.UseBufCopy { + m.bufCopyCmdToHttp(cmd, w) + } else { + m.pipeCmdToHttp(cmd, w) + } +} + +func (m *ModuleCtx) bufCopyCmdToHttp(cmd *exec.Cmd, w http.ResponseWriter) { + w.Header().Set("Content-Type", "video/mp2t") + + read, write := io.Pipe() + cmd.Stdout = write + + go utils.IOPipeToHTTP(w, read) + m.logger.Info().Msg("starting command") + + err := cmd.Run() + if err != nil { + m.logger.Warn().Err(err).Msg("transcode could not be started") + http.Error(w, "500 not available", http.StatusInternalServerError) + return + } + + write.Close() + m.logger.Info().Msg("command finished") +} + +func (m *ModuleCtx) pipeCmdToHttp(cmd *exec.Cmd, w http.ResponseWriter) { + read, write := io.Pipe() + cmd.Stdout = write + + err := cmd.Start() + if err != nil { + m.logger.Warn().Err(err).Msg("transcode could not be started") + http.Error(w, "500 not available", http.StatusInternalServerError) + return + } + + m.logger.Info().Msg("command started") + + go func() { + err := cmd.Wait() + m.logger.Err(err).Msg("command finished") + read.Close() + write.Close() + }() + + w.Header().Set("Content-Type", "video/mp2t") + _, _ = io.Copy(w, read) +} diff --git a/modules/httpstream/types.go b/modules/httpstream/types.go new file mode 100644 index 0000000..9f8dc4a --- /dev/null +++ b/modules/httpstream/types.go @@ -0,0 +1,11 @@ +package httpstream + +type Config struct { + Sources map[string]string + ProfilesPath string + UseBufCopy bool +} + +func (c Config) withDefaultValues() Config { + return c +} diff --git a/modules/player/module.go b/modules/player/module.go new file mode 100644 index 0000000..261aea6 --- /dev/null +++ b/modules/player/module.go @@ -0,0 +1,50 @@ +package player + +import ( + _ "embed" + "net/http" + "strings" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +//go:embed player.html +var playHTML string + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "player").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + + html := strings.Replace(playHTML, "index.m3u8", m.config.Source, 1) + _, _ = w.Write([]byte(html)) +} diff --git a/internal/api/play.html b/modules/player/player.html similarity index 100% rename from internal/api/play.html rename to modules/player/player.html diff --git a/modules/player/types.go b/modules/player/types.go new file mode 100644 index 0000000..0490cc8 --- /dev/null +++ b/modules/player/types.go @@ -0,0 +1,12 @@ +package player + +type Config struct { + Source string +} + +func (c Config) withDefaultValues() Config { + if c.Source == "" { + c.Source = "index.m3u8" + } + return c +} diff --git a/modules/types.go b/modules/types.go new file mode 100644 index 0000000..f0129d1 --- /dev/null +++ b/modules/types.go @@ -0,0 +1,13 @@ +package modules + +import "net/http" + +type Config interface { +} + +type Module interface { + Shutdown() + ConfigReload(config Config) + Cleanup() + ServeHTTP(w http.ResponseWriter, r *http.Request) +} diff --git a/hls/manager.go b/pkg/hlslive/manager.go similarity index 84% rename from hls/manager.go rename to pkg/hlslive/manager.go index 4728624..4db349c 100644 --- a/hls/manager.go +++ b/pkg/hlslive/manager.go @@ -1,4 +1,4 @@ -package hls +package hlslive import ( "errors" @@ -17,23 +17,9 @@ import ( "github.com/m1k1o/go-transcode/internal/utils" ) -// how often should be cleanup called -const cleanupPeriod = 4 * time.Second - -// timeout for first playlist, when it waits for new data -const playlistTimeout = 60 * time.Second - -// minimum segments available to consider stream as active -const hlsMinimumSegments = 2 - -// how long must be active stream idle to be considered as dead -const activeIdleTimeout = 12 * time.Second - -// how long must be iactive stream idle to be considered as dead -const inactiveIdleTimeout = 24 * time.Second - type ManagerCtx struct { logger zerolog.Logger + config Config mu sync.Mutex cmdFactory func() *exec.Cmd active bool @@ -54,10 +40,11 @@ type ManagerCtx struct { shutdown chan interface{} } -func New(cmdFactory func() *exec.Cmd) *ManagerCtx { +func New(cmdFactory func() *exec.Cmd, config *Config) *ManagerCtx { return &ManagerCtx{ logger: log.With().Str("module", "hls").Str("submodule", "manager").Logger(), cmdFactory: cmdFactory, + config: config.withDefaultValues(), playlistLoad: make(chan string), shutdown: make(chan interface{}), @@ -119,7 +106,7 @@ func (m *ManagerCtx) Start() error { Str("playlist", m.playlist). Msg("received playlist") - if m.sequence == hlsMinimumSegments { + if m.sequence == m.config.HlsMinimumSegments { m.active = true m.playlistLoad <- m.playlist close(m.playlistLoad) @@ -136,7 +123,7 @@ func (m *ManagerCtx) Start() error { // periodic cleanup go func() { - ticker := time.NewTicker(cleanupPeriod) + ticker := time.NewTicker(m.config.CleanupPeriod) defer ticker.Stop() for { @@ -217,7 +204,7 @@ func (m *ManagerCtx) Stop() { func (m *ManagerCtx) Cleanup() { m.mu.Lock() diff := time.Since(m.lastRequest) - stop := m.active && diff > activeIdleTimeout || !m.active && diff > inactiveIdleTimeout + stop := m.active && diff > m.config.ActiveIdleTimeout || !m.active && diff > m.config.InactiveIdleTimeout m.mu.Unlock() m.logger.Debug(). @@ -256,7 +243,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { m.logger.Warn().Msg("playlist load failed because of shutdown") http.Error(w, "500 playlist not available", http.StatusInternalServerError) return - case <-time.After(playlistTimeout): + case <-time.After(m.config.PlaylistTimeout): m.logger.Warn().Msg("playlist load channel timeouted") http.Error(w, "504 playlist timeout", http.StatusGatewayTimeout) return @@ -268,13 +255,13 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(playlist)) } -func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { +func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { fileName := path.Base(r.URL.RequestURI()) path := path.Join(m.tempdir, fileName) if _, err := os.Stat(path); os.IsNotExist(err) { - m.logger.Warn().Str("path", path).Msg("media file not found") - http.Error(w, "404 media not found", http.StatusNotFound) + m.logger.Warn().Str("path", path).Msg("segment not found") + http.Error(w, "404 segment not found", http.StatusNotFound) return } @@ -282,7 +269,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { m.lastRequest = time.Now() m.mu.Unlock() - w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Content-Type", "video/MP2T") w.Header().Set("Cache-Control", "no-cache") http.ServeFile(w, r, path) } diff --git a/pkg/hlslive/types.go b/pkg/hlslive/types.go new file mode 100644 index 0000000..bc6c2e9 --- /dev/null +++ b/pkg/hlslive/types.go @@ -0,0 +1,46 @@ +package hlslive + +import ( + "net/http" + "time" +) + +type Config struct { + CleanupPeriod time.Duration // how often should be cleanup called + PlaylistTimeout time.Duration // timeout for first playlist, when it waits for new data + HlsMinimumSegments int // minimum segments available to consider stream as active + ActiveIdleTimeout time.Duration // how long must be active stream idle to be considered as dead + InactiveIdleTimeout time.Duration // how long must be iactive stream idle to be considered as dead +} + +func (c Config) withDefaultValues() Config { + if c.CleanupPeriod == 0 { + c.CleanupPeriod = 4 * time.Second + } + if c.PlaylistTimeout == 0 { + c.PlaylistTimeout = 60 * time.Second + } + if c.HlsMinimumSegments == 0 { + c.HlsMinimumSegments = 2 + } + if c.ActiveIdleTimeout == 0 { + c.ActiveIdleTimeout = 12 * time.Second + } + if c.InactiveIdleTimeout == 0 { + c.InactiveIdleTimeout = 24 * time.Second + } + return c +} + +type Manager interface { + Start() error + Stop() + Cleanup() + + ServePlaylist(w http.ResponseWriter, r *http.Request) + ServeSegment(w http.ResponseWriter, r *http.Request) + + OnStart(event func()) + OnCmdLog(event func(message string)) + OnStop(event func(err error)) +} diff --git a/hlsproxy/cache.go b/pkg/hlsproxy/cache.go similarity index 94% rename from hlsproxy/cache.go rename to pkg/hlsproxy/cache.go index 4f8a60d..cd5aeeb 100644 --- a/hlsproxy/cache.go +++ b/pkg/hlsproxy/cache.go @@ -19,7 +19,7 @@ func (m *ManagerCtx) getFromCache(key string) (*utils.Cache, bool) { } // if cache has expired - if time.Now().After(entry.Expires) { + if entry.Expired() { return nil, false } @@ -61,7 +61,7 @@ func (m *ManagerCtx) clearCache() { m.cacheMu.Lock() for key, entry := range m.cache { // remove expired entries - if time.Now().After(entry.Expires) { + if entry.Expired() { delete(m.cache, key) m.logger.Debug().Str("key", key).Msg("cache cleanup remove expired") } else { @@ -90,7 +90,7 @@ func (m *ManagerCtx) cleanupStart() { go func() { m.logger.Debug().Msg("cleanup started") - ticker := time.NewTicker(cacheCleanupPeriod) + ticker := time.NewTicker(m.config.CacheCleanupPeriod) defer ticker.Stop() for { diff --git a/hlsproxy/manager.go b/pkg/hlsproxy/manager.go similarity index 61% rename from hlsproxy/manager.go rename to pkg/hlsproxy/manager.go index a01b1e9..5d02236 100644 --- a/hlsproxy/manager.go +++ b/pkg/hlsproxy/manager.go @@ -6,7 +6,6 @@ import ( "regexp" "strings" "sync" - "time" "github.com/m1k1o/go-transcode/internal/utils" @@ -14,19 +13,9 @@ import ( "github.com/rs/zerolog/log" ) -// how often should be cache cleanup called -const cacheCleanupPeriod = 4 * time.Second - -// how long should be segment kept in memory -const segmentExpiration = 60 * time.Second - -// how long should be playlist kept in memory -const playlistExpiration = 1 * time.Second - type ManagerCtx struct { - logger zerolog.Logger - baseUrl string - prefix string + logger zerolog.Logger + config Config cache map[string]*utils.Cache cacheMu sync.RWMutex @@ -36,16 +25,11 @@ type ManagerCtx struct { shutdown chan struct{} } -func New(baseUrl string, prefix string) *ManagerCtx { - // ensure it ends with slash - baseUrl = strings.TrimSuffix(baseUrl, "/") - baseUrl += "/" - +func New(config *Config) *ManagerCtx { return &ManagerCtx{ - logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(), - baseUrl: baseUrl, - prefix: prefix, - cache: map[string]*utils.Cache{}, + logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(), + config: config.withDefaultValues(), + cache: map[string]*utils.Cache{}, } } @@ -54,7 +38,7 @@ func (m *ManagerCtx) Shutdown() { } func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { - url := m.baseUrl + strings.TrimPrefix(r.URL.String(), m.prefix) + url := m.config.PlaylistBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.PlaylistPathPrefix) cache, ok := m.getFromCache(url) if !ok { @@ -81,26 +65,27 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { return } - var re = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`) - text := re.ReplaceAllString(string(buf), m.prefix) + // TODO: Handle relative paths. + text := string(buf) + text = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`).ReplaceAllString(text, m.config.SegmentPathPrefix) - cache = m.saveToCache(url, strings.NewReader(text), playlistExpiration) + cache = m.saveToCache(url, strings.NewReader(text), m.config.PlaylistExpiration) } w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") w.WriteHeader(200) - cache.ServeHTTP(w) + cache.CopyTo(w) } -func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { - url := m.baseUrl + strings.TrimPrefix(r.URL.String(), m.prefix) +func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { + url := m.config.SegmentBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.SegmentPathPrefix) cache, ok := m.getFromCache(url) if !ok { resp, err := http.Get(url) if err != nil { - m.logger.Err(err).Msg("unable to get HTTP") + m.logger.Err(err).Str("url", url).Msg("unable to get HTTP") http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } @@ -113,11 +98,11 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { return } - cache = m.saveToCache(url, resp.Body, segmentExpiration) + cache = m.saveToCache(url, resp.Body, m.config.SegmentExpiration) } w.Header().Set("Content-Type", "video/MP2T") w.WriteHeader(200) - cache.ServeHTTP(w) + cache.CopyTo(w) } diff --git a/pkg/hlsproxy/types.go b/pkg/hlsproxy/types.go new file mode 100644 index 0000000..5a8a010 --- /dev/null +++ b/pkg/hlsproxy/types.go @@ -0,0 +1,50 @@ +package hlsproxy + +import ( + "net/http" + "strings" + "time" +) + +type Config struct { + PlaylistBaseUrl string + PlaylistPathPrefix string + SegmentBaseUrl string // optional: will be used playlist value if empty + SegmentPathPrefix string // optional: will be used playlist value if empty + + CacheCleanupPeriod time.Duration // how often should be cache cleanup called + SegmentExpiration time.Duration // how long should be segment kept in memory + PlaylistExpiration time.Duration // how long should be playlist kept in memory +} + +func (c Config) withDefaultValues() Config { + if c.SegmentBaseUrl == "" { + c.SegmentBaseUrl = c.PlaylistBaseUrl + } + if c.SegmentPathPrefix == "" { + c.SegmentPathPrefix = c.PlaylistPathPrefix + } + if c.CacheCleanupPeriod == 0 { + c.CacheCleanupPeriod = 4 * time.Second + } + if c.SegmentExpiration == 0 { + c.SegmentExpiration = 60 * time.Second + } + if c.PlaylistExpiration == 0 { + c.PlaylistExpiration = 1 * time.Second + } + // ensure it ends with single / + c.PlaylistBaseUrl = strings.TrimRight(c.PlaylistBaseUrl, "/") + "/" + c.SegmentBaseUrl = strings.TrimRight(c.SegmentBaseUrl, "/") + "/" + // ensure it starts and ends with single / + c.PlaylistPathPrefix = "/" + strings.Trim(c.PlaylistPathPrefix, "/") + "/" + c.SegmentPathPrefix = "/" + strings.Trim(c.SegmentPathPrefix, "/") + "/" + return c +} + +type Manager interface { + Shutdown() + + ServePlaylist(w http.ResponseWriter, r *http.Request) + ServeSegment(w http.ResponseWriter, r *http.Request) +} diff --git a/hlsvod/cache.go b/pkg/hlsvod/cache.go similarity index 79% rename from hlsvod/cache.go rename to pkg/hlsvod/cache.go index c6dc232..6ca141f 100644 --- a/hlsvod/cache.go +++ b/pkg/hlsvod/cache.go @@ -7,11 +7,9 @@ import ( "path" ) -const cacheFileSuffix = ".go-transcode-cache" - func (m *ManagerCtx) getCacheData() ([]byte, error) { // check for local cache - localCachePath := m.config.MediaPath + cacheFileSuffix + localCachePath := m.config.MediaPath + m.config.CacheFileExt if _, err := os.Stat(localCachePath); err == nil { m.logger.Info().Str("path", localCachePath).Msg("media local cache hit") return os.ReadFile(localCachePath) @@ -22,7 +20,7 @@ func (m *ManagerCtx) getCacheData() ([]byte, error) { h.Write([]byte(m.config.MediaPath)) hash := h.Sum(nil) - fileName := fmt.Sprintf("%x%s", hash, cacheFileSuffix) + fileName := fmt.Sprintf("%x%s", hash, m.config.CacheFileExt) globalCachePath := path.Join(m.config.CacheDir, fileName) if _, err := os.Stat(globalCachePath); err == nil { m.logger.Info().Str("path", globalCachePath).Msg("media global cache hit") @@ -33,7 +31,7 @@ func (m *ManagerCtx) getCacheData() ([]byte, error) { } func (m *ManagerCtx) saveLocalCacheData(data []byte) error { - localCachePath := m.config.MediaPath + cacheFileSuffix + localCachePath := m.config.MediaPath + m.config.CacheFileExt return os.WriteFile(localCachePath, data, 0755) } @@ -42,7 +40,7 @@ func (m *ManagerCtx) saveGlobalCacheData(data []byte) error { h.Write([]byte(m.config.MediaPath)) hash := h.Sum(nil) - fileName := fmt.Sprintf("%x%s", hash, cacheFileSuffix) + fileName := fmt.Sprintf("%x%s", hash, m.config.CacheFileExt) globalCachePath := path.Join(m.config.CacheDir, fileName) return os.WriteFile(globalCachePath, data, 0755) } diff --git a/hlsvod/manager.go b/pkg/hlsvod/manager.go similarity index 86% rename from hlsvod/manager.go rename to pkg/hlsvod/manager.go index 2bb8913..6734fcf 100644 --- a/hlsvod/manager.go +++ b/pkg/hlsvod/manager.go @@ -18,21 +18,10 @@ import ( "github.com/rs/zerolog/log" ) -// how long can it take for transcode to be ready -const readyTimeout = 80 * time.Second - -// how long can it take for transcode to return first data -const transcodeTimeout = 10 * time.Second - type ManagerCtx struct { logger zerolog.Logger config Config - segmentLength float64 - segmentOffset float64 - segmentBufferMin int // minimum segments available after playing head - segmentBufferMax int // maximum segments to be transcoded at once - ready bool readyMu sync.RWMutex readyChan chan struct{} @@ -51,17 +40,12 @@ type ManagerCtx struct { cancel context.CancelFunc } -func New(config Config) *ManagerCtx { +func New(config *Config) *ManagerCtx { ctx, cancel := context.WithCancel(context.Background()) + return &ManagerCtx{ logger: log.With().Str("module", "hlsvod").Str("submodule", "manager").Logger(), - config: config, - - segmentLength: 3.50, - segmentOffset: 1.25, - segmentBufferMin: 3, - segmentBufferMax: 5, - + config: config.withDefaultValues(), ctx: ctx, cancel: cancel, } @@ -121,7 +105,7 @@ func (m *ManagerCtx) httpEnsureReady(w http.ResponseWriter) bool { m.logger.Warn().Msg("manager load failed because of shutdown") http.Error(w, "500 manager not available", http.StatusInternalServerError) return false - case <-time.After(readyTimeout): + case <-time.After(m.config.ReadyTimeout): m.logger.Warn().Msg("manager load timeouted") http.Error(w, "504 manager timeout", http.StatusGatewayTimeout) return false @@ -201,14 +185,14 @@ func (m *ManagerCtx) loadMetadata(ctx context.Context) error { } func (m *ManagerCtx) getSegmentName(index int) string { - return fmt.Sprintf("%s-%05d.ts", m.config.SegmentPrefix, index) + return fmt.Sprintf("%s-%05d.ts", m.config.SegmentNamePrefix, index) } func (m *ManagerCtx) parseSegmentIndex(segmentName string) (int, bool) { regex := regexp.MustCompile(`^(.*)-([0-9]{5})\.ts$`) matches := regex.FindStringSubmatch(segmentName) - if len(matches) != 3 || matches[1] != m.config.SegmentPrefix { + if len(matches) != 3 || matches[1] != m.config.SegmentNamePrefix { return 0, false } @@ -228,7 +212,7 @@ func (m *ManagerCtx) getPlaylist() string { "#EXT-X-VERSION:4", "#EXT-X-PLAYLIST-TYPE:VOD", "#EXT-X-MEDIA-SEQUENCE:0", - fmt.Sprintf("#EXT-X-TARGETDURATION:%.2f", m.segmentLength+m.segmentOffset), + fmt.Sprintf("#EXT-X-TARGETDURATION:%.2f", m.config.SegmentLength+m.config.SegmentOffset), } // playlist segments @@ -255,7 +239,7 @@ func (m *ManagerCtx) initialize() { } // generate breakpoints from keyframes - m.breakpoints = convertToSegments(keyframes, m.metadata.Duration, m.segmentLength, m.segmentOffset) + m.breakpoints = convertToSegments(keyframes, m.metadata.Duration, m.config.SegmentLength, m.config.SegmentOffset) // generate playlist m.playlist = m.getPlaylist() @@ -371,7 +355,8 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error { segments, err := TranscodeSegments(m.ctx, m.config.FFmpegBinary, TranscodeConfig{ InputFilePath: m.config.MediaPath, OutputDirPath: m.config.TranscodeDir, - SegmentPrefix: m.config.SegmentPrefix, // This does not need to match. + // This does not need to be the same as chosen prefix. + SegmentPrefix: m.config.SegmentNamePrefix, VideoProfile: m.config.VideoProfile, AudioProfile: m.config.AudioProfile, @@ -420,8 +405,8 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error { func (m *ManagerCtx) transcodeFromSegment(index int) error { segmentsTotal := len(m.segments) - if index+m.segmentBufferMax < segmentsTotal { - segmentsTotal = index + m.segmentBufferMax + if index+m.config.SegmentBufferMax < segmentsTotal { + segmentsTotal = index + m.config.SegmentBufferMax } offset, limit := 0, 0 @@ -444,7 +429,7 @@ func (m *ManagerCtx) transcodeFromSegment(index int) error { } // if offset is greater than our minimal offset, we have enough segments available - if offset > m.segmentBufferMin { + if offset > m.config.SegmentBufferMin { return nil } @@ -505,7 +490,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(m.playlist)) } -func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { +func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { // ensure that manager started if !m.httpEnsureReady(w) { return @@ -517,7 +502,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { // getting index from segment name index, ok := m.parseSegmentIndex(reqSegName) if !ok { - http.Error(w, "400 bad media path", http.StatusBadRequest) + http.Error(w, "400 bad segment path", http.StatusBadRequest) return } @@ -530,8 +515,8 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { // try to transcode from current segment if err := m.transcodeFromSegment(index); err != nil { - m.logger.Err(err).Int("index", index).Msg("unable to transcode media") - http.Error(w, "500 unable to transcode", http.StatusInternalServerError) + m.logger.Err(err).Int("index", index).Msg("unable to transcode segment") + http.Error(w, "500 unable to transcode segment", http.StatusInternalServerError) return } @@ -541,8 +526,8 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { segChan, ok := m.waitForSegment(index) if !ok { // this should never happen - m.logger.Error().Int("index", index).Msg("media not queued even after transcode") - http.Error(w, "409 media not queued even after transcode", http.StatusConflict) + m.logger.Error().Int("index", index).Msg("segment not queued even after transcode") + http.Error(w, "409 segment not queued even after transcode", http.StatusConflict) return } @@ -559,25 +544,24 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { } // when transcode stops before getting ready case <-m.ctx.Done(): - m.logger.Warn().Msg("media transcode failed because of shutdown") - http.Error(w, "500 media not available", http.StatusInternalServerError) + m.logger.Warn().Msg("segment transcode failed because of shutdown") + http.Error(w, "500 segment not available", http.StatusInternalServerError) return - case <-time.After(transcodeTimeout): - m.logger.Warn().Msg("media transcode timeouted") - http.Error(w, "504 media timeout", http.StatusGatewayTimeout) + case <-time.After(m.config.TranscodeTimeout): + m.logger.Warn().Msg("segment transcode timeouted") + http.Error(w, "504 segment transcode timeout", http.StatusGatewayTimeout) return } } // check if segment is on the disk if _, err := os.Stat(segmentPath); os.IsNotExist(err) { - m.logger.Warn().Int("index", index).Str("path", segmentPath).Msg("media file not found") - http.Error(w, "404 media not found", http.StatusNotFound) + m.logger.Warn().Int("index", index).Str("path", segmentPath).Msg("segment file not found") + http.Error(w, "404 segment not found", http.StatusNotFound) return } // return existing segment - w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") - w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Content-Type", "video/MP2T") http.ServeFile(w, r, segmentPath) } diff --git a/hlsvod/probe.go b/pkg/hlsvod/probe.go similarity index 100% rename from hlsvod/probe.go rename to pkg/hlsvod/probe.go diff --git a/hlsvod/transcode.go b/pkg/hlsvod/transcode.go similarity index 100% rename from hlsvod/transcode.go rename to pkg/hlsvod/transcode.go diff --git a/pkg/hlsvod/types.go b/pkg/hlsvod/types.go new file mode 100644 index 0000000..938e6f4 --- /dev/null +++ b/pkg/hlsvod/types.go @@ -0,0 +1,72 @@ +package hlsvod + +import ( + "context" + "net/http" + "time" +) + +type Config struct { + MediaPath string // transcoded video input + TranscodeDir string // temporary directory to store transcoded elements + SegmentNamePrefix string // e.g. prefix-000001.ts + + VideoProfile *VideoProfile + VideoKeyframes bool + AudioProfile *AudioProfile + + Cache bool + CacheDir string // if not empty, cache will folder will be used instead of media path + CacheFileExt string // extension of cache files created by this module + + FFmpegBinary string + FFprobeBinary string + + ReadyTimeout time.Duration // how long can it take for transcode to be ready + TranscodeTimeout time.Duration // how long can it take for transcode to be ready + + SegmentLength float64 + SegmentOffset float64 // maximim segment length deviation + SegmentBufferMin int // minimum segments available after playing head + SegmentBufferMax int // maximum segments to be transcoded at once +} + +func (c Config) withDefaultValues() Config { + if c.CacheFileExt == "" { + c.FFmpegBinary = ".go-transcode-cache" + } + if c.FFmpegBinary == "" { + c.FFmpegBinary = "ffmpeg" + } + if c.FFprobeBinary == "" { + c.FFprobeBinary = "ffprobe" + } + if c.ReadyTimeout == 0 { + c.ReadyTimeout = 80 * time.Second + } + if c.TranscodeTimeout == 0 { + c.TranscodeTimeout = 10 * time.Second + } + if c.SegmentLength == 0 { + c.SegmentLength = 3.50 + } + if c.SegmentOffset == 0 { + c.SegmentOffset = 1.25 + } + if c.SegmentBufferMin == 0 { + c.SegmentBufferMin = 3 + } + if c.SegmentBufferMax == 0 { + c.SegmentBufferMax = 5 + } + return c +} + +type Manager interface { + Start() error + Stop() + Preload(ctx context.Context) (*ProbeMediaData, error) + + ServePlaylist(w http.ResponseWriter, r *http.Request) + ServeSegment(w http.ResponseWriter, r *http.Request) +} diff --git a/hlsvod/utils.go b/pkg/hlsvod/utils.go similarity index 100% rename from hlsvod/utils.go rename to pkg/hlsvod/utils.go