# <center> Provably Secure Homomorphic Steganography </center>

### Provably Secure Homomorphic Steganography (ProSt) Implementation

This PDF contains the result of the Python implementation of **Provably Secure Homomorphic Steganography (ProSt)**, an innovative approach that merges steganography with homomorphic computation. ProSt enables secure computations on data hidden within images, ensuring both the data's location and content remain secret from an honest-but-curious adversary while still allowing meaningful operations on the concealed information.

#### Key Features
- **Steganographic Embedding**: Embeds secret bits into cover images using least significant bit (LSB) substitution at a randomly selected secret position.
- **Homomorphic Computation**: Evaluates Boolean circuits on hidden data using reversible Fredkin gates, maintaining security and bit distribution during computation.
- **Circuit Obfuscation**: Hides the computation's purpose by renaming wires and adding dummy components to the circuit.
- **Randomized Gate Execution**: Executes gates in a randomized topological order to obscure the circuit structure.
- **Security**: Ensures an adversary gains negligible advantage in uncovering the secret, as supported by the accompanying research paper.

#### Workflow
1. **Circuit Specification**: Define a Boolean circuit with a simple syntax (e.g., inputs, gates like FREDKIN, and outputs).
2. **Obfuscation**: Rename wires and insert dummy gates/ancillaries to disguise the circuit.
3. **Secret Position Selection**: Choose a secret position in the image for embedding the bits (known only to the sender, Alice).
4. **Embedding**: Insert input bits and ancillary values into cover images at the secret position.
5. **Homomorphic Computation**: An untrusted server (Eve) processes the circuit pixel-wise across all images, applying gates in random order without knowing the secret position.
6. **Extraction**: Retrieve the computation result from the output image at the secret position.
7. **Dataset used**: https://github.com/NVlabs/ffhq-dataset

These results showcase ProSt as a tool for privacy-preserving computation in untrusted settings, such as cloud environments, by combining the stealth of steganography with the flexibility of homomorphic operations.

In [None]:
import numpy as np
from PIL import Image
import random
import os
import matplotlib.pyplot as plt
import re
import string
from collections import defaultdict, deque
import time

# --------------------------
# Gate Definitions (Cloud)
# --------------------------


def fredkin_gate(c, x, y):
  """Controlled swap: return (c, x, y) when c is 0, otherwise (c, y, x)."""
  swap = c != 0
  first, second = (y, x) if swap else (x, y)
  return (c, first, second)


def apply_fredkin_pixelwise(input_images, output_images):
  """Apply the Fredkin gate to the least significant bit of every pixel.

  The LSBs of the three input images are read as (control, x, y). The gate
  result is written into the LSBs of the three output images; all other bits
  of the outputs are left untouched. Output arrays are modified in place.

  Args:
    input_images: Sequence of three integer arrays (control, x, y). The
      first one must be 3-D; its shape defines the processed region.
    output_images: Sequence of three integer arrays that receive
      (control, x, y) after the controlled swap.
  """
  I_c, I1, I2 = input_images
  O_c, O1, O2 = output_images
  h, w, c = I_c.shape
  region = (slice(0, h), slice(0, w), slice(0, c))

  # Read every input bit plane before writing anything, so the result stays
  # correct even if an output array is the same object as an input array.
  c_bits = I_c[region] & 1
  x_bits = I1[region] & 1
  y_bits = I2[region] & 1

  swap = c_bits == 1
  x_out = np.where(swap, y_bits, x_bits)
  y_out = np.where(swap, x_bits, y_bits)

  for out_img, bits in ((O_c, c_bits), (O1, x_out), (O2, y_out)):
    # Build the "all bits except the LSB" mask in the array's own dtype.
    # Mixing an unsigned array with the negative Python int ``~1`` is
    # rejected by NumPy 2's promotion rules.
    lsb_mask = ~out_img.dtype.type(1)
    target = out_img[region]
    out_img[region] = (target & lsb_mask) | bits.astype(
      out_img.dtype, copy=False
    )


