Skip to content

Commit

Permalink
CBQE-2613: es test fixes and more additions
Browse files Browse the repository at this point in the history
Added more test cases for 2.1.0 release

Change-Id: I97169a8533a34cb5ef74833fd648186a429144ba
Reviewed-on: http://review.couchbase.org/49199
Tested-by: buildbot <build@couchbase.com>
Reviewed-by: Subhashni Balakrishnan <b.subhashni@gmail.com>
  • Loading branch information
bsubhashni committed May 8, 2015
1 parent 8fe39c1 commit 2ed303b
Show file tree
Hide file tree
Showing 14 changed files with 533 additions and 119 deletions.
6 changes: 6 additions & 0 deletions TestInput.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def __init__(self):
self.index_path = ''
self.n1ql_port = ''
self.index_port = ''
self.es_username = ''
self.es_password = ''

def __str__(self):
#ip_str = "ip:{0}".format(self.ip)
Expand Down Expand Up @@ -271,6 +273,10 @@ def get_server_options(servers, membase_settings, global_properties):
server.n1ql_port = global_properties['n1ql_port']
if server.index_port == '' and 'index_port' in global_properties:
server.index_port = global_properties['index_port']
if server.es_username == '' and 'es_username' in global_properties:
server.es_username = global_properties['es_username']
if server.es_password == '' and 'es_password' in global_properties:
server.es_password = global_properties['es_password']
return servers

@staticmethod
Expand Down
33 changes: 33 additions & 0 deletions conf/py-xdcr-elasticsearch-advanced.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
xdcr.esXDCR.ESTests:

############ ADVANCED CONFIGURATION TESTS ###############

#test ignore deletes
test_ignore_deletes,items=10000,rdirection=unidirection,end_replication_flag=1

#test wrap counters
test_wrap_counters,items=10000,rdirection=unidirection,end_replication_flag=1

#test ignore failures
test_ignore_failures,items=10000,rdirection=unidirection,end_replication_flag=1

#test doc include filter
test_doc_include_filter,items=10000,rdirection=unidirection,end_replication_flag=1

#test doc exclude filter
test_doc_exclude_filter,items=10000,rdirection=unidirection,end_replication_flag=1

#test test_delimiter_type_selector
test_delimiter_type_selector,items=10000,rdirection=unidirection,end_replication_flag=1

#test test_regex_type_selector
test_regex_type_selector,items=10000,rdirection=unidirection,end_replication_flag=1

#test test_doc_routing
test_doc_routing,items=10000,rdirection=unidirection,end_replication_flag=1






