diff --git a/Dockerfile b/Dockerfile index 75b610d..634f7d6 100644 --- a/Dockerfile +++ b/Dockerfile @@ -41,7 +41,7 @@ ENTRYPOINT ["/app/transcoderd-server"] FROM mcr.microsoft.com/dotnet/sdk:6.0 AS builder-pgs WORKDIR /src ARG tessdata_version=ced78752cc61322fb554c280d13360b35b8684e4 -ARG pgstosrt_version=26a4ab214fbc18520d2999eb3d8baf8d5c84a724 +ARG pgstosrt_version=3123a9004cf1e163b6b7171a72deff2a899ed361 RUN apt-get -y update && \ apt-get -y upgrade && \ diff --git a/Makefile b/Makefile index 34dc210..d260ef2 100644 --- a/Makefile +++ b/Makefile @@ -21,24 +21,16 @@ help: ## show this help menu. @echo "" .PHONY: fmt -fmt: +fmt: ## Code Format go fmt ./... +.PHONY: lint +lint: ## Linters + @golangci-lint run -# Install GolangCI-Lint -install-lint: - @echo "Installing GolangCI-Lint $(GOLANGCI_LINT_VERSION)..." - curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s $(GOLANGCI_LINT_VERSION) - -# Run GolangCI-Lint -lint: - @echo "Running GolangCI-Lint..." - $(GOLANGCI_LINT_BIN) run --config=$(GOLANGCI_LINT_CONFIG) - -# Run GolangCI-Lint and fix issues automatically -lint-fix: - @echo "Running GolangCI-Lint with --fix..." 
- $(GOLANGCI_LINT_BIN) run --fix --config=$(GOLANGCI_LINT_CONFIG) +.PHONY: lint-fix +lint-fix: ## Lint fix if possible + @golangci-lint run --fix .PHONY: build diff --git a/go.mod b/go.mod index 65bcb44..6adb812 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module transcoder -go 1.22.0 +go 1.23.0 toolchain go1.23.3 @@ -15,9 +15,8 @@ require ( github.com/lib/pq v1.10.9 github.com/minio/selfupdate v0.6.0 github.com/sirupsen/logrus v1.9.3 - github.com/spf13/pflag v1.0.5 + github.com/spf13/pflag v1.0.6 github.com/spf13/viper v1.19.0 - gopkg.in/errgo.v2 v2.1.0 gopkg.in/vansante/go-ffprobe.v2 v2.2.1 ) diff --git a/go.sum b/go.sum index cd6e423..5f34806 100644 --- a/go.sum +++ b/go.sum @@ -81,6 +81,8 @@ github.com/spf13/cast v1.7.1 h1:cuNEagBQEHWN1FnbGEjCXL2szYEXqfJPbP2HNUaca9Y= github.com/spf13/cast v1.7.1/go.mod h1:ancEpBxwJDODSW/UG4rDrAqiKolqNNh2DX3mk86cAdo= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/helper/command/command.go b/helper/command/command.go index a9d8050..67cb660 100644 --- a/helper/command/command.go +++ b/helper/command/command.go @@ -24,6 +24,7 @@ type Command struct { WorkDir string stdoutFunc ReaderFunc stderrFunc ReaderFunc + buffSize int } func NewAllowedCodesOption(codes ...int) Option { @@ -36,10 +37,11 @@ func NewCommandByString(command string, params string) *Command { } func NewCommand(command string, params ...string) *Command { cmd := &Command{ - Command: command, - Params: params, - Env: 
os.Environ(), - WorkDir: GetWD(), + Command: command, + Params: params, + Env: os.Environ(), + WorkDir: GetWD(), + buffSize: 4096, } return cmd } @@ -63,6 +65,11 @@ func (c *Command) AddEnv(env string) *Command { return c } +func (c *Command) BuffSize(size int) *Command { + c.buffSize = size + return c +} + func (c *Command) SetStdoutFunc(stdoutFunc ReaderFunc) *Command { c.stdoutFunc = stdoutFunc return c @@ -144,7 +151,7 @@ func allowedCodes(opts []Option, exitCode int) bool { func (c *Command) readerStreamProcessor(ctx context.Context, wg *sync.WaitGroup, reader io.ReadCloser, callbackFunc ReaderFunc) { defer wg.Done() - buffer := make([]byte, 4096) + buffer := make([]byte, c.buffSize) loop: for { select { diff --git a/model/model.go b/model/model.go index 10a1f0e..a3eab4f 100644 --- a/model/model.go +++ b/model/model.go @@ -1,10 +1,9 @@ package model import ( + "encoding/json" "fmt" "github.com/google/uuid" - log "github.com/sirupsen/logrus" - "os" "time" "transcoder/helper/max" ) @@ -13,28 +12,26 @@ type EventType string type NotificationType string type NotificationStatus string type JobAction string -type TaskEvents []*TaskEvent +type TaskEvents []*TaskEventType const ( PingEvent EventType = "Ping" NotificationEvent EventType = "Notification" - - JobNotification NotificationType = "Job" - DownloadNotification NotificationType = "Download" - UploadNotification NotificationType = "Upload" - MKVExtractNotification NotificationType = "MKVExtract" - FFProbeNotification NotificationType = "FFProbe" - PGSNotification NotificationType = "PGS" - FFMPEGSNotification NotificationType = "FFMPEG" - + ProgressEvent EventType = "Progress" + + JobNotification NotificationType = "Job" + DownloadNotification NotificationType = "Download" + UploadNotification NotificationType = "Upload" + MKVExtractNotification NotificationType = "MKVExtract" + PGSNotification NotificationType = "PGS" + FFMPEGSNotification NotificationType = "FFMPEG" + JobVerify NotificationType = "JobVerify" 
QueuedNotificationStatus NotificationStatus = "queued" AssignedNotificationStatus NotificationStatus = "assigned" StartedNotificationStatus NotificationStatus = "started" CompletedNotificationStatus NotificationStatus = "completed" CanceledNotificationStatus NotificationStatus = "canceled" FailedNotificationStatus NotificationStatus = "failed" - - CancelJob JobAction = "cancel" ) type Identity interface { @@ -64,11 +61,6 @@ type Worker struct { LastSeen time.Time } -type ControlEvent struct { - Event *TaskEncode - ControlChan chan interface{} -} - type JobEvent struct { Id uuid.UUID `json:"id"` Action JobAction `json:"action"` @@ -76,18 +68,11 @@ type JobEvent struct { type JobType string -type TaskEncode struct { +type RequestJobResponse struct { Id uuid.UUID `json:"id"` EventID int `json:"eventID"` } -type WorkTaskEncode struct { - TaskEncode *TaskEncode - WorkDir string - SourceFilePath string - TargetFilePath string -} - type TaskPGS struct { PGSID int PGSSourcePath string @@ -95,124 +80,68 @@ type TaskPGS struct { PGSTargetPath string } -type TaskPGSResponse struct { - Id uuid.UUID `json:"id"` - PGSID int `json:"pgsid"` - Srt []byte `json:"srt"` - Err string `json:"error"` - Queue string `json:"queue"` +type EnvelopEvent struct { + EventType EventType `json:"eventType"` + EventData json.RawMessage `json:"eventData"` } -func (t TaskEncode) getUUID() uuid.UUID { - return t.Id +type Event struct { + EventTime time.Time `json:"eventTime"` + WorkerName string `json:"workerName"` } - -type TaskEvent struct { - Id uuid.UUID `json:"id"` +type TaskEventType struct { + Event + JobId uuid.UUID `json:"Id"` EventID int `json:"eventID"` - EventType EventType `json:"eventType"` - WorkerName string `json:"workerName"` - EventTime time.Time `json:"eventTime"` - IP string `json:"ip"` NotificationType NotificationType `json:"notificationType"` Status NotificationStatus `json:"status"` Message string `json:"message"` } -type TaskStatus struct { - LastState *TaskEvent - Task 
*WorkTaskEncode -} - -func (e TaskEvent) IsAssigned() bool { - if e.EventType != NotificationEvent { - return false - } - if e.NotificationType == JobNotification && (e.Status == AssignedNotificationStatus || e.Status == StartedNotificationStatus) { - return true - } - return false +type PingEventType struct { + Event + IP string `json:"ip"` } -func (e TaskEvent) IsCompleted() bool { - if e.EventType != NotificationEvent { - return false - } - if e.NotificationType == JobNotification && e.Status == CompletedNotificationStatus { - return true - } - return false -} +type TaskProgressStatus string -func (e TaskEvent) IsDownloading() bool { - if e.EventType != NotificationEvent { - return false - } - if e.NotificationType == DownloadNotification && e.Status == StartedNotificationStatus { - return true - } +const ( + ProgressingTaskProgressTypeStatus TaskProgressStatus = "progressing" + DoneTaskProgressTypeStatus TaskProgressStatus = "done" + FailureTaskProgressTypeStatus TaskProgressStatus = "failure" +) - if e.NotificationType == JobNotification && (e.Status == StartedNotificationStatus) { - return true - } - return false +type TaskProgressType struct { + Event + JobId uuid.UUID `json:"jobId"` + ProgressID string `json:"progressID"` + Percent float64 `json:"percent"` + ETA time.Duration `json:"eta"` + NotificationType NotificationType `json:"notificationType"` + Status TaskProgressStatus `json:"status"` } -func (e TaskEvent) IsEncoding() bool { - if e.EventType != NotificationEvent { - return false - } - if e.NotificationType == DownloadNotification && e.Status == CompletedNotificationStatus { - return true - } - - if e.NotificationType == MKVExtractNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) { - return true - } - if e.NotificationType == FFProbeNotification && (e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) { - return true - } - if e.NotificationType == PGSNotification && 
(e.Status == StartedNotificationStatus || e.Status == CompletedNotificationStatus) { - return true - } - if e.NotificationType == FFMPEGSNotification && e.Status == StartedNotificationStatus { +func (e TaskEventType) IsAssigned() bool { + if e.NotificationType == JobNotification && (e.Status == AssignedNotificationStatus || e.Status == StartedNotificationStatus) { return true } - return false } -func (e TaskEvent) IsUploading() bool { - if e.EventType != NotificationEvent { - return false - } - if e.NotificationType == FFMPEGSNotification && e.Status == CompletedNotificationStatus { - return true - } - - if e.NotificationType == UploadNotification && e.Status == StartedNotificationStatus { +func (e TaskEventType) IsCompleted() bool { + if e.NotificationType == JobNotification && e.Status == CompletedNotificationStatus { return true } - return false } -func (w *WorkTaskEncode) Clean() error { - log.Debugf("[%s] Cleaning up Task Workspace", w.TaskEncode.Id.String()) - err := os.RemoveAll(w.WorkDir) - if err != nil { - return err - } - return nil -} - -func (t TaskEvents) GetLatest() *TaskEvent { +func (t TaskEvents) GetLatest() *TaskEventType { if len(t) == 0 { return nil } - return max.Max(t).(*TaskEvent) + return max.Max(t).(*TaskEventType) } -func (t TaskEvents) GetLatestPerNotificationType(notificationType NotificationType) (returnEvent *TaskEvent) { +func (t TaskEvents) GetLatestPerNotificationType(notificationType NotificationType) (returnEvent *TaskEventType) { eventID := -1 for _, event := range t { if event.NotificationType == notificationType && event.EventID > eventID { @@ -249,7 +178,7 @@ func (t TaskEvents) Less(i, j int) bool { func (t TaskEvents) Swap(i, j int) { t[i], t[j] = t[j], t[i] } -func (t TaskEvents) GetByEventId(i int) (*TaskEvent, error) { +func (t TaskEvents) GetByEventId(i int) (*TaskEventType, error) { for _, event := range t { if event.EventID == i { return event, nil @@ -260,22 +189,23 @@ func (t TaskEvents) GetByEventId(i int) 
(*TaskEvent, error) { func (t TaskEvents) GetLastElement(i int) interface{} { return t[i] } -func (v *Job) AddEvent(eventType EventType, notificationType NotificationType, notificationStatus NotificationStatus) (newEvent *TaskEvent) { - return v.AddEventComplete(eventType, notificationType, notificationStatus, "") +func (v *Job) AddEvent(notificationType NotificationType, notificationStatus NotificationStatus) (newEvent *TaskEventType) { + return v.AddEventComplete(notificationType, notificationStatus, "") } -func (v *Job) AddEventComplete(eventType EventType, notificationType NotificationType, notificationStatus NotificationStatus, message string) (newEvent *TaskEvent) { +func (v *Job) AddEventComplete(notificationType NotificationType, notificationStatus NotificationStatus, message string) (newEvent *TaskEventType) { latestEvent := v.Events.GetLatest() newEventID := 0 if latestEvent != nil { newEventID = latestEvent.EventID + 1 } - newEvent = &TaskEvent{ - Id: v.Id, + newEvent = &TaskEventType{ + Event: Event{ + EventTime: time.Now(), + }, + JobId: v.Id, EventID: newEventID, - EventType: eventType, - EventTime: time.Now(), NotificationType: notificationType, Status: notificationStatus, Message: message, diff --git a/server/repository/repository.go b/server/repository/repository.go index f20ccb9..bbb3dd1 100644 --- a/server/repository/repository.go +++ b/server/repository/repository.go @@ -18,18 +18,21 @@ var ( type Repository interface { getConnection() (SQLDBOperations, error) Initialize(ctx context.Context) error - PingServerUpdate(ctx context.Context, name string, ip string) error - GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEvent, error) + PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) error + GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEventType, error) GetJobs(ctx context.Context) (*[]model.Job, error) GetJobsByStatus(ctx context.Context, status model.NotificationStatus) 
(jobs []*model.Job, returnError error) GetJob(ctx context.Context, uuid string) (*model.Job, error) GetJobByPath(ctx context.Context, path string) (*model.Job, error) AddJob(ctx context.Context, video *model.Job) error UpdateJob(ctx context.Context, video *model.Job) error - AddNewTaskEvent(ctx context.Context, event *model.TaskEvent) error + ProgressJob(ctx context.Context, progressJob *model.TaskProgressType) error + DeleteProgressJob(ctx context.Context, progressId string, notificationType model.NotificationType) error + AddNewTaskEvent(ctx context.Context, event *model.TaskEventType) error WithTransaction(ctx context.Context, transactionFunc func(ctx context.Context, tx Repository) error) error GetWorker(ctx context.Context, name string) (*model.Worker, error) RetrieveQueuedJob(ctx context.Context) (*model.Job, error) + GetAllProgressJobs(ctx context.Context) ([]model.TaskProgressType, error) } type SQLDBOperations interface { @@ -332,7 +335,7 @@ func (s *SQLRepository) getJobsByStatus(ctx context.Context, tx SQLDBOperations, return jobs, nil } -func (s *SQLRepository) GetTimeoutJobs(ctx context.Context, timeout time.Duration) (taskEvent []*model.TaskEvent, returnError error) { +func (s *SQLRepository) GetTimeoutJobs(ctx context.Context, timeout time.Duration) (taskEvent []*model.TaskEventType, returnError error) { conn, err := s.getConnection() if err != nil { return nil, err @@ -412,16 +415,16 @@ func (s *SQLRepository) getJobs(ctx context.Context, tx SQLDBOperations) (*[]mod return &jobs, nil } -func (s *SQLRepository) getTaskEvents(ctx context.Context, tx SQLDBOperations, uuid string) ([]*model.TaskEvent, error) { +func (s *SQLRepository) getTaskEvents(ctx context.Context, tx SQLDBOperations, uuid string) ([]*model.TaskEventType, error) { rows, err := tx.QueryContext(ctx, "select * from job_events where job_id=$1 order by event_time asc", uuid) if err != nil { return nil, err } defer rows.Close() - var taskEvents []*model.TaskEvent + var taskEvents 
[]*model.TaskEventType for rows.Next() { - event := model.TaskEvent{} - err := rows.Scan(&event.Id, &event.EventID, &event.WorkerName, &event.EventTime, &event.EventType, &event.NotificationType, &event.Status, &event.Message) + event := model.TaskEventType{} + err := rows.Scan(&event.JobId, &event.EventID, &event.WorkerName, &event.EventTime, &event.NotificationType, &event.Status, &event.Message) if err != nil { return nil, err } @@ -494,33 +497,56 @@ func (s *SQLRepository) GetJobByPath(ctx context.Context, path string) (video *m return s.getJobByPath(ctx, conn, path) } -func (s *SQLRepository) PingServerUpdate(ctx context.Context, name string, ip string) (returnError error) { +func (s *SQLRepository) PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) (returnError error) { conn, err := s.getConnection() if err != nil { return err } - _, err = conn.ExecContext(ctx, "INSERT INTO workers (name, ip,last_seen ) VALUES ($1,$2,$3) ON CONFLICT (name) DO UPDATE SET ip = $2, last_seen=$3;", name, ip, time.Now()) + _, err = conn.ExecContext(ctx, "INSERT INTO workers (name, ip,last_seen ) VALUES ($1,$2,$3) ON CONFLICT (name) DO UPDATE SET ip = $2, last_seen=$3;", pingEventType.WorkerName, pingEventType.IP, time.Now()) return err } -func (s *SQLRepository) AddNewTaskEvent(ctx context.Context, event *model.TaskEvent) (returnError error) { +func (s *SQLRepository) AddNewTaskEvent(ctx context.Context, event *model.TaskEventType) (returnError error) { conn, err := s.getConnection() if err != nil { return err } return s.addNewTaskEvent(ctx, conn, event) +} +func (s *SQLRepository) ProgressJob(ctx context.Context, progressJob *model.TaskProgressType) (returnError error) { + conn, err := s.getConnection() + if err != nil { + return err + } + return s.insertOrUpdateProgressJob(ctx, conn, progressJob) } -func (s *SQLRepository) addNewTaskEvent(ctx context.Context, tx SQLDBOperations, event *model.TaskEvent) error { - rows, err := tx.QueryContext(ctx, "select 
COALESCE(max(job_event_id),-1) from job_events where job_id=$1", event.Id.String()) +func (s *SQLRepository) DeleteProgressJob(ctx context.Context, progressId string, notificationType model.NotificationType) error { + conn, err := s.getConnection() + if err != nil { + return err + } + return s.deleteProgressJob(ctx, conn, progressId, notificationType) +} + +func (s *SQLRepository) GetAllProgressJobs(ctx context.Context) ([]model.TaskProgressType, error) { + conn, err := s.getConnection() + if err != nil { + return nil, err + } + return s.getAllProgressJobs(ctx, conn) +} + +func (s *SQLRepository) addNewTaskEvent(ctx context.Context, tx SQLDBOperations, event *model.TaskEventType) error { + rows, err := tx.QueryContext(ctx, "select COALESCE(max(job_event_id),-1) from job_events where job_id=$1", event.JobId.String()) if err != nil { return err } videoEventID := -1 if rows.Next() { - err := rows.Scan(&videoEventID) + err = rows.Scan(&videoEventID) if err != nil { return err } @@ -528,11 +554,11 @@ func (s *SQLRepository) addNewTaskEvent(ctx context.Context, tx SQLDBOperations, rows.Close() if videoEventID+1 != event.EventID { - return fmt.Errorf("EventID for %s not match,lastReceived %d, new %d", event.Id.String(), videoEventID, event.EventID) + return fmt.Errorf("EventID for %s not match,lastReceived %d, new %d", event.JobId.String(), videoEventID, event.EventID) } - _, err = tx.ExecContext(ctx, "INSERT INTO job_events (job_id, job_event_id,worker_name,event_time,event_type,notification_type,status,message)"+ - " VALUES ($1,$2,$3,$4,$5,$6,$7,$8)", event.Id.String(), event.EventID, event.WorkerName, time.Now(), event.EventType, event.NotificationType, event.Status, event.Message) + _, err = tx.ExecContext(ctx, "INSERT INTO job_events (job_id, job_event_id,worker_name,event_time,notification_type,status,message)"+ + " VALUES ($1,$2,$3,$4,$5,$6,$7)", event.JobId.String(), event.EventID, event.WorkerName, time.Now(), event.NotificationType, event.Status, event.Message) 
return err } func (s *SQLRepository) AddJob(ctx context.Context, job *model.Job) error { @@ -562,21 +588,21 @@ func (s *SQLRepository) updateJob(ctx context.Context, tx SQLDBOperations, job * return err } -func (s *SQLRepository) getTimeoutJobs(ctx context.Context, tx SQLDBOperations, timeout time.Duration) ([]*model.TaskEvent, error) { +func (s *SQLRepository) getTimeoutJobs(ctx context.Context, tx SQLDBOperations, timeout time.Duration) ([]*model.TaskEventType, error) { timeoutDate := time.Now().Add(-timeout) rows, err := tx.QueryContext(ctx, "select v.* from job_events v right join "+ "(select job_id,max(job_event_id) as job_event_id from job_events where notification_type='Job' group by job_id) as m "+ "on m.job_id=v.job_id and m.job_event_id=v.job_event_id where status in ('assigned','started') and v.event_time < $1::timestamptz", timeoutDate) - if err != nil { return nil, err } + defer rows.Close() - var taskEvents []*model.TaskEvent + var taskEvents []*model.TaskEventType for rows.Next() { - event := model.TaskEvent{} - err := rows.Scan(&event.Id, &event.EventID, &event.WorkerName, &event.EventTime, &event.EventType, &event.NotificationType, &event.Status, &event.Message) + event := model.TaskEventType{} + err := rows.Scan(&event.JobId, &event.EventID, &event.WorkerName, &event.EventTime, &event.NotificationType, &event.Status, &event.Message) if err != nil { return nil, err } @@ -616,19 +642,65 @@ func (s *SQLRepository) WithTransaction(ctx context.Context, transactionFunc fun func (s *SQLRepository) queuedJob(ctx context.Context, tx SQLDBOperations) (*model.Job, error) { rows, err := tx.QueryContext(ctx, "select job_id, job_event_id from job_status where notification_type='Job' and status='queued' order by event_time asc limit 1") - if err != nil { return nil, err } defer rows.Close() if rows.Next() { - event := model.TaskEvent{} - err := rows.Scan(&event.Id, &event.EventID) + event := model.TaskEventType{} + err := rows.Scan(&event.JobId, &event.EventID) 
if err != nil { return nil, err } - return s.getJob(ctx, tx, event.Id.String()) + return s.getJob(ctx, tx, event.JobId.String()) } return nil, fmt.Errorf("%w, %s", ErrElementNotFound, "No jobs found") } + +func (s *SQLRepository) insertOrUpdateProgressJob(ctx context.Context, conn SQLDBOperations, jp *model.TaskProgressType) error { + query := ` + INSERT INTO job_progress + (progress_id, notification_type,job_id,worker_name, percent, eta) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (progress_id,notification_type) + DO UPDATE SET + percent = $5, + eta = $6, + last_update = now()` + + _, err := conn.ExecContext(ctx, query, jp.ProgressID, jp.NotificationType, jp.JobId, jp.WorkerName, jp.Percent, jp.ETA.Seconds()) + if err != nil { + return err + } + return nil +} + +func (s *SQLRepository) deleteProgressJob(ctx context.Context, conn SQLDBOperations, progressId string, notificationType model.NotificationType) error { + _, err := conn.ExecContext(ctx, "DELETE FROM job_progress WHERE progress_id=$1 and notification_type=$2", progressId, notificationType) + if err != nil { + return err + } + return nil + +} + +func (s *SQLRepository) getAllProgressJobs(ctx context.Context, conn SQLDBOperations) ([]model.TaskProgressType, error) { + rows, err := conn.QueryContext(ctx, "select progress_id, notification_type,job_id,worker_name, percent, eta, last_update from job_progress") + if err != nil { + return nil, err + } + defer rows.Close() + var progressJobs []model.TaskProgressType + var etaSeconds float64 + for rows.Next() { + progress := model.TaskProgressType{} + err = rows.Scan(&progress.ProgressID, &progress.NotificationType, &progress.JobId, &progress.WorkerName, &progress.Percent, &etaSeconds, &progress.EventTime) + if err != nil { + return nil, err + } + progress.ETA = time.Duration(etaSeconds) * time.Second + progressJobs = append(progressJobs, progress) + } + return progressJobs, nil +} diff --git a/server/repository/resources/database/004_21012025.sql 
b/server/repository/resources/database/004_21012025.sql new file mode 100644 index 0000000..5b6f0ba --- /dev/null +++ b/server/repository/resources/database/004_21012025.sql @@ -0,0 +1,106 @@ +-- Set workers as unlogged for extra performance +ALTER TABLE workers SET UNLOGGED; + +-- to remove event_type we need to recreate funcs and triggers that updates job_status table +DROP TRIGGER IF EXISTS event_insert_job_status_update ON job_events; + + +DROP FUNCTION IF EXISTS fn_trigger_job_status_update(); + +DROP FUNCTION IF EXISTS fn_job_status_update(varchar, integer, varchar, timestamp, varchar, varchar, varchar, text); + +ALTER TABLE job_events DROP COLUMN event_type; + +ALTER TABLE job_status drop column event_type; + + + +CREATE OR REPLACE FUNCTION fn_job_status_update( + p_job_id varchar, + p_job_event_id integer, + p_worker_name varchar, + p_event_time timestamp, + p_notification_type varchar, + p_status varchar, + p_message text +) RETURNS VOID SECURITY DEFINER LANGUAGE plpgsql AS $$ +DECLARE + p_video_path varchar; +BEGIN + SELECT + v.source_path INTO p_video_path + FROM + jobs v + WHERE + v.id = p_job_id; + + INSERT INTO job_status ( + job_id, + job_event_id, + video_path, + worker_name, + event_time, + notification_type, + status, + message + ) + VALUES ( + p_job_id, + p_job_event_id, + p_video_path, + p_worker_name, + p_event_time, + p_notification_type, + p_status, + p_message + ) + ON CONFLICT ON CONSTRAINT job_status_pkey DO + UPDATE + SET + job_event_id = p_job_event_id, + video_path = p_video_path, + worker_name = p_worker_name, + event_time = p_event_time, + notification_type = p_notification_type, + status = p_status, + message = p_message; +END; +$$; + +CREATE OR REPLACE FUNCTION fn_trigger_job_status_update() RETURNS TRIGGER SECURITY DEFINER LANGUAGE plpgsql AS $$ +BEGIN + PERFORM fn_job_status_update( + NEW.job_id, + NEW.job_event_id, + NEW.worker_name, + NEW.event_time, + NEW.notification_type, + NEW.status, + NEW.message + ); + + RETURN NEW; +END; 
+$$; + +CREATE TRIGGER event_insert_job_status_update + AFTER INSERT + ON job_events + FOR EACH ROW +EXECUTE PROCEDURE fn_trigger_job_status_update(); + + +CREATE UNLOGGED TABLE IF NOT EXISTS job_progress ( + progress_id varchar(255) NOT NULL, + notification_type varchar(255) NOT NULL, + job_id varchar(255) NOT NULL, + worker_name varchar(255) NOT NULL, + percent real NOT NULL, + eta real NOT NULL, + last_update timestamp NOT NULL DEFAULT NOW(), + PRIMARY KEY (progress_id,notification_type), + FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE +); + + + diff --git a/server/scheduler/scheduler.go b/server/scheduler/scheduler.go index b6203a2..29c8efc 100644 --- a/server/scheduler/scheduler.go +++ b/server/scheduler/scheduler.go @@ -29,8 +29,8 @@ type Scheduler interface { GetUploadJobWriter(ctx context.Context, uuid string, workerName string) (*UploadJobStream, error) GetDownloadJobWriter(ctx context.Context, uuid string, workerName string) (*DownloadJobStream, error) GetChecksum(ctx context.Context, uuid string) (string, error) - RequestJob(ctx context.Context, workerName string) (*model.TaskEncode, error) - HandleWorkerEvent(ctx context.Context, taskEvent *model.TaskEvent) error + RequestJob(ctx context.Context, workerName string) (*model.RequestJobResponse, error) + HandleWorkerEvent(ctx context.Context, taskEvent *model.EnvelopEvent) error CancelJob(ctx context.Context, id string) error } @@ -51,7 +51,7 @@ type RuntimeScheduler struct { handleEventMu sync.Mutex } -func (r *RuntimeScheduler) RequestJob(ctx context.Context, workerName string) (*model.TaskEncode, error) { +func (r *RuntimeScheduler) RequestJob(ctx context.Context, workerName string) (*model.RequestJobResponse, error) { r.jobRequestMu.Lock() defer r.jobRequestMu.Unlock() video, err := r.repo.RetrieveQueuedJob(ctx) @@ -64,13 +64,13 @@ func (r *RuntimeScheduler) RequestJob(ctx context.Context, workerName string) (* if video == nil { return nil, nil } - newEvent := 
video.AddEvent(model.NotificationEvent, model.JobNotification, model.AssignedNotificationStatus) + newEvent := video.AddEvent(model.JobNotification, model.AssignedNotificationStatus) newEvent.WorkerName = workerName if err = r.repo.AddNewTaskEvent(ctx, newEvent); err != nil { return nil, err } - task := &model.TaskEncode{ + task := &model.RequestJobResponse{ Id: video.Id, EventID: video.Events.GetLatest().EventID, } @@ -82,18 +82,12 @@ func (r *RuntimeScheduler) RequestJob(ctx context.Context, workerName string) (* return task, nil } -func (r *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, jobEvent *model.TaskEvent) error { +func (r *RuntimeScheduler) HandleWorkerEvent(ctx context.Context, envelopedEvent *model.EnvelopEvent) error { r.handleEventMu.Lock() defer r.handleEventMu.Unlock() - if err := r.processEvent(ctx, jobEvent); err != nil { + if err := r.processEvent(ctx, envelopedEvent); err != nil { return err } - - if jobEvent.IsCompleted() { - if err := r.completeJob(ctx, jobEvent); err != nil { - return err - } - } return nil } @@ -112,7 +106,7 @@ func (r *RuntimeScheduler) CancelJob(ctx context.Context, id string) error { case status == model.CanceledNotificationStatus: return fmt.Errorf("job already canceled") case status == model.AssignedNotificationStatus, status == model.StartedNotificationStatus: - newEvent := job.AddEventComplete(model.NotificationEvent, model.JobNotification, model.CanceledNotificationStatus, "Job canceled by user") + newEvent := job.AddEventComplete(model.JobNotification, model.CanceledNotificationStatus, "Job canceled by user") err = r.repo.AddNewTaskEvent(ctx, newEvent) if err != nil { return err @@ -121,29 +115,60 @@ func (r *RuntimeScheduler) CancelJob(ctx context.Context, id string) error { return fmt.Errorf("job %s is in unknown state", id) } -func (r *RuntimeScheduler) processEvent(ctx context.Context, taskEvent *model.TaskEvent) error { +func (r *RuntimeScheduler) processEvent(ctx context.Context, event 
*model.EnvelopEvent) error { var err error - switch taskEvent.EventType { + switch event.EventType { case model.PingEvent: - err = r.repo.PingServerUpdate(ctx, taskEvent.WorkerName, taskEvent.IP) + pingEvent := model.PingEventType{} + if err = json.Unmarshal(event.EventData, &pingEvent); err != nil { + return err + } + return r.repo.PingServerUpdate(ctx, pingEvent) case model.NotificationEvent: - err = r.repo.AddNewTaskEvent(ctx, taskEvent) + taskEvent := model.TaskEventType{} + if err = json.Unmarshal(event.EventData, &taskEvent); err != nil { + return err + } + if err = r.repo.AddNewTaskEvent(ctx, &taskEvent); err != nil { + return err + } + if !taskEvent.IsCompleted() { + return nil + } + if err = r.completeJob(ctx, &taskEvent); err != nil { + return err + } + case model.ProgressEvent: + taskProgress := model.TaskProgressType{} + if err = json.Unmarshal(event.EventData, &taskProgress); err != nil { + return err + } + if taskProgress.Status != model.ProgressingTaskProgressTypeStatus { + if err = r.repo.DeleteProgressJob(ctx, taskProgress.ProgressID, taskProgress.NotificationType); err != nil { + return err + } + return nil + } + if err = r.repo.ProgressJob(ctx, &taskProgress); err != nil { + return err + } + default: - err = fmt.Errorf("unknown event type %s", taskEvent.EventType) + return fmt.Errorf("unknown event type %s", event.EventType) } - return err + return nil } -func (r *RuntimeScheduler) completeJob(ctx context.Context, jobEvent *model.TaskEvent) error { - video, err := r.repo.GetJob(ctx, jobEvent.Id.String()) +func (r *RuntimeScheduler) completeJob(ctx context.Context, jobEvent *model.TaskEventType) error { + video, err := r.repo.GetJob(ctx, jobEvent.JobId.String()) if err != nil { return err } sourcePath := filepath.Join(r.config.SourcePath, video.SourcePath) target := filepath.Join(r.config.SourcePath, video.TargetPath) l := log.WithFields(log.Fields{ - "job_id": jobEvent.Id.String(), + "job_id": jobEvent.JobId.String(), "source_path": sourcePath, 
"target_path": target, }) @@ -203,17 +228,25 @@ } func (r *RuntimeScheduler) scheduleRoutine(ctx context.Context) { + progressTicker := time.NewTicker(time.Minute * 1) + maintenanceTicker := time.NewTicker(r.config.ScheduleTime) + defer progressTicker.Stop() + defer maintenanceTicker.Stop() + for { select { case <-ctx.Done(): return case checksumPath := <-r.checksumChan: r.pathChecksumMap[checksumPath.path] = checksumPath.checksum - case <-time.After(r.config.ScheduleTime): + case <-progressTicker.C: + if err := r.progressJobMaintenance(ctx); err != nil { + log.Errorf("Error on progress job maintenance: %s", err) + } + case <-maintenanceTicker.C: if err := r.jobMaintenance(ctx); err != nil { - log.Errorf("Error on job maintenance %s", err) + log.Errorf("Error on job maintenance: %s", err) - } } } } @@ -289,7 +322,7 @@ func (r *RuntimeScheduler) scheduleJobRequest(ctx context.Context, jobRequest *m return err } - var eventsToAdd []*model.TaskEvent + var eventsToAdd []*model.TaskEventType if job == nil { job, err = r.newJob(ctx, tx, jobRequest) if err != nil { @@ -298,7 +331,7 @@ eventsToAdd = job.Events } else { // If job exist we check if we can retry the job - eventsToAdd, err = r.updateTerminatedJobByRequest(job, jobRequest) + eventsToAdd, err = r.updateJobByRequest(job, jobRequest) if err != nil { return err } @@ -333,7 +366,7 @@ if err != nil { return nil, err } - job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus) + job.AddEvent(model.JobNotification, model.QueuedNotificationStatus) return job, nil } @@ -466,6 +499,21 @@ func (r *RuntimeScheduler) stop() { } +func (r *RuntimeScheduler) progressJobMaintenance(ctx context.Context) error { + progressJobs, err := r.repo.GetAllProgressJobs(ctx) + if err != nil { 
+ return err + } + for _, progressJob := range progressJobs { + if time.Since(progressJob.EventTime) > time.Hour*24 { + if err = r.repo.DeleteProgressJob(ctx, progressJob.ProgressID, progressJob.NotificationType); err != nil { + return err + } + } + } + return nil +} + func (r *RuntimeScheduler) jobMaintenance(ctx context.Context) error { if err := r.queuedJobMaintenance(ctx); err != nil { return err @@ -487,7 +535,7 @@ func (r *RuntimeScheduler) queuedJobMaintenance(ctx context.Context) error { // Check if source file exists _, err = os.Stat(sourcePath) if os.IsNotExist(err) { - newEvent := job.AddEventComplete(model.NotificationEvent, model.JobNotification, model.FailedNotificationStatus, "job source file not found") + newEvent := job.AddEventComplete(model.JobNotification, model.FailedNotificationStatus, "job source file not found") if err = r.repo.AddNewTaskEvent(ctx, newEvent); err != nil { return err } @@ -525,8 +573,8 @@ func (r *RuntimeScheduler) assignedJobMaintenance(ctx context.Context) error { } for _, taskEvent := range taskEvents { if taskEvent.IsAssigned() { - log.Infof("Rescheduling %s after job timeout", taskEvent.Id.String()) - job, err := r.repo.GetJob(ctx, taskEvent.Id.String()) + log.Infof("Rescheduling %s after job timeout", taskEvent.JobId.String()) + job, err := r.repo.GetJob(ctx, taskEvent.JobId.String()) if err != nil { return err } @@ -544,20 +592,20 @@ func (r *RuntimeScheduler) assignedJobMaintenance(ctx context.Context) error { return nil } -func (r *RuntimeScheduler) updateTerminatedJobByRequest(job *model.Job, jobRequest *model.JobRequest) ([]*model.TaskEvent, error) { - var eventsToAdd []*model.TaskEvent +func (r *RuntimeScheduler) updateJobByRequest(job *model.Job, jobRequest *model.JobRequest) ([]*model.TaskEventType, error) { + var eventsToAdd []*model.TaskEventType lastEvent := job.Events.GetLatestPerNotificationType(model.JobNotification) status := lastEvent.Status switch { case jobRequest.ForceAssigned && (status == 
model.AssignedNotificationStatus || status == model.StartedNotificationStatus): - eventsToAdd = append(eventsToAdd, job.AddEvent(model.NotificationEvent, model.JobNotification, model.CanceledNotificationStatus)) - eventsToAdd = append(eventsToAdd, job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus)) + eventsToAdd = append(eventsToAdd, job.AddEvent(model.JobNotification, model.CanceledNotificationStatus)) + eventsToAdd = append(eventsToAdd, job.AddEvent(model.JobNotification, model.QueuedNotificationStatus)) case jobRequest.ForceCompleted && status == model.CompletedNotificationStatus, jobRequest.ForceFailed && status == model.FailedNotificationStatus, jobRequest.ForceCanceled && status == model.CanceledNotificationStatus: - requeueEvent := job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus) + requeueEvent := job.AddEvent(model.JobNotification, model.QueuedNotificationStatus) eventsToAdd = append(eventsToAdd, requeueEvent) default: return nil, fmt.Errorf("%s (%s) job is in %s state by %s, can not be rescheduled", job.Id.String(), jobRequest.SourcePath, lastEvent.Status, lastEvent.WorkerName) diff --git a/server/web/web.go b/server/web/web.go index d70802c..281ec1b 100644 --- a/server/web/web.go +++ b/server/web/web.go @@ -56,13 +56,13 @@ func (s *Server) requestJob(writer http.ResponseWriter, request *http.Request) { } func (s *Server) handleWorkerEvent(writer http.ResponseWriter, request *http.Request) { - taskEvent := &model.TaskEvent{} - err := json.NewDecoder(request.Body).Decode(taskEvent) + envelopEvent := &model.EnvelopEvent{} + err := json.NewDecoder(request.Body).Decode(envelopEvent) if webError(writer, err, 500) { return } - err = s.scheduler.HandleWorkerEvent(s.ctx, taskEvent) + err = s.scheduler.HandleWorkerEvent(s.ctx, envelopEvent) if webError(writer, err, 500) { return } diff --git a/worker/config/config.go b/worker/config/config.go index 5e9deeb..aa21553 100644 --- 
a/worker/config/config.go +++ b/worker/config/config.go @@ -21,18 +21,19 @@ type FFMPEGConfig struct { VideoPreset string `mapstructure:"videoPreset" envconfig:"WORKER_FFMPEG_VIDEOPRESET"` VideoProfile string `mapstructure:"videoProfile" envconfig:"WORKER_FFMPEG_VIDEOPROFILE"` VideoCRF int `mapstructure:"videoCRF" envconfig:"WORKER_FFMPEG_VIDEOCRF"` + Threads int `mapstructure:"threads" envconfig:"WORKER_FFMPEG_THREADS"` + ExtraArgs string `mapstructure:"extraArgs" envconfig:"WORKER_FFMPEG_EXTRA_ARGS"` } type Config struct { - TemporalPath string `mapstructure:"temporalPath" envconfig:"WORKER_TMP_PATH"` - Name string `mapstructure:"name" envconfig:"WORKER_NAME"` - Threads int `mapstructure:"threads" envconfig:"WORKER_THREADS"` - Priority int `mapstructure:"priority" envconfig:"WORKER_PRIORITY"` - StartAfter *time.Duration `mapstructure:"startAfter" envconfig:"WORKER_START_AFTER"` - StopAfter *time.Duration `mapstructure:"stopAfter" envconfig:"WORKER_STOP_AFTER"` - Paused bool - PGSConfig *PGSConfig `mapstructure:"pgsConfig"` - EncodeConfig *FFMPEGConfig `mapstructure:"ffmpegConfig"` + TemporalPath string `mapstructure:"temporalPath" envconfig:"WORKER_TMP_PATH"` + Name string `mapstructure:"name" envconfig:"WORKER_NAME"` + StartAfter *time.Duration `mapstructure:"startAfter" envconfig:"WORKER_START_AFTER"` + StopAfter *time.Duration `mapstructure:"stopAfter" envconfig:"WORKER_STOP_AFTER"` + Paused bool + PGSConfig *PGSConfig `mapstructure:"pgsConfig"` + EncodeConfig *FFMPEGConfig `mapstructure:"ffmpegConfig"` + VerifyDeltaTime float64 `mapstructure:"verifyDeltaTime" envconfig:"WORKER_VERIFY_DELTA_TIME"` } func (c Config) HaveSettedPeriodTime() bool { diff --git a/worker/console/console.go b/worker/console/console.go new file mode 100644 index 0000000..99daef7 --- /dev/null +++ b/worker/console/console.go @@ -0,0 +1,125 @@ +package console + +import ( + "context" + "github.com/jedib0t/go-pretty/v6/progress" + "github.com/jedib0t/go-pretty/v6/text" + log 
"github.com/sirupsen/logrus" + "sync" + "time" + "transcoder/model" +) + +var ( + unitScales = []int64{ + 1000000000000000, + 1000000000000, + 1000000000, + 1000000, + 1000, + } +) + +type RenderService struct { + pw progress.Writer +} + +func NewRenderService() *RenderService { + pw := newProgressWriter() + return &RenderService{ + pw: pw, + } +} +func (e *RenderService) Run(wg *sync.WaitGroup, ctx context.Context) { + log.Info("Starting Console...") + go e.pw.Render() + wg.Add(1) + go func() { + <-ctx.Done() + e.pw.Stop() + log.Info("Stopping Console...") + wg.Done() + }() +} + +func (e *RenderService) StepTracker(id string, notificationType model.NotificationType, logger LeveledLogger) *StepTracker { + progressTracker, color := newProgressTracker(id, notificationType) + e.pw.AppendTracker(progressTracker) + return &StepTracker{ + id: id, + stepType: notificationType, + progressTracker: progressTracker, + color: color, + logger: logger, + } +} + +func (e *RenderService) Logger(opts ...PrinterLoggerOption) LeveledLogger { + return newPrinterLogger(e.pw, opts...) 
+} + +func newProgressWriter() progress.Writer { + pw := progress.NewWriter() + pw.SetAutoStop(false) + pw.SetTrackerLength(40) + pw.SetMessageLength(50) + // pw.SetNumTrackersExpected(15) + pw.SetSortBy(progress.SortByPercent) + pw.SetStyle(progress.StyleDefault) + pw.SetTrackerPosition(progress.PositionRight) + pw.SetUpdateFrequency(time.Second * 1) + pw.Style().Colors = progress.StyleColorsExample + pw.Style().Options.PercentFormat = "%4.2f%%" + pw.Style().Visibility.ETA = true + pw.Style().Visibility.ETAOverall = true + pw.Style().Visibility.Percentage = true + pw.Style().Visibility.Pinned = false + pw.Style().Visibility.Speed = true + pw.Style().Visibility.SpeedOverall = true + pw.Style().Visibility.Time = true + pw.Style().Visibility.TrackerOverall = false + pw.Style().Visibility.Value = true + pw.Style().Visibility.Pinned = false + pw.Style().Options.TimeInProgressPrecision = time.Millisecond + pw.Style().Options.TimeDonePrecision = time.Millisecond + + return pw +} + +func newProgressTracker(id string, notificationType model.NotificationType) (*progress.Tracker, *text.Color) { + var unit progress.Units + var color text.Color + switch notificationType { + case model.DownloadNotification: + unit = progress.UnitsBytes + color = text.FgWhite + case model.UploadNotification: + unit = progress.UnitsBytes + color = text.FgGreen + case model.PGSNotification: + unit = progress.UnitsBytes + color = text.FgWhite + case model.FFMPEGSNotification: + unit = progress.Units{ + Notation: "", + NotationPosition: progress.UnitsNotationPositionBefore, + Formatter: func(value int64) string { + return formatNumber(value, map[int64]string{ + 1000000000000000: "PFrame", + 1000000000000: "TFrame", + 1000000000: "GFrame", + 1000000: "MFrame", + 1000: "KFrame", + 0: "Frame", + }) + }, + } + color = text.FgBlue + } + progressTracker := &progress.Tracker{ + Message: color.Sprintf("[%s] %s", id, notificationType), + Total: 0, + Units: unit, + } + return progressTracker, &color +} diff 
--git a/worker/console/logger.go b/worker/console/logger.go new file mode 100644 index 0000000..7044d34 --- /dev/null +++ b/worker/console/logger.go @@ -0,0 +1,61 @@ +package console + +import ( + "fmt" + "github.com/jedib0t/go-pretty/v6/progress" + "github.com/jedib0t/go-pretty/v6/text" + "time" +) + +type LeveledLogger interface { + Errorf(msg string, keysAndValues ...interface{}) + Warnf(msg string, keysAndValues ...interface{}) + Logf(msg string, keysAndValues ...interface{}) + Cmdf(msg string, keysAndValues ...interface{}) +} + +type PrinterLogger struct { + pw progress.Writer + messagePrefix string +} + +type PrinterLoggerOption func(*PrinterLogger) + +func WithMessagePrefix(prefix string) PrinterLoggerOption { + return func(pl *PrinterLogger) { + pl.messagePrefix = prefix + } +} + +func newPrinterLogger(pw progress.Writer, opts ...PrinterLoggerOption) *PrinterLogger { + pl := &PrinterLogger{pw: pw} + for _, opt := range opts { + opt(pl) + } + + return pl +} +func (c *PrinterLogger) Logf(msg string, a ...interface{}) { + c.pw.Log(c.log(msg, a...)) +} + +func (c *PrinterLogger) Warnf(msg string, a ...interface{}) { + c.pw.Log(text.FgHiYellow.Sprint(c.log(msg, a...))) +} + +func (c *PrinterLogger) Cmdf(msg string, a ...interface{}) { + c.pw.Log(text.FgHiCyan.Sprint(c.log(msg, a...))) +} + +func (c *PrinterLogger) Errorf(msg string, a ...interface{}) { + c.pw.Log(text.FgHiRed.Sprint(c.log(msg, a...))) +} + +func (c *PrinterLogger) log(msg string, a ...interface{}) string { + timestamp := time.Now().Format("2006-01-02 15:04:05") + printedMessage := fmt.Sprintf(msg, a...) 
+ if c.messagePrefix != "" { + return fmt.Sprintf("%s %s %s", timestamp, c.messagePrefix, printedMessage) + } + return printedMessage +} diff --git a/worker/console/stepTracker.go b/worker/console/stepTracker.go new file mode 100644 index 0000000..ecf704a --- /dev/null +++ b/worker/console/stepTracker.go @@ -0,0 +1,59 @@ +package console + +import ( + "fmt" + "github.com/jedib0t/go-pretty/v6/progress" + "github.com/jedib0t/go-pretty/v6/text" + "time" + "transcoder/model" +) + +type StepTracker struct { + id string + stepType model.NotificationType + progressTracker *progress.Tracker + color *text.Color + logger LeveledLogger +} + +func (t *StepTracker) ETA() time.Duration { + return t.progressTracker.ETA() +} + +func (t *StepTracker) PercentDone() float64 { + return t.progressTracker.PercentDone() +} + +func (t *StepTracker) SetTotal(total int64) { + t.progressTracker.UpdateTotal(total) +} + +func (t *StepTracker) UpdateValue(value int64) { + t.progressTracker.SetValue(value) +} + +func (t *StepTracker) Increment(increment int) { + t.progressTracker.Increment(int64(increment)) +} + +func (t *StepTracker) Done() { + t.progressTracker.SetValue(t.progressTracker.Total) + t.progressTracker.MarkAsDone() +} + +func (t *StepTracker) Error() { + t.progressTracker.MarkAsErrored() +} + +func (t *StepTracker) Logger() LeveledLogger { + return t.logger +} + +func formatNumber(value int64, notations map[int64]string) string { + for _, unitScale := range unitScales { + if value >= unitScale { + return fmt.Sprintf("%.2f%s", float64(value)/float64(unitScale), notations[unitScale]) + } + } + return fmt.Sprintf("%d%s", value, notations[0]) +} diff --git a/worker/ffmpeg/ffprobe.go b/worker/ffmpeg/ffprobe.go new file mode 100644 index 0000000..9ba95d4 --- /dev/null +++ b/worker/ffmpeg/ffprobe.go @@ -0,0 +1,187 @@ +package ffmpeg + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "gopkg.in/vansante/go-ffprobe.v2" + "os" + "strconv" + "strings" + "time" +) + +type Video 
struct { + Id uint8 + Duration time.Duration + FrameRate int +} +type Audio struct { + Id uint8 + Language string + Channels string + ChannelsNumber uint8 + ChannelLayour string + Default bool + Bitrate uint + Title string +} +type Subtitle struct { + Id uint8 + Language string + Forced bool + Comment bool + Format string + Title string +} +type NormalizedFFProbe struct { + Video *Video + Audios []*Audio + Subtitle []*Subtitle +} + +func (c *NormalizedFFProbe) HaveImageTypeSubtitle() bool { + for _, sub := range c.Subtitle { + if sub.IsImageTypeSubtitle() { + return true + } + } + return false +} + +func (c *NormalizedFFProbe) GetPGSSubtitles() []*Subtitle { + var PGSTOSrt []*Subtitle + for _, subt := range c.Subtitle { + if subt.IsImageTypeSubtitle() { + PGSTOSrt = append(PGSTOSrt, subt) + } + } + return PGSTOSrt +} + +func (c *NormalizedFFProbe) ToJson() string { + b, err := json.Marshal(c) + if err != nil { + panic(err) + } + return string(b) +} +func (s *Subtitle) IsImageTypeSubtitle() bool { + return strings.Contains(strings.ToLower(s.Format), "pgs") +} + +func ExtractFFProbeData(ctx context.Context, inputFile string) (data *ffprobe.ProbeData, err error) { + fileReader, err := os.Open(inputFile) + if err != nil { + return nil, fmt.Errorf("error opening file %s because %v", inputFile, err) + } + + defer fileReader.Close() + data, err = ffprobe.ProbeReader(ctx, fileReader) + if err != nil { + return nil, fmt.Errorf("error getting data: %v", err) + } + return data, nil +} + +func ffProbeFrameRate(ffprobeFrameRate string) (frameRate int, err error) { + rate := 0 + frameRatio := 0 + avgFrameSpl := strings.Split(ffprobeFrameRate, "/") + if len(avgFrameSpl) != 2 { + return 0, errors.New("invalid Format") + } + + frameRatio, err = strconv.Atoi(avgFrameSpl[0]) + if err != nil { + return 0, err + } + rate, err = strconv.Atoi(avgFrameSpl[1]) + if err != nil { + return 0, err + } + return frameRatio / rate, nil +} + +func NormalizeFFProbeData(data *ffprobe.ProbeData) 
(container *NormalizedFFProbe, err error) { + container = &NormalizedFFProbe{} + + videoStream := data.StreamType(ffprobe.StreamVideo)[0] + frameRate, err := ffProbeFrameRate(videoStream.AvgFrameRate) + if err != nil { + frameRate = 24 + } + + container.Video = &Video{ + Id: uint8(videoStream.Index), + Duration: data.Format.Duration(), + FrameRate: frameRate, + } + + betterAudioStreamPerLanguage := make(map[string]*Audio) + for _, stream := range data.StreamType(ffprobe.StreamAudio) { + if stream.BitRate == "" { + stream.BitRate = "0" + } + bitRateInt, err := strconv.ParseUint(stream.BitRate, 10, 32) // TODO Aqui revem diferents tipos de numeros + if err != nil { + panic(err) + } + newAudio := &Audio{ + Id: uint8(stream.Index), + Language: stream.Tags.Language, + Channels: stream.ChannelLayout, + ChannelsNumber: uint8(stream.Channels), + ChannelLayour: stream.ChannelLayout, + Default: stream.Disposition.Default == 1, + Bitrate: uint(bitRateInt), + Title: stream.Tags.Title, + } + betterAudio := betterAudioStreamPerLanguage[newAudio.Language] + + // If more channels or same channels and better bitrate + if betterAudio != nil { + if newAudio.ChannelsNumber > betterAudio.ChannelsNumber { + betterAudioStreamPerLanguage[newAudio.Language] = newAudio + } else if newAudio.ChannelsNumber == betterAudio.ChannelsNumber && newAudio.Bitrate > betterAudio.Bitrate { + betterAudioStreamPerLanguage[newAudio.Language] = newAudio + } + } else { + betterAudioStreamPerLanguage[stream.Tags.Language] = newAudio + } + + } + for _, audioStream := range betterAudioStreamPerLanguage { + container.Audios = append(container.Audios, audioStream) + } + + betterSubtitleStreamPerLanguage := make(map[string]*Subtitle) + for _, stream := range data.StreamType(ffprobe.StreamSubtitle) { + newSubtitle := &Subtitle{ + Id: uint8(stream.Index), + Language: stream.Tags.Language, + Forced: stream.Disposition.Forced == 1, + Comment: stream.Disposition.Comment == 1, + Format: stream.CodecName, + Title: 
stream.Tags.Title, + } + + if newSubtitle.Forced || newSubtitle.Comment { + container.Subtitle = append(container.Subtitle, newSubtitle) + continue + } + // TODO Filter Languages we don't want + betterSubtitle := betterSubtitleStreamPerLanguage[newSubtitle.Language] + if betterSubtitle == nil { // TODO Potser perdem subtituls que es necesiten + betterSubtitleStreamPerLanguage[stream.Tags.Language] = newSubtitle + } else { + // TODO aixo es temporal per fer proves, borrar aquest else!! + container.Subtitle = append(container.Subtitle, newSubtitle) + } + } + for _, value := range betterSubtitleStreamPerLanguage { + container.Subtitle = append(container.Subtitle, value) + } + return container, nil +} diff --git a/worker/job/job.go b/worker/job/job.go new file mode 100644 index 0000000..2d7a1ac --- /dev/null +++ b/worker/job/job.go @@ -0,0 +1,105 @@ +package job + +import ( + "encoding/json" + "fmt" + "github.com/google/uuid" + "io" + "os" + "path/filepath" + "sync" + "transcoder/model" + "transcoder/worker/ffmpeg" +) + +type Context struct { + JobId uuid.UUID `json:"job_id"` + EventId int `json:"event_id"` + WorkingDir string `json:"working_dir"` + LastEvent *ContextEvent `json:"last_event"` + Source *VideoData `json:"source"` + Target *VideoData `json:"target"` + mu sync.Mutex +} + +type VideoData struct { + FilePath string `json:"file_path"` + Checksum string `json:"checksum"` + FFProbeData *ffmpeg.NormalizedFFProbe `json:"ffprobe_data"` + Size int64 +} + +type ContextEvent struct { + EventId int `json:"event_id"` + NotificationType model.NotificationType `json:"notification_type"` + Status model.NotificationStatus `json:"status"` + Message string `json:"message"` +} + +func NewContext(jobId uuid.UUID, lastEvent int, workingDir string) *Context { + return &Context{ + mu: sync.Mutex{}, + JobId: jobId, + EventId: lastEvent, + WorkingDir: workingDir, + Source: &VideoData{}, + } +} +func (j *Context) Init() error { + return os.MkdirAll(j.WorkingDir, os.ModePerm) +} + 
+func (j *Context) Clean() error { + return os.RemoveAll(j.WorkingDir) +} + +func (j *Context) UpdateEvent(notificationType model.NotificationType, status model.NotificationStatus, message string) { + j.mu.Lock() + defer j.mu.Unlock() + newEventID := j.EventId + 1 + if j.LastEvent != nil { + newEventID = j.LastEvent.EventId + 1 + } + JobCtxEvent := &ContextEvent{ + EventId: newEventID, + NotificationType: notificationType, + Status: status, + Message: message, + } + j.LastEvent = JobCtxEvent +} + +func (e *Context) PersistJobContext() error { + b, err := json.MarshalIndent(e, "", "\t") + if err != nil { + return err + } + eventFile, err := os.OpenFile(filepath.Join(e.WorkingDir, fmt.Sprintf("%s.json", e.JobId)), os.O_TRUNC|os.O_CREATE|os.O_RDWR, os.ModePerm) + if err != nil { + return err + } + defer eventFile.Close() + _, err = eventFile.Write(b) + if err != nil { + return err + } + return eventFile.Sync() +} + +func ReadContextFromDiskByPath(filepath string) *Context { + eventFile, err := os.Open(filepath) + if err != nil { + panic(err) + } + defer eventFile.Close() + b, err := io.ReadAll(eventFile) + if err != nil { + panic(err) + } + jobContext := &Context{} + err = json.Unmarshal(b, jobContext) + if err != nil { + panic(err) + } + return jobContext +} diff --git a/worker/main.go b/worker/main.go index b6299b6..4f919d7 100644 --- a/worker/main.go +++ b/worker/main.go @@ -13,8 +13,9 @@ import ( "transcoder/cmd" "transcoder/update" "transcoder/version" + "transcoder/worker/console" "transcoder/worker/serverclient" - "transcoder/worker/task" + "transcoder/worker/worker" ) var ( @@ -50,6 +51,8 @@ func init() { pflag.String("worker.ffmpegConfig.videoCodec", "libx265", "FFMPEG Video Codec") pflag.String("worker.ffmpegConfig.videoPreset", "medium", "FFMPEG Video Preset") pflag.String("worker.ffmpegConfig.videoProfile", "main10", "FFMPEG Video Profile") + pflag.String("worker.ffmpegConfig.extraArgs", "", "FFMPEG Extra Args") + pflag.Int("worker.verifyDeltaTime", 60, 
"FFMPEG Verify Delta Time in seconds, is the max range of time that the video can be different from the original, if is superior then the video is marked as invalid") pflag.Int("worker.ffmpegConfig.videoCRF", 21, "FFMPEG Video CRF") pflag.Duration("worker.startAfter", 0, "Accept jobs only After HH:mm") pflag.Duration("worker.stopAfter", 0, "Stop Accepting new Jobs after HH:mm") @@ -94,17 +97,19 @@ func main() { } func applicationRun(wg *sync.WaitGroup, ctx context.Context, updater *update.Updater) error { - printer := task.NewConsoleWorkerPrinter() + renderService := console.NewRenderService() + renderService.Run(wg, ctx) + serverClient := serverclient.NewServerClient(opts.Web, opts.Worker.Name) - if err := serverClient.PublishPing(); err != nil { + if err := serverClient.PublishPingEvent(); err != nil { return err } - encodeWorker := task.NewEncodeWorker(opts.Worker, serverClient, printer) + encodeWorker := worker.NewEncodeWorker(opts.Worker, serverClient, renderService) encodeWorker.Run(wg, ctx) - coordinator := task.NewServerCoordinator(serverClient, encodeWorker, updater, printer) + coordinator := worker.NewServerCoordinator(serverClient, encodeWorker, updater, renderService.Logger()) coordinator.Run(wg, ctx) return nil } diff --git a/worker/serverclient/server_client.go b/worker/serverclient/server_client.go index 1d54aab..4e8cba6 100644 --- a/worker/serverclient/server_client.go +++ b/worker/serverclient/server_client.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "github.com/google/uuid" retryablehttp "github.com/hashicorp/go-retryablehttp" "io" "net/http" @@ -16,9 +15,10 @@ import ( ) type ServerClient struct { - webServerConfig *web.Config - httpClient *http.Client - workerName string + webServerConfig *web.Config + retriableHttpClient *http.Client + workerName string + httpClient *http.Client } func NewServerClient(webServerConfig *web.Config, workerName string) *ServerClient { @@ -36,14 +36,14 @@ func NewServerClient(webServerConfig 
*web.Config, workerName string) *ServerClie // } return &ServerClient{ - webServerConfig: webServerConfig, - workerName: workerName, - httpClient: client.StandardClient(), + webServerConfig: webServerConfig, + workerName: workerName, + retriableHttpClient: client.StandardClient(), + httpClient: &http.Client{}, } } -func (s *ServerClient) PublishEvent(event model.TaskEvent) error { - event.WorkerName = s.workerName +func (s *ServerClient) publishEvent(event *model.EnvelopEvent) error { b, err := json.Marshal(event) if err != nil { return err @@ -53,8 +53,12 @@ func (s *ServerClient) PublishEvent(event model.TaskEvent) error { fmt.Printf("Error creating request: %v\n", err) return err } - - resp, err := s.httpClient.Do(req) + var resp *http.Response + if event.EventType == model.PingEvent || event.EventType == model.ProgressEvent { + resp, err = s.httpClient.Do(req) + } else { + resp, err = s.retriableHttpClient.Do(req) + } if err != nil { return err } @@ -68,13 +72,13 @@ func (s *ServerClient) PublishEvent(event model.TaskEvent) error { var NoJobAvailable = errors.New("no job available") -func (s *ServerClient) RequestJob(workerName string) (*model.TaskEncode, error) { +func (s *ServerClient) RequestJob() (*model.RequestJobResponse, error) { req, err := s.request("GET", "/api/v1/job/request", nil) if err != nil { return nil, err } - req.Header.Set("workerName", workerName) - resp, err := s.httpClient.Do(req) + req.Header.Set("workerName", s.workerName) + resp, err := s.retriableHttpClient.Do(req) if err != nil { return nil, err } @@ -91,7 +95,7 @@ func (s *ServerClient) RequestJob(workerName string) (*model.TaskEncode, error) if err != nil { return nil, err } - job := &model.TaskEncode{} + job := &model.RequestJobResponse{} err = json.Unmarshal(body, job) if err != nil { return nil, err @@ -99,22 +103,14 @@ func (s *ServerClient) RequestJob(workerName string) (*model.TaskEncode, error) return job, nil } -func (s *ServerClient) GetDownloadURL(id uuid.UUID) string { - 
return fmt.Sprintf("%s?uuid=%s", s.GetURL("/api/v1/download"), id.String()) -} - -func (s *ServerClient) GetChecksumURL(id uuid.UUID) string { - return fmt.Sprintf("%s?uuid=%s", s.GetURL("/api/v1/checksum"), id.String()) -} - -func (s *ServerClient) GetUploadURL(id uuid.UUID) string { - return fmt.Sprintf("%s?uuid=%s", s.GetURL("/api/v1/upload"), id.String()) -} - func (s *ServerClient) GetURL(uri string) string { return fmt.Sprintf("%s%s", s.webServerConfig.Domain, uri) } +func (s *ServerClient) GetBaseDomain() string { + return s.webServerConfig.Domain +} + func (s *ServerClient) request(method string, uri string, body io.Reader) (*http.Request, error) { req, err := http.NewRequest(method, s.GetURL(uri), body) if err != nil { @@ -129,16 +125,53 @@ func (s *ServerClient) request(method string, uri string, body io.Reader) (*http return req, nil } -func (s *ServerClient) PublishPing() error { +func (s *ServerClient) PublishPingEvent() error { publicIp, err := helper.GetPublicIP() if err != nil { return err } - pingEvent := model.TaskEvent{ - EventType: model.PingEvent, - WorkerName: s.workerName, - EventTime: time.Now(), - IP: publicIp, + pingEvent := model.PingEventType{ + Event: model.Event{ + EventTime: time.Now(), + WorkerName: s.workerName, + }, + IP: publicIp, + } + event, err := envelopEvent(model.PingEvent, pingEvent) + if err != nil { + return err + } + + return s.publishEvent(event) +} + +func (s *ServerClient) PublishTaskEvent(taskEvent *model.TaskEventType) error { + taskEvent.WorkerName = s.workerName + event, err := envelopEvent(model.NotificationEvent, taskEvent) + if err != nil { + return err + } + + return s.publishEvent(event) +} + +func (s *ServerClient) PublishTaskProgressEvent(taskProgress *model.TaskProgressType) error { + taskProgress.WorkerName = s.workerName + event, err := envelopEvent(model.ProgressEvent, taskProgress) + if err != nil { + return err + } + + return s.publishEvent(event) +} + +func envelopEvent(eventType model.EventType, 
eventData interface{}) (*model.EnvelopEvent, error) { + b, err := json.Marshal(eventData) + if err != nil { + return nil, err } - return s.PublishEvent(pingEvent) + return &model.EnvelopEvent{ + EventType: eventType, + EventData: b, + }, nil } diff --git a/worker/step/MKVExtractStep.go b/worker/step/MKVExtractStep.go new file mode 100644 index 0000000..0e14402 --- /dev/null +++ b/worker/step/MKVExtractStep.go @@ -0,0 +1,69 @@ +package step + +import ( + "context" + "fmt" + "path/filepath" + "regexp" + "strconv" + "transcoder/helper" + "transcoder/helper/command" + "transcoder/model" + "transcoder/worker/job" +) + +type MKVExtractStepExecutor struct { +} + +func NewMKVExtractStepExecutor(options ...ExecutorOption) *Executor { + mkvStep := &MKVExtractStepExecutor{} + return NewStepExecutor(model.MKVExtractNotification, mkvStep.actions, options...) +} + +func (m *MKVExtractStepExecutor) actions(jobContext *job.Context) []Action { + return []Action{ + { + Execute: func(ctx context.Context, stepTracker Tracker) error { + return m.mkvExtract(ctx, stepTracker, jobContext) + }, + Id: jobContext.JobId.String(), + }, + } + +} + +func (m *MKVExtractStepExecutor) mkvExtract(ctx context.Context, tracker Tracker, jobContext *job.Context) error { + tracker.SetTotal(100) + var outLog string + progressRegex := regexp.MustCompile(`Progress: (\d+)%`) + mkvExtractCommand := command.NewCommand(helper.GetMKVExtractPath(), "tracks", jobContext.Source.FilePath). + SetWorkDir(jobContext.WorkingDir). + BuffSize(128). 
+ SetStdoutFunc(func(buffer []byte, exit bool) { + str := string(buffer) + outLog += str + progressMatch := progressRegex.FindStringSubmatch(str) + if len(progressMatch) > 0 { + p, err := strconv.Atoi(progressMatch[len(progressMatch)-1]) + if err != nil { + return + } + tracker.UpdateValue(int64(p)) + } + }) + + mkvExtractCommand.AddEnv("LC_ALL=C") + mkvExtractCommand.AddEnv(fmt.Sprintf("LD_LIBRARY_PATH=%s", filepath.Dir(helper.GetMKVExtractPath()))) + + for _, subtitle := range jobContext.Source.FFProbeData.GetPGSSubtitles() { + mkvExtractCommand.AddParam(fmt.Sprintf("%d:%d.sup", subtitle.Id, subtitle.Id)) + } + + _, err := mkvExtractCommand.RunWithContext(ctx, command.NewAllowedCodesOption(0, 1)) + if err != nil { + tracker.Logger().Cmdf("MKVExtract Command:%s", mkvExtractCommand.GetFullCommand()) + return fmt.Errorf("MKVExtract unexpected error:%v", err) + } + + return nil +} diff --git a/worker/task/pgs.go b/worker/step/PGStoSrtStep.go similarity index 71% rename from worker/task/pgs.go rename to worker/step/PGStoSrtStep.go index e052491..964027e 100644 --- a/worker/task/pgs.go +++ b/worker/step/PGStoSrtStep.go @@ -1,11 +1,10 @@ -package task +package step import ( "context" "errors" "fmt" "github.com/asticode/go-astisub" - log "github.com/sirupsen/logrus" "os" "regexp" "strconv" @@ -14,45 +13,40 @@ import ( "transcoder/helper/command" "transcoder/model" "transcoder/worker/config" + "transcoder/worker/ffmpeg" + "transcoder/worker/job" ) -var langMapping []PGSTesseractLanguage - -type PGSWorker struct { - workerConfig *config.Config +type PGSToSrtStepExecutor struct { + pgsConfig *config.PGSConfig } -type PGSTesseractLanguage struct { - tessLanguage string - mappingLanguage []string -} +func NewPGSToSrtStepExecutor(pgsConfig *config.PGSConfig, opts ...ExecutorOption) *Executor { + pgsStep := &PGSToSrtStepExecutor{ + pgsConfig: pgsConfig, + } -func init() { - langMapping = append(langMapping, PGSTesseractLanguage{"deu", []string{"ger", "ge", "de"}}) - langMapping 
= append(langMapping, PGSTesseractLanguage{"eus", []string{"baq", "eus"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"eng", []string{"en", "uk"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"spa", []string{"es", "esp"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"deu", []string{"det"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"fra", []string{"fre"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"chi_tra", []string{"chi","zho"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"ell", []string{"gre"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"isl", []string{"ice"}}) - langMapping = append(langMapping, PGSTesseractLanguage{"ces", []string{"cze"}}) + return NewStepExecutor(model.PGSNotification, pgsStep.actions, opts...) } -func NewPGSWorker(workerConfig *config.Config) *PGSWorker { - encodeWorker := &PGSWorker{ - workerConfig: workerConfig, + +func (p *PGSToSrtStepExecutor) actions(jobContext *job.Context) []Action { + var pgsStepActions []Action + for _, pgs := range jobContext.Source.FFProbeData.GetPGSSubtitles() { + pgsStepActions = append(pgsStepActions, Action{ + Execute: func(ctx context.Context, stepTracker Tracker) error { + return p.convertPGSToSrt(ctx, stepTracker, jobContext, pgs) + }, + Id: fmt.Sprintf("%s %d", jobContext.JobId, pgs.Id), + }) } - return encodeWorker + return pgsStepActions } -func (p *PGSWorker) ConvertPGS(ctx context.Context, taskPGS model.TaskPGS, taskTrack *TaskTracks) (err error) { - log.Debugf("Converting PGS To Srt for Job stream %d", taskPGS.PGSID) - inputFilePath := taskPGS.PGSSourcePath - outputFilePath := taskPGS.PGSTargetPath - - language := calculateTesseractLanguage(taskPGS.PGSLanguage) - pgsConfig := p.workerConfig.PGSConfig +func (p *PGSToSrtStepExecutor) convertPGSToSrt(ctx context.Context, tracker Tracker, jobContext *job.Context, subtitle *ffmpeg.Subtitle) error { + pgsConfig := p.pgsConfig + inputFilePath := 
fmt.Sprintf("%s/%d.sup", jobContext.WorkingDir, subtitle.Id) + outputFilePath := fmt.Sprintf("%s/%d.srt", jobContext.WorkingDir, subtitle.Id) + language := calculateTesseractLanguage(subtitle.Language) PGSToSrtCommand := command.NewCommand(pgsConfig.DotnetPath, pgsConfig.DLLPath, "--tesseractversion", strconv.Itoa(pgsConfig.TessVersion), @@ -61,7 +55,7 @@ func (p *PGSWorker) ConvertPGS(ctx context.Context, taskPGS model.TaskPGS, taskT "--input", inputFilePath, "--output", outputFilePath, "--tesseractlanguage", language, - "--tesseractdata", pgsConfig.TesseractDataPath) + "--tesseractdata", pgsConfig.TesseractDataPath).SetWorkDir(jobContext.WorkingDir) outLog := "" startRegex := regexp.MustCompile(`Starting OCR for (\d+) items`) progressRegex := regexp.MustCompile(`Processed item (\d+)`) @@ -74,7 +68,7 @@ func (p *PGSWorker) ConvertPGS(ctx context.Context, taskPGS model.TaskPGS, taskT if err != nil { return } - taskTrack.UpdateValue(int64(p)) + tracker.UpdateValue(int64(p)) } startMatch := startRegex.FindStringSubmatch(str) if len(startMatch) > 0 { @@ -82,7 +76,7 @@ func (p *PGSWorker) ConvertPGS(ctx context.Context, taskPGS model.TaskPGS, taskT if err != nil { return } - taskTrack.SetTotal(int64(t)) + tracker.SetTotal(int64(t)) } }) @@ -90,7 +84,7 @@ func (p *PGSWorker) ConvertPGS(ctx context.Context, taskPGS model.TaskPGS, taskT PGSToSrtCommand.SetStderrFunc(func(buffer []byte, exit bool) { errLog += string(buffer) }) - log.Debugf("PGSTOSrt Command: %s", PGSToSrtCommand.GetFullCommand()) + tracker.Logger().Cmdf("PGSTOSrt Command: %s", PGSToSrtCommand.GetFullCommand()) ecode, err := PGSToSrtCommand.RunWithContext(ctx) pgslog := fmt.Sprintf("stdout: %s, stderr: %s", outLog, errLog) if err != nil { @@ -134,10 +128,30 @@ func (p *PGSWorker) ConvertPGS(ctx context.Context, taskPGS model.TaskPGS, taskT return fmt.Errorf("could not write to file: %v", err) } - log.Debugf("Converted PGS To Srt for Job stream %d", taskPGS.PGSID) return err } +var langMapping 
[]PGSTesseractLanguage + +type PGSTesseractLanguage struct { + tessLanguage string + mappingLanguage []string +} + +func init() { + langMapping = append(langMapping, PGSTesseractLanguage{"deu", []string{"ger", "ge", "de"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"eus", []string{"baq", "eus"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"eng", []string{"en", "uk"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"spa", []string{"es", "esp"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"deu", []string{"det"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"fra", []string{"fre"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"chi_tra", []string{"chi", "zho"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"ell", []string{"gre"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"isl", []string{"ice"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"ces", []string{"cze"}}) + langMapping = append(langMapping, PGSTesseractLanguage{"ron", []string{"rum"}}) +} + func calculateTesseractLanguage(language string) string { for _, mapping := range langMapping { for _, mapLang := range mapping.mappingLanguage { diff --git a/worker/step/downloadStep.go b/worker/step/downloadStep.go new file mode 100644 index 0000000..6b0365a --- /dev/null +++ b/worker/step/downloadStep.go @@ -0,0 +1,188 @@ +package step + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "github.com/avast/retry-go" + "github.com/google/uuid" + "io" + "mime" + "net/http" + "os" + "path/filepath" + "strconv" + "time" + "transcoder/model" + "transcoder/worker/console" + "transcoder/worker/ffmpeg" + "transcoder/worker/job" +) + +var errJobNotFound = errors.New("job Not found") + +type DownloadStepExecutor struct { + workerName string + BaseDomainURL string +} + +func NewDownloadStepExecutor(workerName string, baseDomainUrl string, options ...ExecutorOption) *Executor { + downloadStep := 
&DownloadStepExecutor{ + workerName: workerName, + BaseDomainURL: baseDomainUrl, + } + return NewStepExecutor(model.DownloadNotification, downloadStep.actions, options...) +} + +func (d *DownloadStepExecutor) actions(jobContext *job.Context) []Action { + return []Action{ + { + Execute: func(ctx context.Context, stepTracker Tracker) error { + videoData, err := d.download(ctx, stepTracker, jobContext) + if err != nil { + return err + } + jobContext.Source = videoData + return nil + }, + Id: jobContext.JobId.String(), + }, + } + +} + +func (d *DownloadStepExecutor) download(ctx context.Context, tracker Tracker, jobContext *job.Context) (*job.VideoData, error) { + var sourceFilePath string + var sourceChecksum string + var fileSize int64 + logger := tracker.Logger() + err := retry.Do(func() error { + req, err := http.NewRequestWithContext(ctx, "GET", d.GetDownloadURL(jobContext.JobId), nil) + if err != nil { + return err + } + req.Header.Set("workerName", d.workerName) + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + if resp.StatusCode == http.StatusNotFound { + return errJobNotFound + } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("not 200 respose in download code %d", resp.StatusCode) + } + defer resp.Body.Close() + contentLength, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) + tracker.SetTotal(contentLength) + if err != nil { + return err + } + _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) + if err != nil { + return err + } + + sourceFilePath = filepath.Join(jobContext.WorkingDir, fmt.Sprintf("%s%s", jobContext.JobId.String(), filepath.Ext(params["filename"]))) + downloadFile, err := os.Create(sourceFilePath) + if err != nil { + return err + } + + defer downloadFile.Close() + + reader := NewProgressTrackStream(tracker, resp.Body) + + fileSize, err = io.Copy(downloadFile, reader) + if err != nil { + return err + } + if fileSize != contentLength { + return 
fmt.Errorf("file size error on download source:%d downloaded:%d", contentLength, fileSize) + } + sourceChecksum = hex.EncodeToString(reader.SumSha()) + bodyString, err := d.getChecksum(jobContext, logger) + if err != nil { + return err + } + + if sourceChecksum != bodyString { + return fmt.Errorf("checksum error on download source:%s downloaded:%s", bodyString, sourceChecksum) + } + + tracker.UpdateValue(contentLength) + return nil + }, retry.Delay(time.Second*5), + retry.DelayType(retry.FixedDelay), + retry.Attempts(180), // 15 min + retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + logger.Errorf("Error on downloading job %v", err) + }), + retry.RetryIf(func(err error) bool { + return !(errors.Is(err, context.Canceled) || errors.Is(err, errJobNotFound)) + })) + if err != nil { + return nil, err + } + + ffprobeData, err := ffmpeg.ExtractFFProbeData(ctx, sourceFilePath) + if err != nil { + return nil, err + } + + normalizedFFProbeData, err := ffmpeg.NormalizeFFProbeData(ffprobeData) + if err != nil { + return nil, err + } + + return &job.VideoData{ + FilePath: sourceFilePath, + Checksum: sourceChecksum, + Size: fileSize, + FFProbeData: normalizedFFProbeData, + }, nil +} + +func (d *DownloadStepExecutor) getChecksum(jobContext *job.Context, logger console.LeveledLogger) (string, error) { + var bodyString string + err := retry.Do(func() error { + respsha256, err := http.Get(d.GetChecksumURL(jobContext.JobId)) + if err != nil { + return err + } + defer respsha256.Body.Close() + if respsha256.StatusCode != http.StatusOK { + return fmt.Errorf("not 200 respose in sha265 code %d", respsha256.StatusCode) + } + + bodyBytes, err := io.ReadAll(respsha256.Body) + if err != nil { + return err + } + bodyString = string(bodyBytes) + return nil + }, retry.Delay(time.Second*5), + retry.Attempts(10), + retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + logger.Errorf("error %v on calculate checksum of downloaded job", err) + }), + 
retry.RetryIf(func(err error) bool { + return !errors.Is(err, context.Canceled) + })) + if err != nil { + return "", err + } + return bodyString, nil +} + +func (d *DownloadStepExecutor) GetDownloadURL(id uuid.UUID) string { + return fmt.Sprintf("%s%s?uuid=%s", d.BaseDomainURL, "/api/v1/download", id.String()) +} + +func (d *DownloadStepExecutor) GetChecksumURL(id uuid.UUID) string { + return fmt.Sprintf("%s%s?uuid=%s", d.BaseDomainURL, "/api/v1/checksum", id.String()) + +} diff --git a/worker/step/ffmpegStep.go b/worker/step/ffmpegStep.go new file mode 100644 index 0000000..dd58a12 --- /dev/null +++ b/worker/step/ffmpegStep.go @@ -0,0 +1,260 @@ +package step + +import ( + "context" + "crypto/sha256" + "fmt" + "gopkg.in/ini.v1" + "io" + "os" + "path/filepath" + "regexp" + "runtime" + "strings" + "time" + "transcoder/helper" + "transcoder/helper/command" + "transcoder/model" + "transcoder/worker/config" + "transcoder/worker/console" + "transcoder/worker/ffmpeg" + "transcoder/worker/job" +) + +type FFMPEGStepExecutor struct { + ffmpegConfig *config.FFMPEGConfig +} + +func NewFFMPEGStepExecutor(ffmpegConfig *config.FFMPEGConfig, options ...ExecutorOption) *Executor { + ffmpegStep := &FFMPEGStepExecutor{ + ffmpegConfig, + } + return NewStepExecutor(model.FFMPEGSNotification, ffmpegStep.actions, options...) 
+} + +func (f *FFMPEGStepExecutor) actions(jobContext *job.Context) []Action { + return []Action{ + { + Execute: func(ctx context.Context, stepTracker Tracker) error { + return f.encode(ctx, stepTracker, jobContext) + }, + Id: jobContext.JobId.String(), + }, + } + +} + +func (f *FFMPEGStepExecutor) encode(ctx context.Context, stepTracker Tracker, jobContext *job.Context) error { + FFMPEGProgressChan := make(chan int64) + go f.ffmpegProgressRoutine(ctx, jobContext, stepTracker, FFMPEGProgressChan) + err := f.ffmpeg(ctx, stepTracker.Logger(), jobContext, FFMPEGProgressChan) + if err != nil { + return err + } + + return nil +} + +func (f *FFMPEGStepExecutor) ffmpegProgressRoutine(ctx context.Context, job *job.Context, tracker Tracker, ffmpegProgressChan chan int64) { + tracker.SetTotal(int64(job.Source.FFProbeData.Video.Duration.Seconds()) * int64(job.Source.FFProbeData.Video.FrameRate)) + for { + select { + case <-ctx.Done(): + return + case progress, open := <-ffmpegProgressChan: + if !open { + return + } + tracker.UpdateValue(progress) + } + } +} + +func (f *FFMPEGStepExecutor) ffmpeg(ctx context.Context, logger console.LeveledLogger, jobContext *job.Context, ffmpegProgressChan chan<- int64) error { + ffmpegGenerator := &FFMPEGGenerator{Config: f.ffmpegConfig} + ffmpegGenerator.setInputFilters(jobContext) + ffmpegGenerator.setVideoFilters(jobContext.Source.FFProbeData) + ffmpegGenerator.setAudioFilters(jobContext.Source.FFProbeData) + ffmpegGenerator.setSubtFilters(jobContext.Source.FFProbeData) + ffmpegErrLog := "" + + checkPercentageFFMPEG := func(buffer []byte, exit bool) { + ffmpegErrLog += string(buffer) + } + + stdoutFFMPEG := func(buffer []byte, exit bool) { + cfg, err := ini.Load(buffer) + if err != nil { + return + } + s := cfg.Section("") + progress := s.Key("progress").String() + if progress == "continue" { + var progressValue int64 + outTimeUs, err := s.Key("out_time_ms").Int64() + if err == nil { + progressValue = (outTimeUs / 1000000) * 
int64(jobContext.Source.FFProbeData.Video.FrameRate) + } + // If out_time_ms is not present, we can use frame as a fallback, even is not as precise + if progressValue == 0 { + frame, err := s.Key("frame").Int64() + if err != nil { + return + } + progressValue = frame + } + + ffmpegProgressChan <- progressValue + + } + if exit { + close(ffmpegProgressChan) + } + } + sourceFileName := filepath.Base(jobContext.Source.FilePath) + encodedFilePath := fmt.Sprintf("%s-encoded.%s", strings.TrimSuffix(sourceFileName, filepath.Ext(sourceFileName)), "mkv") + targetPath := filepath.Join(jobContext.WorkingDir, encodedFilePath) + + ffmpegArguments := ffmpegGenerator.buildArguments(uint8(f.ffmpegConfig.Threads), f.ffmpegConfig.ExtraArgs, targetPath) + logger.Cmdf("FFMPEG Command:%s %s", helper.GetFFmpegPath(), ffmpegArguments) + ffmpegCommand := command.NewCommandByString(helper.GetFFmpegPath(), ffmpegArguments). + SetWorkDir(jobContext.WorkingDir). + SetStdoutFunc(stdoutFFMPEG). + SetStderrFunc(checkPercentageFFMPEG) + + if runtime.GOOS == "linux" { + ffmpegCommand.AddEnv(fmt.Sprintf("LD_LIBRARY_PATH=%s", filepath.Dir(helper.GetFFmpegPath()))) + } + exitCode, err := ffmpegCommand.RunWithContext(ctx) + if err != nil { + return fmt.Errorf("%w: stder:%s", err, ffmpegErrLog) + } + if exitCode != 0 { + return fmt.Errorf("exit code %d: stder:%s", exitCode, ffmpegErrLog) + } + + <-time.After(time.Second * 1) + ffprobeData, err := ffmpeg.ExtractFFProbeData(ctx, targetPath) + if err != nil { + return err + } + + normalizedFFProbeData, err := ffmpeg.NormalizeFFProbeData(ffprobeData) + if err != nil { + return err + } + + sha256str, err := hashFileSHA256(targetPath) + if err != nil { + return err + } + + jobContext.Target = &job.VideoData{ + FilePath: targetPath, + Checksum: sha256str, + FFProbeData: normalizedFFProbeData, + } + return nil +} + +type FFMPEGGenerator struct { + Config *config.FFMPEGConfig + inputPaths []string + VideoFilter string + AudioFilter []string + SubtitleFilter 
[]string + Metadata string +} + +func (f *FFMPEGGenerator) setAudioFilters(container *ffmpeg.NormalizedFFProbe) { + + for index, audioStream := range container.Audios { + // TODO que pasa quan el channelLayout esta empty?? + title := fmt.Sprintf("%s (%s)", audioStream.Language, audioStream.ChannelLayour) + metadata := fmt.Sprintf(" -metadata:s:a:%d \"title=%s\"", index, title) + codecQuality := fmt.Sprintf("-c:a:%d %s -vbr %d", index, f.Config.AudioCodec, f.Config.AudioVBR) + f.AudioFilter = append(f.AudioFilter, fmt.Sprintf(" -map 0:%d %s %s", audioStream.Id, metadata, codecQuality)) + } +} +func (f *FFMPEGGenerator) setVideoFilters(container *ffmpeg.NormalizedFFProbe) { + videoFilterParameters := "\"scale='min(1920,iw)':-1:force_original_aspect_ratio=decrease\"" + videoEncoderQuality := fmt.Sprintf("-pix_fmt yuv420p10le -c:v %s -crf %d -profile:v %s -preset %s", f.Config.VideoCodec, f.Config.VideoCRF, f.Config.VideoProfile, f.Config.VideoPreset) + // TODO HDR?? + videoHDR := "" + f.VideoFilter = fmt.Sprintf("-map 0:%d -avoid_negative_ts make_zero -copyts -map_chapters -1 -flags +global_header -filter:v %s %s %s", container.Video.Id, videoFilterParameters, videoHDR, videoEncoderQuality) + +} +func (f *FFMPEGGenerator) setSubtFilters(container *ffmpeg.NormalizedFFProbe) { + subtInputIndex := 1 + for index, subtitle := range container.Subtitle { + if subtitle.IsImageTypeSubtitle() { + subtitleMap := fmt.Sprintf("-map %d -c:s:%d srt", subtInputIndex, index) + subtitleForced := "" + subtitleComment := "" + if subtitle.Forced { + subtitleForced = fmt.Sprintf(" -disposition:s:s:%d forced -disposition:s:s:%d default", index, index) + } + if subtitle.Comment { + subtitleComment = fmt.Sprintf(" -disposition:s:s:%d comment", index) + } + + // Clean subtitle title to avoid PGS in title + re := regexp.MustCompile(`(?i)\(?pgs\)?`) + subtitleTitle := re.ReplaceAllString(subtitle.Title, "") + subtitleTitle = strings.TrimSpace(strings.ReplaceAll(subtitleTitle, " ", " ")) + + 
f.SubtitleFilter = append(f.SubtitleFilter, fmt.Sprintf("%s %s %s -metadata:s:s:%d language=%s -metadata:s:s:%d \"title=%s\" -max_interleave_delta 0", subtitleMap, subtitleForced, subtitleComment, index, subtitle.Language, index, subtitleTitle)) + subtInputIndex++ + } else { + f.SubtitleFilter = append(f.SubtitleFilter, fmt.Sprintf("-map 0:%d -c:s:%d copy", subtitle.Id, index)) + } + + } +} + +func (f *FFMPEGGenerator) buildArguments(threads uint8, extraArgs string, outputFilePath string) string { + coreParameters := fmt.Sprintf("-fflags +genpts -nostats %s -progress pipe:1 -hide_banner -threads %d -analyzeduration 2147483647 -probesize 2147483647", extraArgs, threads) + inputsParameters := "" + for _, input := range f.inputPaths { + inputsParameters = fmt.Sprintf("%s -i \"%s\"", inputsParameters, input) + } + //-ss 900 -t 10 + audioParameters := "" + for _, audio := range f.AudioFilter { + audioParameters = fmt.Sprintf("%s %s", audioParameters, audio) + } + subtParameters := "" + for _, subt := range f.SubtitleFilter { + subtParameters = fmt.Sprintf("%s %s", subtParameters, subt) + } + + return fmt.Sprintf("%s %s -max_muxing_queue_size 9999 %s %s %s %s %s -y", coreParameters, inputsParameters, f.VideoFilter, audioParameters, subtParameters, f.Metadata, outputFilePath) +} + +func (f *FFMPEGGenerator) setInputFilters(jobContext *job.Context) { + source := jobContext.Source + f.inputPaths = append(f.inputPaths, source.FilePath) + if source.FFProbeData.HaveImageTypeSubtitle() { + for _, subt := range source.FFProbeData.Subtitle { + if subt.IsImageTypeSubtitle() { + srtEncodedFile := filepath.Join(jobContext.WorkingDir, fmt.Sprintf("%d.srt", subt.Id)) + f.inputPaths = append(f.inputPaths, srtEncodedFile) + } + } + } +} + +func hashFileSHA256(filePath string) (string, error) { + file, err := os.Open(filePath) + if err != nil { + return "", err + } + defer file.Close() + + hash := sha256.New() + if _, err := io.Copy(hash, file); err != nil { + return "", err + } + + 
return fmt.Sprintf("%x", hash.Sum(nil)), nil +} diff --git a/worker/step/ffmpegVerifyStep.go b/worker/step/ffmpegVerifyStep.go new file mode 100644 index 0000000..d7e36bd --- /dev/null +++ b/worker/step/ffmpegVerifyStep.go @@ -0,0 +1,46 @@ +package step + +import ( + "fmt" + "golang.org/x/net/context" + "transcoder/model" + "transcoder/worker/job" +) + +type FFMPEGVerifyStep struct { + verifyDeltaTimeSeconds float64 +} + +func NewFFMPEGVerifyStepExecutor(verifyDeltaTimeSeconds float64, options ...ExecutorOption) *Executor { + verifyStep := &FFMPEGVerifyStep{ + verifyDeltaTimeSeconds: verifyDeltaTimeSeconds, + } + return NewStepExecutor(model.JobVerify, verifyStep.actions, options...) +} + +func (f *FFMPEGVerifyStep) actions(jobContext *job.Context) []Action { + return []Action{ + { + Execute: func(_ context.Context, _ Tracker) error { + return f.verifyJob(jobContext) + }, + Id: jobContext.JobId.String(), + }, + } + +} + +func (f *FFMPEGVerifyStep) verifyJob(jobContext *job.Context) error { + sourceData := jobContext.Source.FFProbeData + targetData := jobContext.Target.FFProbeData + + diffDuration := sourceData.Video.Duration.Seconds() - targetData.Video.Duration.Seconds() + if diffDuration > f.verifyDeltaTimeSeconds || diffDuration < (-1*f.verifyDeltaTimeSeconds) { + err := fmt.Errorf("source File duration %f is diferent than encoded %f", sourceData.Video.Duration.Seconds(), targetData.Video.Duration.Seconds()) + return err + } + if jobContext.Target.Size > jobContext.Source.Size { + return fmt.Errorf("source File size %d bytes is less than encoded %d bytes", jobContext.Source.Size, jobContext.Target.Size) + } + return nil +} diff --git a/worker/step/progressTracker.go b/worker/step/progressTracker.go new file mode 100644 index 0000000..118a364 --- /dev/null +++ b/worker/step/progressTracker.go @@ -0,0 +1,32 @@ +package step + +import ( + "crypto/sha256" + "hash" + "io" +) + +type ProgressTrackReader struct { + tracker Tracker + io.ReadCloser + sha hash.Hash +} + 
+func NewProgressTrackStream(tracker Tracker, reader io.ReadCloser) *ProgressTrackReader { + return &ProgressTrackReader{ + tracker: tracker, + ReadCloser: reader, + sha: sha256.New(), + } +} + +func (p *ProgressTrackReader) Read(b []byte) (n int, err error) { + n, err = p.ReadCloser.Read(b) + p.tracker.Increment(n) + p.sha.Write(b[0:n]) + return n, err +} + +func (p *ProgressTrackReader) SumSha() []byte { + return p.sha.Sum(nil) +} diff --git a/worker/step/stepExecutor.go b/worker/step/stepExecutor.go new file mode 100644 index 0000000..1ca1286 --- /dev/null +++ b/worker/step/stepExecutor.go @@ -0,0 +1,107 @@ +package step + +import ( + "context" + "transcoder/model" + "transcoder/worker/console" + "transcoder/worker/job" +) + +type Tracker interface { + SetTotal(total int64) + UpdateValue(value int64) + Increment(increment int) + Logger() console.LeveledLogger +} + +type ActionsFunc func(jobContext *job.Context) []Action +type ActionExecuteFunc func(ctx context.Context, stepTracker Tracker) error +type ActionJobContextFunc func(jobContext *job.Context) +type ActionOnErrorFunc func(jobContext *job.Context, notificationType model.NotificationType, err error) +type ExecutorOption func(stepExecutor *Executor) +type Executor struct { + stepChan chan *job.Context + notificationType model.NotificationType + OnJob ActionJobContextFunc + OnAdd ActionJobContextFunc + OnComplete ActionJobContextFunc + OnError ActionOnErrorFunc + actionsFunc ActionsFunc + parallelRunners int +} + +func (s *Executor) NotificationType() model.NotificationType { + return s.notificationType +} + +func (s *Executor) AddJob(job *job.Context) { + s.OnAdd(job) + s.stepChan <- job +} + +func (s *Executor) GetJobChan() <-chan *job.Context { + return s.stepChan +} +func (s *Executor) Actions(jobContext *job.Context) []Action { + return s.actionsFunc(jobContext) +} + +func (s *Executor) Parallel() int { + return s.parallelRunners +} + +func (s *Executor) Stop() { + close(s.stepChan) +} + +type Action 
struct { + Execute ActionExecuteFunc + Id string +} + +func NewStepExecutor(notificationType model.NotificationType, actionsFunc ActionsFunc, options ...ExecutorOption) *Executor { + stepExecutor := &Executor{ + stepChan: make(chan *job.Context, 20), + notificationType: notificationType, + actionsFunc: actionsFunc, + parallelRunners: 1, + OnAdd: func(*job.Context) {}, + OnJob: func(*job.Context) {}, + OnComplete: func(*job.Context) {}, + OnError: func(*job.Context, model.NotificationType, error) {}, + } + for _, option := range options { + option(stepExecutor) + } + return stepExecutor +} + +func WithParallelRunners(parallelRunners int) ExecutorOption { + return func(stepExecutor *Executor) { + stepExecutor.parallelRunners = parallelRunners + } +} + +func WithOnErrorOpt(onError ActionOnErrorFunc) ExecutorOption { + return func(stepExecutor *Executor) { + stepExecutor.OnError = onError + } +} + +func WithOnCompleteOpt(onComplete ActionJobContextFunc) ExecutorOption { + return func(stepExecutor *Executor) { + stepExecutor.OnComplete = onComplete + } +} + +func WithOnAdd(onAdd ActionJobContextFunc) ExecutorOption { + return func(stepExecutor *Executor) { + stepExecutor.OnAdd = onAdd + } +} + +func WithOnJob(onJob ActionJobContextFunc) ExecutorOption { + return func(stepExecutor *Executor) { + stepExecutor.OnJob = onJob + } +} diff --git a/worker/step/uploadStep.go b/worker/step/uploadStep.go new file mode 100644 index 0000000..a360353 --- /dev/null +++ b/worker/step/uploadStep.go @@ -0,0 +1,95 @@ +package step + +import ( + "context" + "errors" + "fmt" + "github.com/avast/retry-go" + "github.com/google/uuid" + "io" + "net/http" + "os" + "strconv" + "time" + "transcoder/model" + "transcoder/worker/job" +) + +type UploadStepExecutor struct { + BaseDomainURL string + workerName string +} + +func NewUploadStepExecutor(workerName string, baseDomainUrl string, options ...ExecutorOption) *Executor { + uploadStep := &UploadStepExecutor{ + workerName: workerName, + 
BaseDomainURL: baseDomainUrl, + } + return NewStepExecutor(model.UploadNotification, uploadStep.actions, options...) +} + +func (u *UploadStepExecutor) actions(jobContext *job.Context) []Action { + return []Action{ + { + Execute: func(ctx context.Context, stepTracker Tracker) error { + return u.upload(ctx, stepTracker, jobContext) + }, + Id: jobContext.JobId.String(), + }, + } + +} + +func (u *UploadStepExecutor) upload(ctx context.Context, tracker Tracker, jobContext *job.Context) error { + return retry.Do(func() error { + tracker.UpdateValue(0) + encodedFile, err := os.Open(jobContext.Target.FilePath) + if err != nil { + return err + } + defer encodedFile.Close() + fi, _ := encodedFile.Stat() + fileSize := fi.Size() + tracker.SetTotal(fileSize) + + reader := NewProgressTrackStream(tracker, encodedFile) + + client := &http.Client{} + req, err := http.NewRequestWithContext(ctx, "POST", u.GetUploadURL(jobContext.JobId), reader) + if err != nil { + return err + } + req.ContentLength = fileSize + req.Body = reader + req.GetBody = func() (io.ReadCloser, error) { + return io.NopCloser(reader), nil + } + req.Header.Set("workerName", u.workerName) + req.Header.Add("checksum", jobContext.Target.Checksum) + req.Header.Add("Content-Type", "application/octet-stream") + req.Header.Add("Content-Length", strconv.FormatInt(fileSize, 10)) + resp, err := client.Do(req) + if err != nil { + return err + } + + if resp.StatusCode != 201 { + return fmt.Errorf("invalid status Code %d", resp.StatusCode) + } + tracker.UpdateValue(fileSize) + return nil + }, retry.Delay(time.Second*5), + retry.RetryIf(func(err error) bool { + return !errors.Is(err, context.Canceled) + }), + retry.DelayType(retry.FixedDelay), + retry.Attempts(17280), + retry.LastErrorOnly(true), + retry.OnRetry(func(n uint, err error) { + tracker.Logger().Errorf("Error on uploading job %v", err) + })) +} + +func (u *UploadStepExecutor) GetUploadURL(id uuid.UUID) string { + return fmt.Sprintf("%s%s?uuid=%s", u.BaseDomainURL, 
"/api/v1/upload", id.String()) +} diff --git a/worker/task/console.go b/worker/task/console.go deleted file mode 100644 index 00236c0..0000000 --- a/worker/task/console.go +++ /dev/null @@ -1,188 +0,0 @@ -package task - -import ( - "fmt" - "github.com/jedib0t/go-pretty/v6/progress" - "github.com/jedib0t/go-pretty/v6/text" - "sync" - "time" -) - -type JobStepType string - -const DownloadJobStepType = "Download" -const UploadJobStepType = "Upload" -const EncodeJobStepType = "Encode" -const PGSJobStepType = "PGS" - -var ( - unitScales = []int64{ - 1000000000000000, - 1000000000000, - 1000000000, - 1000000, - 1000, - } -) - -type ConsoleWorkerPrinter struct { - pw progress.Writer - mu sync.RWMutex -} - -type TaskTracks struct { - id string - stepType JobStepType - progressTracker *progress.Tracker - printer *text.Color -} - -func NewConsoleWorkerPrinter() *ConsoleWorkerPrinter { - pw := progress.NewWriter() - pw.SetAutoStop(false) - pw.SetTrackerLength(40) - pw.SetMessageLength(50) - // pw.SetNumTrackersExpected(15) - pw.SetSortBy(progress.SortByPercent) - pw.SetStyle(progress.StyleDefault) - pw.SetTrackerPosition(progress.PositionRight) - pw.SetUpdateFrequency(time.Second * 1) - pw.Style().Colors = progress.StyleColorsExample - pw.Style().Options.PercentFormat = "%4.2f%%" - pw.Style().Visibility.ETA = true - pw.Style().Visibility.ETAOverall = true - pw.Style().Visibility.Percentage = true - pw.Style().Visibility.Pinned = false - pw.Style().Visibility.Speed = true - pw.Style().Visibility.SpeedOverall = true - pw.Style().Visibility.Time = true - pw.Style().Visibility.TrackerOverall = false - pw.Style().Visibility.Value = true - pw.Style().Visibility.Pinned = false - pw.Style().Options.TimeInProgressPrecision = time.Millisecond - pw.Style().Options.TimeDonePrecision = time.Millisecond - - return &ConsoleWorkerPrinter{ - pw: pw, - } -} -func (c *ConsoleWorkerPrinter) Stop() { - c.pw.Stop() -} -func (c *ConsoleWorkerPrinter) Render() { - c.pw.Render() -} - -func (c 
*ConsoleWorkerPrinter) AddTask(id string, stepType JobStepType) *TaskTracks { - c.mu.Lock() - defer c.mu.Unlock() - - var unit progress.Units - var printer text.Color - switch stepType { - case DownloadJobStepType: - unit = progress.UnitsBytes - printer = text.FgWhite - case UploadJobStepType: - unit = progress.UnitsBytes - printer = text.FgGreen - case PGSJobStepType: - unit = progress.UnitsBytes - printer = text.FgWhite - case EncodeJobStepType: - unit = progress.Units{ - Notation: "", - NotationPosition: progress.UnitsNotationPositionBefore, - Formatter: func(value int64) string { - return formatNumber(value, map[int64]string{ - 1000000000000000: "PFrame", - 1000000000000: "TFrame", - 1000000000: "GFrame", - 1000000: "MFrame", - 1000: "KFrame", - 0: "Frame", - }) - }, - } - printer = text.FgBlue - } - tracker := &progress.Tracker{ - Message: printer.Sprintf("[%s] %s", id, stepType), - Total: 0, - Units: unit, - } - taskTrack := &TaskTracks{ - id: id, - stepType: stepType, - progressTracker: tracker, - printer: &printer, - } - - c.pw.AppendTracker(tracker) - return taskTrack -} - -func (c *ConsoleWorkerPrinter) Log(msg string, a ...interface{}) { - c.pw.Log(msg, a...) 
-} - -func (c *ConsoleWorkerPrinter) Warn(msg string, a ...interface{}) { - c.pw.Log(text.FgHiYellow.Sprintf(msg, a...)) -} - -func (c *ConsoleWorkerPrinter) Cmd(msg string, a ...interface{}) { - c.pw.Log(text.FgHiCyan.Sprintf(msg, a...)) -} - -func (c *ConsoleWorkerPrinter) Errorf(msg string, a ...interface{}) { - c.pw.Log(text.FgHiRed.Sprintf(msg, a...)) -} - -func (t *TaskTracks) SetTotal(total int64) { - t.progressTracker.UpdateTotal(total) -} - -func (t *TaskTracks) ETA() time.Duration { - return t.progressTracker.ETA() -} - -func (t *TaskTracks) PercentDone() float64 { - return t.progressTracker.PercentDone() -} - -func (t *TaskTracks) UpdateValue(value int64) { - t.progressTracker.SetValue(value) -} - -func (t *TaskTracks) Increment64(increment int64) { - t.progressTracker.Increment(increment) -} -func (t *TaskTracks) Increment(increment int) { - t.progressTracker.Increment(int64(increment)) -} - -func (t *TaskTracks) Message(msg string) { - t.progressTracker.UpdateMessage(t.printer.Sprintf("[%s] %s", t.id, msg)) -} - -func (t *TaskTracks) ResetMessage() { - t.progressTracker.UpdateMessage(t.printer.Sprintf("[%s] %s", t.id, t.stepType)) -} - -func (t *TaskTracks) Done() { - t.progressTracker.SetValue(t.progressTracker.Total) - t.progressTracker.MarkAsDone() -} - -func (t *TaskTracks) Error() { - t.progressTracker.MarkAsErrored() -} - -func formatNumber(value int64, notations map[int64]string) string { - for _, unitScale := range unitScales { - if value >= unitScale { - return fmt.Sprintf("%.2f%s", float64(value)/float64(unitScale), notations[unitScale]) - } - } - return fmt.Sprintf("%d%s", value, notations[0]) -} diff --git a/worker/task/encode.go b/worker/task/encode.go deleted file mode 100644 index 10745d2..0000000 --- a/worker/task/encode.go +++ /dev/null @@ -1,1054 +0,0 @@ -package task - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "github.com/avast/retry-go" - log "github.com/sirupsen/logrus" - 
"gopkg.in/ini.v1" - "gopkg.in/vansante/go-ffprobe.v2" - "hash" - "io" - "mime" - "net/http" - "os" - "path/filepath" - "regexp" - "runtime" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - "transcoder/helper" - "transcoder/helper/command" - "transcoder/model" - "transcoder/worker/config" - "transcoder/worker/serverclient" -) - -const maxPrefetchedJobs = 1 - -var ErrorJobNotFound = errors.New("job Not found") - -type FFMPEGProgress struct { - duration int - percent float64 -} -type EncodeWorker struct { - name string - prefetchJobs uint32 - downloadChan chan *model.WorkTaskEncode - encodeChan chan *model.WorkTaskEncode - uploadChan chan *model.WorkTaskEncode - workerConfig *config.Config - tempPath string - wg sync.WaitGroup - mu sync.RWMutex - terminal *ConsoleWorkerPrinter - pgsWorker *PGSWorker - client *serverclient.ServerClient -} - -func NewEncodeWorker(workerConfig *config.Config, client *serverclient.ServerClient, printer *ConsoleWorkerPrinter) *EncodeWorker { - tempPath := filepath.Join(workerConfig.TemporalPath, fmt.Sprintf("worker-%s", workerConfig.Name)) - encodeWorker := &EncodeWorker{ - name: workerConfig.Name, - pgsWorker: NewPGSWorker(workerConfig), - client: client, - wg: sync.WaitGroup{}, - workerConfig: workerConfig, - downloadChan: make(chan *model.WorkTaskEncode, 100), - encodeChan: make(chan *model.WorkTaskEncode, 100), - uploadChan: make(chan *model.WorkTaskEncode, 100), - tempPath: tempPath, - terminal: printer, - prefetchJobs: 0, - } - if err := os.MkdirAll(tempPath, os.ModePerm); err != nil { - log.Fatal(err) - } - - return encodeWorker -} - -func (e *EncodeWorker) Run(wg *sync.WaitGroup, ctx context.Context) { - serviceCtx, cancelServiceCtx := context.WithCancel(context.Background()) - log.Info("Starting Worker Client...") - e.start(serviceCtx) - log.Info("Started Worker Client...") - wg.Add(1) - go func() { - <-ctx.Done() - cancelServiceCtx() - e.stop() - log.Info("Stopping Worker Client...") - wg.Done() - }() -} - -func (e 
*EncodeWorker) start(ctx context.Context) { - e.resumeJobs() - go e.terminalRefreshRoutine(ctx) - go e.downloadQueueRoutine(ctx) - go e.encodeQueueRoutine(ctx) - go e.uploadQueueRoutine(ctx) -} - -func (e *EncodeWorker) stop() { - e.terminal.Stop() - defer close(e.downloadChan) - defer close(e.uploadChan) - defer close(e.encodeChan) -} -func (e *EncodeWorker) terminalRefreshRoutine(ctx context.Context) { - e.wg.Add(1) - e.terminal.Render() - <-ctx.Done() - e.terminal.Stop() - e.wg.Done() -} - -func (e *EncodeWorker) resumeJobs() { - err := filepath.Walk(e.tempPath, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - if info.IsDir() { - return nil - } - if filepath.Ext(path) == ".json" { - filepath.Base(path) - taskEncode := e.readTaskStatusFromDiskByPath(path) - - if taskEncode.LastState.IsDownloading() { - e.AddDownloadJob(taskEncode.Task) - return nil - } - if taskEncode.LastState.IsEncoding() { - // add as prefetched job so won't try to download more jobs until jobs are in encoding phase - atomic.AddUint32(&e.prefetchJobs, 1) - t := e.terminal.AddTask(fmt.Sprintf("CACHED: %s", taskEncode.Task.TaskEncode.Id.String()), DownloadJobStepType) - t.Done() - e.AddEncodeJob(taskEncode.Task) - return nil - } - if taskEncode.LastState.IsUploading() { - t := e.terminal.AddTask(fmt.Sprintf("CACHED: %s", taskEncode.Task.TaskEncode.Id.String()), EncodeJobStepType) - t.Done() - e.AddUploadJob(taskEncode.Task) - return nil - } - } - - return nil - }) - - if err != nil { - panic(err) - } -} - -func (e *EncodeWorker) AcceptJobs() bool { - if e.workerConfig.Paused { - return false - } - if e.workerConfig.HaveSettedPeriodTime() { - now := time.Now() - midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) - elapsedSinceMidnight := now.Sub(midnight) - return elapsedSinceMidnight >= *e.workerConfig.StartAfter && elapsedSinceMidnight <= *e.workerConfig.StopAfter - } - return e.PrefetchJobs() < maxPrefetchedJobs -} - 
-func (e *EncodeWorker) dowloadFile(ctx context.Context, job *model.WorkTaskEncode, track *TaskTracks) (err error) { - err = retry.Do(func() error { - track.UpdateValue(0) - req, err := http.NewRequestWithContext(ctx, "GET", e.client.GetDownloadURL(job.TaskEncode.Id), nil) - if err != nil { - return err - } - req.Header.Set("workerName", e.workerConfig.Name) - resp, err := http.DefaultClient.Do(req) - if err != nil { - return err - } - if resp.StatusCode == http.StatusNotFound { - return ErrorJobNotFound - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("not 200 respose in download code %d", resp.StatusCode) - } - defer resp.Body.Close() - size, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) - track.SetTotal(size) - if err != nil { - return err - } - _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) - if err != nil { - return err - } - - job.SourceFilePath = filepath.Join(job.WorkDir, fmt.Sprintf("%s%s", job.TaskEncode.Id.String(), filepath.Ext(params["filename"]))) - dowloadFile, err := os.Create(job.SourceFilePath) - if err != nil { - return err - } - - defer dowloadFile.Close() - - reader := NewProgressTrackStream(track, resp.Body) - - _, err = io.Copy(dowloadFile, reader) - if err != nil { - return err - } - sha256String := hex.EncodeToString(reader.SumSha()) - bodyString := "" - - err = retry.Do(func() error { - respsha256, err := http.Get(e.client.GetChecksumURL(job.TaskEncode.Id)) - if err != nil { - return err - } - defer respsha256.Body.Close() - if respsha256.StatusCode != http.StatusOK { - return fmt.Errorf("not 200 respose in sha265 code %d", respsha256.StatusCode) - } - - bodyBytes, err := io.ReadAll(respsha256.Body) - if err != nil { - return err - } - bodyString = string(bodyBytes) - return nil - }, retry.Delay(time.Second*5), - retry.Attempts(10), - retry.LastErrorOnly(true), - retry.OnRetry(func(n uint, err error) { - e.terminal.Errorf("error %v on calculate checksum of downloaded job", 
err) - }), - retry.RetryIf(func(err error) bool { - return !errors.Is(err, context.Canceled) - })) - if err != nil { - return err - } - - if sha256String != bodyString { - return fmt.Errorf("checksum error on download source:%s downloaded:%s", bodyString, sha256String) - } - - track.UpdateValue(size) - return nil - }, retry.Delay(time.Second*5), - retry.DelayType(retry.FixedDelay), - retry.Attempts(180), // 15 min - retry.LastErrorOnly(true), - retry.OnRetry(func(n uint, err error) { - e.terminal.Errorf("Error on downloading job %v", err) - }), - retry.RetryIf(func(err error) bool { - return !(errors.Is(err, context.Canceled) || errors.Is(err, ErrorJobNotFound)) - })) - return err -} -func (e *EncodeWorker) getVideoParameters(ctx context.Context, inputFile string) (data *ffprobe.ProbeData, size int64, err error) { - - fileReader, err := os.Open(inputFile) - if err != nil { - return nil, -1, fmt.Errorf("error opening file %s because %v", inputFile, err) - } - stat, err := fileReader.Stat() - if err != nil { - return nil, 0, err - } - - defer fileReader.Close() - data, err = ffprobe.ProbeReader(ctx, fileReader) - if err != nil { - return nil, 0, fmt.Errorf("error getting data: %v", err) - } - return data, stat.Size(), nil -} - -func FFProbeFrameRate(ffprobeFrameRate string) (frameRate int, err error) { - rate := 0 - frameRatio := 0 - avgFrameSpl := strings.Split(ffprobeFrameRate, "/") - if len(avgFrameSpl) != 2 { - return 0, errors.New("invalid Format") - } - - frameRatio, err = strconv.Atoi(avgFrameSpl[0]) - if err != nil { - return 0, err - } - rate, err = strconv.Atoi(avgFrameSpl[1]) - if err != nil { - return 0, err - } - return frameRatio / rate, nil -} - -func (e *EncodeWorker) clearData(data *ffprobe.ProbeData) (container *ContainerData, err error) { - container = &ContainerData{} - - videoStream := data.StreamType(ffprobe.StreamVideo)[0] - frameRate, err := FFProbeFrameRate(videoStream.AvgFrameRate) - if err != nil { - frameRate = 24 - } - - container.Video = 
&Video{ - Id: uint8(videoStream.Index), - Duration: data.Format.Duration(), - FrameRate: frameRate, - } - - betterAudioStreamPerLanguage := make(map[string]*Audio) - for _, stream := range data.StreamType(ffprobe.StreamAudio) { - if stream.BitRate == "" { - stream.BitRate = "0" - } - bitRateInt, err := strconv.ParseUint(stream.BitRate, 10, 32) // TODO Aqui revem diferents tipos de numeros - if err != nil { - panic(err) - } - newAudio := &Audio{ - Id: uint8(stream.Index), - Language: stream.Tags.Language, - Channels: stream.ChannelLayout, - ChannelsNumber: uint8(stream.Channels), - ChannelLayour: stream.ChannelLayout, - Default: stream.Disposition.Default == 1, - Bitrate: uint(bitRateInt), - Title: stream.Tags.Title, - } - betterAudio := betterAudioStreamPerLanguage[newAudio.Language] - - // If more channels or same channels and better bitrate - if betterAudio != nil { - if newAudio.ChannelsNumber > betterAudio.ChannelsNumber { - betterAudioStreamPerLanguage[newAudio.Language] = newAudio - } else if newAudio.ChannelsNumber == betterAudio.ChannelsNumber && newAudio.Bitrate > betterAudio.Bitrate { - betterAudioStreamPerLanguage[newAudio.Language] = newAudio - } - } else { - betterAudioStreamPerLanguage[stream.Tags.Language] = newAudio - } - - } - for _, audioStream := range betterAudioStreamPerLanguage { - container.Audios = append(container.Audios, audioStream) - } - - betterSubtitleStreamPerLanguage := make(map[string]*Subtitle) - for _, stream := range data.StreamType(ffprobe.StreamSubtitle) { - newSubtitle := &Subtitle{ - Id: uint8(stream.Index), - Language: stream.Tags.Language, - Forced: stream.Disposition.Forced == 1, - Comment: stream.Disposition.Comment == 1, - Format: stream.CodecName, - Title: stream.Tags.Title, - } - - if newSubtitle.Forced || newSubtitle.Comment { - container.Subtitle = append(container.Subtitle, newSubtitle) - continue - } - // TODO Filter Languages we don't want - betterSubtitle := betterSubtitleStreamPerLanguage[newSubtitle.Language] - 
if betterSubtitle == nil { // TODO Potser perdem subtituls que es necesiten - betterSubtitleStreamPerLanguage[stream.Tags.Language] = newSubtitle - } else { - // TODO aixo es temporal per fer proves, borrar aquest else!! - container.Subtitle = append(container.Subtitle, newSubtitle) - } - } - for _, value := range betterSubtitleStreamPerLanguage { - container.Subtitle = append(container.Subtitle, value) - } - return container, nil -} -func (e *EncodeWorker) FFMPEG(ctx context.Context, job *model.WorkTaskEncode, videoContainer *ContainerData, ffmpegProgressChan chan<- FFMPEGProgress) error { - ffmpeg := &FFMPEGGenerator{Config: e.workerConfig.EncodeConfig} - ffmpeg.setInputFilters(videoContainer, job.SourceFilePath, job.WorkDir) - ffmpeg.setVideoFilters(videoContainer) - ffmpeg.setAudioFilters(videoContainer) - ffmpeg.setSubtFilters(videoContainer) - ffmpegErrLog := "" - - checkPercentageFFMPEG := func(buffer []byte, exit bool) { - ffmpegErrLog += string(buffer) - } - - stdoutFFMPEG := func(buffer []byte, exit bool) { - cfg, err := ini.Load(buffer) - if err != nil { - return - } - s := cfg.Section("") - progress := s.Key("progress").String() - if progress == "continue" { - var outTimeSeconds int64 - outTimeUs, err := s.Key("out_time_ms").Int64() - if err == nil { - outTimeSeconds = outTimeUs / 1000000 - } - // If out_time_ms is not present, we can use frame as a fallback, even is not as precise - if outTimeSeconds == 0 { - frame, err := s.Key("frame").Int64() - if err != nil { - return - } - outTimeSeconds = frame / int64(videoContainer.Video.FrameRate) - } - - ffmpegProgressChan <- FFMPEGProgress{ - duration: int(outTimeSeconds), - percent: float64(outTimeSeconds*100) / videoContainer.Video.Duration.Seconds(), - } - } - if exit { - close(ffmpegProgressChan) - } - } - sourceFileName := filepath.Base(job.SourceFilePath) - encodedFilePath := fmt.Sprintf("%s-encoded.%s", strings.TrimSuffix(sourceFileName, filepath.Ext(sourceFileName)), "mkv") - job.TargetFilePath = 
filepath.Join(job.WorkDir, encodedFilePath) - - ffmpegArguments := ffmpeg.buildArguments(uint8(e.workerConfig.Threads), job.TargetFilePath) - e.terminal.Cmd("FFMPEG Command:%s %s", helper.GetFFmpegPath(), ffmpegArguments) - ffmpegCommand := command.NewCommandByString(helper.GetFFmpegPath(), ffmpegArguments). - SetWorkDir(job.WorkDir). - SetStdoutFunc(stdoutFFMPEG). - SetStderrFunc(checkPercentageFFMPEG) - - if runtime.GOOS == "linux" { - ffmpegCommand.AddEnv(fmt.Sprintf("LD_LIBRARY_PATH=%s", filepath.Dir(helper.GetFFmpegPath()))) - } - exitCode, err := ffmpegCommand.RunWithContext(ctx) - if err != nil { - return fmt.Errorf("%w: stder:%s", err, ffmpegErrLog) - } - if exitCode != 0 { - return fmt.Errorf("exit code %d: stder:%s", exitCode, ffmpegErrLog) - } - - return nil -} - -type ProgressTrackReader struct { - taskTracker *TaskTracks - io.ReadCloser - sha hash.Hash -} - -func NewProgressTrackStream(track *TaskTracks, reader io.ReadCloser) *ProgressTrackReader { - return &ProgressTrackReader{ - taskTracker: track, - ReadCloser: reader, - sha: sha256.New(), - } -} - -func (p *ProgressTrackReader) Read(b []byte) (n int, err error) { - n, err = p.ReadCloser.Read(b) - p.taskTracker.Increment(n) - p.sha.Write(b[0:n]) - return n, err -} - -func (p *ProgressTrackReader) SumSha() []byte { - return p.sha.Sum(nil) -} - -func (e *EncodeWorker) UploadJob(ctx context.Context, task *model.WorkTaskEncode, track *TaskTracks) error { - e.updateTaskStatus(task, model.UploadNotification, model.StartedNotificationStatus, "") - err := retry.Do(func() error { - track.UpdateValue(0) - encodedFile, err := os.Open(task.TargetFilePath) - if err != nil { - return err - } - defer encodedFile.Close() - fi, _ := encodedFile.Stat() - fileSize := fi.Size() - track.SetTotal(fileSize) - sha := sha256.New() - if _, err = io.Copy(sha, encodedFile); err != nil { - return err - } - checksum := hex.EncodeToString(sha.Sum(nil)) - _, err = encodedFile.Seek(0, io.SeekStart) - if err != nil { - return err - 
} - - reader := NewProgressTrackStream(track, encodedFile) - - client := &http.Client{} - req, err := http.NewRequestWithContext(ctx, "POST", e.client.GetUploadURL(task.TaskEncode.Id), reader) - if err != nil { - return err - } - req.ContentLength = fileSize - req.Body = reader - req.GetBody = func() (io.ReadCloser, error) { - return io.NopCloser(reader), nil - } - req.Header.Set("workerName", e.workerConfig.Name) - req.Header.Add("checksum", checksum) - req.Header.Add("Content-Type", "application/octet-stream") - req.Header.Add("Content-Length", strconv.FormatInt(fileSize, 10)) - resp, err := client.Do(req) - if err != nil { - return err - } - - if resp.StatusCode != 201 { - return fmt.Errorf("invalid status Code %d", resp.StatusCode) - } - track.UpdateValue(fileSize) - return nil - }, retry.Delay(time.Second*5), - retry.RetryIf(func(err error) bool { - return !errors.Is(err, context.Canceled) - }), - retry.DelayType(retry.FixedDelay), - retry.Attempts(17280), - retry.LastErrorOnly(true), - retry.OnRetry(func(n uint, err error) { - e.terminal.Errorf("Error on uploading job %v", err) - })) - - if err != nil { - e.updateTaskStatus(task, model.UploadNotification, model.FailedNotificationStatus, "") - return err - } - - e.updateTaskStatus(task, model.UploadNotification, model.CompletedNotificationStatus, "") - return nil -} - -func (e *EncodeWorker) errorJob(taskEncode *model.WorkTaskEncode, err error) { - if errors.Is(err, context.Canceled) { - e.updateTaskStatus(taskEncode, model.JobNotification, model.CanceledNotificationStatus, "") - } else { - e.updateTaskStatus(taskEncode, model.JobNotification, model.FailedNotificationStatus, err.Error()) - } - - err = taskEncode.Clean() - if err != nil { - e.terminal.Errorf("Error on cleaning job %s", err.Error()) - return - } -} - -func (e *EncodeWorker) Execute(taskEncode *model.TaskEncode) error { - workDir := filepath.Join(e.tempPath, taskEncode.Id.String()) - workTaskEncode := &model.WorkTaskEncode{ - TaskEncode: 
taskEncode, - WorkDir: workDir, - } - err := os.MkdirAll(workDir, os.ModePerm) - if err != nil { - return err - } - - e.updateTaskStatus(workTaskEncode, model.JobNotification, model.StartedNotificationStatus, "") - e.AddDownloadJob(workTaskEncode) - return nil -} - -func (e *EncodeWorker) GetID() string { - return e.name -} -func (e *EncodeWorker) updateTaskStatus(encode *model.WorkTaskEncode, notificationType model.NotificationType, status model.NotificationStatus, message string) { - e.mu.Lock() - defer e.mu.Unlock() - encode.TaskEncode.EventID++ - event := model.TaskEvent{ - Id: encode.TaskEncode.Id, - EventID: encode.TaskEncode.EventID, - EventType: model.NotificationEvent, - WorkerName: e.workerConfig.Name, - EventTime: time.Now(), - NotificationType: notificationType, - Status: status, - Message: message, - } - - if err := e.client.PublishEvent(event); err != nil { - e.terminal.Errorf("Error on publishing event %s", err.Error()) - } - if err := e.saveTaskStatusDisk(&model.TaskStatus{ - LastState: &event, - Task: encode, - }); err != nil { - e.terminal.Errorf("Error on publishing event %s", err.Error()) - } - e.terminal.Log("[%s] %s have been %s: %s", event.Id.String(), event.NotificationType, event.Status, event.Message) - -} - -func (e *EncodeWorker) saveTaskStatusDisk(taskEncode *model.TaskStatus) error { - b, err := json.MarshalIndent(taskEncode, "", "\t") - if err != nil { - return err - } - eventFile, err := os.OpenFile(filepath.Join(taskEncode.Task.WorkDir, fmt.Sprintf("%s.json", taskEncode.Task.TaskEncode.Id)), os.O_TRUNC|os.O_CREATE|os.O_RDWR, os.ModePerm) - if err != nil { - return err - } - defer eventFile.Close() - _, err = eventFile.Write(b) - if err != nil { - return err - } - return eventFile.Sync() -} -func (e *EncodeWorker) readTaskStatusFromDiskByPath(filepath string) *model.TaskStatus { - eventFile, err := os.Open(filepath) - if err != nil { - panic(err) - } - defer eventFile.Close() - b, err := io.ReadAll(eventFile) - if err != nil { - 
panic(err) - } - taskStatus := &model.TaskStatus{} - err = json.Unmarshal(b, taskStatus) - if err != nil { - panic(err) - } - return taskStatus -} - -func (e *EncodeWorker) PGSMkvExtractDetectAndConvert(ctx context.Context, taskEncode *model.WorkTaskEncode, track *TaskTracks, container *ContainerData) error { - var PGSTOSrt []*Subtitle - for _, subt := range container.Subtitle { - if subt.isImageTypeSubtitle() { - PGSTOSrt = append(PGSTOSrt, subt) - } - } - if len(PGSTOSrt) > 0 { - e.updateTaskStatus(taskEncode, model.MKVExtractNotification, model.StartedNotificationStatus, "") - track.Message(string(model.MKVExtractNotification)) - track.SetTotal(0) - err := e.MKVExtract(ctx, PGSTOSrt, taskEncode) - if err != nil { - e.updateTaskStatus(taskEncode, model.MKVExtractNotification, model.FailedNotificationStatus, err.Error()) - return err - } - e.updateTaskStatus(taskEncode, model.MKVExtractNotification, model.CompletedNotificationStatus, "") - - e.updateTaskStatus(taskEncode, model.PGSNotification, model.StartedNotificationStatus, "") - track.Message(string(model.PGSNotification)) - err = e.convertPGSToSrt(ctx, taskEncode, PGSTOSrt) - if err != nil { - e.updateTaskStatus(taskEncode, model.PGSNotification, model.FailedNotificationStatus, err.Error()) - return err - } else { - e.updateTaskStatus(taskEncode, model.PGSNotification, model.CompletedNotificationStatus, "") - } - } - return nil -} - -func (e *EncodeWorker) convertPGSToSrt(ctx context.Context, taskEncode *model.WorkTaskEncode, subtitles []*Subtitle) error { - var wg sync.WaitGroup - tasks := make(chan Subtitle, len(subtitles)) - errs := make(chan error, len(subtitles)) - for i := 0; i < e.workerConfig.PGSConfig.ParallelJobs; i++ { - wg.Add(1) - go func() { - defer wg.Done() - for subtitle := range tasks { - pgsTerminalTask := e.terminal.AddTask(fmt.Sprintf("%s %d", taskEncode.TaskEncode.Id.String(), subtitle.Id), PGSJobStepType) - pgsTerminalTask.SetTotal(0) - supPath := filepath.Join(taskEncode.WorkDir, 
fmt.Sprintf("%d.sup", subtitle.Id)) - err := e.pgsWorker.ConvertPGS(ctx, model.TaskPGS{ - PGSID: int(subtitle.Id), - PGSSourcePath: supPath, - PGSTargetPath: filepath.Join(taskEncode.WorkDir, fmt.Sprintf("%d.srt", subtitle.Id)), - PGSLanguage: subtitle.Language, - }, pgsTerminalTask) - if err != nil { - pgsTerminalTask.Error() - errs <- err - } - pgsTerminalTask.Done() - } - }() - } - - for _, subtitle := range subtitles { - tasks <- *subtitle - } - close(tasks) - wg.Wait() - close(errs) - - var errorList []error - for err := range errs { - errorList = append(errorList, err) - } - return errors.Join(errorList...) -} - -func (e *EncodeWorker) MKVExtract(ctx context.Context, subtitles []*Subtitle, taskEncode *model.WorkTaskEncode) error { - mkvExtractCommand := command.NewCommand(helper.GetMKVExtractPath(), "tracks", taskEncode.SourceFilePath). - SetWorkDir(taskEncode.WorkDir) - if runtime.GOOS == "linux" { - mkvExtractCommand.AddEnv(fmt.Sprintf("LD_LIBRARY_PATH=%s", filepath.Dir(helper.GetMKVExtractPath()))) - } - for _, subtitle := range subtitles { - mkvExtractCommand.AddParam(fmt.Sprintf("%d:%d.sup", subtitle.Id, subtitle.Id)) - } - - _, err := mkvExtractCommand.RunWithContext(ctx, command.NewAllowedCodesOption(0, 1)) - if err != nil { - e.terminal.Cmd("MKVExtract Command:%s", mkvExtractCommand.GetFullCommand()) - return fmt.Errorf("MKVExtract unexpected error:%v", err) - } - - return nil -} -func (e *EncodeWorker) PrefetchJobs() uint32 { - return atomic.LoadUint32(&e.prefetchJobs) -} - -func (e *EncodeWorker) AddDownloadJob(job *model.WorkTaskEncode) { - atomic.AddUint32(&e.prefetchJobs, 1) - e.downloadChan <- job -} - -func (e *EncodeWorker) AddEncodeJob(job *model.WorkTaskEncode) { - e.encodeChan <- job -} - -func (e *EncodeWorker) AddUploadJob(job *model.WorkTaskEncode) { - e.uploadChan <- job -} - -func (e *EncodeWorker) downloadQueueRoutine(ctx context.Context) { - e.wg.Add(1) - defer e.wg.Done() - for { - select { - case <-ctx.Done(): - 
e.terminal.Warn("Stopping Download ServerCoordinator") - return - case job, ok := <-e.downloadChan: - if !ok { - return - } - taskTrack := e.terminal.AddTask(job.TaskEncode.Id.String(), DownloadJobStepType) - - e.updateTaskStatus(job, model.DownloadNotification, model.StartedNotificationStatus, "") - err := e.dowloadFile(ctx, job, taskTrack) - if err != nil { - e.updateTaskStatus(job, model.DownloadNotification, model.FailedNotificationStatus, err.Error()) - taskTrack.Error() - e.errorJob(job, err) - atomic.AddUint32(&e.prefetchJobs, ^uint32(0)) - continue - } - e.updateTaskStatus(job, model.DownloadNotification, model.CompletedNotificationStatus, "") - taskTrack.Done() - e.AddEncodeJob(job) - } - } - -} - -func (e *EncodeWorker) uploadQueueRoutine(ctx context.Context) { - e.wg.Add(1) - for { - select { - case <-ctx.Done(): - e.terminal.Warn("Stopping Upload ServerCoordinator") - e.wg.Done() - return - case job, ok := <-e.uploadChan: - if !ok { - continue - } - taskTrack := e.terminal.AddTask(job.TaskEncode.Id.String(), UploadJobStepType) - err := e.UploadJob(ctx, job, taskTrack) - if err != nil { - e.terminal.Errorf("Error on uploading job %v", err) - taskTrack.Error() - e.errorJob(job, err) - continue - } - - e.updateTaskStatus(job, model.JobNotification, model.CompletedNotificationStatus, "") - err = job.Clean() - if err != nil { - e.terminal.Errorf("Error on cleaning job %v", err) - taskTrack.Error() - continue - } - taskTrack.Done() - } - } - -} - -func (e *EncodeWorker) encodeQueueRoutine(ctx context.Context) { - e.wg.Add(1) - defer e.wg.Done() - for { - select { - case <-ctx.Done(): - e.terminal.Warn("Stopping Encode Queue") - return - case job, ok := <-e.encodeChan: - if !ok { - return - } - atomic.AddUint32(&e.prefetchJobs, ^uint32(0)) - taskTrack := e.terminal.AddTask(job.TaskEncode.Id.String(), EncodeJobStepType) - err := e.encodeVideo(ctx, job, taskTrack) - if err != nil { - taskTrack.Error() - e.errorJob(job, err) - continue - } - - taskTrack.Done() - 
e.AddUploadJob(job) - } - } - -} - -func (e *EncodeWorker) encodeVideo(ctx context.Context, job *model.WorkTaskEncode, track *TaskTracks) error { - e.updateTaskStatus(job, model.FFProbeNotification, model.StartedNotificationStatus, "") - track.Message(string(model.FFProbeNotification)) - sourceVideoParams, sourceVideoSize, err := e.getVideoParameters(ctx, job.SourceFilePath) - if err != nil { - e.updateTaskStatus(job, model.FFProbeNotification, model.FailedNotificationStatus, err.Error()) - return err - } - e.updateTaskStatus(job, model.FFProbeNotification, model.CompletedNotificationStatus, "") - - videoContainer, err := e.clearData(sourceVideoParams) - if err != nil { - e.terminal.Warn("Error in clearData %s", e.GetID()) - return err - } - if err = e.PGSMkvExtractDetectAndConvert(ctx, job, track, videoContainer); err != nil { - return err - } - e.updateTaskStatus(job, model.FFMPEGSNotification, model.StartedNotificationStatus, "") - track.ResetMessage() - FFMPEGProgressChan := make(chan FFMPEGProgress) - go e.FFMPEGProgressRoutine(ctx, job, track, FFMPEGProgressChan, videoContainer) - err = e.FFMPEG(ctx, job, videoContainer, FFMPEGProgressChan) - if err != nil { - e.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error()) - return err - } - <-time.After(time.Second * 1) - - if err = e.verifyResultJob(ctx, job, sourceVideoParams, sourceVideoSize); err != nil { - return err - } - - e.updateTaskStatus(job, model.FFMPEGSNotification, model.CompletedNotificationStatus, "") - return nil -} - -func (e *EncodeWorker) verifyResultJob(ctx context.Context, job *model.WorkTaskEncode, sourceVideoParams *ffprobe.ProbeData, sourceVideoSize int64) error { - encodedVideoParams, encodedVideoSize, err := e.getVideoParameters(ctx, job.TargetFilePath) - if err != nil { - e.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error()) - return err - } - - diffDuration := encodedVideoParams.Format.DurationSeconds - 
sourceVideoParams.Format.DurationSeconds - if diffDuration > 60 || diffDuration < -60 { - err = fmt.Errorf("source File duration %f is diferent than encoded %f", sourceVideoParams.Format.DurationSeconds, encodedVideoParams.Format.DurationSeconds) - e.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error()) - return err - } - if encodedVideoSize > sourceVideoSize { - err = fmt.Errorf("source File size %d bytes is less than encoded %d bytes", sourceVideoSize, encodedVideoSize) - e.updateTaskStatus(job, model.FFMPEGSNotification, model.FailedNotificationStatus, err.Error()) - return err - } - return nil -} - -func (e *EncodeWorker) FFMPEGProgressRoutine(ctx context.Context, job *model.WorkTaskEncode, track *TaskTracks, ffmpegProgressChan chan FFMPEGProgress, videoContainer *ContainerData) { - track.SetTotal(int64(videoContainer.Video.Duration.Seconds()) * int64(videoContainer.Video.FrameRate)) - lastProgressEvent := float64(0) - - for { - select { - case <-ctx.Done(): - return - case progress, open := <-ffmpegProgressChan: - if !open { - return - } - - track.UpdateValue(int64(progress.duration * videoContainer.Video.FrameRate)) - - if progress.percent-lastProgressEvent > 10 { - e.updateTaskStatus(job, model.FFMPEGSNotification, model.StartedNotificationStatus, fmt.Sprintf("{\"progress\":\"%.2f\"}", track.PercentDone())) - lastProgressEvent = progress.percent - } - } - } -} - -func (e *EncodeWorker) GetName() string { - return e.name -} - -type FFMPEGGenerator struct { - Config *config.FFMPEGConfig - inputPaths []string - VideoFilter string - AudioFilter []string - SubtitleFilter []string - Metadata string -} - -func (f *FFMPEGGenerator) setAudioFilters(container *ContainerData) { - - for index, audioStream := range container.Audios { - // TODO que pasa quan el channelLayout esta empty?? 
- title := fmt.Sprintf("%s (%s)", audioStream.Language, audioStream.ChannelLayour) - metadata := fmt.Sprintf(" -metadata:s:a:%d \"title=%s\"", index, title) - codecQuality := fmt.Sprintf("-c:a:%d %s -vbr %d", index, f.Config.AudioCodec, f.Config.AudioVBR) - f.AudioFilter = append(f.AudioFilter, fmt.Sprintf(" -map 0:%d %s %s", audioStream.Id, metadata, codecQuality)) - } -} -func (f *FFMPEGGenerator) setVideoFilters(container *ContainerData) { - videoFilterParameters := "\"scale='min(1920,iw)':-1:force_original_aspect_ratio=decrease\"" - videoEncoderQuality := fmt.Sprintf("-pix_fmt yuv420p10le -c:v %s -crf %d -profile:v %s -preset %s", f.Config.VideoCodec, f.Config.VideoCRF, f.Config.VideoProfile, f.Config.VideoPreset) - // TODO HDR?? - videoHDR := "" - f.VideoFilter = fmt.Sprintf("-map 0:%d -avoid_negative_ts make_zero -copyts -map_chapters -1 -flags +global_header -filter:v %s %s %s", container.Video.Id, videoFilterParameters, videoHDR, videoEncoderQuality) - -} -func (f *FFMPEGGenerator) setSubtFilters(container *ContainerData) { - subtInputIndex := 1 - for index, subtitle := range container.Subtitle { - if subtitle.isImageTypeSubtitle() { - subtitleMap := fmt.Sprintf("-map %d -c:s:%d srt", subtInputIndex, index) - subtitleForced := "" - subtitleComment := "" - if subtitle.Forced { - subtitleForced = fmt.Sprintf(" -disposition:s:s:%d forced -disposition:s:s:%d default", index, index) - } - if subtitle.Comment { - subtitleComment = fmt.Sprintf(" -disposition:s:s:%d comment", index) - } - - // Clean subtitle title to avoid PGS in title - re := regexp.MustCompile(`(?i)\(?pgs\)?`) - subtitleTitle := re.ReplaceAllString(subtitle.Title, "") - subtitleTitle = strings.TrimSpace(strings.ReplaceAll(subtitleTitle, " ", " ")) - - f.SubtitleFilter = append(f.SubtitleFilter, fmt.Sprintf("%s %s %s -metadata:s:s:%d language=%s -metadata:s:s:%d \"title=%s\" -max_interleave_delta 0", subtitleMap, subtitleForced, subtitleComment, index, subtitle.Language, index, subtitleTitle)) - 
subtInputIndex++ - } else { - f.SubtitleFilter = append(f.SubtitleFilter, fmt.Sprintf("-map 0:%d -c:s:%d copy", subtitle.Id, index)) - } - - } -} - -func (f *FFMPEGGenerator) buildArguments(threads uint8, outputFilePath string) string { - coreParameters := fmt.Sprintf("-fflags +genpts -nostats -progress pipe:1 -hide_banner -threads %d -analyzeduration 2147483647 -probesize 2147483647", threads) - inputsParameters := "" - for _, input := range f.inputPaths { - inputsParameters = fmt.Sprintf("%s -i \"%s\"", inputsParameters, input) - } - //-ss 900 -t 10 - audioParameters := "" - for _, audio := range f.AudioFilter { - audioParameters = fmt.Sprintf("%s %s", audioParameters, audio) - } - subtParameters := "" - for _, subt := range f.SubtitleFilter { - subtParameters = fmt.Sprintf("%s %s", subtParameters, subt) - } - - return fmt.Sprintf("%s %s -max_muxing_queue_size 9999 %s %s %s %s %s -y", coreParameters, inputsParameters, f.VideoFilter, audioParameters, subtParameters, f.Metadata, outputFilePath) -} - -func (f *FFMPEGGenerator) setInputFilters(container *ContainerData, sourceFilePath string, workDir string) { - f.inputPaths = append(f.inputPaths, sourceFilePath) - if container.HaveImageTypeSubtitle() { - for _, subt := range container.Subtitle { - if subt.isImageTypeSubtitle() { - srtEncodedFile := filepath.Join(workDir, fmt.Sprintf("%d.srt", subt.Id)) - f.inputPaths = append(f.inputPaths, srtEncodedFile) - } - } - } -} - -type Video struct { - Id uint8 - Duration time.Duration - FrameRate int -} -type Audio struct { - Id uint8 - Language string - Channels string - ChannelsNumber uint8 - ChannelLayour string - Default bool - Bitrate uint - Title string -} -type Subtitle struct { - Id uint8 - Language string - Forced bool - Comment bool - Format string - Title string -} -type ContainerData struct { - Video *Video - Audios []*Audio - Subtitle []*Subtitle -} - -func (c *ContainerData) HaveImageTypeSubtitle() bool { - for _, sub := range c.Subtitle { - if 
sub.isImageTypeSubtitle() { - return true - } - } - return false -} -func (c *ContainerData) ToJson() string { - b, err := json.Marshal(c) - if err != nil { - panic(err) - } - return string(b) -} -func (s *Subtitle) isImageTypeSubtitle() bool { - return strings.Contains(strings.ToLower(s.Format), "pgs") -} diff --git a/worker/task/coordinator.go b/worker/worker/coordinator.go similarity index 66% rename from worker/task/coordinator.go rename to worker/worker/coordinator.go index 80b7167..66eb9b6 100644 --- a/worker/task/coordinator.go +++ b/worker/worker/coordinator.go @@ -1,4 +1,4 @@ -package task +package worker import ( "context" @@ -7,23 +7,24 @@ import ( "os" "sync" "transcoder/update" + "transcoder/worker/console" "transcoder/worker/serverclient" "time" ) type ServerCoordinator struct { - printer *ConsoleWorkerPrinter + logger console.LeveledLogger serverClient *serverclient.ServerClient - worker *EncodeWorker + worker *JobExecutor updater *update.Updater } -func NewServerCoordinator(serverClient *serverclient.ServerClient, worker *EncodeWorker, updater *update.Updater, printer *ConsoleWorkerPrinter) *ServerCoordinator { +func NewServerCoordinator(serverClient *serverclient.ServerClient, worker *JobExecutor, updater *update.Updater, logger console.LeveledLogger) *ServerCoordinator { coordinator := &ServerCoordinator{ serverClient: serverClient, worker: worker, - printer: printer, + logger: logger, updater: updater, } return coordinator @@ -52,14 +53,13 @@ func (q *ServerCoordinator) stop() { func (q *ServerCoordinator) heartbeatRoutine(ctx context.Context) { // Declare Worker Unique ServerCoordinator - for { select { case <-ctx.Done(): return - case <-time.After(time.Second * 30): - if err := q.serverClient.PublishPing(); err != nil { - q.printer.Errorf("Error Publishing Ping Event: %v", err) + case <-time.After(time.Second * 5): + if err := q.serverClient.PublishPingEvent(); err != nil { + q.logger.Errorf("Error Publishing Ping Event: %v", err) } } } @@ 
-74,24 +74,24 @@ func (q *ServerCoordinator) requestTaskRoutine(ctx context.Context) { if q.worker.AcceptJobs() { release, requireUpdate, err := q.updater.CheckForUpdate() if err != nil { - q.printer.Errorf("Error Checking For Update: %v", err) + q.logger.Errorf("Error Checking For Update: %v", err) continue } if requireUpdate { - q.printer.Log("New version available %s,exiting ...", release.TagName) + q.logger.Logf("New version available %s,exiting ...", release.TagName) os.Exit(update.ExitCode) } - taskJob, err := q.serverClient.RequestJob(q.worker.GetName()) + requestJobResponse, err := q.serverClient.RequestJob() if err != nil { if !errors.Is(err, serverclient.NoJobAvailable) { - q.printer.Errorf("Error Requesting Job: %v", err) + q.logger.Errorf("Error Requesting Job: %v", err) } continue } - if err := q.worker.Execute(taskJob); err != nil { - q.printer.Errorf("Error Preparing Job Execution: %v", err) + if err := q.worker.ExecuteJob(requestJobResponse.Id, requestJobResponse.EventID); err != nil { + q.logger.Errorf("Error Preparing Job Execution: %v", err) } } } diff --git a/worker/worker/executor.go b/worker/worker/executor.go new file mode 100644 index 0000000..46844d9 --- /dev/null +++ b/worker/worker/executor.go @@ -0,0 +1,322 @@ +package worker + +import ( + "context" + "errors" + "fmt" + "github.com/google/uuid" + log "github.com/sirupsen/logrus" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" + "transcoder/model" + "transcoder/worker/config" + "transcoder/worker/console" + "transcoder/worker/job" + "transcoder/worker/serverclient" + "transcoder/worker/step" +) + +const maxActiveJobs = 2 + +type JobExecutor struct { + activeJobs uint32 + + workerConfig *config.Config + tempPath string + wg sync.WaitGroup + mu sync.RWMutex + client *serverclient.ServerClient + + console *console.RenderService + stepExecutors map[model.NotificationType]*step.Executor +} + +func NewEncodeWorker(workerConfig *config.Config, client *serverclient.ServerClient, 
renderService *console.RenderService) *JobExecutor { + tempPath := filepath.Join(workerConfig.TemporalPath, fmt.Sprintf("worker-%s", workerConfig.Name)) + + jobExecutor := &JobExecutor{ + client: client, + wg: sync.WaitGroup{}, + workerConfig: workerConfig, + stepExecutors: make(map[model.NotificationType]*step.Executor), + tempPath: tempPath, + console: renderService, + activeJobs: 0, + } + stepExecutors := setupStepExecutors(jobExecutor) + jobExecutor.stepExecutors = stepExecutors + + if err := os.MkdirAll(tempPath, os.ModePerm); err != nil { + log.Fatal(err) + } + + return jobExecutor +} + +func setupStepExecutors(jobExecutor *JobExecutor) map[model.NotificationType]*step.Executor { + workerConfig := jobExecutor.workerConfig + client := jobExecutor.client + + onErrOpt := step.WithOnErrorOpt(func(jobContext *job.Context, notificationType model.NotificationType, err error) { + jobExecutor.publishTaskEvent(jobContext, model.JobNotification, model.FailedNotificationStatus, fmt.Sprintf("%s:%s", notificationType, err.Error())) + jobExecutor.ConsoleTrackStep(jobContext.JobId.String(), model.JobNotification).Error() + if err := jobExecutor.CleanJob(jobContext); err != nil { + jobExecutor.jobLogger(jobContext).Errorf("failed to clean job workspace %v", err) + } + }) + stepExecutors := make(map[model.NotificationType]*step.Executor) + + // Download Step + stepExecutors[model.DownloadNotification] = step.NewDownloadStepExecutor( + workerConfig.Name, + client.GetBaseDomain(), + step.WithOnCompleteOpt(func(jobContext *job.Context) { + if jobContext.Source.FFProbeData.HaveImageTypeSubtitle() { + stepExecutors[model.MKVExtractNotification].AddJob(jobContext) + return + } + stepExecutors[model.FFMPEGSNotification].AddJob(jobContext) + }), + onErrOpt) + + // MKVExtract Step + stepExecutors[model.MKVExtractNotification] = step.NewMKVExtractStepExecutor(onErrOpt, + step.WithOnCompleteOpt(func(jobContext *job.Context) { + stepExecutors[model.PGSNotification].AddJob(jobContext) + 
}), + onErrOpt) + + // PGS Step + stepExecutors[model.PGSNotification] = step.NewPGSToSrtStepExecutor(workerConfig.PGSConfig, + step.WithParallelRunners(workerConfig.PGSConfig.ParallelJobs), + step.WithOnCompleteOpt(func(jobContext *job.Context) { + stepExecutors[model.FFMPEGSNotification].AddJob(jobContext) + }), + onErrOpt) + + // FFMPEG Step + stepExecutors[model.FFMPEGSNotification] = step.NewFFMPEGStepExecutor(workerConfig.EncodeConfig, + step.WithOnCompleteOpt(func(jobContext *job.Context) { + stepExecutors[model.JobVerify].AddJob(jobContext) + }), + onErrOpt) + + // Verify Step + stepExecutors[model.JobVerify] = step.NewFFMPEGVerifyStepExecutor(workerConfig.VerifyDeltaTime, + step.WithOnCompleteOpt(func(jobContext *job.Context) { + stepExecutors[model.UploadNotification].AddJob(jobContext) + }), + onErrOpt) + + // Upload Step + stepExecutors[model.UploadNotification] = step.NewUploadStepExecutor(workerConfig.Name, + client.GetBaseDomain(), + step.WithOnCompleteOpt(func(jobContext *job.Context) { + jobExecutor.publishTaskEvent(jobContext, model.JobNotification, model.CompletedNotificationStatus, "") + jobExecutor.ConsoleTrackStep(jobContext.JobId.String(), model.JobNotification).Done() + if err := jobExecutor.CleanJob(jobContext); err != nil { + jobExecutor.jobLogger(jobContext).Errorf("failed to clean job workspace: %v", err) + } + }), + onErrOpt) + + return stepExecutors +} + +func (e *JobExecutor) Run(wg *sync.WaitGroup, ctx context.Context) { + serviceCtx, cancelServiceCtx := context.WithCancel(context.Background()) + log.Info("Starting Worker Client...") + e.start(serviceCtx) + log.Info("Started Worker Client...") + wg.Add(1) + go func() { + <-ctx.Done() + cancelServiceCtx() + e.stop() + log.Info("Stopping Worker Client...") + wg.Done() + }() +} + +func (e *JobExecutor) start(ctx context.Context) { + e.resumeJobs() + for _, stepExecutor := range e.stepExecutors { + go e.stepQueueRoutine(ctx, stepExecutor) + } +} + +func (e *JobExecutor) stop() { + for _, 
stepExecutor := range e.stepExecutors { + stepExecutor.Stop() + } +} + +func (e *JobExecutor) resumeJobs() { + err := filepath.Walk(e.tempPath, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + if filepath.Ext(path) == ".json" { + filepath.Base(path) + jobContext := job.ReadContextFromDiskByPath(path) + atomic.AddUint32(&e.activeJobs, 1) + + e.stepExecutors[jobContext.LastEvent.NotificationType].AddJob(jobContext) + } + + return nil + }) + + if err != nil { + panic(err) + } +} + +func (e *JobExecutor) AcceptJobs() bool { + if e.workerConfig.Paused { + return false + } + if e.workerConfig.HaveSettedPeriodTime() { + now := time.Now() + midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location()) + elapsedSinceMidnight := now.Sub(midnight) + return elapsedSinceMidnight >= *e.workerConfig.StartAfter && elapsedSinceMidnight <= *e.workerConfig.StopAfter + } + return e.ActiveJobs() < maxActiveJobs +} +func (e *JobExecutor) jobLogger(jobContext *job.Context) console.LeveledLogger { + return e.console.Logger(console.WithMessagePrefix(fmt.Sprintf("[%s]", jobContext.JobId.String()))) +} +func (e *JobExecutor) ExecuteJob(jobId uuid.UUID, lastEvent int) error { + jobContext := job.NewContext(jobId, lastEvent, filepath.Join(e.tempPath, jobId.String())) + + if err := jobContext.Init(); err != nil { + return err + } + + e.publishTaskEvent(jobContext, model.JobNotification, model.StartedNotificationStatus, "") + + atomic.AddUint32(&e.activeJobs, 1) + e.stepExecutors[model.DownloadNotification].AddJob(jobContext) + return nil +} + +func (e *JobExecutor) publishTaskEvent(jobContext *job.Context, notificationType model.NotificationType, status model.NotificationStatus, message string) { + e.mu.Lock() + defer e.mu.Unlock() + l := e.jobLogger(jobContext) + jobContext.UpdateEvent(notificationType, status, message) + event := &model.TaskEventType{ + Event: model.Event{ + EventTime: 
time.Now(), + WorkerName: e.workerConfig.Name, + }, + JobId: jobContext.JobId, + EventID: jobContext.LastEvent.EventId, + NotificationType: jobContext.LastEvent.NotificationType, + Status: jobContext.LastEvent.Status, + Message: jobContext.LastEvent.Message, + } + if err := e.client.PublishTaskEvent(event); err != nil { + l.Errorf("Error on publishing event %s", err.Error()) + } + if err := jobContext.PersistJobContext(); err != nil { + l.Errorf("Error on publishing event %s", err.Error()) + } + l.Logf("%s have been %s", event.NotificationType, event.Status) + +} + +func (e *JobExecutor) ActiveJobs() uint32 { + return atomic.LoadUint32(&e.activeJobs) +} + +func (e *JobExecutor) stepQueueRoutine(ctx context.Context, stepExecutor *step.Executor) { + for { + select { + case <-ctx.Done(): + return + case jobCtx, ok := <-stepExecutor.GetJobChan(): + if !ok { + return + } + stepExecutor.OnJob(jobCtx) + err := e.executeStepActions(ctx, stepExecutor, jobCtx) + if err != nil { + stepExecutor.OnError(jobCtx, stepExecutor.NotificationType(), err) + continue + } + stepExecutor.OnComplete(jobCtx) + } + } +} + +func (e *JobExecutor) executeStepActions(ctx context.Context, stepExecutor *step.Executor, jobContext *job.Context) error { + wg := sync.WaitGroup{} + + actions := stepExecutor.Actions(jobContext) + actionsChan := make(chan step.Action, len(actions)) + errs := make(chan error, len(actions)) + + // prepare parallel runners + for i := 0; i < stepExecutor.Parallel(); i++ { + wg.Add(1) + go func() { + defer wg.Done() + for action := range actionsChan { + tracker := e.ReportTrackStep(jobContext.JobId, action.Id, stepExecutor.NotificationType()) + if err := action.Execute(ctx, tracker); err != nil { + tracker.Logger().Errorf("Error on executing step %s: %s", stepExecutor.NotificationType(), err.Error()) + tracker.Error() + errs <- err + continue + } + tracker.Done() + } + }() + } + + e.publishTaskEvent(jobContext, stepExecutor.NotificationType(), 
model.StartedNotificationStatus, "") + for _, action := range actions { + actionsChan <- action + } + + close(actionsChan) + wg.Wait() + close(errs) + + var errorList []error + for err := range errs { + errorList = append(errorList, err) + } + err := errors.Join(errorList...) + if err != nil { + e.publishTaskEvent(jobContext, stepExecutor.NotificationType(), model.FailedNotificationStatus, err.Error()) + return err + } + + e.publishTaskEvent(jobContext, stepExecutor.NotificationType(), model.CompletedNotificationStatus, "") + return nil +} + +func (e *JobExecutor) CleanJob(jobContext *job.Context) error { + atomic.AddUint32(&e.activeJobs, ^uint32(0)) + return jobContext.Clean() +} + +// ConsoleTrackStep Console tracker to print progress to console +func (e *JobExecutor) ConsoleTrackStep(stepId string, notificationType model.NotificationType) *console.StepTracker { + return e.console.StepTracker(stepId, notificationType, e.console.Logger(console.WithMessagePrefix(fmt.Sprintf("[%s]", stepId)))) +} + +// ReportTrackStep This one is like consoleTrackStep but also reports progress to server +func (e *JobExecutor) ReportTrackStep(jobId uuid.UUID, stepId string, notificationType model.NotificationType) *ReportStepProgressTracker { + consoleTracker := e.ConsoleTrackStep(stepId, notificationType) + return newReportStepProgressTracker(jobId, stepId, notificationType, e.client, consoleTracker) +} diff --git a/worker/worker/progress.go b/worker/worker/progress.go new file mode 100644 index 0000000..e5825c2 --- /dev/null +++ b/worker/worker/progress.go @@ -0,0 +1,77 @@ +package worker + +import ( + "github.com/google/uuid" + "time" + "transcoder/model" + "transcoder/worker/console" + "transcoder/worker/serverclient" +) + +type ReportStepProgressTracker struct { + notificationType model.NotificationType + consoleStepTracker *console.StepTracker + serverClient *serverclient.ServerClient + logger console.LeveledLogger + jobId uuid.UUID + stepId string + lastUpdate time.Time +} + 
+func newReportStepProgressTracker(jobId uuid.UUID, stepId string, notificationType model.NotificationType, serverClient *serverclient.ServerClient, consoleStepTracker *console.StepTracker) *ReportStepProgressTracker { + return &ReportStepProgressTracker{ + jobId: jobId, + stepId: stepId, + serverClient: serverClient, + notificationType: notificationType, + consoleStepTracker: consoleStepTracker, + logger: consoleStepTracker.Logger(), + } +} +func (e *ReportStepProgressTracker) Logger() console.LeveledLogger { + return e.logger +} + +func (e *ReportStepProgressTracker) SetTotal(total int64) { + e.consoleStepTracker.SetTotal(total) +} + +func (e *ReportStepProgressTracker) UpdateValue(value int64) { + e.consoleStepTracker.UpdateValue(value) + e.reportTrackProgress(model.ProgressingTaskProgressTypeStatus) +} + +func (e *ReportStepProgressTracker) Increment(increment int) { + e.consoleStepTracker.Increment(increment) + e.reportTrackProgress(model.ProgressingTaskProgressTypeStatus) +} + +func (e *ReportStepProgressTracker) reportTrackProgress(status model.TaskProgressStatus) { + if time.Since(e.lastUpdate) > 5*time.Second || status != model.ProgressingTaskProgressTypeStatus { + err := e.serverClient.PublishTaskProgressEvent(&model.TaskProgressType{ + Event: model.Event{ + EventTime: time.Now(), + }, + JobId: e.jobId, + ProgressID: e.stepId, + Percent: e.consoleStepTracker.PercentDone(), + ETA: e.consoleStepTracker.ETA(), + NotificationType: e.notificationType, + Status: status, + }) + if err != nil { + e.logger.Errorf("Error on publishing track progress %s", err.Error()) + } + e.lastUpdate = time.Now() + } +} + +func (e *ReportStepProgressTracker) Error() { + e.consoleStepTracker.Error() + e.reportTrackProgress(model.FailureTaskProgressTypeStatus) +} + +func (e *ReportStepProgressTracker) Done() { + e.consoleStepTracker.Done() + e.reportTrackProgress(model.DoneTaskProgressTypeStatus) +}