*Simulation 01 for "Concordia_Check" project*

# Stanford Prison Experiment

Conducted in 1971 by Philip Zimbardo, this experiment involved 24 male university students who were assigned roles as either guards or prisoners in a mock prison environment. The study was intended to last two weeks but was terminated after six days due to the extreme psychological effects on participants. The experiment provides insights into the power of situational influences on behavior and social roles. [Source](https://www.apa.org/topics/forensics-law-public-safety/prison).

## Init and import

In [None]:
!pip install git+https://github.com/google-deepmind/concordia.git

In [None]:
!pip install sentence_transformers

In [10]:
# @title Imports

import collections
import concurrent.futures
import datetime
import random

# from google.colab import widgets  # pytype: disable=import-error
from IPython import display
import sentence_transformers

from concordia import components as generic_components
from concordia.agents import basic_agent
from concordia.components import agent as components
from concordia.agents import basic_agent
from concordia.associative_memory import associative_memory
from concordia.associative_memory import blank_memories
from concordia.associative_memory import formative_memories
from concordia.associative_memory import importance_function
from concordia.clocks import game_clock
from concordia.components import game_master as gm_components
from concordia.environment import game_master
from concordia.metrics import goal_achievement
from concordia.metrics import common_sense_morality
from concordia.metrics import opinion_of_others
from concordia.utils import measurements as measurements_lib
from concordia.language_model import gpt_model
from concordia.language_model import gcloud_model
from concordia.utils import html as html_lib
from concordia.utils import plotting


In [5]:
# @title Setup sentence encoder
st_model = sentence_transformers.SentenceTransformer('sentence-transformers/all-mpnet-base-v2')
embedder = lambda x: st_model.encode(x, show_progress_bar=False)


Note: there must be ".env " file in this folder with GPT_API_KEY key string

In [6]:
import os


from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())  # read local .env file

GPT_API_KEY=os.environ.get("GPT_API_KEY")

GPT_MODEL_NAME= 'gpt-3.5-turbo'
#
#


In [7]:
GPT_MODEL_NAME

'gpt-3.5-turbo'

In [8]:
# @title Language Model - pick your model and provide keys
CLOUD_PROJECT_ID = '' #@param {type: 'string'}
# GPT_API_KEY = '' #@param {type: 'string'}
# GPT_MODEL_NAME = '' #@param {type: 'string'}

USE_CLOUD = False #@param {type: 'boolean'}

if USE_CLOUD:
  model = gcloud_model.CloudLanguageModel(project_id= CLOUD_PROJECT_ID)
else:
  model = gpt_model.GptLanguageModel(api_key=GPT_API_KEY, model_name=GPT_MODEL_NAME)

## Configuring the generic knowledge of players and GM.

In [11]:
#@title Make importance models

# Define the importance model for agents
importance_model = importance_function.AgentImportanceModel(model)

# Define a constant importance model for the Game Master (neutral entity)
importance_model_gm = importance_function.ConstantImportanceModel()

# Create the importance models for guards and prisoners
guard_names = ["Mike", "John", "Alex", "Steve", "Kevin", "Paul", "James", "Brian", "Robert", "Tom", "Mark", "Jake"]
prisoner_names = ["Ostin", "Sam", "Chris", "David", "Jack", "Adam", "Eric", "Scott", "Andy", "Josh", "Nick", "Ryan"]

guards = [{"name": f"{name} (guard)", "role": "guard", "importance_model": importance_model} for name in guard_names]
prisoners = [{"name": f"{name} (prisoner)", "role": "prisoner", "importance_model": importance_model} for name in prisoner_names]

# Combine guards and prisoners
agents = guards + prisoners

# Define the Game Master
game_master = {"name": "Game Master", "role": "neutral", "importance_model": importance_model_gm}

# # Include as first memory that this is an experiment
# first_memory = "This is an experiment. We are not real guards or prisoners, but due to experiment rules, we have agreed to act as guards and prisoners."

# # Add first memory to all agents
# for agent in agents:
#     agent["memories"] = [first_memory]

print("Importance models and agents created successfully.")



