In [1]:
from pyasn1.type import univ
from pyasn1.type import char
from pyasn1.type import tag
from pyasn1.type import namedtype
from pyasn1.type import constraint
from pyasn1.type import namedval
from pyasn1.codec.ber import encoder
from pyasn1.codec.ber import decoder
from trustmessages import *
from trustmessages.trustdatabase import QtmDb, SLDb
import base64
import itertools
import sys

In [2]:
def encode(m):
    """Return the base64 text (as bytes) of the BER encoding of ``m``.

    :param m: a pyasn1 value accepted by ``encoder.encode``.
    :returns: ``base64.b64encode`` applied to the encoded octets.
    """
    return base64.b64encode(encoder.encode(m))


# Endless fixture streams. The cells below draw their sample values from
# these with next(), so the printed encodings depend on cell execution order.
users = itertools.cycle(["a@x.com", "b@x.com", "c@x.com"])
services = itertools.cycle(["buyer", "seller", "letter", "renter"])
quantitative = itertools.cycle(range(1, 6))
qualitative = itertools.cycle(["distrust", "neutral", "trust"])

In [3]:
def get_bytes(e):
    """Return the encoding of ``e`` rendered as a list of byte values.

    ``bytearray`` iterates as integers on both Python 2 and Python 3, so the
    same expression covers both interpreters.

    :param e: a pyasn1 value accepted by ``encoder.encode``.
    :returns: ``str`` of the integer list, e.g. ``"[103, 6, 2, 1, 10, 22, 1, 97]"``.
    """
    octets = bytearray(encoder.encode(e))
    return str(list(octets))

In [4]:
# Rating whose value field carries an encoded INTEGER.
rating = Rating()
rating["source"] = next(users)
rating["target"] = next(users)
rating["service"] = next(services)
rating["date"] = 100
val = next(quantitative)
rating["value"] = encoder.encode(univ.Integer(val))

# Round trip: the decoded Rating must print identically, and the nested
# value must decode back to the integer that was put in.
wire = encoder.encode(rating)
decoded, _ = decoder.decode(wire, asn1Spec=Rating())
assert rating.prettyPrint() == decoded.prettyPrint()
inner, _ = decoder.decode(decoded["value"], asn1Spec=univ.Integer())
assert inner == val
print(encode(rating))

b'ZCEWB2FAeC5jb20WB2JAeC5jb20WBWJ1eWVyAgFkBAMCAQE='


In [5]:
# Rating whose value field carries an encoded PrintableString.
rating = Rating()
rating["source"] = next(users)
rating["target"] = next(users)
rating["service"] = next(services)
rating["date"] = 100
val = next(qualitative)
rating["value"] = encoder.encode(char.PrintableString(val))

# Round trip: compare the printed form, then decode the nested value
# and compare it with the original string.
wire = encoder.encode(rating)
decoded, _ = decoder.decode(wire, asn1Spec=Rating())
assert rating.prettyPrint() == decoded.prettyPrint()
inner, _ = decoder.decode(decoded["value"], asn1Spec=char.PrintableString())
assert inner == val
print(encode(rating))

b'ZCkWB2NAeC5jb20WB2FAeC5jb20WBnNlbGxlcgIBZAQKEwhkaXN0cnVzdA=='


In [6]:
# Query with a single constraint: operator "ge" on a date value of 80.
sq = Query()
sq["con"] = Constraint()
sq["con"]["operator"] = "ge"
sq["con"]["value"] = Value()
sq["con"]["value"]["date"] = 80

# DataRequest of type "trust" carrying that query (reused by a later cell).
a_req = DataRequest()
a_req["rid"] = 1
a_req["type"] = "trust"
a_req["query"] = sq

request_msg = Message()
request_msg["version"] = 1
request_msg["payload"] = a_req

# Round trip through the codec and compare both the whole message and the
# printed form of the payload.
decoded, _ = decoder.decode(encoder.encode(request_msg), asn1Spec=Message())
assert decoded == request_msg
decoded_payload = decoded["payload"].getComponent()
assert decoded_payload.prettyPrint() == request_msg["payload"].prettyPrint()
print(encode(request_msg))

b'MBMCAQFiDgIBAQoBAGUGCgEFQgFQ'


