If necessary, create a virtual environment and install the required packages below.

In [None]:
!python3 -m venv venv
!source venv/bin/activate
!pip3 install numpy
!pip3 install matplotlib

In [None]:
import json
import matplotlib.pyplot as plt
import numpy as np
import sys
import math
import json

Below, we plot the confirmation bias model we used for individual agents.

In [None]:
def plot_model(b, ax):
	'''
	Displays a representation of our agents' behaviour when consuming a post.
	'''

	x = 2-b
	y = 1 - x/2

	ax.plot((0, x), (1, y), color='blue')
	ax.plot((x, 2), (y, 0), linestyle='dashed', color='blue')

	if b:
		ax.plot((x, 2), (y, 1), color='red')

		ax.plot((x, 2), (0, 0), color='green')
		ax.plot((x, x), (-0.05, 0.05), color='green')
		ax.plot((2, 2), (-0.05, 0.05), color='green')
		ax.text((2+x)/2, -0.075, 'B', color='green', fontsize=10)

	ax.set_title(f'Confirmation Bias Model (B = {b})', fontsize=12)

In [None]:
fig, axs = plt.subplots(2, 2, figsize=(14, 8), sharex=True, sharey=True)

plot_model(0, axs[0, 0])
plot_model(1, axs[0, 1])
plot_model(0.75, axs[1, 0])
plot_model(0.25, axs[1, 1])

fig.text(0.5, 0.04, 'Difference in opinion, d', ha='center', fontsize=12)
fig.text(0.08, 0.5, 'Probability of strengthening opinion, P(S)', va='center', rotation='vertical', fontsize=12)

Default global parameters are set below.

In [None]:
# ----------------- simulation parameters ----------------------#

NUM_AGENTS = 500
NUM_TIME_STEPS = 1000
NUM_SIMULATIONS = 500
POLARISATION_CUTOFF = 0.75	# agents with opinions outside [-cutoff, cutoff] are considered polarised
CONVERGENCE_NUM = 10			  # number of time steps with no change to end the simulation

# ----------------- default hyperparameters ---------------------------#

B = 0.3 	# agent bias
P = 0.5		# proportion of posting agents
N = 0		  # standard deviation of Gaussian noise applied to posts
C = 0.1		# proportion of posts consumed per day
D = 0.1		# change in agent's opinion due to a post
PB = 1		# platform bias
RB = 1		# recommendation bias

Below is the implementation of our model.

