# Implementing the Signal Protocol

The Signal Protocol runs in two steps: <br>
The Extended Triple Diffie-Hellman (X3DH) key agreement, which is used for the initial key exchange to generate the "root" key <br>
The Double Ratchet Algorithm


In [1]:
# First, we need some imports

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import dh
from cryptography.hazmat.primitives.kdf.hkdf import HKDF # HKDF is a key derivation function
import hashlib
import json

# Diffie-Hellman group parameters shared by every key pair generated in this notebook.
# NOTE(review): a 512-bit group is presumably chosen to keep the demo fast; confirm it
# is never used outside of a demonstration.
DH_GENERATOR = 2
DH_KEY_SIZE_BITS = 512
parameters = dh.generate_parameters(
    generator=DH_GENERATOR,
    key_size=DH_KEY_SIZE_BITS,
    backend=default_backend(),
)

# In-memory stand-in for the server: maps a user's name to the pre-key bundle they published
Server = dict()

In [2]:
# Define a User class for a client 
class User:
    """A client holding every DH key pair used by the key-agreement step.

    For each of ``identity``, ``Ephemeral``, ``Signed_prekey``, ``One_time_prekey``
    and ``Signature`` the instance stores the private key under that attribute name
    and the matching public key under ``<name>_public_key``.

    TODO (carried over from the original comments): ``Signed_prekey`` and
    ``Signature`` still "need a way to generate this" -- for now they are ordinary
    DH key pairs drawn from the shared ``parameters``.
    """

    # Order matters only in that it mirrors the order keys were originally generated in.
    _KEY_PAIR_NAMES = (
        "identity",
        "Ephemeral",
        "Signed_prekey",
        "One_time_prekey",
        "Signature",
    )

    def __init__(self, name):
        self.name = name
        for attribute in self._KEY_PAIR_NAMES:
            private_key = parameters.generate_private_key()
            setattr(self, attribute, private_key)
            setattr(self, attribute + "_public_key", private_key.public_key())


# This class stores the "pre-key bundle" that is sent to the server
class server_user:
    """Public half of a user's keys (the "pre-key bundle") as stored on the server.

    Args:
        name: the user's name.
        identity: stored as ``identity_public_key``.
        signed_prekey: stored as ``Signed_prekey_public_key``.
        onetime_prekey: stored as ``One_time_prekey_public_key``.
        signature: stored as ``Signature_public_key``.
    """

    def __init__(self, name, identity, signed_prekey, onetime_prekey, signature):
        self.name = name
        (
            self.identity_public_key,
            self.Signed_prekey_public_key,
            self.One_time_prekey_public_key,
            self.Signature_public_key,
        ) = (identity, signed_prekey, onetime_prekey, signature)

# This class stores the "initial message" that is sent to the receiver of the message by the sender
class initial_message:
    """Public keys the sender passes to the receiver in the first message.

    Args:
        name: the sender's name.
        identity: stored as ``identity_public_key``.
        ephemeral: stored as ``Ephemeral_public_key``.
    """

    def __init__(self, name, identity, ephemeral):
        self.name, self.identity_public_key, self.Ephemeral_public_key = (
            name,
            identity,
            ephemeral,
        )

In [3]:
def pushToServer(user):
    """Publish ``user``'s pre-key bundle (public keys only) under ``Server[user.name]``."""
    bundle = server_user(
        user.name,
        user.identity_public_key,
        user.Signed_prekey_public_key,
        user.One_time_prekey_public_key,
        user.Signature_public_key,
    )
    Server[user.name] = bundle

def initialMessage(user):
    """Build the initial message for ``user``: name, identity public key and ephemeral public key."""
    return initial_message(
        name=user.name,
        identity=user.identity_public_key,
        ephemeral=user.Ephemeral_public_key,
    )

# Create both parties, then publish each one's pre-key bundle to the server
Alice, Bob = User("Alice"), User("Bob")

for _party in (Alice, Bob):
    pushToServer(_party)

# Bob_Server = server_user("Bob", Bob.identity_public_key, Bob.Signed_prekey_public_key, Bob.One_time_prekey_public_key, Bob.Signature_public_key)
# Alice_Server = server_user("Alice", Alice.identity_public_key, Alice.Signed_prekey_public_key, Alice.One_time_prekey_public_key, Alice.Signature_public_key)

