-
Notifications
You must be signed in to change notification settings - Fork 72
/
cortex.py
6663 lines (5084 loc) · 221 KB
/
cortex.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import os
import copy
import regex
import asyncio
import logging
import textwrap
import contextlib
import collections
from collections.abc import Mapping
import synapse
import synapse.exc as s_exc
import synapse.axon as s_axon
import synapse.common as s_common
import synapse.telepath as s_telepath
import synapse.datamodel as s_datamodel
import synapse.lib.base as s_base
import synapse.lib.cell as s_cell
import synapse.lib.chop as s_chop
import synapse.lib.coro as s_coro
import synapse.lib.hive as s_hive
import synapse.lib.view as s_view
import synapse.lib.cache as s_cache
import synapse.lib.layer as s_layer
import synapse.lib.nexus as s_nexus
import synapse.lib.oauth as s_oauth
import synapse.lib.queue as s_queue
import synapse.lib.scope as s_scope
import synapse.lib.storm as s_storm
import synapse.lib.agenda as s_agenda
import synapse.lib.config as s_config
import synapse.lib.parser as s_parser
import synapse.lib.dyndeps as s_dyndeps
import synapse.lib.grammar as s_grammar
import synapse.lib.httpapi as s_httpapi
import synapse.lib.msgpack as s_msgpack
import synapse.lib.modules as s_modules
import synapse.lib.schemas as s_schemas
import synapse.lib.spooled as s_spooled
import synapse.lib.version as s_version
import synapse.lib.urlhelp as s_urlhelp
import synapse.lib.jsonstor as s_jsonstor
import synapse.lib.modelrev as s_modelrev
import synapse.lib.stormsvc as s_stormsvc
import synapse.lib.lmdbslab as s_lmdbslab
import synapse.lib.crypto.rsa as s_rsa
# Importing these registers their commands
import synapse.lib.stormhttp as s_stormhttp # NOQA
import synapse.lib.stormwhois as s_stormwhois # NOQA
import synapse.lib.provenance as s_provenance
import synapse.lib.stormtypes as s_stormtypes
import synapse.lib.stormlib.aha as s_stormlib_aha # NOQA
import synapse.lib.stormlib.gen as s_stormlib_gen # NOQA
import synapse.lib.stormlib.gis as s_stormlib_gis # NOQA
import synapse.lib.stormlib.hex as s_stormlib_hex # NOQA
import synapse.lib.stormlib.log as s_stormlib_log # NOQA
import synapse.lib.stormlib.xml as s_stormlib_xml # NOQA
import synapse.lib.stormlib.auth as s_stormlib_auth # NOQA
import synapse.lib.stormlib.cell as s_stormlib_cell # NOQA
import synapse.lib.stormlib.imap as s_stormlib_imap # NOQA
import synapse.lib.stormlib.ipv6 as s_stormlib_ipv6 # NOQA
import synapse.lib.stormlib.json as s_stormlib_json # NOQA
import synapse.lib.stormlib.math as s_stormlib_math # NOQA
import synapse.lib.stormlib.mime as s_stormlib_mime # NOQA
import synapse.lib.stormlib.pack as s_stormlib_pack # NOQA
import synapse.lib.stormlib.smtp as s_stormlib_smtp # NOQA
import synapse.lib.stormlib.stix as s_stormlib_stix # NOQA
import synapse.lib.stormlib.yaml as s_stormlib_yaml # NOQA
import synapse.lib.stormlib.basex as s_stormlib_basex # NOQA
import synapse.lib.stormlib.graph as s_stormlib_graph # NOQA
import synapse.lib.stormlib.iters as s_stormlib_iters # NOQA
import synapse.lib.stormlib.macro as s_stormlib_macro
import synapse.lib.stormlib.model as s_stormlib_model
import synapse.lib.stormlib.oauth as s_stormlib_oauth # NOQA
import synapse.lib.stormlib.stats as s_stormlib_stats # NOQA
import synapse.lib.stormlib.storm as s_stormlib_storm # NOQA
import synapse.lib.stormlib.vault as s_stormlib_vault # NOQA
import synapse.lib.stormlib.backup as s_stormlib_backup # NOQA
import synapse.lib.stormlib.cortex as s_stormlib_cortex # NOQA
import synapse.lib.stormlib.hashes as s_stormlib_hashes # NOQA
import synapse.lib.stormlib.random as s_stormlib_random # NOQA
import synapse.lib.stormlib.scrape as s_stormlib_scrape # NOQA
import synapse.lib.stormlib.infosec as s_stormlib_infosec # NOQA
import synapse.lib.stormlib.project as s_stormlib_project # NOQA
import synapse.lib.stormlib.version as s_stormlib_version # NOQA
import synapse.lib.stormlib.easyperm as s_stormlib_easyperm # NOQA
import synapse.lib.stormlib.ethereum as s_stormlib_ethereum # NOQA
import synapse.lib.stormlib.modelext as s_stormlib_modelext # NOQA
import synapse.lib.stormlib.compression as s_stormlib_compression # NOQA
import synapse.lib.stormlib.notifications as s_stormlib_notifications # NOQA
logger = logging.getLogger(__name__)
stormlogger = logging.getLogger('synapse.storm')
'''
A Cortex implements the synapse hypergraph object.
'''
reqver = '>=0.2.0,<3.0.0'
# Constants returned in results from syncLayersEvents and syncIndexEvents
SYNC_NODEEDITS = 0 # A nodeedits: (<offs>, 0, <etyp>, (<etype args>), {<meta>})
SYNC_NODEEDIT = 1 # A nodeedit: (<offs>, 0, <etyp>, (<etype args>))
SYNC_LAYR_ADD = 3 # A layer was added
SYNC_LAYR_DEL = 4 # A layer was deleted
# push/pull def
reqValidPush = s_config.getJsValidator({
'type': 'object',
'properties': {
'url': {'type': 'string'},
'time': {'type': 'number'},
'iden': {'type': 'string', 'pattern': s_config.re_iden},
'user': {'type': 'string', 'pattern': s_config.re_iden},
},
'additionalProperties': True,
'required': ['iden', 'url', 'user', 'time'],
})
reqValidPull = reqValidPush
reqValidTagModel = s_config.getJsValidator({
'type': 'object',
'properties': {
'prune': {'type': 'number', 'minimum': 1},
'regex': {'type': 'array', 'items': {'type': ['string', 'null']}},
},
'additionalProperties': False,
'required': [],
})
reqValidStormMacro = s_config.getJsValidator({
'type': 'object',
'properties': {
'name': {'type': 'string', 'pattern': '^.{1,491}$'},
'iden': {'type': 'string', 'pattern': s_config.re_iden},
# user kept for backward compat. remove eventually...
'user': {'type': 'string', 'pattern': s_config.re_iden},
'creator': {'type': 'string', 'pattern': s_config.re_iden},
'desc': {'type': 'string', 'default': ''},
'storm': {'type': 'string'},
'created': {'type': 'number'},
'updated': {'type': 'number'},
'permissions': s_msgpack.deepcopy(s_schemas.easyPermSchema),
},
'required': [
'name',
'iden',
'user',
'storm',
'creator',
'created',
'updated',
'permissions',
],
})
def cmprkey_indx(x):
return x[1]
def cmprkey_buid(x):
return x[1][1]
async def wrap_liftgenr(iden, genr):
async for indx, buid, sode in genr:
yield iden, (indx, buid), sode
class CortexAxonMixin:
async def prepare(self):
await self.cell.axready.wait()
await s_coro.ornot(super().prepare)
def getAxon(self):
return self.cell.axon
async def getAxonInfo(self):
return self.cell.axoninfo
class CortexAxonHttpHasV1(CortexAxonMixin, s_axon.AxonHttpHasV1):
pass
class CortexAxonHttpDelV1(CortexAxonMixin, s_axon.AxonHttpDelV1):
pass
class CortexAxonHttpUploadV1(CortexAxonMixin, s_axon.AxonHttpUploadV1):
pass
class CortexAxonHttpBySha256V1(CortexAxonMixin, s_axon.AxonHttpBySha256V1):
pass
class CortexAxonHttpBySha256InvalidV1(CortexAxonMixin, s_axon.AxonHttpBySha256InvalidV1):
pass
class CoreApi(s_cell.CellApi):
'''
The CoreApi is exposed when connecting to a Cortex over Telepath.
Many CoreApi methods operate on packed nodes consisting of primitive data structures
which can be serialized with msgpack/json.
An example of a packaged Node::
( (<form>, <valu>), {
"props": {
<name>: <valu>,
...
},
"tags": {
"foo": <time>,
"foo.bar": <time>,
},
})
'''
@s_cell.adminapi()
def getCoreMods(self):
return self.cell.getCoreMods()
async def getModelDict(self):
'''
Return a dictionary which describes the data model.
Returns:
(dict): A model description dictionary.
'''
return await self.cell.getModelDict()
async def getModelDefs(self):
return await self.cell.getModelDefs()
def getCoreInfo(self):
'''
Return static generic information about the cortex including model definition
'''
return self.cell.getCoreInfo()
async def getCoreInfoV2(self):
'''
Return static generic information about the cortex including model definition
'''
return await self.cell.getCoreInfoV2()
@s_cell.adminapi()
async def saveLayerNodeEdits(self, layriden, edits, meta):
return await self.cell.saveLayerNodeEdits(layriden, edits, meta)
def _reqValidStormOpts(self, opts):
if opts is None:
opts = {}
opts.setdefault('user', self.user.iden)
if opts.get('user') != self.user.iden:
self.user.confirm(('impersonate',))
return opts
async def callStorm(self, text, opts=None):
'''
Return the value expressed in a return() statement within storm.
'''
opts = self._reqValidStormOpts(opts)
return await self.cell.callStorm(text, opts=opts)
async def exportStorm(self, text, opts=None):
'''
Execute a storm query and package nodes for export/import.
NOTE: This API yields nodes after an initial complete lift
in order to limit exported edges.
'''
opts = self._reqValidStormOpts(opts)
async for pode in self.cell.exportStorm(text, opts=opts):
yield pode
async def feedFromAxon(self, sha256, opts=None):
'''
Import a msgpack .nodes file from the axon.
'''
opts = self._reqValidStormOpts(opts)
return await self.cell.feedFromAxon(sha256, opts=opts)
async def _reqDefLayerAllowed(self, perms):
view = self.cell.getView()
wlyr = view.layers[0]
self.user.confirm(perms, gateiden=wlyr.iden)
async def addNode(self, form, valu, props=None):
'''
Deprecated in 2.0.0.
'''
s_common.deprecated('CoreApi.addNode')
async with await self.cell.snap(user=self.user) as snap:
self.user.confirm(('node', 'add', form), gateiden=snap.wlyr.iden)
with s_provenance.claim('coreapi', meth='node:add', user=snap.user.iden):
node = await snap.addNode(form, valu, props=props)
return node.pack()
async def addNodes(self, nodes):
'''
Add a list of packed nodes to the cortex.
Args:
nodes (list): [ ( (form, valu), {'props':{}, 'tags':{}}), ... ]
Yields:
(tuple): Packed node tuples ((form,valu), {'props': {}, 'tags':{}})
Deprecated in 2.0.0
'''
s_common.deprecated('CoreApi.addNodes')
# First check that that user may add each form
done = {}
for node in nodes:
formname = node[0][0]
if done.get(formname):
continue
await self._reqDefLayerAllowed(('node', 'add', formname))
done[formname] = True
async with await self.cell.snap(user=self.user) as snap:
with s_provenance.claim('coreapi', meth='node:add', user=snap.user.iden):
snap.strict = False
async for node in snap.addNodes(nodes):
if node is not None:
node = node.pack()
yield node
async def getFeedFuncs(self):
'''
Get a list of Cortex feed functions.
Notes:
Each feed dictionary has the name of the feed function, the
full docstring for the feed function, and the first line of
the docstring broken out in their own keys for easy use.
Returns:
tuple: A tuple of dictionaries.
'''
return await self.cell.getFeedFuncs()
async def addFeedData(self, name, items, *, viewiden=None):
view = self.cell.getView(viewiden, user=self.user)
if view is None:
raise s_exc.NoSuchView(mesg=f'No such view iden={viewiden}', iden=viewiden)
wlyr = view.layers[0]
parts = name.split('.')
self.user.confirm(('feed:data', *parts), gateiden=wlyr.iden)
await self.cell.boss.promote('feeddata',
user=self.user,
info={'name': name,
'view': view.iden,
'nitems': len(items),
})
async with await self.cell.snap(user=self.user, view=view) as snap:
with s_provenance.claim('feed:data', name=name, user=snap.user.iden):
snap.strict = False
await snap.addFeedData(name, items)
async def count(self, text, opts=None):
'''
Count the number of nodes which result from a storm query.
Args:
text (str): Storm query text.
opts (dict): Storm query options.
Returns:
(int): The number of nodes resulting from the query.
'''
opts = self._reqValidStormOpts(opts)
return await self.cell.count(text, opts=opts)
async def storm(self, text, opts=None):
'''
Evaluate a storm query and yield result messages.
Yields:
((str,dict)): Storm messages.
'''
opts = self._reqValidStormOpts(opts)
async for mesg in self.cell.storm(text, opts=opts):
yield mesg
async def reqValidStorm(self, text, opts=None):
'''
Parse a Storm query to validate it.
Args:
text (str): The text of the Storm query to parse.
opts (dict): A Storm options dictionary.
Returns:
True: If the query is valid.
Raises:
BadSyntaxError: If the query is invalid.
'''
return await self.cell.reqValidStorm(text, opts)
async def syncLayerNodeEdits(self, offs, layriden=None, wait=True):
'''
Yield (indx, mesg) nodeedit sets for the given layer beginning at offset.
Once caught up, this API will begin yielding nodeedits in real-time.
The generator will only terminate on network disconnect or if the
consumer falls behind the max window size of 10,000 nodeedit messages.
'''
layr = self.cell.getLayer(layriden)
if layr is None:
raise s_exc.NoSuchLayer(mesg=f'No such layer {layriden}', iden=layriden)
self.user.confirm(('sync',), gateiden=layr.iden)
async for item in self.cell.syncLayerNodeEdits(layr.iden, offs, wait=wait):
yield item
async def getPropNorm(self, prop, valu):
'''
Get the normalized property value based on the Cortex data model.
Args:
prop (str): The property to normalize.
valu: The value to normalize.
Returns:
(tuple): A two item tuple, containing the normed value and the info dictionary.
Raises:
s_exc.NoSuchProp: If the prop does not exist.
s_exc.BadTypeValu: If the value fails to normalize.
'''
return await self.cell.getPropNorm(prop, valu)
async def getTypeNorm(self, name, valu):
'''
Get the normalized type value based on the Cortex data model.
Args:
name (str): The type to normalize.
valu: The value to normalize.
Returns:
(tuple): A two item tuple, containing the normed value and the info dictionary.
Raises:
s_exc.NoSuchType: If the type does not exist.
s_exc.BadTypeValu: If the value fails to normalize.
'''
return await self.cell.getTypeNorm(name, valu)
async def addForm(self, formname, basetype, typeopts, typeinfo):
'''
Add an extended form to the data model.
Extended forms *must* begin with _
'''
self.user.confirm(('model', 'form', 'add', formname))
return await self.cell.addForm(formname, basetype, typeopts, typeinfo)
async def delForm(self, formname):
'''
Remove an extended form from the data model.
'''
self.user.confirm(('model', 'form', 'del', formname))
return await self.cell.delForm(formname)
async def addFormProp(self, form, prop, tdef, info):
'''
Add an extended property to the given form.
Extended properties *must* begin with _
'''
self.user.confirm(('model', 'prop', 'add', form))
if not s_grammar.isBasePropNoPivprop(prop):
mesg = f'Invalid prop name {prop}'
raise s_exc.BadPropDef(prop=prop, mesg=mesg)
return await self.cell.addFormProp(form, prop, tdef, info)
async def delFormProp(self, form, name):
'''
Remove an extended property from the given form.
'''
self.user.confirm(('model', 'prop', 'del', form))
return await self.cell.delFormProp(form, name)
async def addUnivProp(self, name, tdef, info):
'''
Add an extended universal property.
Extended properties *must* begin with _
'''
self.user.confirm(('model', 'univ', 'add'))
if not s_grammar.isBasePropNoPivprop(name):
mesg = f'Invalid prop name {name}'
raise s_exc.BadPropDef(name=name, mesg=mesg)
return await self.cell.addUnivProp(name, tdef, info)
async def delUnivProp(self, name):
'''
Remove an extended universal property.
'''
self.user.confirm(('model', 'univ', 'del'))
return await self.cell.delUnivProp(name)
async def addTagProp(self, name, tdef, info):
'''
Add a tag property to record data about tags on nodes.
'''
self.user.confirm(('model', 'tagprop', 'add'))
if not s_grammar.isBasePropNoPivprop(name):
mesg = f'Invalid prop name {name}'
raise s_exc.BadPropDef(name=name, mesg=mesg)
return await self.cell.addTagProp(name, tdef, info)
async def delTagProp(self, name):
'''
Remove a previously added tag property.
'''
self.user.confirm(('model', 'tagprop', 'del'))
return await self.cell.delTagProp(name)
async def addStormPkg(self, pkgdef, verify=False):
self.user.confirm(('pkg', 'add'))
return await self.cell.addStormPkg(pkgdef, verify=verify)
async def delStormPkg(self, iden):
self.user.confirm(('pkg', 'del'))
return await self.cell.delStormPkg(iden)
@s_cell.adminapi()
async def getStormPkgs(self):
return await self.cell.getStormPkgs()
@s_cell.adminapi()
async def getStormPkg(self, name):
return await self.cell.getStormPkg(name)
@s_cell.adminapi()
async def addStormDmon(self, ddef):
return await self.cell.addStormDmon(ddef)
@s_cell.adminapi()
async def getStormDmons(self):
return await self.cell.getStormDmons()
@s_cell.adminapi()
async def getStormDmonLog(self, iden):
return await self.cell.getStormDmonLog(iden)
@s_cell.adminapi()
async def getStormDmon(self, iden):
return await self.cell.getStormDmon(iden)
@s_cell.adminapi()
async def bumpStormDmon(self, iden):
return await self.cell.bumpStormDmon(iden)
@s_cell.adminapi()
async def disableStormDmon(self, iden):
return await self.cell.disableStormDmon(iden)
@s_cell.adminapi()
async def enableStormDmon(self, iden):
return await self.cell.enableStormDmon(iden)
@s_cell.adminapi()
async def delStormDmon(self, iden):
return await self.cell.delStormDmon(iden)
@s_cell.adminapi(log=True)
async def enableMigrationMode(self):
await self.cell._enableMigrationMode()
@s_cell.adminapi(log=True)
async def disableMigrationMode(self):
await self.cell._disableMigrationMode()
@s_cell.adminapi()
async def cloneLayer(self, iden, ldef=None):
ldef = ldef or {}
ldef['creator'] = self.user.iden
return await self.cell.cloneLayer(iden, ldef)
async def getStormVar(self, name, default=None):
self.user.confirm(('globals', 'get', name))
return await self.cell.getStormVar(name, default=default)
async def popStormVar(self, name, default=None):
self.user.confirm(('globals', 'pop', name))
return await self.cell.popStormVar(name, default=default)
async def setStormVar(self, name, valu):
self.user.confirm(('globals', 'set', name))
return await self.cell.setStormVar(name, valu)
async def syncLayersEvents(self, offsdict=None, wait=True):
self.user.confirm(('sync',))
async for item in self.cell.syncLayersEvents(offsdict=offsdict, wait=wait):
yield item
async def syncIndexEvents(self, matchdef, offsdict=None, wait=True):
self.user.confirm(('sync',))
async for item in self.cell.syncIndexEvents(matchdef, offsdict=offsdict, wait=wait):
yield item
async def iterFormRows(self, layriden, form, stortype=None, startvalu=None):
'''
Yields buid, valu tuples of nodes of a single form, optionally (re)starting at startvalue
Args:
layriden (str): Iden of the layer to retrieve the nodes
form(str): A form name
stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop
startvalu (Any): The value to start at. May only be not None if stortype is not None.
Returns:
AsyncIterator[Tuple(buid, valu)]
'''
self.user.confirm(('layer', 'lift', layriden))
async for item in self.cell.iterFormRows(layriden, form, stortype=stortype, startvalu=startvalu):
yield item
async def iterPropRows(self, layriden, form, prop, stortype=None, startvalu=None):
'''
Yields buid, valu tuples of nodes with a particular secondary property, optionally (re)starting at startvalue
Args:
layriden (str): Iden of the layer to retrieve the nodes
form(str): A form name.
prop (str): A secondary property name.
stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop
startvalu (Any): The value to start at. May only be not None if stortype is not None.
Returns:
AsyncIterator[Tuple(buid, valu)]
'''
self.user.confirm(('layer', 'lift', layriden))
async for item in self.cell.iterPropRows(layriden, form, prop, stortype=stortype, startvalu=startvalu):
yield item
async def iterUnivRows(self, layriden, prop, stortype=None, startvalu=None):
'''
Yields buid, valu tuples of nodes with a particular universal property, optionally (re)starting at startvalue
Args:
layriden (str): Iden of the layer to retrieve the nodes
prop (str): A universal property name.
stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop
startvalu (Any): The value to start at. May only be not None if stortype is not None.
Returns:
AsyncIterator[Tuple(buid, valu)]
'''
self.user.confirm(('layer', 'lift', layriden))
async for item in self.cell.iterUnivRows(layriden, prop, stortype=stortype, startvalu=startvalu):
yield item
async def iterTagRows(self, layriden, tag, form=None, starttupl=None):
'''
Yields (buid, (valu, form)) values that match a tag and optional form, optionally (re)starting at starttupl.
Args:
layriden (str): Iden of the layer to retrieve the nodes
tag (str): the tag to match
form (Optional[str]): if present, only yields buids of nodes that match the form.
starttupl (Optional[Tuple[buid, form]]): if present, (re)starts the stream of values there.
Returns:
AsyncIterator[Tuple(buid, (valu, form))]
Note:
This yields (buid, (tagvalu, form)) instead of just buid, valu in order to allow resuming an interrupted
call by feeding the last value retrieved into starttupl
'''
self.user.confirm(('layer', 'lift', layriden))
async for item in self.cell.iterTagRows(layriden, tag, form=form, starttupl=starttupl):
yield item
async def iterTagPropRows(self, layriden, tag, prop, form=None, stortype=None, startvalu=None):
'''
Yields (buid, valu) that match a tag:prop, optionally (re)starting at startvalu.
Args:
layriden (str): Iden of the layer to retrieve the nodes
tag (str): tag name
prop (str): prop name
form (Optional[str]): optional form name
stortype (Optional[int]): a STOR_TYPE_* integer representing the type of form:prop
startvalu (Any): The value to start at. May only be not None if stortype is not None.
Returns:
AsyncIterator[Tuple(buid, valu)]
'''
self.user.confirm(('layer', 'lift', layriden))
async for item in self.cell.iterTagPropRows(layriden, tag, prop, form=form, stortype=stortype,
startvalu=startvalu):
yield item
async def getAxonUpload(self):
self.user.confirm(('axon', 'upload'))
await self.cell.axready.wait()
upload = await self.cell.axon.upload()
return await s_axon.UpLoadProxy.anit(self.link, upload)
async def getAxonBytes(self, sha256):
self.user.confirm(('axon', 'get'))
await self.cell.axready.wait()
async for byts in self.cell.axon.get(s_common.uhex(sha256)):
yield byts
@s_cell.adminapi()
async def getUserNotif(self, indx):
return await self.cell.getUserNotif(indx)
@s_cell.adminapi()
async def delUserNotif(self, indx):
return await self.cell.delUserNotif(indx)
@s_cell.adminapi()
async def addUserNotif(self, useriden, mesgtype, mesgdata=None):
return await self.cell.addUserNotif(useriden, mesgtype, mesgdata=mesgdata)
@s_cell.adminapi()
async def iterUserNotifs(self, useriden, size=None):
async for item in self.cell.iterUserNotifs(useriden, size=size):
yield item
@s_cell.adminapi()
async def watchAllUserNotifs(self, offs=None):
async for item in self.cell.watchAllUserNotifs(offs=offs):
yield item
@s_cell.adminapi()
async def getHttpExtApiByPath(self, path):
return await self.cell.getHttpExtApiByPath(path)
class Cortex(s_oauth.OAuthMixin, s_cell.Cell): # type: ignore
'''
A Cortex implements the synapse hypergraph.
The bulk of the Cortex API lives on the Snap() object which can
be obtained by calling Cortex.snap() in a with block. This allows
callers to manage transaction boundaries explicitly and dramatically
increases performance.
'''
# For the cortex, nexslog:en defaults to True
confbase = copy.deepcopy(s_cell.Cell.confbase)
confbase['nexslog:en']['default'] = True # type: ignore
confbase['mirror']['hidedocs'] = False # type: ignore
confbase['mirror']['hidecmdl'] = False # type: ignore
confdefs = {
'axon': {
'description': 'A telepath URL for a remote axon.',
'type': 'string'
},
'jsonstor': {
'description': 'A telepath URL for a remote jsonstor.',
'type': 'string'
},
'cron:enable': {
'default': True,
'description': 'Enable cron jobs running.',
'type': 'boolean'
},
'trigger:enable': {
'default': True,
'description': 'Enable triggers running.',
'type': 'boolean'
},
'layer:lmdb:map_async': {
'default': True,
'description': 'Set the default lmdb:map_async value in LMDB layers.',
'type': 'boolean'
},
'layer:lmdb:max_replay_log': {
'default': 10000,
'description': 'Set the max size of the replay log for all layers.',
'type': 'integer'
},
'layers:lockmemory': {
'default': False,
'description': 'Should new layers lock memory for performance by default.',
'type': 'boolean'
},
'layers:logedits': {
'default': True,
'description': 'Whether nodeedits are logged in each layer.',
'type': 'boolean'
},
'provenance:en': { # TODO: Remove in 3.0.0
'default': False,
'description': 'This no longer does anything.',
'type': 'boolean',
'hideconf': True,
},
'max:nodes': {
'description': 'Maximum number of nodes which are allowed to be stored in a Cortex.',
'type': 'integer',
'minimum': 1,
'hidecmdl': True,
},
'modules': {
'default': [],
'description': 'A list of module classes to load.',
'type': 'array'
},
'storm:log': {
'default': False,
'description': 'Log storm queries via system logger.',
'type': 'boolean'
},
'storm:log:level': {
'default': 'INFO',
'description': 'Logging log level to emit storm logs at.',
'type': [
'integer',
'string',
],
},
'storm:interface:search': {
'default': True,
'description': 'Enable Storm search interfaces for lookup mode.',
'type': 'boolean',
},
'storm:interface:scrape': {
'default': True,
'description': 'Enable Storm scrape interfaces when using $lib.scrape APIs.',
'type': 'boolean',
},
'http:proxy': {
'description': 'An aiohttp-socks compatible proxy URL to use storm HTTP API.',
'type': 'string',
},
'tls:ca:dir': {
'description': 'An optional directory of CAs which are added to the TLS CA chain for Storm HTTP API calls.',
'type': 'string',
},
}
cellapi = CoreApi
viewapi = s_view.ViewApi
layerapi = s_layer.LayerApi
hiveapi = s_hive.HiveApi
viewctor = s_view.View.anit
layrctor = s_layer.Layer.anit
# phase 2 - service storage
async def initServiceStorage(self):
# NOTE: we may not make *any* nexus actions in this method
self.macrodb = self.slab.initdb('storm:macros')
self.httpextapidb = self.slab.initdb('http:ext:apis')
if self.inaugural:
await self.cellinfo.set('cortex:version', s_version.version)
corevers = self.cellinfo.get('cortex:version')
s_version.reqVersion(corevers, reqver, exc=s_exc.BadStorageVersion,
mesg='cortex version in storage is incompatible with running software')
self.viewmeta = self.slab.initdb('view:meta')
self.views = {}
self.layers = {}
self.viewsbylayer = collections.defaultdict(list)
self.modules = {}
self.feedfuncs = {}
self.stormcmds = {}
self.maxnodes = self.conf.get('max:nodes')
self.nodecount = 0
self.migration = False
self.stormmods = {} # name: mdef
self.stormpkgs = {} # name: pkgdef
self.stormvars = None # type: s_hive.HiveDict
self.svcsbyiden = {}
self.svcsbyname = {}
self.svcsbysvcname = {} # remote name, not local name
self._propSetHooks = {}
self._runtLiftFuncs = {}
self._runtPropSetFuncs = {}
self._runtPropDelFuncs = {}
self.ontagadds = collections.defaultdict(list)
self.ontagdels = collections.defaultdict(list)
self.ontagaddglobs = s_cache.TagGlobs()
self.ontagdelglobs = s_cache.TagGlobs()
self.tagvalid = s_cache.FixedCache(self._isTagValid, size=1000)
self.tagprune = s_cache.FixedCache(self._getTagPrune, size=1000)
self.querycache = s_cache.FixedCache(self._getStormQuery, size=10000)
self.libroot = (None, {}, {})
self.stormlibs = []
self.bldgbuids = {} # buid -> (Node, Event) Nodes under construction
self.axon = None # type: s_axon.AxonApi
self.axready = asyncio.Event()
self.axoninfo = {}
self.view = None # The default/main view
self._cortex_permdefs = []
self._initCorePerms()
# Reset the storm:log:level from the config value to an int for internal use.
self.conf['storm:log:level'] = s_common.normLogLevel(self.conf.get('storm:log:level'))
self.stormlog = self.conf.get('storm:log')
self.stormloglvl = self.conf.get('storm:log:level')
# generic fini handler for the Cortex
self.onfini(self._onCoreFini)
await self._initCoreHive()
self._initStormLibs()
self._initFeedFuncs()
self.modsbyiface = {}
self.stormiface_search = self.conf.get('storm:interface:search')
self.stormiface_scrape = self.conf.get('storm:interface:scrape')
self._initCortexHttpApi()
self._exthttpapis = {} # iden -> adef; relies on cpython ordered dictionary behavior.
self._exthttpapiorder = b'exthttpapiorder'
self._exthttpapicache = s_cache.FixedCache(self._getHttpExtApiByPath, size=1000)
self._initCortexExtHttpApi()
self.model = s_datamodel.Model()
await self._bumpCellVers('cortex:extmodel', (
(1, self._migrateTaxonomyIface),
), nexs=False)
# Perform module loading
await self._loadCoreMods()
await self._loadExtModel()
await self._initStormCmds()
# Initialize our storage and views
await self._initCoreAxon()
await self._initCoreLayers()
await self._initCoreViews()
self.onfini(self._finiStor)
await self._initCoreQueues()
self.addHealthFunc(self._cortexHealth)
await self._initOAuthManager()
stormdmonhive = await self.hive.open(('cortex', 'storm', 'dmons'))
self.stormdmonhive = await stormdmonhive.dict()
self.stormdmons = await s_storm.DmonManager.anit(self)
self.onfini(self.stormdmons)
self.agenda = await s_agenda.Agenda.anit(self)
self.onfini(self.agenda)
await self._initStormGraphs()
self.trigson = self.conf.get('trigger:enable')
await self._initRuntFuncs()
taghive = await self.hive.open(('cortex', 'tagmeta'))
cmdhive = await self.hive.open(('cortex', 'storm', 'cmds'))