Skip to content

Commit

Permalink
Experiment - Add support for switching cipher methods through PNConfi…
Browse files Browse the repository at this point in the history
…guration
  • Loading branch information
seba-aln committed Apr 26, 2023
1 parent 809bfb0 commit e873982
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 5 deletions.
22 changes: 17 additions & 5 deletions pubnub/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,19 @@


class PubNubCryptodome(PubNubCrypto):
mode = AES.MODE_CBC
fallback_mode = None

def __init__(self, pubnub_config):
self.pubnub_configuration = pubnub_config
self.mode = pubnub_config.cipher_mode
self.fallback_mode = pubnub_config.fallback_cipher_mode

def encrypt(self, key, msg, use_random_iv=False):
secret = self.get_secret(key)
initialization_vector = self.get_initialization_vector(use_random_iv)

cipher = AES.new(bytes(secret[0:32], 'utf-8'), AES.MODE_CBC, bytes(initialization_vector, 'utf-8'))
cipher = AES.new(bytes(secret[0:32], 'utf-8'), self.mode, bytes(initialization_vector, 'utf-8'))
encrypted_message = cipher.encrypt(self.pad(msg.encode('utf-8')))
msg_with_iv = self.append_random_iv(encrypted_message, use_random_iv, bytes(initialization_vector, "utf-8"))

Expand All @@ -30,8 +35,15 @@ def decrypt(self, key, msg, use_random_iv=False):

decoded_message = decodebytes(msg.encode("utf-8"))
initialization_vector, extracted_message = self.extract_random_iv(decoded_message, use_random_iv)
cipher = AES.new(bytes(secret[0:32], "utf-8"), AES.MODE_CBC, initialization_vector)
plain = self.depad((cipher.decrypt(extracted_message)).decode('utf-8'))
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, initialization_vector)
try:
plain = self.depad((cipher.decrypt(extracted_message)).decode('utf-8'))
except UnicodeDecodeError as e:
if not self.fallback_mode:
raise e

cipher = AES.new(bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector)
plain = self.depad((cipher.decrypt(extracted_message)).decode('utf-8'))

try:
return json.loads(plain)
Expand Down Expand Up @@ -71,7 +83,7 @@ class PubNubFileCrypto(PubNubCryptodome):
def encrypt(self, key, file):
secret = self.get_secret(key)
initialization_vector = self.get_initialization_vector(use_random_iv=True)
cipher = AES.new(bytes(secret[0:32], "utf-8"), AES.MODE_CBC, bytes(initialization_vector, 'utf-8'))
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, bytes(initialization_vector, 'utf-8'))
initialization_vector = bytes(initialization_vector, 'utf-8')

return self.append_random_iv(
Expand All @@ -83,6 +95,6 @@ def encrypt(self, key, file):
def decrypt(self, key, file):
secret = self.get_secret(key)
initialization_vector, extracted_file = self.extract_random_iv(file, use_random_iv=True)
cipher = AES.new(bytes(secret[0:32], "utf-8"), AES.MODE_CBC, initialization_vector)
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, initialization_vector)

return unpad(cipher.decrypt(extracted_file), 16)
23 changes: 23 additions & 0 deletions pubnub/pnconfiguration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .enums import PNHeartbeatNotificationOptions, PNReconnectionPolicy
from Cryptodome.Cipher import AES


class PNConfiguration(object):
Expand All @@ -17,6 +18,8 @@ def __init__(self):
self.publish_key = None
self.secret_key = None
self.cipher_key = None
self._cipher_mode = AES.MODE_GCM
self._fallback_cipher_mode = AES.MODE_CBC
self.auth_key = None
self.filter_expression = None
self.enable_subscribe = True
Expand Down Expand Up @@ -61,6 +64,26 @@ def set_presence_timeout_with_custom_interval(self, timeout, interval):
def set_presence_timeout(self, timeout):
self.set_presence_timeout_with_custom_interval(timeout, (timeout / 2) - 1)

@property
def cipher_mode(self):
return self._cipher_mode

@cipher_mode.setter
def cipher_mode(self, cipher_mode):
if cipher_mode is not self._cipher_mode:
self._cipher_mode = cipher_mode
self.crypto_instance = None

@property
def fallback_cipher_mode(self):
return self._fallback_cipher_mode

@fallback_cipher_mode.setter
def fallback_cipher_mode(self, fallback_cipher_mode):
if fallback_cipher_mode is not self._fallback_cipher_mode:
self._fallback_cipher_mode = fallback_cipher_mode
self.crypto_instance = None

@property
def crypto(self):
if self.crypto_instance is None:
Expand Down

0 comments on commit e873982

Please sign in to comment.