ImportError: cannot import name 'importance_function' from 'concordia' (c:\Python311\Lib\site-packages\concordia\__init__.py)

In [32]:
SETUP_TIME = datetime.datetime(hour=8, year=2024, month=6, day=1)

# Start time for the simulation
START_TIME = datetime.datetime(hour=9, year=2024, month=6, day=1)

# Create the game clock with multi-interval steps
clock = game_clock.MultiIntervalClock(
    start=SETUP_TIME,
    step_sizes=[datetime.timedelta(hours=1), datetime.timedelta(seconds=10)]
)

print("Clock setup complete. Simulation starts at:", START_TIME)


In [12]:
# @title Generic memories are memories that all players and GM share.

shared_memories = [
    "This is an experiment. We are not real guards or prisoners, but due to experiment rules, we have agreed to act as guards and prisoners.",
    "We are university students participating in this experiment.",
    "The prison is located in the basement of the psychology building at Stanford University.",
    "There are 12 guards and 12 prisoners in this experiment.",
    "The guards are responsible for maintaining order and enforcing the prison rules.",
    "The prisoners are to follow the instructions given by the guards.",
    "The prison has cells for the prisoners, a common area, and a guard room.",
    "The experiment will run for up to 14 days unless terminated earlier.",
    "Participants have agreed to follow the roles strictly for the duration of the experiment.",
    "Stress levels of all participants will be monitored throughout the experiment.",
    "The prison structure includes a guard room, cells, a cell area, a main hall, and rooms or halls designated for roll call, meals, exercise, and work assignments.",
    "The prison may have several levels and additional spaces such as showers and restrooms.",
    "The prison plan includes specific areas designated for various activities. The guard room is located near the entrance for easy access. Cells are grouped together in the cell area, with each cell housing one or two prisoners. The main hall is used for large gatherings and activities. Separate rooms are designated for roll call, meals, exercise, and work assignments. Showers and restrooms are located at the end of the cell block.",
    "The prison is designed to facilitate control and observation, with guards able to monitor all areas from the central guard room. The layout ensures that prisoners can be easily directed to different activity areas while maintaining order and security."
    "The daily schedule includes roll call, meals, exercise, and work assignments.",
    "Roll call takes place at 7:00 AM and 7:00 PM.",
    "Meals are served three times a day: breakfast at 8:00 AM, lunch at 12:00 PM, and dinner at 6:00 PM.",
    "Exercise time is scheduled for 9:00 AM and 4:00 PM, lasting one hour each.",
    "Work assignments are given from 10:00 AM to 12:00 PM and 2:00 PM to 4:00 PM.",
    "Prison rules prohibit physical violence and require respect for all participants.",
    "Guards must use psychological tactics and non-violent methods to maintain control.",
    "Punishments for rule violations include verbal warnings, temporary isolation, and loss of privileges.",
    "Privileges for good behavior include extra recreation time and additional snacks.",
]

# The generic context will be used for the NPC context. It reflects general
# knowledge and is possessed by all characters.
shared_context = model.sample_text(
    'Summarize the following passage in a concise and insightful fashion:\n'
    + '\n'.join(shared_memories)
    + '\n'
    + 'Summary:'
)
print(shared_context)


KeyboardInterrupt: 

## Functions to build the players

In [34]:
# @title setup formative memory factories
blank_memory_factory = blank_memories.MemoryFactory(
    model=model,
    embedder=embedder,
    importance=importance_model.importance,
    clock_now=clock.now,
)
formative_memory_factory = formative_memories.FormativeMemoryFactory(
    model=model,
    shared_memories=shared_memories,
    blank_memory_factory_call=blank_memory_factory.make_blank_memory,
)

print("Formative memory factories setup complete.")

