forked from saurabh0719/elara
-
Notifications
You must be signed in to change notification settings - Fork 0
/
elarautil.py
149 lines (134 loc) · 5.27 KB
/
elarautil.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
"""
Copyright (c) 2021, Saurabh Pujari
All rights reserved.
This source code is licensed under the BSD-style license found in the LICENSE file in the root directory of this source tree.
"""
from typing import Dict
import msgpack
import os
from zlib import crc32
from cryptography.fernet import Fernet
from .exceptions import FileAccessError, FileKeyError, LoadChecksumError, LoadIncompatibleDB
class Util:
    """Helpers for reading, writing and encrypting the db file.

    File layout produced by the store methods below (all integers are
    little-endian):

    * 4 bytes -- magic number ``b"ELDB"``
    * 2 bytes -- format version; the most significant bit marks the
      payload as encrypted
    * 4 bytes -- CRC32 of the payload
    * rest    -- msgpack payload (Fernet-encrypted when the flag is set)
    """

    # Magic number written at the start of every db file.
    _MAGIC = b"ELDB"
    # Most significant bit of the 2-byte version field: set => encrypted.
    _ENCRYPTED_FLAG = 1 << 15

    @staticmethod
    def check_mag(mag):
        """Return True if *mag* equals the db file magic number."""
        return mag == Util._MAGIC

    @staticmethod
    def check_encrypted(version):
        """Return True if the encryption bit (msb) of *version* is set."""
        return (version & Util._ENCRYPTED_FLAG) != 0

    @staticmethod
    def _read_raw(path, not_found_message):
        """Read a db file and split it into its raw header fields and payload.

        Returns a ``(magic, version, checksum, payload)`` tuple; no field is
        validated here. A missing file is reported as ``FileAccessError``
        carrying *not_found_message*.
        """
        try:
            with open(path, "rb") as fctx:
                magic = fctx.read(4)
                version = int.from_bytes(fctx.read(2), "little", signed=False)
                checksum = int.from_bytes(fctx.read(4), "little", signed=False)
                payload = fctx.read()
        except FileNotFoundError as err:
            raise FileAccessError(not_found_message) from err
        return magic, version, checksum, payload

    @staticmethod
    def read_plain_db(obj) -> Dict:
        """Load an unencrypted db file and return the unpacked data.

        Reads ``obj.path`` and compares the stored format version against
        ``obj.db_format_version``.

        Raises:
            FileAccessError: the file does not exist, the magic number is
                wrong, or the file is flagged as encrypted.
            LoadChecksumError: the stored CRC32 does not match the payload.
            LoadIncompatibleDB: the stored format version differs from
                ``obj.db_format_version``.
        """
        # The open() call must sit inside the try block, otherwise a missing
        # file escapes as a raw FileNotFoundError instead of FileAccessError.
        magic, version, checksum, data = Util._read_raw(obj.path, "File not found")

        if not Util.check_mag(magic):
            raise FileAccessError("File magic number not known")
        # check for encryption before trying anything
        if Util.check_encrypted(version):
            raise FileAccessError("This file is encrypted, run in secure mode")

        calculated_checksum = crc32(data)
        if calculated_checksum != checksum:
            raise LoadChecksumError(
                f"calculated checksum: {calculated_checksum} is different from stored checksum {checksum}")
        if version != obj.db_format_version:
            raise LoadIncompatibleDB(f"db format version {version} is incompatible with {obj.db_format_version}")

        return msgpack.unpackb(data)

    @staticmethod
    def store_plain_db(obj):
        """Serialise ``obj.db`` with msgpack and write it to ``obj.path``.

        The whole buffer is built before the file is opened: opening with
        ``"wb"`` truncates the file, so a serialisation failure after the
        open would otherwise wipe the existing db.
        """
        data = msgpack.packb(obj.db)
        buffer = b"".join((
            Util._MAGIC,
            obj.db_format_version.to_bytes(2, "little"),
            crc32(data).to_bytes(4, "little"),
            data,
        ))
        try:
            with open(obj.path, "wb") as fctx:
                fctx.write(buffer)
        except FileExistsError as err:
            # Handler kept from the original code for backward compatibility.
            raise FileAccessError("File already exists") from err

    @staticmethod
    def read_and_decrypt(obj):
        """Load an encrypted db file, decrypt it and return the unpacked data.

        Returns None when ``obj.key`` is falsy. Otherwise ``obj.key`` is used
        as the Fernet key for the payload stored at ``obj.path``.

        Raises:
            FileAccessError: the file does not exist, the magic number is
                wrong, or the file is not flagged as encrypted.
            LoadChecksumError: the stored CRC32 does not match the payload.
        """
        if not obj.key:
            return None

        # Build the cipher first so a malformed key fails before any file I/O.
        fernet = Fernet(obj.key)
        magic, version, checksum, encrypted_data = Util._read_raw(
            obj.path, "File open & read error")

        if not Util.check_mag(magic):
            raise FileAccessError("File magic number not known")
        if not Util.check_encrypted(version):
            raise FileAccessError("File is marked not encrypted, you might have a corrupt db")

        calculated_checksum = crc32(encrypted_data) & 0xFFFFFFFF
        if calculated_checksum != checksum:
            raise LoadChecksumError(
                f"calculated checksum: {calculated_checksum} is different from stored checksum {checksum}")

        decrypted_data = fernet.decrypt(encrypted_data)
        return msgpack.unpackb(decrypted_data)

    @staticmethod
    def encrypt_and_store(obj):
        """Encrypt ``obj.db`` with ``obj.key`` and write it to ``obj.path``.

        Returns True after a successful write and False when ``obj.key`` is
        falsy (nothing is written in that case).
        """
        if not obj.key:
            return False

        fernet = Fernet(obj.key)
        encrypted_data = fernet.encrypt(msgpack.packb(obj.db))
        buffer = b"".join((
            Util._MAGIC,
            # set version msb to mark the payload as encrypted
            (obj.db_format_version | Util._ENCRYPTED_FLAG).to_bytes(2, "little"),
            crc32(encrypted_data).to_bytes(4, "little"),
            encrypted_data,
        ))
        try:
            with open(obj.path, "wb") as file:
                file.write(buffer)
        except FileExistsError as err:
            # Handler kept from the original code for backward compatibility.
            raise FileAccessError("File exists") from err
        return True

    @staticmethod
    def keygen(path):
        """Generate a new Fernet key and write it to *path*.

        Returns True on success.

        Raises:
            FileKeyError: key generation failed.
            FileAccessError: the key file could not be opened or written.
        """
        try:
            key = Fernet.generate_key()
        except Exception as err:
            raise FileKeyError("Key generation error") from err

        try:
            # The context manager closes the file; no explicit close() needed.
            with open(path, "wb") as file:
                file.write(key)
        except Exception as err:
            raise FileAccessError("File open & write error") from err
        return True

    @staticmethod
    def loadkey(path):
        """Ensure a key file exists at *path*, generating one if necessary.

        ``~`` in *path* is expanded. A new key is generated when the file is
        missing or empty; a non-empty file is left untouched.
        """
        key_path = os.path.expanduser(path)
        if not os.path.exists(key_path) or os.stat(key_path).st_size == 0:
            Util.keygen(key_path)

    @staticmethod
    def readkey(path):
        """Return the raw bytes of the key file at *path*, or None if missing.

        ``~`` in *path* is expanded before the lookup.
        """
        key_path = os.path.expanduser(path)
        if not os.path.exists(key_path):
            return None
        # Context manager guarantees the handle is closed even if read() fails.
        with open(key_path, "rb") as file:
            return file.read()