Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a different key for AES-256-CTR and HMAC-SHA256. #2

Merged
merged 2 commits into from
Sep 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions doc/theory_of_operation.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ We output the encrypted chunks.
The HMAC is updated with every encrypted chunk (*Encrypt-then-MAC*). The last step is to output the
HMAC digest.

### Version 2

Version 2 is exactly like version one, except:

- Instead of 32 random bytes for the symmetric encryption key, we generate 64 bytes.

The first half of the key is used for the AES encryption, the second half is used
for the HMAC-SHA256 authentication

## File Format

### Encoding
Expand Down Expand Up @@ -66,3 +75,11 @@ for a ~128 KiB file:
The last 32 bytes are the HMAC digest of the file after encryption without a length indicator:

XX XX XX XX ... [32 bytes]

### Version 2

Version 2 files start with the following bytes:

7a 70 79 00 00 02

Which is `enB5AAAC` in base64.
24 changes: 16 additions & 8 deletions zpy/decrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,16 @@
from Crypto.Hash import HMAC, SHA256

import zpy.util
import zpy.legacy.decrypt


def decrypt_stream_v1_base64(identity, stdin, stdout):
def decrypt_stream_v2_base64(identity, stdin, stdout):
with zpy.util.DecodingReader(stdin) as stdin:
return decrypt_stream_v1(identity, stdin, stdout)
return decrypt_stream_v2(identity, stdin, stdout)


def decrypt_stream_v1(identity, stdin, stdout):
magic = b"zpy\x00\x00\x01" # this has already been read from stdin
def decrypt_stream_v2(identity, stdin, stdout):
magic = b"zpy\x00\x00\x02" # this has already been read from stdin
# the first 8 bytes of the input stream are the aes counter iv
iv = stdin.read(16)
ctr = Counter.new(128, initial_value=int.from_bytes(iv, "big"))
Expand All @@ -40,8 +41,10 @@ def decrypt_stream_v1(identity, stdin, stdout):
# the rsa private key (the length depends on the key size)
key = PKCS1_OAEP.new(zpy.util.load_identity(identity)).decrypt(key)
# aes in counter mode and encrypt-then-mac
aes = AES.new(key, mode=AES.MODE_CTR, counter=ctr)
mac = HMAC.new(key, digestmod=SHA256)
key_aes = key[:32]
key_mac = key[32:]
aes = AES.new(key_aes, mode=AES.MODE_CTR, counter=ctr)
mac = HMAC.new(key_mac, digestmod=SHA256)
mac.update(header)
while True:
# each variable length chunk begins with its length in 2 bytes
Expand All @@ -66,9 +69,14 @@ def decrypt(identity, filename):
else:
header += stdin.read(2)
if header == b"enB5AAAB":
decrypt_stream_v1_base64(identity, stdin, stdout)
zpy.legacy.decrypt.decrypt_stream_v1_base64(
identity, stdin, stdout)
elif header == b"zpy\x00\x00\x01":
decrypt_stream_v1(identity, stdin, stdout)
zpy.legacy.decrypt.decrypt_stream_v1(identity, stdin, stdout)
elif header == b"enB5AAAC":
decrypt_stream_v2_base64(identity, stdin, stdout)
elif header == b"zpy\x00\x00\x02":
decrypt_stream_v2(identity, stdin, stdout)
else:
raise RuntimeError("invalid file header")
return 0
30 changes: 19 additions & 11 deletions zpy/encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,27 @@
from Crypto.Hash import HMAC, SHA256

import zpy.util
import zpy.legacy.encrypt


def encrypt_stream_v1_base64(identity, stdin, stdout):
def encrypt_stream_v2_base64(identity, stdin, stdout):
with zpy.util.EncodingWriter(stdout) as stdout:
return encrypt_stream_v1(identity, stdin, stdout)
return encrypt_stream_v2(identity, stdin, stdout)


def encrypt_stream_v1(identity, stdin, stdout):
magic = b"zpy\x00\x00\x01"
def encrypt_stream_v2(identity, stdin, stdout):
magic = b"zpy\x00\x00\x02"
rng = Random.new()
iv = rng.read(16) # counter mode prefix
key = rng.read(32) # random AES-256 key
key_aes = rng.read(32) # random AES-256 key
key_mac = rng.read(32) # random HMAC key
# AES-256 in counter mode
ctr = Counter.new(128, initial_value=int.from_bytes(iv, "big"))
aes = AES.new(key, mode=AES.MODE_CTR, counter=ctr)
mac = HMAC.new(key, digestmod=SHA256) # HMAC-SHA256 for ciphertext
aes = AES.new(key_aes, mode=AES.MODE_CTR, counter=ctr)
mac = HMAC.new(key_mac, digestmod=SHA256) # HMAC-SHA256 for ciphertext
# the AES-256 key is encrypte with the users rsa private key
key = PKCS1_OAEP.new(zpy.util.load_identity(identity)).encrypt(key)
key = PKCS1_OAEP.new(zpy.util.load_identity(
identity)).encrypt(key_aes + key_mac)
# the output stream begins with the 8 byte iv, the length of the
# encrypted AES-256 key in two bytes and the encrypted key itself
header = magic + iv + len(key).to_bytes(2, "big") + key
Expand All @@ -61,13 +64,18 @@ def encrypt_stream_v1(identity, stdin, stdout):
stdout.write(b"\x00\x00" + mac.digest())


