From ff325a9b9b7bacea49aa56e3bbb115ccf862eff3 Mon Sep 17 00:00:00 2001 From: Christopher Aubut Date: Fri, 18 Oct 2019 17:16:09 -0500 Subject: [PATCH 1/4] Add netfield database functions --- README.rst | 65 +++++++++ netfields/functions.py | 111 +++++++++++++++ test/models.py | 3 +- test/tests/test_functions.py | 258 +++++++++++++++++++++++++++++++++++ 4 files changed, 436 insertions(+), 1 deletion(-) create mode 100644 netfields/functions.py create mode 100644 test/tests/test_functions.py diff --git a/README.rst b/README.rst index c4a7b20..aa9b0eb 100644 --- a/README.rst +++ b/README.rst @@ -113,6 +113,71 @@ http://www.postgresql.org/docs/9.4/interactive/functions-net.html ``__min_prefixlen`` Minimum value (inclusive) for ``CIDR`` prefix, does not distinguish between IPv4 and IPv6 +Database Functions +'''''''''''''''''' + +Postgres network address functions are exposed via the ``netfields.functions`` module. They can be used to extract additional information from these fields or to construct complex queries. + +.. code-block:: python + + from django.db.models import F + + from netfields import CidrAddressField, NetManager + from netfields.functions import Family, Masklen + + class Example(models.Model): + inet = CidrAddressField() + # ... + + ipv4_with_num_ips = ( + Example.objects.annotate( + family=Family(F('inet')), + num_ips=2 ** (32 - Masklen(F('inet'))) # requires Django >2.0 to resolve + ) + .filter(family=4) + ) + +**CidrAddressField and InetAddressField Functions** + ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| Postgres Function | Django Function | Return Type | Description | ++================================+==================+======================+================================================================+ +| abbrev(``T``) | Abbrev | ``TextField`` | abbreviated display format as text | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| broadcast(``T``) | Broadcast | ``InetAddressField`` | broadcast address for network | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| family(``T``) | Family | ``IntegerField`` | extract family of address; 4 for IPv4, 6 for IPv6 | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| host(``T``) | Host | ``TextField`` | extract IP address as text | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| hostmask(``T``) | Hostmask | ``InetAddressField`` | construct host mask for network | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| masklen(``T``) | Masklen | ``IntegerField`` | extract netmask length | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| netmask(``T``) | Netmask | ``InetAddressField`` | construct netmask for network | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| network(``T``) | Network | ``CidrAddressField`` | extract network part of address | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| set_masklen(``T``) | SetMasklen | ``T`` | set netmask length for inet value | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| text(``T``) | AsText | ``TextField`` | extract IP address and netmask length as text | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| inet_same_family(``T``, ``T``) | IsSameFamily | ``BooleanField`` | are the addresses from the same family? | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| inet_merge(``T``, ``T``) | Merge | ``CidrAddressField`` | the smallest network which includes both of the given networks | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ + +**MACAddressField Functions** + ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ +| Postgres Function | Django Function | Return Type | Description | ++================================+==================+======================+================================================================+ +| trunc(``T``) | Trunc | ``T`` | set last 3 bytes to zero | ++--------------------------------+------------------+----------------------+----------------------------------------------------------------+ + +Indexes +''''''' + As of Django 2.2, indexes can be created for ``InetAddressField`` and ``CidrAddressField`` extra lookups directly on the model. .. code-block:: python diff --git a/netfields/functions.py b/netfields/functions.py new file mode 100644 index 0000000..5c1358f --- /dev/null +++ b/netfields/functions.py @@ -0,0 +1,111 @@ +"""Postgres network address functions. + +https://www.postgresql.org/docs/11/functions-net.html +""" + +from django.db.models import BooleanField, Func, IntegerField, TextField + +from .fields import CidrAddressField, InetAddressField + + +class Abbrev(Func): + """Function to abbreviate field as text.""" + + arity = 1 + function = 'ABBREV' + output_field = TextField() + + +class Broadcast(Func): + """Function to extract broadcast address for network.""" + + arity = 1 + function = 'BROADCAST' + output_field = InetAddressField() + + +class Family(Func): + """Function to extract family of address; 4 for IPv4, 6 for IPv6.""" + + arity = 1 + function = 'FAMILY' + output_field = IntegerField() + + +class Host(Func): + """Function to extract IP address as text.""" + + arity = 1 + function = 'HOST' + output_field = TextField() + + +class Hostmask(Func): + """Function to construct host mask for network.""" + + arity = 1 + function = 'HOSTMASK' + output_field = InetAddressField() + + +class Masklen(Func): + """Function to extract netmask length.""" + + arity = 1 + function = 'MASKLEN' + output_field = IntegerField() + + +class Netmask(Func): + """Function to construct netmask for network.""" + + arity = 1 + function = 'NETMASK' + output_field = InetAddressField() + + +class Network(Func): + """Function to extract network part of address.""" + + arity = 1 + function = 'NETWORK' + output_field = CidrAddressField() + + +class SetMasklen(Func): + """Function to set netmask length.""" + + arity = 2 + function = 'SET_MASKLEN' + + +class AsText(Func): + """Function to extract IP address and netmask length as text.""" + + arity = 1 + function = 'TEXT' + output_field = TextField() + + +class IsSameFamily(Func): + """Function to test that addresses are from the same family.""" + + arity = 2 + function = 'INET_SAME_FAMILY' + output_field = BooleanField() + + +class Merge(Func): + """Function to calculate the smallest network which includes both of the given + networks. + """ + + arity = 2 + function = 'INET_MERGE' + output_field = CidrAddressField() + + +class Trunc(Func): + + arity = 1 + function = 'TRUNC' diff --git a/test/models.py b/test/models.py index a6eeb98..a487c15 100644 --- a/test/models.py +++ b/test/models.py @@ -91,7 +91,8 @@ class Meta: class AggregateTestModel(Model): - pass + network = CidrAddressField(blank=True, null=True, default=None) + inet = InetAddressField(blank=True, null=True, default=None) class AggregateTestChildModel(Model): diff --git a/test/tests/test_functions.py b/test/tests/test_functions.py new file mode 100644 index 0000000..9fcf585 --- /dev/null +++ b/test/tests/test_functions.py @@ -0,0 +1,258 @@ +from __future__ import unicode_literals +from django import VERSION +from ipaddress import ip_interface, ip_network +from netaddr import EUI + +from django.db.models import Case, F, When +from django.test import TestCase +from unittest import skipIf + +from netfields.functions import ( + Abbrev, + Broadcast, + Family, + Host, + Hostmask, + Masklen, + Netmask, + Network, + SetMasklen, + AsText, + IsSameFamily, + Merge, + Trunc +) + +from test.models import ( + AggregateTestChildModel, + AggregateTestModel, + CidrTestModel, + InetTestModel, + MACTestModel +) + + +class TestInetFieldFunctions(TestCase): + def setUp(self): + InetTestModel.objects.create(field='10.1.0.1/16') + InetTestModel.objects.create(field='2001:4f8:3:ba::1/64') + + def test_abbreviate(self): + qs = InetTestModel.objects.annotate(abbrv=Abbrev(F('field'))) + self.assertEqual(qs[0].abbrv, '10.1.0.1/16') + self.assertEqual(qs[1].abbrv, '2001:4f8:3:ba::1/64') + + def test_broadcast(self): + qs = InetTestModel.objects.annotate(broadcast=Broadcast(F('field'))) + self.assertEqual(qs[0].broadcast, ip_interface('10.1.255.255/16')) + self.assertEqual(qs[1].broadcast, ip_interface('2001:4f8:3:ba:ffff:ffff:ffff:ffff/64')) + + def test_family(self): + qs = InetTestModel.objects.annotate(family=Family(F('field'))) + self.assertEqual(qs[0].family, 4) + self.assertEqual(qs[1].family, 6) + + def test_host(self): + qs = InetTestModel.objects.annotate(host=Host(F('field'))) + self.assertEqual(qs[0].host, '10.1.0.1') + self.assertEqual(qs[1].host, '2001:4f8:3:ba::1') + + def test_hostmask(self): + qs = InetTestModel.objects.annotate(hostmask=Hostmask(F('field'))) + self.assertEqual(qs[0].hostmask, ip_interface('0.0.255.255')) + self.assertEqual(qs[1].hostmask, ip_interface('::ffff:ffff:ffff:ffff')) + + def test_masklen(self): + qs = InetTestModel.objects.annotate(masklen=Masklen(F('field'))) + self.assertEqual(qs[0].masklen, 16) + self.assertEqual(qs[1].masklen, 64) + + def test_netmask(self): + qs = InetTestModel.objects.annotate(netmask=Netmask(F('field'))) + self.assertEqual(qs[0].netmask, ip_interface('255.255.0.0')) + self.assertEqual(qs[1].netmask, ip_interface('ffff:ffff:ffff:ffff::')) + + def test_network(self): + qs = InetTestModel.objects.annotate(network=Network(F('field'))) + self.assertEqual(qs[0].network, ip_network('10.1.0.0/16')) + self.assertEqual(qs[1].network, ip_network('2001:4f8:3:ba::/64')) + + def test_set_masklen(self): + ( + InetTestModel.objects + .annotate(family=Family(F('field'))) + .update( + field=Case( + When(family=4, then=SetMasklen(F('field'), 24)), + When(family=6, then=SetMasklen(F('field'), 120)) + ) + ) + ) + qs = InetTestModel.objects.all() + self.assertEqual(qs[0].field, ip_interface('10.1.0.1/24')) + self.assertEqual(qs[1].field, ip_interface('2001:4f8:3:ba::1/120')) + + def test_as_text(self): + qs = InetTestModel.objects.annotate(text=AsText(F('field'))) + self.assertEqual(qs[0].text, '10.1.0.1/16') + self.assertEqual(qs[1].text, '2001:4f8:3:ba::1/64') + + def test_is_same_family(self): + parent = AggregateTestModel.objects.create(inet='0.0.0.0/0') + AggregateTestChildModel.objects.create( + parent=parent, inet='10.1.0.1/16', network='10.1.0.0/16' + ) + AggregateTestChildModel.objects.create( + parent=parent, inet='2001:4f8:3:ba::1/64', network='2001:4f8:3:ba::/64' + ) + + qs = ( + AggregateTestChildModel.objects.annotate( + is_same_family=IsSameFamily(F('inet'), F('parent__inet')) + ) + .order_by('id') + ) + self.assertEqual(qs[0].is_same_family, True) + self.assertEqual(qs[1].is_same_family, False) + + def test_merge(self): + parent = AggregateTestModel.objects.create(inet='10.0.0.0/24') + AggregateTestChildModel.objects.create( + parent=parent, inet='10.0.1.0/24', network='10.0.0.0/23' + ) + + parent = AggregateTestModel.objects.create(inet='2001:4f8:3:ba::/64') + AggregateTestChildModel.objects.create( + parent=parent, inet='2001:4f8:3:bb::/64', network='2001:4f8:3:ba::/63' + ) + + qs = ( + AggregateTestChildModel.objects.annotate( + merged=Merge(F('inet'), F('parent__inet')) + ) + ) + self.assertEqual(qs[0].merged, qs[0].network) + self.assertEqual(qs[1].merged, qs[1].network) + + +class TestCidrFieldFunctions(TestCase): + def setUp(self): + CidrTestModel.objects.create(field='10.1.0.0/16') + CidrTestModel.objects.create(field='2001:4f8:3:ba::/64') + + def test_abbreviate(self): + qs = CidrTestModel.objects.annotate(abbrv=Abbrev(F('field'))) + self.assertEqual(qs[0].abbrv, '10.1/16') + self.assertEqual(qs[1].abbrv, '2001:4f8:3:ba/64') + + def test_broadcast(self): + qs = CidrTestModel.objects.annotate(broadcast=Broadcast(F('field'))) + self.assertEqual(qs[0].broadcast, ip_interface('10.1.255.255/16')) + self.assertEqual(qs[1].broadcast, ip_interface('2001:4f8:3:ba:ffff:ffff:ffff:ffff/64')) + + def test_family(self): + qs = CidrTestModel.objects.annotate(family=Family(F('field'))) + self.assertEqual(qs[0].family, 4) + self.assertEqual(qs[1].family, 6) + + def test_host(self): + qs = CidrTestModel.objects.annotate(host=Host(F('field'))) + self.assertEqual(qs[0].host, '10.1.0.0') + self.assertEqual(qs[1].host, '2001:4f8:3:ba::') + + def test_hostmask(self): + qs = CidrTestModel.objects.annotate(hostmask=Hostmask(F('field'))) + self.assertEqual(qs[0].hostmask, ip_interface('0.0.255.255')) + self.assertEqual(qs[1].hostmask, ip_interface('::ffff:ffff:ffff:ffff')) + + def test_masklen(self): + qs = CidrTestModel.objects.annotate(masklen=Masklen(F('field'))) + self.assertEqual(qs[0].masklen, 16) + self.assertEqual(qs[1].masklen, 64) + + def test_netmask(self): + qs = CidrTestModel.objects.annotate(netmask=Netmask(F('field'))) + self.assertEqual(qs[0].netmask, ip_interface('255.255.0.0')) + self.assertEqual(qs[1].netmask, ip_interface('ffff:ffff:ffff:ffff::')) + + def test_network(self): + qs = CidrTestModel.objects.annotate(network=Network(F('field'))) + self.assertEqual(qs[0].network, ip_network('10.1.0.0/16')) + self.assertEqual(qs[1].network, ip_network('2001:4f8:3:ba::/64')) + + def test_set_masklen(self): + ( + CidrTestModel.objects + .annotate(family=Family(F('field'))) + .update( + field=Case( + When(family=4, then=SetMasklen(F('field'), 24)), + When(family=6, then=SetMasklen(F('field'), 120)) + ) + ) + ) + qs = CidrTestModel.objects.all() + self.assertEqual(qs[0].field, ip_network('10.1.0.0/24')) + self.assertEqual(qs[1].field, ip_network('2001:4f8:3:ba::/120')) + + def test_as_text(self): + qs = CidrTestModel.objects.annotate(text=AsText(F('field'))) + self.assertEqual(qs[0].text, '10.1.0.0/16') + self.assertEqual(qs[1].text, '2001:4f8:3:ba::/64') + + def test_is_same_family(self): + parent = AggregateTestModel.objects.create(network='0.0.0.0/0') + AggregateTestChildModel.objects.create( + parent=parent, inet= '10.1.0.1/16', network='10.1.0.0/16' + ) + AggregateTestChildModel.objects.create( + parent=parent, inet='2001:4f8:3:ba::1/64', network='2001:4f8:3:ba::/64' + ) + + qs = ( + AggregateTestChildModel.objects.annotate( + is_same_family=IsSameFamily(F('network'), F('parent__network')) + ) + .order_by('id') + ) + self.assertEqual(qs[0].is_same_family, True) + self.assertEqual(qs[1].is_same_family, False) + + def test_merge(self): + parent = AggregateTestModel.objects.create(network='10.0.0.0/24') + AggregateTestChildModel.objects.create( + parent=parent, inet='10.0.1.0/24', network='10.0.0.0/23' + ) + + parent = AggregateTestModel.objects.create(network='2001:4f8:3:ba::/64') + AggregateTestChildModel.objects.create( + parent=parent, inet='2001:4f8:3:bb::/64', network='2001:4f8:3:ba::/63' + ) + + qs = ( + AggregateTestChildModel.objects.annotate( + merged=Merge(F('network'), F('parent__network')) + ) + ) + self.assertEqual(qs[0].merged, qs[0].network) + self.assertEqual(qs[1].merged, qs[1].network) + + @skipIf(VERSION < (2, 0), 'Django unable to resolve type of num_ips to be IntegerField until 2.0.') + def test_read_me_example(self): + qs = ( + CidrTestModel.objects.annotate( + family=Family(F('field')), + num_ips=2 ** (32 - Masklen(F('field'))), + ) + .filter(family=4) + ) + self.assertEqual(qs[0].num_ips, 65536) + + +class TestMacFieldFunctions(TestCase): + def setUp(self): + MACTestModel.objects.create(field='aa:bb:cc:dd:ee:ff') + + def test_trunc(self): + qs = MACTestModel.objects.annotate(trunc=Trunc(F('field'))) + self.assertEqual(qs[0].trunc, EUI('aa:bb:cc:00:00:00')) From b276a95e1c9c6e77e74301ff068fc269a542c45a Mon Sep 17 00:00:00 2001 From: Christopher Aubut Date: Fri, 8 Nov 2019 22:46:14 -0700 Subject: [PATCH 2/4] Update README.rst Fixed type signature of SetMasklen in readme. --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index aa9b0eb..ac06a36 100644 --- a/README.rst +++ b/README.rst @@ -158,7 +158,7 @@ Postgres network address functions are exposed via the ``netfields.functions`` m +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ | network(``T``) | Network | ``CidrAddressField`` | extract network part of address | +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ -| set_masklen(``T``) | SetMasklen | ``T`` | set netmask length for inet value | +| set_masklen(``T``, int) | SetMasklen | ``T`` | set netmask length for inet value | +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ | text(``T``) | AsText | ``TextField`` | extract IP address and netmask length as text | +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ From d26418ae60822b51ca847d60c78a658e5787468b Mon Sep 17 00:00:00 2001 From: Christopher Aubut Date: Fri, 8 Nov 2019 22:47:41 -0700 Subject: [PATCH 3/4] Update README.rst Fixed shape of readme table --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index ac06a36..91af20a 100644 --- a/README.rst +++ b/README.rst @@ -158,7 +158,7 @@ Postgres network address functions are exposed via the ``netfields.functions`` m +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ | network(``T``) | Network | ``CidrAddressField`` | extract network part of address | +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ -| set_masklen(``T``, int) | SetMasklen | ``T`` | set netmask length for inet value | +| set_masklen(``T``, int) | SetMasklen | ``T`` | set netmask length for inet value | +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ | text(``T``) | AsText | ``TextField`` | extract IP address and netmask length as text | +--------------------------------+------------------+----------------------+----------------------------------------------------------------+ From 197a5d50f050ce62733dab40a7718793d670507c Mon Sep 17 00:00:00 2001 From: Christopher Aubut Date: Fri, 8 Nov 2019 22:59:54 -0700 Subject: [PATCH 4/4] Update README.rst Update reference to new functions. --- README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 91af20a..876fd3c 100644 --- a/README.rst +++ b/README.rst @@ -116,7 +116,7 @@ http://www.postgresql.org/docs/9.4/interactive/functions-net.html Database Functions '''''''''''''''''' -Postgres network address functions are exposed via the ``netfields.functions`` module. They can be used to extract additional information from these fields or to construct complex queries. +`Postgres network address functions `_ are exposed via the ``netfields.functions`` module. They can be used to extract additional information from these fields or to construct complex queries. .. code-block:: python