Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

split _ldap.pyx into coro.asn1.ber and coro.ldap.query

  • Loading branch information...
commit 8926493030f33793383073d62c487eec39edba43 1 parent 32522fb
Sam Rushing authored
View
2  coro/asn1/__init__.py
@@ -0,0 +1,2 @@
+# -*- Mode: Python -*-
+# even empty, this file is needed so cython will see the .pxd
View
52 coro/asn1/ber.pxd
@@ -0,0 +1,52 @@
+# -*- Mode: Cython -*-
+
+# flags for BER tags
cdef enum FLAGS:
    # Bits that are OR-ed into a tag number to build the tag byte
    # (see _CHOICE and _APPLICATION in ber.pyx).
    FLAGS_UNIVERSAL = 0x00
    FLAGS_STRUCTURED = 0x20
    FLAGS_APPLICATION = 0x40
    FLAGS_CONTEXT = 0x80
+
# NULL is a pyrex keyword, hence the TAGS_ prefix on every member
# (a bare 'NULL' member could not be declared).
# universal BER tags
cdef enum TAGS:
    TAGS_BOOLEAN = 0x01
    TAGS_INTEGER = 0x02
    TAGS_BITSTRING = 0x03
    TAGS_OCTET_STRING = 0x04
    TAGS_NULL = 0x05
    TAGS_OBJID = 0x06
    TAGS_OBJDESCRIPTOR = 0x07
    TAGS_EXTERNAL = 0x08
    TAGS_REAL = 0x09
    TAGS_ENUMERATED = 0x0a
    TAGS_EMBEDDED_PDV = 0x0b
    TAGS_UTF8STRING = 0x0c
    # SEQUENCE and SET already carry the 'structured' bit, so the decoder
    # can compare a raw tag byte directly against these two values.
    TAGS_SEQUENCE = 0x10 | 0x20 # Equivalent to FLAGS_STRUCTURED
    TAGS_SET = 0x11 | 0x20 # Equivalent to FLAGS_STRUCTURED
+
# C-level API implemented in ber.pyx, made visible to other Cython modules
# via 'from coro.asn1.ber cimport *'.

# --- length helpers ---
cdef int length_of_length (int n)
cdef void encode_length (int l, int n, char * buffer)

# --- encoders: each returns the encoded bytes (or, for _CHOICE and
# --- _APPLICATION, a tag value to hand to _TLV/_TLV1) ---
cdef object _encode_integer (int n)
cdef object _encode_long_integer (n)
cdef object _TLV1 (int tag, bytes data)
cdef object _TLV (int tag, object data)
cdef object _CHOICE (int n, bint structured)
cdef object _APPLICATION (int n)
cdef object _ENUMERATED (int n)
cdef object _INTEGER (int n)
cdef object _BOOLEAN (int n)
cdef object _SEQUENCE (object elems)
cdef object _SET (object elems)
cdef object _OCTET_STRING (bytes s)
cdef object _OBJID (list l)

# --- decoders: <s> is the input buffer, <pos> points at the caller's
# --- cursor (advanced in place), <length> is the length of the value ---
cdef object decode_string (unsigned char * s, int * pos, int length)
cdef object decode_raw (unsigned char * s, int * pos, int length)
cdef object decode_bitstring (unsigned char * s, int * pos, int length)
cdef object decode_integer (unsigned char * s, int * pos, int length)
cdef object decode_long_integer (unsigned char * s, int * pos, int length)
cdef object decode_structured (unsigned char * s, int * pos, int length)
cdef object decode_objid (unsigned char * s, int * pos, int length)
cdef object decode_boolean (unsigned char * s, int * pos, int length)
cdef int _decode_length (unsigned char * s, int * pos, int lol)
cdef object _decode (unsigned char * s, int * pos, int eos, bint just_tlv)
View
580 coro/asn1/ber.pyx
@@ -0,0 +1,580 @@
+# -*- Mode: Cython -*-
+# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+
+# [this code originally from _ldap.pyx]
+
+# XXX I'm not happy with 'just_tlv' and the code that uses it - [see
+# x509.py:der_extract()]. I think a better solution would be to
+# change the decoder to include 'location' information in its
+# output. This would probably break existing users of that
+# facility (ldap, anyone else?). The problem is that the decoders
+# were originally written for LDAP, which has no need for access
+# to raw encoded data.
+#
+# A really nice thing to do would be to make the whole thing act
+# more like a *codec* - it'd be great if you could take the output
+# of the decoder and feed it back to the encoder and get the same
+# DER out of it. The current decoder is 'halfway' like this - it
+# doesn't bother with tag info for SEQUENCE, SET, INTEGER,
+# etc... If we had something like this we could describe ASN1 data
+# structures with something close to the ASN1 syntax, and be able
+# to automatically decode and encode those structures via nice
+# class wrappers.
+
+from cpython cimport PyBytes_FromStringAndSize, PyNumber_Long, PyLong_Check
+from libc.string cimport memcpy
+
+import sys
+W = sys.stderr.write
+
+# ================================================================================
+# BER encoders
+# ================================================================================
+
# based on the table in dumpasn1.c
# Maps a universal tag number (low five bits of the tag byte) to a name.
# _decode() consults it to label values it has no dedicated decoder for.
TAG_TABLE = {
    0x01 : 'BOOLEAN', # 1: Boolean
    0x02 : 'INTEGER', # 2: Integer
    0x03 : 'BITSTRING', # 3: Bit string
    0x04 : 'OCTETSTRING', # 4: Byte string
    0x05 : 'NULLTAG', # 5: NULL
    0x06 : 'OID', # 6: Object Identifier
    0x07 : 'OBJDESCRIPTOR', # 7: Object Descriptor
    0x08 : 'EXTERNAL', # 8: External
    0x09 : 'REAL', # 9: Real
    0x0A : 'ENUMERATED', # 10: Enumerated
    0x0B : 'EMBEDDED_PDV', # 11: Embedded Presentation Data Value
    0x0C : 'UTF8STRING', # 12: UTF8 string
    0x10 : 'SEQUENCE', # 16: Sequence/sequence of
    0x11 : 'SET', # 17: Set/set of
    0x12 : 'NUMERIC_STRING', # 18: Numeric string
    0x13 : 'PRINTABLE_STRING', # 19: Printable string (ASCII subset)
    0x14 : 'T61_STRING', # 20: T61/Teletex string
    0x15 : 'VIDEOTEX_STRING', # 21: Videotex string
    0x16 : 'IA5_STRING', # 22: IA5/ASCII string
    0x17 : 'UTC_TIME', # 23: UTC time
    0x18 : 'GENERALIZED_TIME', # 24: Generalized time
    0x19 : 'GRAPHIC_STRING', # 25: Graphic string
    0x1A : 'VISIBLE_STRING', # 26: Visible string (ASCII subset)
    0x1B : 'GENERAL_STRING', # 27: General string
    0x1C : 'UNIVERSAL_STRING', # 28: Universal string
    0x1E : 'BMP_STRING', # 30: Basic Multilingual Plane/Unicode string
    }
+
cdef int length_of_length (int n):
    # Number of octets the BER length field for a value of <n> bytes occupies:
    # one octet in the short form (n < 0x80), otherwise one prefix octet
    # plus one octet per significant byte of <n>.
    cdef int octets = 1
    if n >= 0x80:
        while n:
            octets = octets + 1
            n = n >> 8
    return octets
+
cdef void encode_length (int l, int n, char * buffer):
    # Render the BER length <l> into <buffer>, using exactly <n> octets.
    # <n> must come from length_of_length (l); the caller provides the room.
    cdef int idx
    if l >= 0x80:
        # long form: prefix octet holds the count of length octets that follow
        buffer[0] = <char> (0x80 | ((n-1) & 0x7f))
        # fill from the last octet backwards, least significant byte first
        idx = n - 1
        while idx >= 1:
            buffer[idx] = <char> (l & 0xff)
            l = l >> 8
            idx = idx - 1
    else:
        # short form: the length itself
        buffer[0] = <char> l
+
# encode an integer, ASN1 style.
# two's complement with the minimum number of bytes.
# Returns only the content octets (no tag, no length).
cdef object _encode_integer (int n):
    cdef int n0, byte, i
    # <n> is a C int, so 16 bytes is more than enough room even if
    # int were 64 bits wide.  The buffer is filled from the end backwards.
    cdef char result[16]
    i = 0
    n0 = n
    byte = 0x80 # for n==0
    while 1:
        # arithmetic shift: <n> settles at 0 (non-negative) or -1 (negative)
        n = n >> 8
        if n0 == n:
            # no significant bytes left; emit at most one sign-padding octet
            if n == -1 and ((not byte & 0x80) or (i==0)):
                # negative, but high bit clear
                result[15-i] = <char> 0xff
                i = i + 1
            elif n == 0 and (byte & 0x80):
                # positive, but high bit set
                result[15-i] = <char> 0x00
                i = i + 1
            break
        else:
            byte = n0 & 0xff
            result[15-i] = <char> byte
            i = i + 1
            n0 = n
    # the <i> octets written occupy the tail of the buffer
    return PyBytes_FromStringAndSize (&result[16-i], i)