In [None]:
class MediaPlatform():
	'''
	The implementation of our model.
	The media platforms maintains a list of agents that consume posts and change their opinion each time step.
	'''

	def __init__(self, b=B, p=P, n=N, c=C, d=D, pb=PB, rb=RB, poster_dist='uniform', verbose=False):
		self.b = b				# agent bias
		self.p = p				# proportion of posting agents
		self.n = n				# standard deviation of Gaussian noise applied to posts
		self.c = c				# proportion of generated posts consumed per day
		self.d = d				# amount opinions are strengthed/weakened by
		self.pb = pb			# platform bias
		self.rb = rb			# recommendation bias
		
		self.verbose = verbose

		self.num_posters = round(P * NUM_AGENTS)
		self.posts_per_day = round(C * self.num_posters)

		self.num_same = 0
		[self.platform_opinion] = np.random.choice([-1, 1], 1)

		self.m = 0
		self.c = 0

		# confirmation bias model parameters
		if self.b > 0:
			self.m = (2 - b) / (2 * b)
			self.c = 1 - 2 * self.m

		self.agent_opinions = np.zeros(NUM_AGENTS)

		if poster_dist == 'uniform':
			self.agent_opinions[:self.num_posters] = np.random.uniform(-1, 1, self.num_posters)

		else:
			opinions = np.linspace(-1, 1, 100)
			if poster_dist == 'bimodal':
				probs = np.cos((opinions + 1) / 2 * np.pi) ** 2

			elif poster_dist == 'centered':
				probs = np.cos(opinions / 2 * np.pi) ** 2

			elif poster_dist == 'skewed':
				# skew away from platform opinion
				skew_direction = math.copysign(1, self.platform_opinion)
				probs = np.cos((opinions + skew_direction) / 4 * np.pi) ** 2

			else:
				raise ValueError('Invalid poster distribution')
			
			probs /= np.sum(probs)
			self.agent_opinions[:self.num_posters] = np.random.choice(opinions, self.num_posters, p=probs)
			
		self.agent_opinions[self.num_posters:] = np.random.uniform(-1, 1, NUM_AGENTS - self.num_posters)
		self.t_agent_opinion = self.agent_opinions.copy()
		self.prev_opinions = self.agent_opinions.copy()
		self.t = 0


	def change_agent_opinions(self, posts):
		'''
		Changes each agents' opinion in a time step based on the posts they receive.
		'''

		diff = np.abs(posts - self.agent_opinions)
		strengthen_probs = np.zeros(NUM_AGENTS)
		
		fun1 = diff <= 2 - self.b
		fun2 = diff > 2 - self.b

		strengthen_probs[fun1] = 1 - diff[fun1] / 2
		strengthen_probs[fun2] = self.m * diff[fun2] + self.c

		strengthened = np.random.random(NUM_AGENTS) < strengthen_probs
		self.agent_opinions[strengthened] += np.sign(self.agent_opinions[strengthened]) * self.d	  # strengthen opinions
		self.agent_opinions[~strengthened] -= np.sign(self.agent_opinions[~strengthened]) * self.d	# weaken opinions
		self.agent_opinions = np.clip(self.agent_opinions, -1, 1)
		
		
	def serve_posts(self):
		'''
		Returns the creator to consumer (CTC) matrix that scores each post for each agent.
		'''

		ctc = np.zeros((self.num_posters, NUM_AGENTS)) # creator to consumer matrix
		ctc = ctc + np.reshape(self.posts, (self.num_posters, 1))
             
		# calculate similarity between creator and consumer opinions
		ctc_consumer_sim = 1 - np.abs(ctc - self.agent_opinions) / 2 + sys.float_info.epsilon

		# calculate similarity between creator and platform opinions
		ctc_platform_sim = 1 - np.abs(ctc - self.platform_opinion) / 2 + sys.float_info.epsilon
            
		ctc = self.pb * ctc_platform_sim + self.rb * ctc_consumer_sim
		ctc = ctc / np.max(ctc, axis=0, keepdims=True) # normalize with max of each agent's posts
		ctc[np.diag_indices(self.num_posters)] = 0     # posters should not consume their own posts
		return ctc


	def time_step(self):
		'''
		Each agent consumes its own served posts
		'''
		self.posts = self.agent_opinions[:self.num_posters] 								# posts are the opinions of the posting agents
		self.posts += np.random.normal(scale=self.n, size=self.posts.shape) # add noise to posts
		self.posts = np.clip(self.posts, -1, 1) 														# clip posts to [-1, 1]

		if self.pb or self.rb:
			ctc = self.serve_posts()
			tiled_posts = np.tile(self.posts, (NUM_AGENTS, 1))
			sort_order = np.argsort(ctc.T, axis=1)[:, ::-1]
			tiled_posts = np.take_along_axis(tiled_posts, sort_order, axis=1)

			for i in range(self.posts_per_day):
				selected_posts = tiled_posts[:, i]
				self.change_agent_opinions(selected_posts)
		else:
			selected_posts = np.random.choice(self.posts, self.posts_per_day)
			for i in range(self.posts_per_day):
				posts_i = np.tile(selected_posts[i], NUM_AGENTS)
				self.change_agent_opinions(posts_i)
		
		self.t_agent_opinion = np.vstack((self.t_agent_opinion, self.agent_opinions))
		self.t += 1

	
	def converged(self):
		'''
		Return True if no opinions have changed in the last CONVERGENCE_NUM time steps
		'''
		if np.all(self.agent_opinions == self.prev_opinions):
			self.num_same += 1
		else:
			self.prev_opinions = self.agent_opinions.copy()
			self.num_same = 0

		return self.num_same == CONVERGENCE_NUM


	def simulate(self):
		'''
		Execute the specified time steps, or until convergence of opinions
		'''
		for i in range(NUM_TIME_STEPS):
			if self.verbose:
				print(f'Time step {i}')
			self.time_step()
			if self.converged():
				return


	def fractions(self):
		'''
		Return the fractions of agents holding positive and negative opinions
		'''
		pos = np.sum(self.agent_opinions > POLARISATION_CUTOFF) / NUM_AGENTS
		neg = np.sum(self.agent_opinions < -POLARISATION_CUTOFF) / NUM_AGENTS

		if self.verbose:
			print(f'Fraction positive {pos}')
			print(f'Fraction negative {neg}')

		return {1: pos, -1: neg}
	

	def polarisation(self):
		'''
		Return polarisation value between [0, 1]
		'''
		return min(self.fractions().values())


	def graph(self):
		'''
		Graph every agent's opinion changing over time
		'''
		_, ax = plt.subplots()
		for i in range(NUM_AGENTS):
			ax.plot(self.t_agent_opinion[:, i], label=i)

