##**Gerrymandering-Environment**

    INITIAL STATE (provided externally via reset(options=...)):
        - 'district_map'
        - 'social_graph'
        - 'opinions'     

    ACTION:
        - new district assignment for each voter

    OBSERVATION (returned by reset/step):
        {
          'district_map'   : (num_voters,)
          'representatives': (num_districts,)  # voter indices; -1 if empty   district
          'social_graph'   : (num_voters, num_voters)  # AUGMENTED: base social + rep->voter edges used for the step
          'opinions'       : (num_voters, 2)
          'opinion_graph'  : (num_voters, num_voters)  # similarity kernel derived from opinion distances
        }

    KEY LOGIC:
      - Representatives: for each district, pick the member that minimizes the sum of L2 distances to members in that district (discrete 1-median).
      - Opinion dynamics: DRF (assimilation/neutral/backfire) with weighted neighbor influence.
      - Reward: reduction in total distance to reference opinion c*


In [1]:
!pip install torch_geometric



In [2]:
import numpy as np
import torch
from torch_geometric.data import Data
import gymnasium as gym
from gymnasium import spaces
import copy

#This is the Frankenmandering Data class which includes:
*   social edge list: type: matrix and size : (v,v)
*   assignment edge list: type: matrix and size:(v,d)
*   number of orignin edge
*   geometric position
*   district label
*   edge attribute
*   representative
*   opinion



In [3]:
class FrankenData(Data):
  def __init__(self,social_edge, geographical_edge, orig_edge_num, opinion, pos, reps, dist_label, edge_attr, **kwargs):
    super().__init__(**kwargs)

    self.orig_edge_num = orig_edge_num
    self.opinion = torch.tensor(opinion, dtype=torch.float32)
    self.pos = torch.tensor(pos, dtype=torch.float32)
    self.dist_label = torch.tensor(dist_label, dtype=torch.float32)

    #social graph
    self.social_edge = torch.tensor(social_edge, dtype=torch.long)
    self.edge_attr = torch.tensor(edge_attr, dtype=torch.float32)

    # geographical graph
    self.geographical_edge = torch.tensor(geographical_edge, dtype=torch.long)

    self.reps = torch.tensor(reps, dtype=torch.long)

  def save(self,file_path):
      torch.save(self,file_path)
      print(f"FrankenData object saved to {file_path}")

  def load(self,file_path):
      data = torch.load(file_path)
      print(f"FrankenData object loaded from {file_path}")
      return data

In [4]:
class FrankenmanderingEnv(gym.Env):

    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        num_voters: int,
        num_districts: int,
        opinion_dim: int = 2,
        horizon: int = 10,
        seed: int | None = None,
        FrankenData = None,
        # target
        reference_opinion: np.ndarray | None = None,

    ):
        super().__init__()
        self.num_voters = int(num_voters)
        self.num_districts = int(num_districts)
        self.opinion_dim = int(opinion_dim)
        self.horizon = int(horizon)
        self.rng = np.random.default_rng(seed)

        # state
        self.t = 0
        self.initial_data = copy.deepcopy(FrankenData)
        self.FrankenData = FrankenData
        self.opinion_update_step = 0

        # target opinion c*
        if reference_opinion is None:
            self.c_star = np.zeros((self.num_voters,self.opinion_dim), dtype=np.float32)
        else:
            self.c_star = np.asarray(reference_opinion, dtype=np.float32).reshape(self.num_voters,self.opinion_dim)

        # action spaces
        self.action_space = spaces.Box(low=0.0, high=1.0, shape=(self.num_voters, self.num_districts), dtype=np.float32)

        self.observation_space = spaces.Dict({
                  "opinions": spaces.Box(low=-np.inf, high=np.inf,
                                        shape=(self.num_voters, self.opinion_dim), dtype=np.float32),

                  "positions": spaces.Box(low=-np.inf, high=np.inf,
                                          shape=(self.num_voters, 2), dtype=np.float32),

                  "dist_label": spaces.Box(low=0, high=self.num_districts-1,
                                          shape=(self.num_voters,), dtype=np.int32),

                  "reps": spaces.Box(low=-1, high=self.num_voters-1,
                                    shape=(self.num_districts,), dtype=np.int32),

                  "social_edge_index": spaces.Box(low=0, high=self.num_voters-1,
                                shape=(2, self.initial_data.orig_edge_num), dtype=np.int32),

                  "social_edge_attr": spaces.Box(low=0, high=np.inf,
                                                shape=(self.initial_data.orig_edge_num,), dtype=np.float32),

                  "assignment": spaces.Box(low=0, high=self.num_voters-1,
                                          shape=(2, self.num_voters), dtype=np.int32),
              })


    # options includes opinions, geometric position,  edge_index(social graph,assignment),assignment
    def reset(self, seed: int | None = None, options: dict | None = None):
        super().reset(seed=seed)
        self.t = 0

        self.FrankenData = copy.deepcopy(self.initial_data)

        return self.FrankenData, {}

    def step(self, action: np.ndarray, **kwargs):
        if self.FrankenData is None:
            raise RuntimeError("Environment must be reset before calling step.")

        # Normalize action and compute new assignment
        assignment = row_normalize(np.asarray(action, dtype=np.float32))

        # assignment = ensure_non_empty(assignment)
        y = assignment.argmax(axis=1).astype(np.int32)
        y[(assignment.max(axis=1) == 0)] = -1

        # check the validity of district
        # contiguous = check_contiguity(self.FrankenData.geographical_edge, y, self.num_districts)
        non_empty = all(np.any(y == d) for d in range(self.num_districts))


        if  (not non_empty):
          terminated = False
          reward = -10
          return self.FrankenData, reward, terminated,False,{}

        # Elect new representatives
        reps = elect_representatives(y, self.FrankenData.opinion, self.num_districts)
        #partial reset augmented graph

        social_graph = self.FrankenData.social_edge[:, :-self.FrankenData.orig_edge_num]

        social_edge_attr = self.FrankenData.edge_attr[:-self.FrankenData.orig_edge_num]

        # update social graph based on reps and new assignment
        aug_edge_index, aug_edge_attr = augment_with_reps(
            social_graph,
            social_edge_attr,
            reps,
            y
        )

        # Update oppinion
        x_new = opinion_update(self.opinion_update_step, aug_edge_index, aug_edge_attr,self.FrankenData.opinion, **kwargs)

        reward = self.reward(self.FrankenData.opinion, x_new, y, self.FrankenData.social_edge)

        self.t += 1
        self.opinion_update_step +=1
        terminated = self.t >= self.horizon

        self.FrankenData = FrankenData(
              social_edge = aug_edge_index,
              orig_edge_num = aug_edge_index.shape[1],
              geographical_edge= self.initial_data.geographical_edge,
              opinion= x_new,
              pos =self.FrankenData.pos,
              reps=[r if r is not None else -1 for r in reps],
              dist_label=y,
              edge_attr = aug_edge_attr
            )
        return self.FrankenData, float(reward), terminated, False, {}


    def reward(self, oldX,newX, y, assignment, a= 0.1, b =0.5):
        # check the population equity and compactness constraints
        pop_dev = population_equality(np.ones_like(y),y, self.num_districts)
        comp_score = compactness_score(assignment, y)

        penalty =  -a * pop_dev - b * comp_score

        old_d=np.linalg.norm(oldX-self.c_star,axis=1).sum()
        new_d=np.linalg.norm(newX-self.c_star,axis=1).sum()

        reward = penalty + old_d-new_d
        return reward

    # def render(self, mode="human"):
    #     mean = self._x.mean(axis=0)
    #     print(f"[t={self.t}] mean opinion ≈ {tuple(np.round(mean,3))}")


**Elect representatives**
###This function elects representative in each district using a spatial / metric voting rule, specifically the median voter rule

```
Args:
        y: (N,) array of district labels for each voter.
        X: (N, opinion_dim) array of voter opinions.
        num_districts: number of districts (K).

    Returns:
        reps: list of representative indices, one per district (None if district empty)
```



In [5]:
import numpy as np

def elect_representatives(y: np.ndarray, X: np.ndarray, num_districts: int):

    reps = [None] * num_districts
    N, m = X.shape

    # Precompute pairwise distances
    dif = X[:, None, :] - X[None, :, :]   # (N,N,m)
    dists = np.linalg.norm(dif, axis=2)   # (N,N) Euclidean distances

    for d in range(num_districts):
        members = np.where(y == d)[0]  # voters in district d

        if len(members) == 0:
            continue  # empty district

        # For each member, compute sum of distances to others in district
        sums = dists[np.ix_(members, members)].sum(axis=1)

        # Pick the member with the minimal total distance/it stores the index of opinion
        reps[d] = int(members[np.argmin(sums)])

    return reps

In [6]:
# # test elect_representatives
# X = np.array([[0.],
#         [0.],
#         [0.],
#         [1.],
#         [2.],
#         [3.],
#         [4.],
#         [5.],
#         [5.],
#         [5.]], dtype=float)

# Y = np.array([0, 0, 0,0,0,0,0,0,0,0])
# print(elect_representatives(Y,X,1))

## **Augment with reps (G union H)**

###
Augment the social graph with representative edges (district graphs).
    
    Args:
        edge_index: (2, E) numpy array of existing social edges.
        edge_attr: (E,) numpy array of edge weights.
        reps: list of representative indices, one per district (None if empty).
        y: (N,) array of district assignments (each voter’s district label).
        rep_edge_weight: weight to assign to each representative edge (default=1.0).
    
    Returns:
        new_edge_index: (2, E+E_rep) with representative edges added.
        new_edge_attr:  (E+E_rep,) with weights for representative edges.

In [7]:

def augment_with_reps(edge_index, edge_attr, reps, y, rep_edge_weight: float = 1.0):

    add_src, add_dst, add_w = [], [], []
    existing = set(map(tuple, edge_index.T))

    for d, r in enumerate(reps):
        if r is None:
            continue  # empty district
        members = np.where(y == d)[0]

        for v in members:
            if v == r:
                continue  # skip self-loop

            if (r, v) in existing:
              continue  # skip duplicate

            add_src.append(r)             # representative -> member
            add_dst.append(v)
            add_w.append(rep_edge_weight) # fixed influence weight

    if not add_src:  # no additional edges
        return edge_index, edge_attr

    rep_edges = np.vstack([add_src, add_dst])      # shape (2, E_rep)
    rep_weights = np.array(add_w, dtype=np.float32)

    new_edge_index = np.concatenate([edge_index, rep_edges], axis=1)
    new_edge_attr  = np.concatenate([edge_attr, rep_weights], axis=0)

    return new_edge_index, new_edge_attr

### **Opinion Update Function**
### At each step, voters "adjust" their position in opinion space based on the Euclidean distance to their neighbours, with the DRF


deciding whether they converge, diverge, or ignore.

Each neighbour "pulls" or "pushes" v depending on how similar they are.

The closer they are, the more v tends to assimilate.

If they’re a bit too far, v resists and moves in the opposite direction (backfire).

If they’re too far away, v ignores them.

The update is multi-dimensional, so if opinions are about multiple issues, the shift happens along the direction where the neighbour differs most.

Because normalization is used, v doesn’t "overreact" to having many neighbours.

    Apply opinion update based on DRF across graph edges.
    X: (N, opinion_dim)
    edge_index (augmented social graph): shape (2, E)
    edge_attr(w): shape (E,)
    mu : μ(∥cut​−cvt​∥)

In [8]:
def opinion_update(self, edge_index_aug, edge_attr, X: np.ndarray) -> np.ndarray:

    N, m = X.shape   # N voters, m-dimensional opinions
    newX = np.copy(X)

    updates = np.zeros_like(X)
    norm_factors = np.zeros((N, 1)) #Z

    for e in range(edge_index_aug.shape[1]):
        u, v = edge_index_aug[:, e]
        w = edge_attr[e]

        diff = X[u] - X[v]               # vector difference in opinion space

       # Euclidean distance
       # if diff is a 1-D array [x, y, z], then np.linalg.norm(diff, 2) calculates sqrt(x^2 + y^2 + z^2)
        dist = np.linalg.norm(diff, 2)

        if dist > 1e-8:  # avoid division by zero
            mu = drf(dist)
            direction = (diff / dist).numpy()      # unit vector (direction only)

            # influence: u influences v
            updates[v] += mu * w * direction
            norm_factors[v] += abs(w)

    # apply updates with normalization
    for v in range(N):
        if norm_factors[v] > 0:
            newX[v] = X[v] + updates[v] / norm_factors[v]

    return newX

**Discrepancy Response Function**
*   This function determines how a voter reacts when interacting with another voter based on the discrepancy (distance) between their opinions.
*   Tunable thresholds: epsilon values determine assimilation/backfire ranges. There are 5 levels in this range

### thresholds (tune these as hyperparameters)
*   eps_indiff = 0.5    below this = indifference
*   eps_backfire = 4.0  backfire zone
*   eps_irrel = 6.0     above this = irrelevance    
*   eps_assim = 2.0     assimilation zone

In [9]:
def drf(discrepancy, **kwargs):
    eps_assim = kwargs.get("eps_assim", 3)
    eps_backfire = kwargs.get("eps_backfire", 3)
    assim_shift = kwargs.get("assim_shift", np.sign(discrepancy))
    back_shift = kwargs.get("back_shift", -np.sign(discrepancy))

    delta = abs(discrepancy)

    if delta < eps_assim: #assimilation
          return assim_shift

    elif (delta > eps_backfire) or (delta == eps_backfire): #backfire
          return back_shift
    else:
          return 0.0  #indifference, irrelevance, neutal


In [10]:
def row_normalize(A: np.ndarray, eps=1e-8) -> np.ndarray:
        row_sum = A.sum(axis=1, keepdims=True)
        row_sum = np.clip(row_sum, eps, None)
        return A / row_sum


def adj_to_coo(adj: np.ndarray):
        row, col = np.nonzero(adj)
        edge_index = np.vstack([row, col])
        edge_attr = adj[row, col]
        return edge_index, edge_attr

## **Population equality**
###(districts must have nearly equal populations)
Compute population deviation score.
    
    population weight: np.ndarray shape (N,) -> population of each voter/node
    labels: np.ndarray shape (N,) -> district assignment
    num_districts: int
    
    Returns: float (sum of deviations from ideal)
    """
    Returns normalized population deviation:
      sum(|pop_d - ideal|) / total_pop
    This is in [0, 2*(1 - 1/num_districts)] roughly; smaller is better.
    """


In [11]:
def population_equality(pop_weights: np.ndarray, y: np.ndarray, num_districts: int) -> float:

    pop_weights = np.asarray(pop_weights, dtype=float)
    total = pop_weights.sum()
    pop_per = np.zeros(num_districts, dtype=float)

    for d in range(num_districts):
        pop_per[d] = pop_weights[y == d].sum()
    ideal = total / float(num_districts)

    return float(np.abs(pop_per - ideal).sum() / (total + 1e-12))

## **check contiguity**
###A district is contiguous if all its members are connected to each other in the graph.
This function Check whether each district forms a connected component.
    
    edge_index: np.ndarray shape (2, E) -> edges
    labels: np.ndarray shape (N,) -> district assignment of each node
    num_districts: int
    
    Returns: bool (True if all districts are contiguous)

In [12]:
from collections import deque

def check_contiguity(geo_edge, labels, num_districts):

    N = len(labels)
    adjacency = [[] for _ in range(N)]

    for u, v in geo_edge.T:
        adjacency[u].append(v)
        adjacency[v].append(u)

    for d in range(num_districts):
        members = np.where(labels == d)[0]

        if len(members) == 0:
            continue  # empty district handled elsewhere

        # BFS search, using queue
        visited = set()
        queue = deque([members[0]])
        visited.add(members[0])

        while queue:
            node = queue.popleft()
            for nei in adjacency[node]:
                if labels[nei] == d and nei not in visited:
                    visited.add(nei)
                    queue.append(nei)

        if len(visited) != len(members):
            return False  # not contiguous
    return True

## **compactness score**
###  Compactness means districts should not be “spread out” with too many edges crossing between districts.This function computes compactness as ratio of cut edges to total edges.
    
    edge_index (social connection): np.ndarray shape (2, E)
    labels (district labels): np.ndarray shape (N,)
    
    Returns: float (compactness score)

In [13]:
def compactness_score(geographical_edge, y: np.ndarray) -> float:
    """
    Compactness = fraction of geographical edges that are cut (endpoints in different districts).
    Lower is more compact. Accepts same geographical_edge formats as build_neighbors().
    """
    ge = np.asarray(geographical_edge)
    # normalize edges to a list of pairs
    if ge.ndim == 2 and ge.shape[0] == ge.shape[1]:
        # adjacency matrix -> build unique edges (i<j)
        N = ge.shape[0]
        cut = 0
        total = 0
        for i in range(N):
            for j in range(i+1, N):
                if ge[i, j]:
                    total += 1
                    if y[i] != y[j]:
                        cut += 1
        return 0.0 if total == 0 else float(cut / total)
    else:
        if ge.shape[0] == 2:
            edges = ge.T.astype(int)
        else:
            edges = ge.astype(int)
        total = edges.shape[0]
        if total == 0:
            return 0.0
        cuts = np.sum([1 for (u, v) in edges if int(y[int(u)]) != int(y[int(v)])])
        return float(cuts / total)

**Inchworm test**

In [14]:
import torch
import numpy as np

# ---- Setup initial data for inchworm ----
num_voters = 10
num_districts = 1
init_opinions = np.array([0,0,0,1,2,3,4,5,5,5], dtype=np.float32).reshape(num_voters, 1)

# trivial positions (not used in inchworm)
pos = np.zeros((num_voters, 2), dtype=np.float32)

# no base edges (inchworm works only via districting)
so_edge = np.empty((2,0),dtype=np.int64)
edge_attr  = np.empty((0,),dtype=np.float32)


# trivial initial assignment (everyone in district 0 for now)
geographical_edge = np.zeros((num_voters, num_voters), dtype=np.float32)
geographical_edge = [
                      [0,0,0,1,0,0,0,0,0,0],
                      [0,0,0,1,0,0,0,0,0,0],
                      [0,0,0,1,0,0,0,0,0,0],
                      [1,1,1,0,1,0,0,1,1,1],
                      [0,0,0,1,0,1,0,0,1,1],
                      [0,0,0,0,1,0,1,0,0,0],
                      [0,0,0,0,0,1,0,1,0,0],
                      [0,0,0,1,0,0,1,0,1,0],
                      [0,0,0,1,1,0,0,1,0,1],
                      [0,0,0,1,1,0,0,0,1,0]
]

dist_label = np.zeros(num_voters, dtype=np.int32)

# no reps yet
reps = [-1]*num_districts

geographical_edge = []

# pack into FrankenData
init_data = FrankenData(
    social_edge=so_edge,
    geographical_edge= geographical_edge,
    orig_edge_num=2,
    opinion=init_opinions,
    pos=pos,
    reps=reps,
    dist_label=dist_label,
    edge_attr=edge_attr
)

In [15]:
env = FrankenmanderingEnv(num_voters=num_voters,
                          num_districts=num_districts,
                          FrankenData=init_data
                                               )

# reset will now just deepcopy init_data
obs, _ = env.reset()
print("t=0:", obs.opinion.squeeze().tolist())

t=0: [0.0, 0.0, 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 5.0]


In [16]:
def run_inchworm(env, districts):
    history = []
    for t, district in enumerate(districts, 1):
        # Build assignment matrix
        assignment = np.zeros((env.num_voters, env.num_districts), dtype=np.float32)

        for v in range(env.num_voters):
            if v in district:
                assignment[v] = 1.0
            # else:
            #     assignment[v, 1] = 1.0
        obs, reward, terminated, truncated, info = env.step(assignment, )

        opinions = obs.opinion.squeeze().tolist()
        reps = obs.reps.numpy().tolist()
        history.append((opinions, reps))

        print(f"t={t}, reps={reps}, opinions={opinions}")
    return history

In [17]:

districts = [
    [2,3,9],
    [1,3,8],
    [0,3,7],
    [3,4,9],
    [4,5,8],
    [5,6,7],
    [6,7,8],
    [7,8,9]
]

history = run_inchworm(env, districts)

t=1, reps=[3], opinions=[0.0, 0.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0]
t=2, reps=[3], opinions=[0.0, 1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 6.0]
t=3, reps=[3], opinions=[1.0, 1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 6.0, 6.0, 6.0]
t=4, reps=[4], opinions=[1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 4.0, 6.0, 6.0, 7.0]
t=5, reps=[5], opinions=[1.0, 1.0, 1.0, 2.0, 3.0, 3.0, 4.0, 6.0, 7.0, 7.0]
t=6, reps=[6], opinions=[1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 4.0, 5.0, 7.0, 7.0]
t=7, reps=[7], opinions=[1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 5.0, 6.0, 7.0]
t=8, reps=[8], opinions=[1.0, 1.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 6.0, 6.0]


  newX[v] = X[v] + updates[v] / norm_factors[v]
  newX[v] = X[v] + updates[v] / norm_factors[v]
  old_d=np.linalg.norm(oldX-self.c_star,axis=1).sum()
  self.pos = torch.tensor(pos, dtype=torch.float32)
  self.geographical_edge = torch.tensor(geographical_edge, dtype=torch.long)
