Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix the channels communication between go routine in starting app via docker #963

Merged
merged 2 commits into from
Oct 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,9 @@ func (r *Record) GetCmd() *cobra.Command {
return err
}
// for _, v := range ports {

// }

r.logger.Debug("the ports are", zap.Any("ports", ports))
// r.recorder.CaptureTraffic(tcsPath, mockPath, appCmd, appContainer, networkName, delay)
r.recorder.CaptureTraffic(path, appCmd, appContainer, networkName, delay, ports)
Expand Down
114 changes: 88 additions & 26 deletions pkg/hooks/launch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
ErrInterrupted = errors.New("exited with interrupt")
ErrCommandError = errors.New("exited due to command error")
ErrUnExpected = errors.New("an unexpected error occurred")
ErrDockerError = errors.New("an error occurred while using docker client")
)

func (h *Hook) LaunchUserApplication(appCmd, appContainer, appNetwork string, Delay uint64) error {
Expand Down Expand Up @@ -119,8 +120,9 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {
value := true
h.objects.DockerCmdMap.Update(uint32(key), &value, ebpf.UpdateAny)

stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
stopListenContainer := make(chan bool)
stopApplicationErrors := false
abortStopListenContainerChan := false

dockerClient := h.idc
appErrCh := make(chan error, 1)
Expand All @@ -132,7 +134,10 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {
defer h.Recover(pkg.GenerateRandomID())

err := h.runApp(appCmd, true)
appErrCh <- err
h.logger.Debug("Application stopped with the error", zap.Error(err))
if !stopApplicationErrors {
appErrCh <- err
}
}()
}

Expand Down Expand Up @@ -162,19 +167,29 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {

for {
if time.Now().After(endTime) {
dockerErrCh <- fmt.Errorf("no container found for :%v", appContainer)
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- fmt.Errorf("no container found for :%v", appContainer)
return
}
}

select {
case <-stopListenContainer:
return
case err := <-errs:
if err != nil && err != context.Canceled {
h.logger.Error("failed to listen for the docker events", zap.Error(err))
if err != nil && err != context.Canceled {
select {
case <-stopListenContainer:
default:
dockerErrCh <- fmt.Errorf("failed to listen for the docker events: %v", err)
}
}
}
return
case <-stopper:
dockerErrCh <- fmt.Errorf("found sudden interrupt")
return
case <-logTicker.C:
h.logger.Info("still waiting for the container to start.", zap.String("containerName", appContainer))
case e := <-messages:
Expand Down Expand Up @@ -240,16 +255,26 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {

err = h.idc.ConnectContainerToNetworks(KeployContainerName, containerDetails.NetworkSettings.Networks)
if err != nil {
dockerErrCh <- fmt.Errorf("could not inject keploy container to the application's network with error [%v]", err)
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- fmt.Errorf("could not inject keploy container to the application's network with error [%v]", err)
return
}
}

//sending new proxy ip to kernel, since dynamically injected new network has different ip for keploy.
kInspect, err := dockerClient.ContainerInspect(context.Background(), KeployContainerName)
if err != nil {
h.logger.Error(fmt.Sprintf("failed to get inspect keploy container:%v", kInspect))
dockerErrCh <- err
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- err
return
}
}

var newProxyIpString string
Expand All @@ -265,30 +290,50 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {
}
proxyIp, err := ConvertIPToUint32(newProxyIpString)
if err != nil {
dockerErrCh <- fmt.Errorf("failed to convert ip string:[%v] to 32-bit integer", newProxyIpString)
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- fmt.Errorf("failed to convert ip string:[%v] to 32-bit integer", newProxyIpString)
return
}
}

proxyPort := h.GetProxyPort()
err = h.SendProxyInfo(proxyIp, proxyPort, [4]uint32{0000, 0000, 0000, 0001})
if err != nil {
h.logger.Error("failed to send new proxy ip to kernel", zap.Any("NewProxyIp", proxyIp))
dockerErrCh <- err
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- err
return
}
}

h.logger.Debug(fmt.Sprintf("New proxy ip:%v & proxy port:%v sent to kernel", proxyIp, proxyPort))
} else {
dockerErrCh <- fmt.Errorf("NetworkSettings not found for the %v container", appContainer)
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- fmt.Errorf("NetworkSettings not found for the %v container", appContainer)
return
}
}

//inspecting it again to get the ip of the container used in test mode.
containerDetails, err := dockerClient.ContainerInspect(context.Background(), appContainer)
if err != nil {
h.logger.Error(fmt.Sprintf("failed to get inspect app container:%v to retrive the ip", containerDetails))
dockerErrCh <- err
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- err
return
}
}

// find the application container ip in case of test mode
Expand All @@ -302,15 +347,26 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {
h.logger.Debug("receiver channel received the ip address", zap.Any("containerIp found", containerIp))
}
} else {
dockerErrCh <- fmt.Errorf("network details for %v network not found", appNetwork)
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- fmt.Errorf("network details for %v network not found", appNetwork)
return
}
}
} else {
dockerErrCh <- fmt.Errorf("network settings or networks not available in inspect data")
return
select {
case <-stopListenContainer:
return
default:
dockerErrCh <- fmt.Errorf("network settings or networks not available in inspect data")
return
}
}

h.logger.Info("container & network found and processed successfully", zap.Any("time", time.Now().UnixNano()))
abortStopListenContainerChan = true
if models.GetMode() == models.MODE_TEST {
h.userIpAddress <- containerIp
}
Expand All @@ -323,11 +379,15 @@ func (h *Hook) processDockerEnv(appCmd, appContainer, appNetwork string) error {

select {
case err := <-dockerErrCh:
stopApplicationErrors = true
if err != nil {
h.logger.Error("failed to process the user application container or network", zap.Any("err", err.Error()))
return err
return ErrDockerError
}
case err := <-appErrCh:
if !abortStopListenContainerChan {
stopListenContainer <- true
}
if err != nil {
return err
}
Expand Down Expand Up @@ -393,6 +453,8 @@ func (h *Hook) runApp(appCmd string, isDocker bool) error {
stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, os.Kill, syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGKILL)

fmt.Println(os.Getwd())

err := cmd.Run()
if err != nil {
select {
Expand Down
7 changes: 5 additions & 2 deletions pkg/service/record/record.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,23 +89,26 @@ func (r *recorder) CaptureTraffic(path string, appCmd, appContainer, appNetwork
default:
// start user application
go func() {
stopApplication := false
if err := loadedHooks.LaunchUserApplication(appCmd, appContainer, appNetwork, Delay); err != nil {
switch err {
case hooks.ErrInterrupted:
r.logger.Info("keploy terminated user application")
return
case hooks.ErrCommandError:
r.logger.Error("failed to run user application hence stopping keploy", zap.Error(err))
r.logger.Error("failed to run user application hence stopping keploy")
case hooks.ErrUnExpected:
r.logger.Warn("user application terminated unexpectedly, please check application logs if this behaviour is not expected")
case hooks.ErrDockerError:
stopApplication = true
default:
r.logger.Error("unknown error recieved from application")
}
}
if !abortStopHooksForcefully {
abortStopHooksInterrupt <- true
// stop listening for the eBPF events
loadedHooks.Stop(true)
loadedHooks.Stop(!stopApplication)
//stop listening for proxy server
ps.StopProxyServer()
exitCmd <- true
Expand Down
Loading