diff --git a/src/alvarium/annotators/exceptions.py b/src/alvarium/annotators/exceptions.py index 7f8299a..d195d6a 100644 --- a/src/alvarium/annotators/exceptions.py +++ b/src/alvarium/annotators/exceptions.py @@ -1,2 +1,2 @@ class AnnotatorException(Exception): - """A general exception type to be used by the annotators""" + """A general exception type to be used by the annotators""" \ No newline at end of file diff --git a/src/alvarium/annotators/factories.py b/src/alvarium/annotators/factories.py index 7e522b8..e2f94a8 100644 --- a/src/alvarium/annotators/factories.py +++ b/src/alvarium/annotators/factories.py @@ -7,6 +7,7 @@ from .pki import PkiAnnotator from .source import SourceAnnotator from .tls import TlsAnnotator +from .pki_http import HttpPkiAnnotator class AnnotatorFactory(): """A factory that provides multiple implementations of the Annotator interface""" @@ -23,6 +24,7 @@ def get_annotator(self, kind: AnnotationType, sdk_info: SdkInfo) -> Annotator: return TlsAnnotator(hash=sdk_info.hash.type, signature=sdk_info.signature) elif kind == AnnotationType.PKI: return PkiAnnotator(hash=sdk_info.hash.type, sign_info=sdk_info.signature) + elif kind == AnnotationType.PKIHTTP: + return HttpPkiAnnotator(hash=sdk_info.hash.type, sign_info=sdk_info.signature) else: raise AnnotatorException("Annotator type is not supported") - diff --git a/src/alvarium/annotators/handler/__init__.py b/src/alvarium/annotators/handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/alvarium/annotators/handler/contracts.py b/src/alvarium/annotators/handler/contracts.py new file mode 100644 index 0000000..ad39bb7 --- /dev/null +++ b/src/alvarium/annotators/handler/contracts.py @@ -0,0 +1,30 @@ +from dataclasses import dataclass +from enum import Enum + +HTTP_REQUEST_KEY = "HttpRequestKey" +CONTENT_TYPE = "Content-Type" +CONTENT_LENGTH = "Content-Length" +class DerivedComponent(Enum): + Method = "@method" + TargetURI = "@target-uri" + Authority = "@authority" + Scheme = "@scheme" + Path = "@path" + Query = "@query" + QueryParams = "@query-params" + + def __str__(self) -> str: + return f'{self.value}' +@dataclass +class parseResult: + """A data class that holds the parsed data""" + + seed: str + signature: str + keyid: str + algorithm: str + + def __eq__(self, __o: object) -> bool: + if not isinstance(__o, parseResult): + return NotImplemented + return self.seed == __o.seed and self.signature == __o.signature and self.keyid == __o.keyid and self.algorithm == __o.algorithm \ No newline at end of file diff --git a/src/alvarium/annotators/handler/ed25519.py b/src/alvarium/annotators/handler/ed25519.py new file mode 100644 index 0000000..0ebd65d --- /dev/null +++ b/src/alvarium/annotators/handler/ed25519.py @@ -0,0 +1,40 @@ +import datetime +from typing import List + +from requests import Request +from alvarium.annotators.handler.interfaces import RequestHandler + +from alvarium.sign.contracts import SignInfo +from alvarium.sign.ed25519 import Ed25519SignProvider +from io import StringIO +from .utils import parseSignature + + +class Ed25519RequestHandler(RequestHandler): + + def __init__(self, request: Request) -> None: + self.request = request + + def AddSignatureHeaders(self, ticks: datetime, fields: List[str], keys: SignInfo) -> None: + headerValue = StringIO() + + for i in range(len(fields)): + headerValue.write(f'"{str(fields[i])}"') + if i < len(fields) - 1: + headerValue.write(f' ') + + headerValue.write(f';created={str(int(ticks.timestamp()))};keyid="{str(keys.public.path)}";alg="{str(keys.public.type)}";') + + self.request.headers['Signature-Input'] = headerValue.getvalue() + + parsed = parseSignature(r=self.request) + inputValue = bytes(parsed.seed, 'utf-8') + p = Ed25519SignProvider() + + with open(keys.private.path, 'r') as file: + prv_hex = file.read() + prv = bytes.fromhex(prv_hex) + + signature = p.sign(key=prv, content=inputValue) + + self.request.headers['Signature'] = str(signature) diff --git a/src/alvarium/annotators/handler/exceptions.py b/src/alvarium/annotators/handler/exceptions.py new file mode 100644 index 0000000..289049a --- /dev/null +++ b/src/alvarium/annotators/handler/exceptions.py @@ -0,0 +1,5 @@ +class ParserException(Exception): + """A general exception type to be used by the parser""" + +class RequestHandlerException(Exception): + """A general exception type to be used by the request handler""" diff --git a/src/alvarium/annotators/handler/factories.py b/src/alvarium/annotators/handler/factories.py new file mode 100644 index 0000000..e9a0e5a --- /dev/null +++ b/src/alvarium/annotators/handler/factories.py @@ -0,0 +1,13 @@ +from requests import Request +from alvarium.annotators.handler.ed25519 import Ed25519RequestHandler +from alvarium.sign.contracts import SignInfo, SignType +from .interfaces import RequestHandler +from .exceptions import RequestHandlerException + +class RequestHandlerFactory(): + + def getRequestHandler(self, request: Request, keys: SignInfo) -> RequestHandler: + if keys.private.type == SignType.ED25519: + return Ed25519RequestHandler(request=request) + else: + raise RequestHandlerException("Key type is not supported") diff --git a/src/alvarium/annotators/handler/interfaces.py b/src/alvarium/annotators/handler/interfaces.py new file mode 100644 index 0000000..4654d48 --- /dev/null +++ b/src/alvarium/annotators/handler/interfaces.py @@ -0,0 +1,10 @@ +from abc import ABC, abstractmethod +import datetime +from typing import List +from alvarium.sign.contracts import SignInfo + +class RequestHandler(ABC): + + @abstractmethod + def AddSignatureHeaders(self, ticks: datetime, fields: List[str], keys: SignInfo) -> None: + pass \ No newline at end of file diff --git a/src/alvarium/annotators/handler/utils.py b/src/alvarium/annotators/handler/utils.py new file mode 100644 index 0000000..89603c6 --- /dev/null +++ b/src/alvarium/annotators/handler/utils.py @@ -0,0 +1,93 @@ +from requests import Request, structures +from .exceptions import ParserException +from urllib.parse import urlparse +from .contracts import parseResult, DerivedComponent +from io import StringIO + +def parseSignature(r: Request) -> parseResult: + + # Making the request headers case insensitive + headers = structures.CaseInsensitiveDict(r.headers) + + # Signature Inputs Extraction + signatureInput = headers.get("Signature-Input") + try: + signature = headers.get("Signature") + if signature == None: + signature = "" + except KeyError: + signature = "" + + signatureInputList = signatureInput.split(";",1) + signatureInputHeader = signatureInputList[0].split(" ") + signatureInputTail = signatureInputList[1] + + signatureInputParsedTail = signatureInputTail.split(";") + + algorithm = "" + keyid = "" + for s in signatureInputParsedTail: + if "alg" in s: + raw = s.split("=")[1] + algorithm = raw[1:len(raw)-1] + if "keyid" in s: + raw = s.split("=")[1] + keyid = raw[1:len(raw)-1] + + parsed_url = urlparse(r.url) + + signatureInputFields = {} + signatureInputBody = StringIO() + + for field in signatureInputHeader: + # Remove double quotes from the field to access it directly in the header map + key = field[1 : len(field)-1] + if key[0] == "@": + if DerivedComponent(key) == DerivedComponent.Method: + signatureInputFields[key] = [r.method] + elif DerivedComponent(key) == DerivedComponent.TargetURI: + signatureInputFields[key] = [r.url] + elif DerivedComponent(key) == DerivedComponent.Authority: + signatureInputFields[key] = [parsed_url.netloc] + elif DerivedComponent(key) == DerivedComponent.Scheme: + signatureInputFields[key] = [parsed_url.scheme] + elif DerivedComponent(key) == DerivedComponent.Path: + signatureInputFields[key] = [parsed_url.path] + elif DerivedComponent(key) == DerivedComponent.Query: + signatureInputFields[key] = ["?"+parsed_url.query] + elif DerivedComponent(key) == DerivedComponent.QueryParams: + queryParams = [] + rawQueryParams = parsed_url.query.split("&") + for rawQueryParam in rawQueryParams: + if rawQueryParam != "": + parameter = rawQueryParam.split("=") + name = parameter[0] + value = parameter[1] + queryParam = f';name="{name}": {value}' + queryParams.append(queryParam) + signatureInputFields[key] = queryParams + else: + raise ParserException(f"Unhandled Derived Component {key}") + else: + try: + # Multi-value headers are not permitted in Python + fieldValues = headers.get(key) + # Removing leading and trailing whitespaces + signatureInputFields[key] = [fieldValues.strip()] + except KeyError: + raise ParserException(f"Header field not found {key}") + + # Construct final output string + keyValues = signatureInputFields[key] + if len(keyValues) == 1: + signatureInputBody.write(f'"{key}" {keyValues[0]}\n') + else: + for value in keyValues: + signatureInputBody.write(f'"{key}"{value}\n') + + parsedSignatureInput = f"{signatureInputBody.getvalue()};{signatureInputTail}" + s = parseResult(seed=parsedSignatureInput, signature=signature, keyid=keyid, algorithm=algorithm) + + return s + + diff --git a/src/alvarium/annotators/interfaces.py b/src/alvarium/annotators/interfaces.py index 5113129..b3be188 100644 --- a/src/alvarium/annotators/interfaces.py +++ b/src/alvarium/annotators/interfaces.py @@ -8,4 +8,4 @@ class Annotator(ABC): @abstractmethod def execute(self, data:bytes, ctx: PropertyBag = None) -> Annotation: - pass \ No newline at end of file + pass diff --git a/src/alvarium/annotators/pki_http.py b/src/alvarium/annotators/pki_http.py new file mode 100644 index 0000000..14480c9 --- /dev/null +++ b/src/alvarium/annotators/pki_http.py @@ -0,0 +1,87 @@ +import socket +import os +from alvarium.annotators.handler.contracts import HTTP_REQUEST_KEY +from alvarium.annotators.handler.exceptions import ParserException + +from alvarium.sign.exceptions import SignException +from alvarium.sign.factories import SignProviderFactory +from alvarium.contracts.annotation import Annotation, AnnotationType +from alvarium.hash.contracts import HashType +from alvarium.sign.contracts import KeyInfo, SignInfo, KeyInfo, SignType +from alvarium.utils import PropertyBag +from .contracts import Signable +from .utils import derive_hash, sign_annotation +from .interfaces import Annotator +from .exceptions import AnnotatorException +from alvarium.annotators.handler.utils import parseSignature + +class HttpPkiAnnotator(Annotator): + def __init__(self, hash: HashType, sign_info: SignInfo) -> None: + self.hash = hash + self.sign_info = sign_info + self.kind = AnnotationType.PKIHTTP + + def execute(self, data: bytes, ctx: PropertyBag = None) -> Annotation: + key = derive_hash(hash=self.hash, data=data) + host: str = socket.gethostname() + + # Call parser on request + req = ctx.get_property(key=HTTP_REQUEST_KEY) + + try: + parsed_data = parseSignature(r=req) + except ParserException as e: + raise AnnotatorException("Cannot parse the HTTP request.", e) + + signable = Signable(seed=parsed_data.seed, signature=parsed_data.signature) + + try: + signType = SignType(parsed_data.algorithm) + except Exception as e: + raise AnnotatorException("Invalid key type specified" + str(parsed_data.algorithm),e) + + k = KeyInfo(signType, parsed_data.keyid) + + try: + is_satisfied = self._verify_signature(key=k, signable=signable) + except Exception as e: + raise AnnotatorException(str(e),e) + + annotation = Annotation(key=key, host=host, hash=self.hash, kind=self.kind, is_satisfied=is_satisfied) + + signature: str = sign_annotation(key_info=self.sign_info.private, annotation=annotation) + annotation.signature = signature + return annotation + + def _verify_signature(self, key: KeyInfo, signable: Signable) -> bool: + """ Responsible for verifying the signature, returns true if the verification passed, false otherwise.""" + + if(len(signable.signature) == 0): + return False + + try: + sign_provider = SignProviderFactory().get_provider(sign_type=key.type) + except SignException as e: + raise AnnotatorException("cannot get sign provider.", e) + + if(not os.path.isfile(key.path)): + raise AnnotatorException("Cannot read Public Key File.") + + with open(key.path, 'r') as file: + pub_key = file.read() + + try: + hex_pub_key = bytes.fromhex(pub_key) + except Exception as e: + raise AnnotatorException("Cannot read Public Key File.",e) + + try: + hex_signature = bytes.fromhex(signable.signature) + except Exception as e: + raise AnnotatorException("Invalid signature syntax: It is not in hex.",e) + + + + return sign_provider.verify(key=hex_pub_key, + content=bytes(signable.seed, 'utf-8'), + signed=hex_signature) \ No newline at end of file diff --git a/src/alvarium/contracts/annotation.py b/src/alvarium/contracts/annotation.py index 39bb286..8d2c9cb 100644 --- a/src/alvarium/contracts/annotation.py +++ b/src/alvarium/contracts/annotation.py @@ -10,6 +10,7 @@ class AnnotationType(Enum): TPM = "tpm" PKI = "pki" + PKIHTTP = "pki-http" TLS = "tls" SOURCE = "src" MOCK = "mock" diff --git a/src/alvarium/sign/ed25519.py b/src/alvarium/sign/ed25519.py index 73f7263..69f7a99 100644 --- a/src/alvarium/sign/ed25519.py +++ b/src/alvarium/sign/ed25519.py @@ -4,7 +4,7 @@ from cryptography.hazmat.primitives.asymmetric import ed25519 -class Ed25519ignProvider(SignProvider): +class Ed25519SignProvider(SignProvider): """The implementation of the Ed25519 sign provider interface""" def sign(self, key: bytes, content: bytes) -> str: diff --git a/src/alvarium/sign/factories.py b/src/alvarium/sign/factories.py index e90ada0..277efd1 100644 --- a/src/alvarium/sign/factories.py +++ b/src/alvarium/sign/factories.py @@ -2,7 +2,7 @@ from .contracts import SignType from .exceptions import SignException from .mock import NoneSignProvider -from .ed25519 import Ed25519ignProvider +from .ed25519 import Ed25519SignProvider class SignProviderFactory: @@ -15,6 +15,6 @@ def get_provider(self, sign_type: SignType) -> SignProvider: if sign_type == SignType.NONE: return NoneSignProvider() if sign_type == SignType.ED25519: - return Ed25519ignProvider() + return Ed25519SignProvider() else: raise SignException(f'{sign_type} is not implemented yet') diff --git a/tests/annotators/handler/__init__.py b/tests/annotators/handler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/annotators/handler/test_ed25519.py b/tests/annotators/handler/test_ed25519.py new file mode 100644 index 0000000..e9e4399 --- /dev/null +++ b/tests/annotators/handler/test_ed25519.py @@ -0,0 +1,41 @@ +import json +import unittest +import datetime + +from requests import Request + +from alvarium.annotators.handler.contracts import CONTENT_LENGTH, CONTENT_TYPE, DerivedComponent +from alvarium.annotators.handler.factories import RequestHandlerFactory +from alvarium.sign.contracts import SignInfo, KeyInfo + +class TestHandler(unittest.TestCase): + + def test_handler_should_return_correct_signature_headers(self): + with open("./tests/mock-info.json", 'r') as file: + b = file.read() + + ticks = datetime.datetime.now() + url = 'http://example.com/foo?var1=&var2=2' + headers = { "Date": str(ticks), + 'Content-Type': 'application/json', + 'Content-Length':'10'} + + req = Request(method='POST', url=url, headers=headers) + + info_json = json.loads(b) + keys = SignInfo(public = KeyInfo.from_json(json.dumps(info_json["signature"]["public"])), + private = KeyInfo.from_json(json.dumps(info_json["signature"]["private"]))) + + fields = [DerivedComponent.Method, DerivedComponent.Path, DerivedComponent.Authority, CONTENT_TYPE, CONTENT_LENGTH] + + handler = RequestHandlerFactory().getRequestHandler(request=req,keys=keys) + handler.AddSignatureHeaders(ticks=ticks, fields=fields, keys=keys) + + result = handler.request.headers['Signature-Input'] + expected = f'"@method" "@path" "@authority" "Content-Type" "Content-Length";created={str(int(ticks.timestamp()))};keyid="{str(keys.public.path)}";alg="{str(keys.public.type)}";' + + self.assertEqual(expected, result) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/annotators/handler/test_utils.py b/tests/annotators/handler/test_utils.py new file mode 100644 index 0000000..4af207a --- /dev/null +++ b/tests/annotators/handler/test_utils.py @@ -0,0 +1,128 @@ +import json +import unittest +from requests import Request +from alvarium.annotators.handler.utils import parseSignature +from alvarium.annotators.handler.contracts import parseResult + +class TestUtils(unittest.TestCase): + + def __init__(self, *args, **kwargs) -> None: + super(TestUtils, self).__init__( *args, **kwargs) + payload = {'KeyA': 'This is some test data'} + headers = {'Content-Type': 'application/json', + "Date":"Tue, 20 Apr 2021 02:07:55 GMT", + 'Content-Length':'18'} + + #The URL has to be a absolute + url = 'http://example.com/foo?var1=&var2=2' + + data=json.dumps(payload) + + self.req = Request(method='POST',url=url,headers=headers,json=data) + + def test_Parser_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"date\" \"@method\" \"@path\" \"@authority\" \"content-type\" \"content-length\" \"@query-params\" \"@query\";created=1644758607;keyid=\"public.key\";alg=\"ed25519\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"date\" Tue, 20 Apr 2021 02:07:55 GMT\n\"@method\" POST\n\"@path\" /foo\n\"@authority\" example.com\n\"content-type\" application/json\n\"content-length\" 18\n\"@query-params\";name=\"var1\": \n\"@query-params\";name=\"var2\": 2\n\"@query\" ?var1=&var2=2\n;created=1644758607;keyid=\"public.key\";alg=\"ed25519\";" + expectedAlg = "ed25519" + expectedKeyId = "public.key" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Method_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@method\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@method\" POST\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Target_URI_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@target-uri\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@target-uri\" http://example.com/foo?var1=&var2=2\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Authority_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@authority\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@authority\" example.com\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Scheme_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@scheme\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@scheme\" http\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Path_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@path\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@path\" /foo\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Query_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@query\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@query\" ?var1=&var2=2\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + + def test_Query_Params_Derived_Component_Should_Return_ParseResult(self): + + self.req.headers['Signature-Input'] = "\"@query-params\";" + parsed = parseSignature(r=self.req) + + expectedSeed = "\"@query-params\";name=\"var1\": \n\"@query-params\";name=\"var2\": 2\n;" + expectedAlg = "" + expectedKeyId = "" + + expectedResult = parseResult(seed=expectedSeed, signature="", keyid=expectedKeyId, algorithm=expectedAlg) + + self.assertTrue(parsed.__eq__(expectedResult)) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/annotators/test_annotator.py b/tests/annotators/test_annotator.py index ea035a3..c9aad4b 100644 --- a/tests/annotators/test_annotator.py +++ b/tests/annotators/test_annotator.py @@ -7,7 +7,7 @@ from alvarium.annotators.factories import AnnotatorFactory from alvarium.utils import ImmutablePropertyBag -class AnnotatorTest(unittest.TestCase): +class TestAnnotator(unittest.TestCase): def test_mock_Annotator_Should_Return_Annotation(self): key_info = KeyInfo(type=SignType.NONE,path="path") diff --git a/tests/annotators/test_pki_http.py b/tests/annotators/test_pki_http.py new file mode 100644 index 0000000..5135519 --- /dev/null +++ b/tests/annotators/test_pki_http.py @@ -0,0 +1,120 @@ +import unittest +import json +import datetime + +from requests import Request + +from alvarium.annotators.factories import AnnotatorFactory +from alvarium.annotators.handler.factories import RequestHandlerFactory +from alvarium.contracts.annotation import Annotation, AnnotationType +from alvarium.contracts.config import SdkInfo +from alvarium.hash.contracts import HashInfo, HashType +from alvarium.sign.contracts import KeyInfo, SignInfo, SignType +from alvarium.annotators.handler.contracts import CONTENT_LENGTH, CONTENT_TYPE, HTTP_REQUEST_KEY, DerivedComponent +from alvarium.utils import ImmutablePropertyBag +from alvarium.annotators.exceptions import AnnotatorException + + +class TestPkiHttp(unittest.TestCase): + + def __init__(self, *args, **kwargs): + super(TestPkiHttp, self).__init__( *args, **kwargs) + + kind = AnnotationType.PKIHTTP + hash = HashType.SHA256 + pub_key = KeyInfo(type=SignType.ED25519, path="./tests/sign/keys/public.key") + priv_key = KeyInfo(type=SignType.ED25519, path="./tests/sign/keys/private.key") + sdk_info = SdkInfo(annotators=[], hash=HashInfo(type=hash), stream=None, + signature=SignInfo(public=pub_key, private=priv_key)) + self.annotator = AnnotatorFactory().get_annotator(kind=kind, sdk_info=sdk_info) + self.request = self.buildRequest(keys=sdk_info.signature) + + def test_httppki_execute_valid_test(self): + + ctx = ImmutablePropertyBag({HTTP_REQUEST_KEY: self.request}) + + annotation = self.annotator.execute(data=bytes(self.request.json, 'utf-8'), ctx=ctx) + + self.assertTrue(annotation.is_satisfied) + self.assertEqual(type(annotation), Annotation) + + + def test_httppki_execute_invalid_algorithm_test(self): + + modified_request = self.request + modified_request.headers['Signature-Input'] = '\"@method\" \"@path\" \"@authority\" \"Content-Type\" \"Content-Length\";created=1646146637;keyid=\"public.key\";alg=\"invalid\"' + ctx = ImmutablePropertyBag({HTTP_REQUEST_KEY: modified_request}) + + with self.assertRaises(AnnotatorException): + self.annotator.execute(data=bytes(modified_request.json, 'utf-8'), ctx=ctx) + + + def test_httppki_execute_invalid_key_test(self): + + modified_request = self.request + modified_request.headers['Signature-Input'] = '\"@method\" \"@path\" \"@authority\" \"Content-Type\" \"Content-Length\";created=1646146637;keyid=\"invalid\";alg=\"ed25519\"' + ctx = ImmutablePropertyBag({HTTP_REQUEST_KEY: modified_request}) + + with self.assertRaises(AnnotatorException): + self.annotator.execute(data=bytes(modified_request.json, 'utf-8'), ctx=ctx) + + + + def test_httppki_execute_empty_signature_test(self): + + modified_request = self.request + modified_request.headers['signature'] = "" + + ctx = ImmutablePropertyBag({HTTP_REQUEST_KEY: modified_request}) + + # with self.assertRaises(AnnotatorException): + annotation = self.annotator.execute(data=bytes(modified_request.json, 'utf-8'), ctx=ctx) + self.assertFalse(annotation.is_satisfied) + + + def test_httppki_execute_invalid_signature_syntax_test(self): + + modified_request = self.request + modified_request.headers['signature'] = "invalid" + + ctx = ImmutablePropertyBag({HTTP_REQUEST_KEY: modified_request}) + + with self.assertRaises(AnnotatorException): + self.annotator.execute(data=bytes(modified_request.json, 'utf-8'), ctx=ctx) + + def test_httppki_execute_incorrect_signature_test(self): + + modified_request = self.request + modified_request.headers['signature'] = "123456" + + ctx = ImmutablePropertyBag({HTTP_REQUEST_KEY: modified_request}) + + annotation = self.annotator.execute(data=bytes(modified_request.json, 'utf-8'), ctx=ctx) + self.assertFalse(annotation.is_satisfied) + + + def buildRequest(self, keys: SignInfo): + + payload = {'KeyA': 'This is some test data'} + headers = {'Content-Type': 'application/json', + "Date":"Tue, 20 Apr 2021 02:07:55 GMT", + 'Content-Length':'18'} + + ticks = datetime.datetime.now() + + # The URL has to be a absolute + url = 'http://example.com/foo?var1=&var2=2' + + data=json.dumps(payload) + + fields = [DerivedComponent.Method, DerivedComponent.Path, DerivedComponent.Authority, CONTENT_TYPE, CONTENT_LENGTH] + req = Request(method='POST',url=url,headers=headers,json=data) + + handler = RequestHandlerFactory().getRequestHandler(request=req,keys=keys) + handler.AddSignatureHeaders(ticks=ticks, fields=fields, keys=keys) + + return handler.request + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/tests/annotators/test_source.py b/tests/annotators/test_source.py index 49b17cd..58060aa 100644 --- a/tests/annotators/test_source.py +++ b/tests/annotators/test_source.py @@ -5,7 +5,7 @@ from alvarium.hash.contracts import HashInfo, HashType from alvarium.sign.contracts import KeyInfo, SignInfo, SignType -class SourceAnnotatorTest(unittest.TestCase): +class TestSourceAnnotator(unittest.TestCase): def test_execute_should_return_annotation(self): hash = HashType.SHA256 diff --git a/tests/annotators/test_tls.py b/tests/annotators/test_tls.py index 2403256..1990bd9 100644 --- a/tests/annotators/test_tls.py +++ b/tests/annotators/test_tls.py @@ -9,7 +9,7 @@ import ssl import socket -class TlsAnnotatorTest(unittest.TestCase): +class TestTlsAnnotator(unittest.TestCase): def test_tls_annotator_with_tls_connection_should_return_is_satisfied_true(self): key_info = KeyInfo(type=SignType.ED25519, path="./tests/sign/keys/private.key") sign_Info = SignInfo(public=key_info, private=key_info)