In [4]:
def getDerivedKey1(User1, User2):
    """Derive the shared key on the initiator's side.

    Args:
        User1: the initiator; its private ``identity`` and ``Ephemeral`` keys are used.
        User2: the peer's published bundle; only its ``*_public_key`` attributes are read.

    Returns:
        32 bytes produced by HKDF-SHA256 over the concatenation of the four DH outputs.
    """
    shared_secrets = (
        User1.identity.exchange(User2.Signed_prekey_public_key),
        User1.Ephemeral.exchange(User2.identity_public_key),
        User1.Ephemeral.exchange(User2.Signed_prekey_public_key),
        User1.Ephemeral.exchange(User2.One_time_prekey_public_key),
    )
    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=b'handshake data')
    return kdf.derive(b''.join(shared_secrets))


def getDerivedKey2(User1, User2):
    """Derive the shared key on the responder's side.

    Args:
        User1: the initiator's initial message; only its ``identity_public_key`` and
            ``Ephemeral_public_key`` are read.
        User2: the responder; its private ``Signed_prekey``, ``identity`` and
            ``One_time_prekey`` keys are used.

    Returns:
        32 bytes produced by HKDF-SHA256 over the concatenation of the four DH outputs,
        taken in the same order as in ``getDerivedKey1``.
    """
    shared_secrets = (
        User2.Signed_prekey.exchange(User1.identity_public_key),
        User2.identity.exchange(User1.Ephemeral_public_key),
        User2.Signed_prekey.exchange(User1.Ephemeral_public_key),
        User2.One_time_prekey.exchange(User1.Ephemeral_public_key),
    )
    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=None, info=b'handshake data')
    return kdf.derive(b''.join(shared_secrets))
# Generate a derived key for Alice, using the information Bob put in the server
sk = getDerivedKey1(Alice, Server[Bob.name])

# Generate a derived key for Bob, using the initial message information Alice sent to Bob
sk2 = getDerivedKey2(initialMessage(Alice), Bob)

# Both sides must arrive at the same secret. Fail loudly on a mismatch: every later
# cell seeds its ratchets from `sk`, so continuing silently would only hide the error.
if sk != sk2:
    raise ValueError("Key agreement failed: Alice and Bob derived different keys")
print("Same Key Generated:", sk.hex())


Same Key Generated: bb5ba08ec553a581c16768bcc015031026e81d9f47baf0d6edcb0ea79968808b


# Symmetric Ratchet


In [5]:
# import AES


# functions for padding and base64 encoding

from Crypto.Cipher import AES
import base64

def pad(msg):
    """Append PKCS#7 padding so that ``len(result)`` is a multiple of 16.

    Between 1 and 16 bytes are always added, each holding the padding length.
    """
    block_size = 16
    fill = block_size - len(msg) % block_size
    return msg + bytes([fill]) * fill

def unpad(msg):
    """Strip PKCS#7 padding: drop as many trailing bytes as the last byte's value says.

    No validation of the padding bytes is performed.
    """
    pad_len = msg[-1]
    return msg[:-pad_len]

def b64(msg):
    """Return the base64 encoding of ``msg`` as a bytes object."""
    encoded = base64.b64encode(msg)
    return encoded


In [6]:
class symmetricRachet(object):
    """A KDF chain: each turn replaces the chain state and yields a fresh key and IV."""

    def __init__(self, key):
        # Current chain (KDF) key; overwritten on every call to next()
        self.state = key

    def next(self, inp=b''):
        """Turn the ratchet once.

        Args:
            inp: optional extra input mixed into the derivation (callers pass a
                Diffie-Hellman output here); defaults to empty.

        Returns:
            ``(outkey, iv)`` -- a 32-byte key and a 16-byte IV.
        """
        kdf = HKDF(
            algorithm=hashes.SHA256(),
            length=80,
            salt=b'',
            info=b'',
            backend=default_backend(),
        )
        material = kdf.derive(self.state + inp)
        # 80 bytes of output: 32 for the next chain state, 32 for the key, 16 for the IV
        next_state, outkey, iv = material[:32], material[32:64], material[64:]
        self.state = next_state
        return outkey, iv


