Skip to content

Commit

Permalink
Basic implementation of history-server command.
Browse files Browse the repository at this point in the history
  • Loading branch information
mana-sys committed Jan 30, 2020
1 parent 55330d0 commit ca4b8a5
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 45 deletions.
100 changes: 59 additions & 41 deletions internal/cli/command/historyserver/cmd.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package historyserver

import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"

"github.com/mana-sys/adhesive/internal/cli/command"
"github.com/mana-sys/adhesive/internal/cli/config"
"github.com/spf13/cobra"
)

Expand All @@ -12,54 +21,63 @@ func NewHistoryServerCommand(adhesiveCli *command.AdhesiveCli) *cobra.Command {
Use: "history-server",
Short: "Launch the Spark history server",
RunE: func(cmd *cobra.Command, args []string) error {
return historyServer(adhesiveCli)
return historyServer(adhesiveCli, opts)
},
}

flags := cmd.Flags()
flags.IntVarP(&opts.Port, "port", "p", opts.Port, "The port to listen on")
flags.StringVar(&opts.LogDirectory, "log-directory", opts.LogDirectory, "The location of the Spark logs. Must be an s3a:// formatted path.")

return cmd
}

//// buildDockerCommand builds an exec.Cmd to run the Docker container with the provided options.
//func buildDockerCommand(entrypoint string, args []string) (*exec.Cmd, error) {
// var (
// err error
// envs []string
// vols []string
// )
//
// for _, env := range options.env {
// envs = append(envs, "-e "+env)
// }
//
// for _, vol := range options.volumes {
// vols = append(vols, "-v "+vol)
// }
//
// credsDir := options.credentials
// if credsDir == "" {
// if credsDir, err = os.UserHomeDir(); err != nil {
// return nil, errors.New("unable to determine home directory")
// }
// credsDir = filepath.Join(credsDir, ".aws")
// }
//
// wd, err := os.Getwd()
//
// dockerArgs := []string{"run", "--rm", "-t"}
// dockerArgs = append(dockerArgs, envs...)
// dockerArgs = append(dockerArgs, vols...)
// dockerArgs = append(dockerArgs, "-v", credsDir+":/root/.aws",
// "-v", DistPackagesVolume+":"+DistPackagesDirectory,
// "-v", wd+":/project",
// "--entrypoint", entrypoint, DockerImageName+":0.9")
// dockerArgs = append(dockerArgs, args...)
//
// return exec.Command("docker", dockerArgs...), nil
//}

func historyServer(adhesiveCli *command.AdhesiveCli) error {
return nil
// buildDockerCommand builds an exec.Cmd to run the history server Docker container with the provided options.
func buildDockerCommand(opts *config.HistoryServerOptions) (*exec.Cmd, error) {
credsDir, err := os.UserHomeDir()
if err != nil {
return nil, err
}

credsDir = filepath.Join(credsDir, ".aws")

dockerArgs := []string{"run", "--rm"}
dockerArgs = append(dockerArgs, "-v", credsDir+":/root/.aws")
dockerArgs = append(dockerArgs, "-p", strconv.FormatInt(int64(opts.Port), 10)+":18080")
sparkHistoryOptsStringFormat := "SPARK_HISTORY_OPTS=-Dspark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.DefaultAWSCredentialsProviderChain " +
"-Dspark.history.fs.logDirectory=%s"

dockerArgs = append(dockerArgs, "-e", fmt.Sprintf(sparkHistoryOptsStringFormat, opts.LogDirectory))
dockerArgs = append(dockerArgs, "sysmana/sparkui:latest",
"/opt/spark/bin/spark-class org.apache.spark.deploy.history.HistoryServer")

return exec.Command("docker", dockerArgs...), nil
}

func historyServer(adhesiveCli *command.AdhesiveCli, opts *config.HistoryServerOptions) error {
if opts.LogDirectory == "" {
return errors.New("option --log-directory is required")
}

if opts.Port == 0 {
opts.Port = 18080
}

if !strings.HasPrefix(opts.LogDirectory, "s3a://") {
return errors.New("option --log-directory must be an s3a:// formatted path")
}

cmd, err := buildDockerCommand(opts)
if err != nil {
return err
}

cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err = cmd.Start(); err != nil {
return err
}

return cmd.Wait()
}
4 changes: 1 addition & 3 deletions internal/cli/command/local/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,5 @@ func buildAndRunDockerCommand(entrypoint string, options *dockerOptions, args []
return err
}

err = dockerCmd.Wait()

return err
return dockerCmd.Wait()
}
2 changes: 2 additions & 0 deletions internal/cli/command/startjobrun/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func startJobRun(adhesiveCli *command.AdhesiveCli, opts *config.StartJobRunOptio
name = *out.StackResourceDetail.PhysicalResourceId
}

// Start the job.
_, err := glu.StartJobRun(&glue.StartJobRunInput{
JobName: aws.String(name),
})
Expand All @@ -70,5 +71,6 @@ func startJobRun(adhesiveCli *command.AdhesiveCli, opts *config.StartJobRunOptio
return err
}

// TODO: If the --tail-logs option is enabled, then stream the logs to the console.
return nil
}
3 changes: 2 additions & 1 deletion internal/cli/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type PackageOptions struct {
}

type HistoryServerOptions struct {
Port int
Port int
LogDirectory string
}

type RemoveOptions struct {
Expand Down
15 changes: 15 additions & 0 deletions internal/cli/util/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"strings"
"time"

"github.com/mana-sys/adhesive/pkg/watchlog"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudformation"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/mana-sys/adhesive/pkg/watchstack"
)

Expand All @@ -27,6 +30,10 @@ const (
OpDelete
)

const (
GlueErrorLogsPrefix = "/awsglue-"
)

type stackMonitorState struct {
name string
op stackOp
Expand Down Expand Up @@ -88,6 +95,14 @@ func MonitorStack(cfn *cloudformation.CloudFormation, stackId, stackName string,
return out, nil
}

func MonitorJobLogs(cwl *cloudwatchlogs.CloudWatchLogs, name, id string) error {
//ctx, cancel := context.WithCancel(context.Background())
w := watchlog.NewWatchLog(cwl)

// Wait for the log streams to become available.

}

func ConsoleMonitorStack(ctx context.Context, state stackMonitorState) {
ticker := time.NewTicker(state.uiInterval)

Expand Down

0 comments on commit ca4b8a5

Please sign in to comment.