Run a single simulation with default parameters:

In [None]:
m = MediaPlatform(b=0.6, poster_dist='centered')
m.simulate()
print('Polarised proportions', m.fractions())
print("Platform's opinion", m.platform_opinion)
m.graph()
plt.title("Each agent's opinion per time step")

Running many simulations to collect data was done in a separate python file, simulate.py, provided.

Graph bias vs polarisation

In [None]:

# load the data

data = []

file = 'data/bias-vs-polarisation/data-p{}-c{}-plat0-rec0.json'

for p, c in ((0.25, 0.1), (0.5, 0.1), (0.75, 0.1), (0.5, 0.3), (0.5, 0.5)):
	with open(file.format(p, c), 'r') as f:
		data.append(json.load(f)['data'])

In [None]:
def plot_avg(data, label):
	'''
	Plots the average polarisation for each level of bias in the data.
	'''

	bs, ps = zip(*list(data.items()))
	bs = [float(b) for b in bs]

	avg_ps = []
	for p in ps:
		avg_ps.append(sum(p) / len(p))
	
	plt.plot(bs, avg_ps, label=label)

In [None]:
# plot each dataset

plot_avg(data[0], 'p=0.25, c=0.1')
plot_avg(data[1], 'p=0.5, c=0.1')
plot_avg(data[2], 'p=0.75, c=0.1')
plot_avg(data[3], 'p=0.5, c=0.3')
plot_avg(data[4], 'p=0.5, c=0.5')

plt.legend()
plt.xlabel('Bias')
plt.ylabel('Average polarisation')

In [None]:
def variance(lst):
	'''
	Returns the variance of a list of floats.
	'''
	avg = sum(lst) / len(lst)
	return sum([(x - avg)**2 for x in lst]) / len(lst)

def plot_var(data, label):
	'''
	Plots the variance of the polarisation for a data set.
	'''
	bs, ps = zip(*list(data.items()))
	bs = [float(b) for b in bs]

	avg_ps = []
	for p in ps:
		avg_ps.append(variance(p))
	
	plt.plot(bs, avg_ps, label=label)

In [None]:
plot_var(data[0], 'p=0.25, c=0.1')
plot_var(data[1], 'p=0.5, c=0.1')
plot_var(data[2], 'p=0.75, c=0.1')
plot_var(data[3], 'p=0.5, c=0.3')
plot_var(data[4], 'p=0.5, c=0.5')

plt.legend()
plt.xlabel('Bias')
plt.ylabel('Polarisation variance')

Here we find the peaks of the polarisation variance graph to estimate the critical bias value B. We observe the critical bias value B = 0.3, about which polarisation changes rapidly.

In [None]:
peaks = []

for d in data:
	vars = {}
	for b, ps in d.items():
		vars[b] = variance(ps)
	
	peaks.append(max(d.keys(), key=lambda b: vars[b]))

peaks


In the next data sets, the platform bias and recommendation bias values are introduced to the simulations. We graph the proportional of agents conforming to platform's opinion against the ratio platform bias / recommendation bias.

In [None]:

# load the data

file = 'data/{}-vs-polarisation/data-b{}-p0.5-c0.1-{}1.json'

platform_data = []
rec_data = []

for b in [0, 0.25, 0.5, 0.75]:
	with open(file.format('platform', b, 'rec'), 'r') as f:
		platform_data.append(json.load(f)['data'])

	with open(file.format('rec', b, 'platform'), 'r') as f:
		rec_data.append(json.load(f)['data'])

