Skip to content

Commit

Permalink
Use previous version of devicelibs/crypto.py, with some modern changes.
Browse files Browse the repository at this point in the history
The current version depends on a version of python-cryptsetup that
superseded the RHEL6 version in 2011.

However, require a passphrase for every function where it is currently
required.

Move python-cryptsetup version back in spec file.

Signed-off-by: Anne Mulhern <amulhern@redhat.com>
  • Loading branch information
mulkieran committed Apr 1, 2015
1 parent 88c4544 commit d8d7b03
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 85 deletions.
150 changes: 82 additions & 68 deletions blivet/devicelibs/crypto.py
Expand Up @@ -20,16 +20,17 @@
# Martin Sivak <msivak@redhat.com>
#

import os
import random
import time
from pycryptsetup import CryptSetup

from ..errors import CryptoError
from ..size import Size
from ..util import get_current_entropy
from ..util import get_current_entropy, run_program

LUKS_METADATA_SIZE = Size("2 MiB")
MIN_CREATE_ENTROPY = 256 # bits
MIN_CREATE_ENTROPY = 256

# Keep the character set size a power of two to make sure all characters are
# equally likely
Expand All @@ -44,70 +45,46 @@ def generateBackupPassphrase():
raw = [random.choice(GENERATED_PASSPHRASE_CHARSET) for _ in range(GENERATED_PASSPHRASE_LENGTH)]

# Insert a '-' after every five char chunk for easier reading
parts = []
for i in range(0, GENERATED_PASSPHRASE_LENGTH, 5):
parts.append(''.join(raw[i : i + 5]))
parts = [''.join(raw[i : i + 5]) for i in range(0, GENERATED_PASSPHRASE_LENGTH, 5)]
return "-".join(parts)

yesDialog = lambda q: True
logFunc = lambda p, t: None
passwordDialog = lambda t: None

