<a href="https://colab.research.google.com/github/vadhri/ai-notebook/blob/main/mpc/oblivious_transer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This notebook demonstrates Oblivious Transfer protocols for the 4 cases below.
*   Basic Oblivious Transfer: Simple sender/receiver model.
*   Public Key Oblivious Transfer: Using RSA keys for security.
*   Basic Parallel Oblivious Transfer: Extending to multiple transfers.
*   OT Extension (Semi-Honest Receiver): An efficient protocol for large-scale transfers.

In [None]:
!pip install -q pycryptodome

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 MB[0m [31m10.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
import random
import numpy as np

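Oblivious transfer lets two parties exchange data while each party's private input stays hidden from the other: the sender does not learn which message was chosen, and the receiver learns nothing about the messages it did not choose.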

In [None]:
class ObliviousTransfer:
  """Toy 1-out-of-2 transfer channel between two named parties.

  The sender stores a sequence of messages with `send`; the receiver
  picks a single entry by index with `receive`. No cryptography is
  applied here - the class only models the call pattern.
  """

  def __init__(self, party1, party2):
    # party1 is reported as the sender and party2 as the receiver in `send`.
    self.party1, self.party2 = party1, party2
    # Nothing has been sent yet.
    self.data = None

  def send(self, data):
    """Store `data` on the channel and log the transfer."""
    self.data = data
    print(f"{self.party1} sent {data} to {self.party2}")

  def receive(self, b):
    """Return the message at index `b`, or None if nothing (or an empty value) was sent."""
    return self.data[b] if self.data else None

Party 0 (Alice) is the sender: she sends two messages, $m_0$ and $m_1$, to the receiver.

In [None]:
# Alice (sender) places her two candidate messages on the toy channel.
alice_messages = ["message 0", "message 1"]
xfer = ObliviousTransfer("Alice", "Bob")
xfer.send(alice_messages)

Alice sent ['message 0', 'message 1'] to Bob


Party 1 (Bob) is the receiver: he chooses which message to receive by supplying a choice bit $b$.

In [None]:
# Bob (receiver) asks for the message at index 0.
bob_choice = 0
print('Bob received : ', xfer.receive(bob_choice))

Bob received :  message 0


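Alice does not learn Bob's choice, and Bob does not learn the message he did not choose. (The toy class above only models this interface; it applies no cryptography to enforce either guarantee.)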

#### Oblivious Transfer using public keys

In [None]:
class PublicObliviousTransfer:
  """1-out-of-2 oblivious transfer sketch based on RSA-OAEP public keys.

  Call order: the sender's messages are stored with `send`, the public
  keys (one per message) are stored with `register_pk`, and `receive`
  returns every message encrypted under the key at the same index.
  Only a holder of the matching private key can decrypt an entry.
  """

  def __init__(self, party1, party2):
    self.party1 = party1
    self.party2 = party2

    self.data = None
    # Initialised here so that `receive` can detect a missing registration
    # instead of failing with an AttributeError.
    self.keys = None

  def register_pk(self, keys):
    """Store the public keys; `keys[i]` is used to encrypt message `i`."""
    self.keys = keys
    print (f"{self.party1} registered public keys")

  def send(self, data):
    """Store the byte-string messages to be transferred and log the transfer."""
    self.data = data
    print(f"{self.party1} sent {data} to {self.party2}")

  def receive(self):
    """Return the list of ciphertexts, message `i` encrypted under `keys[i]`.

    Raises:
      RuntimeError: if `send` or `register_pk` has not been called yet.
      ValueError: if fewer keys than messages were registered.
    """
    if self.data is None:
      raise RuntimeError("send() must be called before receive()")
    if self.keys is None:
      raise RuntimeError("register_pk() must be called before receive()")
    if len(self.keys) < len(self.data):
      raise ValueError(
          f"need one public key per message: got {len(self.keys)} keys "
          f"for {len(self.data)} messages")

    # zip stops at the last message; surplus keys are ignored, as before.
    return [
        PKCS1_OAEP.new(public_key).encrypt(message)
        for public_key, message in zip(self.keys, self.data)
    ]

In [None]:
# The messages are byte strings because they are later passed to the RSA-OAEP cipher.
alice_messages = [b"message 0", b"message 1"]
xfer = PublicObliviousTransfer("Alice", "Bob")
xfer.send(alice_messages)

Alice sent [b'message 0', b'message 1'] to Bob


Bob (the receiver) holds one real key pair plus one extra public key only; he places the real public key at the index of his choice bit $b$.

In [None]:
# RSA modulus size in bits. 1024 is only the minimum the library accepts;
# 2048 is the recommended size.
length = 2048

# Real key pair: the private half is kept for decryption, the public half is shared.
key = RSA.generate(length)

priv_pem = key.export_key()
pub_pem  = key.publickey().export_key()

# Decoy public key for the message that should stay unreadable.
# NOTE(review): a full key pair is generated here and its private half is simply
# never used; a real protocol must ensure the receiver cannot know that private key.
fake_key = RSA.generate(length)
fake_pub_pem = fake_key.publickey().export_key()

# Secret choice bit: index of the message to be recovered.
b = random.choices([0,1], k=1)[0]

# The real public key goes at position b, the decoy at the other position.
real_pk = RSA.import_key(pub_pem)
decoy_pk = RSA.import_key(fake_pub_pem)
keys = [real_pk, decoy_pk] if b == 0 else [decoy_pk, real_pk]

xfer.register_pk(keys)

Alice registered public keys


In [None]:
# Every message comes back encrypted under the public key at the same index.
messages = xfer.receive()

# Only the ciphertext at position b matches the private key that was kept.
private_key = RSA.import_key(priv_pem)
cipher = PKCS1_OAEP.new(private_key)
chosen_ciphertext = messages[b]
decrypted = cipher.decrypt(chosen_ciphertext)

print(decrypted)

b'message 0'


### Parallel Oblivious Transfer

Alice sends $n$ pairs of messages $(m_{1,0}, m_{1,1}), \dots, (m_{n,0}, m_{n,1})$, and Bob chooses one message from each pair by sending the choice bits $b_1, b_2, \dots, b_n$.

In [None]:
class BasicParallelObliviousTransfer:
  """Toy parallel oblivious transfer: many 1-out-of-2 choices in one call.

  `send` stores a sequence of message pairs; `receive` takes one choice
  index per pair and returns the selected message from each pair.
  """

  def __init__(self, party1, party2):
    self.party1, self.party2 = party1, party2
    # Nothing has been sent yet.
    self.data = None

  def send(self, data):
    """Store the sequence of message pairs and log the transfer."""
    self.data = data
    print(f"{self.party1} sent {data} to {self.party2}")

  def receive(self, arr_b):
    """Return a list holding, for each position i, `data[i][arr_b[i]]`."""
    return [self.data[position][choice] for position, choice in enumerate(arr_b)]

In [None]:
# Four message pairs; in each string the first digit is the slot (0/1) and the second the pair index.
message_pairs = [["00", "10"], ["01", "11"], ["02", "12"], ["03", "13"]]
xfer = BasicParallelObliviousTransfer("Alice", "Bob")
xfer.send(message_pairs)

Alice sent [['00', '10'], ['01', '11'], ['02', '12'], ['03', '13']] to Bob


In [None]:
# One choice bit per pair.
bob_choices = [0,1,0,1]
print('Bob received : ', xfer.receive(bob_choices))

Bob received :  ['00', '11', '02', '13']


### OT Extension (Semi-Honest Receiver)

The algorithm implemented below follows the section of the paper referenced below:

Section 3 : Extending OT with a Semi-Honest Receiver

https://www.iacr.org/archive/crypto2003/27290145/27290145.pdf


In [None]:
import hashlib

# Random oracle: H(j, bitvec) -> bit vector of length out_len_bits (numpy uint8 array).
# out_len_bits is measured in bits, not bytes.
def H(j, bitvec, out_len_bits=16):
    """Hash an index and a bit vector down to `out_len_bits` pseudo-random bits.

    Args:
        j: non-negative integer index, mixed into the hash so that each
            index gets an independent mask.
        bitvec: 1-D array-like of 0/1 values; each bit is hashed as one byte.
        out_len_bits: number of output bits, between 0 and 256 (the size of
            a SHA-256 digest).

    Returns:
        numpy uint8 array of shape (out_len_bits,) containing 0/1 values.

    Raises:
        ValueError: if `j` is negative or `out_len_bits` is outside 0..256.
    """
    max_bits = hashlib.sha256().digest_size * 8
    if not 0 <= out_len_bits <= max_bits:
        raise ValueError(
            f"out_len_bits must be between 0 and {max_bits}, got {out_len_bits}")

    index = int(j)
    if index < 0:
        raise ValueError(f"j must be non-negative, got {j}")

    # Fixed-width index encoding: works for indices >= 256 (a single byte does
    # not) and keeps the index clearly separated from the bit-vector bytes.
    payload = index.to_bytes(8, "big") + bytes(np.asarray(bitvec).tolist())
    digest = hashlib.sha256(payload).digest()

    # Expand the digest into individual bits and keep the leading ones.
    bits = np.unpackbits(np.frombuffer(digest, dtype=np.uint8))
    return bits[:out_len_bits].astype(np.uint8)

# Smoke test of H: hash a random k-bit vector down to l bits.
k, l = 8, 16
qj = np.random.randint(0, 2, size=k, dtype=np.uint8)

sample_mask = H(0, qj, out_len_bits=l)
print(sample_mask)


[0 0 0 0 0 1 0 1 1 1 1 1 1 0 0 1]


#### Extending Parallel OT

In [None]:
# k: length of the sender's secret vector S (and number of base OTs)
# l: bit-length of each transferred message
# m: number of message pairs transferred by the extension
k, l, m = 8, 16, 32

In [None]:
class BasicParallelObliviousTransfer:
  """NumPy variant of the toy parallel oblivious transfer.

  NOTE: this redefines (and from here on shadows) the list-based class of
  the same name declared earlier in the notebook. It differs in that
  `send` logs only `data.shape` and `receive` returns a numpy array.
  """

  def __init__(self, party1, party2):
    self.party1, self.party2 = party1, party2
    # Nothing has been sent yet.
    self.data = None

  def send(self, data):
    """Store the array of pairs and log its shape."""
    self.data = data
    print(f"{self.party1} sent {data.shape} to {self.party2}")

  def receive(self, arr_b):
    """Return `np.array` of `data[i][arr_b[i]]` for every position i of `arr_b`."""
    selected = [self.data[position][choice] for position, choice in enumerate(arr_b)]
    return np.array(selected)

In [None]:
# P0 (sender) samples m pairs of l-bit strings.
p0_sender_xfer = np.random.randint(0, 2, size=(m, 2, l), dtype=np.uint8)

# P1 (receiver) samples m selection bits, one per pair of P0.
p1_receiver_selection_bits_m = np.array(random.choices([0,1], k=m))

# Start of the reversed transfer.

# The sender picks a random vector S in {0, 1}^k.
S = np.array(random.choices([0,1], k=k))

# The receiver picks a random m x k bit matrix T.
T = np.random.randint(0, 2, size=(m, k), dtype=np.uint8)

# For every column t of T, build the pair (t, t + r mod 2), where r is the
# receiver's selection vector. The stacked result has shape (k, 2, m).
column_pairs = [
    (t_column, (t_column + p1_receiver_selection_bits_m) % 2)
    for t_column in T.T
]
p1_receiver_reversed_selection_bits_T = np.array(column_pairs)
p1_receiver_reversed_selection_bits_T.shape

(8, 2, 32)

#### Reverse the roles of P0 and P1

In [None]:
# Reversed exchange: the receiver P1 acts as the sender and offers P0 the k pairs
# [(t_1, t_1 xor r), ..., (t_k, t_k xor r)], where t_i is column i of T and r is
# P1's selection vector.
OT_init = BasicParallelObliviousTransfer("P1", "P0")
OT_init.send(p1_receiver_reversed_selection_bits_T)

# P0 picks one element of each pair according to its secret bits S = (s1, s2, .., sk).
P0_reversed_receiver = OT_init.receive(S)

print(f'P0 in receiver mode received {P0_reversed_receiver.shape}')

P1 sent (8, 2, 32) to P0
P0 in receiver mode received (8, 32)


In [None]:
# P0 received k vectors of length m; transpose so that Q is m x k like T.
Q = P0_reversed_receiver.transpose()
Q.shape

(32, 8)

In [None]:
# Masked messages produced by the sender, same shape as its inputs: (m, 2, l).
y = np.zeros_like(p0_sender_xfer)

for j in range(m):
    qj = Q[j]                                   # row j of Q (k bits)

    # Request exactly l mask bits so the masks always match the message length
    # (and the receiver's call), rather than relying on H's default length.
    mask0 = H(j, qj, out_len_bits=l)            # l-bit mask from H(j, qj)
    mask1 = H(j, (qj ^ S), out_len_bits=l)      # l-bit mask from H(j, qj xor S)

    # The sender masks both messages of pair j.
    y[j, 0] = p0_sender_xfer[j, 0] ^ mask0
    y[j, 1] = p0_sender_xfer[j, 1] ^ mask1

print("y shape:", y.shape)  # (m, 2, l)


y shape: (32, 2, 16)


In [None]:
# Final transfer in the original direction: P0 sends the masked pairs y and
# P1 picks one entry per pair with its selection bits.
# The result gets its own name so that P0_reversed_receiver (the source of Q)
# is not overwritten when cells are re-run.
OT_final = BasicParallelObliviousTransfer("P0", "P1")
OT_final.send(y)
P1_received = OT_final.receive(p1_receiver_selection_bits_m)
print(f'P1 in receiver mode received {P1_received.shape}')

P0 sent (32, 2, 16) to P1
P0 in receiver mode received (32, 16)


In [None]:
# Receiver output: one recovered l-bit message per pair.
z = np.zeros((m, l), dtype=np.uint8)

for j, (tj, choice_bit) in enumerate(zip(T, p1_receiver_selection_bits_m)):
    rj = int(choice_bit)   # receiver's choice bit for pair j; tj is row j of T (k bits)

    # Unmask the chosen masked message with the receiver's own mask H(j, tj).
    z[j] = y[j, rj] ^ H(j, tj, out_len_bits=l)

print("Receiver output shape:", z.shape)


Receiver output shape: (32, 16)


#### Check if parallel oblivious transfer happened without sender knowing choice bits

In [None]:
# Ground truth: for every pair j, the message P1 actually selected. Shape (m, l).
pair_index = np.arange(m)
expected = p0_sender_xfer[pair_index, p1_receiver_selection_bits_m]

# True when every recovered message equals the selected original.
np.array_equal(z, expected)

True