In [7]:
class rachet_user_send:
    """Initiating party ("Alice") of the ratchet demo.

    Holds a root chain plus one sending and one receiving chain, all seeded from the
    shared key ``sk``. Messages are encrypted with AES-CBC using the key/IV pair
    produced by turning the relevant chain once per message.

    Fixes relative to the previous version:
      * ``send`` always turns the sending chain, so two consecutive messages no
        longer reuse the same AES key and IV.
      * ``recv`` only performs a DH ratchet step when the peer's public key differs
        from the one used for the previous step; a second message carrying the same
        public key is decrypted with the next key of the existing receiving chain.
      * ``recv`` no longer computes a key/IV pair that was immediately overwritten.
    """

    def __init__(self, sk):
        self.sk = sk
        self.DHratchet = None      # our current DH private key; created by the first dh_ratchet()
        self.remote_public = None  # peer public key used for the most recent DH ratchet step
        self.SorR = False          # True after a send, False after a receive (kept for inspection)
        self.currkey = None        # key used by the most recent send()
        self.curriv = None         # IV used by the most recent send()

    def init_ratchets(self):
        """Seed the root chain from ``sk`` and derive the initial recv/send chains from it."""
        # initialise the root chain with the shared key
        self.root_ratchet = symmetricRachet(self.sk)
        # first root output seeds receiving, second seeds sending (the peer does the opposite)
        self.recv_ratchet = symmetricRachet(self.root_ratchet.next()[0])
        self.send_ratchet = symmetricRachet(self.root_ratchet.next()[0])

        print('[Alice]\tRoot ratchet seed:', b64(self.sk))
        print('[Alice]\tRecv ratchet seed:', b64(self.recv_ratchet.state))
        print('[Alice]\tSend ratchet seed:', b64(self.send_ratchet.state))

        print("Alice's ratchets have been initialized")

    def _is_new_remote_key(self, public_key):
        """Return True when ``public_key`` differs from the key used for the last DH step."""
        if self.remote_public is None:
            return True
        return public_key.public_numbers() != self.remote_public.public_numbers()

    def dh_ratchet(self, bob_public):
        """Perform a DH ratchet rotation using Bob's public key.

        If we already own a DH key pair, it is first combined with ``bob_public`` to
        reseed the receiving chain. A new key pair is then generated and combined
        with ``bob_public`` to reseed the sending chain. Both steps turn the root chain.
        """
        if self.DHratchet is not None:
            print("[Alice]\t Using the DH random value to update the root ratchet")
            # Bob's public key and our old private key give the new recv chain
            # (skipped the very first time, when we have no key pair yet)
            dh_recv = self.DHratchet.exchange(bob_public)
            shared_recv = self.root_ratchet.next(dh_recv)[0]
            self.recv_ratchet = symmetricRachet(shared_recv)
            print('[Alice]\tRecv ratchet seed (after DH):', b64(shared_recv))

        # our new public key will be sent with the next message to Bob
        print("[Alice]\tGenerating new DH key pair")
        self.DHratchet = parameters.generate_private_key()
        dh_send = self.DHratchet.exchange(bob_public)

        print("Performed Diffie Hellman: ", b64(dh_send))
        # turn the root chain with the new DH output to seed the new send chain
        shared_send = self.root_ratchet.next(dh_send)[0]

        self.send_ratchet = symmetricRachet(shared_send)
        print('[Alice]\tSend ratchet seed (after Diffie Hellman reset):', b64(shared_send))

        # remember which peer key this step was made for, so recv() can tell a new key from a repeat
        self.remote_public = bob_public

    def send(self, bob, msg):
        """Encrypt ``msg`` with the next sending-chain key and hand it to ``bob.recv``.

        Args:
            bob: the receiving party; must expose ``recv(cipher, public_key)``.
            msg: plaintext bytes.
        """
        # every message gets a fresh key and IV from the sending chain
        key, iv = self.send_ratchet.next()
        self.currkey, self.curriv = key, iv

        cipher = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(msg))
        print('[Alice]\tSending ciphertext to Bob:', b64(cipher))
        print('[Alice]\t key used: ', key)
        # send ciphertext and current DH public key
        self.SorR = True
        bob.recv(cipher, self.DHratchet.public_key())
        print("")

    def recv(self, cipher, bob_public_key):
        """Decrypt ``cipher`` from Bob and print the plaintext.

        Args:
            cipher: AES-CBC ciphertext bytes.
            bob_public_key: the DH public key Bob attached to this message.
        """
        print("")
        # only a public key we have not ratcheted against yet triggers a DH step;
        # a repeated key means Bob is continuing on his current sending chain
        if self._is_new_remote_key(bob_public_key):
            self.dh_ratchet(bob_public_key)
        key, iv = self.recv_ratchet.next()
        print('[Alice]\t key used: ', key)
        self.SorR = False
        # decrypt the message using the recv ratchet
        msg = unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cipher))
        print('[Alice]\tDecrypted message:', msg)
        print("")

    
