Skip to content

Commit 7ac096f

Browse files
segatoriaymerich
andauthored
refactor: server now calculates public IP instead of worker sending it (#29)
Co-authored-by: Isaac Aymerich <isaac.aymerich@roche.com>
1 parent 844efa1 commit 7ac096f

File tree

7 files changed

+11
-43
lines changed

7 files changed

+11
-43
lines changed

helper/helper.go

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,12 @@
11
package helper
22

33
import (
4-
"github.com/avast/retry-go"
54
log "github.com/sirupsen/logrus"
6-
"io"
7-
"math/rand"
8-
"net/http"
95
"path/filepath"
10-
"time"
116
)
127

138
var (
149
ValidVideoExtensions = []string{"mp4", "mpg", "m4a", "m4v", "f4v", "f4a", "m4b", "m4r", "f4b", "mov ", "ogg", "oga", "ogv", "ogx ", "wmv", "wma", "asf ", "webm", "avi", "flv", "vob ", "mkv"}
15-
STUNServers = []string{"https://api.ipify.org?format=text", "https://ifconfig.me", "https://ident.me/", "https://myexternalip.com/raw"}
1610
ffmpegPath = "ffmpeg"
1711
mkvExtractPath = "mkvextract"
1812
)
@@ -32,27 +26,6 @@ func CheckPath(path string) {
3226
}
3327
}
3428

35-
func GetPublicIP() (publicIP string, err error) {
36-
err = retry.Do(func() error {
37-
randomIndex := rand.Intn(len(STUNServers))
38-
resp, err := http.Get(STUNServers[randomIndex])
39-
if err != nil {
40-
return err
41-
}
42-
defer resp.Body.Close()
43-
publicIPBytes, err := io.ReadAll(resp.Body)
44-
if err != nil {
45-
return err
46-
}
47-
publicIP = string(publicIPBytes)
48-
return nil
49-
}, retry.Delay(time.Millisecond*100), retry.Attempts(360), retry.LastErrorOnly(true))
50-
if err != nil {
51-
return "", err
52-
}
53-
return publicIP, nil
54-
}
55-
5629
func GetFFmpegPath() string {
5730
return ffmpegPath
5831
}

model/model.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,9 @@ type TaskPGS struct {
8181
}
8282

8383
type EnvelopEvent struct {
84-
EventType EventType `json:"eventType"`
85-
EventData json.RawMessage `json:"eventData"`
84+
EventType EventType `json:"eventType"`
85+
EventData json.RawMessage `json:"eventData"`
86+
RemoteAddr string
8687
}
8788

8889
type Event struct {
@@ -100,7 +101,6 @@ type TaskEventType struct {
100101

101102
type PingEventType struct {
102103
Event
103-
IP string `json:"ip"`
104104
}
105105

106106
type TaskProgressStatus string

server/repository/repository.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ var (
1818
type Repository interface {
1919
getConnection() (SQLDBOperations, error)
2020
Initialize(ctx context.Context) error
21-
PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) error
21+
PingServerUpdate(ctx context.Context, remoteAddr string, pingEventType model.PingEventType) error
2222
GetTimeoutJobs(ctx context.Context, timeout time.Duration) ([]*model.TaskEventType, error)
2323
GetJobs(ctx context.Context) (*[]model.Job, error)
2424
GetJobsByStatus(ctx context.Context, notificationType model.NotificationType, status model.NotificationStatus) (jobs []*model.Job, returnError error)
@@ -497,12 +497,12 @@ func (s *SQLRepository) GetJobByPath(ctx context.Context, path string) (video *m
497497
return s.getJobByPath(ctx, conn, path)
498498
}
499499

500-
func (s *SQLRepository) PingServerUpdate(ctx context.Context, pingEventType model.PingEventType) (returnError error) {
500+
func (s *SQLRepository) PingServerUpdate(ctx context.Context, remoteAddr string, pingEventType model.PingEventType) (returnError error) {
501501
conn, err := s.getConnection()
502502
if err != nil {
503503
return err
504504
}
505-
_, err = conn.ExecContext(ctx, "INSERT INTO workers (name, ip,last_seen ) VALUES ($1,$2,$3) ON CONFLICT (name) DO UPDATE SET ip = $2, last_seen=$3;", pingEventType.WorkerName, pingEventType.IP, time.Now())
505+
_, err = conn.ExecContext(ctx, "INSERT INTO workers (name, ip,last_seen ) VALUES ($1,$2,$3) ON CONFLICT (name) DO UPDATE SET ip = $2, last_seen=$3;", pingEventType.WorkerName, remoteAddr, time.Now())
506506
return err
507507
}
508508

server/scheduler/scheduler.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func (r *RuntimeScheduler) processEvent(ctx context.Context, event *model.Envelo
123123
if err = json.Unmarshal(event.EventData, &pingEvent); err != nil {
124124
return err
125125
}
126-
return r.repo.PingServerUpdate(ctx, pingEvent)
126+
return r.repo.PingServerUpdate(ctx, event.RemoteAddr, pingEvent)
127127
case model.NotificationEvent:
128128
taskEvent := model.TaskEventType{}
129129
if err = json.Unmarshal(event.EventData, &taskEvent); err != nil {
@@ -156,7 +156,6 @@ func (r *RuntimeScheduler) processEvent(ctx context.Context, event *model.Envelo
156156
default:
157157
return fmt.Errorf("unknown event type %s", event.EventType)
158158
}
159-
160159
return nil
161160
}
162161

server/web/web.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ func (s *Server) requestJob(writer http.ResponseWriter, request *http.Request) {
5656
}
5757

5858
func (s *Server) handleWorkerEvent(writer http.ResponseWriter, request *http.Request) {
59-
envelopEvent := &model.EnvelopEvent{}
59+
envelopEvent := &model.EnvelopEvent{
60+
RemoteAddr: strings.Split(request.RemoteAddr, ":")[0],
61+
}
6062
err := json.NewDecoder(request.Body).Decode(envelopEvent)
6163
if webError(writer, err, 500) {
6264
return

worker/serverclient/server_client.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"io"
1010
"net/http"
1111
"time"
12-
"transcoder/helper"
1312
"transcoder/model"
1413
"transcoder/server/web"
1514
)
@@ -126,16 +125,11 @@ func (s *ServerClient) request(method string, uri string, body io.Reader) (*http
126125
}
127126

128127
func (s *ServerClient) PublishPingEvent() error {
129-
publicIp, err := helper.GetPublicIP()
130-
if err != nil {
131-
return err
132-
}
133128
pingEvent := model.PingEventType{
134129
Event: model.Event{
135130
EventTime: time.Now(),
136131
WorkerName: s.workerName,
137132
},
138-
IP: publicIp,
139133
}
140134
event, err := envelopEvent(model.PingEvent, pingEvent)
141135
if err != nil {

worker/worker/progress.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (e *ReportStepProgressTracker) Increment(increment int) {
4747
}
4848

4949
func (e *ReportStepProgressTracker) reportTrackProgress(status model.TaskProgressStatus) {
50-
if time.Since(e.lastUpdate) > 5*time.Second || status != model.ProgressingTaskProgressTypeStatus {
50+
if time.Since(e.lastUpdate) > 15*time.Second || status != model.ProgressingTaskProgressTypeStatus {
5151
err := e.serverClient.PublishTaskProgressEvent(&model.TaskProgressType{
5252
Event: model.Event{
5353
EventTime: time.Now(),

0 commit comments

Comments
 (0)