
Commit

Multiple Rolling Upgrades
The rolling upgrade code sets keys in the ceph mon cluster so that
each unit can discover whether it is safe to upgrade itself. This
patch addresses an issue where the upgrade code did not account for
multiple consecutive upgrades to newer ceph versions.

Change-Id: Icae681e1817ce50039ef22a0677398fe84057bf7
cholcombe973 authored and javacruft committed Sep 23, 2016
1 parent a980e1c commit 57808f0
Showing 6 changed files with 619 additions and 384 deletions.
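
For context (this note and the snippet are not part of the commit): the removed code below coordinates the rolling upgrade through keys named only after the host, '<hostname>_start' and '<hostname>_done'. Those 'done' markers persist in the mon cluster after an upgrade completes, so a second rolling upgrade to a yet newer release could find a stale marker and skip waiting on its neighbour, which is presumably the "multiple upgrades in a row" problem the commit message describes. A minimal, self-contained sketch of the problem and of version-scoped keys, with a plain dict standing in for the mon key/value store; the exact key format used by lib/ceph after this patch is an assumption here:

_mon_kv = {}

def monitor_key_set(upgrade_key, key, value):
    # Stand-in for the charmhelpers call that writes a key into the mon cluster.
    _mon_kv[key] = value

def monitor_key_exists(upgrade_key, key):
    # Stand-in for the charmhelpers call that checks for a key in the mon cluster.
    return key in _mon_kv

# Keys scoped only by hostname: the marker left behind by the first upgrade
# (e.g. hammer -> infernalis) also satisfies a later infernalis -> jewel check,
# so the second upgrade never actually waits for its predecessor.
monitor_key_set('admin', 'mon-host-1_done', 1474600000.0)
print(monitor_key_exists('admin', 'mon-host-1_done'))        # True, but stale

# Keys scoped by target version as well (assumed format): each upgrade run
# coordinates on its own set of markers, so one run cannot leak into the next.
def done_key(version, host):
    return '{}_{}_done'.format(version, host)

monitor_key_set('admin', done_key('infernalis', 'mon-host-1'), 1474600000.0)
print(monitor_key_exists('admin', done_key('jewel', 'mon-host-1')))   # False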
168 changes: 8 additions & 160 deletions hooks/ceph_hooks.py
@@ -15,11 +15,7 @@
# limitations under the License.

import os
import random
import socket
import subprocess
import sys
import time

sys.path.append('lib')
import ceph
@@ -55,8 +51,6 @@
write_file,
rsync,
cmp_pkgrevno,
service_stop, service_start,
chownr,
)
from charmhelpers.fetch import (
apt_install,
@@ -75,10 +69,6 @@
from charmhelpers.core.templating import render
from charmhelpers.contrib.storage.linux.ceph import (
CephConfContext,
monitor_key_set,
monitor_key_exists,
monitor_key_get,
get_mon_map,
)
from utils import (
get_networks,
@@ -134,7 +124,14 @@ def check_for_upgrade():
if new_version == upgrade_paths[old_version]:
log("{} to {} is a valid upgrade path. Proceeding.".format(
old_version, new_version))
roll_monitor_cluster(new_version)
ceph.roll_monitor_cluster(new_version=new_version,
upgrade_key='admin')
# Wait for all monitors to finish.
status_set("maintenance", "Waiting on mons to finish upgrading")
ceph.wait_for_all_monitors_to_upgrade(new_version=new_version,
upgrade_key='admin')
ceph.roll_osd_cluster(new_version=new_version,
upgrade_key='admin')
else:
# Log a helpful error message
log("Invalid upgrade path from {} to {}. "
@@ -143,155 +140,6 @@ def check_for_upgrade():
pretty_print_upgrade_paths()))


def lock_and_roll(my_name):
start_timestamp = time.time()

log('monitor_key_set {}_start {}'.format(my_name, start_timestamp))
monitor_key_set('admin', "{}_start".format(my_name), start_timestamp)
log("Rolling")
# This should be quick
upgrade_monitor()
log("Done")

stop_timestamp = time.time()
# Set a key to inform others I am finished
log('monitor_key_set {}_done {}'.format(my_name, stop_timestamp))
monitor_key_set('admin', "{}_done".format(my_name), stop_timestamp)


def wait_on_previous_node(previous_node):
log("Previous node is: {}".format(previous_node))

previous_node_finished = monitor_key_exists(
'admin',
"{}_done".format(previous_node))

while previous_node_finished is False:
log("{} is not finished. Waiting".format(previous_node))
# Has this node been trying to upgrade for longer than
# 10 minutes?
# If so then move on and consider that node dead.

# NOTE: This assumes the clusters clocks are somewhat accurate
# If the hosts clock is really far off it may cause it to skip
# the previous node even though it shouldn't.
current_timestamp = time.time()
previous_node_start_time = monitor_key_get(
'admin',
"{}_start".format(previous_node))
if (current_timestamp - (10 * 60)) > previous_node_start_time:
# Previous node is probably dead. Lets move on
if previous_node_start_time is not None:
log(
"Waited 10 mins on node {}. current time: {} > "
"previous node start time: {} Moving on".format(
previous_node,
(current_timestamp - (10 * 60)),
previous_node_start_time))
return
else:
# I have to wait. Sleep a random amount of time and then
# check if I can lock,upgrade and roll.
wait_time = random.randrange(5, 30)
log('waiting for {} seconds'.format(wait_time))
time.sleep(wait_time)
previous_node_finished = monitor_key_exists(
'admin',
"{}_done".format(previous_node))


# Edge cases:
# 1. Previous node dies on upgrade, can we retry?
def roll_monitor_cluster(new_version):
"""
This is tricky to get right so here's what we're going to do.
There's 2 possible cases: Either I'm first in line or not.
If I'm not first in line I'll wait a random time between 5-30 seconds
and test to see if the previous monitor is upgraded yet.
"""
log('roll_monitor_cluster called with {}'.format(new_version))
my_name = socket.gethostname()
monitor_list = []
mon_map = get_mon_map('admin')
if mon_map['monmap']['mons']:
for mon in mon_map['monmap']['mons']:
monitor_list.append(mon['name'])
else:
status_set('blocked', 'Unable to get monitor cluster information')
sys.exit(1)
log('monitor_list: {}'.format(monitor_list))

# A sorted list of osd unit names
mon_sorted_list = sorted(monitor_list)

try:
position = mon_sorted_list.index(my_name)
log("upgrade position: {}".format(position))
if position == 0:
# I'm first! Roll
# First set a key to inform others I'm about to roll
lock_and_roll(my_name=my_name)
else:
# Check if the previous node has finished
status_set('blocked',
'Waiting on {} to finish upgrading'.format(
mon_sorted_list[position - 1]))
wait_on_previous_node(previous_node=mon_sorted_list[position - 1])
lock_and_roll(my_name=my_name)
except ValueError:
log("Failed to find {} in list {}.".format(
my_name, mon_sorted_list))
status_set('blocked', 'failed to upgrade monitor')


def upgrade_monitor():
current_version = ceph.get_version()
status_set("maintenance", "Upgrading monitor")
log("Current ceph version is {}".format(current_version))
new_version = config('release-version')
log("Upgrading to: {}".format(new_version))

try:
add_source(config('source'), config('key'))
apt_update(fatal=True)
except subprocess.CalledProcessError as err:
log("Adding the ceph source failed with message: {}".format(
err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)
try:
if ceph.systemd():
for mon_id in ceph.get_local_mon_ids():
service_stop('ceph-mon@{}'.format(mon_id))
for osd_id in ceph.get_local_osd_ids():
service_stop('ceph-osd@{}'.format(osd_id))
else:
service_stop('ceph-mon-all')
service_stop('ceph-osd-all')

apt_install(packages=ceph.PACKAGES, fatal=True)

# Ensure the ownership of Ceph's directories is correct
chownr(path=os.path.join(os.sep, "var", "lib", "ceph"),
owner=ceph.ceph_user(),
group=ceph.ceph_user())

if ceph.systemd():
for mon_id in ceph.get_local_mon_ids():
service_start('ceph-mon@{}'.format(mon_id))
for osd_id in ceph.get_local_osd_ids():
service_start('ceph-osd@{}'.format(osd_id))
else:
service_start('ceph-mon-all')
service_start('ceph-osd-all')

except subprocess.CalledProcessError as err:
log("Stopping ceph and upgrading packages failed "
"with message: {}".format(err.message))
status_set("blocked", "Upgrade to {} failed".format(new_version))
sys.exit(1)


@hooks.hook('install.real')
@harden()
def install():
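
The helpers removed above show the coordination pattern this commit moves into lib/ceph; the eight added lines in check_for_upgrade() now call ceph.roll_monitor_cluster, wait for every monitor with ceph.wait_for_all_monitors_to_upgrade, and only then call ceph.roll_osd_cluster. The ordering decision itself, stripped of charm plumbing, looks roughly like the sketch below (hostnames, timestamps and the helper names here are illustrative, not the lib/ceph API):

import time

def upgrade_order(monitor_names, my_name):
    # Sort the monitor names; position 0 rolls first, everyone else waits on
    # the unit immediately before it in the sorted list.
    mon_sorted = sorted(monitor_names)
    position = mon_sorted.index(my_name)
    predecessor = mon_sorted[position - 1] if position > 0 else None
    return position, predecessor

def predecessor_looks_dead(start_timestamp, timeout=10 * 60):
    # Mirrors the removed wait_on_previous_node check: if the previous node
    # started more than ten minutes ago and still has no 'done' key, treat it
    # as dead and carry on (assumes reasonably synchronised clocks).
    return start_timestamp is not None and (time.time() - timeout) > start_timestamp

position, prev = upgrade_order(['mon-c', 'mon-a', 'mon-b'], 'mon-b')
print(position, prev)                            # 1 mon-a -> wait for mon-a's done key
print(predecessor_looks_dead(time.time() - 3600))  # True: started an hour ago, move on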
2 changes: 1 addition & 1 deletion hooks/charmhelpers/contrib/network/ip.py
@@ -406,7 +406,7 @@ def is_ip(address):
# Test to see if already an IPv4/IPv6 address
address = netaddr.IPAddress(address)
return True
except netaddr.AddrFormatError:
except (netaddr.AddrFormatError, ValueError):
return False


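The one-line change to is_ip() above broadens the exception handling so that malformed input returns False instead of propagating. A small illustration of the resulting behaviour; the specific input that triggers ValueError (a CIDR string handed to netaddr.IPAddress) is an assumption about netaddr's behaviour, not something stated in the diff:

import netaddr

def is_ip(address):
    # Same shape as the patched helper: any parse failure means "not an IP".
    try:
        netaddr.IPAddress(address)
        return True
    except (netaddr.AddrFormatError, ValueError):
        return False

print(is_ip('10.0.0.1'))      # True
print(is_ip('10.0.0.0/24'))   # False rather than an uncaught ValueError
print(is_ip('not-an-ip'))     # False (AddrFormatError)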
