Skip to content
This repository has been archived by the owner on Feb 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' into lockmgr-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 committed Jan 17, 2017
2 parents 16ba6e4 + c6284ea commit 6628050
Show file tree
Hide file tree
Showing 57 changed files with 3,146 additions and 1,266 deletions.
19 changes: 13 additions & 6 deletions README.md
Expand Up @@ -6,7 +6,6 @@ This repo contains the source code of Cherami server, cross-zone replicator serv

Getting started
---------------

To get cherami-server:

```
Expand All @@ -15,29 +14,37 @@ git clone git@github.com:uber/cherami-server.git $GOPATH/src/github.com/uber/che

Build
-----
We use [`glide`](https://glide.sh) to manage Go dependencies. Additionally, we need a Cassandra running locally in order to run the integration tests. Please make sure `glide` and `cqlsh` are in your PATH, and `cqlsh` can connect to the local Cassandra server.
We use [`glide`](https://glide.sh) to manage Go dependencies. Please make sure `glide` is in your PATH before you attempt to build.

* Build the `cherami-server` and other binaries:
* Build the `cherami-server` and other binaries (will not run test):
```
make bins
```

Local Test
----------
We need a Cassandra running locally in order to run the integration tests. Please make sure `cqlsh` is in `/usr/local/bin`, and it can connect to the local Cassandra server.
```
make test
```

Run Cherami locally
-------------------

* Setup the cherami keyspace for metadata:
```
./scripts/cherami-setup-schema
```

* The service can be started as follows:
```
CHERAMI_ENVIRONMENT=laptop CHERAMI_CONFIG_DIR=`pwd`/config CHERAMI_STORE=/tmp/store ./cherami-server start all
CHERAMI_ENVIRONMENT=local ./cherami-server start all
```

Note: `cherami-server` is configured via `config/base.yaml` with some parameters overriden by `config/local.yaml`.

One can use the CLI to verify if Cherami is running properly:
```
./cherami-cli --hostport=<localIP>:4922 create destination /test/cherami
./cherami-cli --env=prod --hostport=<localIP>:4922 create destination /test/cherami
```

Deploy Cherami as a cluster
Expand Down
27 changes: 22 additions & 5 deletions clients/metadata/metadata_cassandra.go
Expand Up @@ -102,6 +102,7 @@ const (
columnHostName = "hostname"
columnInputHostUUID = "input_host_uuid"
columnOriginZone = "origin_zone"
columnRemoteExtentPrimaryStore = "remote_extent_primary_store"
columnLastAddress = "last_address"
columnLastEnqueueTime = "last_enqueue_time"
columnLastSequence = "last_sequence"
Expand Down Expand Up @@ -1926,6 +1927,7 @@ const (
columnStoreUUIDS + `: ?, ` +
columnInputHostUUID + `: ?, ` +
columnOriginZone + `: ?, ` +
columnRemoteExtentPrimaryStore + `: ?, ` +
columnStatus + `: ?}`

sqlInsertDstExent = `INSERT INTO ` + tableDestinationExtents + ` (` +
Expand Down Expand Up @@ -1970,6 +1972,7 @@ const (
columnStoreUUIDS + `: ?, ` +
columnInputHostUUID + `: ?, ` +
columnOriginZone + `: ?, ` +
columnRemoteExtentPrimaryStore + `: ?, ` +
columnStatus + `: ?, ` +
columnArchivalLocation + `: ?` +
`}`
Expand Down Expand Up @@ -2084,6 +2087,7 @@ func (s *CassandraMetadataService) createExtentImpl(extent *shared.Extent, exten
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
extentStatus,
replicaStatsList,
consumerGroupVisibility,
Expand All @@ -2101,6 +2105,7 @@ func (s *CassandraMetadataService) createExtentImpl(extent *shared.Extent, exten
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
extentStatus,
replicaStatsList,
)
Expand All @@ -2122,6 +2127,7 @@ func (s *CassandraMetadataService) createExtentImpl(extent *shared.Extent, exten
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
extentStatus,
replicaStats,
)
Expand Down Expand Up @@ -2190,6 +2196,7 @@ func (s *CassandraMetadataService) deleteExtentImpl(extent *shared.Extent, exten
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
shared.ExtentStatus_DELETED,
extentStatsMap,
extentStats.ConsumerGroupVisibility,
Expand All @@ -2208,6 +2215,7 @@ func (s *CassandraMetadataService) deleteExtentImpl(extent *shared.Extent, exten
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
shared.ExtentStatus_DELETED,
extentStatsMap,
defaultDeleteTTLSeconds,
Expand All @@ -2227,6 +2235,7 @@ func (s *CassandraMetadataService) deleteExtentImpl(extent *shared.Extent, exten
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
shared.ExtentStatus_DELETED,
replicaStats,
defaultDeleteTTLSeconds,
Expand All @@ -2252,6 +2261,7 @@ func (s *CassandraMetadataService) updateExtent(extentStats *shared.ExtentStats,
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
newStatus,
newArchivalLocation,
extent.GetDestinationUUID(),
Expand All @@ -2266,6 +2276,7 @@ func (s *CassandraMetadataService) updateExtent(extentStats *shared.ExtentStats,
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
newStatus,
newArchivalLocation,
extent.GetDestinationUUID(),
Expand All @@ -2282,6 +2293,7 @@ func (s *CassandraMetadataService) updateExtent(extentStats *shared.ExtentStats,
extent.GetStoreUUIDs(),
extent.GetInputHostUUID(),
extent.GetOriginZone(),
extent.GetRemoteExtentPrimaryStore(),
newStatus,
newArchivalLocation,
storeID,
Expand Down Expand Up @@ -2316,6 +2328,10 @@ func (s *CassandraMetadataService) UpdateExtentStats(ctx thrift.Context, request
}

extent := extentStatsResult.ExtentStats.Extent
if request.IsSetRemoteExtentPrimaryStore() {
extent.RemoteExtentPrimaryStore = common.StringPtr(request.GetRemoteExtentPrimaryStore())
}

if len(request.GetArchivalLocation()) == 0 {
request.ArchivalLocation = common.StringPtr(extentStatsResult.ExtentStats.GetArchivalLocation())
}
Expand Down Expand Up @@ -2442,11 +2458,12 @@ func uuidSliceToStringSlice(i interface{}) []string {
func convertExtentStats(extentMap map[string]interface{}, extentStatsMap map[string]map[string]interface{}) *shared.ExtentStats {
result := &shared.ExtentStats{
Extent: &shared.Extent{
ExtentUUID: common.StringPtr(toUUIDString(extentMap[columnUUID])),
DestinationUUID: common.StringPtr(toUUIDString(extentMap[columnDestinationUUID])),
StoreUUIDs: uuidSliceToStringSlice(extentMap[columnStoreUUIDS]),
InputHostUUID: common.StringPtr(toUUIDString(extentMap[columnInputHostUUID])),
OriginZone: common.StringPtr(toString(extentMap[columnOriginZone])),
ExtentUUID: common.StringPtr(toUUIDString(extentMap[columnUUID])),
DestinationUUID: common.StringPtr(toUUIDString(extentMap[columnDestinationUUID])),
StoreUUIDs: uuidSliceToStringSlice(extentMap[columnStoreUUIDS]),
InputHostUUID: common.StringPtr(toUUIDString(extentMap[columnInputHostUUID])),
OriginZone: common.StringPtr(toString(extentMap[columnOriginZone])),
RemoteExtentPrimaryStore: common.StringPtr(toString(extentMap[columnRemoteExtentPrimaryStore])),
},
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus(toInt(extentMap[columnStatus]))),
ArchivalLocation: common.StringPtr(toString(extentMap[columnArchivalLocation])),
Expand Down
25 changes: 15 additions & 10 deletions clients/metadata/metadata_cassandra_test.go
Expand Up @@ -563,10 +563,11 @@ func (s *CassandraSuite) TestExtentCRU() {
extentUUID := uuid.New()
storeIds := []string{uuid.New(), uuid.New(), uuid.New()}
extent := &shared.Extent{
ExtentUUID: common.StringPtr(extentUUID),
DestinationUUID: common.StringPtr(dest.GetDestinationUUID()),
StoreUUIDs: storeIds,
InputHostUUID: common.StringPtr(uuid.New()),
ExtentUUID: common.StringPtr(extentUUID),
DestinationUUID: common.StringPtr(dest.GetDestinationUUID()),
StoreUUIDs: storeIds,
InputHostUUID: common.StringPtr(uuid.New()),
RemoteExtentPrimaryStore: common.StringPtr(uuid.New()),
}
createExtent := &shared.CreateExtentRequest{Extent: extent}
t0 := time.Now().UnixNano() / int64(time.Millisecond)
Expand Down Expand Up @@ -622,10 +623,11 @@ func (s *CassandraSuite) TestExtentCRU() {
s.Equal(extentStatsOrig.GetStatusUpdatedTimeMillis(), extentStats.GetExtentStats().GetStatusUpdatedTimeMillis())

updateExtent := &m.UpdateExtentStatsRequest{
DestinationUUID: common.StringPtr(extent.GetDestinationUUID()),
ExtentUUID: common.StringPtr(extent.GetExtentUUID()),
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_ARCHIVED),
ArchivalLocation: common.StringPtr("S3:foo/bar"),
DestinationUUID: common.StringPtr(extent.GetDestinationUUID()),
ExtentUUID: common.StringPtr(extent.GetExtentUUID()),
Status: common.MetadataExtentStatusPtr(shared.ExtentStatus_ARCHIVED),
ArchivalLocation: common.StringPtr("S3:foo/bar"),
RemoteExtentPrimaryStore: common.StringPtr(uuid.New()),
}

t0 := time.Now().UnixNano() / int64(time.Millisecond)
Expand All @@ -634,14 +636,17 @@ func (s *CassandraSuite) TestExtentCRU() {

s.Nil(err)
s.Equal(updateExtent.Status, updateResult.ExtentStats.Status)
s.Equal(updateExtent.ArchivalLocation, updateResult.ExtentStats.ArchivalLocation)
s.Equal(updateExtent.GetArchivalLocation(), updateResult.GetExtentStats().GetArchivalLocation())
s.Equal(updateExtent.GetRemoteExtentPrimaryStore(), updateResult.GetExtentStats().GetExtent().GetRemoteExtentPrimaryStore())

readExtentStats = &m.ReadExtentStatsRequest{DestinationUUID: extent.DestinationUUID, ExtentUUID: extent.ExtentUUID}
time.Sleep(1 * time.Second)
extentStats, err = s.client.ReadExtentStats(nil, readExtentStats)
s.Nil(err)
s.NotNil(extentStats)
s.Equal(shared.ExtentStatus_ARCHIVED, extentStats.ExtentStats.GetStatus())
s.Equal(shared.ExtentStatus_ARCHIVED, extentStats.GetExtentStats().GetStatus())
s.Equal(updateExtent.GetArchivalLocation(), extentStats.GetExtentStats().GetArchivalLocation())
s.Equal(updateExtent.GetRemoteExtentPrimaryStore(), extentStats.GetExtentStats().GetExtent().GetRemoteExtentPrimaryStore())
s.True(extentStats.GetExtentStats().GetStatusUpdatedTimeMillis() >= t0)
s.True(extentStats.GetExtentStats().GetStatusUpdatedTimeMillis() <= tX)
}
Expand Down
Expand Up @@ -115,7 +115,8 @@ CREATE TYPE extent (
input_host_uuid uuid,
status int, -- ExtentStatus enum
archival_location text,
origin_zone text
origin_zone text,
remote_extent_primary_store text, // the primary store for remote extent
);

CREATE TYPE extent_replica_stats (
Expand Down
21 changes: 0 additions & 21 deletions clients/metadata/schema/metadata_keyspace_test.cql

This file was deleted.

@@ -0,0 +1 @@
ALTER TYPE extent ADD remote_extent_primary_store text;
8 changes: 8 additions & 0 deletions clients/metadata/schema/v13/manifest.json
@@ -0,0 +1,8 @@
{
"CurrVersion": 13,
"MinCompatibleVersion": 8,
"Description": "add remote_extent_primary_store to extent type",
"SchemaUpdateCqlFiles": [
"201612180000_xdc_add_remote_extent_primary_store.cql"
]
}
2 changes: 1 addition & 1 deletion clients/metadata/util.go
Expand Up @@ -144,7 +144,7 @@ func (s *TestCluster) SetupTestCluster() {
ip := `127.0.0.1`
s.createCluster(ip, gocql.Consistency(1), generateRandomKeyspace(10))
s.createKeyspace(1)
s.loadSchema("schema/metadata_test.cql")
s.loadSchema("schema/metadata.cql")

var err error
s.client, err = NewCassandraMetadataService(&configure.MetadataConfig{
Expand Down
6 changes: 3 additions & 3 deletions cmd/tools/cli/main.go
Expand Up @@ -53,7 +53,7 @@ func main() {
})
app.Name = "cherami"
app.Usage = "A command-line tool for cherami users"
app.Version = "1.1.4"
app.Version = "1.1.5"
app.Flags = []cli.Flag{
cli.BoolTFlag{
Name: "hyperbahn",
Expand All @@ -66,8 +66,8 @@ func main() {
},
cli.StringFlag{
Name: "env",
Value: "",
Usage: "Deployment to connect. By default connects to production. Use \"staging\" to connect to staging",
Value: "staging",
Usage: "Deployment to connect to. By default connects to staging. Use \"prod\" to connect to production",
},
cli.StringFlag{
Name: "hyperbahn_bootstrap_file, hbfile",
Expand Down

0 comments on commit 6628050

Please sign in to comment.