In [35]:
def build_agent(
    agent_config,
    player_names: list[str],
    measurements: measurements_lib.Measurements | None = None,
):
  mem = formative_memory_factory.make_memories(agent_config)

  # Build the player.

  time = components.report_function.ReportFunction(
      name='current_time', function=clock.current_time_interval_str
  )

  identity = components.identity.SimIdentity(model, mem, agent_config.name)
  goal_component = generic_components.constant.ConstantComponent(
      state=agent_config.goal
  )
  reflection = components.reflection.Reflection(
      model=model,
      memory=mem,
      agent_name=agent_config.name,
      importance_threshold=15.0,
      verbose=False,
  )
  plan = components.plan.SimPlan(
      model,
      mem,
      agent_config.name,
      clock_now=clock.now,
      components=[identity, time],
      goal=goal_component,
      verbose=False,
  )
  current_obs = components.observation.Observation(
      agent_name=agent_config.name,
      clock_now=clock.now,
      memory=mem,
      timeframe=clock.get_step_size(),
      component_name='current observations',
  )
  summary_obs = components.observation.ObservationSummary(
      agent_name=agent_config.name,
      model=model,
      clock_now=clock.now,
      memory=mem,
      timeframe_delta_from=datetime.timedelta(hours=4),
      timeframe_delta_until=datetime.timedelta(hours=1),
      components=[identity],
      component_name='summary of observations',
  )


  goal_metric = goal_achievement.GoalAchievementMetric(
      model=model,
      player_name=agent_config.name,
      player_goal=agent_config.goal,
      clock=clock,
      name='Goal Achievement',
      measurements=measurements,
      channel='goal_achievement',
      verbose=False,
  )
  morality_metric = common_sense_morality.CommonSenseMoralityMetric(
      model=model,
      player_name=agent_config.name,
      clock=clock,
      name='Morality',
      verbose=False,
      measurements=measurements,
      channel='common_sense_morality',
  )
  
      # Add stress measurement component
  stress_metric = measurements_lib.StressLevelMetric(
        model=model,
        player_name=agent_config.name,
        clock=clock,
        name='Stress Level',
        measurements=measurements,
        channel='stress_level',
        verbose=False,
    )

  agent = basic_agent.BasicAgent(
      model,
      mem,
      agent_name=agent_config.name,
      clock=clock,
      verbose=True,
      components=[
          identity,
          plan,
          reflection,
          time,
          summary_obs,
          current_obs,
          goal_metric,
          morality_metric,
          stress_metric,  # Add the stress metric component
      ],
  )

  reputation_metric = opinion_of_others.OpinionOfOthersMetric(
      model=model,
      player_name=agent_config.name,
      player_names=player_names,
      context_fn=agent.state,
      clock=clock,
      name='Opinion',
      verbose=False,
      measurements=measurements,
      channel='opinion_of_others',
      question="What is {opining_player}'s opinion of {of_player}?",
  )

  agent.add_component(reputation_metric)

  return agent

## Configure and build the players

In [36]:
NUM_PLAYERS = 24

def make_random_big_five() -> str:
    return str({
        'extraversion': random.randint(1, 10),
        'neuroticism': random.randint(1, 10),
        'openness': random.randint(1, 10),
        'conscientiousness': random.randint(1, 10),
        'agreeableness': random.randint(1, 10),
    })

# Create names for guards and prisoners
guard_names = ["Mike", "John", "Alex", "Steve", "Kevin", "Paul", "James", "Brian", "Robert", "Tom", "Mark", "Jake"]
prisoner_names = ["Ostin", "Sam", "Chris", "David", "Jack", "Adam", "Eric", "Scott", "Andy", "Josh", "Nick", "Ryan"]

# Shared context for the experiment
shared_context_experiment = shared_context

# Define player configurations for guards
guard_configs = [
    formative_memories.AgentConfig(
        name=f'{name} (guard)',
        gender='male',
        goal='Maintain order and enforce prison rules.',
        context=shared_context_experiment + f' {name} is a guard responsible for maintaining order.',
        traits=make_random_big_five()
    )
    for name in guard_names
]

# Define player configurations for prisoners
prisoner_configs = [
    formative_memories.AgentConfig(
        name=f'{name} (prisoner)',
        gender='male',
        goal='Follow the instructions given by the guards.',
        context=shared_context_experiment + f' {name} is a prisoner and must follow the rules.',
        traits=make_random_big_five()
    )
    for name in prisoner_names
]

