diff --git a/cmd/app/exit/main.go b/cmd/app/exit/main.go index f388ecb..de40098 100644 --- a/cmd/app/exit/main.go +++ b/cmd/app/exit/main.go @@ -24,7 +24,7 @@ import ( func showHelp(err error) { if err != nil { - log.Println(err) + log.Printf("`exit` program failed: %s", err.Error()) } log.Fatalf("usage: %s [code]\n code: exit code, default 0", os.Args[0]) } diff --git a/cmd/app/sleep/main.go b/cmd/app/sleep/main.go index 7a0e1be..a0ac147 100644 --- a/cmd/app/sleep/main.go +++ b/cmd/app/sleep/main.go @@ -54,7 +54,7 @@ func (s *sleep) Setup(isInitialize bool, record []byte) error { s.PassedSec = 0 } else { - fmt.Printf("continue to sleep for %d sec", s.PassedSec) + fmt.Printf("continue to sleep for %d sec\n", s.PassedSec) err := json.Unmarshal(record, s) if err != nil { return err @@ -108,7 +108,7 @@ func (s *sleep) start() error { func showHelp(err error) { if err != nil { - log.Println(err) + log.Printf("`sleep` program failed: %s", err.Error()) } log.Fatalf("usage: %s [duration]\n duration: duration to sleep[sec], immediate wake up when 0, never wake up when negative value", os.Args[0]) } @@ -136,7 +136,7 @@ func main() { err = oinari.Run(sleep) if err != nil { - log.Fatal(err) + log.Fatalf("`oinari.Run` failed on sleep program: %s", err.Error()) } os.Exit(0) diff --git a/cmd/node/main.go b/cmd/node/main.go index a98efa8..d6d01c4 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -128,19 +128,17 @@ func (na *nodeAgent) OnConnect(nodeName string, nodeType api.NodeType) error { // KVS accountKvs := kvs.NewAccountKvs(na.col) podKvs := kvs.NewPodKvs(na.col) + recordKVS := kvs.NewRecordKvs(na.col) - // drivers + // api driver manager coreDriverManager := core.NewCoreDriverManager(na.cl) // controllers accountCtrl := controller.NewAccountController(account, localNid, accountKvs) - containerCtrl := controller.NewContainerController(localNid, cri, podKvs, coreDriverManager) + containerCtrl := controller.NewContainerController(localNid, cri, podKvs, recordKVS, coreDriverManager) nodeCtrl := controller.NewNodeController(ctx, na.col, messaging, account, nodeName, nodeType) podCtrl := controller.NewPodController(podKvs, messaging, localNid) - // api - apiManager := core.NewCoreDriverManager(na.cl) - // manager localDs := node.NewLocalDatastore(na.col) manager := node.NewManager(localDs, accountCtrl, containerCtrl, nodeCtrl, podCtrl) @@ -154,7 +152,7 @@ func (na *nodeAgent) OnConnect(nodeName string, nodeType api.NodeType) error { // handlers mh.InitMessagingHandler(na.col, containerCtrl, nodeCtrl) fh.InitResourceHandler(na.nodeMpx, accountCtrl, containerCtrl, nodeCtrl, podCtrl) - ch.InitHandler(na.apiMpx, apiManager) + ch.InitHandler(na.apiMpx, coreDriverManager, cri, podKvs, recordKVS) return nil } diff --git a/lib/crosslink/multi_plexer.go b/lib/crosslink/multi_plexer.go index 18540c8..d54ff9c 100644 --- a/lib/crosslink/multi_plexer.go +++ b/lib/crosslink/multi_plexer.go @@ -28,7 +28,7 @@ func (m *mpxImpl) Serve(dataRaw []byte, tags map[string]string, writer ResponseW var ok bool if path, ok = tags[TAG_PATH]; !ok { - log.Fatalln("`path` tag should be set") + log.Fatalln("`path` tag should be set in crosslink multi plexer") } if leaf, ok = tags[TAG_LEAF]; !ok { diff --git a/lib/oinari/run.go b/lib/oinari/run.go index 62ba6d2..b114f03 100644 --- a/lib/oinari/run.go +++ b/lib/oinari/run.go @@ -109,7 +109,7 @@ func (o *oinari) ready() error { o.cl.Call(NodeCrosslinkPath+"/ready", app.ReadyRequest{}, nil, func(b []byte, err error) { // `ready` request only tell the status to the manager of this node, do nothing if err != nil { - log.Fatal("failed to ready core api :%w", err) + log.Fatalf("failed to ready core api: %s", err.Error()) } }) return nil diff --git a/lib/oinari/writer.go b/lib/oinari/writer.go index 98bdd2b..27921dc 100644 --- a/lib/oinari/writer.go +++ b/lib/oinari/writer.go @@ -45,7 +45,7 @@ func (w *writer) Write(p []byte) (n int, err error) { var res app.OutputResponse err = json.Unmarshal(b, &res) if err != nil { - log.Fatal(err) + log.Fatalf("unmarshal response of output failed on oinari api: %s", err.Error()) } resCh <- writeResponse{ diff --git a/node/apis/core/core.go b/node/apis/core/core.go index 4cf3252..767964e 100644 --- a/node/apis/core/core.go +++ b/node/apis/core/core.go @@ -17,12 +17,14 @@ package core import ( "log" + "sync" "github.com/llamerada-jp/oinari/lib/crosslink" ) type Manager struct { - cl crosslink.Crosslink + mtx sync.Mutex + cl crosslink.Crosslink // key: containerID drivers map[string]CoreDriver } @@ -35,6 +37,9 @@ func NewCoreDriverManager(cl crosslink.Crosslink) *Manager { } func (m *Manager) NewCoreDriver(containerID string, runtime []string) CoreDriver { + m.mtx.Lock() + defer m.mtx.Unlock() + if _, ok := m.drivers[containerID]; ok { log.Fatal("driver already exists") } @@ -58,9 +63,15 @@ L: } func (m *Manager) DestroyDriver(containerID string) { + m.mtx.Lock() + defer m.mtx.Unlock() + delete(m.drivers, containerID) } func (m *Manager) GetDriver(containerID string) CoreDriver { + m.mtx.Lock() + defer m.mtx.Unlock() + return m.drivers[containerID] } diff --git a/node/apis/core/driver.go b/node/apis/core/driver.go index 7479086..d029cd1 100644 --- a/node/apis/core/driver.go +++ b/node/apis/core/driver.go @@ -48,7 +48,10 @@ func callHelper[REQ any, RES any](driver *coreAPIDriverImpl, path string, reques ch := make(chan *RES) var funcError error - driver.cl.Call(strings.Join([]string{oinari.ApplicationCrosslinkPath, path}, "/"), request, nil, + driver.cl.Call(strings.Join([]string{oinari.ApplicationCrosslinkPath, path}, "/"), request, + map[string]string{ + "containerID": driver.containerID, + }, func(response []byte, err error) { defer close(ch) @@ -94,5 +97,8 @@ func (driver *coreAPIDriverImpl) Teardown(isFinalize bool) ([]byte, error) { res, err := callHelper[app.TeardownRequest, app.TeardownResponse](driver, "teardown", &app.TeardownRequest{ IsFinalize: isFinalize, }) + if isFinalize { + return nil, err + } return res.Record, err } diff --git a/node/apis/core/handler/handler.go b/node/apis/core/handler/handler.go index 3ad76c0..27ce689 100644 --- a/node/apis/core/handler/handler.go +++ b/node/apis/core/handler/handler.go @@ -32,7 +32,7 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI, mpx.SetHandler("ready", crosslink.NewFuncHandler(func(request *app.ReadyRequest, tags map[string]string, writer crosslink.ResponseWriter) { containerID, driver, err := getDriver(tags, manager) if err != nil { - writer.ReplyError(err.Error()) + writer.ReplyError(fmt.Sprintf("`getDriver` failed on `ready` handler: %s", err.Error())) return } @@ -42,11 +42,11 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI, }, }) if err != nil { - writer.ReplyError(err.Error()) + writer.ReplyError(fmt.Sprintf("`ListContainers` failed on `ready` handler: %s", err.Error())) return } if len(containerList.Containers) == 0 { - writer.ReplyError("container not found") + writer.ReplyError(fmt.Sprintf("container not found on `ready` handler")) return } @@ -56,22 +56,24 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI, }, }) if err != nil { - writer.ReplyError(err.Error()) + writer.ReplyError(fmt.Sprintf("`ListPodSandbox` failed on `ready` handler: %s", err.Error())) } if len(sandboxList.Items) == 0 { - writer.ReplyError("sandbox not found") + writer.ReplyError("sandbox not found on `ready` handler") return } podUUID := sandboxList.Items[0].Metadata.UID pod, err := podKVS.Get(podUUID) if err != nil { - writer.ReplyError(err.Error()) + writer.ReplyError(fmt.Sprintf("`podKVS.Get` failed on `ready` handler: %s", err.Error())) return } isInitialize := false - for _, status := range pod.Status.ContainerStatuses { + var containerName string + for idx, status := range pod.Status.ContainerStatuses { if status.ContainerID == containerID { + containerName = pod.Spec.Containers[idx].Name if status.LastState != nil { isInitialize = true } @@ -80,6 +82,21 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI, } record, err := recordKVS.Get(podUUID) + if err != nil { + writer.ReplyError(fmt.Sprintf("`recordKVS.Get` failed on `ready` handler: %s", err.Error())) + return + } + + go func() { + if record == nil { + err = driver.Setup(isInitialize, nil) + } else { + err = driver.Setup(isInitialize, record.Data.Entries[containerName].Record) + } + if err != nil { + // TODO: try to restart container + } + }() writer.ReplySuccess(&app.ReadyResponse{}) })) @@ -87,14 +104,14 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI, mpx.SetHandler("output", crosslink.NewFuncHandler(func(request *app.OutputRequest, tags map[string]string, writer crosslink.ResponseWriter) { _, _, err := getDriver(tags, manager) if err != nil { - writer.ReplyError(err.Error()) + writer.ReplyError(fmt.Sprintf("`getDriver` failed on `output` handler: %s", err.Error())) return } // TODO: broadcast message to neighbors _, err = fmt.Println(string(request.Payload)) if err != nil { - writer.ReplyError(err.Error()) + writer.ReplyError(fmt.Sprintf("`fmt.Println failed on `output` handler: %s", err.Error())) return } writer.ReplySuccess(&app.OutputResponse{ @@ -111,7 +128,7 @@ func getDriver(tags map[string]string, manager *core.Manager) (string, core.Core driver := manager.GetDriver(containerID) if driver == nil { - return "", nil, fmt.Errorf("driver not found") + return "", nil, fmt.Errorf("driver not found for %s", containerID) } return containerID, driver, nil diff --git a/node/controller/container.go b/node/controller/container.go index e9f8763..2996123 100644 --- a/node/controller/container.go +++ b/node/controller/container.go @@ -51,17 +51,19 @@ type containerControllerImpl struct { localNid string cri cri.CRI podKvs kvs.PodKvs + recordKvs kvs.RecordKvs apiCoreDriverManager *core.Manager // key: Pod UUID reconcileStates map[string]*reconcileState mtx sync.Mutex } -func NewContainerController(localNid string, cri cri.CRI, podKvs kvs.PodKvs, apiCoreDriverManager *core.Manager) ContainerController { +func NewContainerController(localNid string, cri cri.CRI, podKvs kvs.PodKvs, recordKVS kvs.RecordKvs, apiCoreDriverManager *core.Manager) ContainerController { return &containerControllerImpl{ localNid: localNid, cri: cri, podKvs: podKvs, + recordKvs: recordKVS, apiCoreDriverManager: apiCoreDriverManager, reconcileStates: make(map[string]*reconcileState), } @@ -139,7 +141,7 @@ func (impl *containerControllerImpl) Reconcile(ctx context.Context, podUUID stri return impl.updatePodInfo(state, pod) } - if err := impl.letTerminate(state); err != nil { + if err := impl.letTerminate(state, pod); err != nil { return err } @@ -319,7 +321,7 @@ func (impl *containerControllerImpl) letRunning(state *reconcileState, pod *api. return nil } -func (impl *containerControllerImpl) letTerminate(state *reconcileState) error { +func (impl *containerControllerImpl) letTerminate(state *reconcileState, pod *api.Pod) error { // TODO: send exit signal when any container running containers, err := impl.cri.ListContainers(&cri.ListContainersRequest{ Filter: &cri.ContainerFilter{ @@ -330,11 +332,46 @@ func (impl *containerControllerImpl) letTerminate(state *reconcileState) error { return err } + isFinalize := len(pod.Meta.DeletionTimestamp) != 0 + var record *api.Record + if !isFinalize { + var err error + record, err = impl.recordKvs.Get(pod.Meta.Uuid) + if err != nil { + return err + } + if record == nil { + record = &api.Record{ + Meta: &api.ObjectMeta{ + Type: api.ResourceTypeRecord, + Name: pod.Meta.Name, + Owner: pod.Meta.Owner, + CreatorNode: pod.Meta.CreatorNode, + Uuid: pod.Meta.Uuid, + }, + Data: &api.RecordData{ + Entries: make(map[string]api.RecordEntry), + }, + } + } + } + for _, container := range containers.Containers { if container.State == cri.ContainerExited { continue } + raw, err := impl.apiCoreDriverManager.GetDriver(container.ID).Teardown(isFinalize) + if err != nil { + return fmt.Errorf("failed to teardown container: %w", err) + } + if raw != nil { + record.Data.Entries[container.Metadata.Name] = api.RecordEntry{ + Record: raw, + Timestamp: misc.GetTimestamp(), + } + } + _, err = impl.cri.StopContainer(&cri.StopContainerRequest{ ContainerId: container.ID, }) @@ -345,6 +382,18 @@ func (impl *containerControllerImpl) letTerminate(state *reconcileState) error { impl.apiCoreDriverManager.DestroyDriver(container.ID) } + if !isFinalize { + err = impl.recordKvs.Set(record) + if err != nil { + return err + } + } else { + err = impl.recordKvs.Delete(pod.Meta.Uuid) + if err != nil { + log.Printf("failed to delete record: %s", err.Error()) + } + } + // TODO: skip processing when all container exited // TODO: force exit all containers after timeout diff --git a/node/frontend/driver/frontend.go b/node/frontend/driver/frontend.go index 30cecbc..00c1636 100644 --- a/node/frontend/driver/frontend.go +++ b/node/frontend/driver/frontend.go @@ -40,7 +40,7 @@ func (impl *frontendDriverImpl) TellInitComplete() error { impl.cl.Call("frontend/nodeReady", nil, nil, func(_ []byte, err error) { if err != nil { - log.Fatalln(err) + log.Fatalf("frontend/nodeReady has an error: %s", err.Error()) } }) return nil diff --git a/node/manager.go b/node/manager.go index 157eba4..74b3950 100644 --- a/node/manager.go +++ b/node/manager.go @@ -58,7 +58,7 @@ func (mgr *manager) Start(ctx context.Context) error { // first keepalive if err := mgr.keepAlive(); err != nil { - log.Println(err) + log.Printf("keepAlive of node manager failed: %s", err.Error()) } for { @@ -68,12 +68,12 @@ func (mgr *manager) Start(ctx context.Context) error { case <-tickerDealLR.C: if err := mgr.dealLocalResource(); err != nil { - log.Println(err) + log.Println("dealLocalResource of node manager failed: %s", err.Error()) } case <-tickerKeepAlive.C: if err := mgr.keepAlive(); err != nil { - log.Println(err) + log.Println("keepAlive of node manager failed: %s", err.Error()) } }