Skip to content

Commit

Permalink
Support using AWS profiles in history server.
Browse files Browse the repository at this point in the history
  • Loading branch information
mana-sys committed Jan 31, 2020
1 parent 690752c commit 58726ce
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 18 deletions.
9 changes: 5 additions & 4 deletions internal/cli/cobra.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ type rootOptions struct {
}

func NewRootCommand(adhesiveCli *command.AdhesiveCli) *cobra.Command {
var opts rootOptions
opts := adhesiveCli.Config

cmd := &cobra.Command{
Use: "adhesive",
PersistentPreRun: func(cmd *cobra.Command, args []string) {
// Set debug mode.
if opts.debug {
if opts.Debug {
logrus.SetLevel(logrus.DebugLevel)
}
},
Expand All @@ -36,8 +36,9 @@ func NewRootCommand(adhesiveCli *command.AdhesiveCli) *cobra.Command {
}

flags := cmd.Flags()
flags.BoolVarP(&opts.debug, "debug", "d", false, "Enable debug mode")
flags.StringVarP(&opts.configFile, "config", "c", "adhesive.toml",
flags.BoolVarP(&opts.Debug, "debug", "d", false, "Enable debug mode")
flags.StringVar(&opts.Profile, "profile", opts.Profile, "The profile to use")
flags.StringVarP(&opts.ConfigFile, "config", "c", "adhesive.toml",
"Path to Adhesive configuration file")

cmd.AddCommand(
Expand Down
23 changes: 17 additions & 6 deletions internal/cli/command/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type AdhesiveCli struct {
cfn *cloudformation.CloudFormation
s3 *s3.S3
glue *glue.Glue
sess *session.Session
}

func NewAdhesiveCli(path string) (*AdhesiveCli, error) {
Expand Down Expand Up @@ -55,20 +56,26 @@ func NewAdhesiveCli(path string) (*AdhesiveCli, error) {
}

func (cli *AdhesiveCli) InitializeClients() error {
sess, err := session.NewSession(&aws.Config{
Logger: aws.LoggerFunc(func(args ...interface{}) {
log.Debug(args...)
}),
LogLevel: aws.LogLevel(aws.LogDebugWithHTTPBody),
Region: aws.String(cli.Config.Region),
sess, err := session.NewSessionWithOptions(session.Options{
Config: aws.Config{
Logger: aws.LoggerFunc(func(args ...interface{}) {
log.Debug(args...)
}),
LogLevel: aws.LogLevel(aws.LogDebugWithHTTPBody),
Region: aws.String(cli.Config.Region),
},
Profile: cli.Config.Profile,
SharedConfigState: session.SharedConfigEnable,
})

if err != nil {
return err
}

cli.cfn = cloudformation.New(sess)
cli.s3 = s3.New(sess)
cli.glue = glue.New(sess)
cli.sess = sess
return nil
}

Expand All @@ -83,3 +90,7 @@ func (cli *AdhesiveCli) CloudFormation() *cloudformation.CloudFormation {
func (cli *AdhesiveCli) Glue() *glue.Glue {
return cli.glue
}

func (cli *AdhesiveCli) Session() *session.Session {
return cli.sess
}
31 changes: 28 additions & 3 deletions internal/cli/command/historyserver/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strconv"
"strings"

"github.com/aws/aws-sdk-go/aws/credentials/processcreds"
"github.com/mana-sys/adhesive/internal/cli/command"
"github.com/mana-sys/adhesive/internal/cli/config"
"github.com/spf13/cobra"
Expand All @@ -33,7 +34,7 @@ func NewHistoryServerCommand(adhesiveCli *command.AdhesiveCli) *cobra.Command {
}

// buildDockerCommand builds an exec.Cmd to run the history server Docker container with the provided options.
func buildDockerCommand(opts *config.HistoryServerOptions) (*exec.Cmd, error) {
func buildDockerCommand(adhesiveCli *command.AdhesiveCli, opts *config.HistoryServerOptions) (*exec.Cmd, error) {
credsDir, err := os.UserHomeDir()
if err != nil {
return nil, err
Expand All @@ -42,12 +43,30 @@ func buildDockerCommand(opts *config.HistoryServerOptions) (*exec.Cmd, error) {
credsDir = filepath.Join(credsDir, ".aws")

dockerArgs := []string{"run", "--rm"}

dockerArgs = append(dockerArgs, "-v", credsDir+":/root/.aws")
dockerArgs = append(dockerArgs, "-p", strconv.FormatInt(int64(opts.Port), 10)+":18080")

// Super hack: If we used the ProcessProvider, then we pass the credentials via environment variables to the
// Docker container.
value, err := adhesiveCli.Session().Config.Credentials.Get()
if err != nil {
return nil, err
}

if value.ProviderName == processcreds.ProviderName {
dockerArgs = append(dockerArgs,
"-e", "AWS_ACCESS_KEY_ID="+value.AccessKeyID,
"-e", "AWS_SECRET_ACCESS_KEY="+value.SecretAccessKey,
"-e", "AWS_SESSION_TOKEN="+value.SessionToken,
)
}

// Environment variable for Spark history server options.
sparkHistoryOptsStringFormat := "SPARK_HISTORY_OPTS=-Dspark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain " +
"-Dspark.history.fs.logDirectory=%s"

dockerArgs = append(dockerArgs, "-e", fmt.Sprintf(sparkHistoryOptsStringFormat, opts.LogDirectory))

dockerArgs = append(dockerArgs, "sysmana/sparkui:latest",
"/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer")

Expand All @@ -67,7 +86,13 @@ func historyServer(adhesiveCli *command.AdhesiveCli, opts *config.HistoryServerO
return errors.New("option --log-directory must be an s3a:// formatted path")
}

cmd, err := buildDockerCommand(opts)
// Super hack: initialize the clients to retrieve the credentials. This is needed I couldn't
// figure out how to get credential_process to work for Java.
if err := adhesiveCli.InitializeClients(); err != nil {
return err
}

cmd, err := buildDockerCommand(adhesiveCli, opts)
if err != nil {
return err
}
Expand Down
6 changes: 4 additions & 2 deletions internal/cli/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ type Config struct {
HistoryServer HistoryServerOptions

// Root command options.
Profile string
Region string
ConfigFile string `toml:"-"`
Profile string
Region string
Debug bool `toml:"-"`
}

func defaultConfig() *Config {
Expand Down
5 changes: 2 additions & 3 deletions internal/cli/util/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"strings"
"time"

"github.com/mana-sys/adhesive/pkg/watchlog"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
Expand Down Expand Up @@ -97,10 +95,11 @@ func MonitorStack(cfn *cloudformation.CloudFormation, stackId, stackName string,

func MonitorJobLogs(cwl *cloudwatchlogs.CloudWatchLogs, name, id string) error {
//ctx, cancel := context.WithCancel(context.Background())
w := watchlog.NewWatchLog(cwl)
//w := watchlog.NewWatchLog(cwl)

// Wait for the log streams to become available.

return nil
}

func ConsoleMonitorStack(ctx context.Context, state stackMonitorState) {
Expand Down

0 comments on commit 58726ce

Please sign in to comment.