forked from osquery/osquery-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtable_plugin.py
135 lines (107 loc) · 4.34 KB
/
table_plugin.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree. An additional grant
of patent rights can be found in the PATENTS file in the same directory.
"""
# pylint: disable=no-self-use
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
from abc import ABCMeta, abstractmethod
from builtins import str
from collections import namedtuple
from future.utils import with_metaclass
import json
import logging
from osquery.extensions.ttypes import ExtensionResponse, ExtensionStatus
from osquery.plugin import BasePlugin
class TablePlugin(with_metaclass(ABCMeta, BasePlugin)):
    """All table plugins should inherit from TablePlugin.

    Subclasses must implement ``columns()`` and ``generate()``. The other
    methods defined here route incoming requests and are not meant to be
    overridden.
    """

    _no_action_message = "Table plugins must include a request action"

    def call(self, context):
        """Internal routing for this plugin type.

        The request is dispatched on ``context["action"]``:

        * ``"generate"``: the optional JSON document found under
          ``context["context"]`` is decoded (an empty dict is used when the
          key is absent) and handed to ``generate()``. Every value in the
          returned rows that is not already a string is converted with
          ``str()`` in place, then the rows are returned with status code 0.
        * ``"columns"``: the result of ``routes()`` is returned with status
          code 0.

        A request without an ``"action"`` key, or with any other action,
        yields a response with status code 1, an explanatory message and an
        empty response list.

        Do not override this method.
        """
        if "action" not in context:
            return ExtensionResponse(
                status=ExtensionStatus(code=1,
                                       message=self._no_action_message,),
                response=[],)

        action = context["action"]

        if action == "generate":
            ctx = {}
            if "context" in context:
                ctx = json.loads(context["context"])
            rows = self.generate(ctx)
            for i, row in enumerate(rows):
                # Only existing keys are reassigned, so the dict does not
                # change size while it is being iterated.
                for key, value in row.items():
                    if isinstance(value, str):
                        continue
                    try:
                        row[key] = str(value)
                    except ValueError as e:
                        # Best effort: fall back to an empty string and keep
                        # going rather than failing the whole query.
                        row[key] = ''
                        logging.error(
                            "Cannot convert key %s in row %s: %s",
                            key, i, str(e))
            return ExtensionResponse(
                status=ExtensionStatus(code=0, message="OK",),
                response=rows,)

        if action == "columns":
            return ExtensionResponse(
                status=ExtensionStatus(code=0, message="OK",),
                response=self.routes(),)

        # The code and message belong to the nested ExtensionStatus, exactly
        # as in the missing-action branch above.
        return ExtensionResponse(
            status=ExtensionStatus(code=1, message="Unknown action",),
            response=[],)

    def registry_name(self):
        """The name of the registry type for table plugins.

        Do not override this method.
        """
        return "table"

    def routes(self):
        """The routes that will be broadcasted for table plugins.

        Returns a list with one dictionary per entry of ``columns()``. Each
        dictionary carries the column's ``name`` and ``type`` together with
        the fixed entries ``"id": "column"`` and ``"op": "0"``.

        Do not override this method.
        """
        return [
            {
                "id": "column",
                "name": column.name,
                "type": column.type,
                "op": "0",
            }
            for column in self.columns()
        ]

    @abstractmethod
    def columns(self):
        """The columns of your table plugin.

        This method should return an array of osquery.TableColumn instances.

        Consider the following example:

            class MyTablePlugin(osquery.TablePlugin):
                def columns(self):
                    return [
                        osquery.TableColumn(name="foo", type=osquery.STRING),
                        osquery.TableColumn(name="baz", type=osquery.STRING),
                    ]

        This must be implemented by your plugin.
        """
        raise NotImplementedError

    @abstractmethod
    def generate(self, context):
        """The implementation of your table plugin.

        This method should return a list of dictionaries, such that each
        dictionary has a key corresponding to each of your table's columns.
        Values that are not strings are converted with ``str()`` by
        ``call()`` before the rows are returned.

        Consider the following example:

            class MyTablePlugin(osquery.TablePlugin):
                def generate(self, context):
                    query_data = []
                    for i in range(5):
                        row = {}
                        row["foo"] = "bar"
                        row["baz"] = "boo"
                        query_data.append(row)
                    return query_data

        This must be implemented by your plugin.
        """
        raise NotImplementedError
#: SQL column type used for text values.
STRING = "TEXT"

#: SQL column type used for integer values.
INTEGER = "INTEGER"

#: Lightweight record that pairs the ``name`` of a SQL column with its
#: ``type``.
TableColumn = namedtuple("TableColumn", ("name", "type"))