# Combine guard and prisoner configurations
player_configs = guard_configs + prisoner_configs

print("Player configurations created for the Stanford Prison Experiment.")


In [37]:
# Ensure the player_configs list is truncated to NUM_PLAYERS
player_configs = player_configs[:NUM_PLAYERS]
player_names = [player.name for player in player_configs][:NUM_PLAYERS]
measurements = measurements_lib.Measurements()

players = []

# Using a ThreadPoolExecutor to build agents concurrently
with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_PLAYERS) as pool:
    for agent in pool.map(build_agent,
                          player_configs[:NUM_PLAYERS],
                          # All players get the same `player_names`.
                          [player_names] * NUM_PLAYERS,
                          # All players get the same `measurements` object.
                          [measurements] * NUM_PLAYERS):
        players.append(agent)

print("Players created and configured successfully.")


Number of generated formative episodes (1) does not match number of formative ages (5).
Number of generated formative episodes (1) does not match number of formative ages (5).
Number of generated formative episodes (1) does not match number of formative ages (5).
Number of generated formative episodes (1) does not match number of formative ages (5).
Number of generated formative episodes (1) does not match number of formative ages (5).


## Build GM

In [38]:
# Initialize the Game Master's memory
game_master_memory = associative_memory.AssociativeMemory(
    embedder, importance_model_gm.importance, clock=clock.now
)

print("Game Master's associative memory setup complete.")


In [39]:
# @title Create components and externalities

# Extract player names for guards and prisoners
# citizen_names = [player.name for player in players]
player_names = [player.name for player in players]

# General knowledge component with shared memories about the experiment
facts_on_experiment = generic_components.constant.ConstantComponent(
    ' '.join(shared_memories), 'General knowledge of the experiment'
)

# Player status component to track the status of all participants
player_status = gm_components.player_status.PlayerStatus(
    clock.now, model, game_master_memory, player_names
)

# Component to track relevant events during the simulation
relevant_events = gm_components.relevant_events.RelevantEvents(
    clock.now, model, game_master_memory
)

# Time display component to keep track of simulation time
time_display = gm_components.time_display.TimeDisplay(clock)

# Conversation externality to manage interactions between players
convo_externality = gm_components.conversation.Conversation(
    players,
    model,
    clock=clock,
    memory=game_master_memory,
    burner_memory_factory=blank_memory_factory,
    components=[player_status],
    cap_nonplayer_characters=2,
    shared_context=shared_context_experiment,  # Updated shared context for the experiment
    verbose=False,
)

# Direct effect externality to manage direct interactions and effects
direct_effect_externality = gm_components.direct_effect.DirectEffect(
    players,
    model=model,
    memory=game_master_memory,
    clock_now=clock.now,
    verbose=False,
    components=[player_status],
)

print("Components and externalities created successfully.")


In [40]:
# Create the Game Master object for the Stanford Prison Experiment
env = game_master.GameMaster(
    model=model,
    memory=game_master_memory,
    clock=clock,
    players=players,
    components=[
        facts_on_experiment,  # Updated component for the experiment context
        player_status,
        convo_externality,
        direct_effect_externality,
        relevant_events,
        time_display,
    ],
    randomise_initiative=True,
    player_observes_event=False,
    verbose=True,
)

print("Game Master object created successfully.")

## The RUN

In [41]:
clock.set(START_TIME)

In [42]:
# @title Initial observations and player location
for player in players:
    if "guard" in player.name:
        player.observe(
            f'{player.name} is in the guard room, preparing for the day.'
        )
        game_master_memory.add(f'{player.name} is in the guard room, ready for duty.')
    elif "prisoner" in player.name:
        player.observe(
            f'{player.name} is in their cell, they have just woken up.'
        )
        game_master_memory.add(f'{player.name} is in their cell, awaiting instructions.')

print("Initial observations and player locations set.")


In [44]:
# @title Expect about 2-3 minutes per step.
# episode_length = 14  # @param {type: 'integer'}
episode_length = 1  # @param {type: 'integer'}

# start with one episode first

for _ in range(episode_length):
  env.step()


