/
meta1.py
155 lines (137 loc) · 6.3 KB
/
meta1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# Copyright (C) 2018 OpenIO SAS, as part of OpenIO SDS
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Meta1 client and meta2 balancing operations"""
from oio.directory.meta import MetaMapping
from oio.common.exceptions import OioException
from oio.directory.client import DirectoryClient
class Meta1RefMapping(MetaMapping):
"""Represents the content of the meta1 database"""
def __init__(self, namespace, directory_client=None, **kwargs):
super(Meta1RefMapping, self).__init__(
{'namespace': namespace}, ['meta2', 'sqlx'], **kwargs)
self._reference = directory_client
self.service_type_by_base = dict()
self.args_by_base = dict()
def _get_old_peers_by_base(self, base):
return self.raw_services_by_base.get(base, list())
def _get_peers_by_base(self, base):
return self.services_by_base.get(base, dict()).keys()
def _get_service_type_by_base(self, base):
return self.service_type_by_base.get(base, None)
def _get_args_by_base(self, base):
return self.args_by_base.get(base, None)
def _apply_link_services(self, moved_ok, **kwargs):
for base in moved_ok:
peers = self._get_peers_by_base(base)
service_type = self._get_service_type_by_base(base)
args = self._get_args_by_base(base)
cid, seq = self.get_cid_and_seq(base)
try:
self.reference.force(
service_type=service_type, cid=cid, replace=True,
services=dict(host=','.join(peers), type=service_type,
args=args, seq=seq))
"""
FIXME(ABO): This part can be removed when, either:
- meta1 sends the removed services bundled with the
account.services events.
- meta2 sends a storage.container.deleted event when the
sqliterepo layer is the one that notifies the deletion of
the databases.
"""
if service_type == 'meta2' and kwargs.get('src_service'):
self.rdir.meta2_index_delete(
volume_id=kwargs.get('src_service'),
container_id=cid
)
except OioException as exc:
self.logger.warn(
"Failed to link services for base %s (seq=%d): %s",
cid, seq, exc)
@property
def reference(self):
if not self._reference:
self._reference = DirectoryClient(self.conf)
return self._reference
def _service_id(self, service, service_type):
return self.conf['namespace'] + "|" + service_type + "|" + service
def _conscience_poll(self, service_type, known, avoid, **kwargs):
try:
services_found = self.conscience.poll(
service_type,
known=[self._service_id(svc, service_type) for svc in known],
avoid=[self._service_id(svc, service_type) for svc in avoid])
return [svc['addr'] for svc in services_found]
except OioException as exc:
self.logger.warn(
"Failed to poll services (type=%s, known=%s, avoid=%s): %s",
service_type, known, avoid, exc)
return list()
def move(self, src_service, dest_service, base_name, service_type,
**kwargs):
"""
Move a `base` of `src_service` to `dest_service`
"""
if service_type not in self.services_by_service_type.keys():
raise ValueError(
"service type must be %s"
% " or ".join(self.services_by_service_type.keys()))
cid, seq = self.get_cid_and_seq(base_name)
data = self.reference.list(cid=cid)
if dest_service is not None and dest_service not in \
self.services_by_service_type[service_type].keys():
raise ValueError(
"destination service must be a %s service" % service_type)
bases = dict()
for service in data['srv']:
if service['type'] != service_type:
continue
if seq is not None and seq != service['seq']:
continue
base = cid + "." + str(service['seq'])
raw_services = bases.get(base, None)
if raw_services is None:
raw_services = dict()
bases[base] = raw_services
host = service['host']
service.pop('host', None)
raw_services[host] = service
moved = set()
for base, raw_services in bases.iteritems():
old_peers = raw_services.keys()
if src_service not in old_peers:
continue
src_info = raw_services.pop(src_service)
if dest_service is None:
known = raw_services.keys()
services_found = self._conscience_poll(
service_type, known, [src_service], **kwargs)
if not services_found:
self.logger.warn(
"No destination service found %s (seq=%d)", cid, seq)
dest_service = services_found[0]
elif dest_service in old_peers:
continue
raw_services[dest_service] = src_info
moved.add(base)
self.raw_services_by_base[base] = old_peers
self.services_by_base[base] = raw_services
self.service_type_by_base[base] = service_type
self.args_by_base[base] = src_info['args']
if not moved:
raise ValueError(
"source service isn't used "
"or destination service is already used for this base")
return moved