class rachet_user_recv:
    """Responding party ("Bob") of the ratchet demo.

    Starts out with its own DH key pair and holds a root chain plus one sending and
    one receiving chain, all seeded from the shared key ``sk``.

    Fix relative to the previous version: ``recv`` only performs a DH ratchet step
    when the peer's public key differs from the one used for the previous step.
    Previously every received message turned the root chain and replaced our key
    pair, so a message carrying an already-seen public key left this side's chains
    out of step with the sender's.
    """

    def __init__(self, sk):
        self.sk = sk
        self.DHratchet = parameters.generate_private_key()  # our current DH private key
        self.remote_public = None  # peer public key used for the most recent DH ratchet step
        # The attributes below are not read by any method of this class; they are
        # kept so that code inspecting them keeps working.
        self.repeat = 0
        self.SorR = False
        self.currkey = None
        self.curriv = None

    def init_ratchets(self):
        """Seed the root chain from ``sk`` and derive the initial send/recv chains from it."""
        # initialise the root chain with the shared key
        self.root_ratchet = symmetricRachet(self.sk)
        # first root output seeds sending, second seeds receiving (the peer does the opposite)
        self.send_ratchet = symmetricRachet(self.root_ratchet.next()[0])
        self.recv_ratchet = symmetricRachet(self.root_ratchet.next()[0])

        print('[Bob]\tRoot ratchet seed:', b64(self.sk))
        print('[Bob]\tRecv ratchet seed:', b64(self.recv_ratchet.state))
        print('[Bob]\tSend ratchet seed:', b64(self.send_ratchet.state))
        print("Bob's ratchets have been initialized")

    def _is_new_remote_key(self, public_key):
        """Return True when ``public_key`` differs from the key used for the last DH step."""
        if self.remote_public is None:
            return True
        return public_key.public_numbers() != self.remote_public.public_numbers()

    def dh_ratchet(self, alice_public):
        """Perform a DH ratchet rotation using Alice's public key.

        Our current private key is combined with ``alice_public`` to reseed the
        receiving chain; a new key pair is then generated and combined with
        ``alice_public`` to reseed the sending chain. Both steps turn the root chain.
        """
        dh_recv = self.DHratchet.exchange(alice_public)

        # setting the new recv ratchet
        shared_recv = self.root_ratchet.next(dh_recv)[0]
        self.recv_ratchet = symmetricRachet(shared_recv)
        print('[Bob]\tRecv ratchet seed:', b64(shared_recv))

        # generate a new key pair; its public key will be sent with the next message to Alice
        self.DHratchet = parameters.generate_private_key()
        dh_send = self.DHratchet.exchange(alice_public)

        # setting the new send ratchet with the DH input
        shared_send = self.root_ratchet.next(dh_send)[0]
        self.send_ratchet = symmetricRachet(shared_send)

        print('[Bob]\tSend ratchet seed (After Diffie Hellman reset):', b64(shared_send))

        # remember which peer key this step was made for, so recv() can tell a new key from a repeat
        self.remote_public = alice_public

    def send(self, alice, msg):
        """Encrypt ``msg`` with the next sending-chain key and hand it to ``alice.recv``.

        Args:
            alice: the receiving party; must expose ``recv(cipher, public_key)``.
            msg: plaintext bytes.
        """
        key, iv = self.send_ratchet.next()
        cipher = AES.new(key, AES.MODE_CBC, iv).encrypt(pad(msg))
        print('[Bob]\tSending ciphertext to Alice:', b64(cipher))
        print('[Bob]\t key used: ', key)

        # send ciphertext and current DH public key
        alice.recv(cipher, self.DHratchet.public_key())

    def recv(self, cipher, alice_public_key):
        """Decrypt ``cipher`` from Alice and print the plaintext.

        Args:
            cipher: AES-CBC ciphertext bytes.
            alice_public_key: the DH public key Alice attached to this message.
        """
        # only a public key we have not ratcheted against yet triggers a DH step;
        # a repeated key means Alice is continuing on her current sending chain
        if self._is_new_remote_key(alice_public_key):
            self.dh_ratchet(alice_public_key)
        key, iv = self.recv_ratchet.next()
        print("[Bob]\t key used: ", key)

        # decrypt the message using the recv ratchet
        msg = unpad(AES.new(key, AES.MODE_CBC, iv).decrypt(cipher))

        print('[Bob]\tDecrypted message:', msg)



