Skip to content

Commit

Permalink
Add support for RELATIVE-OID (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
russhousley committed Feb 15, 2024
1 parent 05eac97 commit d17e0e1
Show file tree
Hide file tree
Showing 10 changed files with 407 additions and 1 deletion.
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
Revision 0.5.2, released xx-xx-2024
---------------------------------------

- Added support for previously missing `RELATIVE-OID` construct

Revision 0.5.1, released 20-11-2023
---------------------------------------

Expand Down
1 change: 1 addition & 0 deletions THANKS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ Alex Gaynor
Geoffrey Thomas
Daniel Bratell
Kim Gräsman
Russ Housley
51 changes: 51 additions & 0 deletions pyasn1/codec/ber/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,56 @@ def valueDecoder(self, substrate, asn1Spec,
yield self._createComponent(asn1Spec, tagSet, oid, **options)


class RelativeOIDPayloadDecoder(AbstractSimplePayloadDecoder):
protoComponent = univ.RelativeOID(())

def valueDecoder(self, substrate, asn1Spec,
tagSet=None, length=None, state=None,
decodeFun=None, substrateFun=None,
**options):
if tagSet[0].tagFormat != tag.tagFormatSimple:
raise error.PyAsn1Error('Simple tag format expected')

for chunk in readFromStream(substrate, length, options):
if isinstance(chunk, SubstrateUnderrunError):
yield chunk

if not chunk:
raise error.PyAsn1Error('Empty substrate')

chunk = octs2ints(chunk)

reloid = ()
index = 0
substrateLen = len(chunk)
while index < substrateLen:
subId = chunk[index]
index += 1
if subId < 128:
reloid += (subId,)
elif subId > 128:
# Construct subid from a number of octets
nextSubId = subId
subId = 0
while nextSubId >= 128:
subId = (subId << 7) + (nextSubId & 0x7F)
if index >= substrateLen:
raise error.SubstrateUnderrunError(
'Short substrate for sub-OID past %s' % (reloid,)
)
nextSubId = chunk[index]
index += 1
reloid += ((subId << 7) + nextSubId,)
elif subId == 128:
# ASN.1 spec forbids leading zeros (0x80) in OID
# encoding, tolerating it opens a vulnerability. See
# https://www.esat.kuleuven.be/cosic/publications/article-1432.pdf
# page 7
raise error.PyAsn1Error('Invalid octet 0x80 in RELATIVE-OID encoding')

yield self._createComponent(asn1Spec, tagSet, reloid, **options)


class RealPayloadDecoder(AbstractSimplePayloadDecoder):
protoComponent = univ.Real()

Expand Down Expand Up @@ -1421,6 +1471,7 @@ class UTCTimePayloadDecoder(OctetStringPayloadDecoder):
univ.OctetString.tagSet: OctetStringPayloadDecoder(),
univ.Null.tagSet: NullPayloadDecoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierPayloadDecoder(),
univ.RelativeOID.tagSet: RelativeOIDPayloadDecoder(),
univ.Enumerated.tagSet: IntegerPayloadDecoder(),
univ.Real.tagSet: RealPayloadDecoder(),
univ.Sequence.tagSet: SequenceOrSequenceOfPayloadDecoder(), # conflicts with SequenceOf
Expand Down
35 changes: 35 additions & 0 deletions pyasn1/codec/ber/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,39 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
return octets, False, False


class RelativeOIDEncoder(AbstractItemEncoder):
supportIndefLenMode = False

def encodeValue(self, value, asn1Spec, encodeFun, **options):
if asn1Spec is not None:
value = asn1Spec.clone(value)

octets = ()

# Cycle through subIds
for subOid in value.asTuple():
if 0 <= subOid <= 127:
# Optimize for the common case
octets += (subOid,)

elif subOid > 127:
# Pack large Sub-Object IDs
res = (subOid & 0x7f,)
subOid >>= 7

while subOid:
res = (0x80 | (subOid & 0x7f),) + res
subOid >>= 7

# Add packed Sub-Object ID to resulted RELATIVE-OID
octets += res

else:
raise error.PyAsn1Error('Negative RELATIVE-OID arc %s at %s' % (subOid, value))

return octets, False, False


class RealEncoder(AbstractItemEncoder):
supportIndefLenMode = False
binEncBase = 2 # set to None to choose encoding base automatically
Expand Down Expand Up @@ -715,6 +748,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
univ.OctetString.tagSet: OctetStringEncoder(),
univ.Null.tagSet: NullEncoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
univ.RelativeOID.tagSet: RelativeOIDEncoder(),
univ.Enumerated.tagSet: IntegerEncoder(),
univ.Real.tagSet: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down Expand Up @@ -747,6 +781,7 @@ def encodeValue(self, value, asn1Spec, encodeFun, **options):
univ.OctetString.typeId: OctetStringEncoder(),
univ.Null.typeId: NullEncoder(),
univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
univ.RelativeOID.typeId: RelativeOIDEncoder(),
univ.Enumerated.typeId: IntegerEncoder(),
univ.Real.typeId: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down
3 changes: 3 additions & 0 deletions pyasn1/codec/native/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
univ.OctetString.tagSet: AbstractScalarPayloadDecoder(),
univ.Null.tagSet: AbstractScalarPayloadDecoder(),
univ.ObjectIdentifier.tagSet: AbstractScalarPayloadDecoder(),
univ.RelativeOID.tagSet: AbstractScalarPayloadDecoder(),
univ.Enumerated.tagSet: AbstractScalarPayloadDecoder(),
univ.Real.tagSet: AbstractScalarPayloadDecoder(),
univ.Sequence.tagSet: SequenceOrSetPayloadDecoder(), # conflicts with SequenceOf
Expand Down Expand Up @@ -103,6 +104,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
univ.OctetString.typeId: AbstractScalarPayloadDecoder(),
univ.Null.typeId: AbstractScalarPayloadDecoder(),
univ.ObjectIdentifier.typeId: AbstractScalarPayloadDecoder(),
univ.RelativeOID.typeId: AbstractScalarPayloadDecoder(),
univ.Enumerated.typeId: AbstractScalarPayloadDecoder(),
univ.Real.typeId: AbstractScalarPayloadDecoder(),
# ambiguous base types
Expand Down Expand Up @@ -130,6 +132,7 @@ def __call__(self, pyObject, asn1Spec, decodeFun=None, **options):
useful.UTCTime.typeId: AbstractScalarPayloadDecoder()
}


# deprecated aliases, https://github.com/pyasn1/pyasn1/issues/9
tagMap = TAG_MAP
typeMap = TYPE_MAP
Expand Down
8 changes: 7 additions & 1 deletion pyasn1/codec/native/encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def encode(self, value, encodeFun, **options):
return str(value)


class RelativeOIDEncoder(AbstractItemEncoder):
def encode(self, value, encodeFun, **options):
return str(value)


class RealEncoder(AbstractItemEncoder):
def encode(self, value, encodeFun, **options):
return float(value)
Expand Down Expand Up @@ -111,6 +116,7 @@ def encode(self, value, encodeFun, **options):
univ.OctetString.tagSet: OctetStringEncoder(),
univ.Null.tagSet: NullEncoder(),
univ.ObjectIdentifier.tagSet: ObjectIdentifierEncoder(),
univ.RelativeOID.tagSet: RelativeOIDEncoder(),
univ.Enumerated.tagSet: IntegerEncoder(),
univ.Real.tagSet: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand All @@ -135,7 +141,6 @@ def encode(self, value, encodeFun, **options):
useful.UTCTime.tagSet: OctetStringEncoder()
}