# ---------------------------------
# Circuit Specification (Shared)
# ---------------------------------

# Supported gate keywords mapped to their arity: parse_circuit requires a gate
# line to list exactly this many input wires and this many output wires.
GATE_TYPES = {"FREDKIN": 3}


class Circuit:
  """In-memory description of a circuit.

  Keeps four ordered lists: input wire names, ancillary ``(wire, value)``
  pairs, gate records and output wire names.
  """

  def __init__(self):
    self.inputs, self.ancillaries = [], []  # names / (wire, value) pairs
    self.gates, self.outputs = [], []  # gate dicts / names

  def add_input(self, name):
    """Register ``name`` as an input wire."""
    self.inputs += [name]

  def add_ancillary(self, name, value):
    """Register ancillary wire ``name`` preset to ``value``."""
    pair = (name, value)
    self.ancillaries += [pair]

  def add_gate(self, gate_type, inputs, outputs):
    """Append a gate record with "type", "inputs" and "outputs" keys."""
    record = dict(type=gate_type, inputs=inputs, outputs=outputs)
    self.gates += [record]

  def add_output(self, name):
    """Register ``name`` as an output wire."""
    self.outputs += [name]


def parse_circuit(spec):
  """Parse a textual circuit specification into a Circuit.

  Each non-blank line that does not start with ``#`` must begin with one of
  ``INPUT``, ``ANCILLARY``, ``OUTPUT`` or a key of ``GATE_TYPES``:

    INPUT a, b
    ANCILLARY k0 = 0, k1 = 1
    FREDKIN a, b, k0 -> x, y, z
    OUTPUT z

  Whitespace inside the operand list is insignificant and empty names
  (e.g. produced by a trailing comma) are ignored.

  Args:
    spec: The circuit specification text.

  Returns:
    The populated Circuit.

  Raises:
    ValueError: On an unknown directive, a malformed ancillary or gate line,
      an ancillary value other than 0/1, or a gate with the wrong number of
      inputs or outputs.
  """
  circuit = Circuit()
  for line_no, raw_line in enumerate(spec.splitlines(), 1):
    line = raw_line.strip()
    if not line or line.startswith("#"):
      continue
    keyword, *rest = line.split()
    # Re-join without whitespace so "a, b" and "a,b" are treated alike.
    operands = "".join(rest)

    if keyword == "INPUT":
      for name in operands.split(","):
        if name:
          circuit.add_input(name)
    elif keyword == "ANCILLARY":
      for part in operands.split(","):
        if not part:
          continue
        wire, equals, value = part.partition("=")
        if not equals or not wire:
          raise ValueError(
            f"Line {line_no}: ancillary '{part}' must be written as wire=value"
          )
        try:
          bit = int(value)
        except ValueError:
          raise ValueError(
            f"Line {line_no}: ancillary '{wire}' has non-integer value '{value}'"
          ) from None
        # Only a single bit can be embedded per image.
        if bit not in (0, 1):
          raise ValueError(
            f"Line {line_no}: ancillary '{wire}' must be 0 or 1, got {bit}"
          )
        circuit.add_ancillary(wire, bit)
    elif keyword in GATE_TYPES:
      gate_type = keyword
      expected_io = GATE_TYPES[gate_type]
      # partition() also accepts an arrow written without surrounding spaces.
      lhs, arrow, rhs = operands.partition("->")
      if not arrow:
        raise ValueError(
          f"Line {line_no}: {gate_type} gate is missing '->'"
        )
      inputs = [name for name in lhs.split(",") if name]
      outputs = [name for name in rhs.split(",") if name]
      if len(inputs) != expected_io or len(outputs) != expected_io:
        raise ValueError(
          f"{gate_type} gate must have {expected_io} inputs and outputs"
        )
      circuit.add_gate(gate_type, inputs, outputs)
    elif keyword == "OUTPUT":
      for name in operands.split(","):
        if name:
          circuit.add_output(name)
    else:
      # Silently dropping a line (e.g. a misspelt gate) would change the
      # computed function without any warning.
      raise ValueError(f"Line {line_no}: unknown directive '{keyword}'")
  return circuit


# -----------------------------
# Randomized Gate Ordering
# -----------------------------


def build_dependency_graph(gates):
  """Derive producer -> consumer edges between gates from shared wires.

  Returns a tuple ``(graph, in_degree, sources)``: ``graph`` maps a gate
  index to the indices of gates reading one of its outputs (one entry per
  shared wire), ``in_degree`` counts those incoming edges per gate, and
  ``sources`` lists the gates with no incoming edge.
  """
  # Index the gates by the wires they write.
  producers_of = defaultdict(list)
  for producer, gate in enumerate(gates):
    for wire in gate["outputs"]:
      producers_of[wire].append(producer)

  # One edge for every (producer, consumer) pair connected by a wire.
  graph = defaultdict(list)
  in_degree = defaultdict(int)
  for consumer, gate in enumerate(gates):
    for wire in gate["inputs"]:
      for producer in producers_of[wire]:
        in_degree[consumer] += 1
        graph[producer].append(consumer)

  # Gates nobody feeds can run immediately.
  sources = []
  for idx in range(len(gates)):
    if in_degree[idx] == 0:
      sources.append(idx)
  return graph, in_degree, sources


def randomized_topological_sort(gates):
  """Return gate indices in a random order that honours all dependencies.

  At every step one gate is drawn uniformly from those whose producers have
  all been scheduled. Raises ValueError if some gate can never be scheduled.
  """
  successors, remaining, ready = build_dependency_graph(gates)
  schedule = []

  while len(ready) > 0:
    # Draw any gate that is currently runnable.
    chosen = random.choice(ready)
    ready.remove(chosen)
    schedule.append(chosen)

    # Scheduling this gate satisfies one dependency per outgoing edge.
    for follower in successors[chosen]:
      remaining[follower] -= 1
      if not remaining[follower]:
        ready.append(follower)

  if len(schedule) == len(gates):
    return schedule
  raise ValueError("Circuit contains a cycle or invalid dependencies")


# -------------------------
# Obfuscation Functions
# -------------------------


def get_all_wires(circuit_spec):
  """Collect every distinct wire name used in a circuit specification.

  Blank lines, ``#`` comment lines and lines with an unrecognised keyword
  are skipped. Empty names (e.g. from a trailing comma) are never reported:
  an empty name would later be turned into a regex that matches at every
  word boundary during obfuscation.

  Args:
    circuit_spec: The circuit specification text.

  Returns:
    A set of wire-name strings.

  Raises:
    ValueError: If an ancillary entry has no ``=`` or a gate line has no
      ``->``.
  """
  all_wires = set()
  for raw_line in circuit_spec.splitlines():
    line = raw_line.strip()
    if not line or line.startswith("#"):
      continue
    keyword, *rest = line.split()
    # Whitespace inside the operand list carries no meaning.
    operands = "".join(rest)

    if keyword in ("INPUT", "OUTPUT"):
      names = operands.split(",")
    elif keyword == "ANCILLARY":
      names = []
      for part in operands.split(","):
        if not part:
          continue
        wire, equals, _ = part.partition("=")
        if not equals:
          raise ValueError(
            f"Ancillary '{part}' must be written as wire=value"
          )
        names.append(wire)
    elif keyword in GATE_TYPES:
      # partition() also accepts an arrow written without surrounding spaces.
      lhs, arrow, rhs = operands.partition("->")
      if not arrow:
        raise ValueError(f"{keyword} gate is missing '->'")
      names = lhs.split(",") + rhs.split(",")
    else:
      continue

    all_wires.update(name for name in names if name)
  return all_wires


def generate_random_name(existing_names):
  """Return a fresh ``wire_`` + 8 random alphanumerics name not in existing_names."""
  alphabet = string.ascii_letters + string.digits
  candidate = None
  # Keep drawing until the candidate does not clash with a known name.
  while candidate is None or candidate in existing_names:
    suffix = "".join(random.choices(alphabet, k=8))
    candidate = "wire_" + suffix
  return candidate


def obfuscate_circuit_spec(
  circuit_spec, num_dummy_ancillaries=5, num_dummy_gates=3
):
  """Obfuscate a circuit specification before handing it to the server.

  Every wire is renamed to a random ``wire_XXXXXXXX`` name, comment lines are
  removed (renamed comments would still describe the computation), dummy
  ancillary wires are added after the last ANCILLARY line, and dummy FREDKIN
  gates fed only by dummy ancillaries are added before the first OUTPUT line.

  Args:
    circuit_spec: The original circuit specification text.
    num_dummy_ancillaries: Number of dummy ancillary wires to add.
    num_dummy_gates: Number of dummy FREDKIN gates to add.

  Returns:
    A tuple ``(obfuscated_spec, mapping)`` where ``mapping`` maps each
    original wire name to its random replacement.

  Raises:
    ValueError: If dummy gates are requested with fewer than three dummy
      ancillaries to draw their inputs from, or if the spec is malformed.
  """
  gate_arity = 3  # a FREDKIN gate reads and writes three wires
  if num_dummy_gates > 0 and num_dummy_ancillaries < gate_arity:
    raise ValueError(
      f"Dummy FREDKIN gates need at least {gate_arity} dummy ancillaries, "
      f"got {num_dummy_ancillaries}"
    )

  # Step 1: Collect all original wire names. Empty names are skipped because
  # an empty pattern would match everywhere during renaming.
  all_wires = {wire for wire in get_all_wires(circuit_spec) if wire}
  used_names = set()

  # Step 2: Create mapping for original wires to random names
  mapping = {}
  for wire in all_wires:
    random_name = generate_random_name(used_names)
    mapping[wire] = random_name
    used_names.add(random_name)

  # Step 3: Rename all wires in one pass, so an already substituted name can
  # never be rewritten again. Longer names come first in the alternation and
  # the lookarounds ensure only whole names match.
  rename = None
  if mapping:
    alternatives = "|".join(
      re.escape(name) for name in sorted(mapping, key=len, reverse=True)
    )
    wire_pattern = re.compile(r"(?<!\w)(?:" + alternatives + r")(?!\w)")

    def rename(text):
      return wire_pattern.sub(lambda m: mapping[m.group(0)], text)

  lines = []
  for line in circuit_spec.splitlines():
    if line.strip().startswith("#"):
      # Comments are not needed for evaluation and may reveal what the
      # circuit computes, so they are not forwarded.
      continue
    keyword = re.match(r"\s*\S+", line)
    if keyword and rename:
      # Leave the directive keyword itself untouched.
      line = line[: keyword.end()] + rename(line[keyword.end() :])
    lines.append(line)

  # Step 4: Add dummy ancillary wires
  dummy_names = []
  dummy_ancillaries = []
  for _ in range(num_dummy_ancillaries):
    dummy_name = generate_random_name(used_names)
    used_names.add(dummy_name)
    dummy_names.append(dummy_name)
    value = random.choice([0, 1])
    dummy_ancillaries.append(f"{dummy_name} = {value}")

  if dummy_ancillaries:
    ancillary_indices = [
      i
      for i, line in enumerate(lines)
      if line.strip().startswith("ANCILLARY")
    ]
    if ancillary_indices:
      # Insert after the last ANCILLARY line, keeping generation order.
      insert_idx = ancillary_indices[-1] + 1
      lines[insert_idx:insert_idx] = [
        f"ANCILLARY {dummy}" for dummy in dummy_ancillaries
      ]
    else:
      lines.insert(0, f"ANCILLARY {', '.join(dummy_ancillaries)}")

  # Step 5: Add dummy gates (Fredkin only)
  dummy_gates = []
  for _ in range(num_dummy_gates):
    gate_type = "FREDKIN"
    input_wires = random.sample(dummy_names, gate_arity)
    output_wires = []
    for _ in range(gate_arity):
      # Reserve each name immediately so one gate cannot get duplicates.
      output_name = generate_random_name(used_names)
      used_names.add(output_name)
      output_wires.append(output_name)
    gate_line = f"{gate_type} {', '.join(input_wires)} -> {', '.join(output_wires)}"
    dummy_gates.append(gate_line)

  # Insert dummy gates before the first OUTPUT line (or at the end).
  output_indices = [
    i
    for i, line in enumerate(lines)
    if line.strip().startswith("OUTPUT")
  ]
  if output_indices:
    insert_idx = output_indices[0]
    lines[insert_idx:insert_idx] = dummy_gates
  else:
    lines.extend(dummy_gates)
  obfuscated_spec = "\n".join(lines)

  return obfuscated_spec, mapping