In [None]:
def get_data(data):
	'''
	Extracts the relevant data from the json data in the format [xs, ys] for graphing.
	'''

	xs = []
	ys = []
	for key, val in data.items():
		xs.append(float(key))
		y = []
		for v in val:
			y.append(float(v[1] if v[0] == 1 else v[2]))
		ys.append(sum(y) / len(y))
	return xs, ys

In [None]:
def plot_conform(data, xlabel):
	'''
	Plots the average proportion of agents that agree with the platform.
	'''

	xs = []
	ys = []

	for d in data:
		x, y = get_data(d)
		xs.append(x)
		ys.append(y)

	for i, b in enumerate([0, 0.25, 0.5, 0.75]):
		plt.plot(xs[i], ys[i], label=f'b={b}')

	plt.legend()
	plt.xlabel(xlabel)
	plt.ylabel('Average fraction conforming to platform')

In [None]:
plot_conform(platform_data, 'Platform bias / Recommendation bias')

Graph fraction of agents conforming to platform's opinion against level of recommendation bias

In [None]:
plot_conform(rec_data, 'Recommendation bias / Platform bias')

In the following sections, we plot the polarisation values as heatmaps with varying agent bias and PB / RB ratios, for different poster opinion distributions.

In [None]:

a1 = np.load('data/platform-vs-agent-bias/data-p0.3-c0.1-n0-uniform.npy')
a2 = np.load('data/platform-vs-agent-bias/data-p0.3-c0.1-n0-bimodal.npy')
a3 = np.load('data/platform-vs-agent-bias/data-p0.3-c0.1-n0-centered.npy')
a4 = np.load('data/platform-vs-agent-bias/data-p0.3-c0.1-n0-skewed.npy')

def plot_model(arr, ax, title):
	im = ax.imshow(arr, cmap='seismic', interpolation='spline36', origin='lower', vmin=0.4, vmax=1.0)
	ax.set_title(title, fontsize=12)
	ax.set_xticks(np.arange(10, 100, 10), np.round(np.linspace(0.2, 1.8, 9), 2), rotation=45)
	ax.set_yticks(np.arange(10, 100, 10), np.round(np.linspace(0.1, 0.9, 9), 2))
	return im

fig, axs = plt.subplots(2, 2, figsize=(12, 12), sharex=True, sharey=True)

im1 = plot_model(a1, axs[0, 0], 'Uniform Distribution')
plot_model(a2, axs[1, 0], 'Bimodal Distribution')
plot_model(a3, axs[0, 1], 'Centered Distribution')
plot_model(a4, axs[1, 1], 'Skewed Distribution')

plt.suptitle('Platform vs Agent Bias for Different Poster Opinion Distributions', fontsize=16)

cbax_ax = fig.add_axes([0.92, 0.15, 0.01, 0.7])
cbar = fig.colorbar(im1, cax=cbax_ax)

cbar.ax.get_yaxis().labelpad = 30
cbar.ax.set_ylabel('Proportion of agents aligning with platform opinion', rotation=270, fontsize=12)

fig.text(0.5, 0.04, 'Platform Bias / Recommendation Bias, PB / RB', ha='center', fontsize=12)
fig.text(0.08, 0.5, 'Agent Bias, B', va='center', rotation='vertical', fontsize=12)

# fig.savefig(f'data/platform-vs-agent-bias/heatmap.png', dpi=600, bbox_inches='tight')

Below we graph post distribution models corresponding to each heatmap.

In [None]:
def plot_poster_distribution_model(dist):
	opinions = np.arange(-1.0, 1.0, 0.01)

	if dist == 'uniform':
		probs = np.ones_like(opinions)
	elif dist == 'bimodal':
		probs = np.cos((opinions + 1) / 2 * np.pi) ** 2
	elif dist == 'centered':
		probs = np.cos(opinions / 2 * np.pi) ** 2
	elif dist == 'skewed':
		probs = np.cos((opinions + 1) / 4 * np.pi) ** 2
	else:
		raise ValueError('Invalid poster distribution')
	
	probs /= np.sum(probs)
	fig, ax = plt.subplots(figsize=(8, 6))
	ax.plot(opinions, probs)
	ax.set_yticks([])

In [None]:
plot_poster_distribution_model('uniform')
plot_poster_distribution_model('bimodal')
plot_poster_distribution_model('centered')
plot_poster_distribution_model('skewed')