Skip to content

Commit

Permalink
Move upgrade code to shared lib
Browse files Browse the repository at this point in the history
Moving the ceph mon upgrade code over to the
ceph shared library. This will make it easier
to make patches and have them be applied to all
3 charms at once.

Change-Id: I203bc9859745a3a6c5c61b13d9ac18afaba17bc1
  • Loading branch information
cholcombe973 authored and javacruft committed Sep 21, 2016
1 parent b30eaf3 commit e5306f1
Show file tree
Hide file tree
Showing 5 changed files with 586 additions and 388 deletions.
165 changes: 4 additions & 161 deletions hooks/ceph_hooks.py
Expand Up @@ -15,12 +15,9 @@
# limitations under the License.

import os
import random
import socket
import subprocess
import sys
import uuid
import time

sys.path.append('lib')
import ceph
Expand Down Expand Up @@ -52,10 +49,7 @@
mkdir,
write_file,
rsync,
cmp_pkgrevno,
service_stop,
service_start,
chownr)
cmp_pkgrevno)
from charmhelpers.fetch import (
apt_install,
apt_update,
Expand All @@ -72,11 +66,7 @@
from charmhelpers.core.sysctl import create as create_sysctl
from charmhelpers.core.templating import render
from charmhelpers.contrib.storage.linux.ceph import (
CephConfContext,
monitor_key_set,
monitor_key_exists,
monitor_key_get,
get_mon_map)
CephConfContext)
from utils import (
get_networks,
get_public_addr,
Expand Down Expand Up @@ -130,7 +120,8 @@ def check_for_upgrade():
if new_version == upgrade_paths[old_version]:
log("{} to {} is a valid upgrade path. Proceeding.".format(
old_version, new_version))
roll_monitor_cluster(new_version)
ceph.roll_monitor_cluster(new_version=new_version,
upgrade_key='admin')
else:
# Log a helpful error message
log("Invalid upgrade path from {} to {}. "
Expand All @@ -139,154 +130,6 @@ def check_for_upgrade():
pretty_print_upgrade_paths()))


def lock_and_roll(my_name, version):
start_timestamp = time.time()

log('monitor_key_set {}_{}_start {}'.format(my_name,
version,
start_timestamp))
monitor_key_set('admin', "{}_{}_start".format(my_name, version),
start_timestamp)
log("Rolling")
# This should be quick
upgrade_monitor()
log("Done")

stop_timestamp = time.time()
# Set a key to inform others I am finished
log('monitor_key_set {}_{}_done {}'.format(my_name,
version,
stop_timestamp))
monitor_key_set('admin', "{}_{}_done".format(my_name, version),
stop_timestamp)


def wait_on_previous_node(previous_node, version):
log("Previous node is: {}".format(previous_node))

previous_node_finished = monitor_key_exists(
'admin',
"{}_{}_done".format(previous_node, version))

while previous_node_finished is False:
log("{} is not finished. Waiting".format(previous_node))
# Has this node been trying to upgrade for longer than
# 10 minutes?
# If so then move on and consider that node dead.

# NOTE: This assumes the clusters clocks are somewhat accurate
# If the hosts clock is really far off it may cause it to skip
# the previous node even though it shouldn't.
current_timestamp = time.time()
previous_node_start_time = monitor_key_get(
'admin',
"{}_{}_start".format(previous_node, version))
if (current_timestamp - (10 * 60)) > previous_node_start_time:
# Previous node is probably dead. Lets move on
if previous_node_start_time is not None:
log(
"Waited 10 mins on node {}. current time: {} > "
"previous node start time: {} Moving on".format(
previous_node,
(current_timestamp - (10 * 60)),
previous_node_start_time))
return
else:
# I have to wait. Sleep a random amount of time and then
# check if I can lock,upgrade and roll.
wait_time = random.randrange(5, 30)
log('waiting for {} seconds'.format(wait_time))
time.sleep(wait_time)
previous_node_finished = monitor_key_exists(
'admin',
"{}_{}_done".format(previous_node, version))


# Edge cases:
# 1. Previous node dies on upgrade, can we retry?
def roll_monitor_cluster(new_version):
"""
This is tricky to get right so here's what we're going to do.
There's 2 possible cases: Either I'm first in line or not.
If I'm not first in line I'll wait a random time between 5-30 seconds
and test to see if the previous monitor is upgraded yet.
"""
log('roll_monitor_cluster called with {}'.format(new_version))
my_name = socket.gethostname()
monitor_list = []
mon_map = get_mon_map('admin')
if mon_map['monmap']['mons']:
for mon in mon_map['monmap']['mons']:
monitor_list.append(mon['name'])
else:
status_set('blocked', 'Unable to get monitor cluster information')
sys.exit(1)
log('monitor_list: {}'.format(monitor_list))

# A sorted list of osd unit names
mon_sorted_list = sorted(monitor_list)

try:
position = mon_sorted_list.index(my_name)
log("upgrade position: {}".format(position))
if position == 0:
# I'm first! Roll
# First set a key to inform others I'm about to roll
lock_and_roll(my_name=my_name, version=new_version)
else:
# Check if the previous node has finished
status_set('blocked',
'Waiting on {} to finish upgrading'.format(
mon_sorted_list[position - 1]))
wait_on_previous_node(previous_node=mon_sorted_list[position - 1],
version=new_version)
lock_and_roll(my_name=my_name, version=new_version)
except ValueError:
log("Failed to find {} in list {}.".format(
my_name, mon_sorted_list))
status_set('blocked', 'failed to upgrade monitor')


def upgrade_monitor():
current_version = ceph.get_version()
status_set("maintenance", "Upgrading monitor")
log("Current ceph version is {}".format(current_version))
new_version = config('release-version')
log("Upgrading to: {}".format(new_version))

try:
add_source(config('source'), config('key'))
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
if ceph.systemd():
for mon_id in ceph.get_local_mon_ids():
service_stop('ceph-mon@{}'.format(mon_id))
else:
service_stop('ceph-mon-all')
apt_install(packages=ceph.PACKAGES, fatal=True)

# Ensure the ownership of Ceph's directories is correct
chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
owner=ceph.ceph_user(),
group=ceph.ceph_user())
if ceph.systemd():
for mon_id in ceph.get_local_mon_ids():
service_start('ceph-mon@{}'.format(mon_id))
else:
service_start('ceph-mon-all')
status_set("active", "")
except subprocess.CalledProcessError as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)


@hooks.hook('install.real')
@harden()
def install():
Expand Down

0 comments on commit e5306f1

Please sign in to comment.