def encrypt(identity, filename, version=1, raw=False):
def encrypt(identity, filename, version=2, raw=False):
with open(filename, "rb") as stdin:
with open("/dev/stdout", "wb") as stdout:
if not raw and version == 1:
encrypt_stream_v1_base64(identity, stdin, stdout)
zpy.legacy.encrypt.encrypt_stream_v1_base64(
identity, stdin, stdout)
elif version == 1:
encrypt_stream_v1(identity, stdin, stdout)
zpy.legacy.encrypt.encrypt_stream_v1(identity, stdin, stdout)
elif not raw and version == 2:
encrypt_stream_v2_base64(identity, stdin, stdout)
elif version == 2:
encrypt_stream_v2(identity, stdin, stdout)
else:
raise RuntimeError("invalid version number")
return 0
Empty file added zpy/legacy/__init__.py
Empty file.
57 changes: 57 additions & 0 deletions zpy/legacy/decrypt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright (C) 2015 Stefano Palazzo <stefano.palazzo@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import hmac

from Crypto.Cipher import PKCS1_OAEP
from Crypto.Cipher import AES
from Crypto.Util import Counter
from Crypto.Hash import HMAC, SHA256

import zpy.util


def decrypt_stream_v1_base64(identity, stdin, stdout):
with zpy.util.DecodingReader(stdin) as stdin:
return decrypt_stream_v1(identity, stdin, stdout)


def decrypt_stream_v1(identity, stdin, stdout):
magic = b"zpy\x00\x00\x01" # this has already been read from stdin
# the first 8 bytes of the input stream are the aes counter iv
iv = stdin.read(16)
ctr = Counter.new(128, initial_value=int.from_bytes(iv, "big"))
key_size = stdin.read(2)
key = stdin.read(int.from_bytes(key_size, "big"))
header = magic + iv + key_size + key
# read the encrypted symmetric key and decrypt it with
# the rsa private key (the length depends on the key size)
key = PKCS1_OAEP.new(zpy.util.load_identity(identity)).decrypt(key)
# aes in counter mode and encrypt-then-mac
aes = AES.new(key, mode=AES.MODE_CTR, counter=ctr)
mac = HMAC.new(key, digestmod=SHA256)
mac.update(header)
while True:
# each variable length chunk begins with its length in 2 bytes
# which limits the length to 0xffff (65535) bytes
chunk = stdin.read(int.from_bytes(stdin.read(2), "big"))
# update the hmac before decrypting
mac.update(chunk)
stdout.write(aes.decrypt(chunk))
if not chunk:
break
# the last 32 bytes of the message are the ciphertext hmac
if not hmac.compare_digest(mac.digest(), stdin.read(32)):
raise RuntimeError("hmac error")
61 changes: 61 additions & 0 deletions zpy/legacy/encrypt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright (C) 2015 Stefano Palazzo <stefano.palazzo@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from Crypto import Random
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Cipher import AES
from Crypto.Util import Counter
from Crypto.Hash import HMAC, SHA256

import zpy.util


def encrypt_stream_v1_base64(identity, stdin, stdout):
with zpy.util.EncodingWriter(stdout) as stdout:
return encrypt_stream_v1(identity, stdin, stdout)


def encrypt_stream_v1(identity, stdin, stdout):
magic = b"zpy\x00\x00\x01"
rng = Random.new()
iv = rng.read(16) # counter mode prefix
key = rng.read(32) # random AES-256 key
# AES-256 in counter mode
ctr = Counter.new(128, initial_value=int.from_bytes(iv, "big"))
aes = AES.new(key, mode=AES.MODE_CTR, counter=ctr)
mac = HMAC.new(key, digestmod=SHA256) # HMAC-SHA256 for ciphertext
# the AES-256 key is encrypte with the users rsa private key
key = PKCS1_OAEP.new(zpy.util.load_identity(identity)).encrypt(key)
# the output stream begins with the 8 byte iv, the length of the
# encrypted AES-256 key in two bytes and the encrypted key itself
header = magic + iv + len(key).to_bytes(2, "big") + key
stdout.write(header)
# starting the hmac with the file header ensures the header cannot
# be modified (e.g. to downgrade the verification protocol)
mac.update(header)
while True:
# encrypt in chunks of 65535 bytes (a two byte integer)
chunk = aes.encrypt(stdin.read(0xFFFF))
if chunk:
# write the length of the encrypted chunk and the chunk itself
stdout.write(len(chunk).to_bytes(2, "big") + chunk)
# update the hmac with the ciphertext
mac.update(chunk)
if len(chunk) < 0xFFFF:
break
# the two null bytes represent an empty chunk, i.e. the end of the
# ciphertext, the last 32 bytes of the output stream are the
# HMAC-SHA256 digest
stdout.write(b"\x00\x00" + mac.digest())
Empty file added zpy/tests/legacy/__init__.py
Empty file.
114 changes: 114 additions & 0 deletions zpy/tests/legacy/test_decrypt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright (C) 2015 Stefano Palazzo <stefano.palazzo@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import unittest
import unittest.mock
import io
import binascii
import contextlib
import zpy.decrypt
import zpy.legacy.decrypt


