Notes on cassandra avro interface
Philip (flip) Kromer committed Aug 29, 2010
1 parent 1a95b6c commit d1bc9d6
Showing 3 changed files with 565 additions and 0 deletions.
36 changes: 36 additions & 0 deletions docpages/avro/performance.textile
@@ -0,0 +1,36 @@


h2. Bulk Streaming use cases

* Take a bunch of nightly calculations and flood them into the DB (see http://sna-projects.com/blog/2009/06/building-a-1-tb-data-cycle-at-linkedin-with-hadoop-and-project-voldemort/). In this case, it's important that the bulk load happen efficiently but with low stress on the DB. I'm willing to arrange things so that data streams to each cassandra node in sort order and pre-partitioned, just like the node wants to use it. Should run at the full streaming speed of the disk.

* Building a new table or moving a legacy database over to cassandra. I want to write from one or several nodes, probably not in the cluster, and data will be completely unpartitioned. I might be able to make some guarantees about uniqueness of keys and rows (that is, you'll generally only see a key once, and/or when you see a key it will contain the entire row). Target: 20k inserts/s per receiving node.

* Using cassandra to replace HDFS. Replication is for compute, not for availability -- so efficient writing at consistency level ANY is important. Would like to get 100k inserts/s per receiving node.

* A brand new user wants to just stuff his goddamn data into the goddamn database and start playing with it. It had better be not-terribly-slow, and it had better be really easy to take whatever insane format it shows up in and cram that into the data hole. It should also be conceptually straightforward: it should look like I'm writing hashes or hashes of hashes.
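
To make the "hashes of hashes" idea concrete, here is a minimal Python sketch that flattens a nested dict into one tuple per column, emitted in sorted key order as the first use case asks. The function name @flatten_rows@ and the tuple layout (row key, column, value, timestamp, ttl) are assumptions for illustration, not part of any Cassandra client API.

```python
import time


def flatten_rows(rows, timestamp=None, ttl=0):
    """Yield (row_key, column, value, timestamp, ttl) tuples from a dict of dicts.

    Rows and columns are emitted in sorted order, so a receiver that wants
    pre-sorted input sees each row key exactly once, with its whole row together.
    """
    if timestamp is None:
        # Assumption: microseconds since the epoch as the write timestamp.
        timestamp = int(time.time() * 1_000_000)
    for row_key in sorted(rows):
        columns = rows[row_key]
        for column in sorted(columns):
            yield (row_key, column, columns[column], timestamp, ttl)


# Hypothetical data: two rows, written as a hash of hashes.
rows = {
    "user:2": {"name": "bob"},
    "user:1": {"name": "alice", "city": "austin"},
}
mutations = list(flatten_rows(rows, timestamp=1))
```

Each tuple carries the same fields as the one-value mutation sketched in the protocol notes (column, value, ts, ttl), so a loader could map them onto whatever record type the streaming interface ends up using.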


===========================================================================
From http://sna-projects.com/blog/2009/06/building-a-1-tb-data-cycle-at-linkedin-with-hadoop-and-project-voldemort/

Here are the times taken:

* 100GB: 28mins (400 mappers, 90 reducers)
* 512GB: 2hrs, 16mins (2313 mappers, 350 reducers)
* 1TB: 5hrs, 39mins (4608 mappers, 700 reducers)

Data transfer between the clusters happens at a steady rate bound by the disk or network. For our Amazon instances this is around 40MB/second.
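
As a rough check on what that rate implies, the sketch below estimates how long a 100GB store (the smallest size listed above) would take to move at 40MB/second. It assumes 1GB = 1024MB, a single stream, and a constant rate; the helper names are illustrative only.

```python
def transfer_seconds(size_gb, rate_mb_per_s=40.0):
    """Seconds needed to move size_gb at a constant rate (assumes 1 GB = 1024 MB)."""
    return size_gb * 1024 / rate_mb_per_s


def format_duration(seconds):
    """Render a duration in seconds as 'Hh MMm SSs'."""
    total = int(round(seconds))
    hours, rem = divmod(total, 3600)
    minutes, secs = divmod(rem, 60)
    return f"{hours}h {minutes:02d}m {secs:02d}s"


seconds = transfer_seconds(100)      # 2560.0
print(format_duration(seconds))      # 0h 42m 40s
```

Under these assumptions, the transfer of a 100GB store takes longer than the 28 minutes reported for building it.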

Online Performance

Lookup time for a single Voldemort node also compares well to a single MySQL instance. To test this we ran local tests against the 100GB per-node data from the 1 TB test. This test was also run on an Amazon Extra Large instance with 15GB of RAM and the 4 ephemeral disks in a RAID 10 configuration. To run the tests we simulated 1 million requests from a real request stream recorded on our production system against each of the storage systems. We see the following performance for 1 million requests against a single node:

|_. |_. MySQL|_. Voldemort|
|Reqs per sec.|727|1291|
|Median req. time|0.23 ms|0.05 ms|
|Avg. req. time|13.7 ms|7.7 ms|
|99th percentile req. time|127.2 ms|100.7 ms|

These numbers are both for local requests with no network involved as the only intention is to benchmark the storage layer of these systems.
211 changes: 211 additions & 0 deletions examples/cassandra_streaming/client_schema.avpr
@@ -0,0 +1,211 @@
{
"protocol" : "Cassandra",
"namespace" : "org.apache.cassandra.avro", "types" : [

Add/insert one value

Mutate ks, [col_ref], 'val', ts, ttl

Add/insert multiple cols to same row

MutateRow ks, supercol_or_nil, { [col, val, ts, ttl], [col, val, ts, ttl], ... }
MutateCRow ks, { [col, val, ts, ttl], [col, val, ts, ttl], ... }
MutateSCRow ks, supercol, { [col, val, ts, ttl], [col, val, ts, ttl], ... }

Get one, many or all columns from given row

get
Multiget ks, supercol_or_nil, [col1, col2, ...] or nil

Get one, many or all columns from a slice of sequential rows

get_range

Remove one column from a row

remove

Remove many columns from a row

Remove all columns in a row



h3. Mo


{ "name" : "AccessLevel", "type" : "enum", "symbols" : [ "NONE", "READONLY", "READWRITE", "FULL" ] },
{ "name" : "ColumnPath", "type" : "record", "fields" : [
{ "name" : "column_family", "type" : "string"},
{ "name" : "super_column", "type" : [ "bytes", "null" ]},
{ "name" : "column", "type" : [ "bytes", "null" ] } ]},
{ "name" : "ColumnParent", "type" : "record", "fields" : [
{ "name" : "column_family", "type" : "string"},
{ "name" : "super_column", "type" : [ "bytes", "null" ] } ]},
{ "name" : "SliceRange", "type" : "record", "fields" : [
{ "name" : "start", "type" : "bytes"},
{ "name" : "finish", "type" : "bytes"},
{ "name" : "reversed", "type" : "boolean"},
{ "name" : "count", "type" : "int"},
{ "name" : "bitmasks", "type" : [ { "type" : "array", "items" : "bytes"}, "null" ] } ]},
{ "name" : "SlicePredicate", "type" : "record", "fields" : [
{ "name" : "column_names", "type" : [ { "type" : "array", "items" : "bytes"}, "null" ]},
{ "name" : "slice_range", "type" : [ "SliceRange", "null" ] } ]},

{ "name" : "Clock", "type" : "record", "fields" : [
{ "name" : "timestamp", "type" : "long" } ]},
{ "name" : "Column", "type" : "record", "fields" : [
{ "name" : "name", "type" : "bytes"},
{ "name" : "value", "type" : "bytes"},
{ "name" : "clock", "type" : "Clock"},
{ "name" : "ttl", "type" : "int" } ]},
{ "name" : "SuperColumn", "type" : "record", "fields" : [
{ "name" : "name", "type" : "bytes"},
{ "name" : "columns", "type" : { "type" : "array", "items" : "Column" } } ]},
{ "name" : "ColumnOrSuperColumn", "type" : "record", "fields" : [
{ "name" : "column", "type" : "Column" },
{ "name" : "super_column", "type" : "null" } ]},
{ "name" : "Deletion", "type" : "record", "fields" : [
{ "name" : "clock", "type" : "Clock"},
{ "name" : "super_column", "type" : [ "bytes", "null" ]},
{ "name" : "predicate", "type" : [ "SlicePredicate", "null" ] } ]},
{ "name" : "Mutation", "type" : "record", "fields" : [
{ "name" : "column_or_supercolumn", "type" : "ColumnOrSuperColumn" },
{ "name" : "deletion", "type" : "null" }
]},
{ "name" : "StreamingMutation", "type" : "record", "fields" : [
{ "name" : "key", "type" : "bytes" },
{ "name" : "mutation", "type" : "Mutation" } ]},

{ "name" : "IndexType", "type" : "enum", "symbols" : [ "KEYS" ]},
{ "name" : "ColumnDef", "type" : "record", "fields" : [
{ "name" : "name", "type" : "bytes"},
{ "name" : "validation_class", "type" : "string"},
{ "name" : "index_type", "type" : [ "IndexType", "null" ]},
{ "name" : "index_name", "type" : [ "string", "null" ] } ]},
{ "name" : "CfDef", "type" : "record", "fields" : [
{ "name" : "keyspace", "type" : "string"},
{ "name" : "name", "type" : "string"},
{ "name" : "column_type", "type" : [ "string", "null" ]},
{ "name" : "clock_type", "type" : [ "string", "null" ]},
{ "name" : "comparator_type", "type" : [ "string", "null" ]},
{ "name" : "subcomparator_type", "type" : [ "string", "null" ]},
{ "name" : "reconciler", "type" : [ "string", "null" ]},
{ "name" : "comment", "type" : [ "string", "null" ]},
{ "name" : "row_cache_size", "type" : [ "double", "null" ]},
{ "name" : "preload_row_cache", "type" : [ "boolean", "null" ]},
{ "name" : "key_cache_size", "type" : [ "double", "null" ]},
{ "name" : "read_repair_chance", "type" : [ "double", "null" ]},
{ "name" : "gc_grace_seconds", "type" : [ "int", "null" ]},
{ "name" : "column_metadata", "type" : [ { "type" : "array", "items" : "ColumnDef"}, "null" ]},
{ "name" : "id", "type" : [ "int", "null" ] } ]},
{ "name" : "KsDef", "type" : "record", "fields" : [
{ "name" : "name", "type" : "string"}, { "name" : "strategy_class", "type" : "string"},
{ "name" : "strategy_options", "type" : [ { "type" : "map", "values" : "string"}, "null" ]},
{ "name" : "replication_factor", "type" : "int"}, { "name" : "cf_defs", "type" : { "type" : "array", "items" : "CfDef" } } ]},
{ "name" : "MutationsMapEntry", "type" : "record", "fields" : [ { "name" : "key", "type" : "bytes"}, { "name" : "mutations", "type" : { "type" : "map", "values" : { "type" : "array", "items" : "Mutation" } } } ]},
{ "name" : "CoscsMapEntry", "type" : "record", "fields" : [ { "name" : "key", "type" : "bytes"}, { "name" : "columns", "type" : { "type" : "array", "items" : "ColumnOrSuperColumn" } } ]},
{ "name" : "ConsistencyLevel", "type" : "enum", "symbols" : [ "ZERO", "ONE", "QUORUM", "DCQUORUM", "DCQUORUMSYNC", "ALL" ]},
{ "name" : "InvalidRequestException", "type" : "error", "fields" : [ { "name" : "why", "type" : [ "string", "null" ] } ]},
{ "name" : "NotFoundException", "type" : "error", "fields" : [ { "name" : "why", "type" : [ "string", "null" ] } ]},
{ "name" : "UnavailableException", "type" : "error", "fields" : [ { "name" : "why", "type" : [ "string", "null" ] } ]},
{ "name" : "TimedOutException", "type" : "error", "fields" : [ { "name" : "why", "type" : [ "string", "null" ] } ] }
],


"messages" : { "get" : {
"request" : [ { "name" : "key", "type" : "bytes"},
{ "name" : "column_path", "type" : "ColumnPath"},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : "ColumnOrSuperColumn",
"errors" : [ "InvalidRequestException", "NotFoundException", "UnavailableException", "TimedOutException" ]
},
"get_slice" : {
"request" : [ { "name" : "key", "type" : "bytes"},
{ "name" : "column_parent", "type" : "ColumnParent"},
{ "name" : "predicate", "type" : "SlicePredicate"},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : { "type" : "array",
"items" : "ColumnOrSuperColumn"
},
"errors" : [ "InvalidRequestException", "UnavailableException", "TimedOutException" ]
},
"multiget_slice" : {
"request" : [ { "name" : "keys", "type" : { "type" : "array",
"items" : "bytes"
}},
{ "name" : "column_parent", "type" : "ColumnParent"},
{ "name" : "predicate", "type" : "SlicePredicate"},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : { "type" : "array",
"items" : "CoscsMapEntry"
},
"errors" : [ "InvalidRequestException", "UnavailableException", "TimedOutException" ]
},
"get_count" : {
"request" : [ { "name" : "key", "type" : "bytes"},
{ "name" : "column_parent", "type" : "ColumnParent"},
{ "name" : "predicate", "type" : "SlicePredicate"},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : "int",
"errors" : [ "InvalidRequestException", "UnavailableException", "TimedOutException" ]
},
"insert" : {
"request" : [ { "name" : "key", "type" : "bytes"},
{ "name" : "column_parent", "type" : "ColumnParent"},
{ "name" : "column", "type" : "Column"},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : "null",
"errors" : [ "InvalidRequestException", "UnavailableException", "TimedOutException" ]
},
"remove" : {
"request" : [ { "name" : "key", "type" : "bytes"},
{ "name" : "column_path", "type" : "ColumnPath"},
{ "name" : "clock", "type" : "Clock"},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : "null",
"errors" : [ "InvalidRequestException", "UnavailableException", "TimedOutException" ]
},
"batch_mutate" : {
"request" : [ { "name" : "mutation_map", "type" : { "type" : "array",
"items" : "MutationsMapEntry"
}},
{ "name" : "consistency_level", "type" : "ConsistencyLevel"
} ],
"response" : "null",
"errors" : [ "InvalidRequestException", "UnavailableException", "TimedOutException" ]
},
"system_add_keyspace" : {
"request" : [ { "name" : "ks_def", "type" : "KsDef"
} ],
"response" : "null",
"errors" : [ "InvalidRequestException" ]
},
"set_keyspace" : {
"request" : [ { "name" : "keyspace", "type" : "string"
} ],
"response" : "null",
"errors" : [ "InvalidRequestException" ]
},
"describe_keyspaces" : {
"request" : [ ],
"response" : { "type" : "array",
"items" : "string"
}
},
"describe_cluster_name" : {
"request" : [ ],
"response" : "string"
},
"describe_version" : {
"request" : [ ],
"response" : "string"
}
}
}
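
For orientation, here is a minimal Python sketch of the nested structure a @batch_mutate@ request's @mutation_map@ would carry, following the field names in the types above (@MutationsMapEntry@, @Mutation@, @ColumnOrSuperColumn@, @Column@, @Clock@). It only builds plain dicts and does not serialize or send anything. The helper names and the "Users" column family are hypothetical, and the map key is assumed to be the column family name.

```python
def make_column(name, value, timestamp, ttl=0):
    """Dict shaped like the Column record: bytes name/value, a Clock, and a ttl."""
    return {
        "name": name,
        "value": value,
        "clock": {"timestamp": timestamp},
        "ttl": ttl,
    }


def make_insert_mutation(column):
    """Dict shaped like the Mutation record as declared in this file.

    Here super_column and deletion are both declared with type "null",
    so only plain column inserts can be expressed.
    """
    return {
        "column_or_supercolumn": {"column": column, "super_column": None},
        "deletion": None,
    }


def build_mutation_map(rows, column_family, timestamp):
    """Turn {row_key: {column: value}} into a list of MutationsMapEntry-shaped dicts."""
    entries = []
    for row_key, columns in rows.items():
        mutations = [
            make_insert_mutation(make_column(name.encode(), value.encode(), timestamp))
            for name, value in columns.items()
        ]
        # Assumption: the "mutations" map is keyed by column family name.
        entries.append({"key": row_key.encode(), "mutations": {column_family: mutations}})
    return entries


# Hypothetical row and column family, for illustration only.
entries = build_mutation_map({"user:1": {"name": "alice"}}, "Users", timestamp=1)
```

Six levels of nesting for a single column insert is the kind of overhead the flatter Mutate / MutateRow forms in the notes above are trying to avoid.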