In [7]:
# DataResponse of type "trust" from provider "ebay" holding two Ratings.
trust_res = DataResponse()
trust_res["rid"] = 1
trust_res["provider"] = "ebay"
trust_res["type"] = "trust"
trust_res["format"] = Format((1, 1, 1))
trust_res["response"] = univ.SequenceOf(componentType=Rating())

for position in range(2):
    entry = Rating()
    entry["source"] = next(users)
    entry["target"] = next(users)
    entry["service"] = next(services)
    entry["date"] = 1000
    entry["value"] = encoder.encode(univ.Integer(5))
    trust_res["response"].setComponentByPosition(position, entry)

response_msg = Message()
response_msg["version"] = 1
response_msg["payload"] = trust_res

# Round trip through the codec and compare both the whole message and the
# printed form of the payload.
decoded, _ = decoder.decode(encoder.encode(response_msg), asn1Spec=Message())
assert decoded == response_msg
decoded_payload = decoded["payload"].getComponent()
assert decoded_payload.prettyPrint() == response_msg["payload"].prettyPrint()
print(encode(response_msg))

b'MGECAQFjXAIBAQYCKQEKAQAWBGViYXkwSmQjFgdiQHguY29tFgdjQHguY29tFgZsZXR0ZXICAgPoBAMCAQVkIxYHYUB4LmNvbRYHYkB4LmNvbRYGcmVudGVyAgID6AQDAgEF'


In [8]:
# FormatRequest constructed directly from the value 100.
f_req = FormatRequest(100)

# Stand-alone round trip of the bare request. The decoded value used to be
# computed and then discarded; it is now actually compared with the input.
data, _ = decoder.decode(encoder.encode(f_req), asn1Spec=FormatRequest())
assert data == f_req

m = Message()
m["version"] = 1
m["payload"] = f_req

# Round trip of the request wrapped in a Message.
decoded, _ = decoder.decode(encoder.encode(m), asn1Spec=Message())
assert decoded == m
assert decoded["payload"].getComponent().prettyPrint() == m["payload"].prettyPrint()
print(encode(m))

b'MAYCAQFAAWQ='


In [9]:
# FormatResponse: both format ids use the same identifier, each paired with
# a placeholder definition string.
format_id = (1, 2, 3)

f_res = FormatResponse()
f_res["rid"] = 100
f_res["assessment-id"] = Format(format_id)
f_res["assessment-def"] = char.IA5String("Here be an ASN.1 spec for assessment values")
f_res["trust-id"] = Format(format_id)
f_res["trust-def"] = char.IA5String("Here be an ASN.1 spec for trust values")

format_msg = Message()
format_msg["version"] = 1
format_msg["payload"] = f_res

# Round trip through the codec and compare both the whole message and the
# printed form of the payload.
decoded, _ = decoder.decode(encoder.encode(format_msg), asn1Spec=Message())
assert decoded == format_msg
decoded_payload = decoded["payload"].getComponent()
assert decoded_payload.prettyPrint() == format_msg["payload"].prettyPrint()
print(encode(format_msg))

b'MGUCAQFhYAIBZAYCKgMWK0hlcmUgYmUgYW4gQVNOLjEgc3BlYyBmb3IgYXNzZXNzbWVudCB2YWx1ZXMGAioDFiZIZXJlIGJlIGFuIEFTTi4xIHNwZWMgZm9yIHRydXN0IHZhbHVlcw=='


In [10]:
# Fault with a request id and a human-readable message.
e = Fault()
e["rid"] = 10
e["message"] = "something went wrong!"

# Stand-alone round trip of the bare Fault. The decoded value used to be
# computed and then discarded; it is now actually compared with the input.
data, _ = decoder.decode(encoder.encode(e), asn1Spec=Fault())
assert data == e

m = Message()
m["version"] = 1
m["payload"] = e

# Round trip of the Fault wrapped in a Message.
decoded, _ = decoder.decode(encoder.encode(m), asn1Spec=Message())
assert decoded == m
assert decoded["payload"].getComponent().prettyPrint() == m["payload"].prettyPrint()
print(encode(m))

b'MB8CAQFnGgIBChYVc29tZXRoaW5nIHdlbnQgd3Jvbmch'


In [11]:
# Two Faults that differ only in their one-character message, printed as
# raw byte values for comparison.
e1 = Fault()
e1["rid"] = 10
e1["message"] = "a"

e2 = Fault()
e2["rid"] = 10
e2["message"] = "b"