# ---------------------------------
# Steganography Components (Alice)
# ---------------------------------


def locgen(image_shape):
  """Ask the user (Alice) for the secret embedding position.

  Prompts repeatedly until three comma-separated integers inside the image
  bounds are entered.

  Args:
    image_shape: ``(height, width, channels)`` of the cover images.

  Returns:
    The position as a ``(row, column, channel)`` tuple of ints.

  Raises:
    EOFError: If standard input is exhausted. Only parse errors trigger a
      re-prompt; retrying on a closed stdin would loop forever.
  """
  h, w, c = image_shape
  prompt = f"Enter secret position as 'row,column,channel' (max row {h-1}, col {w-1}, channel {c-1}): "
  while True:
    try:
      # ValueError covers both non-integer fields and a wrong field count.
      r, col, ch = map(int, input(prompt).split(","))
    except ValueError:
      print(
        "Invalid input. Please enter three integers separated by commas."
      )
      continue
    if 0 <= r < h and 0 <= col < w and 0 <= ch < c:
      break
    print("Values out of range. Please try again.")
  print(f"[LOCGEN] Secret position provided: ({r}, {col}, {ch})")
  return (r, col, ch)


def embed_bit_in_image(image_array, bit, pos):
  """Write ``bit`` into the least significant bit of one image sample.

  Args:
    image_array: Integer image array indexed as ``[row, column, channel]``;
      modified in place.
    bit: The bit to embed, 0 or 1.
    pos: ``(row, column, channel)`` of the sample to modify.

  Returns:
    The same ``image_array`` object.

  Raises:
    ValueError: If ``bit`` is neither 0 nor 1 (a larger value would alter
      bits other than the LSB).
  """
  if bit not in (0, 1):
    raise ValueError(f"bit must be 0 or 1, got {bit!r}")
  r, col, ch = pos
  pixel = image_array[r, col, ch]
  # Do the masking on plain Python ints: combining an unsigned NumPy scalar
  # with the negative int ``~1`` is rejected by NumPy 2's promotion rules.
  image_array[r, col, ch] = (int(pixel) & ~1) | int(bit)
  print(
    f"[EMB] Embedded bit {bit} at {pos}: {pixel} -> {image_array[r, col, ch]}"
  )
  return image_array


def extract_bit_from_image(image_array, pos):
  """Read back the least significant bit stored at ``pos`` and log it."""
  row, column, channel = pos
  sample = image_array[row, column, channel]
  bit = sample & 1
  message = f"[EXT] Extracted bit {bit} from {pos}: Pixel value {sample}"
  print(message)
  return bit


def display_images(images_dict, title, stage):
  """Show the given images in a grid, four per row, titled by wire name.

  Args:
    images_dict: Mapping of wire name to image array.
    title: Description shown in the figure heading.
    stage: Workflow stage shown before ``title`` in the heading.
  """
  if not images_dict:
    # Zero images would give a zero-row, zero-height figure with nothing on it.
    return
  cols = 4
  rows = (len(images_dict) + cols - 1) // cols
  fig = plt.figure(figsize=(15, 5 * rows))
  for i, (wire, img) in enumerate(images_dict.items(), 1):
    plt.subplot(rows, cols, i)
    plt.imshow(img)
    plt.title(f"{wire}")
    plt.axis("off")
  plt.suptitle(f"{stage}: {title}", fontsize=16)
  plt.tight_layout(rect=[0, 0, 1, 0.95])
  plt.show()
  # pyplot keeps every figure alive until it is closed; this function is
  # called once per gate, so release the figure to avoid piling them up.
  plt.close(fig)


# -------------------------
# Cloud Computation (Eve)
# -------------------------


def Eve_compute_circuit(circuit_spec, received_images, cover_dir):
  """Evaluate the circuit on the received images in a randomized gate order.

  Every gate is applied pixel-wise to the images bound to its wires. After
  each gate the output images are saved to ``<cover_dir>/processed`` and
  displayed, and timing information is printed.

  Args:
    circuit_spec: The (obfuscated) circuit specification text.
    received_images: Mapping of wire name to image array. The arrays are
      copied, so the caller's images are not modified.
    cover_dir: Directory under which the ``processed`` folder is created.

  Returns:
    A dict mapping every wire name to its image after the computation.

  Raises:
    ValueError: If the circuit cannot be ordered or a gate type has no
      implementation.
    KeyError: If a gate refers to a wire without an image.
  """
  print("\n[4] Eve (untrusted cloud) performing circuit evaluation...")
  print("\n[COMP] Starting homomorphic computation...")

  # Measure circuit parsing time (perf_counter is monotonic and has the
  # highest available resolution, unlike wall-clock time.time).
  start_time = time.perf_counter()
  circuit = parse_circuit(circuit_spec)
  parsing_time = time.perf_counter() - start_time
  num_gates = len(circuit.gates)

  # Compute number of unique wires touched by gates
  all_wires = set()
  for gate in circuit.gates:
    all_wires.update(gate["inputs"])
    all_wires.update(gate["outputs"])
  num_wires = len(all_wires)

  print(f"[Profiling] Circuit parsing time: {parsing_time:.6f} seconds")
  print(
    f"[Info] Circuit after obfuscation: {num_gates} gates, {num_wires} wires"
  )

  # Measure gate order computation time
  start_time = time.perf_counter()
  gate_order = randomized_topological_sort(circuit.gates)
  sorting_time = time.perf_counter() - start_time
  print(
    f"[Profiling] Topological sorting time: {sorting_time:.6f} seconds"
  )

  print(
    "[Computation] Circuit initialized with inputs:", circuit.inputs
  )

  # Randomize gate execution order
  print(
    "[Randomization] Gate execution order:",
    [
      f"{circuit.gates[i]['type']} {circuit.gates[i]['inputs']} -> {circuit.gates[i]['outputs']}"
      for i in gate_order
    ],
  )

  # Gates write into the images in place, so work on private copies of the
  # arrays; a shallow dict copy would still share them with the caller.
  signals = {wire: img.copy() for wire, img in received_images.items()}

  processed_dir = os.path.join(cover_dir, "processed")
  # Check before creating, otherwise the message below could never appear.
  newly_created = not os.path.exists(processed_dir)
  os.makedirs(processed_dir, exist_ok=True)
  if newly_created:
    print(
      f"[Setup] Created intermediate directory: {processed_dir}"
    )

  # Measure total gate execution time
  total_gate_execution_time = 0

  for i, gate_idx in enumerate(gate_order, 1):
    gate = circuit.gates[gate_idx]
    gate_type = gate["type"]
    input_images = [signals[wire] for wire in gate["inputs"]]
    output_images = [signals[wire] for wire in gate["outputs"]]

    # Measure execution time per gate
    start_gate_time = time.perf_counter()

    if gate_type == "FREDKIN":
      apply_fredkin_pixelwise(input_images, output_images)
    else:
      # Skipping a gate silently would yield a wrong result.
      raise ValueError(f"Unsupported gate type: {gate_type}")

    gate_execution_time = time.perf_counter() - start_gate_time
    total_gate_execution_time += gate_execution_time
    print(
      f"Gate {i} (idx {gate_idx}): {gate_type} on inputs {gate['inputs']} -> outputs {gate['outputs']}"
    )

    for j, wire in enumerate(gate["outputs"]):
      out_path = os.path.join(
        processed_dir, f"gate_{i}_out{j}_{wire}.png"
      )
      Image.fromarray(signals[wire]).save(out_path)
      print(f"  Output image {j} saved: {out_path}")

    # Display images after each gate
    gate_output_images = {
      wire: signals[wire] for wire in gate["outputs"]
    }
    display_images(
      gate_output_images,
      f"After Gate {i} ({gate_type} {gate['inputs']} -> {gate['outputs']})",
      "Eve's Computation",
    )

    print(
      f"  [Profiling] Gate {i} ({gate_type}) execution time: {gate_execution_time:.6f} seconds"
    )

  print(
    f"\n[Profiling] Total gate execution time: {total_gate_execution_time:.6f} seconds"
  )
  total_time = (
    parsing_time + sorting_time + total_gate_execution_time
  )
  print(
    f"[Profiling] Total Eve's computation time: {total_time:.6f} seconds"
  )

  print("[COMP] Computation complete")
  return signals


