Skip to content

Commit

Permalink
Merge pull request #4 from jaedsonpys/new-storage-method
Browse files Browse the repository at this point in the history
New storage method
  • Loading branch information
jaedsonpys committed Aug 6, 2023
2 parents 5c70134 + c3919d6 commit 70e4124
Show file tree
Hide file tree
Showing 12 changed files with 536 additions and 254 deletions.
15 changes: 14 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -288,4 +288,17 @@

## Features

- Create the path if it does not exist in the `CookieDB.append()` method.
- Create the path if it does not exist in the `CookieDB.append()` method.

# 9.0.0

- [CookieDB 9.0.0 in PyPi](https://pypi.org/project/cookiedb/9.0.0/)
- [CookieDB 9.0.0 in GitHub Release](https://github.com/jaedsonpys/cookiedb/releases/tag/v9.0.0)

## Features

- Remove all methods that use `pickle`.
- Implement a custom storage method that stores data and reads the database line by line.
- Loading only necessary data when getting, adding, deleting and updating.
- Encode and decode data using `struct` in `Cryptography` class.
- Raise `ValueNotSupportedError` for unsupported values (supported data types: str, int, float, bool, dict and list).
2 changes: 1 addition & 1 deletion cookiedb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@
from .cookiedb import CookieDB
from . import exceptions

__version__ = '8.1.0'
__version__ = '9.0.0'
222 changes: 184 additions & 38 deletions cookiedb/_document.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,55 +12,201 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import pickle
from typing import Union
import os
import struct
from io import BufferedWriter
from typing import Union, Any, Tuple, Iterator, List

from . import exceptions
from ._encrypt import Cryptography
from ._item import Item


class Document:
def __init__(self, cryptography: Cryptography, document_path: str) -> None:
    """Bind a document file to the cipher used to read and write it.

    When no file exists at ``document_path`` it is created and a single
    ``@checkEncrypt`` item is written to it. When the file already
    exists, its first record is decrypted to check the key.

    :param cryptography: Object used to encrypt and decrypt records
    :type cryptography: Cryptography
    :param document_path: Path of the document file
    :type document_path: str
    :raises InvalidDatabaseKeyError: If decrypting the first record
        raises ``InvalidTokenError``
    """

    self._crypt = cryptography
    self._document_path = document_path

    if os.path.isfile(document_path):
        # Records are (length header, payload) pairs; only the payload
        # of the first one is needed to probe the key.
        first_record = next(self._read_doc())
        key_probe = first_record[1]

        try:
            self._crypt.decrypt(key_probe)
        except exceptions.InvalidTokenError:
            raise exceptions.InvalidDatabaseKeyError('Invalid database key') from None
    else:
        with open(self._document_path, 'wb') as doc:
            self._add_item('@checkEncrypt', True, doc)

@staticmethod
def _save_file(file_content: str, filepath: str) -> None:
with open(filepath, 'wb') as writer:
writer.write(file_content)

def _encrypt(self, obj: dict) -> str:
pickle_file = pickle.dumps(obj)
encrypted_data = self._crypt.encrypt(pickle_file)
return encrypted_data

def _decrypt(self, encrypted: bytes) -> dict:
decrypted_data = self._crypt.decrypt(encrypted)
data = pickle.loads(decrypted_data)
return data

def create_document(self) -> dict:
    """Create an empty document, persist it and return it.

    The empty document is ``{'items': {}}``; it is encrypted with
    ``self._encrypt`` and written to ``self._document_path``.

    :return: The newly created empty document
    :rtype: dict
    """

    empty_document = {'items': {}}
    self._save_file(self._encrypt(empty_document), self._document_path)
    return empty_document

def get_document(self) -> Union[None, dict]:
try:
with open(self._document_path, 'rb') as reader:
data = reader.read()
except FileNotFoundError:
raise exceptions.DatabaseNotFoundError(f'Database "{self._document_path}" not found')
def _to_dict_tree(items: List[Tuple[str, Any]], _list: bool = False) -> dict:
result = [] if _list else {}

for path, value in items:
result_ref = result
parts = path.strip('/').split('/')
max_index = len(parts) - 1
last_index = None

for i, part in enumerate(parts):
if i == max_index:
if isinstance(result_ref, list) and part.isdigit():
result_ref.append(value)
elif isinstance(result_ref, list):
result_ref[last_index][part] = value
else:
result_ref[part] = value
elif part.isdigit():
last_index = int(part)
try:
result_ref = result_ref[last_index]
except IndexError:
if parts[i + 1].isdigit():
dtype = []
else:
dtype = {}

result_ref.insert(last_index, dtype)
result_ref = result_ref[last_index]
elif parts[i + 1].isdigit():
result_ref = result_ref.setdefault(part, [])
else:
result_ref = result_ref.setdefault(part, {})

return result

def _read_doc(self) -> Iterator[Tuple[bytes]]:
with open(self._document_path, 'rb') as doc:
while True:
_line_len = doc.read(2)

if not _line_len or _line_len == b'\x00':
break

full_len, = struct.unpack('<H', _line_len)
yield _line_len, doc.read(full_len)

def _read_items(self) -> Iterator[Item]:
    """Iterate over the document, yielding one decrypted item per record.

    :return: ``Item`` objects built from each decrypted record payload
    :rtype: Iterator[Item]
    """

    for _header, payload in self._read_doc():
        yield Item(self._crypt.decrypt(payload))

def _write_item(self, item: bytes, fp: BufferedWriter) -> None:
encrypted_item = self._crypt.encrypt(item)
fp.write(struct.pack('<H', len(encrypted_item)))
fp.write(encrypted_item)

def _add_item(self, path: str, value: Any, fp: BufferedWriter) -> None:
    """Build an item for ``path``/``value`` and write it to ``fp``.

    :param path: Path of the item
    :type path: str
    :param value: Value stored at ``path``
    :type value: Any
    :param fp: Binary file object to write to
    :type fp: BufferedWriter
    """

    self._write_item(Item.create(path, value), fp)

def _exists(self, path: str) -> bool:
path = path.encode()

for item in self._read_items():
item_path = item.get_path()

if item_path == path or item_path.startswith(path):
return True
elif item_path.startswith(b''.join((b'@list:', path))):
return True
elif item_path.startswith(b''.join((b'#', path))):
return True

return False

def _get_list(self, path: str, _len: int) -> list:
required_items = [f'#{path}/{i}'.encode() for i in range(_len)]
list_items = []

def _in_required(ipath: bytes) -> bool:
for req_item in required_items:
if ipath == req_item or ipath.startswith(req_item):
return True

return False

_encoded_path = path.encode()
_enc_path_elem = b''.join((b'#', _encoded_path))
_get_basepath = lambda p: p.replace(_enc_path_elem, b'')

for item in self._read_items():
item_path = item.get_path()

if _in_required(item_path):
value = item.get_value()
basepath = _get_basepath(item_path).decode()

if not item_path.decode()[-1].isdigit():
list_items.append((basepath, value))
else:
list_items.append((basepath, value))

tree_items = self._to_dict_tree(list_items, _list=True)
return tree_items

def add(self, path: str, value: Any) -> None:
    """Store ``value`` at ``path``.

    When ``path`` already exists the call is delegated to ``update``.
    Otherwise the value is appended to the document file: a dict or a
    list is expanded into several items, any other value is written as
    a single item.

    :param path: Path where the value is stored
    :type path: str
    :param value: Value to store
    :type value: Any
    """

    if self._exists(path):
        self.update(path, value)
        return

    # Append mode: existing records are left untouched.
    with open(self._document_path, 'ab') as doc:
        if isinstance(value, dict):
            for item in Item._dict_to_items(value, path):
                self._write_item(item, doc)
        elif isinstance(value, list):
            for item in Item.create_list(path, value):
                self._write_item(item, doc)
        else:
            self._add_item(path, value, doc)

def get(self, path: str) -> Union[Any, None]:
    """Return the value stored at ``path``.

    * An item whose path equals ``path`` is returned directly.
    * A list marker (``@list:<path>``) equal to ``path`` returns the
      list rebuilt by ``_get_list``.
    * Otherwise every item or list found below ``path`` is collected
      and assembled into a nested structure by ``_to_dict_tree``.

    Descendants are matched on a ``/`` boundary, so siblings that only
    share a textual prefix with ``path`` are ignored. The relative
    path is obtained by removing only the leading ``path``; the old
    ``replace`` call also removed later occurrences of the same text.
    An empty path still matches every item.

    :param path: Path to read
    :type path: str
    :return: The stored value, or ``None`` when nothing matches
    :rtype: Union[Any, None]
    """

    target = path.encode()

    # Prefix that every descendant of ``target`` must start with.
    if not target or target.endswith(b'/'):
        child_prefix = target
    else:
        child_prefix = target + b'/'

    list_marker = b'@list:'
    list_target = list_marker + target
    list_child_prefix = list_marker + child_prefix
    items = []

    for item in self._read_items():
        item_path = item.get_path()

        if item_path == target:
            return item.get_value()

        if item_path == list_target or item_path.startswith(list_child_prefix):
            # The marker item's value is passed to _get_list as the length.
            list_path = item_path[len(list_marker):]
            list_value = self._get_list(list_path.decode(), item.get_value())

            if item_path == list_target:
                return list_value

            items.append((list_path[len(target):].decode(), list_value))
        elif item_path.startswith(child_prefix):
            items.append((item_path[len(target):].decode(), item.get_value()))

    if items:
        return self._to_dict_tree(items)

    return None

def delete(self, path: str) -> None:
    """Remove ``path`` and everything stored under it from the document.

    Every record is copied to ``<document>.temp`` unless its item path
    starts with ``path``, ``@list:<path>`` or ``#<path>``. The
    temporary file then replaces the document.

    :param path: Path to delete
    :type path: str
    """

    target = path.encode()
    # NOTE(review): these are plain textual prefixes, so deleting "user"
    # also drops "username". Tightening this must be done together with
    # _exists, otherwise add() -> update() -> delete() can loop forever.
    doomed_prefixes = (target, b'@list:' + target, b'#' + target)
    temp_path = self._document_path + '.temp'

    with open(temp_path, 'wb') as temp_doc:
        for line_len, line in self._read_doc():
            item = Item(self._crypt.decrypt(line))

            if not item.get_path().startswith(doomed_prefixes):
                # Copy the record as read; no re-encryption is needed.
                temp_doc.write(line_len)
                temp_doc.write(line)

    # Swap in one step so the document never goes missing between a
    # separate remove and rename.
    os.replace(temp_path, self._document_path)

def update_document(self, items: dict) -> None:
    """Replace the ``items`` of the stored document and save it.

    The current document is loaded with ``get_document``, its
    ``'items'`` entry is overwritten, and the result is encrypted and
    written back to ``self._document_path``.

    :param items: New content for the document's ``items`` entry
    :type items: dict
    """

    current = self.get_document()
    current['items'] = items
    self._save_file(self._encrypt(current), self._document_path)
def update(self, path: str, value: Any) -> None:
    """Replace whatever is stored at ``path`` with ``value``.

    The old content is deleted first and the new value is then added.

    :param path: Path to overwrite
    :type path: str
    :param value: New value
    :type value: Any
    """

    # Delete before adding: add() delegates back to update() for as
    # long as the path still exists.
    self.delete(path)
    self.add(path, value)
44 changes: 21 additions & 23 deletions cookiedb/_encrypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import struct
from io import BytesIO
from hashlib import sha256
from secrets import token_bytes

Expand Down Expand Up @@ -54,8 +56,8 @@ def encrypt(self, data: bytes) -> bytes:
"""Encrypt a data in bytes.
The result will be a string in bytes containing
the length of the encrypted data, initialization
vector (IV), encrypted data and a MAC hash.
the initialization vector (IV), encrypted data
and a MAC hash.
:param data: Any data in bytes
:type data: bytes
Expand All @@ -69,27 +71,35 @@ def encrypt(self, data: bytes) -> bytes:
cipher = AES.new(self._encryption_key, AES.MODE_CBC, iv=random_iv)
encrypted_data = cipher.encrypt(padding_data)

result = b''.join((len(data).to_bytes(4, 'big'), random_iv, encrypted_data))
return b''.join((result, self._get_hmac(result)))
enc_data_mac = self._get_hmac(encrypted_data)
enc_data_len = len(encrypted_data)
enc_data_mac_len = len(enc_data_mac)

pack_values = (enc_data_len, enc_data_mac_len,
random_iv, encrypted_data, enc_data_mac)
result = struct.pack(f'<HH 16s {enc_data_len}s {enc_data_mac_len}s', *pack_values)

return result

def decrypt(self, token: bytes) -> bytes:
"""Decrypt a token in bytes.
:param token: Encrypted token
:type token: bytes
:raises Exception: If token is invalid
:raises Exception: If token has a invalid signature
:raises InvalidTokenError: If token is invalid
:raises InvalidSignatureError: If token has a invalid signature
:return: Decrypted data
:rtype: bytes
"""

random_iv = token[4:20]
mac = token[-32:]
encrypted_data = token[20:-32]
token_buf = BytesIO(token)
enc_len, mac_len = struct.unpack('<HH', token_buf.read(4))
flen = (enc_len + mac_len) + 16
iv, encrypted_data, mac = struct.unpack(f'<16s {enc_len}s {mac_len}s', token_buf.read(flen))

cipher = AES.new(self._encryption_key, AES.MODE_CBC, iv=random_iv)
cipher = AES.new(self._encryption_key, AES.MODE_CBC, iv=iv)

if self._valid_hmac(mac, token[:-32]):
if self._valid_hmac(mac, encrypted_data):
try:
decrypted_data = cipher.decrypt(encrypted_data)
unpad_data = Padding.unpad(decrypted_data, AES.block_size)
Expand All @@ -98,15 +108,3 @@ def decrypt(self, token: bytes) -> bytes:
return unpad_data
else:
raise exceptions.InvalidSignatureError('Token signature don\'t match')

def get_data_size(self, token: bytes) -> int:
    """Read the data size stored at the start of a token.

    The size is the first four bytes of the token, interpreted as a
    big-endian integer.

    :param token: Encrypted token
    :type token: bytes
    :return: Data size in bytes
    :rtype: int
    """

    size_field = token[:4]
    return int.from_bytes(size_field, byteorder='big')

0 comments on commit 70e4124

Please sign in to comment.