Skip to content

Commit

Permalink
chore: cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
thoas committed Dec 20, 2020
1 parent 40d48f7 commit fb35bab
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 66 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
lint:
golangci-lint run

build:
go build -o mover ./cmd/mover/

test:
go test ./... -v
15 changes: 9 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
# mover

`mover` is a simple utility to extract data from a remote database server and load them in another environment.
`mover` is used to extract data from a remote database server and load them in another environment.

It uses the underlying introspection API from the RDMS to retrieve relationships from the extracted results.
It uses the underlying introspection API from the RDMS to retrieve relationships
(foreign keys, reference keys, etc.) based on results from your database schema.

## How do we use it internally?

Instead of harcoding fixtures for each workflows, we export data from our production database by sanitizing
sensible data (password, personal user information, etc.).

Thanks to this tool, we don't have to maintain anymore fixtures and we can
quickly reply production bugs in our local environment.
Thanks to this tool, we don't have to maintain fixtures anymore and we can
quickly replicate a production bug in our local environment.

## Usage

Expand All @@ -29,11 +30,13 @@ mkdir -p output
Extract data from backup database:

```console
go run cmd/mover/main.go -dsn "postgresql://user:password@remote.server:5432/dbname" -path output -action extract -query "SELECT * FROM user WHERE id = 1" -table "user"
export REMOTE_DSN="postgresql://user:password@remote.server:5432/dbname"
go run cmd/mover/main.go -dsn $REMOTE_DSN -path output -action extract -query "SELECT * FROM user WHERE id = 1" -table "user"
```

Load data to your local database:

```console
go run cmd/mover/main.go -dsn "postgresql://user:password@localhost:5432/dbname" -path output -action load
export LOCAL_DSN="postgresql://user:password@localhost:5432/dbname"
go run cmd/mover/main.go -dsn $LOCAL_DSN -path output -action load
```
87 changes: 43 additions & 44 deletions etl/etl.go → etl/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,40 @@ import (
"github.com/ulule/mover/dialect/postgres"
)

// copySchemaTables copies tables from database to schema configuration.
func copySchemaTables(schema []config.Schema, tables []dialect.Table) map[string]config.Schema {
schemas := make(map[string]config.Schema, len(tables))
for i := range tables {
tableName := tables[i].Name
found := false
for j := range schema {
if tables[i].Name == schema[j].TableName {
found = true
schema[j].Table = tables[i]
schemas[tableName] = schema[j]

}
}

if !found {
schemas[tableName] = config.Schema{
TableName: tables[i].Name,
Table: tables[i],
}
}

for k := range schemas[tableName].Queries {
for j := range tables {
if tables[j].Name == schemas[tableName].Queries[k].TableName {
schemas[tableName].Queries[k].Table = tables[j]
}
}
}
}

return schemas
}

