Skip to content

Commit

Permalink
check panic
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang555 authored and zhenghaoz committed Mar 24, 2021
1 parent 5cc8794 commit 8f47d37
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 0 deletions.
5 changes: 5 additions & 0 deletions base/parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func Parallel(nJobs int, nWorkers int, worker func(workerId, jobId int) error) e
c := make(chan int, chanSize)
// producer
go func() {
defer CheckPanic()

// send jobs
for i := 0; i < nJobs; i++ {
c <- i
Expand All @@ -51,6 +53,7 @@ func Parallel(nJobs int, nWorkers int, worker func(workerId, jobId int) error) e
for j := 0; j < nWorkers; j++ {
// start workers
go func(workerId int) {
defer CheckPanic()
defer wg.Done()
for {
// read job
Expand Down Expand Up @@ -91,6 +94,7 @@ func BatchParallel(nJobs int, nWorkers int, batchSize int, worker func(workerId,
c := make(chan batchJob, chanSize)
// producer
go func() {
defer CheckPanic()
// send jobs
for i := 0; i < nJobs; i += batchSize {
c <- batchJob{beginId: i, endId: Min(i+batchSize, nJobs)}
Expand All @@ -107,6 +111,7 @@ func BatchParallel(nJobs int, nWorkers int, batchSize int, worker func(workerId,
for j := 0; j < nWorkers; j++ {
// start workers
go func(workerId int) {
defer CheckPanic()
defer wg.Done()
for {
// read job
Expand Down
9 changes: 9 additions & 0 deletions base/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@
package base

import (
"go.uber.org/zap"
"log"
"time"

"github.com/sirupsen/logrus"
)

// Max finds the maximum in a vector of integers. Panic if the slice is empty.
Expand Down Expand Up @@ -87,3 +90,9 @@ func GCD(a ...int) int {
}
return divisor
}

func CheckPanic() {
if r := recover(); r != nil {
logrus.Error("panic recovered ", zap.Any("panic", r))
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@ require (
github.com/sirupsen/logrus v1.8.0
github.com/spf13/cobra v0.0.7
github.com/steinfletcher/apitest v1.5.2
github.com/stretchr/objx v0.1.1 // indirect
github.com/stretchr/testify v1.7.0
go.mongodb.org/mongo-driver v1.4.6
go.uber.org/zap v1.10.0
gonum.org/v1/gonum v0.0.0-20190409070159-6e46824336d2
google.golang.org/grpc v1.36.0
google.golang.org/protobuf v1.25.0
Expand Down
4 changes: 4 additions & 0 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func (m *Master) NodeDown(key string, value interface{}) {
}

func (m *Master) Loop() {
defer base.CheckPanic()

// calculate loop period
loopPeriod := base.GCD(
m.cfg.CF.FitPeriod,
Expand Down Expand Up @@ -444,6 +446,8 @@ func (m *Master) CollectSimilar(items []data.Item, dataset *cf.DataSet) error {
// create progress tracker
completed := make(chan []interface{}, 1000)
go func() {
defer base.CheckPanic()

completedCount := 0
ticker := time.NewTicker(time.Second)
for {
Expand Down
2 changes: 2 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (s *Server) Serve() {
}

func (s *Server) Sync() {
defer base.CheckPanic()
for {
ctx := context.Background()

Expand Down Expand Up @@ -144,6 +145,7 @@ func (s *Server) Sync() {
}

func (s *Server) Register() {
defer base.CheckPanic()
for {
if _, err := s.MasterClient.RegisterServer(context.Background(), &protocol.Void{}); err != nil {
log.Fatal("server:", err)
Expand Down
3 changes: 3 additions & 0 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func NewWorker(masterHost string, masterPort int, jobs int) *Worker {
}

func (w *Worker) Register() {
defer base.CheckPanic()
for {
if _, err := w.MasterClient.RegisterWorker(context.Background(), &protocol.Void{}); err != nil {
log.Fatal("worker:", err)
Expand All @@ -63,6 +64,7 @@ func (w *Worker) Register() {
}

func (w *Worker) Sync() {
defer base.CheckPanic()
for {
// pull model version
log.Info("worker: pull model version from master")
Expand Down Expand Up @@ -153,6 +155,7 @@ func (w *Worker) GenerateMatchItems(m cf.MatrixFactorization, users []string) {
// progress tracker
completed := make(chan interface{})
go func() {
defer base.CheckPanic()
completedCount := 0
ticker := time.NewTicker(time.Second)
for {
Expand Down

0 comments on commit 8f47d37

Please sign in to comment.