In [8]:
import base64

def b64(msg):
    """Return the base64 encoding of ``msg`` as a single-line ``str``.

    ``base64.b64encode`` is used rather than ``base64.encodebytes`` because the
    latter inserts a newline after every 76 output characters, which split longer
    values (such as a 64-byte DH output) across two lines when printed.
    """
    return base64.b64encode(msg).decode('ascii')

# Both parties start from the same shared key `sk`
bob, alice = rachet_user_recv(sk), rachet_user_send(sk)

for _party in (alice, bob):
    _party.init_ratchets()


[Alice]	Root ratchet seed: u1ugjsVTpYHBZ2i8wBUDECboHZ9HuvDW7csOp5logIs=
[Alice]	Recv ratchet seed: +KHfwrFHT8EK4fBOawKjAErH1/YogMkwMsy9qVzxons=
[Alice]	Send ratchet seed: /w7pjj+av2iIZpq9e4b8xR06SZYxFbHmSLeVRnozfjc=
Alice's ratchets have been initialized
[Bob]	Root ratchet seed: u1ugjsVTpYHBZ2i8wBUDECboHZ9HuvDW7csOp5logIs=
[Bob]	Recv ratchet seed: /w7pjj+av2iIZpq9e4b8xR06SZYxFbHmSLeVRnozfjc=
[Bob]	Send ratchet seed: +KHfwrFHT8EK4fBOawKjAErH1/YogMkwMsy9qVzxons=
Bob's ratchets have been initialized


In [9]:
# Turn each chain once and show the (key, IV) pair it yields; one side's sending
# chain is printed next to the other side's receiving chain for comparison.
for _label, _chain in (
    ('User 1: send ratchet:', alice.send_ratchet),
    ('User 2: recv ratchet:', bob.recv_ratchet),
    ('User 1: recv ratchet:', alice.recv_ratchet),
    ('User 2: send ratchet:', bob.send_ratchet),
):
    print(_label, list(map(b64, _chain.next())))

User 1: send ratchet: ['dNrDyGwzbW7KYZlw0ntxkO5muIGQbS8Ik0Hw8g2JBcA=', 'aMEKqhlU1Fp+ItT43UtmwA==']
User 2: recv ratchet: ['dNrDyGwzbW7KYZlw0ntxkO5muIGQbS8Ik0Hw8g2JBcA=', 'aMEKqhlU1Fp+ItT43UtmwA==']
User 1: recv ratchet: ['HRldb6tUxUIqZFLnXTSqByywj3AC+SwB845OtWZgyO4=', 'ErpMTgOaLguzZV5sWo5O/Q==']
User 2: send ratchet: ['HRldb6tUxUIqZFLnXTSqByywj3AC+SwB845OtWZgyO4=', 'ErpMTgOaLguzZV5sWo5O/Q==']


In [10]:
# import AES
from Crypto.Cipher import AES

def pad(msg):
    """PKCS#7-pad ``msg`` to a multiple of 16 bytes (always adds 1 to 16 bytes)."""
    remainder = len(msg) % 16
    count = 16 - remainder
    padding = bytes(count for _ in range(count))
    return msg + padding

def unpad(msg):
    """Remove PKCS#7 padding; the final byte gives the number of bytes to drop.

    The padding bytes themselves are not checked.
    """
    trailing = msg[-1]
    return msg[:-trailing]

# Diffie Hellman Ratchet


