Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make cortex, view, and layer idens to be unique #1402

Merged
merged 12 commits into from Oct 24, 2019
179 changes: 131 additions & 48 deletions synapse/cortex.py
Expand Up @@ -750,6 +750,8 @@ async def __anit__(self, dirn, conf=None):
self.axon = None # type: s_axon.AxonApi
self.axready = asyncio.Event()

self.view = None # The default/main view

# generic fini handler for the Cortex
self.onfini(self._onCoreFini)

Expand All @@ -772,12 +774,13 @@ async def __anit__(self, dirn, conf=None):

# Initialize our storage and views
await self._initCoreAxon()

await self._migrateViewsLayers()
await self._initCoreLayers()
await self._checkLayerModels()
await self._initCoreViews()
await self._migrateLayerOffset()
await self._checkLayerModels()
await self._initCoreQueues()
# our "main" view has the same iden as we do
self.view = self.views.get(self.iden)

self.provstor = await s_provenance.ProvStor.anit(self.dirn)
self.onfini(self.provstor.fini)
Expand Down Expand Up @@ -1209,10 +1212,10 @@ async def syncLayerSplices(self, iden, offs):

async def initCoreMirror(self, url):
'''
Initialize this cortex as a down-stream mirror from a telepath url.
Initialize this cortex as a down-stream mirror from a telepath url, receiving splices from another cortex.

NOTE: This cortex *must* be initialized from a backup of the target
cortex!
Note:
This cortex *must* be initialized from a backup of the target cortex!
'''
self.schedCoro(self._initCoreMirror(url))

Expand Down Expand Up @@ -1243,15 +1246,15 @@ async def _initCoreMirror(self, url):

while not proxy.isfini:

# gotta do this in the loop as welll...
# gotta do this in the loop as well...
offs = await layr.getOffset(layr.iden)

# pump them into a queue so we can consume them in chunks
q = asyncio.Queue(maxsize=1000)

async def consume(x):
try:
async for item in proxy.syncLayerSplices(layr.iden, x):
async for item in proxy.syncLayerSplices(None, x):
await q.put(item)
finally:
await q.put(None)
Expand Down Expand Up @@ -1442,12 +1445,11 @@ async def getCellApi(self, link, user, path):

if len(path) == 1:
# get the top layer for the default view
view = self.getView()
layr = view.layers[0]
layr = self.getLayer()
return await s_layer.LayerApi.anit(self, link, user, layr)

if len(path) == 2:
layr = self.layers.get(path[1])
layr = self.getLayer(path[1])
if layr is None:
raise s_exc.NoSuchLayer(iden=path[1])

Expand Down Expand Up @@ -1494,7 +1496,7 @@ async def _calcFormCounts(self):
name, i, len(nameforms))
count = 0

async for buid, valu in self.view.layers[0].iterFormRows(name):
async for buid, valu in self.getLayer().iterFormRows(name):

count += 1
tcount += 1
Expand Down Expand Up @@ -1658,13 +1660,91 @@ def addLayerCtor(self, name, ctor):

async def _initCoreViews(self):

defiden = self.cellinfo.get('defaultview')

for iden, node in await self.hive.open(('cortex', 'views')):
view = await s_view.View.anit(self, node)
self.views[iden] = view
if iden == defiden:
self.view = view

# if we have no views, we are initializing. Add a default main view and layer.
if not self.views:
layr = await self.addLayer()
iden = s_common.guid()
view = await self.addView(iden, 'root', (layr.iden,))
await self.cellinfo.set('defaultview', iden)
self.view = view

# if we have no views, we are initializing. add the main view.
if self.views.get(self.iden) is None:
await self.addView(self.iden, 'root', (self.iden,))
async def _migrateViewsLayers(self):
'''
jnwatson marked this conversation as resolved.
Show resolved Hide resolved
Move directories and idens to current scheme where cortex, views, and layers all have unique idens

Note:
This changes directories and hive data, not existing View or Layer objects

TODO: due to our migration policy, remove in 0.3.0

'''
# pre-hive -> hive layer directory migration first
self._migrOrigLayer()

defiden = self.cellinfo.get('defaultview')
if defiden is not None:
# No need for migration; we're up-to-date
return

