
Commit

Merge 2e35013 into 7072b54
stackedsax committed May 7, 2015
2 parents 7072b54 + 2e35013 commit 99e2980
Showing 3 changed files with 299 additions and 8 deletions.
27 changes: 19 additions & 8 deletions README.md
@@ -8,17 +8,11 @@

## Introduction

Blueflood is a multi-tenant distributed metric processing system created by engineers at
[Rackspace](http://www.rackspace.com).
It is used in production by the [Cloud Monitoring](http://www.rackspace.com/cloud/monitoring/)
team to process metrics generated by their monitoring systems.
Blueflood is capable of ingesting, rolling up and serving metrics at a massive scale.

This presentation given to the [SF Metrics Meetup](http://www.meetup.com/San-Francisco-Metrics-Meetup/) group in Feb. 2014 is a good video introduction to Blueflood: http://vimeo.com/87210602
Blueflood is a multi-tenant, distributed metric processing system capable of ingesting, rolling up, and serving metrics at massive scale.

## Getting Started

The latest code will always be here on Github.
The latest code will always be here on Github:

git clone https://github.com/rackerlabs/blueflood.git
cd blueflood
@@ -97,6 +91,23 @@ configuration:
GRAPHITE_PORT
GRAPHITE_PREFIX
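
For example, a minimal sketch of what these settings might look like in a Blueflood configuration file (the hostname, port, and prefix values here are placeholders for illustration, not defaults shipped with this repository):

    GRAPHITE_HOST=graphite.example.com
    GRAPHITE_PORT=2003
    GRAPHITE_PREFIX=blueflood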

## Media

* **Metrics Meetup**, December 2014 - *Blueflood: Multi-tenanted Time-series Datastore*:
    * [Video]() - TBD
    * [Slides](https://raw.githubusercontent.com/rackerlabs/blueflood/master/contrib/presentations/MetricsMeetupDecember2014.pdf)
* **Berlin Buzzwords**, May 2014 - *Blueflood 2.0 and beyond: Distributed open source metrics processing*:
    * [Video](https://www.youtube.com/watch?v=NmZTdWzX5v8&list=PLq-odUc2x7i-Q5gQtkmba4ov37XRPjp6n&index=33)
    * [Slides](http://berlinbuzzwords.de/sites/berlinbuzzwords.de/files/media/documents/gary_dusbabek_berlin_buzzwords_2014.pdf)
* **Metrics Meetup**, February 2014 - *Introduction to Blueflood*:
    * [Video](http://vimeo.com/87210602)
    * [Slides](http://www.lakshmikannan.me/slides/2014-02-19-sf-metrics-meetup/#/)
    * This presentation, given to the [SF Metrics Meetup](http://www.meetup.com/San-Francisco-Metrics-Meetup/) group in February 2014, is a good video introduction to Blueflood.
* **Cassandra EU**, October 2013 - *Blueflood: Simple Metrics Processing*:
    * [Video](https://www.youtube.com/watch?v=1rcffSq26z0)
    * [Slides](http://www.slideshare.net/gdusbabek/blueflood-open-source-metrics-processing-at-cassandraeu-2013)


## Contributing

First, we welcome bug reports and contributions.
49 changes: 49 additions & 0 deletions contrib/compactionEta.sh
@@ -0,0 +1,49 @@
#!/bin/bash

# Description: a script to estimate how much time is left on major
# compactions on Cassandra nodes
#
# Instructions:
#
# a. To run for a single machine, replace 'domain.com' with the appropriate domain.
# You can change the port or leave it blank to use the default port for nodetool:
#
# ssh db01.domain.com 'bash -s' < ./compactionEta.sh [port]
#
# b. To run for multiple machines, here is a one-liner example. This assumes you number
# your hosts like my team does, but hopefully you get the idea.
#
# for x in {00..31}; do ssh db$(printf "%2.2d" $x).domain.com 'bash -s' < ./compactionEta.sh [port]; done
#
# c. Or, to run on multiple machines a little faster, install GNU parallel
# and run the command below. Again, replace 'domain.com' and also <start>
# and <end> with the first and last servers you want to check.
#
# parallel -j20 --no-notice "ssh db{1}.domain.com 'bash -s' < ./compactionEta.sh [port]" ::: $(seq -w <start> <end>) | sort


if [ -n "$1" ]; then
    PORT=$1
else
    PORT=7199
fi

# First we'll grab the seconds elapsed since a major compaction was started
SECONDS_ELAPSED=$(ps -p $(ps auxx | grep nodetool | grep compact | awk '{print $2}') -o etimes= 2>/dev/null)
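# Note: the 'etimes' (elapsed seconds) output column may not be available on
# older ps versions, which only support the formatted 'etime' column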

# If a major compaction is running, ask nodetool how far along it is
# We will grab the largest one, assuming that it's the one we want
if [ -z "$SECONDS_ELAPSED" ]; then
    echo "$(hostname): Node down or not running a major compaction"
    exit 2
else
    PERCENT_COMPLETE=$(/opt/cassandra/bin/nodetool -p $PORT compactionstats 2>/dev/null | grep 'metrics_full' | sort -n -k 5 | tail -1 | awk '{print $7}' | tr -d %)
fi

# Assuming we got $PERCENT_COMPLETE just fine, print out how much time is remaining
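# The math: if P percent took E seconds, the whole compaction takes roughly
# E*100/P seconds, so the time remaining is E*100/P - E (converted to hours below)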
if [ "$SECONDS_ELAPSED" -a "$PERCENT_COMPLETE" ]; then
echo "$SECONDS_ELAPSED $PERCENT_COMPLETE" | awk -v HOST=$(hostname) '{printf HOST ":%8.2f hours\n", (($1*100/$2)-$1)/3600}'
else
echo $(hostname)": Node down or not running a major compaction"
exit 2
fi
231 changes: 231 additions & 0 deletions contrib/repairStatus.py
@@ -0,0 +1,231 @@
from __future__ import print_function
from __future__ import unicode_literals
import json
import argparse
import requests
import re
import math
import logging as log
from datetime import datetime, timedelta


def generate_range_repair_query(node, start_time, current_time):
    """
    Generate the json for an ES _count to return the number of ranges that have finished repairing
    """

    query_json = {
        "query": {
            "filtered": {
                "query": {
                    "bool": {
                        "should": [
                            {
                                "query_string": {
                                    "query": 'level:"info" AND (*)'
                                }
                            }
                        ]
                    }
                },
                "filter": {
                    "bool": {
                        "must": [
                            {
                                "range": {
                                    "@timestamp": {
                                        "from": start_time,
                                        "to": current_time
                                    }
                                }
                            },
                            {
                                "fquery": {
                                    "query": {
                                        "query_string": {
                                            "query": 'host:("' + node + '")'
                                        }
                                    },
                                    "_cache": True
                                }
                            },
                            {
                                "fquery": {
                                    "query": {
                                        "query_string": {
                                            "query": 'method:("syncComplete")'
                                        }
                                    },
                                    "_cache": True
                                }
                            },
                            {
                                "fquery": {
                                    "query": {
                                        "query_string": {
                                            "query": 'message:("is fully synced")'
                                        }
                                    },
                                    "_cache": True
                                }
                            }
                        ]
                    }
                }
            }
        }
    }

    return json.dumps(query_json)


def calculate_range_repairs(host, port, node, start_time, current_time):
    """
    Execute an ES _count to return the number of ranges that have finished repairing
    """

    url = 'http://{host}:{port}/_count'.format(host=host, port=port)
    query = generate_range_repair_query(node, start_time, current_time)

    log.info(
        "Ranges completed: curl {url} -d '{query}'".format(url=url, query=query))
    r = requests.get(url, data=query)

    if not r.status_code or math.floor(r.status_code / 100) != 2:
        raise ValueError("invalid status code returned: {code} {text}".format(
            code=r.status_code, text=r.text))
    else:
        response = json.loads(r.text)
        return response['count']


def generate_total_ranges_query(node, start_time, current_time):
    """
    Generate the json for an ES _search to return the total number of ranges that need repairing
    """

    query_json = {
        "fields": "message",
        "query": {
            "filtered": {
                "filter": {
                    "bool": {
                        "must": [
                            {
                                "range": {
                                    "@timestamp": {
                                        "to": current_time,
                                        "from": start_time
                                    }
                                }
                            },
                            {
                                "fquery": {
                                    "_cache": True,
                                    "query": {
                                        "query_string": {
                                            "query": 'host:("' + node + '")'
                                        }
                                    }
                                }
                            }
                        ]
                    }
                },
                "query": {
                    "query_string": {
                        "query": 'message: "Starting repair"'
                    }
                }
            }
        }
    }

    return json.dumps(query_json)


def calculate_total_ranges(host, port, node, start_time, current_time):
    """
    Execute an ES _search to return the total number of ranges that need repairing
    """

    url = 'http://{host}:{port}/_search'.format(host=host, port=str(port))
    query = generate_total_ranges_query(node, start_time, current_time)

    log.info(
        "Total ranges: curl {url} -d '{query}'".format(url=url, query=query))
    r = requests.get(url, data=query)

    if not r.status_code or math.floor(r.status_code / 100) != 2:
        raise ValueError("invalid status code returned: {code} {text}".format(
            code=r.status_code, text=r.text))
    else:
        response = json.loads(r.text)
        message = json.dumps(response['hits']['hits'][0]['fields']['message'])
        total_ranges = int(
            re.search(r'(?<=repairing\s)\d+(?=\sranges)', message).group(0))
        return total_ranges


def print_totals(node, days, total_successful_range_repairs, total_to_repair):
    """
    Finally, compute the percentage complete and print some status information
    """
    percent_complete = total_successful_range_repairs * 100.0 / total_to_repair
    print("Checking repair messages for", node, "for the past", days, "days")
    print("Count is: " + str(total_successful_range_repairs))
    print("Total to repair is: " + str(total_to_repair))
    print("Percent Complete: " + str(percent_complete) + "%")


def main():
    parser = argparse.ArgumentParser(
        description='''This script shows the current status of a repair running on a Cassandra node.
        Elasticsearch is required so that we can query efficiently.
        Note that NODE is required, and it should be any part of the hostname
        of the node that will uniquely identify the node.''')
    parser.add_argument('-n', '--node', type=str,
                        required=True,
                        help='''Which node to check. This can be any portion
                        of the hostname that uniquely identifies the node.
                        (required)''')
    parser.add_argument('--host', type=str,
                        help='Hostname for Elasticsearch (default: localhost)',
                        default='localhost')
    parser.add_argument('-p', '--port', type=int,
                        help='''Port on host where Elasticsearch is running
                        (default: 9200)''',
                        default=9200)
    parser.add_argument('-d', '--days', type=int,
                        help='''How many days of logs to search. If the script fails to return
                        data, try increasing the default value here
                        (default: 3)''',
                        default=3)
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Display verbose information')

    args = parser.parse_args()

    if args.verbose:
        log.basicConfig(format="%(levelname)s: %(message)s", level=log.DEBUG)
    else:
        log.basicConfig(format="%(levelname)s: %(message)s")

    # Set the current time and the point in time to start searching from
    current_time = datetime.utcnow().isoformat()
    start_time = (datetime.utcnow() - timedelta(days=args.days)).isoformat()

    # Calculate how many ranges have finished repairing and how many there are in total
    total_successful_range_repairs = calculate_range_repairs(
        args.host, args.port, args.node, start_time, current_time)
    total_to_repair = calculate_total_ranges(
        args.host, args.port, args.node, start_time, current_time)

    # Print totals
    print_totals(
        args.node, args.days, total_successful_range_repairs, total_to_repair)


if __name__ == '__main__':
    main()
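
A usage sketch for the script above (the node name and Elasticsearch hostname are placeholders, not values from this repository):

    # check repair progress on the node whose hostname contains "db01",
    # querying Elasticsearch on logs.example.com over the past 5 days of logs
    python contrib/repairStatus.py --node db01 --host logs.example.com --days 5 --verbose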