In [11]:
# Fresh pair of parties for the DH-ratchet demo, both seeded from `sk`
Al, Bb = rachet_user_send(sk), rachet_user_recv(sk)

Al.init_ratchets()
Bb.init_ratchets()

# Alice performs her first DH ratchet step against Bob's initial public key
_bob_initial_public = Bb.DHratchet.public_key()
Al.dh_ratchet(_bob_initial_public)

[Alice]	Root ratchet seed: u1ugjsVTpYHBZ2i8wBUDECboHZ9HuvDW7csOp5logIs=
[Alice]	Recv ratchet seed: +KHfwrFHT8EK4fBOawKjAErH1/YogMkwMsy9qVzxons=
[Alice]	Send ratchet seed: /w7pjj+av2iIZpq9e4b8xR06SZYxFbHmSLeVRnozfjc=
Alice's ratchets have been initialized
[Bob]	Root ratchet seed: u1ugjsVTpYHBZ2i8wBUDECboHZ9HuvDW7csOp5logIs=
[Bob]	Recv ratchet seed: /w7pjj+av2iIZpq9e4b8xR06SZYxFbHmSLeVRnozfjc=
[Bob]	Send ratchet seed: +KHfwrFHT8EK4fBOawKjAErH1/YogMkwMsy9qVzxons=
Bob's ratchets have been initialized
[Alice]	Generating new DH key pair
Performed Diffie Hellman:  271fWus+XemJOxzw1NKj97yBZTgqrSCHzMZCXm8PuT+N6I99PyKhpAydEoZgT1QvM/mu5FLnTD7w
/13vj9GT7Q==
[Alice]	Send ratchet seed (after Diffie Hellman reset): Ju84Nkf0CMa+8IhTKDOWyJgl6L1+XxRvwsHoxtVOnA0=


In [14]:
# Alternate senders: Alice -> Bob, Bob -> Alice, Alice -> Bob
for _sender, _receiver, _text in (
    (Al, Bb, b'hi'),
    (Bb, Al, b'hi again back'),
    (Al, Bb, b'Bye'),
):
    _sender.send(_receiver, _text)

[Alice]	Sending ciphertext to Bob: /+7vYXVETNbjMW0DD014Yg==
[Alice]	 key used:  b"\xb4s\x92E\xe6\x8d\x143\xb95m\x1fe#\xe9\xe1\x9ac\xf4$\xdc\xa2\xc5'wU!\x80\x1c}~\x07"
[Bob]	Recv ratchet seed: Iji9lUcB9RvrEKgX+Dm7JVWlyLgIw+yI0wFcHuc2Bjw=
[Bob]	Send ratchet seed (After Diffie Hellman reset): bV44hALXyfkU+vzlRSgkr4rGBz6NS3jLcYWsdD2WCLw=
[Bob]	 key used:  b"\xb4s\x92E\xe6\x8d\x143\xb95m\x1fe#\xe9\xe1\x9ac\xf4$\xdc\xa2\xc5'wU!\x80\x1c}~\x07"
[Bob]	Decrypted message: b'hi'

[Bob]	Sending ciphertext to Alice: 09dTHLY2uH1d0fLVMcC1RQ==
[Bob]	 key used:  b'W\xc05z\xcd\xdcE\xb8(\xcau\xecE\x87Tu\xa6\x9d\x8bM`\x1c\x8e2\x96\x85\xb4\x075\x83_\x92'

[Alice]	 Using the DH random value to update the root ratchet
[Alice]	Recv ratchet seed (after DH): bV44hALXyfkU+vzlRSgkr4rGBz6NS3jLcYWsdD2WCLw=
[Alice]	Generating new DH key pair
Performed Diffie Hellman:  nQvHO9L70FYXajbY+webOluHSnuc1ZrQ5iZ4NfFPWhJM1mJBJj9N4fYBwfNBv7i6m0adQTMORfe+
9qW9qdNJpA==
[Alice]	Send ratchet seed (after Diffie Hellman reset): xbmkI

In [13]:
# NOTE(review): this repeats the user setup from earlier in the notebook and replaces
# Alice and Bob with freshly keyed users; `sk` derived above came from the previous
# objects. Confirm whether this cell is still needed.
Alice, Bob = User("Alice"), User("Bob")

for _party in (Alice, Bob):
    pushToServer(_party)