Skip to content
This repository has been archived by the owner on Jan 19, 2022. It is now read-only.

Commit

Permalink
Merge djmitche/build-fwunit:issue20 (PR #20)
Browse files Browse the repository at this point in the history
  • Loading branch information
djmitche committed Mar 15, 2015
2 parents 5ad9048 + 5814cf1 commit 98affa7
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 142 deletions.
76 changes: 76 additions & 0 deletions docs/implementation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,79 @@ The rules are normalized as follows (and this is what consumes most of the time
* If traffic matches a rule, it is permitted. If no rule matches, it is denied.

* Policies allowing any application are represented by explicit rules for each known application, with the addition of rules with application '@@other' to represent the unknown applications.


Loading Source Objects
----------------------

Rule sets are embedded in :py:class:`~fwunit.analysis.sources.Source` objects, which provide a set of useful methods for analysis.
In a testing environment, rule sets are loaded via :py:class:`~fwunit.analysis.testcontext.TestContext`; this section describes access to rules within fwunit itself.

To get a source object, you will first need a config, which can be retrieved from :py:func:`~fwunit.analysis.config.load_config`:

.. py:function:: fwunit.analysis.config.load_config(filename="fwunit.yaml")
:param filename: the configuration filename to load
:returns: a config object

Load a configuration file.
As a side-effect, this function chdir's to the configuration directory.

The function operates on the assumption that a single process will only ever reference one configuration, and thus caches the configuration after the first call.
Subsequent calls with the same filename will return the same object.
Subsequent calls with a different filename will raise an exception.

With the config object in hand, call :py:func:`~fwunit.analysis.sources.load_source`:

.. py:function:: fwunit.analysis.sources.load_source(config, source)
:param config: a config object
:param source: the name of the source to load
:returns: a source object
:rtype: :py:class:`~fwunit.analysis.sources.Source`

Load the ruleset for the given source.
Rulesets are cached globally to the process.

Using Source Objects
--------------------

.. py:class:: fwunit.analysis.sources.Source
The data from a particular source in ``fwunit.yaml``, along with some analysis methods.

.. py:method:: rulesDeny(src, dst, apps)
:param src: source IPs
:param dst: destination IPs
:param apps: application names
:type apps: list or string

Returns True if the rules deny all traffic from *src* to *dst* via all given *apps*; otherwise False.

.. py:method:: rulesPermit(src, dst, apps)
:param src: source IPs
:param dst: destination IPs
:param apps: application names
:type apps: list or string

Returns True if the rules allow all traffic from *src* to *dst* via all given *apps*; otherwise False.

Note that ``rulesdeny(..)`` is not the same as ``not rulesPermit(..)``: if some -- but not all -- traffic is permitted from *src* to *dst*, then both methods will return False.

.. py:method:: allApps(src, dst, debug=False)
:param src: source IPs
:param dst: destination IPs
:param debug: if True, log the full list of matching flows

See :py:meth:`~fwunit.analysis.testcontext.TestContext.allApps`.

.. py:method:: sourcesFor(dst, app, ignore_sources=None)
:param dst: destination IPs
:param app: application
:param ignore_sources: source IPs to ignore

See :py:meth:`~fwunit.analysis.testcontext.TestContext.sourcesFor`.
12 changes: 6 additions & 6 deletions docs/testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The following might be in a file named `test_puppet.py`.
from fwunit import TestContext
from fwunit.ip import IP, IPSet
rules = TestContext('my-network')
tc = TestContext('my-network')
# hosts
Expand All @@ -37,17 +37,17 @@ The following might be in a file named `test_puppet.py`.
def test_puppetmaster_access():
"""The entire internal_network can access the puppet masters."""
for app in 'puppet', 'http', 'https':
fw1.assertPermits(internal_network, puppetmasters, app)
tc.assertPermits(internal_network, puppetmasters, app)
def test_puppetmaster_no_other_apps():
"""Access to puppetmasters is limited to puppet, http, and https"""
fw1.assertAllApps(IPSet([IP('0.0.0.0/0')]), puppetmasters,
tc.assertAllApps(IPSet([IP('0.0.0.0/0')]), puppetmasters,
['puppet', 'http', 'https'])
def test_puppetmaster_limited():
"""The exteernal networks cannot access the puppet masters."""
for app in 'puppet', 'http', 'https':
fw1.assertDenies(external_network, puppetmasters, app)
tc.assertDenies(external_network, puppetmasters, app)
Running this test is as simple as

Expand All @@ -65,7 +65,7 @@ It's safe to do this individually in each test script, as the results are cached
from fwunit import TestContext
rules = TestContext('source-name')
tc = TestContext('source-name')
The :py:class:`~fwunit.analysis.testcontext.TestContext` class uses ``fwunit.yaml`` in the current directory to look up the proper source file for the given source name.

Expand Down Expand Up @@ -102,7 +102,7 @@ Once you have the rules loaded, you can start writing test methods::

def test_puppetmaster_access():
for app in 'puppet', 'http', 'https':
fw1.assertPermits(internal_network, puppetmasters, app)
tc.assertPermits(internal_network, puppetmasters, app)

Utility Methods
---------------
Expand Down
28 changes: 28 additions & 0 deletions fwunit/analysis/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import os
import yaml

_loaded_config = None

def load_config(filename="fwunit.yaml"):
global _loaded_config
if _loaded_config:
if _loaded_config[0] != filename:
raise RuntimeError("load_config already called with %r" % (_loaded_config[0],))
return

# chdir to cfg file so rel paths work
config_dir = os.path.dirname(os.path.abspath(filename))
os.chdir(config_dir)

_loaded_config = (filename, yaml.load(open(filename)))
return _loaded_config[1]


def _clear():
# for tests only
global _loaded_config
_loaded_config = None
134 changes: 134 additions & 0 deletions fwunit/analysis/sources.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import itertools
import json
import logging

from blessings import Terminal
from fwunit import types
from fwunit.ip import IP, IPSet, IPPairs

log = logging.getLogger(__name__)
terminal = Terminal()

def _ipset(ip):
if isinstance(ip, basestring):
ip = IP(ip)
if isinstance(ip, IP):
ip = IPSet([ip])
return ip

class Source(object):
def __init__(self, filename):
self.rules = types.from_jsonable(json.load(open(filename))['rules'])

def _rules_for_app(self, app):
# get the rules for the given app, or if no such app is known, for @@other
try:
return self.rules[app]
except KeyError:
return self.rules.get('@@other', [])

def rulesDeny(self, src, dst, apps):
src = _ipset(src)
dst = _ipset(dst)
log.info("rulesDeny(%r, %r, %r)" % (src, dst, apps))
failures = 0
apps = apps if not isinstance(apps, basestring) else [apps]
for app in apps:
log.info("checking application %r" % app)
for rule in self._rules_for_app(app):
src_matches = (rule.src & src)
if not src_matches:
continue
dst_matches = (rule.dst & dst)
if not dst_matches:
continue
log.error("policy {t.cyan}{name}{t.normal} permits {t.bold_cyan}{app}{t.normal} "
"traffic\n{t.yellow}{src}{t.normal} -> {t.magenta}{dst}{t.normal}".format(
t=terminal,
name=rule.name,
app=app,
src=src_matches,
dst=dst_matches)
)
failures += 1
return failures == 0

def rulesPermit(self, src, dst, apps):
src = _ipset(src)
dst = _ipset(dst)
log.info("rulesPermit(%r, %r, %r)" % (src, dst, apps))
remaining = IPPairs((src, dst))
apps = apps if not isinstance(apps, basestring) else [apps]
for app in apps:
log.info("checking application %r" % app)
for rule in self._rules_for_app(app):
if (rule.src & src) and (rule.dst & dst):
log.info("matched policy {t.cyan}{name}{t.normal}\n{t.yellow}{src}{t.normal} "
"-> {t.magenta}{dst}{t.normal}".format(
t=terminal, name=rule.name, src=rule.src, dst=rule.dst))
remaining = remaining - IPPairs((rule.src, rule.dst))
if remaining:
flows = ",\n".join("{t.yellow}{src}{t.normal} -> {t.magenta}{dst}{t.normal}".format(
t=terminal, src=p[0], dst=p[1]) for p in remaining)
log.error("No rule found for flows, app {t.bold_cyan}{app}{t.normal}\n{flows}"
.format(t=terminal, app=app, flows=flows))
return False
return True

def allApps(self, src, dst, debug=False):
src = _ipset(src)
dst = _ipset(dst)
log.info("allApps(%r, %r)" % (src, dst))
rv = set()
for rule in itertools.chain(*self.rules.itervalues()):
if not debug and rule.app in rv:
continue
src_matches = (rule.src & src)
if not src_matches:
continue
dst_matches = (rule.dst & dst)
if not dst_matches:
continue
log.info("matched policy {t.cyan}{name}{t.normal} app {t.bold_cyan}{app}{t.normal}\n"
"{t.yellow}{src}{t.normal} -> {t.magenta}{dst}{t.normal}".format(
t=terminal, name=rule.name, src=src_matches, dst=dst_matches, app=rule.app))
rv.add(rule.app)
if '@@other' in rv:
rv = set(['any'])
return rv

def sourcesFor(self, dst, app, ignore_sources=None):
dst = _ipset(dst)
log.info("sourcesFor(%r, %r, ignore_sources=%r)" % (dst, app, ignore_sources))
rv = IPSet()
for rule in self._rules_for_app(app):
if rule.dst & dst:
src = rule.src
if ignore_sources:
src = src - ignore_sources
if src:
log.info("matched policy {t.cyan}{name}{t.normal}\n{t.yellow}{src}{t.normal} "
"-> {t.magenta}{dst}{t.normal}".format(
t=terminal, name=rule.name, src=src, dst=rule.dst & dst))
rv = rv + src
return rv

_cache = {}


def load_source(cfg, source):
"""Load the named source. Sources are cached, so multiple calls with the same name
will not repeatedly re-load the data from disk."""
if source not in _cache:
_cache[source] = Source(cfg[source]['output'])
return _cache[source]


def _clear():
# for tests only
global _cache
_cache = {}

0 comments on commit 98affa7

Please sign in to comment.