Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions helper/helper.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package helper

import (
"github.com/avast/retry-go"
log "github.com/sirupsen/logrus"
"io"
"math/rand"
"net/http"
"path/filepath"
"time"
)

var (
ValidVideoExtensions = []string{"mp4", "mpg", "m4a", "m4v", "f4v", "f4a", "m4b", "m4r", "f4b", "mov ", "ogg", "oga", "ogv", "ogx ", "wmv", "wma", "asf ", "webm", "avi", "flv", "vob ", "mkv"}
STUNServers = []string{"https://api.ipify.org?format=text", "https://ifconfig.me", "https://ident.me/", "https://myexternalip.com/raw"}
ffmpegPath = "ffmpeg"
mkvExtractPath = "mkvextract"
)
Expand All @@ -32,27 +26,6 @@ func CheckPath(path string) {
}
}

func GetPublicIP() (publicIP string, err error) {
err = retry.Do(func() error {
randomIndex := rand.Intn(len(STUNServers))
resp, err := http.Get(STUNServers[randomIndex])
if err != nil {
return err
}
defer resp.Body.Close()
publicIPBytes, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
publicIP = string(publicIPBytes)
return nil
}, retry.Delay(time.Millisecond*100), retry.Attempts(360), retry.LastErrorOnly(true))
if err != nil {
return "", err
}
return publicIP, nil
}

func GetFFmpegPath() string {
return ffmpegPath
}
Expand Down
6 changes: 3 additions & 3 deletions model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,9 @@ type TaskPGS struct {
}

type EnvelopEvent struct {
EventType EventType `json:"eventType"`
EventData json.RawMessage `json:"eventData"`
EventType EventType `json:"eventType"`
EventData json.RawMessage `json:"eventData"`
RemoteAddr string
}

type Event struct {
Expand All @@ -100,7 +101,6 @@ type TaskEventType struct {

type PingEventType struct {
Event
IP string `json:"ip"`
}

type TaskProgressStatus string
Expand Down
6 changes: 3 additions & 3 deletions server/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
type Repository interface {
getConnection() (SQLDBOperations, error)
Initialize(ctx context.Context) error
PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) error
PingServerUpdate(ctx context.Context, remoteAddr string, pingEventType model.PingEventType) error
GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEventType, error)
GetJobs(ctx context.Context) (*[]model.Job, error)
GetJobsByStatus(ctx context.Context, notificationType model.NotificationType, status model.NotificationStatus) (jobs []*model.Job, returnError error)
Expand Down Expand Up @@ -497,12 +497,12 @@ func (s *SQLRepository) GetJobByPath(ctx context.Context, path string) (video *m
return s.getJobByPath(ctx, conn, path)
}

func (s *SQLRepository) PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) (returnError error) {
func (s *SQLRepository) PingServerUpdate(ctx context.Context, remoteAddr string, pingEventType model.PingEventType) (returnError error) {
conn, err := s.getConnection()
if err != nil {
return err
}
_, err = conn.ExecContext(ctx, "INSERT INTO workers (name, ip,last_seen ) VALUES ($1,$2,$3) ON CONFLICT (name) DO UPDATE SET ip = $2, last_seen=$3;", pingEventType.WorkerName, pingEventType.IP, time.Now())
_, err = conn.ExecContext(ctx, "INSERT INTO workers (name, ip,last_seen ) VALUES ($1,$2,$3) ON CONFLICT (name) DO UPDATE SET ip = $2, last_seen=$3;", pingEventType.WorkerName, remoteAddr, time.Now())
return err
}

Expand Down
3 changes: 1 addition & 2 deletions server/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (r *RuntimeScheduler) processEvent(ctx context.Context, event *model.Envelo
if err = json.Unmarshal(event.EventData, &pingEvent); err != nil {
return err
}
return r.repo.PingServerUpdate(ctx, pingEvent)
return r.repo.PingServerUpdate(ctx, event.RemoteAddr, pingEvent)
case model.NotificationEvent:
taskEvent := model.TaskEventType{}
if err = json.Unmarshal(event.EventData, &taskEvent); err != nil {
Expand Down Expand Up @@ -156,7 +156,6 @@ func (r *RuntimeScheduler) processEvent(ctx context.Context, event *model.Envelo
default:
return fmt.Errorf("unknown event type %s", event.EventType)
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion server/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ func (s *Server) requestJob(writer http.ResponseWriter, request *http.Request) {
}

func (s *Server) handleWorkerEvent(writer http.ResponseWriter, request *http.Request) {
envelopEvent := &model.EnvelopEvent{}
envelopEvent := &model.EnvelopEvent{
RemoteAddr: strings.Split(request.RemoteAddr, ":")[0],
}
err := json.NewDecoder(request.Body).Decode(envelopEvent)
if webError(writer, err, 500) {
return
Expand Down
6 changes: 0 additions & 6 deletions worker/serverclient/server_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"io"
"net/http"
"time"
"transcoder/helper"
"transcoder/model"
"transcoder/server/web"
)
Expand Down Expand Up @@ -126,16 +125,11 @@ func (s *ServerClient) request(method string, uri string, body io.Reader) (*http
}

func (s *ServerClient) PublishPingEvent() error {
publicIp, err := helper.GetPublicIP()
if err != nil {
return err
}
pingEvent := model.PingEventType{
Event: model.Event{
EventTime: time.Now(),
WorkerName: s.workerName,
},
IP: publicIp,
}
event, err := envelopEvent(model.PingEvent, pingEvent)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion worker/worker/progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (e *ReportStepProgressTracker) Increment(increment int) {
}

func (e *ReportStepProgressTracker) reportTrackProgress(status model.TaskProgressStatus) {
if time.Since(e.lastUpdate) > 5*time.Second || status != model.ProgressingTaskProgressTypeStatus {
if time.Since(e.lastUpdate) > 15*time.Second || status != model.ProgressingTaskProgressTypeStatus {
err := e.serverClient.PublishTaskProgressEvent(&model.TaskProgressType{
Event: model.Event{
EventTime: time.Now(),
Expand Down
Loading