def is_luks(device):
cs = CryptSetup(device=device, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
return cs.isLuks()
cs = CryptSetup(yesDialog=yesDialog, logFunc=logFunc)
return cs.isLuks(device)

def luks_uuid(device):
cs = CryptSetup(device=device, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
return cs.luksUUID()
cs = CryptSetup(yesDialog=yesDialog, logFunc=logFunc)
return cs.luksUUID(device).strip()

def luks_status(name):
"""True means active, False means inactive (or non-existent)"""
cs = CryptSetup(name=name, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
return cs.status()
cs = CryptSetup(yesDialog=yesDialog, logFunc=logFunc)
return cs.luksStatus(name)!=0

def luks_format(device,
passphrase=None,
cipher=None, key_size=None, key_file=None,
passphrase=None, key_file=None,
cipher=None, key_size=None,
min_entropy=0):
"""
Format device as LUKS with the specified parameters.
:param str device: device to format
:param str passphrase: passphrase to add to the new LUKS device
:param str cipher: cipher mode to use
:param int keysize: keysize to use
:param str key_file: key file to use
:param int min_entropy: minimum random data entropy level required for LUKS
format creation (0 means entropy level is not checked)
note::
If some minimum entropy is required (min_entropy > 0), the
function waits for enough entropy to be gathered by the kernel
which may potentially take very long time or even forever.
"""

# pylint: disable=unused-argument
if not passphrase:
raise ValueError("luks_format requires passphrase")

cs = CryptSetup(device=device, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
cs = CryptSetup(yesDialog=yesDialog, logFunc=logFunc)
key_file_unlink = False

key_file = cs.prepare_passphrase_file(passphrase)
key_file_unlink = True

#None is not considered as default value and pycryptsetup doesn't accept it
#so we need to filter out all Nones
kwargs = {}

# Split cipher designator to cipher name and cipher mode
cipherType = None
cipherMode = None
if cipher:
cparts = cipher.split("-")
cipherType = "".join(cparts[0:1])
cipherMode = "-".join(cparts[1:])

if cipherType: kwargs["cipher"] = cipherType
if cipherMode: kwargs["cipherMode"] = cipherMode
if key_size: kwargs["keysize"] = key_size
kwargs["device"] = device
if cipher: kwargs["cipher"] = cipher
if key_file: kwargs["keyfile"] = key_file
if key_size: kwargs["keysize"] = key_size

if min_entropy > 0:
# min_entropy == 0 means "don't care"
Expand All @@ -116,55 +93,92 @@ def luks_format(device,
time.sleep(1)

rc = cs.luksFormat(**kwargs)
if rc:
raise CryptoError("luks_format failed for '%s'" % device)
if key_file_unlink: os.unlink(key_file)

# activate first keyslot
cs.addKeyByVolumeKey(newPassphrase=passphrase)
if rc:
raise CryptoError("luks_add_key_by_volume_key failed for '%s'" % device)

raise CryptoError("luks_format failed for '%s'" % device)

def luks_open(device, name, passphrase=None, key_file=None):
# pylint: disable=unused-argument
if not passphrase:
raise ValueError("luks_format requires passphrase")
raise ValueError("luks_open requires a passphrase")

cs = CryptSetup(device=device, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
cs = CryptSetup(yesDialog=yesDialog, logFunc=logFunc)
key_file_unlink = False

rc = cs.activate(passphrase=passphrase, name=name)
if rc<0:
raise CryptoError("luks_open failed for %s (%s) with errno %d" % (device, name, rc))
key_file = cs.prepare_passphrase_file(passphrase)
key_file_unlink = True

def luks_close(name):
cs = CryptSetup(name=name, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
rc = cs.deactivate()
rc = cs.luksOpen(device=device, name=name, keyfile=key_file)
if key_file_unlink: os.unlink(key_file)
if rc:
raise CryptoError("luks_open failed for %s (%s)" % (device, name))

def luks_close(name):
cs = CryptSetup(yesDialog=yesDialog, logFunc=logFunc)
rc = cs.luksClose(name)
if rc:
raise CryptoError("luks_close failed for %s" % name)

def luks_add_key(device,
new_passphrase=None,
new_passphrase=None, new_key_file=None,
passphrase=None, key_file=None):
# pylint: disable=unused-argument
if not passphrase:
raise ValueError("luks_add_key requires passphrase")
raise ValueError("luks_add_key requires a passphrase")

params = ["-q"]

cs = CryptSetup(device=device, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
rc = cs.addKeyByPassphrase(passphrase=passphrase, newPassphrase=new_passphrase)
p = os.pipe()
os.write(p[1], "%s\n" % passphrase)

if rc<0:
params.extend(["luksAddKey", device])

if new_passphrase:
os.write(p[1], "%s\n" % new_passphrase)
elif new_key_file and os.path.isfile(new_key_file):
params.append("%s" % new_key_file)
else:
raise CryptoError("luks_add_key requires either a passphrase or a key file to add")

os.close(p[1])

rc = run_program(["cryptsetup"] + params, stdin=p[0], stderr_to_stdout=True)

os.close(p[0])
if rc:
raise CryptoError("luks add key failed with errcode %d" % (rc,))

def luks_remove_key(device,
del_passphrase=None,
del_passphrase=None, del_key_file=None,
passphrase=None, key_file=None):
# pylint: disable=unused-argument
if not passphrase:
raise ValueError("luks_remove_key requires passphrase")
raise ValueError("luks_remove_key requires a passphrase")

params = []

p = os.pipe()
if del_passphrase: #the first question is about the key we want to remove
os.write(p[1], "%s\n" % del_passphrase)

cs = CryptSetup(device=device, yesDialog=yesDialog, logFunc=logFunc, passwordDialog=passwordDialog)
rc = cs.removePassphrase(passphrase = passphrase)
os.write(p[1], "%s\n" % passphrase)

params.extend(["luksRemoveKey", device])

if del_passphrase:
pass
elif del_key_file and os.path.isfile(del_key_file):
params.append("%s" % del_key_file)
else:
raise CryptoError("luks_remove_key requires either a passphrase or a key file to remove")

os.close(p[1])

rc = run_program(["cryptsetup"] + params, stdin=p[0], stderr_to_stdout=True)

os.close(p[0])
if rc:
raise CryptoError("luks remove key failed with errcode %d" % (rc,))
raise CryptoError("luks_remove_key failed with errcode %d" % (rc,))


2 changes: 1 addition & 1 deletion python-blivet.spec
Expand Up @@ -16,7 +16,7 @@ Source0: http://github.com/dwlehman/blivet/archive/%{realname}-%{version}.tar.gz
%define pypartedver 2.5-2
%define pythonpyblockver 0.45
%define e2fsver 1.41.0
%define pythoncryptsetupver 0.1.1
%define pythoncryptsetupver 0.0.11
%define utillinuxver 2.15.1
%define lvm2ver 2.02.99

Expand Down
26 changes: 10 additions & 16 deletions tests/devicelibs_test/crypto_test.py
Expand Up @@ -39,8 +39,7 @@ def testCryptoMisc(self):
##
# pass
self.assertEqual(crypto.is_luks(_LOOP_DEV0), -22)
with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
crypto.is_luks("/not/existing/device")
self.assertEqual(crypto.is_luks("/not/existing/device"), -22)

##
## luks_format
Expand All @@ -59,7 +58,7 @@ def testCryptoMisc(self):
crypto.luks_format(_LOOP_DEV1, key_file=keyfile)

# fail
with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
with self.assertRaises(CryptoError):
crypto.luks_format("/not/existing/device", passphrase="secret", cipher="aes-cbc-essiv:sha256", key_size=256)

# no passhprase or key file
Expand All @@ -86,24 +85,19 @@ def testCryptoMisc(self):
##
## luks_remove_key
##
# fail
with self.assertRaisesRegexp(CryptoError, "luks remove key failed"):
crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="wrong-pasphrase")

# pass
self.assertEqual(crypto.luks_remove_key(_LOOP_DEV0, del_passphrase="another-secret", passphrase="secret"), None)

# fail
with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
with self.assertRaises(CryptoError):
crypto.luks_open("/not/existing/device", "another-crypted", passphrase="secret")

# no passhprase or key file
with self.assertRaisesRegexp(ValueError, "luks_format requires passphrase"):
with self.assertRaises(ValueError):
crypto.luks_open(_LOOP_DEV1, "another-crypted")

with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
crypto.luks_status("another-crypted")
with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
self.assertFalse(crypto.luks_status("another-crypted"))
with self.assertRaises(CryptoError):
crypto.luks_close("wrong-name")

# cleanup
Expand Down Expand Up @@ -150,8 +144,8 @@ def testCryptoOpen(self):
## luks_status
##
# pass
self.assertEqual(crypto.luks_status(_name0), 2)
self.assertEqual(crypto.luks_status(_name1), 2)
self.assertTrue(crypto.luks_status(_name0))
self.assertTrue(crypto.luks_status(_name1))

##
## luks_uuid
Expand All @@ -170,9 +164,9 @@ def testCryptoOpen(self):
self.assertEqual(crypto.luks_close(_name1), None)

# already closed
with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
with self.assertRaises(CryptoError):
crypto.luks_close("crypted")
with self.assertRaisesRegexp(IOError, "Device cannot be opened"):
with self.assertRaises(CryptoError):
crypto.luks_close("encrypted")

class CryptoTestCase3(unittest.TestCase):
Expand Down

0 comments on commit d8d7b03

Please sign in to comment.