Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Yuji Ito <llamerada.jp@gmail.com>
  • Loading branch information
llamerada-jp committed Sep 23, 2023
1 parent 1b4b476 commit e671fe9
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 32 deletions.
2 changes: 1 addition & 1 deletion cmd/app/exit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

func showHelp(err error) {
if err != nil {
log.Println(err)
log.Printf("`exit` program failed: %s", err.Error())
}
log.Fatalf("usage: %s [code]\n code: exit code, default 0", os.Args[0])
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/app/sleep/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (s *sleep) Setup(isInitialize bool, record []byte) error {
s.PassedSec = 0

} else {
fmt.Printf("continue to sleep for %d sec", s.PassedSec)
fmt.Printf("continue to sleep for %d sec\n", s.PassedSec)
err := json.Unmarshal(record, s)
if err != nil {
return err
Expand Down Expand Up @@ -108,7 +108,7 @@ func (s *sleep) start() error {

func showHelp(err error) {
if err != nil {
log.Println(err)
log.Printf("`sleep` program failed: %s", err.Error())
}
log.Fatalf("usage: %s [duration]\n duration: duration to sleep[sec], immediate wake up when 0, never wake up when negative value", os.Args[0])
}
Expand Down Expand Up @@ -136,7 +136,7 @@ func main() {

err = oinari.Run(sleep)
if err != nil {
log.Fatal(err)
log.Fatalf("`oinari.Run` failed on sleep program: %s", err.Error())
}

os.Exit(0)
Expand Down
10 changes: 4 additions & 6 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,17 @@ func (na *nodeAgent) OnConnect(nodeName string, nodeType api.NodeType) error {
// KVS
accountKvs := kvs.NewAccountKvs(na.col)
podKvs := kvs.NewPodKvs(na.col)
recordKVS := kvs.NewRecordKvs(na.col)

// drivers
// api driver manager
coreDriverManager := core.NewCoreDriverManager(na.cl)

// controllers
accountCtrl := controller.NewAccountController(account, localNid, accountKvs)
containerCtrl := controller.NewContainerController(localNid, cri, podKvs, coreDriverManager)
containerCtrl := controller.NewContainerController(localNid, cri, podKvs, recordKVS, coreDriverManager)
nodeCtrl := controller.NewNodeController(ctx, na.col, messaging, account, nodeName, nodeType)
podCtrl := controller.NewPodController(podKvs, messaging, localNid)

// api
apiManager := core.NewCoreDriverManager(na.cl)

// manager
localDs := node.NewLocalDatastore(na.col)
manager := node.NewManager(localDs, accountCtrl, containerCtrl, nodeCtrl, podCtrl)
Expand All @@ -154,7 +152,7 @@ func (na *nodeAgent) OnConnect(nodeName string, nodeType api.NodeType) error {
// handlers
mh.InitMessagingHandler(na.col, containerCtrl, nodeCtrl)
fh.InitResourceHandler(na.nodeMpx, accountCtrl, containerCtrl, nodeCtrl, podCtrl)
ch.InitHandler(na.apiMpx, apiManager)
ch.InitHandler(na.apiMpx, coreDriverManager, cri, podKvs, recordKVS)

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion lib/crosslink/multi_plexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (m *mpxImpl) Serve(dataRaw []byte, tags map[string]string, writer ResponseW
var ok bool

if path, ok = tags[TAG_PATH]; !ok {
log.Fatalln("`path` tag should be set")
log.Fatalln("`path` tag should be set in crosslink multi plexer")
}

if leaf, ok = tags[TAG_LEAF]; !ok {
Expand Down
2 changes: 1 addition & 1 deletion lib/oinari/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (o *oinari) ready() error {
o.cl.Call(NodeCrosslinkPath+"/ready", app.ReadyRequest{}, nil, func(b []byte, err error) {
// `ready` request only tell the status to the manager of this node, do nothing
if err != nil {
log.Fatal("failed to ready core api :%w", err)
log.Fatalf("failed to ready core api: %s", err.Error())
}
})
return nil
Expand Down
2 changes: 1 addition & 1 deletion lib/oinari/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (w *writer) Write(p []byte) (n int, err error) {
var res app.OutputResponse
err = json.Unmarshal(b, &res)
if err != nil {
log.Fatal(err)
log.Fatalf("unmarshal response of output failed on oinari api: %s", err.Error())
}

resCh <- writeResponse{
Expand Down
13 changes: 12 additions & 1 deletion node/apis/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ package core

import (
"log"
"sync"

"github.com/llamerada-jp/oinari/lib/crosslink"
)

type Manager struct {
cl crosslink.Crosslink
mtx sync.Mutex
cl crosslink.Crosslink
// key: containerID
drivers map[string]CoreDriver
}
Expand All @@ -35,6 +37,9 @@ func NewCoreDriverManager(cl crosslink.Crosslink) *Manager {
}

func (m *Manager) NewCoreDriver(containerID string, runtime []string) CoreDriver {
m.mtx.Lock()
defer m.mtx.Unlock()

if _, ok := m.drivers[containerID]; ok {
log.Fatal("driver already exists")
}
Expand All @@ -58,9 +63,15 @@ L:
}

func (m *Manager) DestroyDriver(containerID string) {
m.mtx.Lock()
defer m.mtx.Unlock()

delete(m.drivers, containerID)
}

func (m *Manager) GetDriver(containerID string) CoreDriver {
m.mtx.Lock()
defer m.mtx.Unlock()

return m.drivers[containerID]
}
8 changes: 7 additions & 1 deletion node/apis/core/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,10 @@ func callHelper[REQ any, RES any](driver *coreAPIDriverImpl, path string, reques
ch := make(chan *RES)
var funcError error

driver.cl.Call(strings.Join([]string{oinari.ApplicationCrosslinkPath, path}, "/"), request, nil,
driver.cl.Call(strings.Join([]string{oinari.ApplicationCrosslinkPath, path}, "/"), request,
map[string]string{
"containerID": driver.containerID,
},
func(response []byte, err error) {
defer close(ch)

Expand Down Expand Up @@ -94,5 +97,8 @@ func (driver *coreAPIDriverImpl) Teardown(isFinalize bool) ([]byte, error) {
res, err := callHelper[app.TeardownRequest, app.TeardownResponse](driver, "teardown", &app.TeardownRequest{
IsFinalize: isFinalize,
})
if isFinalize {
return nil, err
}
return res.Record, err
}
37 changes: 27 additions & 10 deletions node/apis/core/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI,
mpx.SetHandler("ready", crosslink.NewFuncHandler(func(request *app.ReadyRequest, tags map[string]string, writer crosslink.ResponseWriter) {
containerID, driver, err := getDriver(tags, manager)
if err != nil {
writer.ReplyError(err.Error())
writer.ReplyError(fmt.Sprintf("`getDriver` failed on `ready` handler: %s", err.Error()))
return
}

Expand All @@ -42,11 +42,11 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI,
},
})
if err != nil {
writer.ReplyError(err.Error())
writer.ReplyError(fmt.Sprintf("`ListContainers` failed on `ready` handler: %s", err.Error()))
return
}
if len(containerList.Containers) == 0 {
writer.ReplyError("container not found")
writer.ReplyError(fmt.Sprintf("container not found on `ready` handler"))
return
}

Expand All @@ -56,22 +56,24 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI,
},
})
if err != nil {
writer.ReplyError(err.Error())
writer.ReplyError(fmt.Sprintf("`ListPodSandbox` failed on `ready` handler: %s", err.Error()))
}
if len(sandboxList.Items) == 0 {
writer.ReplyError("sandbox not found")
writer.ReplyError("sandbox not found on `ready` handler")
return
}

podUUID := sandboxList.Items[0].Metadata.UID
pod, err := podKVS.Get(podUUID)
if err != nil {
writer.ReplyError(err.Error())
writer.ReplyError(fmt.Sprintf("`podKVS.Get` failed on `ready` handler: %s", err.Error()))
return
}
isInitialize := false
for _, status := range pod.Status.ContainerStatuses {
var containerName string
for idx, status := range pod.Status.ContainerStatuses {
if status.ContainerID == containerID {
containerName = pod.Spec.Containers[idx].Name
if status.LastState != nil {
isInitialize = true
}
Expand All @@ -80,21 +82,36 @@ func InitHandler(apiMpx crosslink.MultiPlexer, manager *core.Manager, c cri.CRI,
}

record, err := recordKVS.Get(podUUID)
if err != nil {
writer.ReplyError(fmt.Sprintf("`recordKVS.Get` failed on `ready` handler: %s", err.Error()))
return
}

go func() {
if record == nil {
err = driver.Setup(isInitialize, nil)
} else {
err = driver.Setup(isInitialize, record.Data.Entries[containerName].Record)
}
if err != nil {
// TODO: try to restart container
}
}()

writer.ReplySuccess(&app.ReadyResponse{})
}))

mpx.SetHandler("output", crosslink.NewFuncHandler(func(request *app.OutputRequest, tags map[string]string, writer crosslink.ResponseWriter) {
_, _, err := getDriver(tags, manager)
if err != nil {
writer.ReplyError(err.Error())
writer.ReplyError(fmt.Sprintf("`getDriver` failed on `output` handler: %s", err.Error()))
return
}

// TODO: broadcast message to neighbors
_, err = fmt.Println(string(request.Payload))
if err != nil {
writer.ReplyError(err.Error())
writer.ReplyError(fmt.Sprintf("`fmt.Println failed on `output` handler: %s", err.Error()))
return
}
writer.ReplySuccess(&app.OutputResponse{
Expand All @@ -111,7 +128,7 @@ func getDriver(tags map[string]string, manager *core.Manager) (string, core.Core

driver := manager.GetDriver(containerID)
if driver == nil {
return "", nil, fmt.Errorf("driver not found")
return "", nil, fmt.Errorf("driver not found for %s", containerID)
}

return containerID, driver, nil
Expand Down
55 changes: 52 additions & 3 deletions node/controller/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,19 @@ type containerControllerImpl struct {
localNid string
cri cri.CRI
podKvs kvs.PodKvs
recordKvs kvs.RecordKvs
apiCoreDriverManager *core.Manager
// key: Pod UUID
reconcileStates map[string]*reconcileState
mtx sync.Mutex
}

func NewContainerController(localNid string, cri cri.CRI, podKvs kvs.PodKvs, apiCoreDriverManager *core.Manager) ContainerController {
func NewContainerController(localNid string, cri cri.CRI, podKvs kvs.PodKvs, recordKVS kvs.RecordKvs, apiCoreDriverManager *core.Manager) ContainerController {
return &containerControllerImpl{
localNid: localNid,
cri: cri,
podKvs: podKvs,
recordKvs: recordKVS,
apiCoreDriverManager: apiCoreDriverManager,
reconcileStates: make(map[string]*reconcileState),
}
Expand Down Expand Up @@ -139,7 +141,7 @@ func (impl *containerControllerImpl) Reconcile(ctx context.Context, podUUID stri
return impl.updatePodInfo(state, pod)
}

if err := impl.letTerminate(state); err != nil {
if err := impl.letTerminate(state, pod); err != nil {
return err
}

Expand Down Expand Up @@ -319,7 +321,7 @@ func (impl *containerControllerImpl) letRunning(state *reconcileState, pod *api.
return nil
}

func (impl *containerControllerImpl) letTerminate(state *reconcileState) error {
func (impl *containerControllerImpl) letTerminate(state *reconcileState, pod *api.Pod) error {
// TODO: send exit signal when any container running
containers, err := impl.cri.ListContainers(&cri.ListContainersRequest{
Filter: &cri.ContainerFilter{
Expand All @@ -330,11 +332,46 @@ func (impl *containerControllerImpl) letTerminate(state *reconcileState) error {
return err
}

isFinalize := len(pod.Meta.DeletionTimestamp) != 0
var record *api.Record
if !isFinalize {
var err error
record, err = impl.recordKvs.Get(pod.Meta.Uuid)
if err != nil {
return err
}
if record == nil {
record = &api.Record{
Meta: &api.ObjectMeta{
Type: api.ResourceTypeRecord,
Name: pod.Meta.Name,
Owner: pod.Meta.Owner,
CreatorNode: pod.Meta.CreatorNode,
Uuid: pod.Meta.Uuid,
},
Data: &api.RecordData{
Entries: make(map[string]api.RecordEntry),
},
}
}
}

for _, container := range containers.Containers {
if container.State == cri.ContainerExited {
continue
}

raw, err := impl.apiCoreDriverManager.GetDriver(container.ID).Teardown(isFinalize)
if err != nil {
return fmt.Errorf("failed to teardown container: %w", err)
}
if raw != nil {
record.Data.Entries[container.Metadata.Name] = api.RecordEntry{
Record: raw,
Timestamp: misc.GetTimestamp(),
}
}

_, err = impl.cri.StopContainer(&cri.StopContainerRequest{
ContainerId: container.ID,
})
Expand All @@ -345,6 +382,18 @@ func (impl *containerControllerImpl) letTerminate(state *reconcileState) error {
impl.apiCoreDriverManager.DestroyDriver(container.ID)
}

if !isFinalize {
err = impl.recordKvs.Set(record)
if err != nil {
return err
}
} else {
err = impl.recordKvs.Delete(pod.Meta.Uuid)
if err != nil {
log.Printf("failed to delete record: %s", err.Error())
}
}

// TODO: skip processing when all container exited
// TODO: force exit all containers after timeout

Expand Down
2 changes: 1 addition & 1 deletion node/frontend/driver/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (impl *frontendDriverImpl) TellInitComplete() error {
impl.cl.Call("frontend/nodeReady", nil, nil,
func(_ []byte, err error) {
if err != nil {
log.Fatalln(err)
log.Fatalf("frontend/nodeReady has an error: %s", err.Error())
}
})
return nil
Expand Down
6 changes: 3 additions & 3 deletions node/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (mgr *manager) Start(ctx context.Context) error {

// first keepalive
if err := mgr.keepAlive(); err != nil {
log.Println(err)
log.Printf("keepAlive of node manager failed: %s", err.Error())
}

for {
Expand All @@ -68,12 +68,12 @@ func (mgr *manager) Start(ctx context.Context) error {

case <-tickerDealLR.C:
if err := mgr.dealLocalResource(); err != nil {
log.Println(err)
log.Println("dealLocalResource of node manager failed: %s", err.Error())
}

case <-tickerKeepAlive.C:
if err := mgr.keepAlive(); err != nil {
log.Println(err)
log.Println("keepAlive of node manager failed: %s", err.Error())
}

}
Expand Down

0 comments on commit e671fe9

Please sign in to comment.