In [1]:
import base64
import itertools
import json
import os
import yaml

import requests

from cryptography.hazmat.primitives import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
import xdg.BaseDirectory

In [2]:
def pad(data, size=128): # size is in bits; 16 bytes = 128 bits
    """Return ``data`` with PKCS7 padding appended.

    ``size`` is the block size in bits that the result is padded up to.
    """
    pkcs7 = padding.PKCS7(size).padder()
    return pkcs7.update(data) + pkcs7.finalize()

def unpad(padded_data, size=128):
    """Strip PKCS7 padding from ``padded_data`` and return the remaining bytes.

    ``size`` is the block size in bits that was used when padding.
    """
    pkcs7 = padding.PKCS7(size).unpadder()
    return pkcs7.update(padded_data) + pkcs7.finalize()

def getCipher(key, iv):
    """Build an AES-CBC ``Cipher`` from a base64-encoded ``key`` and ``iv``."""
    aes = algorithms.AES(base64.b64decode(key))
    cbc = modes.CBC(base64.b64decode(iv))
    return Cipher(aes, cbc, default_backend())

def encrypt(data, key, iv):
    """PKCS7-pad ``data``, encrypt it with AES-CBC and return base64 ciphertext.

    ``key`` and ``iv`` are base64-encoded; they are decoded by ``getCipher``.
    """
    aes_encryptor = getCipher(key, iv).encryptor()
    ciphertext = aes_encryptor.update(pad(data)) + aes_encryptor.finalize()
    return base64.b64encode(ciphertext)
    
def decrypt(data, key, iv):
    """Decrypt base64-encoded AES-CBC ciphertext and strip its PKCS7 padding.

    ``key`` and ``iv`` are base64-encoded; they are decoded by ``getCipher``.
    """
    aes_decryptor = getCipher(key, iv).decryptor()
    ciphertext = base64.b64decode(data)
    padded_plaintext = aes_decryptor.update(ciphertext) + aes_decryptor.finalize()
    return unpad(padded_plaintext)

In [3]:
# Lengths, in bytes, of the raw random material requested by getRandomKey and
# getRandomIV (i.e. before base64 encoding).
key_size = 256 // 8  # 32 bytes
iv_size = 128 // 8   # 16 bytes

def getRandomBytes(size):
    """Return ``size`` bytes read from ``os.urandom``, base64-encoded."""
    raw = os.urandom(size)
    return base64.b64encode(raw)

def getRandomKey():
    """Return a fresh base64-encoded random key of ``key_size`` bytes."""
    return getRandomBytes(size=key_size)

def getRandomIV():
    """Return a fresh base64-encoded random IV of ``iv_size`` bytes."""
    return getRandomBytes(size=iv_size)

In [4]:
def getVerifier(iv, key):
    """Return the verifier for ``iv``: the base64 ``iv`` string itself,
    encrypted under ``key`` with ``iv`` also used as the cipher IV.
    """
    verifier = encrypt(data=iv, key=key, iv=iv)
    return verifier

In [5]:
def merge(d1, d2):
    """Return a new dict holding the items of ``d1`` and ``d2``.

    When a key is present in both, the value from ``d2`` wins. Neither input
    is modified. Written without ``dict.iteritems`` so it behaves the same on
    Python 2 and Python 3.
    """
    merged = dict(d1)
    merged.update(d2)
    return merged

In [6]:
def jsonMap(fn, json_obj):
    """Apply ``fn`` to every leaf of a JSON-like structure.

    Dicts and lists are rebuilt recursively with the same keys / ordering,
    ``None`` is passed through unchanged, and every other value ``v`` is
    replaced by ``fn(v)``.

    Lists are always returned as real lists (not lazy ``map`` objects) and
    dicts are walked with ``items()``, so the result is the same on Python 2
    and Python 3.
    """
    if json_obj is None:
        return None
    if isinstance(json_obj, dict):
        return {k: jsonMap(fn, v) for k, v in json_obj.items()}
    if isinstance(json_obj, list):
        return [jsonMap(fn, item) for item in json_obj]
    return fn(json_obj)

In [7]:
def encryptDict(dct, key, iv):
    """Return a copy of ``dct`` in which every leaf value has been passed
    through ``encrypt`` with the given ``key`` and ``iv`` (via ``jsonMap``).
    """
    return jsonMap(lambda value: encrypt(value, key, iv), dct)

def decryptDict(encrypted_dict, key, iv):
    """Return a copy of ``encrypted_dict`` in which every leaf value has been
    passed through ``decrypt`` with the given ``key`` and ``iv`` (via ``jsonMap``).
    """
    return jsonMap(lambda value: decrypt(value, key, iv), encrypted_dict)

In [8]:
def checkVerifier(key, iv, verifier):
    """Return True when ``verifier`` equals the verifier recomputed from
    ``iv`` and ``key``.
    """
    expected = getVerifier(iv, key)
    return verifier == expected

In [9]:
def convertToStr(input_dict):
    """Return a copy of ``input_dict`` with ``str`` applied to every leaf value.

    ``None`` values are left as ``None`` by ``jsonMap``. Note that booleans
    become the strings ``'True'`` / ``'False'``, both of which are truthy.
    """
    return jsonMap(fn=str, json_obj=input_dict)

In [10]:
KEEPASS_URL = 'http://localhost:19455/'
# Holds the last response that was rejected by makeRequest (unsuccessful or
# failed verification); None otherwise. For debugging only.
_output = None

