Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

file 173 lines (142 sloc) 6.088 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173
# Copyright 2012 MemSQL, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import random, time, sys
from multiprocessing import Pool, Process, Pipe, JoinableQueue
from config import (NUM_WORKERS, PLAYERS_PER_WORKER, NUM_PLAYERS, DATABASE,
                   SHOW_GAME_STATISTICS, GAME_ENABLE_LOG, TOTAL_TIME,
                   SAMPLE_INTERVAL)
from mongo_database import MongoDatabase
from sql_database import MySqlDatabase, MemSqlDatabase
from utils import Reporter

Database = eval(DATABASE)

def simulate_play(db, me):
    '''Simulate one action for player. This can be either starting a game,
making a move or winning a game, and is determined randomly.'''

    # First, get a list of games for the current player.
    games = db.get_games(me)

    # If the number of games is small, consider starting a new game.
    if len(games) == 0 or (len(games) < 10 and random.random() > .3):
        other = random.randint(0, NUM_PLAYERS - 1)
        if other >= me:
            other += 1
        
        db.start_game(me, other)
        if GAME_ENABLE_LOG:
            db.log(me, 'challenged ' + str(other))
    elif len(games) > 0:
        # Otherwise, we will make a move in some game.
        game = random.choice(games)

        # The longer the game, the higher the chance that we win a game.
        if random.random() < min((float(game['turn']) / 50) ** 3, .8):
            db.end_game(me, game)
            if GAME_ENABLE_LOG:
                db.log(me, 'won')
        else:
            # Otherwise, we advance the game by one step.
            db.make_move(me, game)
            if GAME_ENABLE_LOG:
                db.log(me, 'moved')

def worker_main(creation_queue, pipe, worker_id):
    '''The code for a worker. First the worker helps fill the database with
players. Then the worker will simulate its players.'''
    # We need a try block to cleanly exit when ctrl-c is pressed.
    try:
        # Setup our reporting
        db = Database()
        reporter = Reporter(pipe)

        # We are responsible for the following range of players.
        players = range(worker_id * PLAYERS_PER_WORKER, (worker_id + 1) * PLAYERS_PER_WORKER)

        # Add all our players to the db.
        for player in players:
            db.add_player(player)

        # Mark our worker as done initializing.
        creation_queue.get()
        creation_queue.task_done()
        # Wait for all workers to be done.
        creation_queue.join()

        # Now simulate actions.
        while True:
            simulate_play(db, random.choice(players))
            reporter.mark_event()
    except KeyboardInterrupt:
        pass

class WorkerInfo(object):
    '''Utility class to store some information per process.'''
    def __init__(self, pipe, process):
        self.pipe = pipe
        self.process = process
        self.qps = 0

def main():
    print >>sys.stderr, "Using", DATABASE

    # Setup the database.
    db = Database()
    db.setup()

    # Create a synchronizing queue to coordinate the creation of players. Use
    # it as a semaphore, where we start simulating once each thread has popped
    # and marked one item as done.
    creation_queue = JoinableQueue()
    for i in range(NUM_WORKERS):
        creation_queue.put(i)

    # Create our worker processes and let them start.
    print >>sys.stderr, "Spawning %s workers" % NUM_WORKERS
    workers = []
    for i in range(NUM_WORKERS):
        left, right = Pipe()
        process = Process(target=worker_main, args=(creation_queue, right, i))
        process.start()
        workers.append(WorkerInfo(left, process))

    # Catch KeyboardInterrupt so we can cleanly exit all workers.
    try:
        sum_qps = 0
        total_samples = 0

        print >>sys.stderr, "Setting up %s players" % NUM_PLAYERS
        creation_queue.join()

        print >>sys.stderr, "Simulating!"
        # Store last reporting time so we can print qps once per second.
        last_qps = last_stats = last_sample = time.time()
        samples = []

        start_time = time.time()
        while TOTAL_TIME is None or time.time() - start_time < TOTAL_TIME:
            # Poll each worker to see if qps information has been updated.
            for worker in workers:
                while worker.pipe.poll():
                    worker.qps = worker.pipe.recv()

            qps = sum(worker.qps for worker in workers)

            # Report qps and statistics.
            if time.time() - last_qps > 1:
                last_qps = time.time()
                print >>sys.stderr, "%s actions per second" % qps

                # Additionally, compute a running average over the qps.
                sum_qps += qps
                total_samples += 1

            if SHOW_GAME_STATISTICS and time.time() - last_stats > 2.5:
                last_stats = time.time()
                print >>sys.stderr, "Statistics:", db.get_stats()

            if SAMPLE_INTERVAL is not None and time.time() - last_sample > SAMPLE_INTERVAL:
                samples.append(qps)
                last_sample = time.time()
            time.sleep(0.01)

    except KeyboardInterrupt:
        # Shut down each worker.
        pass
    finally:
        for worker in workers:
            worker.pipe.close()
            worker.process.terminate()
        
    print >>sys.stderr, "Average actions per second", sum_qps / total_samples

    if SAMPLE_INTERVAL is not None:
        # For javascript output
        print "sample_interval = %s;" % SAMPLE_INTERVAL
        print "samples = %s;" % samples

if __name__ == '__main__':
    main()
Something went wrong with that request. Please try again.