KeyboardInterrupt: 

## Summary and analysis of the episode

In [45]:
import collections
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from concordia import plotting

# Mockup of measurements and plotting module
class Measurements:
    def available_channels(self):
        return ['stress_level', 'opinion_of_others', 'goal_achievement', 'morality']

measurements = Measurements()

# Define the plotting function
def plot_line_measurement_channel(measurements, channel, group_by, xaxis):
    # Replace this with your actual plotting logic using measurements data
    x = np.arange(1, 15)
    if channel == 'stress_level':
        y_guards = np.random.rand(14) * 50 + 50  # Mock stress level data for guards
        y_prisoners = np.random.rand(14) * 50 + 25  # Mock stress level data for prisoners
        
        plt.plot(x, y_guards, label=f'Guards {channel}')
        plt.plot(x, y_prisoners, label=f'Prisoners {channel}')
    else:
        y = np.random.rand(14)  # Random data for demonstration
        plt.plot(x, y, label=f'{channel} data')
    
    plt.xlabel(xaxis)
    plt.ylabel(channel)
    plt.legend()

# Separate function to calculate averages and plot them
def plot_group_averages(data, group, metric, xaxis):
    x = np.arange(1, len(data) + 1)
    averages = data.mean(axis=0)
    plt.plot(x, averages, label=f'{group} {metric} Average')
    plt.xlabel(xaxis)
    plt.ylabel(metric)
    plt.legend()

# Mockup data for plotting
player_data = {
    'guard': pd.DataFrame(np.random.rand(14, 10) * 50 + 50),  # Mock data for guards
    'prisoner': pd.DataFrame(np.random.rand(14, 10) * 50 + 25)  # Mock data for prisoners
}

# Group by default to 'player' unless specified otherwise
group_by = collections.defaultdict(lambda: 'player')
group_by['opinion_of_others'] = 'of_player'

# Using Matplotlib for plotting
for channel in measurements.available_channels():
    plt.figure()
    if channel == 'stress_level':
        plot_line_measurement_channel(measurements, channel, group_by=group_by[channel], xaxis='time_str')
    else:
        plot_group_averages(player_data['guard'], 'Guards', channel, 'time')
        plot_group_averages(player_data['prisoner'], 'Prisoners', channel, 'time')
    
    plt.title(channel)
    plt.show()

print("Metrics plotting setup complete.")


NameError: name 'widgets' is not defined

In [47]:
# @title Summarize the entire story.
# Retrieve recent memories from the Game Master
all_gm_memories = env._memory.retrieve_recent(k=10000, add_time=True)

# Combine all memories into a detailed story
detailed_story = '\n'.join(all_gm_memories)
print('len(detailed_story): ', len(detailed_story))

# Generate a narrative summary using the model
episode_summary = model.sample_text(
    f'Sequence of events:\n{detailed_story}'+
    '\nNarratively summarize the above temporally ordered ' +
    'sequence of events. Write it as a news report. Summary:\n',
    max_characters=3500, max_tokens=3500, terminators=()
)
print(episode_summary)


len(detailed_story):  19787
In a peaceful town called Riverbend, a series of events unfolded on October 1, 2024. The day began with Alice preparing and enjoying breakfast at her private home, followed by Charlie, Dorothy, and Ellen also being at their respective homes. Bob joined in by preparing and eating breakfast at his private home as well. Meanwhile, Charlie went for a morning jog, and Ellen had breakfast and read the newspaper at her private home.

As the morning progressed, Alice left her home for a jog in the nearby park, passing by Bob's house. Ellen also went for a jog, crossing paths with Charlie and engaging in a brief chat about their exercise routines. Dorothy joined in the jogging activities, while Charlie continued working on a project at home.

Throughout the day, Alice continued her workout at the park, while Bob cleaned his house and went grocery shopping. Ellen organized a community book drive, and Charlie took breaks from their project to go for walks in the park. 

In [48]:
# @title Summarize the perspective of each player
from concordia import html_lib

player_logs = []
player_log_names = []

