Skip to content

Commit

Permalink
multidc for export and dont write a new version for same kv (if incve…
Browse files Browse the repository at this point in the history
…rsion is not enforced)
  • Loading branch information
mgaida committed Aug 29, 2017
1 parent 464d98d commit f00c56f
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 72 deletions.
57 changes: 29 additions & 28 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"fmt"
"log"
"os"

Expand All @@ -14,6 +13,10 @@ import (
func main() {
app := cli.NewApp()
app.Name = "consul-mirror"
app.Author = "Michael Gaida"
app.Copyright = "Apache License - Version 2.0, January 2004 - http://www.apache.org/licenses/"
app.Email = "michael.gaida@protonmail.com"
app.Description = "Mirror your consul cluster for fallback in case outages or to copy it into another environment"
app.Version = "0.1.0"

app.Flags = []cli.Flag{
Expand All @@ -23,38 +26,40 @@ func main() {
},
}

validateHelpText := `consul-mirror validate [options] FILE
Performs a basic sanity test on consul-mirror configuration files.
The validate command will attempt to parse the contents just as the
"consul-mirror" command would, and catch any errors. This is useful
to do a test of the configuration only, without actually starting
consul-mirror.
Returns 0 if the configuration is valid, or 1 if there are problems.`
app.Commands = []cli.Command{
cli.Command{
Name: "validate",
UsageText: validateHelpText,
Name: "validate",
UsageText: `consul-mirror validate [options] FILE
Performs a basic sanity test on consul-mirror configuration files.
The validate command will attempt to parse the contents just as the
"consul-mirror" command would, and catch any errors. This is useful
to do a test of the configuration only, without actually starting
consul-mirror.
Returns 0 if the configuration is valid, or 1 if there are problems.`,

Action: func(c *cli.Context) {
commandValidate(c.Args().First(), validateHelpText)
if c.Args().Present() {
os.Exit(commandValidate(c.Args().First()))
}
cli.ShowCommandHelp(c, "validate")
},
},
cli.Command{
Name: "import, i",
Name: "import",
Usage: "import from consul",
Action: func(c *cli.Context) {
commandImport(c.GlobalBool("verbose"), c.BoolT("dc"))
},
},
cli.Command{
Name: "export, e",
Name: "export",
Usage: "export from consul",
Flags: []cli.Flag{
cli.BoolFlag{
Name: "dc",
Usage: "keep the dcs",
Name: "ignoredc",
Usage: "ignore the original dc",
},
cli.BoolFlag{
Name: "incversion",
Expand All @@ -66,7 +71,7 @@ func main() {
},
},
Action: func(c *cli.Context) {
commandExport(c.GlobalBool("verbose"), c.BoolT("dc"), c.BoolT("incversion"), c.String("prefix"))
commandExport(c.GlobalBool("verbose"), c.BoolT("ignoredc"), c.BoolT("incversion"), c.String("prefix"))
},
},
}
Expand All @@ -84,19 +89,19 @@ func initConsul(verbose bool) (*storage.Mssql, *consul.Consul) {

// s := storage.Mssql{}
conn := storage.OpenConnection(config)
defer conn.Close()

consul := consul.GetConsul(config)

return conn, consul
}

func commandExport(verbose, keepDC, incversion bool, prefix string) {
func commandExport(verbose, ignoreDC, incversion bool, prefix string) {
conn, consul := initConsul(verbose)
defer conn.Close()

dcs := consul.GetDCs()
kvs := consul.GetKVs(prefix, dcs)
conn.WriteKVs(kvs, keepDC)
conn.WriteKVs(kvs, ignoreDC, incversion)
}

func commandImport(verbose, keepDC bool) {
Expand All @@ -109,11 +114,7 @@ func commandImport(verbose, keepDC bool) {
}
}

func commandValidate(file, validateHelpText string) {
if file != "" {
testConfiguration := configuration.GetConfig(file)
os.Exit(testConfiguration.ValidateConfiguration())
} else {
fmt.Println(validateHelpText)
}
func commandValidate(file string) int {
testConfiguration := configuration.GetConfig(file)
return testConfiguration.ValidateConfiguration()
}
91 changes: 49 additions & 42 deletions storage/in.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package storage

import (
"fmt"
"log"
"time"

Expand All @@ -9,67 +10,73 @@ import (

// WriteKVs writes a KV array to a MSSQL table
// TODO: Only write if value (in dc) changed
func (db *Mssql) WriteKVs(kvs []consul.KV, keepDCs bool) {
version, err := db.conn.Prepare("select ISNULL(MAX(version), 0) from kv where kvkey = ? and datacenter = ?")
defer version.Close()
if err != nil {
log.Fatal("Prepare statement for get highest version failed: ", err.Error())
}
func (db *Mssql) WriteKVs(kvs []consul.KV, ignoreDCs, incversion bool) {

insert, err := db.conn.Prepare("insert into kv (timestamp, createIndex, flags, kvkey, lockindex, modifyindex, regex, session, kvvalue, version, datacenter) values (?,?,?,?,?,?,?,?,?,?, ?)")
defer insert.Close()
if err != nil {
log.Fatal("Prepare stmt failed: ", err.Error())
}

for i := range kvs {
v := 0
versionres, err := version.Query(kvs[i].Key, kvs[i].Datacenter)
if err != nil {
log.Fatal("Get highest version failed: ", err.Error())
}
for versionres.Next() {
err := versionres.Scan(&v)

if err != nil {
log.Fatal("Scan highest version failed: ", err.Error())
}
v++
dc := ""
if !ignoreDCs {
dc = kvs[i].Datacenter
}

if db.debug {
log.Printf("Write KV %s=%s (version %d)\n", kvs[i].Key, kvs[i].Value, v)
}
v := db.getLatestVersion(kvs[i].Key, dc)

res, err := insert.Exec(
time.Now(),
kvs[i].CreateIndex,
kvs[i].Flags,
kvs[i].Key,
kvs[i].LockIndex,
kvs[i].ModifyIndex,
kvs[i].Regex,
kvs[i].Session,
kvs[i].Value,
v,
kvs[i].Datacenter)
// If the incversion is false we want only to write a entry if the key for dc changed
// For the version we should gather the old version if available and check if the value changes
if err != nil {
log.Fatal("Exec into DB failed: ", err.Error())
}
if db.debug {
lastID, err := res.LastInsertId()
if err != nil {
log.Fatal(err)
if (incversion == true) || (db.kvIsModified(kvs[i], v) == true) {
v++

if db.debug {
log.Printf("Write KV %s=%s (version %d)\n", kvs[i].Key, kvs[i].Value, v)
}
rowCnt, err := res.RowsAffected()

res, err := insert.Exec(
time.Now(),
kvs[i].CreateIndex,
kvs[i].Flags,
kvs[i].Key,
kvs[i].LockIndex,
kvs[i].ModifyIndex,
kvs[i].Regex,
kvs[i].Session,
kvs[i].Value,
v,
dc)
if err != nil {
log.Fatal(err)
log.Fatal("Exec into DB failed: ", err.Error())
}
if db.debug {
lastID, err := res.LastInsertId()
if err != nil {
log.Fatal(err)
}
rowCnt, err := res.RowsAffected()
if err != nil {
log.Fatal(err)
}
log.Printf("ID = %d, affected = %d\n", lastID, rowCnt)
}
log.Printf("ID = %d, affected = %d\n", lastID, rowCnt)
}
}
}

func (db *Mssql) kvIsModified(kv consul.KV, version int) bool {
dbkv, err := db.getKV(kv.Key, kv.Datacenter, version)
if err != nil {
log.Fatal(err)
}
if kv.Key == "global/it-devops/consul-mirror/" {
fmt.Print("")
}
return !kv.Equals(dbkv)
}

func (db *Mssql) writeACLs(acls []consul.ACL) {
prep, err := db.conn.Prepare("inset into acl (CreateIndex, ID, ModifyIndex, Name, Rules, Type) values (?,?,?,?,?,?)")
if err != nil {
Expand Down
46 changes: 44 additions & 2 deletions storage/out.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,60 @@ func (db *Mssql) GetKVs() ([]consul.KV, error) {
var result = []consul.KV{}

// Get all keys with the highest version
rows, err := db.conn.Query("select DISTINCT a.flags, a.kvkey, a.lockindex, a.modifyindex, a.regex, a.session, a.kvvalue from [consul].[dbo].[kv] a left outer join [consul].[dbo].[kv] b on a.datacenter = b.datacenter AND a.kvkey = b.kvkey AND a.version < b.version where b.kvkey is NULL")
rows, err := db.conn.Query("select DISTINCT a.flags, a.kvkey, a.lockindex, a.modifyindex, a.regex, a.session, a.kvvalue, a.datacenter from [consul].[dbo].[kv] a left outer join [consul].[dbo].[kv] b on a.datacenter = b.datacenter AND a.kvkey = b.kvkey AND a.version < b.version where b.kvkey is NULL")
defer rows.Close()
if err != nil {
log.Fatal(err)
}

for rows.Next() {
err = rows.Scan(&kv.Flags, &kv.Key, &kv.LockIndex, &kv.ModifyIndex, &kv.Regex, &kv.Session, &kv.Value)
err = rows.Scan(&kv.Flags, &kv.Key, &kv.LockIndex, &kv.ModifyIndex, &kv.Regex, &kv.Session, &kv.Value, &kv.Datacenter)
if err != nil {
log.Fatal(err)
}
result = append(result, kv)
}
return result, nil
}

// GetKV reads KVs from the storage and returns all with the highest version
func (db *Mssql) getKV(key, dc string, version int) (consul.KV, error) {
var kv = consul.KV{}

// Get kv with the highest version
rows, err := db.conn.Query("select flags, kvkey, lockindex, modifyindex, regex, session, kvvalue, datacenter from [consul].[dbo].[kv] where kvkey = ? AND datacenter = ? AND version = ?", key, dc, version)
defer rows.Close()
if err != nil {
log.Fatal(err)
}
for rows.Next() {
err = rows.Scan(&kv.Flags, &kv.Key, &kv.LockIndex, &kv.ModifyIndex, &kv.Regex, &kv.Session, &kv.Value, &kv.Datacenter)
if err != nil {
log.Fatal(err)
}
}
return kv, nil
}

// getLatestVersion returns the highest version for a key in a dc
func (db *Mssql) getLatestVersion(key, dc string) int {
v := 0
version, err := db.conn.Prepare("select ISNULL(MAX(version), 0) from kv where kvkey = ? and datacenter = ?")
defer version.Close()
if err != nil {
log.Fatal("Prepare statement for get highest version failed: ", err.Error())
}

versionres, err := version.Query(key, dc)
if err != nil {
log.Fatal("Get highest version failed: ", err.Error())
}
for versionres.Next() {
err := versionres.Scan(&v)

if err != nil {
log.Fatal("Scan highest version failed: ", err.Error())
}
}
return v
}

0 comments on commit f00c56f

Please sign in to comment.