oldlayriden = self.iden
newlayriden = s_common.guid()
jnwatson marked this conversation as resolved.
Show resolved Hide resolved

oldviewiden = self.iden
newviewiden = s_common.guid()

if not await self.hive.exists(('cortex', 'views', oldviewiden)):
# No view info present; this is a fresh cortex
return

await self.hive.rename(('cortex', 'views', oldviewiden), ('cortex', 'views', newviewiden))
logger.info('Migrated view from duplicate iden %s to new iden %s', oldviewiden, newviewiden)

# Move view/layer metadata
await self.hive.rename(('cortex', 'layers', oldlayriden), ('cortex', 'layers', newlayriden))
logger.info('Migrated layer from duplicate iden %s to new iden %s', oldlayriden, newlayriden)

# Move layer data
oldpath = os.path.join(self.dirn, 'layers', oldlayriden)
newpath = os.path.join(self.dirn, 'layers', newlayriden)
os.rename(oldpath, newpath)

# Replace all views' references to old layer iden with new layer iden
node = await self.hive.open(('cortex', 'views'))
for iden, viewnode in node:
info = await viewnode.dict()
layers = info.get('layers')
newlayers = [newlayriden if layr == oldlayriden else layr for layr in layers]
await info.set('layers', newlayers)

await self.cellinfo.set('defaultview', newviewiden)

async def _migrateLayerOffset(self):
'''
In case this is a downstream mirror, move the offsets for the old layr iden to the new layr iden

Precondition:
Layers and Views are initialized. Mirror logic has not started

TODO: due to our migration policy, remove in 0.3.0
'''
oldlayriden = self.iden
layr = self.getLayer()
newlayriden = layr.iden

offs = await layr.getOffset(oldlayriden)
if offs == 0:
return

await layr.setOffset(newlayriden, offs)
await layr.delOffset(oldlayriden)

async def addView(self, iden, owner, layers):

Expand All @@ -1682,22 +1762,20 @@ async def addView(self, iden, owner, layers):
async def delView(self, iden):
'''
Delete a cortex view by iden.

Note:
This does not delete any of the view's layers
'''
if iden == self.iden:
if iden == self.view.iden:
raise s_exc.SynErr(mesg='cannot delete the main view')

view = self.views.pop(iden, None)
if view is None:
raise s_exc.NoSuchView(iden=iden)

layeriden = view.iden if view.parent is not None and view.layers[0].iden == view.iden else None

await self.hive.pop(('cortex', 'views', iden))
await view.fini()

if layeriden is not None:
await self.delLayer(iden)

async def delLayer(self, iden):
layr = self.layers.get(iden, None)
if layr is None:
Expand All @@ -1720,18 +1798,30 @@ async def setViewLayers(self, layers, iden=None):
layers ([str]): A top-down list of of layer guids
iden (str): The view iden (defaults to default view).
'''
if iden is None:
iden = self.iden

view = self.views.get(iden)
view = self.getView(iden)
if view is None:
raise s_exc.NoSuchView(iden=iden)

await view.setLayers(layers)

def getLayer(self, iden=None):
'''
Get a Layer object.

Args:
iden (str): The layer iden to retrieve.

Returns:
Layer: A Layer object.
'''
if iden is None:
iden = self.iden
return self.view.layers[0]

# For backwards compatibility, resolve references to old layer iden == cortex.iden to the main layer
# TODO: due to our migration policy, remove in 0.3.x
if iden == self.iden:
return self.view.layers[0]

return self.layers.get(iden)

def getView(self, iden=None):
Expand All @@ -1745,17 +1835,18 @@ def getView(self, iden=None):
View: A View object.
'''
if iden is None:
iden = self.iden
return self.view

return self.views.get(iden)

async def addLayer(self, **info):
'''
Add a Layer to the cortex.

Args:
iden (str): optional iden. default: guid() )
type (str): optional type. default: lmdb )
owner (str): optional owner. default: root )
iden (str): optional iden. default: guid()
type (str): optional type. default: lmdb
owner (str): optional owner. default: root
config (dict): type specific config options
'''
iden = info.pop('iden', None)
Expand Down Expand Up @@ -1816,13 +1907,8 @@ async def _initCoreLayers(self):
for iden, node in node:
await self._layrFromNode(node)

self._migrOrigLayer()