for fault in (e1, e2):
    print(get_bytes(fault))

[103, 6, 2, 1, 10, 22, 1, 97]
[103, 6, 2, 1, 10, 22, 1, 98]


In [12]:
# Encode SL: fill the b/d/u fields of an SL assessment and round-trip it.
bdu = SLDb().AssessmentClass()
for field, weight in (("b", 0.1), ("d", 0.2), ("u", 0.7)):
    bdu[field] = weight

bdu_wire = encoder.encode(bdu)
decoded, _ = decoder.decode(bdu_wire, asn1Spec=SLDb().AssessmentClass())
assert bdu == decoded
print(encode(bdu))

b'MBUJBQMxRS0xCQUDMkUtMQkFAzdFLTE='


In [13]:
# Encode Data with SL: the Rating's value is the encoded `bdu` assessment
# that is in scope from an earlier cell.
sl_rating = Rating()
sl_rating["source"] = next(users)
sl_rating["target"] = next(users)
sl_rating["service"] = next(services)
sl_rating["date"] = 100
sl_rating["value"] = encoder.encode(bdu)

sl_wire = encoder.encode(sl_rating)
decoded, _ = decoder.decode(sl_wire, asn1Spec=Rating())
assert sl_rating == decoded
print(encode(sl_rating))

b'ZDUWB2NAeC5jb20WB2FAeC5jb20WBWJ1eWVyAgFkBBcwFQkFAzFFLTEJBQMyRS0xCQUDN0UtMQ=='


In [14]:
# decode SL from Java
# The payload is held in `sl_der` rather than `bytes` so the builtin type is
# not shadowed for the rest of the kernel session. b64decode mirrors the
# b64encode used when encoding and is available on Python 2 and 3.
sl_der = base64.b64decode(b"MCEJCYDJDMzMzMzMzQkJgMoMzMzMzMzNCQmAzAszMzMzMzM=")
bdu, _ = decoder.decode(sl_der, asn1Spec=SLDb().AssessmentClass())
assert bdu["b"] == 0.1
assert bdu["d"] == 0.2
assert bdu["u"] == 0.7

In [15]:
# decode Java's Data@SL
# The payload is held in `rating_der` rather than `bytes` so the builtin type
# is not shadowed. b64decode mirrors the b64encode used when encoding and is
# available on Python 2 and 3.
rating_der = base64.b64decode(b"ZDwWBWFsaWNlFgNib2IWBnNlbGxlcgIBCgQjMCEJCYDJDMzMzMzMzQkJgMoMzMzMzMzNCQmAzAszMzMzMzM=")
a, _ = decoder.decode(rating_der, asn1Spec=Rating())
# The Rating's value field holds the encoded SL assessment.
bdu, _ = decoder.decode(a["value"], asn1Spec=SLDb().AssessmentClass())
assert bdu["b"] == 0.1
assert bdu["d"] == 0.2
assert bdu["u"] == 0.7

In [16]:
# decode Java's DataResponse@SL
# NOTE(review): this cell was marked TODO although the decode and checks
# below are in place; confirm nothing else is pending.
# The payload is held in `message_der` rather than `bytes` so the builtin
# type is not shadowed. b64decode mirrors the b64encode used when encoding
# and is available on Python 2 and 3.
message_der = base64.b64decode(b"MIGoAgEBY4GiAgEBBgIpAQoBARYHc29tZXRtc"
                               b"zCBjGREFgVhbGljZRYLeW91QHlvdS5jb20WBn"
                               b"NlbGxlcgIBCgQjMCEJCYDJDMzMzMzMzQkJgMo"
                               b"MzMzMzMzNCQmAzAszMzMzMzNkRBYFYWxpY2UW"
                               b"C3lvdUB5b3UuY29tFgZzZWxsZXICAQoEIzAhC"
                               b"QmAyQzMzMzMzM0JCYDKDMzMzMzMzQkJgMwLMz"
                               b"MzMzMz")
m, _ = decoder.decode(message_der, asn1Spec=Message())
assert m["version"] == 1
# Every Rating in the response carries the same encoded SL assessment.
for a in m["payload"]["data-response"]["response"]:
    bdu, _ = decoder.decode(a["value"], asn1Spec=SLDb().AssessmentClass())
    assert bdu["b"] == 0.1
    assert bdu["d"] == 0.2
    assert bdu["u"] == 0.7