def makeRequest(key, input_data, id_, standard_data=None, iv=None):
    """POST a request to ``KEEPASS_URL`` and return the verified response.

    Parameters:
        key: base64-encoded key used to build and check verifiers.
        input_data: request-specific fields; they override ``standard_data``
            on key collisions.
        id_: value sent as ``'Id'`` when ``standard_data`` is built here.
        standard_data: common fields to send. When ``None``, a dict with
            ``'Id'``, ``'Nonce'`` and ``'Verifier'`` is built. Pass ``{}`` to
            send none of them.
        iv: base64-encoded nonce to use when ``standard_data`` is built here;
            a random one is generated when omitted.

    Returns:
        The response JSON with every leaf converted by ``convertToStr`` when
        the response reports success and its verifier checks out; otherwise
        ``None``, with the rejected response stored in the module-level
        ``_output``.

    Raises:
        Exception: if the HTTP status code is not 200.
    """
    global _output
    _output = None
    # standard_data can be set to {} so need to explicitly check that it is equal to None
    if standard_data is None:
        iv = iv or getRandomIV()
        standard_data = {
            'Id': id_,
            'Nonce': iv,
            'Verifier': getVerifier(iv, key)
        }
    data = merge(standard_data, input_data)
    response = requests.post(KEEPASS_URL, json=data)
    if response.status_code != 200:
        raise Exception('Damn')
    raw_output = response.json()
    # Read the success flag BEFORE stringifying: convertToStr turns a JSON
    # false into the string 'False', which is truthy and would make every
    # failed response look successful.
    succeeded = bool(raw_output.get('Success'))
    output = convertToStr(raw_output)
    if succeeded:
        if checkVerifier(key, output['Nonce'], output['Verifier']):
            return output
        else:
            print('Failed to verify')
    else:
        print('keepass response was not successful')
    # For debugging
    _output = output
    return None

In [11]:
def associate():
    """Request a new association using a freshly generated random key.

    Returns ``(key, id)`` where ``id`` is the ``'Id'`` field of the response,
    or ``None`` when ``makeRequest`` does not return a verified response.
    """
    new_key = getRandomKey()
    request = {
        'RequestType': 'associate',
        'Key': new_key
    }
    # An explicit empty standard_data stops makeRequest from adding the
    # Id / Nonce / Verifier fields to this request.
    response = makeRequest(new_key, request, None, {})
    if not response:
        return None
    return new_key, response['Id']

In [12]:
def testAssociate(id_, key):
    """Send a ``'test-associate'`` request for the given id/key pair.

    Returns whatever ``makeRequest`` returns: the verified response dict, or
    ``None`` when the request was rejected.
    """
    request = {'RequestType': 'test-associate', 'TriggerUnlock': True}
    return makeRequest(key, request, id_)

In [13]:
def getLogins(url, id_, key):
    """Send a ``'get-logins'`` request for ``url`` and decrypt the entries.

    The URL is encrypted with the same nonce that is sent with the request.
    Each item of the response's ``'Entries'`` is decrypted with the nonce of
    the response. Returns the list of decrypted entries, or ``None`` when
    ``makeRequest`` does not return a verified response.
    """
    nonce = getRandomIV()
    request = {
        'RequestType': 'get-logins',
        'Url': encrypt(url, key, nonce)
    }
    response = makeRequest(key, request, id_, iv=nonce)
    if not response:
        return None
    logins = []
    for entry in response['Entries']:
        logins.append(decryptDict(entry, key, response['Nonce']))
    return logins

In [14]:
def getAndSaveNewAssociation(config_path):
    """Create a new association and persist it as YAML at ``config_path``.

    Returns the ``(key, id_)`` pair that was saved.

    Raises:
        RuntimeError: if ``associate()`` returns nothing. The config file is
            not touched in that case, because it is only opened after a
            successful association.
    """
    association = associate()
    if not association:
        # associate() returns None on failure; unpacking that directly would
        # raise an opaque "NoneType is not iterable" TypeError.
        raise RuntimeError('Could not create a new KeePass association')
    key, id_ = association
    with open(config_path, 'w') as fout:
        fout.write(yaml.safe_dump({'key': key, 'id': id_}, default_flow_style=False))
    return key, id_

In [15]:
# Load the saved association (id + key) from the user's config directory and
# check it with testAssociate; create and save a new one when there is no
# usable saved association.
config_dir = xdg.BaseDirectory.save_config_path('keepass_test')
config_path = os.path.join(config_dir, 'config.yml')
if os.path.exists(config_path):
    with open(config_path) as fin:
        try:
            config = yaml.safe_load(fin)
        except yaml.YAMLError:
            # Unparseable file: handled below like a failed association.
            config = None
    # An empty file loads as None and a hand-edited one may not be a mapping;
    # fall back to an empty config instead of crashing on the lookups below.
    if not isinstance(config, dict):
        config = {}
    id_ = config.get('id')
    key = config.get('key')
    # Only contact the server when both values are actually present.
    if not (id_ and key and testAssociate(id_, key)):
        print("Previous association failed. Loading new association")
        key, id_ = getAndSaveNewAssociation(config_path)
else:
    print("No previous association. Loading new association")
    key, id_ = getAndSaveNewAssociation(config_path)

In [16]:
# Send a 'get-logins' request for this URL using the association (id_, key)
# established in the previous cell. getLogins returns None if the request is rejected.
logins = getLogins('https://amazon.com', id_, key)

In [17]:
# Display the 'Uuid' field of the first returned entry. This raises if
# getLogins returned None or an empty list.
logins[0]['Uuid']

'b4dcc4713e72de19750e7fb3c0a10363'