Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ ENTRYPOINT ["/app/transcoderd-server"]
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS builder-pgs
WORKDIR /src
ARG tessdata_version=ced78752cc61322fb554c280d13360b35b8684e4
ARG pgstosrt_version=26a4ab214fbc18520d2999eb3d8baf8d5c84a724
ARG pgstosrt_version=3123a9004cf1e163b6b7171a72deff2a899ed361

RUN apt-get -y update && \
apt-get -y upgrade && \
Expand Down
22 changes: 7 additions & 15 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,16 @@ help: ## show this help menu.
@echo ""

.PHONY: fmt
fmt:
fmt: ## Code Format
go fmt ./...

.PHONY: lint
lint: ## Linters
@golangci-lint run

# Install GolangCI-Lint
install-lint:
@echo "Installing GolangCI-Lint $(GOLANGCI_LINT_VERSION)..."
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s $(GOLANGCI_LINT_VERSION)

# Run GolangCI-Lint
lint:
@echo "Running GolangCI-Lint..."
$(GOLANGCI_LINT_BIN) run --config=$(GOLANGCI_LINT_CONFIG)

# Run GolangCI-Lint and fix issues automatically
lint-fix:
@echo "Running GolangCI-Lint with --fix..."
$(GOLANGCI_LINT_BIN) run --fix --config=$(GOLANGCI_LINT_CONFIG)
.PHONY: lint-fix
lint-fix: ## Lint fix if possible
@golangci-lint run --fix


.PHONY: build
Expand Down
5 changes: 2 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module transcoder

go 1.22.0
go 1.23.0

toolchain go1.23.3