class DecryptTest(unittest.TestCase):

@unittest.mock.patch("zpy.util.load_identity")
@unittest.mock.patch("Crypto.Cipher.AES.new")
@unittest.mock.patch("Crypto.Cipher.PKCS1_OAEP.new")
@unittest.mock.patch("Crypto.Util.Counter.new")
@unittest.mock.patch("Crypto.Hash.HMAC.new")
def test_decrypt_stream_v1(
self, new_HMAC, new_Counter, new_PKCS1_OAEP, new_AES,
load_identity):
load_identity.return_value = "..."
stdout = io.BytesIO()
stdin = io.BytesIO(binascii.unhexlify((
"00000000000000000000000000000001" # iv
"0001" # length of encrypted key
"aa" # encrypted key
"ffff" # length of the first chunk
) + (
"ee"
) * 0xFFFF + (
"0000"
"cc"
)))
key = b"\xFF" * 32
iv = b"\x00" * 15 + b"\x01"
new_PKCS1_OAEP.return_value.decrypt.return_value = b"\xFF" * 32
new_AES.return_value.decrypt.side_effect = lambda x: x
new_HMAC.return_value.digest.return_value = b"\xCC"
zpy.legacy.decrypt.decrypt_stream_v1("id", stdin, stdout)
self.assertEqual(stdout.getvalue(), b"\xEE" * 0xFFFF)
load_identity.assert_called_once_with("id")
counter = new_Counter.return_value
new_Counter.assert_called_once_with(
128, initial_value=int.from_bytes(iv, "big"))
new_AES.assert_called_once_with(key, mode=6, counter=counter)
stdin = io.BytesIO(binascii.unhexlify((
"00000000000000000000000000000001" # iv
"0001" # length of encrypted key
"aa" # encrypted key
"ffff" # length of the first chunk
) + (
"ee" # first encrypted chunk (65535 bytes)
) * 0xFFFF + (
"0000" # length of next (last) chunk
"AAAA" # invalid mac
)))
with self.assertRaises(RuntimeError):
zpy.legacy.decrypt.decrypt_stream_v1("id", stdin, stdout)

@unittest.mock.patch("zpy.legacy.decrypt.decrypt_stream_v1")
@unittest.mock.patch("zpy.util.DecodingReader")
def test_decrypt_stream_v1_base64(self, DecodingReader, decrypt_stream_v1):
reader = DecodingReader.return_value.__enter__.return_value
stdin, stdout = unittest.mock.Mock(), unittest.mock.Mock()
zpy.legacy.decrypt.decrypt_stream_v1_base64("id", stdin, stdout)
zpy.legacy.decrypt.decrypt_stream_v1.assert_called_once_with(
"id", reader, stdout)
DecodingReader.assert_called_once_with(stdin)

@unittest.mock.patch("builtins.open")
@unittest.mock.patch("zpy.legacy.decrypt.decrypt_stream_v1_base64")
def test_decrypt(self, decrypt_stream_v1_base64, open):
stdin, stdout = unittest.mock.Mock(), unittest.mock.Mock()
stdin.read.side_effect = [b"enB5", b"AAAB"]
open.side_effect = [
contextlib.closing(stdin),
contextlib.closing(stdout),
]
zpy.decrypt.decrypt("id", "/dev/stdin")
self.assertEqual(open.mock_calls, [
unittest.mock.call("/dev/stdin", "rb"),
unittest.mock.call("/dev/stdout", "wb"),
])
decrypt_stream_v1_base64.assert_called_once_with("id", stdin, stdout)

@unittest.mock.patch("builtins.open")
@unittest.mock.patch("zpy.legacy.decrypt.decrypt_stream_v1")
def test_decrypt_raw(self, decrypt_stream_v1, open):
stdin, stdout = unittest.mock.Mock(), unittest.mock.Mock()
stdin.read.side_effect = [b"zpy\x00", b"\x00\x01"]
open.side_effect = [
contextlib.closing(stdin),
contextlib.closing(stdout),
]
zpy.decrypt.decrypt("id", "/dev/stdin")
self.assertEqual(open.mock_calls, [
unittest.mock.call("/dev/stdin", "rb"),
unittest.mock.call("/dev/stdout", "wb"),
])
decrypt_stream_v1.assert_called_once_with("id", stdin, stdout)
Loading