-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Created NotaryParser class, removing parsing methods from Notary and …
…Notaries. Created default_notaries() function, removing Notaries.default() method.
- Loading branch information
Showing
9 changed files
with
157 additions
and
107 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""Parser for Notaries""" | ||
|
||
import M2Crypto | ||
import re | ||
|
||
from Exceptions import NotaryException | ||
from Notary import Notary | ||
from Notaries import Notaries | ||
|
||
class NotaryParser: | ||
"""Parse serialized Notaries and return a Notaries instance""" | ||
|
||
def parse_file(self, path): | ||
"""Return Notaries described in file. | ||
See parse_stream() for expected format""" | ||
with file(path, "r") as stream: | ||
notaries = self.parse_stream(stream) | ||
return notaries | ||
|
||
def parse_stream(self, stream): | ||
"""Return Notaries described in stream. | ||
Expected format for each Notary is: | ||
# Lines starting with '#' are comments and ignored | ||
<hostname>:<port> | ||
-----BEGIN PUBLIC KEY----- | ||
<multiple lines of Base64-encoded data> | ||
-----END PUBLIC KEY---- | ||
""" | ||
notaries = Notaries() | ||
while True: | ||
notary = self._parse_notary(stream) | ||
if notary is None: # EOF | ||
break | ||
else: | ||
notaries.append(notary) | ||
return notaries | ||
|
||
hostname_port_re = re.compile("(\S+):(\d+)") | ||
|
||
def _parse_notary(self, stream): | ||
"""Return Notary described in given stream. | ||
Expected format is: | ||
# Lines starting with '#' are comments and ignored | ||
<hostname>:<port> | ||
-----BEGIN PUBLIC KEY----- | ||
<multiple lines of Base64-encoded data> | ||
-----END PUBLIC KEY---- | ||
If EOF is found before a Notary, returns None. | ||
""" | ||
hostname, port, public_key = None, None, None | ||
for line in stream: | ||
line = line.strip() | ||
if line.startswith("#") or (line == ""): | ||
continue # Ignore comments and blank lines | ||
match = self.hostname_port_re.match(line) | ||
if match is not None: | ||
hostname = match.group(1) | ||
port = int(match.group(2)) | ||
elif line == "-----BEGIN PUBLIC KEY-----": | ||
if hostname is None: | ||
raise NotaryException("Public key found without Notary") | ||
lines = [line + "\n"] | ||
for line in stream: | ||
lines.append(line) | ||
if line.startswith("-----END PUBLIC KEY-----"): | ||
break | ||
else: | ||
raise NotaryException( | ||
"No closing 'END PUBLIC KEY' line for key found") | ||
public_key = self._public_key_from_lines(lines) | ||
break # End of Notary | ||
else: | ||
raise NotaryException("Unrecognized line: " + line) | ||
if hostname is None: | ||
# We hit EOF before finding a Notary | ||
return None | ||
if public_key is None: | ||
raise NotaryException( | ||
"No public key found for Notary %s:%s" % (hostname, port)) | ||
return Notary(hostname, port, public_key) | ||
|
||
@classmethod | ||
def _public_key_from_lines(cls, lines): | ||
"""Read and return public key from lines""" | ||
bio = M2Crypto.BIO.MemoryBuffer("".join(lines)) | ||
pub_key = M2Crypto.EVP.PKey() | ||
pub_key.assign_rsa(M2Crypto.RSA.load_pub_key_bio(bio)) | ||
return pub_key |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
"""Default Notaries""" | ||
|
||
import pkgutil | ||
import StringIO | ||
|
||
from NotaryParser import NotaryParser | ||
|
||
def default_notaries(): | ||
"""Return the default Notaries""" | ||
parser = NotaryParser() | ||
data = pkgutil.get_data("Perspectives", "conf/http_notary_list.txt") | ||
return parser.parse_stream(StringIO.StringIO(data)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
#!/usr/bin/env python | ||
"""Unititest for NotaryParser class""" | ||
|
||
import os.path | ||
import unittest | ||
|
||
class TestNotaryParser(unittest.TestCase): | ||
"""Tests for NotaryParser class""" | ||
|
||
notary_file = os.path.join(os.path.dirname(os.path.abspath( __file__ )), | ||
"./http_notary_list.txt") | ||
|
||
def test_init(self): | ||
"""Test basic creation of NotaryParser""" | ||
from Perspectives import NotaryParser | ||
parser = NotaryParser() | ||
|
||
def test_parse_file(self): | ||
"""Test NotaryParser.parse_file()""" | ||
from Perspectives import NotaryParser | ||
parser = NotaryParser() | ||
notaries = parser.parse_file(self.notary_file) | ||
self.assertIsNotNone(notaries) | ||
self.assertEqual(len(notaries), 4) | ||
for notary in notaries: | ||
self.assertIsNotNone(notary.hostname) | ||
self.assertIsNotNone(notary.port) | ||
self.assertIsNotNone(notary.public_key) | ||
|
||
if __name__ == "__main__": | ||
unittest.main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters