Skip to content

Commit

Permalink
Moved discovery service to cloud functions
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
nneverlander committed Oct 31, 2018
1 parent 0719b56 commit a6c5391
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 134 deletions.
58 changes: 11 additions & 47 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/picolo/crdb.go
Expand Up @@ -38,7 +38,7 @@ func SpawnCrdbInst() {
}

join := strings.Join(shardInfo.JoinInfo, ",")
log.Infof("Cluster join address: %s", join)
log.Infof("Shard address: %s", join)

instanceId := uuid.MakeV4().String()
log.Infof("Crdb instance id: %s", instanceId)
Expand Down Expand Up @@ -89,7 +89,7 @@ tryAgain:
}

func MaybeSpawnShard() {
if isPortOpen("127.0.0.1", anInstancePort) { // todo change host to PicNode.NetInfo.PublicIp4
if isPortOpen(PicNode.NetInfo.PublicIp4, anInstancePort) {
log.Info("Node is publicly reachable. Spawning a new shard")
} else {
log.Info("Node is not publicly reachable. Not spawning a shard")
Expand Down
132 changes: 52 additions & 80 deletions pkg/picolo/discovery.go
@@ -1,133 +1,105 @@
package picolo

import (
"cloud.google.com/go/firestore"
"context"
"firebase.google.com/go"
"bytes"
"encoding/json"
log "github.com/sirupsen/logrus"
"google.golang.org/api/option"
"google.golang.org/genproto/googleapis/type/latlng"
"os"
"io/ioutil"
"net/http"
"time"
)

const nodesPath = "nodes"
const instsPath = "instances"
const shardsPath = "shards"
const flaresPath = "flares"
const SERVICE_CREDS_FILE_ENV = "SERVICE_CREDS_FILE"
const baseUrl = "https://us-central1-flares-d1c56.cloudfunctions.net/"
const registerNodePath = "registerNode"
const registerInstPath = "registerInstance"
const getShardsPath = "getShardToJoin"
const throwFlarePath = "throwFlare"

var location = &latlng.LatLng{Latitude: 9, Longitude: 179} // todo change this
var FB_APP *firebase.App

func InitAppWithServiceAccount() {
data, ok := os.LookupEnv(SERVICE_CREDS_FILE_ENV)
if !ok {
log.Fatal("SERVICE_CREDS_FILE_ENV is not set")
}
opt := option.WithCredentialsFile(data)
app, err := firebase.NewApp(context.Background(), nil, opt)
if err != nil {
log.Infof("Error initializing app: %v", err)
}
FB_APP = app
}

func RegisterNode() {
id := PicNode.Id
log.Infof("Registering node %s", id)

client, err := FB_APP.Firestore(context.Background())
if err != nil {
log.Fatalf("Error initializing database client: %v", err)
}
defer client.Close()

log.Infof("Registering node %s", PicNode.Id)
PicNode.CreatedAt = time.Now()
PicNode.UpdatedAt = time.Now()
_, err = client.Collection(nodesPath).Doc(id).Set(context.Background(), PicNode)
jsonValue, err := json.Marshal(PicNode)
if err != nil {
log.Fatalf("Error marshaling json %v", err)
}
resp, err := http.Post(baseUrl+registerNodePath, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
log.Fatalf("Error registering node: %v", err)
}
log.Infof("Registered node with response status: %s", resp.Status)
}

func RegisterInstance(shard *Shard, inst *CrdbInst, newShard bool) {
shardId := shard.Id
log.Infof("Registering crdb instance %s, adding it to shard %s", inst.Id, shardId)

client, err := FB_APP.Firestore(context.Background())
if err != nil {
log.Errorf("Error initializing database client: %v", err)
}
defer client.Close()

log.Infof("Registering crdb instance %s, adding it to shard %s", inst.Id, shard.Id)
allMap := make(map[string]interface{})
// add instance to shard
shard.CrdbInsts = append(shard.CrdbInsts, inst.Id)
// add shard to node
batch := client.Batch()
if newShard {
log.Infof("Adding shard %s to node %s", shardId, PicNode.Id)
log.Infof("Adding shard %s to node %s", shard.Id, PicNode.Id)
shard.CreatedAt = time.Now()
PicNode.Shards = append(PicNode.Shards, shard.Id)
PicNode.UpdatedAt = time.Now()
batch.Set(client.Collection(nodesPath).Doc(PicNode.Id), PicNode)
allMap["node"] = PicNode
}
shard.UpdatedAt = time.Now()
inst.CreatedAt = time.Now()
inst.UpdatedAt = time.Now()

batch.Set(client.Collection(shardsPath).Doc(shardId), shard)
batch.Set(client.Collection(instsPath).Doc(inst.Id), inst)
_, err = batch.Commit(context.Background())
allMap["shard"] = shard
allMap["instance"] = inst

jsonValue, err := json.Marshal(allMap)
if err != nil {
log.Errorf("Error registering instance: %v", err)
log.Fatalf("Error marshaling json %v", err)
}

resp, err := http.Post(baseUrl+registerInstPath, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
log.Fatalf("Error registering instance: %v", err)
}
log.Infof("Registered instance with response status: %s", resp.Status)
}

func GetShardToJoin() (shard Shard, err error) {
client, err := FB_APP.Firestore(context.Background())
log.Info("Getting a shard to join")
// get a shard to join
resp, err := http.Get(baseUrl + getShardsPath)
if err != nil {
log.Errorf("Error initializing database client: %v", err)
return
log.Fatalf("Error getting shard to join: %v", err)
}
defer client.Close()

// get a cluster to join
// currently joining the last updated cluster
query := client.Collection(shardsPath).OrderBy("UpdatedAt", firestore.Asc).Limit(1).Documents(context.Background())
docs, err := query.GetAll()
defer resp.Body.Close()
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
return
log.Fatalf("Error reading shard response: %v", err)
}
// get the last document
if len(docs) >= 1 {
doc := docs[len(docs)-1]
if err := doc.DataTo(&shard); err != nil {
log.Errorf("Error converting data %v", err)
}
} else {
log.Error("No shard to join")
if err := json.Unmarshal(bodyBytes, &shard); err != nil {
log.Errorf("Error converting data %v", err)
}
return
}

func ThrowFlare() {
log.Infof("Throwing a flare")
client, err := FB_APP.Firestore(context.Background())

flare := make(map[string]interface{})
flare["nodeId"] = PicNode.Id
flare["nodeName"] = PicNode.Name
flare["lastFired"] = time.Now()
flare["location"] = location

jsonValue, err := json.Marshal(flare)
if err != nil {
log.Errorf("Error initializing database client: %v", err)
return
log.Fatalf("Error marshaling json %v", err)
}
defer client.Close()

_, err = client.Collection(flaresPath).Doc(PicNode.Id).Set(context.Background(), map[string]interface{}{
"nodeId": PicNode.Id,
"nodeName": PicNode.Name,
"lastFired": firestore.ServerTimestamp,
"location": location,
}, firestore.MergeAll)

resp, err := http.Post(baseUrl+throwFlarePath, "application/json", bytes.NewBuffer(jsonValue))
if err != nil {
log.Errorf("Error throwing flare: %v", err)
log.Fatalf("Error throwing flare: %v", err)
}
log.Infof("Threw a flare with response status: %s, response code: %d", resp.Status, resp.StatusCode)
}
2 changes: 1 addition & 1 deletion pkg/picolo/selfUpdater.go
Expand Up @@ -10,7 +10,7 @@ import (

const version = "1.0.4"
const repo = "picolonet/cockroachdb"
const selfUpdateTime = "4:18"
const selfUpdateTime = "13:00"
const selfUpdateTimeZone = "America/Los_Angeles"
const picoloUpdaterCommandName = "picolo-updater"

Expand Down

0 comments on commit a6c5391

Please sign in to comment.