+
# encode an arbitrary-precision integer, ASN1 style.
# two's complement with the minimum number of bytes.
# Returns only the content octets (no tag, no length).
cdef object _encode_long_integer (n):
    cdef int byte, i, rlen, top
    cdef char * rbuf
    # 1) how many bytes?
    # Shift right until the value stops changing; it settles at 0 for a
    # non-negative number and at -1 for a negative one.
    sign = n
    rlen = 0
    while 1:
        shifted = sign >> 8
        if shifted == sign:
            break
        rlen = rlen + 1
        sign = shifted
    if rlen == 0:
        # n is 0 or -1: a single octet (0x00 / 0xff) says it all
        rlen = 1
    else:
        # The most significant byte must carry the right sign bit.  If it
        # does not (e.g. 128 -> 0x80, or -129 -> 0x7f) one padding octet is
        # needed, and it has to be counted *before* the buffer is allocated.
        top = (n >> (8 * (rlen - 1))) & 0xff
        if (sign == 0) != ((top & 0x80) == 0):
            rlen = rlen + 1
    # 2) create result string
    result = PyBytes_FromStringAndSize (NULL, rlen)
    rbuf = result
    # 3) render result string, least significant byte last.  Python's
    # arithmetic shift sign-extends, so the padding octet falls out of
    # the same loop (0x00 or 0xff).
    rest = n
    for i from 0 <= i < rlen:
        byte = rest & 0xff
        rbuf[(rlen-1)-i] = <char> byte
        rest = rest >> 8
    return result
+
def encode_long_integer (n):
    """Python-visible wrapper: return the two's complement content octets for <n>."""
    octets = _encode_long_integer (n)
    return octets
+
# this function is at the heart of all ASN output.
# it returns a <tag, length, value> string.

# _TLV1 (tag, data)
# <tag> is an ASN1 tag
# <data> is a single string, copied verbatim as the value
cdef object _TLV1 (int tag, bytes data):
    cdef int rlen, lol
    cdef char * rbuf
    # length of the value, and of the length field that describes it
    rlen = len (data)
    lol = length_of_length (rlen)
    # allocate tag + length + value in one go, then fill it in place
    result = PyBytes_FromStringAndSize (NULL, 1 + lol + rlen)
    rbuf = result
    # render tag
    rbuf[0] = <char> tag
    # render length
    encode_length (rlen, lol, rbuf + 1)
    # render data
    memcpy (rbuf + 1 + lol, <char *> data, rlen)
    return result
+
# _TLV (tag, data)
# <tag> is an ASN1 tag
# <data> is a sequence of strings; their concatenation becomes the value.
# <data> is walked twice, so it must be a re-iterable sequence
# (tuple or list), not a one-shot iterator.
cdef object _TLV (int tag, object data):
    cdef int rlen, ilen, lol
    cdef bytes s
    cdef char * rbuf
    # compute length of concatenated data
    rlen = 0
    for s in data:
        rlen += len(s)
    # compute length of length
    lol = length_of_length (rlen)
    # create result string
    result = PyBytes_FromStringAndSize (NULL, 1 + lol + rlen)
    rbuf = result
    # render tag
    rbuf[0] = <char> tag
    rbuf = rbuf + 1
    # render length
    encode_length (rlen, lol, rbuf)
    rbuf = rbuf + lol
    # render data, piece by piece
    for s in data:
        ilen = len(s)
        memcpy (rbuf, <char *>s, ilen)
        rbuf = rbuf + ilen
    return result
+
cdef object _CHOICE (int n, bint structured):
    # Build a context-specific tag byte for tag number <n>,
    # adding the 'structured' bit on request.
    cdef int tag = n
    if structured:
        tag = tag | <int>FLAGS_STRUCTURED
    tag = tag | <int>FLAGS_CONTEXT
    return tag
+
cdef object _APPLICATION (int n):
    # Build an application-class tag byte for tag number <n>;
    # the 'structured' bit is always set.
    cdef int tag = n | <int>FLAGS_APPLICATION | <int>FLAGS_STRUCTURED
    return tag
+
cdef object _ENUMERATED (int n):
    # ENUMERATED: same content octets as INTEGER, different tag.
    content = _encode_integer (n)
    return _TLV1 (TAGS_ENUMERATED, content)
+
cdef object _INTEGER (int n):
    # INTEGER that fits a C int: minimal two's complement content octets.
    content = _encode_integer (n)
    return _TLV1 (TAGS_INTEGER, content)
+
cdef object _BOOLEAN (int n):
    # BOOLEAN: exactly one content octet, 0xff for true and 0x00 for false.
    # The octet is written literally rather than through _encode_integer():
    # as an integer, 0xff is +255 and would be rendered as the two octets
    # 00 ff, which is not a valid BOOLEAN and decodes as false.
    if n:
        return _TLV1 (TAGS_BOOLEAN, b'\xff')
    else:
        return _TLV1 (TAGS_BOOLEAN, b'\x00')
+
cdef object _SEQUENCE (object elems):
    # SEQUENCE: <elems> are already-encoded members, concatenated in order.
    encoded = _TLV (TAGS_SEQUENCE, elems)
    return encoded
+
cdef object _SET (object elems):
    # SET: <elems> are already-encoded members, concatenated as given.
    encoded = _TLV (TAGS_SET, elems)
    return encoded
+
cdef object _OCTET_STRING (bytes s):
    # OCTET STRING: the bytes of <s> are the value, verbatim.
    encoded = _TLV1 (TAGS_OCTET_STRING, s)
    return encoded
+
cdef object _OBJID (list l):
    # Encode the object identifier <l> (a list of integer arcs).
    #
    # The first two arcs are folded into one sub-identifier (first * 40 +
    # second); every sub-identifier is then written base-128, big-endian,
    # with the high bit set on all octets but the last.  The folded first
    # sub-identifier goes through the same rendering as the others, so a
    # value of 128 or more (arcs 2.48 and up) correctly takes two or more
    # octets instead of a single octet with its continuation bit set.
    #
    # Raises ValueError if <l> has fewer than two arcs, if the first two
    # arcs are out of range, or if the encoding exceeds 32 octets.
    cdef unsigned int i, list_len, one_num, temp_buf_off, temp_buf_len
    cdef unsigned int buf_len
    cdef char temp_buf[5]
    cdef char buf[32]

    if len(l) < 2:
        raise ValueError ("OBJID arg too short")
    if l[0] < 0 or l[0] > 2 or l[1] < 0:
        raise ValueError ("OBJID arg out of range")
    if l[0] < 2 and l[1] >= 40:
        # under arcs 0 and 1 the second arc is limited to 0..39
        raise ValueError ("OBJID arg out of range")

    # buf grows forwards.  temp_buf grows backwards and is emptied
    # (forwards) into buf after each sub-identifier.
    buf_len = 0
    list_len = len (l)
    for i from 1 <= i < list_len:
        if i == 1:
            one_num = (l[0] * 40) + l[1]
        else:
            one_num = l[i]
        temp_buf_off = 5
        temp_buf_len = 0
        while 1:
            temp_buf_off = temp_buf_off - 1
            temp_buf_len = temp_buf_len + 1
            temp_buf[temp_buf_off] = <char> ((one_num & 0x7f) | 0x80)
            one_num = one_num >> 7
            if one_num == 0:
                break
        # the last octet of a sub-identifier has its high bit clear
        temp_buf[4] = temp_buf[4] & 0x7f
        if (buf_len + temp_buf_len) > 32:
            raise ValueError ("OBJID arg too long")
        memcpy (&buf[buf_len], &temp_buf[temp_buf_off], temp_buf_len)
        buf_len = buf_len + temp_buf_len
    result = PyBytes_FromStringAndSize (buf, buf_len)
    return _TLV1 (TAGS_OBJID, result)
+
+# ================================================================================
+# externally visible python interfaces
+# ================================================================================
+
def TLV (int tag, *data):
    """Return <tag, length, value> where the value is the concatenation of the <data> strings."""
    encoded = _TLV (tag, data)
    return encoded
+
def CHOICE (int n, bint structured):
    """Return a context-specific tag value for tag number <n>, optionally structured."""
    tag = _CHOICE (n, structured)
    return tag
+
def APPLICATION (int n):
    """Return a structured application-class tag value for tag number <n>."""
    tag = _APPLICATION (n)
    return tag
+
def ENUMERATED (int n):
    """Return the encoding of <n> as an ENUMERATED value."""
    encoded = _ENUMERATED (n)
    return encoded
+
def INTEGER (n):
    """Return the encoding of <n> as an INTEGER.

    Python longs go through the arbitrary-precision encoder; anything
    else is converted to a C int.
    """
    if PyLong_Check (n):
        # _encode_long_integer() returns ONE string, so it belongs to
        # _TLV1; _TLV expects a sequence of strings and would iterate
        # over the individual bytes.
        return _TLV1 (TAGS_INTEGER, _encode_long_integer (n))
    else:
        return _INTEGER (n)
+
def BOOLEAN (int n):
    """Return the encoding of the truth value of <n> as a BOOLEAN."""
    encoded = _BOOLEAN (n)
    return encoded
+
def SEQUENCE (*elems):
    """Return a SEQUENCE wrapping the already-encoded <elems>."""
    encoded = _SEQUENCE (elems)
    return encoded
+
def SET (*elems):
    """Return a SET wrapping the already-encoded <elems>."""
    encoded = _SET (elems)
    return encoded
+
def OCTET_STRING (s):
    """Return the encoding of the string <s> as an OCTET STRING."""
    encoded = _OCTET_STRING (s)
    return encoded
+
def OBJID (l):
    """Return the encoding of the list of arcs <l> as an OBJECT IDENTIFIER."""
    encoded = _OBJID (l)
    return encoded
+
+# ================================================================================
+# BER decoders
+# ================================================================================
+
class DecodeError (Exception):
    """Base class of every ASN.1 decoding failure raised by this module."""

    def __str__ (self):
        # fixed text: whatever was passed to the constructor is not shown
        message = 'ASN.1 decoding error'
        return message
+
class InsufficientData (DecodeError):
    """The encoding announces more data than the buffer holds."""

    def __str__ (self):
        message = 'unexpected end of data'
        return message
+
class LengthTooLong (DecodeError):
    """A length field wider than 32 bits was seen; that is not supported."""

    def __str__ (self):
        message = 'length too long'
        return message