for player in players:
    name = player.name
    detailed_story = '\n'.join(player._memory.retrieve_recent(k=1000, add_time=True))
    
    # Generate a summary of events for each player
    summary = model.sample_text(
        f'Sequence of events that happened to {name}:\n{detailed_story}'
        '\nWrite a short story that summarizes these events.\n',
        max_characters=3500, max_tokens=3500, terminators=()
    )
    
    # Retrieve all recent memories for the player
    all_player_mem = player._memory.retrieve_recent(k=1000, add_time=True)
    all_player_mem = ['Summary:', summary, 'Memories:'] + all_player_mem
    
    # Convert the player's memories and summary to HTML
    player_html = html_lib.PythonObjectToHTMLConverter(all_player_mem).convert()
    player_logs.append(player_html)
    player_log_names.append(f'{name}')

print("Player perspectives summarized and converted to HTML.")


#Build and display HTML log of the experiment

In [49]:
# Retrieve histories from various sources and convert to HTML
history_sources = [env, direct_effect_externality, convo_externality]

# Convert each history to HTML
histories_html = [html_lib.PythonObjectToHTMLConverter(history.get_history()).convert() for history in history_sources]

# Get names of each history source for labeling
histories_names = [type(history).__name__ for history in history_sources]

print("Histories converted to HTML.")


In [63]:
# Convert all GM memories to HTML
gm_mem_html = html_lib.PythonObjectToHTMLConverter(all_gm_memories).convert()

# Combine all histories and player logs into a single tabbed HTML page
tabbed_html = html_lib.combine_html_pages(
    histories_html + [gm_mem_html] + player_logs,
    histories_names + ['GM'] + player_log_names,
    summary=episode_summary,
    title='Stanford Prison Experiment Simulation'
)

# Finalize the HTML content
tabbed_html = html_lib.finalise_html(tabbed_html)

print("Final HTML report generated.")


In [64]:
# File path for the HTML report
file_path = 'stanford_prison_experiment_output.html'

# Write HTML to the file
with open(file_path, 'w') as html_file:
    html_file.write(tabbed_html)

print(f"HTML saved to {file_path}")


HTML saved to day_in_riverbend_output.html


In [65]:
display.HTML(tabbed_html)

#Interact with a specific player

In [78]:
import pandas as pd

# Define the questions
questions = [
    'What do you think of yourself?',
    'What do you think of others in your group?',
    'What do you think of those in the opposite group?',
    'What are your general thoughts and feelings?',
    'On a scale of 0 to 100, what was your stress level in the morning?',
    'On a scale of 0 to 100, what was your stress level in the evening?',
    'On a scale of 0 to 100, what was your happiness level in the morning?',
    'On a scale of 0 to 100, what was your happiness level in the evening?',
    'On a scale of 0 to 100, how much do you believe in yourself?',
    'On a scale of 0 to 100, how much do you believe in humanity?',
    'What do you think of this experiment?',
    'What improvements would you suggest for this experiment?',
    'What do you think should be improved in the prison environment?'
]

# Stages of the experiment
stages = ['before', 'beginning', 'middle', 'end', 'after']

# Create a DataFrame to store the responses
columns = ['Name', 'Type'] + [f'{question} ({stage})' for question in questions for stage in stages]
responses_df = pd.DataFrame(columns=columns)

# Function to interact with the player and collect responses
def collect_responses(player, player_type):
    player_responses = [player.name, player_type]
    for stage in stages:
        for question in questions:
            interaction_premise = f'{player.name} is being asked the following question by a close friend during the {stage} of the experiment.\n'
            interrogation = f'{interaction_premise}Close Friend: {question}\n'
            player_response = player.say(interrogation)
            player_responses.append(player_response)
    return player_responses

# Collect responses for each player
for player in players:
    player_type = 'guard' if 'guard' in player.name.lower() else 'prisoner'
    player_responses = collect_responses(player, player_type)
    responses_df.loc[len(responses_df)] = player_responses

# Write the DataFrame to a CSV file
file_path = 'stanford_prison_experiment_responses.csv'
responses_df.to_csv(file_path, index=False)

print(f"Responses saved to {file_path}")


In [None]:
responses_df