if self.layers.get(self.iden) is None:
# we have no layers. initialize the default layer.
await self.addLayer(iden=self.iden)

def _migrOrigLayer(self):
jnwatson marked this conversation as resolved.
Show resolved Hide resolved
# TODO: due to our migration policy, remove in 0.2.x

oldpath = os.path.join(self.dirn, 'layers', '000-default')
if not os.path.exists(oldpath):
Expand Down Expand Up @@ -1958,7 +2044,7 @@ async def _runPushLoop(self):
offs = await core.getFeedOffs(iden)

while not self.isfini:
layer = self.view.layers[0]
layer = self.getLayer()

items = [x async for x in layer.splices(offs, 10000)]

Expand Down Expand Up @@ -2029,7 +2115,7 @@ async def _runFeedLoop(self, feed):

async with await s_telepath.openurl(url) as tank:

layer = self.view.layers[0]
layer = self.getLayer()

iden = await tank.iden()

Expand Down Expand Up @@ -2066,7 +2152,7 @@ async def _runCryoLoop(self):
# TODO: what to do when write layer changes?

# push splices for our main layer
layr = self.view.layers[0]
layr = self.getLayer()

while not self.isfini:
timeout = 2
Expand Down Expand Up @@ -2329,12 +2415,9 @@ def _viewFromOpts(self, opts):
return self.view

viewiden = opts.get('view')
if viewiden is None:
return self.view
else:
view = self.views.get(viewiden)
if view is None:
raise s_exc.NoSuchView(iden=viewiden)
view = self.getView(viewiden)
if view is None:
raise s_exc.NoSuchView(iden=viewiden)

return view

Expand Down Expand Up @@ -2494,14 +2577,14 @@ async def addFeedData(self, name, items, seqn=None):
return await snap.addFeedData(name, items, seqn=seqn)

async def getFeedOffs(self, iden):
return await self.view.layers[0].getOffset(iden)
return await self.getLayer().getOffset(iden)

async def setFeedOffs(self, iden, offs):
if offs < 0:
mesg = 'Offset must be >= 0.'
raise s_exc.BadConfValu(mesg=mesg, offs=offs, iden=iden)

return await self.view.layers[0].setOffset(iden, offs)
return await self.getLayer().setOffset(iden, offs)

async def snap(self, user=None, view=None):
'''
Expand Down Expand Up @@ -2633,7 +2716,7 @@ def _loadCoreModule(self, ctor, conf=None):
async def stat(self):
stats = {
'iden': self.iden,
'layer': await self.view.layers[0].stat(),
'layer': await self.getLayer().stat(),
'formcounts': self.counts,
}
return stats
Expand Down
3 changes: 2 additions & 1 deletion synapse/cryotank.py
Expand Up @@ -10,6 +10,7 @@
import synapse.lib.cell as s_cell
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.slabseqn as s_slabseqn
import synapse.lib.slaboffs as s_slaboffs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,7 +53,7 @@ async def __anit__(self, dirn, conf=None):

self.slab = await s_lmdbslab.Slab.anit(path)

self.offs = s_lmdbslab.Offs(self.slab, 'offsets')
self.offs = s_slaboffs.SlabOffs(self.slab, 'offsets')

self._items = s_slabseqn.SlabSeqn(self.slab, 'items')
self._metrics = s_slabseqn.SlabSeqn(self.slab, 'metrics')
Expand Down
1 change: 1 addition & 0 deletions synapse/exc.py
Expand Up @@ -55,6 +55,7 @@ class BadCoreStore(SynErr):

class BadCtorType(SynErr): pass
class BadFormDef(SynErr): pass
class BadHivePath(SynErr): pass
class BadLiftValu(SynErr): pass
class BadPropDef(SynErr): pass
class BadTypeDef(SynErr): pass
Expand Down
5 changes: 5 additions & 0 deletions synapse/lib/cell.py
Expand Up @@ -446,6 +446,11 @@ async def __anit__(self, dirn, conf=None, readonly=False):

await self._initCellHttp()

# self.cellinfo, a HiveDict for general purpose persistent storage
node = await self.hive.open(('cellinfo', ))
self.cellinfo = await node.dict()
self.onfini(node)

self._health_funcs = []
self.addHealthFunc(self._cellHealth)

Expand Down