# Put in ambiguous & non-ambiguous types for faster codec lookup
TYPE_MAP = {
univ.Boolean.typeId: BooleanEncoder(),
Expand All @@ -144,6 +149,7 @@ def encode(self, value, encodeFun, **options):
univ.OctetString.typeId: OctetStringEncoder(),
univ.Null.typeId: NullEncoder(),
univ.ObjectIdentifier.typeId: ObjectIdentifierEncoder(),
univ.RelativeOID.typeId: RelativeOIDEncoder(),
univ.Enumerated.typeId: IntegerEncoder(),
univ.Real.typeId: RealEncoder(),
# Sequence & Set have same tags as SequenceOf & SetOf
Expand Down
128 changes: 128 additions & 0 deletions pyasn1/type/univ.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,134 @@ def prettyOut(self, value):
return '.'.join([str(x) for x in value])


class RelativeOID(base.SimpleAsn1Type):
"""Create |ASN.1| schema or value object.
|ASN.1| class is based on :class:`~pyasn1.type.base.SimpleAsn1Type`, its
objects are immutable and duck-type Python :class:`tuple` objects
(tuple of non-negative integers).
Keyword Args
------------
value: :class:`tuple`, :class:`str` or |ASN.1| object
Python sequence of :class:`int` or :class:`str` literal or |ASN.1| object.
If `value` is not given, schema object will be created.
tagSet: :py:class:`~pyasn1.type.tag.TagSet`
Object representing non-default ASN.1 tag(s)
subtypeSpec: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection`
Object representing non-default ASN.1 subtype constraint(s). Constraints
verification for |ASN.1| type occurs automatically on object
instantiation.
Raises
------
~pyasn1.error.ValueConstraintError, ~pyasn1.error.PyAsn1Error
On constraint violation or bad initializer.
Examples
--------
.. code-block:: python
class RelOID(RelativeOID):
'''
ASN.1 specification:
id-pad-null RELATIVE-OID ::= { 0 }
id-pad-once RELATIVE-OID ::= { 5 6 }
id-pad-twice RELATIVE-OID ::= { 5 6 7 }
'''
id_pad_null = RelOID('0')
id_pad_once = RelOID('5.6')
id_pad_twice = id_pad_once + (7,)
"""
#: Set (on class, not on instance) or return a
#: :py:class:`~pyasn1.type.tag.TagSet` object representing ASN.1 tag(s)
#: associated with |ASN.1| type.
tagSet = tag.initTagSet(
tag.Tag(tag.tagClassUniversal, tag.tagFormatSimple, 0x0d)
)

#: Set (on class, not on instance) or return a
#: :py:class:`~pyasn1.type.constraint.ConstraintsIntersection` object
#: imposing constraints on |ASN.1| type initialization values.
subtypeSpec = constraint.ConstraintsIntersection()

# Optimization for faster codec lookup
typeId = base.SimpleAsn1Type.getTypeId()

def __add__(self, other):
return self.clone(self._value + other)

def __radd__(self, other):
return self.clone(other + self._value)

def asTuple(self):
return self._value

# Sequence object protocol

def __len__(self):
return len(self._value)

def __getitem__(self, i):
if i.__class__ is slice:
return self.clone(self._value[i])
else:
return self._value[i]

def __iter__(self):
return iter(self._value)

def __contains__(self, value):
return value in self._value

def index(self, suboid):
return self._value.index(suboid)

def isPrefixOf(self, other):
"""Indicate if this |ASN.1| object is a prefix of other |ASN.1| object.
Parameters
----------
other: |ASN.1| object
|ASN.1| object
Returns
-------
: :class:`bool`
:obj:`True` if this |ASN.1| object is a parent (e.g. prefix) of the other |ASN.1| object
or :obj:`False` otherwise.
"""
l = len(self)
if l <= len(other):
if self._value[:l] == other[:l]:
return True
return False

def prettyIn(self, value):
if isinstance(value, RelativeOID):
return tuple(value)
elif octets.isStringType(value):
if '-' in value:
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)
try:
return tuple([int(subOid) for subOid in value.split('.') if subOid])
except ValueError:
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)

try:
tupleOfInts = tuple([int(subOid) for subOid in value if subOid >= 0])

except (ValueError, TypeError):
raise error.PyAsn1Error(
'Malformed RELATIVE-OID %s at %s: %s' % (value, self.__class__.__name__, sys.exc_info()[1])
)

if len(tupleOfInts) == len(value):
return tupleOfInts

raise error.PyAsn1Error('Malformed RELATIVE-OID %s at %s' % (value, self.__class__.__name__))

def prettyOut(self, value):
return '.'.join([str(x) for x in value])


class Real(base.SimpleAsn1Type):
"""Create |ASN.1| schema or value object.
Expand Down

0 comments on commit d17e0e1

Please sign in to comment.