+
# Note: this codec started out LDAP-only but is now used elsewhere too.
# Supporting indefinite lengths is worth considering.
class IndefiniteLength (DecodeError):
    """Indefinite-length encoding was seen.  RFC2251 5.1: 'only the definite form of length encoding will be used'."""

    def __str__ (self):
        message = 'indefinite length'
        return message
+
class MultiByteTag (DecodeError):
    """A tag that continues past its first octet was seen; not supported."""

    def __str__ (self):
        message = 'multi-byte tags not supported'
        return message
+
# Labels that _decode() puts in the first slot of the tuples it returns
# for values that are not mapped to a plain Python object.
kind_unknown = 'unknown'
kind_application = 'application'
kind_context = 'context'
kind_oid = 'oid'
kind_bitstring = 'bitstring'
+
+# SAFETY NOTE: it's important for each decoder to correctly handle length == zero.
+
cdef object decode_string (unsigned char * s, int * pos, int length):
    # Copy <length> bytes starting at the cursor into a new string and
    # move the cursor past them.
    # caller guarantees sufficient data in <s>
    cdef int start = pos[0]
    pos[0] = start + length
    return PyBytes_FromStringAndSize (<char *> (s + start), length)
+
cdef object decode_raw (unsigned char * s, int * pos, int length):
    # Return the <length> undecoded value bytes at the cursor as a string
    # and move the cursor past them.
    # caller guarantees sufficient data in <s>
    cdef int start = pos[0]
    pos[0] = start + length
    return PyBytes_FromStringAndSize (<char *> (s + start), length)
+
cdef object decode_bitstring (unsigned char * s, int * pos, int length):
    # Decode a BIT STRING value into (unused_bit_count, data_bytes).
    # The first value octet is the count of unused bits; the rest is data.
    # caller guarantees <length> bytes are available in <s>
    #
    # Raises DecodeError on an empty value: the leading octet is
    # mandatory, and reading it anyway would look past the value and ask
    # for a string of size -1.
    cdef int unused
    if length < 1:
        raise DecodeError (pos[0])
    unused = <int> s[pos[0]]
    result = PyBytes_FromStringAndSize (<char *> (s+(pos[0]+1)), length-1)
    pos[0] = pos[0] + length
    return unused, result
+
cdef object decode_integer (unsigned char * s, int * pos, int length):
    # Decode a big-endian two's complement integer of <length> bytes into
    # a C int and move the cursor past it.  An empty value decodes as 0.
    cdef int value, remaining
    if length == 0:
        return 0
    # the first byte carries the sign
    value = s[pos[0]]
    if value & 0x80:
        # negative
        value = value - 0x100
    pos[0] = pos[0] + 1
    remaining = length - 1
    while remaining:
        value = (value << 8) | s[pos[0]]
        pos[0] = pos[0] + 1
        remaining = remaining - 1
    # returning the C int converts it to a Python integer
    # XXX ensure this handles the full 32-bit signed range
    return value
+
# Same algorithm as decode_integer, but the accumulator is a Python long,
# so the generated code is very different and there is no width limit.
cdef object decode_long_integer (unsigned char * s, int * pos, int length):
    cdef int first, remaining
    if length == 0:
        return 0
    # the first byte carries the sign
    first = s[pos[0]]
    if first & 0x80:
        # negative
        first = first - 0x100
    # from here on the arithmetic is done on a Python long
    value = PyNumber_Long (first)
    pos[0] = pos[0] + 1
    remaining = length - 1
    while remaining:
        value = (value << 8) | s[pos[0]]
        pos[0] = pos[0] + 1
        remaining = remaining - 1
    return value
+
cdef object decode_structured (unsigned char * s, int * pos, int length):
    # Decode the members of a constructed value that spans <length> bytes
    # from the cursor; returns them as a list (empty for an empty value).
    cdef int limit = pos[0] + length
    cdef list members = []
    if not length:
        return members
    # each _decode() call consumes one member and advances the cursor;
    # the end of this value is the 'end of data' for its members
    while pos[0] < limit:
        members.append (_decode (s, pos, limit, 0))
    return members
