diff --git a/conpot/protocols/snmp/build_pysnmp_mib_wrapper.py b/conpot/protocols/snmp/build_pysnmp_mib_wrapper.py deleted file mode 100644 index afc91752..00000000 --- a/conpot/protocols/snmp/build_pysnmp_mib_wrapper.py +++ /dev/null @@ -1,149 +0,0 @@ -# Copyright (C) 2013 Johnny Vestergaard -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - -from gevent import subprocess -import logging -import os -import re - -from pysmi.reader.localfile import FileReader -from pysmi.reader.httpclient import HttpReader -from pysmi.searcher.pyfile import PyFileSearcher -from pysmi.searcher.pypackage import PyPackageSearcher -from pysmi.searcher.stub import StubSearcher -from pysmi.writer.pyfile import PyFileWriter -from pysmi.parser.smi import SmiV2Parser -from pysmi.codegen.pysnmp import PySnmpCodeGen, baseMibs -from pysmi.compiler import MibCompiler - -logger = logging.getLogger(__name__) - -BUILD_SCRIPT = 'build-pysnmp-mib' - -# dict of lists, where the list contain the dependency names for the given dict key -mib_dependency_map = {} -compiled_mibs = [] -# key = mib name, value = full path to the file -file_map = {} - - -def mib2pysnmp(mib_file, output_dir): - """ - The 'build-pysnmp-mib' script we previously used is no longer available - Latest pysmi has the ability to generate a .py file from .mib automatically - - :param mib_file: path to the .mib file we want to compile - :param output_dir: path to the output directory - :return: True if we successfully compile the .mib to a .py - """ - - logger.debug('Compiling mib file: %s', mib_file) - - # create a mib compiler with output dir - mibCompiler = MibCompiler(SmiV2Parser(), PySnmpCodeGen(), - PyFileWriter(output_dir)) - - # add sources from where we fetch dependencies - mibCompiler.addSources(HttpReader('mibs.snmplabs.com', 80, '/asn1/@mib@')) - mibCompiler.addSources( - FileReader(os.path.dirname(os.path.abspath(mib_file)))) - - # add searchers - mibCompiler.addSearchers(PyFileSearcher(output_dir)) - mibCompiler.addSearchers(PyPackageSearcher('pysnmp.mibs')) - mibCompiler.addSearchers(StubSearcher(*baseMibs)) - - # compile, there should be a MIBFILE.py generated under output_dir - mibName = os.path.basename(mib_file).replace('.mib', '') - results = mibCompiler.compile(mibName) - - if results[mibName] == 'compiled' or results[mibName] == 'untouched': - return True - - return False - - -def _get_files(raw_mibs_dir, recursive): - for dir_path, dirs, files in os.walk(raw_mibs_dir, followlinks=True): - for file_name in files: - yield os.path.join(dir_path, file_name) - if not recursive: - break - - -def generate_dependencies(data, mib_name): - """ - Parses a MIB for dependencies and populates an internal dependency map. - :param data: A string representing an entire MIB file (string). - :param mib_name: Name of the MIB (string). - """ - if mib_name not in mib_dependency_map: - mib_dependency_map[mib_name] = [] - imports_section_search = re.search('IMPORTS(?P.*?);', data, re.DOTALL) - if imports_section_search: - imports_section = imports_section_search.group('imports_section') - for dependency in re.finditer('FROM (?P[\w-]+)', imports_section): - dependency_name = dependency.group('mib_name') - if dependency_name not in mib_dependency_map: - mib_dependency_map[dependency_name] = [] - mib_dependency_map[mib_name].append(dependency_name) - - -def find_mibs(raw_mibs_dirs, recursive=True): - """ - Scans for MIB files and populates an internal MIB->path mapping. - :param raw_mibs_dirs: Directories to search for MIB files (list of strings). - :param recursive: If True raw_mibs_dirs will be scanned recursively. - :return: A list of found MIB names (list of strings). - """ - files_scanned = 0 - for raw_mibs_dir in raw_mibs_dirs: - for _file in _get_files(raw_mibs_dir, recursive): - files_scanned += 1 - # making sure we don't start parsing some epic file - if os.path.getsize(_file) > int('1048576'): - continue - with open(_file) as _mibfile: - data = _mibfile.read() - # 2048 - just like a rock star. - mib_search = re.search('(?P[\w-]+) DEFINITIONS ::= BEGIN', data[0:2048], re.IGNORECASE) - if mib_search: - mib_name = mib_search.group('mib_name') - file_map[mib_name] = _file - generate_dependencies(data, mib_name) - logging.debug('Done scanning for mib files, recursive scan was initiated from {0} directories and found {1} ' - 'MIB files of {2} scanned files.' - .format(len(raw_mibs_dirs), len(file_map), files_scanned)) - return list(file_map.keys()) - - -def compile_mib(mib_name, output_dir): - """ - Compiles the given mib_name if it is found in the internal MIB file map. If the MIB depends on other MIBs, - these will get compiled automatically. - :param mib_name: Name of mib to compile (string). - :param output_dir: Output directory (string). - """ - # resolve dependencies recursively - - for dependency in mib_dependency_map[mib_name]: - if dependency not in compiled_mibs and dependency in file_map: - compile_mib(dependency, output_dir) - - result = mib2pysnmp(file_map[mib_name], output_dir) - if result: - compiled_mibs.append(mib_name) diff --git a/conpot/protocols/snmp/command_responder.py b/conpot/protocols/snmp/command_responder.py index 5af6b22d..04d5319e 100644 --- a/conpot/protocols/snmp/command_responder.py +++ b/conpot/protocols/snmp/command_responder.py @@ -139,7 +139,7 @@ def register(self, mibname, symbolname, instance, value, profile_map_name): logger.debug('Registered: OID %s Instance %s ASN.1 (%s @ %s) value %s dynrsp.', s.name, instance, s.label, mibname, value) else: - logger.debug('Skipped: OID for symbol %s not found in MIB %s', symbolname, mibname) + logger.warning('Skipped: OID for symbol %s not found in MIB %s', symbolname, mibname) def _get_mibSymbol(self, mibname, symbolname): modules = self.snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.mibSymbols diff --git a/conpot/protocols/snmp/mib_compiler.py b/conpot/protocols/snmp/mib_compiler.py new file mode 100644 index 00000000..88eee6a3 --- /dev/null +++ b/conpot/protocols/snmp/mib_compiler.py @@ -0,0 +1,66 @@ +# Copyright (C) 2013 Johnny Vestergaard +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +import logging +from typing import List + +from pysmi.codegen.pysnmp import PySnmpCodeGen, baseMibs +from pysmi.compiler import MibCompiler, statusCompiled, statusUntouched +from pysmi.parser.smi import SmiV2Parser +from pysmi.reader.httpclient import HttpReader +from pysmi.reader.localfile import FileReader +from pysmi.searcher.pyfile import PyFileSearcher +from pysmi.searcher.pypackage import PyPackageSearcher +from pysmi.searcher.stub import StubSearcher +from pysmi.writer.pyfile import PyFileWriter + +logger = logging.getLogger(__name__) + + +def compile_mib(mib_name: str, mib_sources: List[str], output_dir: str) -> bool: + """ + Compile raw MIB to .py file, fetching dependencies as necessary. + + :param mib_name: name of the MIB to compile + :param mib_sources: list of paths to search for raw MIBs + :param output_dir: path to the output directory + :return: True if we successfully compile the .mib to a .py + """ + + logger.debug("Compiling MIB: %s", mib_name) + + # create a mib compiler with output dir + mib_compiler = MibCompiler(SmiV2Parser(), PySnmpCodeGen(), PyFileWriter(output_dir)) + + # add sources from where we fetch dependencies + for source in mib_sources: + mib_compiler.addSources(FileReader(source)) + + mib_compiler.addSources(HttpReader("mibs.snmplabs.com", 80, "/asn1/@mib@")) + + # add searchers + mib_compiler.addSearchers(PyFileSearcher(output_dir)) + mib_compiler.addSearchers(PyPackageSearcher("pysnmp.mibs")) + mib_compiler.addSearchers(StubSearcher(*baseMibs)) + + # compile, there should be a MIBFILE.py generated under output_dir + results = mib_compiler.compile(mib_name) + + if results[mib_name] == statusCompiled or results[mib_name] == statusUntouched: + return True + + return False diff --git a/conpot/protocols/snmp/snmp_server.py b/conpot/protocols/snmp/snmp_server.py index 6572446a..5ecc08ae 100644 --- a/conpot/protocols/snmp/snmp_server.py +++ b/conpot/protocols/snmp/snmp_server.py @@ -16,16 +16,14 @@ # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import logging -import tempfile -import shutil import os from lxml import etree + +import conpot.core as conpot_core from conpot.core.protocol_wrapper import conpot_protocol from conpot.protocols.snmp.command_responder import CommandResponder -from conpot.protocols.snmp.build_pysnmp_mib_wrapper import find_mibs, compile_mib -import conpot.core as conpot_core - +from conpot.protocols.snmp.mib_compiler import compile_mib logger = logging.getLogger() @@ -81,46 +79,41 @@ def xml_general_config(self, dom): elif entity.attrib['command'].lower() == 'bulk': self.cmd_responder.resp_app_bulk.threshold = self.config_sanitize_threshold(entity.text) - def xml_mib_config(self, dom, mibpaths, rawmibs_dirs): - try: - mibs = dom.xpath('//snmp/mibs/*') - tmp_mib_dir = tempfile.mkdtemp() - mibpaths.append(tmp_mib_dir) - available_mibs = find_mibs(rawmibs_dirs) - - databus = conpot_core.get_databus() - # parse mibs and oid tables - for mib in mibs: - mib_name = mib.attrib['name'] - # compile the mib file if it is found and not already loaded. - if mib_name in available_mibs and not self.cmd_responder.has_mib(mib_name): - compile_mib(mib_name, tmp_mib_dir) - for symbol in mib: - symbol_name = symbol.attrib['name'] - - # retrieve instance from template - if 'instance' in symbol.attrib: - # convert instance to (int-)tuple - symbol_instance = symbol.attrib['instance'].split('.') - symbol_instance = tuple(map(int, symbol_instance)) - else: - # use default instance (0) - symbol_instance = (0,) - - - # retrieve value from databus - value = databus.get_value(symbol.xpath('./value/text()')[0]) - profile_map_name = symbol.xpath('./value/text()')[0] - - # register this MIB instance to the command responder - self.cmd_responder.register(mib_name, - symbol_name, - symbol_instance, - value, - profile_map_name) - finally: - # cleanup compiled mib files - shutil.rmtree(tmp_mib_dir) + def xml_mib_config(self, dom, compiled_dirs, rawmibs_dirs): + mibs = dom.xpath('//snmp/mibs/*') + + databus = conpot_core.get_databus() + # parse mibs and oid tables + for mib in mibs: + mib_name = mib.attrib['name'] + + if not self.cmd_responder.has_mib(mib_name): + if not compile_mib(mib_name, rawmibs_dirs, compiled_dirs[0]): + logger.warning('Skipped: Failed to compile MIB %s', mib_name) + continue + + for symbol in mib: + symbol_name = symbol.attrib['name'] + + # retrieve instance from template + if 'instance' in symbol.attrib: + # convert instance to (int-)tuple + symbol_instance = symbol.attrib['instance'].split('.') + symbol_instance = tuple(map(int, symbol_instance)) + else: + # use default instance (0) + symbol_instance = (0,) + + # retrieve value from databus + value = databus.get_value(symbol.xpath('./value/text()')[0]) + profile_map_name = symbol.xpath('./value/text()')[0] + + # register this MIB instance to the command responder + self.cmd_responder.register(mib_name, + symbol_name, + symbol_instance, + value, + profile_map_name) def config_sanitize_tarpit(self, value): diff --git a/conpot/templates/IEC104/template.xml b/conpot/templates/IEC104/template.xml index 3144519e..9b739ac3 100644 --- a/conpot/templates/IEC104/template.xml +++ b/conpot/templates/IEC104/template.xml @@ -70,7 +70,7 @@ 100000000 - "\x00\x0e\x8c\x29\xc5\x1a" + "0x000e8c29c51a" 1 diff --git a/conpot/tests/test_pysnmp_wrapper.py b/conpot/tests/test_pysnmp_wrapper.py deleted file mode 100644 index 898615d5..00000000 --- a/conpot/tests/test_pysnmp_wrapper.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright (C) 2013 Johnny Vestergaard -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., -# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - -import gevent.monkey; gevent.monkey.patch_all() -import unittest -import tempfile -import shutil -import conpot -import os -from conpot.protocols.snmp.build_pysnmp_mib_wrapper import mib2pysnmp, find_mibs, compile_mib -from conpot.protocols.snmp import command_responder - - -def check_content(pyfile): - ret = False - with open(pyfile) as f: - for l in f.readlines(): - if 'mibBuilder.exportSymbols("VOGON-POEM-MIB"' in l: - ret = True - break - return ret - - -class TestPySNMPWrapper(unittest.TestCase): - - def setUp(self): - self.dir_name = os.path.dirname(conpot.__file__) - self.mib_file = os.path.join(self.dir_name + '/tests/data/VOGON-POEM-MIB.mib') - - def test_wrapper_processing(self): - """ - Tests that the wrapper can process a valid mib file without errors. - """ - tmpdir = tempfile.mkdtemp() - result = mib2pysnmp(self.mib_file, tmpdir) - self.assertTrue(result and check_content(os.path.join(tmpdir, 'VOGON-POEM-MIB.py')), - 'mib2pysnmp2 did not generate the expected output.') - - def test_wrapper_output(self): - """ - Tests that the wrapper generates output that can be consumed by the command responder. - """ - tmpdir = None - - try: - result = None - tmpdir = tempfile.mkdtemp() - - if mib2pysnmp(self.mib_file, tmpdir): - cmd_responder = command_responder.CommandResponder('', 0, [tmpdir]) - cmd_responder.snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.loadModules('VOGON-POEM-MIB') - result = cmd_responder._get_mibSymbol('VOGON-POEM-MIB', 'poemNumber') - - self.assertIsNotNone(result, 'The expected MIB (VOGON-POEM-MIB) could not be loaded.') - finally: - shutil.rmtree(tmpdir) - - def test_find(self): - """ - Tests that the wrapper can find mib files. - """ - input_dir = None - try: - input_dir = tempfile.mkdtemp() - input_file = self.mib_file - shutil.copy(input_file, input_dir) - available_mibs = find_mibs([input_dir]) - self.assertIn('VOGON-POEM-MIB', available_mibs) - finally: - shutil.rmtree(input_dir) - - def test_compile(self): - """ - Tests that the wrapper can output mib files. - """ - input_dir = None - output_dir = None - try: - input_dir = tempfile.mkdtemp() - output_dir = tempfile.mkdtemp() - shutil.copy(self.mib_file, input_dir) - find_mibs([input_dir]) - compile_mib('VOGON-POEM-MIB', output_dir) - self.assertIn('VOGON-POEM-MIB.py', os.listdir(output_dir)) - finally: - shutil.rmtree(input_dir) - shutil.rmtree(output_dir) - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/conpot/tests/test_snmp_mib_compiler.py b/conpot/tests/test_snmp_mib_compiler.py new file mode 100644 index 00000000..4918dcd3 --- /dev/null +++ b/conpot/tests/test_snmp_mib_compiler.py @@ -0,0 +1,69 @@ +# Copyright (C) 2013 Johnny Vestergaard +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +import gevent.monkey; gevent.monkey.patch_all() +import unittest +from tempfile import TemporaryDirectory +import conpot +import os +from conpot.protocols.snmp.mib_compiler import compile_mib +from conpot.protocols.snmp import command_responder + + +def check_content(pyfile): + ret = False + with open(pyfile) as f: + for l in f.readlines(): + if 'mibBuilder.exportSymbols("VOGON-POEM-MIB"' in l: + ret = True + break + return ret + + +class TestPySNMPWrapper(unittest.TestCase): + + def setUp(self): + self.dir_name = os.path.dirname(conpot.__file__) + self.mib_dirs = [os.path.join(self.dir_name + '/tests/data/')] + + def test_compile_with_dependency(self): + """ + Tests that the compiler can process valid mib files with an external dependency. + """ + with TemporaryDirectory() as output_dir: + result = compile_mib('VOGON-POEM-MIB', self.mib_dirs, output_dir) + self.assertTrue(result) + self.assertTrue(check_content(os.path.join(output_dir, 'VOGON-POEM-MIB.py'))) + self.assertIn('SNMPv2-MIB.py', os.listdir(output_dir)) + + def test_command_responder_loads_compiled_file(self): + """ + Tests that the compiler generates output that can be consumed by the command responder. + """ + with TemporaryDirectory() as tmpdir: + result = None + + if compile_mib("VOGON-POEM-MIB", self.mib_dirs, tmpdir): + cmd_responder = command_responder.CommandResponder('', 0, [tmpdir]) + cmd_responder.snmpEngine.msgAndPduDsp.mibInstrumController.mibBuilder.loadModules('VOGON-POEM-MIB') + result = cmd_responder._get_mibSymbol('VOGON-POEM-MIB', 'poemNumber') + + self.assertIsNotNone(result, 'The expected MIB (VOGON-POEM-MIB) could not be loaded.') + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/docs/source/api/reference/conpot.protocols.snmp.rst b/docs/source/api/reference/conpot.protocols.snmp.rst index 6953acee..821f6cc8 100644 --- a/docs/source/api/reference/conpot.protocols.snmp.rst +++ b/docs/source/api/reference/conpot.protocols.snmp.rst @@ -4,14 +4,6 @@ conpot.protocols.snmp package Submodules ---------- -conpot.protocols.snmp.build\_pysnmp\_mib\_wrapper module --------------------------------------------------------- - -.. automodule:: conpot.protocols.snmp.build_pysnmp_mib_wrapper - :members: - :undoc-members: - :show-inheritance: - conpot.protocols.snmp.command\_responder module ----------------------------------------------- @@ -36,6 +28,14 @@ conpot.protocols.snmp.databus\_mediator module :undoc-members: :show-inheritance: +conpot.protocols.snmp.mib\_compiler +----------------------------------- + +.. automodule:: conpot.protocols.snmp.mib_compiler + :members: + :undoc-members: + :show-inheritance: + conpot.protocols.snmp.snmp\_server module -----------------------------------------