// Engine extracts and loads data from database with specific dialect.
type Engine struct {
schema map[string]config.Schema
Expand All @@ -33,55 +67,24 @@ type jsonPayload struct {

// NewEngine returns a new Engine instance.
func NewEngine(ctx context.Context, cfg config.Config, dsn string, logger *zap.Logger) (*Engine, error) {
etl := &Engine{
config: cfg,
logger: logger,
}

dialect, err := etl.newDialect(ctx, dsn)
dialect, err := postgres.NewPGDialect(ctx, dsn)
if err != nil {
return nil, err
}

etl.dialect = dialect

tables, err := dialect.Tables(ctx)
if err != nil {
return nil, err
}

schema := make(map[string]config.Schema, len(tables))
for i := range tables {
tableName := tables[i].Name
found := false
for j := range cfg.Schema {
if tables[i].Name == cfg.Schema[j].TableName {
found = true
cfg.Schema[j].Table = tables[i]
schema[tableName] = cfg.Schema[j]

}
}
schema := copySchemaTables(cfg.Schema, tables)

if !found {
schema[tableName] = config.Schema{
TableName: tables[i].Name,
Table: tables[i],
}
}

for k := range schema[tableName].Queries {
for j := range tables {
if tables[j].Name == schema[tableName].Queries[k].TableName {
schema[tableName].Queries[k].Table = tables[j]
}
}
}
}

etl.schema = schema

return etl, nil
return &Engine{
config: cfg,
logger: logger,
dialect: dialect,
schema: schema,
}, nil
}

// Describe returns a table from its name.
Expand Down Expand Up @@ -164,18 +167,14 @@ func (e *Engine) extract(ctx context.Context, outputPath string, schema config.S
zap.String("files", strings.Join(filenames, " ")),
zap.String("output_path", outputPath))

if err := downloadFiles(ctx, filenames, path.Join(outputPath, "media")); err != nil {
if err := downloadFiles(ctx, filenames, path.Join(outputPath, "media"), 10); err != nil {
e.logger.Error("unable to download files", zap.Error(err))
}
}

return nil
}

func (e *Engine) newDialect(ctx context.Context, dsn string) (dialect.Dialect, error) {
return postgres.NewPGDialect(ctx, dsn)
}

func (e *Engine) newLoader() *loader {
return &loader{
dialect: e.dialect,
Expand Down
21 changes: 11 additions & 10 deletions etl/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,17 @@ import (
"github.com/ulule/mover/dialect"
)

type resultSet []map[string]interface{}
type extract map[string]entry
type entry map[string]resultSet

type extractor struct {
extract extract
dialect dialect.Dialect
schema map[string]config.Schema
logger *zap.Logger
}
type (
resultSet []map[string]interface{}
extract map[string]entry
entry map[string]resultSet
extractor struct {
extract extract
dialect dialect.Dialect
schema map[string]config.Schema
logger *zap.Logger
}
)

func depthF(depth int, msg string, args ...interface{}) string {
return strings.Repeat("\t", depth+1) + fmt.Sprintf(msg, args...)
Expand Down
17 changes: 11 additions & 6 deletions etl/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,14 @@ func chunkStrings(slice []string, chunkSize int) [][]string {
return chunks
}

func downloadFiles(ctx context.Context, filenames []string, outputPath string) error {
func downloadFiles(ctx context.Context, filenames []string, outputPath string, chunkSize int) error {
g, _ := errgroup.WithContext(ctx)
chunks := chunkStrings(filenames, 10)
var chunks [][]string
if chunkSize == 0 {
chunks = [][]string{filenames}
} else {
chunks = chunkStrings(filenames, chunkSize)
}
for i := range chunks {
for j := range chunks[i] {
g.Go((func(filename string) func() error {
Expand All @@ -81,28 +86,28 @@ func downloadFiles(ctx context.Context, filenames []string, outputPath string) e
return nil
}

func downloadFile(absoluteURL string, output string) error {
func downloadFile(absoluteURL string, outputDir string) error {
res, err := http.Get(absoluteURL)
if err != nil {
return fmt.Errorf("unable to retrieve %s: %w", absoluteURL, err)
}
defer res.Body.Close()

if res.StatusCode != 200 {
return fmt.Errorf("unable to download %s to %s: received %d HTTP code", absoluteURL, output, res.StatusCode)
return fmt.Errorf("unable to download %s to %s: received %d HTTP code", absoluteURL, outputDir, res.StatusCode)
}

u, err := url.Parse(absoluteURL)
if err != nil {
return fmt.Errorf("unable to parse %s: %w", absoluteURL, err)
}

path := filepath.Join(output, filepath.Dir(u.Path))
path := filepath.Join(outputDir, filepath.Dir(u.Path))
if err := os.MkdirAll(path, os.ModePerm); err != nil {
return fmt.Errorf("unable to create directory %s: %w", path, err)
}

file, err := os.Create(filepath.Join(output, u.Path))
file, err := os.Create(filepath.Join(outputDir, u.Path))
if err != nil {
return fmt.Errorf("unable to create file %s: %w", path, err)
}
Expand Down

0 comments on commit fb35bab

Please sign in to comment.