Skip to content

Commit b890256

Browse files
committed
fix: some fixes after linter corrections
docs: update readme latest changes feat: PGS conversion shows progress in worker cli
1 parent 0f6c688 commit b890256

File tree

9 files changed

+132
-111
lines changed

9 files changed

+132
-111
lines changed

model/model.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,7 @@ type JobRequest struct {
234234
SourcePath string `json:"sourcePath"`
235235
ForceCompleted bool `json:"forceCompleted"`
236236
ForceFailed bool `json:"forceFailed"`
237+
ForceCanceled bool `json:"forceCanceled"`
237238
ForceAssigned bool `json:"forceAssigned"`
238239
SourceSize int64
239240
TargetPath string

readme.md

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,18 @@ You can have as many workers you want, locally or remote in multiple machines.
2626

2727
Replace it by your own values, remember to create manually the DB scheme and PG user.
2828
```yaml
29-
database:
30-
host: 192.168.1.55
31-
user: db_user
32-
password: db_pass
33-
scheme: encode
34-
29+
server:
30+
database:
31+
host: 192.168.1.55
32+
user: db_user
33+
password: db_pass
34+
scheme: encode
35+
scheduler:
36+
sourcePath: /mnt/media
37+
deleteOnComplete: false # if true, the original file will be deleted after job is completed
38+
minFileSize: 100 # minimum file size to be considered for encoding
3539
web:
3640
token: my_secret_token # replace it by your own secret
37-
38-
scheduler:
39-
sourcePath: /mnt/media
40-
deleteOnComplete: false # if true, the original file will be deleted after job is completed
41-
minFileSize: 100 # minimum file size to be considered for encoding
4241
```
4342
4443
### Run

server/repository/repository.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ func (s *SQLRepository) AddNewTaskEvent(ctx context.Context, event *model.TaskEv
513513
}
514514

515515
func (s *SQLRepository) addNewTaskEvent(ctx context.Context, tx SQLDBOperations, event *model.TaskEvent) error {
516-
rows, err := tx.QueryContext(ctx, "select max(job_event_id) from job_events where job_id=$1", event.Id.String())
516+
rows, err := tx.QueryContext(ctx, "select COALESCE(max(job_event_id),-1) from job_events where job_id=$1", event.Id.String())
517517
if err != nil {
518518
return err
519519
}

server/scheduler/scheduler.go

Lines changed: 56 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ func (r *RuntimeScheduler) scheduleRoutine(ctx context.Context) {
211211
r.pathChecksumMap[checksumPath.path] = checksumPath.checksum
212212
case <-time.After(r.config.ScheduleTime):
213213
if err := r.jobMaintenance(ctx); err != nil {
214-
log.Errorf("Errorf on job maintenance %s", err)
214+
log.Errorf("Error on job maintenance %s", err)
215215
}
216216

217217
}
@@ -280,65 +280,63 @@ type ScheduleJobRequestResult struct {
280280
}
281281

282282
func (r *RuntimeScheduler) scheduleJobRequest(ctx context.Context, jobRequest *model.JobRequest) (job *model.Job, err error) {
283+
l := log.WithFields(log.Fields{
284+
"source_path": jobRequest.SourcePath,
285+
})
283286
err = r.repo.WithTransaction(ctx, func(ctx context.Context, tx repository.Repository) error {
284287
job, err = tx.GetJobByPath(ctx, jobRequest.SourcePath)
285288
if err != nil {
286289
return err
287290
}
288291

289-
l := log.WithFields(log.Fields{
290-
"source_path": jobRequest.SourcePath,
291-
})
292-
293292
var eventsToAdd []*model.TaskEvent
294293
if job == nil {
295-
newUUID, _ := uuid.NewUUID()
296-
job = &model.Job{
297-
SourcePath: jobRequest.SourcePath,
298-
SourceSize: jobRequest.SourceSize,
299-
TargetPath: jobRequest.TargetPath,
300-
Id: newUUID,
301-
}
302-
l.WithField("job_id", job.Id.String()).Info("Creating new job")
303-
err = tx.AddJob(ctx, job)
294+
job, err = r.newJob(ctx, tx, jobRequest)
304295
if err != nil {
305296
return err
306297
}
307-
startEvent := job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus)
308-
eventsToAdd = append(eventsToAdd, startEvent)
298+
eventsToAdd = job.Events
309299
} else {
310300
// If job exist we check if we can retry the job
311-
lastEvent := job.Events.GetLatestPerNotificationType(model.JobNotification)
312-
status := job.Events.GetStatus()
313-
if jobRequest.ForceAssigned && (status == model.AssignedNotificationStatus || status == model.StartedNotificationStatus) {
314-
cancelEvent := job.AddEvent(model.NotificationEvent, model.JobNotification, model.CanceledNotificationStatus)
315-
eventsToAdd = append(eventsToAdd, cancelEvent)
316-
317-
}
318-
if (jobRequest.ForceCompleted && status == model.CompletedNotificationStatus) ||
319-
(jobRequest.ForceFailed && (status == model.FailedNotificationStatus || status == model.CanceledNotificationStatus)) ||
320-
(jobRequest.ForceAssigned && (status == model.StartedNotificationStatus || status == model.AssignedNotificationStatus)) {
321-
requeueEvent := job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus)
322-
eventsToAdd = append(eventsToAdd, requeueEvent)
323-
} else if !(jobRequest.ForceAssigned && status == model.QueuedNotificationStatus) {
324-
return fmt.Errorf("%s (%s) job is in %s state by %s, can not be rescheduled", job.Id.String(), jobRequest.SourcePath, lastEvent.Status, lastEvent.WorkerName)
301+
eventsToAdd, err = r.updateTerminatedJobByRequest(job, jobRequest)
302+
if err != nil {
303+
return err
325304
}
326305
}
327-
if len(eventsToAdd) > 0 {
328-
for _, taskEvent := range eventsToAdd {
329-
err = tx.AddNewTaskEvent(ctx, taskEvent)
330-
if err != nil {
331-
return err
332-
}
333-
l.WithField("job_id", job.Id.String()).Infof("job is now %s", taskEvent.Status)
306+
307+
for _, taskEvent := range eventsToAdd {
308+
err = tx.AddNewTaskEvent(ctx, taskEvent)
309+
if err != nil {
310+
return err
334311
}
312+
l.WithField("job_id", job.Id.String()).Infof("job is now %s", taskEvent.Status)
335313
}
336314

337315
return nil
338316
})
339317
return job, err
340318
}
341319

320+
func (r *RuntimeScheduler) newJob(ctx context.Context, tx repository.Repository, jobRequest *model.JobRequest) (*model.Job, error) {
321+
l := log.WithFields(log.Fields{
322+
"source_path": jobRequest.SourcePath,
323+
})
324+
newUUID, _ := uuid.NewUUID()
325+
job := &model.Job{
326+
SourcePath: jobRequest.SourcePath,
327+
SourceSize: jobRequest.SourceSize,
328+
TargetPath: jobRequest.TargetPath,
329+
Id: newUUID,
330+
}
331+
l.WithField("job_id", job.Id.String()).Info("Creating new job")
332+
err := tx.AddJob(ctx, job)
333+
if err != nil {
334+
return nil, err
335+
}
336+
job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus)
337+
return job, nil
338+
}
339+
342340
func (r *RuntimeScheduler) ScheduleJobRequests(ctx context.Context, jobRequest *model.JobRequest) (result *ScheduleJobRequestResult, returnError error) {
343341
result = &ScheduleJobRequestResult{}
344342
searchJobRequestChan := make(chan *JobRequestResult, 10)
@@ -546,6 +544,27 @@ func (r *RuntimeScheduler) assignedJobMaintenance(ctx context.Context) error {
546544
return nil
547545
}
548546

547+
func (r *RuntimeScheduler) updateTerminatedJobByRequest(job *model.Job, jobRequest *model.JobRequest) ([]*model.TaskEvent, error) {
548+
var eventsToAdd []*model.TaskEvent
549+
lastEvent := job.Events.GetLatestPerNotificationType(model.JobNotification)
550+
status := lastEvent.Status
551+
552+
switch {
553+
case jobRequest.ForceAssigned && (status == model.AssignedNotificationStatus || status == model.StartedNotificationStatus):
554+
eventsToAdd = append(eventsToAdd, job.AddEvent(model.NotificationEvent, model.JobNotification, model.CanceledNotificationStatus))
555+
eventsToAdd = append(eventsToAdd, job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus))
556+
557+
case jobRequest.ForceCompleted && status == model.CompletedNotificationStatus,
558+
jobRequest.ForceFailed && status == model.FailedNotificationStatus,
559+
jobRequest.ForceCanceled && status == model.CanceledNotificationStatus:
560+
requeueEvent := job.AddEvent(model.NotificationEvent, model.JobNotification, model.QueuedNotificationStatus)
561+
eventsToAdd = append(eventsToAdd, requeueEvent)
562+
default:
563+
return nil, fmt.Errorf("%s (%s) job is in %s state by %s, can not be rescheduled", job.Id.String(), jobRequest.SourcePath, lastEvent.Status, lastEvent.WorkerName)
564+
}
565+
return eventsToAdd, nil
566+
}
567+
549568
func simpleRegex(pattern string, string string) bool {
550569
m, err := regexp.MatchString(strings.ToLower(pattern), strings.ToLower(string))
551570
if err != nil {

server/web/web.go

Lines changed: 24 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ import (
1515
"strings"
1616
"sync"
1717
"sync/atomic"
18-
"syscall"
1918
"time"
2019
"transcoder/model"
2120
"transcoder/server/scheduler"
@@ -52,7 +51,7 @@ func (s *Server) requestJob(writer http.ResponseWriter, request *http.Request) {
5251
writer.WriteHeader(200)
5352
_, err = writer.Write(b)
5453
if err != nil {
55-
log.Errorf("Errorf writing response %v", err)
54+
log.Errorf("Error writing response %v", err)
5655
}
5756
}
5857

@@ -101,7 +100,7 @@ func (s *Server) addJobs(writer http.ResponseWriter, request *http.Request) {
101100
writer.WriteHeader(200)
102101
_, err = writer.Write(b)
103102
if err != nil {
104-
log.Errorf("Errorf writing response %v", err)
103+
log.Errorf("Error writing response %v", err)
105104
}
106105
}
107106

@@ -164,24 +163,18 @@ loop:
164163
case <-request.Context().Done():
165164
return
166165
default:
167-
readedBytes, err := reader.Read(b)
168-
if err != nil {
169-
if err == io.EOF {
170-
break loop
171-
}
172-
log.Errorf("Errorreading from reader: %v", err)
173-
http.Error(writer, "Error during upload", http.StatusInternalServerError)
174-
return
175-
}
176-
166+
readedBytes, readErr := reader.Read(b)
177167
readed += uint64(readedBytes)
178-
_, err = uploadStream.Write(b[0:readedBytes])
179-
if err != nil {
180-
if err == io.EOF {
181-
break loop
182-
}
183-
log.Errorf("Error writing: %v", err)
184-
http.Error(writer, "Error during upload", http.StatusInternalServerError)
168+
_, writeErr := uploadStream.Write(b[0:readedBytes])
169+
if writeErr != nil {
170+
break loop
171+
}
172+
if errors.Is(readErr, io.EOF) {
173+
break loop
174+
}
175+
if readErr != nil {
176+
log.Errorf("Error reading from download stream: %v", readErr)
177+
return
185178
}
186179
}
187180
}
@@ -238,28 +231,16 @@ loop:
238231
case <-request.Context().Done():
239232
return
240233
default:
241-
readedBytes, err := downloadStream.Read(b)
242-
if err != nil {
243-
if err == io.EOF {
244-
break loop
245-
}
246-
log.Errorf("Error reading from download stream: %v", err)
247-
// Send an appropriate HTTP error code or terminate gracefully.
248-
http.Error(writer, "Error during download", http.StatusInternalServerError)
249-
return
234+
readedBytes, readErr := downloadStream.Read(b)
235+
_, writeErr := writer.Write(b[0:readedBytes])
236+
if writeErr != nil {
237+
break loop
250238
}
251-
_, err = writer.Write(b[0:readedBytes])
252-
if err != nil {
253-
if err == io.EOF {
254-
break loop
255-
}
256-
log.Errorf("Error writing to response: %v", err)
257-
// Gracefully terminate if the client cannot receive data.
258-
if errors.Is(err, syscall.EPIPE) { // Broken pipe error
259-
log.Warn("Client disconnected during download")
260-
} else {
261-
http.Error(writer, "Error during download", http.StatusInternalServerError)
262-
}
239+
if errors.Is(readErr, io.EOF) {
240+
break loop
241+
}
242+
if readErr != nil {
243+
log.Errorf("Error reading from download stream: %v", readErr)
263244
return
264245
}
265246

@@ -283,7 +264,7 @@ func (s *Server) checksum(writer http.ResponseWriter, request *http.Request) {
283264
writer.WriteHeader(200)
284265
_, err = writer.Write([]byte(checksum))
285266
if err != nil {
286-
log.Errorf("Errorf writing response %v", err)
267+
log.Errorf("Error writing response %v", err)
287268
}
288269
}
289270

@@ -436,7 +417,7 @@ func writeUnauthorized(w http.ResponseWriter) {
436417
w.WriteHeader(401)
437418
_, err := w.Write([]byte("Unauthorised.\n"))
438419
if err != nil {
439-
log.Errorf("Errorf writing response %v", err)
420+
log.Errorf("Error writing response %v", err)
440421
}
441422
}
442423

worker/serverclient/server_client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (s *ServerClient) PublishEvent(event model.TaskEvent) error {
5050
}
5151
req, err := s.request("POST", "/api/v1/event", bytes.NewBuffer(b))
5252
if err != nil {
53-
fmt.Printf("Errorf creating request: %v\n", err)
53+
fmt.Printf("Error creating request: %v\n", err)
5454
return err
5555
}
5656

@@ -118,7 +118,7 @@ func (s *ServerClient) GetURL(uri string) string {
118118
func (s *ServerClient) request(method string, uri string, body io.Reader) (*http.Request, error) {
119119
req, err := http.NewRequest(method, s.GetURL(uri), body)
120120
if err != nil {
121-
fmt.Printf("Errorf creating request: %v\n", err)
121+
fmt.Printf("Error creating request: %v\n", err)
122122
return nil, err
123123
}
124124

worker/task/coordinator.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (q *ServerCoordinator) heartbeatRoutine(ctx context.Context) {
5959
return
6060
case <-time.After(time.Second * 30):
6161
if err := q.serverClient.PublishPing(); err != nil {
62-
q.printer.Errorf("Errorf Publishing Ping Event: %v", err)
62+
q.printer.Errorf("Error Publishing Ping Event: %v", err)
6363
}
6464
}
6565
}
@@ -74,7 +74,7 @@ func (q *ServerCoordinator) requestTaskRoutine(ctx context.Context) {
7474
if q.worker.AcceptJobs() {
7575
release, requireUpdate, err := q.updater.CheckForUpdate()
7676
if err != nil {
77-
q.printer.Errorf("Errorf Checking For Update: %v", err)
77+
q.printer.Errorf("Error Checking For Update: %v", err)
7878
continue
7979
}
8080
if requireUpdate {
@@ -85,13 +85,13 @@ func (q *ServerCoordinator) requestTaskRoutine(ctx context.Context) {
8585
taskJob, err := q.serverClient.RequestJob(q.worker.GetName())
8686
if err != nil {
8787
if !errors.Is(err, serverclient.NoJobAvailable) {
88-
q.printer.Errorf("Errorf Requesting Job: %v", err)
88+
q.printer.Errorf("Error Requesting Job: %v", err)
8989
}
9090
continue
9191
}
9292

9393
if err := q.worker.Execute(taskJob); err != nil {
94-
q.printer.Errorf("Errorf Preparing Job Execution: %v", err)
94+
q.printer.Errorf("Error Preparing Job Execution: %v", err)
9595
}
9696
}
9797
}

0 commit comments

Comments
 (0)