+
cdef object decode_objid (unsigned char * s, int * pos, int length):
    # Decode an OBJECT IDENTIFIER value of <length> bytes into a list of
    # integer arcs and move the cursor past it.
    #
    # Every sub-identifier is base-128, big-endian, with the high bit set
    # on all octets but the last.  The first sub-identifier packs the
    # first two arcs as (first * 40 + second); the first arc is only ever
    # 0, 1 or 2, so anything from 80 upwards belongs to arc 2.
    #
    # Raises DecodeError on an empty value.  A trailing sub-identifier
    # whose last octet still has the high bit set is dropped.
    cdef int i, m, n
    cdef bint first = 1
    cdef list r = []
    if length < 1:
        raise DecodeError (pos[0])
    n = 0
    for i from 0 <= i < length:
        m = s[pos[0]]
        pos[0] = pos[0] + 1
        n = (n << 7) | (m & 0x7f)
        if not (m & 0x80):
            if first:
                if n < 80:
                    r.append (n // 40)
                    r.append (n % 40)
                else:
                    r.append (2)
                    r.append (n - 80)
                first = 0
            else:
                r.append (n)
            n = 0
    return r
+
cdef object decode_boolean (unsigned char * s, int * pos, int length):
    # Decode a BOOLEAN value and move the cursor past all <length> bytes
    # of it, so the next element is read from the right place even if the
    # value is over-long.
    #
    # BER treats any non-zero octet as true (DER narrows it to 0xff).
    # Raises DecodeError on an empty value rather than reading a byte
    # that is not part of it.
    cdef unsigned char value
    if length < 1:
        raise DecodeError (pos[0])
    value = s[pos[0]]
    pos[0] = pos[0] + length
    if value:
        return True
    else:
        return False
+
cdef int _decode_length (unsigned char * s, int * pos, int lol):
    # Read a big-endian length of <lol> bytes at the cursor and move the
    # cursor past it.
    # actually supports only up to 32-bit lengths
    cdef unsigned int acc = 0
    cdef int remaining = lol
    while remaining > 0:
        acc = (acc << 8) | s[pos[0]]
        pos[0] = pos[0] + 1
        remaining = remaining - 1
    return acc
+
cdef object _decode (unsigned char * s, int * pos, int eos, bint just_tlv):
    # Decode one element starting at the cursor <pos> in <s>; <eos> is the
    # offset one past the last byte this element may use.
    #
    # With <just_tlv> set, returns (tag number, tag flag bits, length) and
    # leaves the cursor at the first value byte.  Otherwise returns the
    # decoded value and leaves the cursor just past the element.
    #
    # Raises MultiByteTag, IndefiniteLength, LengthTooLong or
    # InsufficientData (all DecodeError subclasses) on input it cannot or
    # will not handle.
    cdef int tag, lol
    cdef unsigned int length
    # 1) get tag
    if pos[0] >= eos:
        # nothing left to read a tag from
        raise InsufficientData (pos[0])
    tag = <int> s[pos[0]]
    if tag & 0x1f == 0x1f:
        raise MultiByteTag (pos[0])
    else:
        pos[0] = pos[0] + 1
    # 2) get length
    if pos[0] >= eos:
        # assure at least one byte [the length octet itself].  pos == eos
        # means no byte is left, so the comparison must be >=, not >.
        raise InsufficientData (pos[0])
    elif s[pos[0]] < 0x80:
        # one-byte length
        length = s[pos[0]]
        pos[0] = pos[0] + 1
    elif s[pos[0]] == 0x80:
        raise IndefiniteLength (pos[0])
    else:
        # long definite length form, lower 7 bits
        # give us the number of bytes of length
        lol = s[pos[0]] & 0x7f
        pos[0] = pos[0] + 1
        if lol > 4:
            # we don't support lengths > 32 bits
            raise LengthTooLong (pos[0])
        elif pos[0] + lol > eos:
            raise InsufficientData (pos[0])
        else:
            length = _decode_length (s, pos, lol)
    # 3) get value
    # assure at least <length> bytes
    if (<int> length) < 0:
        # length > 2GB... hmmm... thuggery...
        raise InsufficientData (pos[0])
    elif (pos[0] + length) > eos:
        raise InsufficientData (pos[0])
    elif just_tlv:
        return (tag & 0x1f, tag & 0xe0, length)
    elif tag == TAGS_OCTET_STRING:
        return decode_string (s, pos, length)
    elif tag == TAGS_INTEGER or tag == TAGS_ENUMERATED:
        # more than 4 bytes does not fit the C int used by decode_integer
        if length > 4:
            return decode_long_integer (s, pos, length)
        else:
            return decode_integer (s, pos, length)
    elif tag == TAGS_BOOLEAN:
        return decode_boolean (s, pos, length)
    elif tag == TAGS_SEQUENCE:
        return decode_structured (s, pos, length)
    elif tag == TAGS_SET:
        return decode_structured (s, pos, length)
    elif tag == TAGS_OBJID:
        return (kind_oid, decode_objid (s, pos, length))
    elif tag == TAGS_BITSTRING:
        return (kind_bitstring, decode_bitstring (s, pos, length))
    elif tag == TAGS_NULL:
        # NULL should be empty; skip whatever content was sent anyway so
        # the cursor stays aligned on the next element
        pos[0] = pos[0] + length
        return None
    else:
        if tag & <int>FLAGS_CONTEXT:
            kind = kind_context
        elif tag & <int>FLAGS_APPLICATION:
            kind = kind_application
        else:
            kind = TAG_TABLE.get (tag & 0x1f, kind_unknown)
        if tag & <int>FLAGS_STRUCTURED:
            return (kind, tag & 0x1f, decode_structured (s, pos, length))
        else:
            return (kind, tag & 0x1f, decode_raw (s, pos, length))
+
def decode (bytes s, int pos=0, just_tlv=0):
    """Decode one BER element of <s>, starting at offset <pos>.

    Returns (value, new_pos).  With <just_tlv> true, the value is the
    (tag number, tag flag bits, length) triple of the element and
    new_pos is the offset of its first value byte.

    Raises InsufficientData if <pos> does not point inside <s>, and any
    other DecodeError subclass the decoder raises.
    """
    cdef int eos = len (s)
    # the decoder works on a raw pointer: never hand it an offset that
    # lies outside the string
    if pos < 0 or pos >= eos:
        raise InsufficientData (pos)
    # decode first, then read <pos>: the decoder advances it in place
    result = _decode (<unsigned char *> s, &pos, eos, just_tlv)
    return result, pos
+
View
146 coro/asn1/test/t0.py
@@ -0,0 +1,146 @@
+# -*- Mode: Python -*-
+
+from coro.asn1.ber import *
+import unittest
+
+# These are mostly positive test cases, need some negative ones as well.
+# Though - this code *has* been through the protos c06-ldapv3-enc-r1 test suite,
+# but it's a rather large suite (89MB). Consider automating a download of
+# the suite here?
+
class ber_test_case (unittest.TestCase):
    """Common base class of the BER codec test cases."""
+
class simple_test (ber_test_case):
    """Encode a small nested structure, check the bytes, and decode it back."""

    def runTest (self):
        inner = SET (INTEGER(34), INTEGER(19), OCTET_STRING('fishing line'))
        encoded = SEQUENCE (
            inner,
            OBJID ([2,3,4,5,6,88]),
            OCTET_STRING ("spaghetti"),
            )
        expected_bytes = '0(1\x14\x02\x01"\x02\x01\x13\x04\x0cfishing line\x06\x05S\x04\x05\x06X\x04\tspaghetti'
        expected_tree = [[34, 19, 'fishing line'], ('oid', [2, 3, 4, 5, 6, 88]), 'spaghetti']
        self.assertEqual (encoded, expected_bytes)
        # decode() also reports where it stopped: all 42 bytes are consumed
        self.assertEqual (decode (encoded), (expected_tree, 42))
+
+# www.google.com cert
+google_cert = """-----BEGIN CERTIFICATE-----
+MIIDITCCAoqgAwIBAgIQT52W2WawmStUwpV8tBV9TTANBgkqhkiG9w0BAQUFADBM
+MQswCQYDVQQGEwJaQTElMCMGA1UEChMcVGhhd3RlIENvbnN1bHRpbmcgKFB0eSkg
+THRkLjEWMBQGA1UEAxMNVGhhd3RlIFNHQyBDQTAeFw0xMTEwMjYwMDAwMDBaFw0x
+MzA5MzAyMzU5NTlaMGgxCzAJBgNVBAYTAlVTMRMwEQYDVQQIEwpDYWxpZm9ybmlh
+MRYwFAYDVQQHFA1Nb3VudGFpbiBWaWV3MRMwEQYDVQQKFApHb29nbGUgSW5jMRcw
+FQYDVQQDFA53d3cuZ29vZ2xlLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
+gYEA3rcmQ6aZhc04pxUJuc8PycNVjIjujI0oJyRLKl6g2Bb6YRhLz21ggNM1QDJy
+wI8S2OVOj7my9tkVXlqGMaO6hqpryNlxjMzNJxMenUJdOPanrO/6YvMYgdQkRn8B
+d3zGKokUmbuYOR2oGfs5AER9G5RqeC1prcB6LPrQ2iASmNMCAwEAAaOB5zCB5DAM
+BgNVHRMBAf8EAjAAMDYGA1UdHwQvMC0wK6ApoCeGJWh0dHA6Ly9jcmwudGhhd3Rl
+LmNvbS9UaGF3dGVTR0NDQS5jcmwwKAYDVR0lBCEwHwYIKwYBBQUHAwEGCCsGAQUF
+BwMCBglghkgBhvhCBAEwcgYIKwYBBQUHAQEEZjBkMCIGCCsGAQUFBzABhhZodHRw
+Oi8vb2NzcC50aGF3dGUuY29tMD4GCCsGAQUFBzAChjJodHRwOi8vd3d3LnRoYXd0
+ZS5jb20vcmVwb3NpdG9yeS9UaGF3dGVfU0dDX0NBLmNydDANBgkqhkiG9w0BAQUF
+AAOBgQAhrNWuyjSJWsKrUtKyNGadeqvu5nzVfsJcKLt0AMkQH0IT/GmKHiSgAgDp
+ulvKGQSy068Bsn5fFNum21K5mvMSf3yinDtvmX3qUA12IxL/92ZzKbeVCq3Yi7Le
+IOkKcGQRCMha8X2e7GmlpdWC1ycenlbN0nbVeSv3JUMcafC4+Q==
+-----END CERTIFICATE-----"""
+
class x509_test (ber_test_case):
    """Decode a real X.509 certificate and compare against the expected tree."""

    def runTest (self):
        import base64
        # drop the BEGIN/END armor lines and decode the base64 body
        lines = google_cert.split ('\n')
        enc = base64.decodestring (''.join (lines[1:-1]))
        # decode() returns (tree, end offset); the certificate is 805 bytes
        self.assertEqual (
            decode (enc),
            ([[('context', 0, [2]),
               105827261859531100510423749949966875981L,
               [('oid', [1, 2, 840, 113549, 1, 1, 5]), None],
               [[[('oid', [2, 5, 4, 6]), ('PRINTABLE_STRING', 19, 'ZA')]],
                [[('oid', [2, 5, 4, 10]),
                  ('PRINTABLE_STRING', 19, 'Thawte Consulting (Pty) Ltd.')]],
                [[('oid', [2, 5, 4, 3]), ('PRINTABLE_STRING', 19, 'Thawte SGC CA')]]],
               [('UTC_TIME', 23, '111026000000Z'), ('UTC_TIME', 23, '130930235959Z')],
               [[[('oid', [2, 5, 4, 6]), ('PRINTABLE_STRING', 19, 'US')]],
                [[('oid', [2, 5, 4, 8]), ('PRINTABLE_STRING', 19, 'California')]],
                [[('oid', [2, 5, 4, 7]), ('T61_STRING', 20, 'Mountain View')]],
                [[('oid', [2, 5, 4, 10]), ('T61_STRING', 20, 'Google Inc')]],
                [[('oid', [2, 5, 4, 3]), ('T61_STRING', 20, 'www.google.com')]]],
               [[('oid', [1, 2, 840, 113549, 1, 1, 1]), None],
                ('bitstring',
                 (0,
                  "0\x81\x89\x02\x81\x81\x00\xde\xb7&C\xa6\x99\x85\xcd8\xa7\x15\t\xb9\xcf\x0f"
                  "\xc9\xc3U\x8c\x88\xee\x8c\x8d('$K*^\xa0\xd8\x16\xfaa\x18K\xcfm`\x80\xd35@2r"
                  "\xc0\x8f\x12\xd8\xe5N\x8f\xb9\xb2\xf6\xd9\x15^Z\x861\xa3\xba\x86\xaak\xc8\xd9"
                  "q\x8c\xcc\xcd'\x13\x1e\x9dB]8\xf6\xa7\xac\xef\xfab\xf3\x18\x81\xd4$F\x7f\x01w|"
                  "\xc6*\x89\x14\x99\xbb\x989\x1d\xa8\x19\xfb9\x00D}\x1b\x94jx-i\xad\xc0z,\xfa\xd0"
                  "\xda \x12\x98\xd3\x02\x03\x01\x00\x01"))],
               ('context',
                3,
                [[[('oid', [2, 5, 29, 19]), True, '0\x00'],
                  [('oid', [2, 5, 29, 31]),
                   "0-0+\xa0)\xa0'\x86%http://crl.thawte.com/ThawteSGCCA.crl"],
                  [('oid', [2, 5, 29, 37]),
                   '0\x1f\x06\x08+\x06\x01\x05\x05\x07\x03\x01\x06\x08+\x06\x01\x05\x05\x07\x03'
                   '\x02\x06\t`\x86H\x01\x86\xf8B\x04\x01'],
                  [('oid', [1, 3, 6, 1, 5, 5, 7, 1, 1]),
                   '0d0"\x06\x08+\x06\x01\x05\x05\x070\x01\x86\x16http://ocsp.thawte.com0>\x06'
                   '\x08+\x06\x01\x05\x05\x070\x02\x862http://www.thawte.com/repository/Thawte_SGC_CA.crt']]])],
              [('oid', [1, 2, 840, 113549, 1, 1, 5]), None],
              ('bitstring',
               (0,
                "!\xac\xd5\xae\xca4\x89Z\xc2\xabR\xd2\xb24f\x9dz\xab\xee\xe6|\xd5~\xc2\\("
                "\xbbt\x00\xc9\x10\x1fB\x13\xfci\x8a\x1e$\xa0\x02\x00\xe9\xba[\xca\x19\x04"
                "\xb2\xd3\xaf\x01\xb2~_\x14\xdb\xa6\xdbR\xb9\x9a\xf3\x12\x7f|\xa2\x9c;o\x99"
                "}\xeaP\rv#\x12\xff\xf7fs)\xb7\x95\n\xad\xd8\x8b\xb2\xde \xe9\npd\x11\x08"
                "\xc8Z\xf1}\x9e\xeci\xa5\xa5\xd5\x82\xd7'\x1e\x9eV\xcd\xd2v\xd5y+\xf7%C\x1c"
                "i\xf0\xb8\xf9"))],
             805)
            )
        dec, length = decode (enc)
        # dec[0][6] is the [algorithm, ('bitstring', (unused, data))] pair
        # holding the key; its data is itself BER and is decoded again
        public_key = dec[0][6][1][1][1]
        self.assertEqual (
            decode (public_key),
            ([156396091895984667473837837332877995558144703880815901117439532534031286131520903863087599986938779606924811933611903716377206837300122262900786662124968110191717844999183338594373129421417536020806373385428322642107305024162536996222164292639147591878860587271770855626780464602884552232097424473091745159379L, 65537], 140)
            )
+
class bignum_test (ber_test_case):
    """Check decoding and encoding of 1<<10000 against its known bytes."""

    def runTest (self):
        # 1<<10000 needs 1251 content bytes: 0x01 followed by 1250 zero bytes
        wire = '\x02\x82\x04\xe3\x01' + '\x00' * 1250
        # assertEqual, not the deprecated alias assertEquals
        self.assertEqual (
            decode (wire),
            (1<<10000, 1255)
            )
        self.assertEqual (
            INTEGER (1<<10000),
            wire,
            )
+
class bignum_test_2 (ber_test_case):
    """Round-trip the powers of two 1<<1, 1<<10, ... 1<<10000."""

    def runTest (self):
        for i in range (5):
            n = 1 << (10 ** i)
            # assertEqual, not the deprecated alias assertEquals
            self.assertEqual (
                decode (INTEGER (n))[0],
                n
                )
+
class bignum_test_3 (ber_test_case):
    """Round-trip a random integer of about ten thousand decimal digits."""

    def runTest (self):
        import random
        n = 1
        for x in range (10000):
            # randint() includes both ends, so digits are 0..9
            n = n * 10 + random.randint (0, 9)
        # the value is not printed: ten thousand digits of noise per run.
        # On failure assertEqual shows both values anyway.
        self.assertEqual (decode (INTEGER (n))[0], n)
+
def suite():
    """Return a TestSuite holding one instance of every test case, in order."""
    cases = (
        simple_test,
        x509_test,
        bignum_test,
        bignum_test_2,
        bignum_test_3,
        )
    collected = unittest.TestSuite()
    for case in cases:
        collected.addTest (case())
    return collected
+
if __name__ == '__main__':
    # run as a script: unittest looks up 'suite' in this module and runs it
    unittest.main (defaultTest='suite')
View
401 coro/ldap/client.py
@@ -0,0 +1,401 @@
+# -*- Mode: Python -*-
+# Copyright (c) 2002-2011 IronPort Systems and Cisco Systems
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy
+# of this software and associated documentation files (the "Software"), to deal
+# in the Software without restriction, including without limitation the rights
+# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+# copies of the Software, and to permit persons to whom the Software is
+# furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+
+# pull in visible bits of the low-level pyrex module
+import coro
+from coro.asn1.ber import *
+from coro.ldap.query import *
+import re
+
# short alias for the coro stderr writer
W = coro.write_stderr

# a ',' or '=' separator together with any whitespace around it
re_dn = re.compile(r'\s*([,=])\s*')
# '<name>=<value>' optionally followed by ',<rest>', in three groups
re_dn_attr = re.compile(r'^([^,]+)(=[^,]+)(,.*)?$')
+
class ProtocolError (Exception):
    """Raised when the peer sends something that is not valid LDAP."""
+
class Exit_Recv_Thread (Exception):
    """Out-of-band signal telling the ldap client recv thread to exit."""
+
class LDAP:
    """Application tag numbers of the LDAP protocol operations."""
    # requests and responses, numbered consecutively
    BindRequest, BindResponse = 0, 1
    UnbindRequest = 2
    SearchRequest, SearchResultEntry, SearchResultDone = 3, 4, 5
    ModifyRequest, ModifyResponse = 6, 7
    AddRequest, AddResponse = 8, 9
    DelRequest, DelResponse = 10, 11
    ModifyDNRequest, ModifyDNResponse = 12, 13
    CompareRequest, CompareResponse = 14, 15
    AbandonRequest = 16
    # these three are NOT IN SEQUENCE with the rest
    SearchResultReference = 19
    ExtendedRequest, ExtendedResponse = 23, 24
+
class SCOPE:
    """Values for the scope field of a search request."""
    BASE, ONELEVEL, SUBTREE = 0, 1, 2
+
class DEREF:
    """Values for the deref_aliases field of a search request."""
    NEVER, SEARCHING, FINDING, ALWAYS = 0, 1, 2, 3
+
def encode_search_request (
    base_object,
    scope,
    deref_aliases,
    size_limit,
    time_limit,
    types_only,
    filter,
    which_attrs=None,
    compatibility=None
    ):
    """Return an encoded LDAP SearchRequest.

    <scope> may be None, in which case compatibility['scope'] is used,
    defaulting to SCOPE.SUBTREE.  <filter> is a search filter string and
    is run through parse_query().  <which_attrs> selects the attributes
    to return: None encodes an empty list, an empty list asks for no
    attributes at all (see below), anything else is a list of names.
    <compatibility> is an optional dict of per-server overrides; the
    keys read here are 'scope' and 'no_attr_attr'.
    """
    # a fresh dict per call instead of a shared mutable default argument
    if compatibility is None:
        compatibility = {}
    if scope is None:
        scope = compatibility.get('scope', SCOPE.SUBTREE)
    if which_attrs is None:
        which_attrs = SEQUENCE()
    elif len(which_attrs) == 0:
        # Per section 4.5.1 of rfc 2251, if you really mean the empty
        # list, you can't pass the empty list because the empty list means
        # something else. You need to pass a list consisting of the OID 1.1,
        # which really (see sections 4.1.2, 4.1.4, and 4.1.5) isn't an OID
        # at all. Except some servers (Exchange 5.5) require something
        # different here, hence the lookup in the compatibility dict.
        which_attrs = SEQUENCE (
            OCTET_STRING (compatibility.get ('no_attr_attr', '1.1'))
            )
    else:
        which_attrs = SEQUENCE (*[OCTET_STRING (x) for x in which_attrs])
    return TLV (
        APPLICATION (LDAP.SearchRequest),
        OCTET_STRING (base_object),
        ENUMERATED (scope),
        ENUMERATED (deref_aliases),
        INTEGER (size_limit),
        INTEGER (time_limit),
        BOOLEAN (types_only),
        parse_query (filter),
        which_attrs,
        )
+
class AUTH:
    """Tag numbers of the authentication choices in a bind request."""
    # 1 and 2 are reserved
    simple, sasl = 0x00, 0x03
+
class RESULT:
    """LDAP result codes, by name.  The numbering has gaps."""
    success, operationsError, protocolError = 0, 1, 2
    timeLimitExceeded, sizeLimitExceeded = 3, 4
    compareFalse, compareTrue = 5, 6
    authMethodNotSupported, strongAuthRequired = 7, 8
    # 9 is not used here
    referral, adminLimitExceeded = 10, 11
    unavailableCriticalExtension, confidentialityRequired = 12, 13
    saslBindInProgress = 14
    # 16..21
    noSuchAttribute, undefinedAttributeType, inappropriateMatching = 16, 17, 18
    constraintViolation, attributeOrValueExists, invalidAttributeSyntax = 19, 20, 21
    # 32..36
    noSuchObject, aliasProblem, invalidDNSyntax = 32, 33, 34
    aliasDereferencingProblem = 36
    # 48..54
    inappropriateAuthentication, invalidCredentials = 48, 49
    insufficientAccessRights = 50
    busy, unavailable, unwillingToPerform, loopDetect = 51, 52, 53, 54
    # 64..71
    namingViolation, objectClassViolation = 64, 65
    notAllowedOnNonLeaf, notAllowedOnRDN = 66, 67
    entryAlreadyExists, objectClassModsProhibited = 68, 69
    affectsMultipleDSAs = 71
    other = 80
+
class Error (Exception):
    """An LDAP operation failed.

    <answer> is the decoded result; its first element is the numeric
    result code.  The code is kept in .code, its name in .error_string
    and the whole answer in .answer.
    """

    def __init__ (self, answer):
        Exception.__init__ (self)
        code = answer[0]
        self.code = code
        self.answer = answer
        self.error_string = result_string (code)

    def __str__ (self):
        if len(self.answer) != 3:
            # unknown layout: show the raw answer
            detail = " %r" % (self.answer,)
        else:
            # We know how to parse it if it's length 3. Second element is
            # the "got DN", and third element is the error message. See
            # section 4 of RFC 1777.
            got_dn = self.answer[1]
            message = self.answer[2]
            if message:
                detail = " %r" % (message,)
            else:
                detail = ""
            if got_dn:
                partial = "Failed after successfully matching partial DN: %r" \
                          % (got_dn,)
                # parenthesized only when it follows an error message
                if message:
                    partial = "(" + partial + ")"
                detail += " " + partial
        return '<LDAP Error "%s" [0x%x]%s>' % (self.error_string, self.code,
                                                detail)
    __repr__ = __str__
+
# Build the code -> name table used by result_string().  Only integer
# attributes of RESULT are result codes; everything else that dir()
# reports (dunder attributes, the map itself) is skipped.
RESULT._reverse_map = r = {}
for attr in dir(RESULT):
    value = getattr (RESULT, attr)
    if isinstance (value, int):
        r[value] = attr
+
def result_string (result):
    """Return the name of the result code <result>, or an 'unknown error' text."""
    name = RESULT._reverse_map.get (result)
    if name is None:
        return "unknown error %r" % (result,)
    return name
+
def encode_bind_request (version, name, auth_data):
    """Return an encoded BindRequest.

    <version> must be in 1..127, <name> is the name to bind as and
    <auth_data> is the already-encoded authentication choice.
    """
    assert (1 <= version <= 127)
    tag = APPLICATION (LDAP.BindRequest)
    return TLV (tag, INTEGER (version), OCTET_STRING (name), auth_data)
+
def encode_simple_bind (version, name, login):
    """Return an encoded BindRequest using simple authentication with <login>."""
    # the credential goes out under the (unstructured) 'simple' choice tag
    auth_data = TLV (CHOICE (AUTH.simple, 0), login)
    return encode_bind_request (version, name, auth_data)
+
def encode_sasl_bind (version, name, mechanism, credentials=''):
    """Return an encoded BindRequest using SASL authentication.

    <mechanism> is the SASL mechanism name.  <credentials> is optional;
    when empty, the credentials element is left out altogether.
    """
    if credentials:
        cred = OCTET_STRING (credentials)
    else:
        # an empty piece contributes nothing to the enclosing TLV
        cred = ''
    return encode_bind_request (
        version,
        name,
        TLV (
            # CHOICE() takes two arguments.  The SASL credentials are a
            # sequence (mechanism, optional credentials), so the tag is
            # a structured one.
            CHOICE (AUTH.sasl, 1),
            OCTET_STRING (mechanism),
            cred
            )
        )
+
def encode_starttls ():
    """Return an encoded ExtendedRequest asking for STARTTLS (RFC 2830, 2.1)."""
    # the request name travels under context tag 0, unstructured
    request_name = TLV (CHOICE (0, 0), '1.3.6.1.4.1.1466.20037')
    return TLV (APPLICATION (LDAP.ExtendedRequest), request_name)
+
class client:
    """LDAP client on top of a coro socket.

    A coro thread (recv_thread) reads reply packets from the socket and
    hands each reply to the thread that is waiting on its message id in
    send_message().
    """

    # seconds send_message() waits for a reply
    default_timeout = 10
    # set to make recv_thread() leave its loop after the next packet
    exit_recv_thread = False
    ldap_protocol_version = 3

    # Note: default port is 389
    def __init__ (self, addr):
        """Connect to <addr>: a tuple means TCP, anything else a unix socket."""
        self.msgid = 1
        self.addr = addr
        if isinstance (addr, tuple):
            self.sock = coro.tcp_sock()
        else:
            self.sock = coro.unix_sock()
        self.sock.connect (addr)
        # message id -> coro thread waiting for that reply
        self.pending = {}
        self.recv_thread_ob = coro.spawn (self.recv_thread)

    def recv_exact (self, size):
        """Read <size> bytes from the socket; fewer only if the peer closes."""
        try:
            return self.sock.recv_exact (size)
        except AttributeError:
            # tlslite has no recv_exact
            left = size
            r = []
            while left:
                block = self.sock.recv (left)
                if not block:
                    break
                else:
                    r.append (block)
                    left -= len (block)
            return ''.join (r)

    # XXX the ironport code had a simple buffering layer here, might want
    # to reinstate that...
    def _recv_packet (self):
        """Read and decode one packet.

        Returns the decoded sequence, or [None, None] at end of stream.
        Raises ProtocolError on a malformed or unreasonable header.
        """
        # All received packets must be BER SEQUENCE. We can tell from
        # the header how much data we need to complete the packet.
        # ensure we have the sequence header - I'm inlining the (type,
        # length) detection here to get good buffering behavior
        tl = self.recv_exact (2)
        if not tl:
            return [None, None]
        if len (tl) < 2:
            # the stream ended inside the header
            raise ProtocolError ('truncated packet header: %r' % (tl,))
        tag = tl[0]
        if tag != '0': # SEQUENCE | STRUCTURED
            raise ProtocolError ('bad tag byte: %r' % (tag,))
        l = ord (tl[1])
        p = [tl]
        if l & 0x80:
            # <l> tells us how many bytes of actual length
            ll = l & 0x7f
            len_bytes = self.recv_exact (ll)
            p.append (len_bytes)
            # fetch length
            n = 0
            for ch in len_bytes:
                n = (n << 8) | ord (ch)
            if (n < 0) or (n > 1000000):
                # let's be reasonable, folks
                raise ProtocolError ('invalid packet length: %d' % (n,))
            need = n
        else:
            # <l> is the length of the sequence
            need = l
        # fetch the rest of the packet...
        p.append (self.recv_exact (need))
        packet = ''.join (p)
        reply, plen = decode (packet)
        return reply

    def recv_thread (self):
        """Dispatch incoming replies until end of stream or until told to exit."""
        while not self.exit_recv_thread:
            [msgid, reply] = self._recv_packet()
            if msgid is None:
                break
            else:
                probe = self.pending.get (msgid, None)
                if probe is None:
                    raise ProtocolError ('unknown message id in reply: %d' % (msgid,))
                else:
                    probe.schedule (reply)

    def send_message (self, msg):
        """Send <msg> under a fresh message id and return the reply to it."""
        msgid = self.msgid
        self.msgid += 1
        self.sock.send (SEQUENCE (INTEGER (msgid), msg))
        try:
            self.pending[msgid] = me = coro.current()
            reply = coro.with_timeout (self.default_timeout, me._yield)
            return reply
        finally:
            del self.pending[msgid]

    # server replies NO:
    #starttls decoded=[1, ('application', 24, [2, '', 'unsupported extended operation'])]
    # server replies YES:
    #starttls decoded=[1, ('application', 24, [0, '', ''])]

    def starttls (self, *future_cert_params):
        """Ask the server for STARTTLS and, if it agrees, wrap the socket in TLS.

        Returns the server's reply either way.
        """
        import tlslite
        # the recv thread must not read from the socket during the handshake
        self.exit_recv_thread = True
        reply = self.send_message (encode_starttls())
        # reply is ('application', 24, [result_code, ...]): the result code
        # is the first element of the list in reply[2]
        if reply[2][0] == 0:
            conn = tlslite.TLSConnection (self.sock)
            # does ldap allow client-cert authentication?
            conn.handshakeClientCert()
            self.osock = self.sock
            self.sock = conn
        # restart recv thread (maybe) with TLS socket wrapper
        self.exit_recv_thread = False
        self.recv_thread_ob = coro.spawn (self.recv_thread)
        return reply

    def simple_bind (self, name, login):
        """Bind as <name> with simple authentication; returns the reply."""
        return self.send_message (encode_simple_bind (self.ldap_protocol_version, name, login))

    def sasl_bind (self, name, mechanism, credentials):
        """Bind as <name> with the SASL <mechanism>; returns the reply."""
        return self.send_message (encode_sasl_bind (self.ldap_protocol_version, name, mechanism, credentials))
+
def t0():
    """Manual test: send one search request to a local server and print the decoded reply."""
    # Wrapped the same way client.send_message() wraps a request: a
    # SEQUENCE of the message id and the operation.
    # NOTE(review): this used to call encode_message(), which is not
    # defined in this module or in coro.asn1.ber.
    sample = SEQUENCE (
        INTEGER (3141),
        encode_search_request (
            'dc=nightmare,dc=com',
            SCOPE.SUBTREE,
            DEREF.NEVER,
            0,
            0,
            0,
            '(&(objectclass=inetorgperson)(userid=srushing))',
            #'(&(objectclass=inetorgperson)(userid=newton))',
            # ask for these specific attributes only
            ['mailAlternateAddress', 'rfc822ForwardingMailbox']
            )
        )

    import pprint
    import socket
    s = socket.socket (socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect (('127.0.0.1', 389))
        s.send (sample)
        pprint.pprint (decode (s.recv (8192)))
    finally:
        # do not leak the socket, whatever happened above
        s.close()
+
def t1():
    """Manual test: connect to a local server, do a simple bind, return the client."""
    c = client (('127.0.0.1', 389))
    # the method is simple_bind (name, login); the protocol version comes
    # from client.ldap_protocol_version
    c.simple_bind ('cn=manager,dc=nightmare,dc=com', 'fnord')
    return c
+
if __name__ == '__main__':
    # run as a script: serve a coro backdoor on a unix socket, then enter
    # the coro event loop
    import coro.backdoor
    coro.spawn (coro.backdoor.serve, unix_path='/tmp/ldap.bd')
    coro.event_loop()
View
330 coro/ldap/query.pyx
@@ -0,0 +1,330 @@
+# -*- Mode: Cython -*-
+
+from cpython cimport PyBytes_FromStringAndSize
+from coro.asn1.ber cimport *
+
+# ================================================================================
+# ldap search filter language parser
+# ================================================================================
+
+# this is not yet complete. see rfc2254
+
+# Raised by the parse_* functions below, either with the pair
+# (query, position) or with a single message string.
+class QuerySyntaxError (Exception):
+ """Error parsing rfc2254 query filter"""
+ def __str__(self):
+ # When args is (query string, index inside that string), name the
+ # offending character and its position; any other argument shape
+ # falls back to the default Exception text.
+ if (len(self.args) == 2) \
+ and isinstance(self.args[0], str) \
+ and isinstance(self.args[1], int) \
+ and (self.args[1] >= 0) \
+ and (self.args[1] < len(self.args[0])):
+ return 'LDAP Query Syntax Error: Invalid character \'%c\' at ' \
+ 'position %d of query "%s"' \
+ % (self.args[0][self.args[1]], self.args[1], self.args[0])
+ else:
+ return 'LDAP Query Syntax Error: %s' % Exception.__str__(self)
+
+# presumably the LDAP search scope values; not referenced by the code
+# visible in this file -- TODO confirm against the search-request encoder
+cdef enum:
+ SCOPE_BASE = 0
+ SCOPE_ONELEVEL = 1
+ SCOPE_SUBTREE = 2
+
+# presumably the LDAP alias-dereferencing values; not referenced by the
+# code visible in this file -- TODO confirm
+cdef enum:
+ DEREF_NEVER = 0
+ DEREF_SEARCHING = 1
+ DEREF_FINDING = 2
+ DEREF_ALWAYS = 3
+
+# tag numbers handed to _CHOICE() by parse_expression() and returned by
+# parse_operator() for each kind of filter
+cdef enum:
+ FILTER_AND = 0
+ FILTER_OR = 1
+ FILTER_NOT = 2
+ FILTER_EQUALITY_MATCH = 3
+ FILTER_SUBSTRINGS = 4
+ FILTER_GREATER_OR_EQUAL = 5
+ FILTER_LESS_OR_EQUAL = 6
+ FILTER_PRESENT = 7
+ FILTER_APPROX_MATCH = 8
+ FILTER_EXTENSIBLE_MATCH = 9
+
+# tag numbers handed to _CHOICE() by parse_substring() for each piece of a
+# wildcard value
+cdef enum:
+ SUBSTRING_INITIAL = 0
+ SUBSTRING_ANY = 1
+ SUBSTRING_FINAL = 2
+
+# Python-visible entry point: parse the filter in <s> starting at <pos> and
+# return the encoded expression produced by parse_expression().
+# The end position returned by parse_expression() is discarded, so text
+# following the first complete expression is not examined.
+def parse_query (s, pos=0):
+ expression, pos = parse_expression (s, pos, 0)
+ return expression
+
+# Parse one parenthesised filter expression beginning at x[pos].
+#
+# Returns a tuple (encoded expression, position just past the expression's
+# closing paren).  <depth> is the current nesting level; the function calls
+# itself with depth+1 for each sub-expression of '|', '&' and '!'.
+#
+# Raises QuerySyntaxError when x[pos] is not '(', when depth exceeds 50,
+# or when a wildcard value is combined with an operator other than '='
+# (errors from parse_name/parse_operator/parse_value propagate as well).
+cdef parse_expression (bytes x, int pos, int depth):
+ cdef char * s = x
+ cdef char kind
+ cdef list expressions
+ cdef bytes value
+ cdef bint is_substring
+ if s[pos] != c'(':
+ raise QuerySyntaxError, (x, pos)
+ elif depth > 50:
+ raise QuerySyntaxError, "expression too complex"
+ else:
+ # skip the open-paren
+ pos = pos + 1
+ # is this a logical expression or a comparison?
+ if s[pos] == c'|' or s[pos] == c'&' or s[pos] == c'!':
+ # logical
+ kind = s[pos]
+ expressions = []
+ pos = pos + 1
+ # collect sub-expressions until the close-paren; anything that is
+ # not '(' makes the recursive call raise
+ while s[pos] != c')':
+ expression, pos = parse_expression (x, pos, depth+1)
+ expressions.append (expression)
+ if kind == c'|':
+ return _TLV (_CHOICE (FILTER_OR, 1), expressions), pos + 1
+ elif kind == c'&':
+ return _TLV (_CHOICE (FILTER_AND, 1), expressions), pos + 1
+ elif kind == c'!':
+ # only the first sub-expression is encoded for NOT
+ return _TLV (_CHOICE (FILTER_NOT, 1), expressions[:1]), pos + 1
+ else:
+ # comparison
+ attr, is_substring, pos = parse_name (x, pos)
+ operator, pos = parse_operator (x, pos)
+ # is_substring from parse_name() is overwritten here; only a splat
+ # in the value decides the branch below
+ value, is_substring, pos = parse_value (x, pos)
+ attr = unescape (attr)
+ # we don't unescape <value> yet, because we might need
+ # some escaped splat chars to make it through parse_substring()
+ # [where the pieces will be unescaped individually]
+ if is_substring:
+ if value == '*' and operator == FILTER_EQUALITY_MATCH:
+ # (tag=*)
+ return _TLV (
+ _CHOICE (FILTER_PRESENT, 0), # unstructured
+ (attr,) # tag implied by CHOICE
+ ), pos + 1
+ elif operator == FILTER_EQUALITY_MATCH:
+ # (tag=sub*strin*g*)
+ return _TLV (
+ _CHOICE (FILTER_SUBSTRINGS, 1), (
+ _OCTET_STRING (attr),
+ _SEQUENCE (parse_substring (value, 0, len (value)))
+ )
+ ), pos + 1
+ else:
+ raise QuerySyntaxError, "invalid wildcard syntax"
+ else:
+ # plain comparison: the operator's tag number is used directly
+ # as the CHOICE tag
+ return _TLV (
+ _CHOICE (operator, 1), (
+ _OCTET_STRING (attr),
+ _OCTET_STRING (unescape (value)),
+ )
+ ), pos + 1
+
+# Parse the comparison operator found at x[pos].
+#
+# Returns a tuple (filter tag, position just past the operator) where the
+# tag is FILTER_EQUALITY_MATCH for '=', FILTER_APPROX_MATCH for '~=',
+# FILTER_LESS_OR_EQUAL for '<=' or FILTER_GREATER_OR_EQUAL for '>='.
+#
+# Raises QuerySyntaxError (x, pos) when fewer than three bytes remain at
+# <pos> or when no operator is recognised there.
+cdef parse_operator (bytes x, int pos):
+    cdef char * s = x
+    cdef int slen = len (x)
+    if (pos + 2) >= slen:
+        # report the original bytes object, exactly as the raise at the
+        # bottom does, instead of a copy rebuilt from the char pointer
+        raise QuerySyntaxError, (x, pos)
+    elif s[pos] == c'=':
+        return FILTER_EQUALITY_MATCH, pos + 1
+    elif s[pos] == c'~' and s[pos+1] == c'=':
+        return FILTER_APPROX_MATCH, pos + 2
+    elif s[pos] == c'<' and s[pos+1] == c'=':
+        return FILTER_LESS_OR_EQUAL, pos + 2
+    elif s[pos] == c'>' and s[pos+1] == c'=':
+        return FILTER_GREATER_OR_EQUAL, pos + 2
+    else:
+        raise QuerySyntaxError, (x, pos)
+
+# [initial]*any*any*any*[final]
+
+# Split a wildcard value of <slen> bytes at <s> into its substring pieces.
+#
+# Returns a list of _TLV items: text before the first splat is tagged
+# SUBSTRING_INITIAL, text between splats SUBSTRING_ANY and text after the
+# last splat SUBSTRING_FINAL.  Each piece is passed through unescape().
+# NOTE(review): the <pos> argument is not used; scanning always starts at 0.
+# NOTE(review): two adjacent splats (e.g. 'a**b') append a SUBSTRING_ANY
+# item holding an empty string.
+cdef object parse_substring (char * s, int pos, int slen):
+ # assumes the presence of at least one splat
+ cdef int i, start
+ cdef list result = []
+ start = 0
+ i = 0
+ while 1:
+ if i == slen:
+ # end of input: whatever follows the last splat is the final piece
+ if start != i:
+ # final
+ result.append (
+ _TLV (_CHOICE (SUBSTRING_FINAL, 0), (unescape (s[start:]),))
+ )
+ return result
+ elif s[i] == c'*':
+ # start == 0 means no splat has been seen yet
+ if start == 0:
+ if i > 0:
+ # initial
+ result.append (
+ _TLV (_CHOICE (SUBSTRING_INITIAL, 0), (unescape (s[0:i]),))
+ )
+ else:
+ # any
+ result.append (
+ _TLV (_CHOICE (SUBSTRING_ANY, 0), (unescape (s[start:i]),))
+ )
+ # next bit will start *after* the splat
+ start = i + 1
+ i = i + 1
+ else:
+ i = i + 1
+
+# Python-visible wrapper around the cdef function unescape().
+def ue (s):
+ return unescape (s)
+
+# # another possibility would be to access the 'characters'
+# # array in stringobject.c directly. [it's static, though]
+# cdef bytes char (int ch):
+# if (ch < 0) or (ch >= 256):
+# raise ValueError, "chr() arg not in range (256)"
+# else:
+# return <char>ch
+
+# Lookup table indexed by byte value: 1 for the characters ( ) = < > ~ and
+# 0 for every other byte.  parse_name() uses it to find where an attribute
+# name ends.  It is filled once, when the module is loaded.
+cdef int name_punc_table[256]
+cdef int i
+
+for i from 0 <= i < 256:
+ if chr (i) in '()=<>~':
+ name_punc_table[i] = 1
+ else:
+ name_punc_table[i] = 0
+
+# Scan an attribute name starting at x[pos].
+#
+# Returns a tuple (name, is_substring, pos): <name> is the run of bytes up
+# to the first character flagged in name_punc_table, <is_substring> is 1 if
+# a '*' was seen in that run, and <pos> is the index of the delimiter.
+#
+# Raises QuerySyntaxError (x, pos) when the first character is itself a
+# delimiter, when the name reaches 4096 bytes, or when the input ends
+# before a delimiter is found.
+cdef object parse_name (bytes x, int pos):
+ cdef int slen, is_substring, rpos, start
+ cdef unsigned char * s
+ s = <unsigned char *>x
+ slen = len (x)
+ # rpos counts the bytes accepted so far, i.e. the length of the name
+ rpos = 0
+ start = pos
+ if name_punc_table[s[pos]]:
+ raise QuerySyntaxError, (x, pos)
+ else:
+ is_substring = 0
+ # we expect names to be delimited by an operator or a close-paren
+ while pos < slen:
+ if not name_punc_table[s[pos]]:
+ if s[pos] == c'*':
+ is_substring = 1
+ rpos = rpos + 1
+ if rpos == 4096:
+ raise QuerySyntaxError, (x, pos)
+ pos = pos + 1
+ else:
+ return PyBytes_FromStringAndSize (<char *>(s + start), rpos), is_substring, pos
+ else:
+ raise QuerySyntaxError, (x, pos)
+
+# Scan a comparison value starting at x[pos].
+#
+# Returns a tuple (value, is_substring, pos): <value> is the run of bytes up
+# to the first ')', still escaped; <is_substring> is 1 if a '*' was seen in
+# that run; <pos> is the index of the ')'.
+#
+# Raises QuerySyntaxError (x, pos) when the value reaches 4096 bytes or
+# when the input ends before a ')' is found.
+cdef object parse_value (bytes x, int pos):
+ cdef int slen, is_substring, rpos, start
+ cdef unsigned char * s
+ s = <unsigned char *>x
+ slen = len (x)
+ # rpos counts the bytes accepted so far, i.e. the length of the value
+ rpos = 0
+ start = pos
+ is_substring = 0
+ # we expect values to be delimited by a close-paren
+ while pos < slen:
+ if s[pos] != c')':
+ if s[pos] == c'*':
+ is_substring = 1
+ rpos = rpos + 1
+ if rpos == 4096:
+ raise QuerySyntaxError, (x, pos)
+ pos = pos + 1
+ else:
+ return PyBytes_FromStringAndSize (<char *>(s + start), rpos), is_substring, pos
+ else:
+ raise QuerySyntaxError, (x, pos)
+
+# Replace every backslash escape in <x> with the byte that
+# parse_hex_escape() decodes from the two characters after the backslash.
+#
+# Returns <x> itself when it contains no backslash, otherwise a new bytes
+# object holding the decoded text.
+#
+# The output is assembled in a fixed 4096-byte buffer; QuerySyntaxError
+# (x, pos) is raised as soon as 4096 output bytes have been produced, and
+# that check runs even for input without any escapes.
+cdef object unescape (bytes x):
+ cdef int rpos, flag, pos
+ cdef char * s = x
+ cdef int slen = len (x)
+ cdef char buffer[4096]
+ cdef char ch
+ pos = 0
+ rpos = 0
+ # flag records whether at least one escape was decoded
+ flag = 0
+ while pos < slen:
+ if s[pos] == c'\\':
+ flag = 1
+ pos = pos + 1
+ ch, pos = parse_hex_escape (s, pos, slen)
+ else:
+ ch = s[pos]
+ pos = pos + 1
+ buffer[rpos] = ch
+ rpos = rpos + 1
+ if rpos == 4096:
+ raise QuerySyntaxError, (x, pos)
+ if flag:
+ # return a new, unescaped string
+ return PyBytes_FromStringAndSize (buffer, rpos)
+ else:
+ # return the original string
+ return x
+
+# Return the value (0-15) of the hex digit whose character code is <ch>,
+# or -1 when <ch> is not one of 0-9, a-f, A-F.
+cdef int parse_hex_digit (int ch):
+ # 48..57 are the codes of '0'..'9'
+ if (ch >= 48 and ch <= 57):
+ return (ch - 48)
+ # 97..102 are the codes of 'a'..'f'
+ elif (ch >= 97 and ch <= 102):
+ return (ch - 97) + 10
+ # 65..70 are the codes of 'A'..'F'
+ elif (ch >= 65 and ch <= 70):
+ return (ch - 65) + 10
+ else:
+ return -1
+
+# Decode the two hex digits at s[pos] and s[pos+1] into one byte.
+#
+# <len> is the length of the buffer at <s>.  Returns a tuple
+# (byte value as a C char, position just past the second digit).
+#
+# Raises QuerySyntaxError (s, position) when fewer than two bytes remain at
+# <pos> or when either character is not a hex digit; the position reported
+# is that of the offending character.
+cdef object parse_hex_escape (char * s, int pos, int len):
+    # The digits are held in ints: parse_hex_digit() reports failure with
+    # -1, and a plain `char` cannot hold -1 on platforms where char is
+    # unsigned, which would let invalid digits through unnoticed.
+    cdef int hi, lo
+    if pos + 2 > len:
+        raise QuerySyntaxError, (s, pos)
+    hi = parse_hex_digit (s[pos])
+    if hi == -1:
+        raise QuerySyntaxError, (s, pos)
+    lo = parse_hex_digit (s[pos + 1])
+    if lo == -1:
+        raise QuerySyntaxError, (s, pos + 1)
+    # cast back to char so the caller receives the same value it always
+    # has when it unpacks the result into its own `char` variable
+    return (<char> ((hi << 4) | lo)), pos + 2
+
+# Lookup table indexed by byte value: 1 for the characters \ ( ) = < > ~ *
+# and 0 for every other byte.  query_escape() escapes exactly the bytes
+# flagged here.  It is filled once, when the module is loaded.
+cdef int escape_table[256]
+for i from 0 <= i < 256:
+ if chr (i) in '\\()=<>~*':
+ escape_table[i] = 1
+ else:
+ escape_table[i] = 0
+
+# digit characters used by query_escape() to write each escaped byte
+cdef bytes hex_digits = b"0123456789abcdef"
+
+# 525486/sec
+# Return a copy of <s> in which every byte flagged in escape_table is
+# replaced by a backslash followed by its two lowercase hex digits; all
+# other bytes are copied unchanged.
+def query_escape (bytes s):
+ cdef int slen, rlen, i, j
+ cdef unsigned char ch
+ cdef char * sbuf, * rbuf
+ sbuf = s
+ slen = len (s)
+ rlen = slen
+ # compute length of result
+ for i from 0 <= i < slen:
+ if escape_table[<unsigned char>sbuf[i]]:
+ # one input byte becomes three output bytes
+ rlen = rlen + 2
+ # create result string
+ r = PyBytes_FromStringAndSize (NULL, rlen)
+ rbuf = r
+ # fill result string
+ j = 0
+ for i from 0 <= i < slen:
+ ch = sbuf[i]
+ if escape_table[ch]:
+ # 92 is the character code of the backslash
+ rbuf[j+0] = <char> 92
+ rbuf[j+1] = <char> hex_digits[ch >> 4]
+ rbuf[j+2] = <char> hex_digits[ch & 0xf]
+ j = j + 3
+ else:
+ rbuf[j] = ch
+ j = j + 1
+ return r
View
77 coro/ldap/test/t0.py
@@ -0,0 +1,77 @@
+# -*- Mode: Python -*-
+
+import unittest
+import sys
+from coro.asn1.ber import *
+from coro.ldap.query import *
+
+# shorthand for the 'context' marker that appears in the expected values
+C = 'context'
+
+# Table driving parse_query_test: each entry is (query, expected), where
+# <expected> is either the value that decode (parse_query (query)) must
+# equal or the exception class the call must raise.
+pq_tests = [
+ # simple equality
+ ('(xxx=yyy)',
+ ((C, 3, ['xxx', 'yyy']),
+ 12)),
+ # simple expression, plus 'present'
+ ('(|(xx=y)(zz=*))',
+ ((C, 1, [(C, 3, ['xx', 'y']), (C, 7, 'zz')]),
+ 15)),
+ # nary expressions
+ ('(|(a=b)(b=c)(c=d)(e=f)(f=g)(h=i))',
+ ((C, 1, [(C, 3, ['a', 'b']), (C, 3, ['b', 'c']), (C, 3, ['c', 'd']), (C, 3, ['e', 'f']), (C, 3, ['f', 'g']), (C, 3, ['h', 'i'])]),
+ 50)),
+ ('(|(!(a=*))(&(b=c)(d=e))(x<=y))',
+ ((C, 1, [(C, 2, [(C, 7, 'a')]), (C, 0, [(C, 3, ['b', 'c']), (C, 3, ['d', 'e'])]), (C, 6, ['x', 'y'])]),
+ 33)),
+ # approximate match
+ ('(zz~=yy)', ((C, 8, ['zz', 'yy']), 10)),
+ # substring
+ ('(a=ins*tiga*tor)', ((C, 4, ['a', [(C, 0, 'ins'), (C, 1, 'tiga'), (C, 2, 'tor')]]), 23)),
+ ('(a=*y)', ((C, 4, ['a', [(C, 2, 'y')]]), 10)),
+ ('(a=y*)', ((C, 4, ['a', [(C, 0, 'y')]]), 10)),
+ ('(a=*y*)', ((C, 4, ['a', [(C, 1, 'y')]]), 10)),
+ ('(a=*x*y)', ((C, 4, ['a', [(C, 1, 'x'), (C, 2, 'y')]]), 13)),
+ ('(a=*x*y*)', ((C, 4, ['a', [(C, 1, 'x'), (C, 1, 'y')]]), 13)),
+ ('(a=*x*y*z)', ((C, 4, ['a', [(C, 1, 'x'), (C, 1, 'y'), (C, 2, 'z')]]), 16)),
+ # syntax errors
+ ('(a=', QuerySyntaxError),
+ ('(a<b)', QuerySyntaxError),
+ # good hex escape
+ ('(a=some\\AAthing)',((C, 3, ['a', 'some\252thing']), 17)),
+ # bad hex escape
+ ('(a=some\\AZthing)', QuerySyntaxError),
+ # upper/lower case hex escape
+ ('(a=xy\\Aaz)', ((C, 3, ['a', 'xy\252z']), 11)),
+ # escaped splat
+ ('(a=x*y\\2az)', ((C, 4, ['a', [(C, 0, 'x'), (C, 2, 'y*z')]]), 15)),
+ # illegal splat
+ ('(a~=sam*son)', QuerySyntaxError),
+ # junk/illegal
+ ('junk', QuerySyntaxError),
+ # lots of parens
+ (('('*100), QuerySyntaxError),
+ # expression too complex
+ (('(!' * 55) + '(x=y)' + (')' * 55), QuerySyntaxError),
+ # expression not too complex
+ (('(!' * 10) + '(x=y)' + (')' * 10),
+ ((C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 2, [(C, 3, ['x', 'y'])])])])])])])])])])]),
+ 28)),
+ ]
+
+class parse_query_test (unittest.TestCase):
+ def runTest (self):
+ for q, e in pq_tests:
+ try:
+ self.assertEqual (decode (parse_query (q)), e)
+ except AssertionError:
+ raise
+ except:
+ self.assertEqual (sys.exc_info()[0], e)
+
+# Build and return a TestSuite holding a single parse_query_test instance.
+# The local variable shadows this function's own name inside the body only.
+def suite():
+ suite = unittest.TestSuite()
+ suite.addTest (parse_query_test())
+ return suite
+
+# When run as a script, have unittest run the suite built by suite().
+if __name__ == '__main__':
+ unittest.main (defaultTest='suite')
Please sign in to comment.
Something went wrong with that request. Please try again.