Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Octocat-spinner-32-eaf2f5

Cannot retrieve contributors at this time

file 158 lines (128 sloc) 4.481 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
# encoding: UTF-8

# --
# Copyright (C) 2008-2012 10gen Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ++

module Mongo

  # Instantiates and manages connections to a MongoDB sharded cluster for high availability.
  class MongoShardedClient < MongoReplicaSetClient
    include ThreadLocalVariableManager

    SHARDED_CLUSTER_OPTS = [:refresh_mode, :refresh_interval, :tag_sets, :read]

    attr_reader :seeds, :refresh_interval, :refresh_mode,
                :refresh_version, :manager

    def initialize(*args)
      opts = args.last.is_a?(Hash) ? args.pop : {}

      nodes = args.flatten

      if nodes.empty? and ENV.has_key?('MONGODB_URI')
        parser = URIParser.new ENV['MONGODB_URI']
        if parser.direct?
          raise MongoArgumentError, "Mongo::MongoShardedClient.new called with no arguments, but ENV['MONGODB_URI'] implies a direct connection."
        end
        opts = parser.connection_options.merge! opts
        nodes = [parser.nodes]
      end

      unless nodes.length > 0
        raise MongoArgumentError, "A MongoShardedClient requires at least one seed node."
      end

      @seeds = nodes.map do |host_port|
        host, port = host_port.split(":")
        [ host, port.to_i ]
      end

      # TODO: add a method for replacing this list of node.
      @seeds.freeze

      # Refresh
      @last_refresh = Time.now
      @refresh_version = 0

      # No connection manager by default.
      @manager = nil

      # Lock for request ids.
      @id_lock = Mutex.new

      @pool_mutex = Mutex.new
      @connected = false

      @safe_mutex_lock = Mutex.new
      @safe_mutexes = Hash.new {|hash, key| hash[key] = Mutex.new}

      @connect_mutex = Mutex.new
      @refresh_mutex = Mutex.new

      @mongos = true

      check_opts(opts)
      setup(opts)
    end

    def valid_opts
      GENERIC_OPTS + SHARDED_CLUSTER_OPTS
    end

    def inspect
      "<Mongo::MongoShardedClient:0x#{self.object_id.to_s(16)} @seeds=#{@seeds.inspect} " +
          "@connected=#{@connected}>"
    end

    # Initiate a connection to the sharded cluster.
    def connect(force = !@connected)
      return unless force
      log(:info, "Connecting...")

      # Prevent recursive connection attempts from the same thread.
      # This is done rather than using a Monitor to prevent potentially recursing
      # infinitely while attempting to connect and continually failing. Instead, fail fast.
      raise ConnectionFailure, "Failed to get node data." if thread_local[:locks][:connecting]

      @connect_mutex.synchronize do
        begin
          thread_local[:locks][:connecting] = true
          if @manager
            @manager.refresh! @seeds
          else
            @manager = ShardingPoolManager.new(self, @seeds)
            thread_local[:managers][self] = @manager
            @manager.connect
          end
        ensure
          thread_local[:locks][:connecting] = false
        end

        @refresh_version += 1
        @last_refresh = Time.now
        @connected = true
      end
    end

    # Force a hard refresh of this connection's view
    # of the sharded cluster.
    #
    # @return [Boolean] +true+ if hard refresh
    # occurred. +false+ is returned when unable
    # to get the refresh lock.
    def hard_refresh!
      log(:info, "Initiating hard refresh...")
      connect(true)
      return true
    end

    def connected?
      @connected && @manager.primary_pool
    end

    # Returns +true+ if it's okay to read from a secondary node.
    # Since this is a sharded cluster, this must always be false.
    #
    # This method exist primarily so that Cursor objects will
    # generate query messages with a slaveOkay value of +true+.
    #
    # @return [Boolean] +true+
    def slave_ok?
      false
    end

    def checkout(&block)
      tries = 0
      begin
        super(&block)
      rescue ConnectionFailure
        tries +=1
        tries < 2 ? retry : raise
      end
    end
  end
end
Something went wrong with that request. Please try again.