Expand All @@ -15,9 +15,8 @@ require (
github.com/lib/pq v1.10.9
github.com/minio/selfupdate v0.6.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/pflag v1.0.5
github.com/spf13/pflag v1.0.6
github.com/spf13/viper v1.19.0
gopkg.in/errgo.v2 v2.1.0
gopkg.in/vansante/go-ffprobe.v2 v2.2.1
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y=
github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
17 changes: 12 additions & 5 deletions helper/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Command struct {
WorkDir string
stdoutFunc ReaderFunc
stderrFunc ReaderFunc
buffSize int
}

func NewAllowedCodesOption(codes ...int) Option {
Expand All @@ -36,10 +37,11 @@ func NewCommandByString(command string, params string) *Command {
}
func NewCommand(command string, params ...string) *Command {
cmd := &Command{
Command: command,
Params: params,
Env: os.Environ(),
WorkDir: GetWD(),
Command: command,
Params: params,
Env: os.Environ(),
WorkDir: GetWD(),
buffSize: 4096,
}
return cmd
}
Expand All @@ -63,6 +65,11 @@ func (c *Command) AddEnv(env string) *Command {
return c
}

func (c *Command) BuffSize(size int) *Command {
c.buffSize = size
return c
}

func (c *Command) SetStdoutFunc(stdoutFunc ReaderFunc) *Command {
c.stdoutFunc = stdoutFunc
return c
Expand Down Expand Up @@ -144,7 +151,7 @@ func allowedCodes(opts []Option, exitCode int) bool {

func (c *Command) readerStreamProcessor(ctx context.Context, wg *sync.WaitGroup, reader io.ReadCloser, callbackFunc ReaderFunc) {
defer wg.Done()
buffer := make([]byte, 4096)
buffer := make([]byte, c.buffSize)
loop:
for {
select {
Expand Down
178 changes: 54 additions & 124 deletions model/model.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package model

import (
"encoding/json"
"fmt"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"os"
"time"
"transcoder/helper/max"
)
Expand All @@ -13,28 +12,26 @@ type EventType string
type NotificationType string
type NotificationStatus string
type JobAction string
type TaskEvents []*TaskEvent
type TaskEvents []*TaskEventType

const (
PingEvent EventType = "Ping"
NotificationEvent EventType = "Notification"

JobNotification NotificationType = "Job"
DownloadNotification NotificationType = "Download"
UploadNotification NotificationType = "Upload"
MKVExtractNotification NotificationType = "MKVExtract"
FFProbeNotification NotificationType = "FFProbe"
PGSNotification NotificationType = "PGS"
FFMPEGSNotification NotificationType = "FFMPEG"

ProgressEvent EventType = "Progress"

JobNotification NotificationType = "Job"
DownloadNotification NotificationType = "Download"
UploadNotification NotificationType = "Upload"
MKVExtractNotification NotificationType = "MKVExtract"
PGSNotification NotificationType = "PGS"
FFMPEGSNotification NotificationType = "FFMPEG"
JobVerify NotificationType = "JobVerify"
QueuedNotificationStatus NotificationStatus = "queued"
AssignedNotificationStatus NotificationStatus = "assigned"
StartedNotificationStatus NotificationStatus = "started"
CompletedNotificationStatus NotificationStatus = "completed"
CanceledNotificationStatus NotificationStatus = "canceled"
FailedNotificationStatus NotificationStatus = "failed"

CancelJob JobAction = "cancel"
)

type Identity interface {
Expand Down Expand Up @@ -64,155 +61,87 @@ type Worker struct {
LastSeen time.Time
}

type ControlEvent struct {
Event *TaskEncode
ControlChan chan interface{}
}

type JobEvent struct {
Id uuid.UUID `json:"id"`
Action JobAction `json:"action"`
}

type JobType string

type TaskEncode struct {
type RequestJobResponse struct {
Id uuid.UUID `json:"id"`
EventID int `json:"eventID"`
}

type WorkTaskEncode struct {
TaskEncode *TaskEncode
WorkDir string
SourceFilePath string
TargetFilePath string
}

type TaskPGS struct {
PGSID int
PGSSourcePath string
PGSLanguage string
PGSTargetPath string
}

type TaskPGSResponse struct {
Id uuid.UUID `json:"id"`
PGSID int `json:"pgsid"`
Srt []byte `json:"srt"`
Err string `json:"error"`
Queue string `json:"queue"`
type EnvelopEvent struct {
EventType EventType `json:"eventType"`
EventData json.RawMessage `json:"eventData"`
}

func (t TaskEncode) getUUID() uuid.UUID {
return t.Id
type Event struct {
EventTime time.Time `json:"eventTime"`
WorkerName string `json:"workerName"`
}

type TaskEvent struct {
Id uuid.UUID `json:"id"`
type TaskEventType struct {
Event
JobId uuid.UUID `json:"Id"`
EventID int `json:"eventID"`
EventType EventType `json:"eventType"`
WorkerName string `json:"workerName"`
EventTime time.Time `json:"eventTime"`
IP string `json:"ip"`
NotificationType NotificationType `json:"notificationType"`
Status NotificationStatus `json:"status"`
Message string `json:"message"`
}

type TaskStatus struct {
LastState *TaskEvent
Task *WorkTaskEncode
}

func (e TaskEvent) IsAssigned() bool {
if e.EventType != NotificationEvent {
return false
}
if e.NotificationType == JobNotification && (e.Status == AssignedNotificationStatus || e.Status == StartedNotificationStatus) {
return true
}
return false
type PingEventType struct {
Event
IP string `json:"ip"`
}

func (e TaskEvent) IsCompleted() bool {
if e.EventType != NotificationEvent {
return false
}
if e.NotificationType == JobNotification && e.Status == CompletedNotificationStatus {
return true
}
return false
}
type TaskProgressStatus string

func (e TaskEvent) IsDownloading() bool {
if e.EventType != NotificationEvent {
return false
}
if e.NotificationType == DownloadNotification && e.Status == StartedNotificationStatus {
return true
}
const (
ProgressingTaskProgressTypeStatus TaskProgressStatus = "progressing"
DoneTaskProgressTypeStatus TaskProgressStatus = "done"
FailureTaskProgressTypeStatus TaskProgressStatus = "failure"
)

if e.NotificationType == JobNotification && (e.Status == StartedNotificationStatus) {
return true
}
return false
type TaskProgressType struct {
Event
JobId uuid.UUID `json:"jobId"`
ProgressID string `json:"progressID"`
Percent float64 `json:"percent"`
ETA time.Duration `json:"eta"`
NotificationType NotificationType `json:"notificationType"`
Status TaskProgressStatus `json:"status"`
}

func (e TaskEvent) IsEncoding() bool {
if e.EventType != NotificationEvent {
return false
}
if e.NotificationType == DownloadNotification && e.Status == CompletedNotificationStatus {
return true
}

if e.NotificationType == MKVExtractNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) {
return true
}
if e.NotificationType == FFProbeNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) {
return true
}
if e.NotificationType == PGSNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) {
return true
}
if e.NotificationType == FFMPEGSNotification && e.Status == StartedNotificationStatus {
func (e TaskEventType) IsAssigned() bool {
if e.NotificationType == JobNotification && (e.Status == AssignedNotificationStatus || e.Status == StartedNotificationStatus) {
return true
}

return false
}

func (e TaskEvent) IsUploading() bool {
if e.EventType != NotificationEvent {
return false
}
if e.NotificationType == FFMPEGSNotification && e.Status == CompletedNotificationStatus {
return true
}

if e.NotificationType == UploadNotification && e.Status == StartedNotificationStatus {
func (e TaskEventType) IsCompleted() bool {
if e.NotificationType == JobNotification && e.Status == CompletedNotificationStatus {
return true
}

return false
}

func (w *WorkTaskEncode) Clean() error {
log.Debugf("[%s] Cleaning up Task Workspace", w.TaskEncode.Id.String())
err := os.RemoveAll(w.WorkDir)
if err != nil {
return err
}
return nil
}

func (t TaskEvents) GetLatest() *TaskEvent {
func (t TaskEvents) GetLatest() *TaskEventType {
if len(t) == 0 {
return nil
}
return max.Max(t).(*TaskEvent)
return max.Max(t).(*TaskEventType)
}
func (t TaskEvents) GetLatestPerNotificationType(notificationType NotificationType) (returnEvent *TaskEvent) {
func (t TaskEvents) GetLatestPerNotificationType(notificationType NotificationType) (returnEvent *TaskEventType) {
eventID := -1
for _, event := range t {
if event.NotificationType == notificationType && event.EventID > eventID {
Expand Down Expand Up @@ -249,7 +178,7 @@ func (t TaskEvents) Less(i, j int) bool {
func (t TaskEvents) Swap(i, j int) {
t[i], t[j] = t[j], t[i]
}
func (t TaskEvents) GetByEventId(i int) (*TaskEvent, error) {
func (t TaskEvents) GetByEventId(i int) (*TaskEventType, error) {
for _, event := range t {
if event.EventID == i {
return event, nil
Expand All @@ -260,22 +189,23 @@ func (t TaskEvents) GetByEventId(i int) (*TaskEvent, error) {
func (t TaskEvents) GetLastElement(i int) interface{} {
return t[i]
}
func (v *Job) AddEvent(eventType EventType, notificationType NotificationType, notificationStatus NotificationStatus) (newEvent *TaskEvent) {
return v.AddEventComplete(eventType, notificationType, notificationStatus, "")
func (v *Job) AddEvent(notificationType NotificationType, notificationStatus NotificationStatus) (newEvent *TaskEventType) {
return v.AddEventComplete(notificationType, notificationStatus, "")
}

func (v *Job) AddEventComplete(eventType EventType, notificationType NotificationType, notificationStatus NotificationStatus, message string) (newEvent *TaskEvent) {
func (v *Job) AddEventComplete(notificationType NotificationType, notificationStatus NotificationStatus, message string) (newEvent *TaskEventType) {
latestEvent := v.Events.GetLatest()
newEventID := 0
if latestEvent != nil {
newEventID = latestEvent.EventID + 1
}

newEvent = &TaskEvent{
Id: v.Id,
newEvent = &TaskEventType{
Event: Event{
EventTime: time.Now(),
},
JobId: v.Id,
EventID: newEventID,
EventType: eventType,
EventTime: time.Now(),
NotificationType: notificationType,
Status: notificationStatus,
Message: message,
Expand Down
Loading
Loading