# ---------------
# Main Workflow
# ---------------


def main(cover_dir="C:/cover"):
  """Run the interactive end-to-end demo.

  Obfuscates a built-in Fredkin-only circuit, loads one cover image per
  wire, asks for the secret position and the input bits, embeds them, lets
  Eve evaluate the circuit and finally extracts and prints the result bit.

  Args:
    cover_dir: Directory containing the cover images (PNG/JPG, all of the
      same dimensions). At least one image per circuit wire is required.
      Intermediate and final images are written below this directory.
  """
  circuit_spec = """
    # Circuit to compute: (A AND C) OR (NOT A AND B) OR (NOT B AND NOT C)
    # This is a Fredkin-gate only implementation.
    INPUT A, B, C
    ANCILLARY const0 = 0
    ANCILLARY const1 = 1
    FREDKIN A, C, const0 -> A1, C1, AC
    FREDKIN A, const1, const0 -> A2, notA, A3
    FREDKIN notA, B, const0 -> notA1, B1, notAB
    FREDKIN B, const1, const0 -> B2, notB, B3
    FREDKIN C, const1, const0 -> C2, notC, C3
    FREDKIN notB, notC, const0 -> notB1, notC1, notBnotC
    FREDKIN AC, const1, const0 -> AC1, notAC, AC2
    FREDKIN notAB, const1, const0 -> notAB1, not_notAB, notAB2
    FREDKIN notAC, not_notAB, const0 -> notAC1, not_notAB1, not_T1
    FREDKIN not_T1, const1, const0 -> not_T1_1, T1, not_T1_2
    FREDKIN T1, const1, const0 -> T1_2, not_T1_3, T1_3
    FREDKIN notBnotC, const1, const0 -> notBnotC2, not_notBnotC, notBnotC3
    FREDKIN not_T1_3, not_notBnotC, const0 -> not_T1_4, not_notBnotC1, not_result
    FREDKIN not_result, const1, const0 -> not_result_1, result, not_result_2
    OUTPUT result
    """

  print(
    "\n------ Provably Secure Homomorphic Steganography: Circuit Evaluation ------\n"
  )

  # Obfuscate the circuit specification
  obfuscated_spec, mapping = obfuscate_circuit_spec(
    circuit_spec, num_dummy_ancillaries=5, num_dummy_gates=3
  )
  print(
    "[Obfuscation] Original circuit obfuscated with random names and dummy components."
  )
  print("[Obfuscated Circuit Spec]:\n", obfuscated_spec)

  # Parse the obfuscated circuit
  circuit = parse_circuit(obfuscated_spec)
  print("[1] Obfuscated circuit parsed:")
  print(f"  Inputs: {circuit.inputs}")
  print(f"  Ancillaries: {circuit.ancillaries}")
  print(f"  Gates: {len(circuit.gates)} gates parsed")
  print(f"  Outputs: {circuit.outputs}")

  # Every wire of the obfuscated circuit needs its own cover image. Sorting
  # makes the wire-to-file assignment independent of set iteration order.
  all_wires = sorted(
    set(
      circuit.inputs
      + [wire for wire, _ in circuit.ancillaries]
      + [
        wire
        for gate in circuit.gates
        for wire in gate["inputs"] + gate["outputs"]
      ]
      + circuit.outputs
    )
  )

  if not os.path.isdir(cover_dir):
    print(
      f"[Alice] Error: Cover image directory not found at '{cover_dir}'"
    )
    # Report the real requirement (one image per wire), not a fixed number.
    print(
      f"[Alice] Please create the directory and add at least {len(all_wires)} PNG/JPG images."
    )
    return

  image_files = sorted(
    [
      f
      for f in os.listdir(cover_dir)
      if f.lower().endswith((".png", ".jpg", ".jpeg"))
    ]
  )

  if len(image_files) < len(all_wires):
    print(
      f"[Alice] Error: Need {len(all_wires)} images for all wires, found {len(image_files)}."
    )
    return

  print("\n[Album] Listing all cover images:")
  images_to_send = {}
  for wire, file_name in zip(all_wires, image_files):
    img_path = os.path.join(cover_dir, file_name)
    print(f"  {wire}: {img_path}")
    # The context manager closes the file handle once the pixels are copied.
    with Image.open(img_path) as cover:
      images_to_send[wire] = np.array(cover.convert("RGB"))

  shapes = [img.shape for img in images_to_send.values()]
  if len(set(shapes)) > 1:
    print("[Alice] Error: All images must have the same dimensions.")
    return
  image_shape = shapes[0]

  # Step 1: Get the secret position from Alice via user input
  secret_pos = locgen(image_shape)
  display_images(
    images_to_send,
    "Initial Cover Images After Secret Position Input",
    "Alice's Setup",
  )

  # Step 2: Embed bits and display images
  print("\n[3] Embedding binary values:")
  for wire in circuit.inputs:
    while True:
      try:
        val = int(input(f"  Enter bit for {wire} (0 or 1): "))
        if val in [0, 1]:
          break
        print("    Enter 0 or 1.")
      except ValueError:
        print("    Enter 0 or 1.")
    embed_bit_in_image(images_to_send[wire], val, secret_pos)

  for wire, value in circuit.ancillaries:
    embed_bit_in_image(images_to_send[wire], value, secret_pos)
    print(f"  Ancillary {wire}={value} embedded at {secret_pos}")
  display_images(
    images_to_send,
    "Images After Embedding Input and Ancillary Bits",
    "Alice's Embedding",
  )

  # Step 3: Hand the obfuscated circuit and the stego images to Eve
  print("\n[Alice] Sending obfuscated circuit and images to Eve...")
  result_images = Eve_compute_circuit(
    obfuscated_spec, images_to_send, cover_dir
  )
  if result_images is None:
    print("[Alice] Computation failed.")
    return

  # Step 4: Display final output image before extraction
  output_wire = circuit.outputs[0]
  output_image = result_images[output_wire]
  display_images(
    {output_wire: output_image},
    "Final Output Image Before Extraction",
    "Alice's Extraction",
  )

  # Step 5: Read the result bit back at the secret position
  result = extract_bit_from_image(output_image, secret_pos)
  print(f"\n[5] Final circuit output: {result}")

  final_output_path = os.path.join(cover_dir, "final_output.png")
  Image.fromarray(output_image).save(final_output_path)
  print(
    f"\n[6] Final output image saved as '{final_output_path}'"
  )

  print("\n------ Process Completed Successfully! ------\n")


# Run the interactive demo only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
  main()