In [17]:
# Encode QTM: round-trip a single "very-good" assessment.
qtm = QtmDb().AssessmentClass("very-good")
qtm_wire = encoder.encode(qtm)
decoded, _ = decoder.decode(qtm_wire, asn1Spec=QtmDb().AssessmentClass())
assert qtm == decoded
print(encode(qtm))

b'CgEE'


In [18]:
# Encode Data with QTM: the Rating's value is the encoded `qtm` assessment
# that is in scope from an earlier cell.
r = Rating()
r["source"] = next(users)
r["target"] = next(users)
r["service"] = next(services)
r["date"] = 100
r["value"] = encoder.encode(qtm)

rating_wire = encoder.encode(r)
decoded, _ = decoder.decode(rating_wire, asn1Spec=Rating())
assert r == decoded
print(encode(r))

b'ZCIWB2JAeC5jb20WB2NAeC5jb20WBnNlbGxlcgIBZAQDCgEE'


In [19]:
# DataResponse with QTM: two Ratings, each carrying an encoded "very-good"
# assessment.
a_res = DataResponse()
a_res["rid"] = 1
a_res["provider"] = "ebay"
a_res["type"] = "assessment"
a_res["format"] = Format((1, 1, 1))
a_res["response"] = univ.SequenceOf(componentType=Rating())

for i in range(2):
    qtm = QtmDb().AssessmentClass("very-good")

    a = Rating()
    a["source"] = next(users)
    a["target"] = next(users)
    a["service"] = next(services)
    a["date"] = 1000
    a["value"] = encoder.encode(qtm)
    a_res["response"].setComponentByPosition(i, a)

# Round trip the bare DataResponse (no Message wrapper) and compare both
# the value and its printed form.
response_wire = encoder.encode(a_res)
data, _ = decoder.decode(response_wire, asn1Spec=DataResponse())
assert data == a_res
assert data.prettyPrint() == a_res.prettyPrint()
print(encode(a_res))

b'Y1wCAQEGAikBCgEBFgRlYmF5MEpkIxYHYUB4LmNvbRYHYkB4LmNvbRYGbGV0dGVyAgID6AQDCgEEZCMWB2NAeC5jb20WB2FAeC5jb20WBnJlbnRlcgICA+gEAwoBBA=='


In [20]:
# DataResponse (type "assessment") with SL: two Ratings, each carrying an
# encoded b/d/u assessment.
a_res = DataResponse()
a_res["rid"] = 1
a_res["provider"] = "ebay"
a_res["type"] = "assessment"
a_res["format"] = Format((1, 1, 1))
a_res["response"] = univ.SequenceOf(componentType=Rating())

for i in range(2):
    bdu = SLDb().AssessmentClass()
    for field, weight in (("b", 0.1), ("d", 0.2), ("u", 0.7)):
        bdu[field] = weight

    a = Rating()
    a["source"] = next(users)
    a["target"] = next(users)
    a["service"] = next(services)
    a["date"] = 1000
    a["value"] = encoder.encode(bdu)
    a_res["response"].setComponentByPosition(i, a)

# Round trip the bare DataResponse (no Message wrapper) and compare both
# the value and its printed form.
response_wire = encoder.encode(a_res)
data, _ = decoder.decode(response_wire, asn1Spec=DataResponse())
assert data == a_res
assert data.prettyPrint() == a_res.prettyPrint()
print(encode(a_res))

b'Y4GDAgEBBgIpAQoBARYEZWJheTBxZDYWB2JAeC5jb20WB2NAeC5jb20WBWJ1eWVyAgID6AQXMBUJBQMxRS0xCQUDMkUtMQkFAzdFLTFkNxYHYUB4LmNvbRYHYkB4LmNvbRYGc2VsbGVyAgID6AQXMBUJBQMxRS0xCQUDMkUtMQkFAzdFLTE='


In [21]:
# Wrap the DataRequest from an earlier cell (`a_req`) in a Message and
# check that decoding and re-encoding reproduces the same output.
m = Message()
m["version"] = 1
m["payload"] = a_req

message_wire = encoder.encode(m)
data, _ = decoder.decode(message_wire, asn1Spec=Message())
assert data == m
# Re-encoding the decoded message must give the same base64 text.
assert encode(data) == encode(m)