Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace hook execution fslock with mutex implementation. #5663

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ const (
// dir that, if it exists, will cause a machine agent to uninstall
// when it receives the termination signal.
UninstallAgentFile = "uninstall-agent"

// MachineLockName is the name of the mutex that the agent creates to
// ensure serialization of tasks such as uniter hook executions, juju-run,
// and others.
MachineLockName = "machine-lock"
)

// These are base values used for the corresponding defaults.
Expand Down
17 changes: 10 additions & 7 deletions apiserver/images.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
"io/ioutil"
"net/http"
"path"
"path/filepath"
"strings"
"time"

"github.com/juju/errors"
"github.com/juju/utils/fslock"
"github.com/juju/mutex"
"github.com/juju/utils/clock"

"github.com/juju/juju/apiserver/common"
apihttp "github.com/juju/juju/apiserver/http"
Expand Down Expand Up @@ -108,14 +109,16 @@ func (h *imagesDownloadHandler) loadImage(st *state.State, envuuid, kind, series
) {
// We want to ensure that if an image needs to be downloaded and cached,
// this only happens once.
imageIdent := fmt.Sprintf("image-%s-%s-%s-%s", envuuid, kind, series, arch)
lockDir := filepath.Join(h.dataDir, "locks")
lock, err := fslock.NewLock(lockDir, imageIdent)
spec := mutex.Spec{
Name: fmt.Sprintf("image-%s-%s-%s", kind, series, arch),
Clock: clock.WallClock,
Delay: time.Second,
}
releaser, err := mutex.Acquire(spec)
if err != nil {
return nil, nil, errors.Trace(err)
}
lock.Lock("fetch and cache image " + imageIdent)
defer lock.Unlock()
defer releaser.Release()
storage := st.ImageStorage()
metadata, imageReader, err := storage.Image(kind, series, arch)
// Not in storage, so go fetch it.
Expand Down
43 changes: 22 additions & 21 deletions cmd/envcmd/files.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (

"github.com/juju/cmd"
"github.com/juju/errors"
"github.com/juju/utils/fslock"
"github.com/juju/mutex"
"github.com/juju/utils/clock"

"github.com/juju/juju/juju/osenv"
)
Expand All @@ -21,7 +22,7 @@ const (
CurrentEnvironmentFilename = "current-environment"
CurrentSystemFilename = "current-system"

lockName = "current.lock"
lockName = "current-lock"

systemSuffix = " (system)"
)
Expand All @@ -43,8 +44,9 @@ type ServerFile struct {

// NOTE: synchronisation across functions in this file.
//
// Each of the read and write functions use a fslock to synchronise calls
// across both the current executable and across different executables.
// Each of the read and write functions use a juju/mutex is used to
// synchronise calls across both the current executable and across different
// executables.

func getCurrentEnvironmentFilePath() string {
return filepath.Join(osenv.JujuHome(), CurrentEnvironmentFilename)
Expand All @@ -57,11 +59,11 @@ func getCurrentSystemFilePath() string {
// Read the file $JUJU_HOME/current-environment and return the value stored
// there. If the file doesn't exist an empty string is returned and no error.
func ReadCurrentEnvironment() (string, error) {
lock, err := acquireEnvironmentLock("read current-environment")
releaser, err := acquireEnvironmentLock()
if err != nil {
return "", errors.Trace(err)
}
defer lock.Unlock()
defer releaser.Release()

current, err := ioutil.ReadFile(getCurrentEnvironmentFilePath())
if err != nil {
Expand All @@ -76,11 +78,11 @@ func ReadCurrentEnvironment() (string, error) {
// Read the file $JUJU_HOME/current-system and return the value stored there.
// If the file doesn't exist an empty string is returned and no error.
func ReadCurrentSystem() (string, error) {
lock, err := acquireEnvironmentLock("read current-system")
releaser, err := acquireEnvironmentLock()
if err != nil {
return "", errors.Trace(err)
}
defer lock.Unlock()
defer releaser.Release()

current, err := ioutil.ReadFile(getCurrentSystemFilePath())
if err != nil {
Expand All @@ -94,11 +96,11 @@ func ReadCurrentSystem() (string, error) {

// Write the envName to the file $JUJU_HOME/current-environment file.
func WriteCurrentEnvironment(envName string) error {
lock, err := acquireEnvironmentLock("write current-environment")
releaser, err := acquireEnvironmentLock()
if err != nil {
return errors.Trace(err)
}
defer lock.Unlock()
defer releaser.Release()

path := getCurrentEnvironmentFilePath()
err = ioutil.WriteFile(path, []byte(envName+"\n"), 0644)
Expand All @@ -117,11 +119,11 @@ func WriteCurrentEnvironment(envName string) error {

// Write the systemName to the file $JUJU_HOME/current-system file.
func WriteCurrentSystem(systemName string) error {
lock, err := acquireEnvironmentLock("write current-system")
releaser, err := acquireEnvironmentLock()
if err != nil {
return errors.Trace(err)
}
defer lock.Unlock()
defer releaser.Release()

path := getCurrentSystemFilePath()
err = ioutil.WriteFile(path, []byte(systemName+"\n"), 0644)
Expand All @@ -138,19 +140,18 @@ func WriteCurrentSystem(systemName string) error {
return nil
}

func acquireEnvironmentLock(operation string) (*fslock.Lock, error) {
// NOTE: any reading or writing from the directory should be done with a
// fslock to make sure we have a consistent read or write. Also worth
// noting, we should use a very short timeout.
lock, err := fslock.NewLock(osenv.JujuHome(), lockName)
if err != nil {
return nil, errors.Trace(err)
func acquireEnvironmentLock() (mutex.Releaser, error) {
spec := mutex.Spec{
Name: lockName,
Clock: clock.WallClock,
Delay: 20 * time.Millisecond,
Timeout: lockTimeout,
}
err = lock.LockWithTimeout(lockTimeout, operation)
releaser, err := mutex.Acquire(spec)
if err != nil {
return nil, errors.Trace(err)
}
return lock, nil
return releaser, nil
}

// CurrentConnectionName looks at both the current environment file
Expand Down
2 changes: 1 addition & 1 deletion cmd/juju/environment/jenv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (*jenvSuite) TestWriteError(c *gc.C) {

jenvCmd := &environment.JenvCommand{}
ctx, err := testing.RunCommand(c, jenvCmd, f.Name())
c.Assert(err, gc.ErrorMatches, "cannot write the jenv file: cannot write info: .*: permission denied")
c.Assert(err, gc.ErrorMatches, "cannot write the jenv file: .*")
c.Assert(testing.Stdout(ctx), gc.Equals, "")
}

Expand Down
14 changes: 4 additions & 10 deletions cmd/jujud/agent/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/juju/cmd"
"github.com/juju/errors"
"github.com/juju/loggo"
"github.com/juju/mutex"
"github.com/juju/names"
"github.com/juju/replicaset"
"github.com/juju/utils"
Expand Down Expand Up @@ -769,11 +770,7 @@ func (a *MachineAgent) postUpgradeAPIWorker(
if err != nil {
return nil, errors.Trace(err)
}
lock, err := cmdutil.HookExecutionLock(cmdutil.DataDir)
if err != nil {
return nil, errors.Trace(err)
}
return rebootworker.NewReboot(reboot, agentConfig, lock)
return rebootworker.NewReboot(reboot, agentConfig, agent.MachineLockName, clock.WallClock)
})
runner.StartWorker("apiaddressupdater", func() (worker.Worker, error) {
addressUpdater := agent.APIHostPortsSetter{a}
Expand Down Expand Up @@ -974,10 +971,6 @@ func (a *MachineAgent) updateSupportedContainers(
if err := machine.SetSupportedContainers(containers...); err != nil {
return errors.Annotatef(err, "setting supported containers for %s", tag)
}
initLock, err := cmdutil.HookExecutionLock(agentConfig.DataDir())
if err != nil {
return err
}
// Start the watcher to fire when a container is first requested on the machine.
envUUID, err := st.EnvironTag()
if err != nil {
Expand Down Expand Up @@ -1013,7 +1006,8 @@ func (a *MachineAgent) updateSupportedContainers(
Machine: machine,
Provisioner: pr,
Config: agentConfig,
InitLock: initLock,
InitLockName: agent.MachineLockName,
AcquireLockFunc: mutex.Acquire,
}
handler := provisioner.NewContainerSetupHandler(params)
a.startWorkerAfterUpgrade(runner, watcherName, func() (worker.Worker, error) {
Expand Down
14 changes: 4 additions & 10 deletions cmd/jujud/agent/unit/manifolds.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package unit
import (
"time"

"github.com/juju/utils/clock"

coreagent "github.com/juju/juju/agent"
"github.com/juju/juju/worker/agent"
"github.com/juju/juju/worker/apiaddressupdater"
Expand All @@ -15,7 +17,6 @@ import (
"github.com/juju/juju/worker/leadership"
"github.com/juju/juju/worker/logger"
"github.com/juju/juju/worker/logsender"
"github.com/juju/juju/worker/machinelock"
"github.com/juju/juju/worker/proxyupdater"
"github.com/juju/juju/worker/rsyslog"
"github.com/juju/juju/worker/uniter"
Expand Down Expand Up @@ -50,13 +51,6 @@ func Manifolds(config ManifoldsConfig) dependency.Manifolds {
// (Currently, that is "all manifolds", but consider a shared clock.)
AgentName: agent.Manifold(config.Agent),

// The machine lock manifold is a thin concurrent wrapper around an
// FSLock in an agreed location. We expect it to be replaced with an
// in-memory lock when the unit agent moves into the machine agent.
MachineLockName: machinelock.Manifold(machinelock.ManifoldConfig{
AgentName: AgentName,
}),

// The api caller is a thin concurrent wrapper around a connection
// to some API server. It's used by many other manifolds, which all
// select their own desired facades. It will be interesting to see
Expand Down Expand Up @@ -148,7 +142,8 @@ func Manifolds(config ManifoldsConfig) dependency.Manifolds {
AgentName: AgentName,
APICallerName: APICallerName,
LeadershipTrackerName: LeadershipTrackerName,
MachineLockName: MachineLockName,
MachineLockName: coreagent.MachineLockName,
Clock: clock.WallClock,
}),
}
}
Expand All @@ -161,7 +156,6 @@ const (
LeadershipTrackerName = "leadership-tracker"
LoggingConfigUpdaterName = "logging-config-updater"
LogSenderName = "log-sender"
MachineLockName = "machine-lock"
ProxyConfigUpdaterName = "proxy-config-updater"
RsyslogConfigUpdaterName = "rsyslog-config-updater"
UniterName = "uniter"
Expand Down
6 changes: 0 additions & 6 deletions cmd/jujud/agent/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -841,12 +841,6 @@ func (s *UpgradeSuite) assertHostUpgrades(c *gc.C) {
cmds := s.getAptCmds()
s.assertCommonUpgrades(c, cmds)

// Lock directory
// TODO(bogdanteleaga): Fix this on windows. Currently a bash script is
// used to create the directory which partially works on windows 8 but
// doesn't work on windows server.
lockdir := filepath.Join(s.DataDir(), "locks")
c.Assert(lockdir, jc.IsDirectory)
// SSH key file should not be generated for hosts.
_, err := os.Stat(s.keyFile())
c.Assert(err, jc.Satisfies, os.IsNotExist)
Expand Down
6 changes: 5 additions & 1 deletion cmd/jujud/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/juju/loggo"
"github.com/juju/utils/exec"

"github.com/juju/juju/agent"
jujucmd "github.com/juju/juju/cmd"
agentcmd "github.com/juju/juju/cmd/jujud/agent"
"github.com/juju/juju/cmd/pprof"
Expand Down Expand Up @@ -197,7 +198,10 @@ func Main(args []string) int {
code = exit_err
err = fmt.Errorf("jujuc should not be called directly")
case names.JujuRun:
code = cmd.Main(&RunCommand{}, ctx, args[1:])
run := &RunCommand{
MachineLockName: agent.MachineLockName,
}
code = cmd.Main(run, ctx, args[1:])
default:
code, err = jujuCMain(commandName, ctx, args)
}
Expand Down
15 changes: 10 additions & 5 deletions cmd/jujud/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"os"
"strconv"
"strings"
"time"

"github.com/juju/cmd"
"github.com/juju/errors"
"github.com/juju/mutex"
"github.com/juju/names"
"github.com/juju/utils/clock"
"github.com/juju/utils/exec"
"launchpad.net/gnuflag"

Expand All @@ -24,6 +27,7 @@ import (

type RunCommand struct {
cmd.CommandBase
MachineLockName string
unit names.UnitTag
commands string
showHelp bool
Expand Down Expand Up @@ -172,15 +176,16 @@ func (c *RunCommand) appendProxyToCommands() string {
func (c *RunCommand) executeNoContext() (*exec.ExecResponse, error) {
// Acquire the uniter hook execution lock to make sure we don't
// stomp on each other.
lock, err := cmdutil.HookExecutionLock(cmdutil.DataDir)
if err != nil {
return nil, errors.Trace(err)
spec := mutex.Spec{
Name: c.MachineLockName,
Clock: clock.WallClock,
Delay: 250 * time.Millisecond,
}
err = lock.Lock("juju-run")
releaser, err := mutex.Acquire(spec)
if err != nil {
return nil, errors.Trace(err)
}
defer lock.Unlock()
defer releaser.Release()

runCmd := c.appendProxyToCommands()

Expand Down
Loading