1 change: 1 addition & 0 deletions lib/couchbase_helper/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ def async_init_node(self, server, disabled_consistent_view=None,
services - can be kv, n1ql, index
Returns:
NodeInitTask - A task future that is a handle to the scheduled task."""

_task = NodeInitializeTask(server, disabled_consistent_view, rebalanceIndexWaitingDisabled,
rebalanceIndexPausingDisabled, maxParallelIndexers, maxParallelReplicaIndexers,
port, quota_percent, services = services, index_quota_percent = index_quota_percent)
Expand Down
120 changes: 70 additions & 50 deletions lib/membase/api/esrest_client.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from membase.api.rest_client import RestConnection, Bucket, BucketStats, OtpNode, Node
from remote.remote_util import RemoteMachineShellConnection
from TestInput import TestInputSingleton
import pyes
from pyes import ES, managers, query
import logger
import time
import requests
Expand All @@ -12,6 +12,7 @@
# Instance created by membase.api.rest_client.RestConnection
# when elastic-search endpoint is detected so it is not necessary to
# directly import this module into tests

class EsRestConnection(RestConnection):
def __init__(self, serverInfo, proto = "http"):
#serverInfo can be a json object
Expand All @@ -21,35 +22,43 @@ def __init__(self, serverInfo, proto = "http"):
# TODO: dynamic master node detection
if isinstance(serverInfo, dict):
self.ip = serverInfo["ip"]
self.username = serverInfo["username"]
self.password = serverInfo["password"]
self.rest_username = serverInfo["username"]
self.rest_password = serverInfo["password"]
self.username = serverInfo["es_username"]
self.password = serverInfo["es_password"]
self.port = 9091 #serverInfo["port"]
else:
self.ip = serverInfo.ip
self.username = serverInfo.rest_username
self.password = serverInfo.rest_password
self.rest_username = serverInfo.rest_username
self.rest_password = serverInfo.rest_password
self.username = serverInfo.es_username
self.password = serverInfo.es_password
self.port = 9091 # serverInfo.port

self.baseUrl = "http://{0}:{1}/".format(self.ip, self.port)
self.capiBaseUrl = self.baseUrl
self.esHttpUrl = "http://{0}:9200".format(self.ip)
self.http_port = str(int(self.port) + 109)
self.proto = proto
self.conn = pyes.ES((self.proto,self.ip,self.http_port))
self.conn = ES(server=self.esHttpUrl)
self.manager = managers.Cluster(self.conn)
self.test_params = TestInputSingleton.input
self.docs = None

def get_index_stats(self):
return pyes.index_stats()
return ES.index_stats()

def get_indices(self):
    """Return the names of all indices known to the ES connection."""
    return self.conn.indices.get_indices()

def get_indices_as_buckets(self):
def get_indices_as_buckets(self, doc_type='couchbaseDocument'):
buckets = []
indices = self.get_indices()

for index in indices:
bucket = Bucket()
stats = self.conn.indices.stats()['indices'][index]
q = query.MatchAllQuery()
docs = self.conn.search(q,index,doc_type)
bucket.name = index
bucket.type = "es"
bucket.port = self.port
Expand All @@ -59,15 +68,15 @@ def get_indices_as_buckets(self):

#vBucketServerMap
bucketStats = BucketStats()
bucketStats.itemCount = stats['primaries']['docs']['count']
bucketStats.itemCount = docs.count()
bucket.stats = bucketStats
buckets.append(bucket)
bucket.master_id = "es@"+self.ip

return buckets

def get_bucket(self, bucket_name):
for bucket in self.get_indices_as_buckets():
def get_bucket(self, bucket_name, doc_type):
for bucket in self.get_indices_as_buckets(doc_type):
if bucket.name == bucket_name:
return bucket
return
Expand Down Expand Up @@ -105,7 +114,6 @@ def is_ns_server_running(self, timeout_in_seconds=360):


def node_statuses(self, timeout=120):

otp_nodes = []

for node in self.get_nodes():
Expand All @@ -123,36 +131,33 @@ def node_statuses(self, timeout=120):


def get_nodes_self(self, timeout=120):

for node in self.get_nodes():
# force to return master node
if node.port == 9091:
return node

return

def get_nodes(self):

es_nodes = []
nodes = self.conn.cluster_nodes()['nodes']
status = self.conn.cluster_health()['status']
nodes = self.manager.state()['nodes']
status = self.manager.health()['status']
if status == "green":
status = "healthy"

for node_key in nodes:
nodeInfo = nodes[node_key]
ex_params = self.get_node_params(nodeInfo)

nodeInfo.update({'ssh_password' : ex_params.ssh_password,
'ssh_username' : ex_params.ssh_username})
nodeInfo['key'] = node_key
node = ESNode(nodeInfo)
node.status = status
es_nodes.append(node)

return es_nodes

def get_node_params(self, info):
ip, port = parse_addr(info["couchbase_address"])
ip, port = parse_addr(info["transport_address"])
clusters = self.test_params.clusters
master_node = None
for _id in clusters:
Expand All @@ -168,7 +173,7 @@ def get_node_params(self, info):
def search_term(self, key, indices=["default"]):
result = None
params = {"term":{"_id":key}}
query = pyes.Search(params)
query = ES.Search(params)
row = self.conn.search(query, indices = indices)
if row.total > 0:
result = row[0]
Expand All @@ -178,12 +183,12 @@ def term_exists(self, key, indices=["default"]):
return self.search_term(key, indices = indices) is not None

def all_docs(self, keys_only = False, indices=["default"],size=10000):
q = query.MatchAllQuery()

query = pyes.Search({'match_all' : {}})
rows = self.conn.search(query, indices=indices, size=size)
docs = self.conn.search(q,indices=indices,doc_types='couchbaseDocument')
docs = []

for row in rows:
for row in docs:
if keys_only:
row = row['meta']['id']
docs.append(row)
Expand All @@ -194,9 +199,7 @@ def all_docs(self, keys_only = False, indices=["default"],size=10000):
# See - CBES-17
# for use when it seems nodes are out of sync
def search_all_nodes(self, key, indices=["default"]):

doc = None

for index in indices:
for _node in self.get_nodes():
ip, port = (_node.ip, _node.ht_port)
Expand All @@ -219,26 +222,59 @@ def start_replication(self, *args, **kwargs):
def _rebalance_progress(self, *args, **kwargs):
return 100

def _rebalance_progress_status(self, *args, **kwargs):
return 'not running'

def get_vbuckets(self, *args, **kwargs):
    """ES indices have no vbuckets; return an empty tuple."""
    return ()

def add_node(self, user='', password='', remoteIp='', port='8091'):
def replace_template(self, node, file):
    """Upload the couchbase index template to an ES node.

    Reads the JSON template from *file*, collapses newlines so the body
    can be sent on one line, and PUTs it to the node's
    ``_template/couchbase`` endpoint over the HTTP port (9200).

    :param node: target node object exposing an ``ip`` attribute
    :param file: path to the template file on the local machine
    """
    # 'with' guarantees the handle is closed even if read() raises
    # (the original left the file object open).
    with open(file, 'r') as f:
        template = f.read().replace('\n', ' ')
    api = "http://{0}:9200/_template/couchbase".format(node.ip)
    status, content, header = self._http_request(api, 'PUT', template)
    if status:
        log.info('uploaded couchbase template: ' + file)
    else:
        log.error('template upload failed: {0}'.format(content))

def add_node(self, user='', password='', remoteIp='', port='8091',zone_name='', services=None):
    """Start an additional elasticsearch node at *remoteIp*:*port*.

    The credential/zone/services parameters exist only for signature
    compatibility with RestConnection.add_node; they are not used here.
    """
    node = self.get_nodes_self()
    node.ip, node.port = remoteIp, port
    self.start_es_node(node)

def start_es_node(self, node):
def update_configuration(self, node, commands):
    """Append each line in *commands* to elasticsearch.yml on *node* via ssh."""
    remote = RemoteMachineShellConnection(node)
    channel = remote._ssh_client.invoke_shell()
    for cfg_line in commands:
        log.info('Adding elastic search config {0} on node {1}'.format(cfg_line, self.ip))
        channel.send('echo "{0}" >> ~/elasticsearch/config/elasticsearch.yml \n'.format(cfg_line))
        # wait for the remote shell to produce output before reading
        while not channel.recv_ready():
            time.sleep(2)
        output = channel.recv(1024)
        log.info(output)

def reset_configuration(self, node, count=1):
    """Drop the last *count* lines from elasticsearch.yml on *node* via ssh.

    Intended to undo changes previously appended by update_configuration.
    """
    remote = RemoteMachineShellConnection(node)
    channel = remote._ssh_client.invoke_shell()
    log.info('Removing last {0} lines from elastic search config on node {1}'.format(count, self.ip))
    channel.send('head -n -{0} ~/elasticsearch/config/elasticsearch.yml > temp ; mv temp ~/elasticsearch/config/elasticsearch.yml \n'.format(count))
    # wait for the remote shell to produce output before reading
    while not channel.recv_ready():
        time.sleep(2)
    output = channel.recv(1024)
    log.info(output)


def start_es_node(self, node):
rmc = RemoteMachineShellConnection(node)

# define es exec path if not in $PATH environment
es_bin = 'elasticsearch'
es_bin = '~/elasticsearch/bin/elasticsearch'
if 'es_bin' in TestInputSingleton.input.test_params:
es_bin = TestInputSingleton.input.test_params['es_bin']


# connect to remote node
log.info('Starting node: %s:%s' % (node.ip, node.port))
shell=rmc._ssh_client.invoke_shell()
Expand Down Expand Up @@ -292,23 +328,6 @@ def set_max_parallel_indexers(self, *args, **kwargs):
def set_max_parallel_replica_indexers(self, *args, **kwargs):
    """No-op: parallel replica indexer tuning does not apply to ES."""
    return None

def rebalance(self, otpNodes, ejectedNodes):
# shutdown ejected nodes
# wait for shards to be rebalanced

nodesToShutdown = \
[node for node in self.get_nodes() if node.id in ejectedNodes]

for node in nodesToShutdown:
self.eject_node(node)

def eject_node(self, node):
api = "http://%s:%s/_cluster/nodes/%s/_shutdown" % (node.ip, node.ht_port, node.key)
status, content, header = self._http_request(api, 'POST', '')
if status:
log.info('ejected node: '+node.id)
else:
raise Exception("failed to eject node: "+node.id)

def log_client_error(self, post):
# cannot post req errors to 9091
Expand Down Expand Up @@ -349,10 +368,10 @@ def rebalance(self, otpNodes, ejectedNodes):
self.eject_node(node)

def eject_node(self, node):
api = "http://%s:%s/_cluster/nodes/%s/_shutdown" % (node.ip, node.ht_port, node.key)
api = "http://%s:9200/_cluster/nodes/local/_shutdown" % (node.ip)
status, content, header = self._http_request(api, 'POST', '')
if status:
log.info('ejected node: '+node.id)
log.info('ejected node: '+node.ip)
else:
log.error('rebalance operation failed: {0}'.format(content))

Expand Down Expand Up @@ -391,8 +410,9 @@ class ESNode(Node):
def __init__(self, info):
super(ESNode, self).__init__()
self.key = str(info['key'])
self.ip, self.port = parse_addr(info["couchbase_address"])
self.ip, self.port = parse_addr(info["transport_address"])
self.tr_ip, self.tr_port = parse_addr(info["transport_address"])
self.port = 9091

if 'http_address' in info:
self.ht_ip, self.ht_port = parse_addr(info["http_address"])
Expand Down
2 changes: 2 additions & 0 deletions lib/membase/api/rest_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def rebalance_reached(self, percentage=100):
time.sleep(2)
if progress <= 0:
log.error("rebalance progress code : {0}".format(progress))

return False
elif retry >= 40:
log.error("rebalance stuck on {0}%".format(progress))
Expand Down Expand Up @@ -1645,6 +1646,7 @@ def create_bucket(self, bucket='',
threadsNumber=3,
flushEnabled=1,
evictionPolicy='valueOnly'):

api = '{0}{1}'.format(self.baseUrl, 'pools/default/buckets')
params = urllib.urlencode({})

Expand Down
5 changes: 5 additions & 0 deletions lib/membase/helper/cluster_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ def cleanup_cluster(servers, wait_for_rebalance=True, master = None):
helper.is_ns_server_running(timeout_in_seconds=testconstants.NS_SERVER_TIMEOUT)
nodes = rest.node_statuses()
master_id = rest.get_nodes_self().id
for node in nodes:
if int(node.port) in xrange(9091, 9991):
rest.eject_node(node)
nodes.remove(node)

if len(nodes) > 1:
log.info("rebalancing all nodes in order to remove nodes")
rest.log_client_error("Starting rebalance from test, ejected nodes %s" % \
Expand Down
Loading

0 comments on commit 2ed303b

Please sign in to comment.