Skip to content

Commit

Permalink
Upgrading the Elasticsearch module to use ECK 1.6.0 and ES 7.14 (#4686)
Browse files Browse the repository at this point in the history
Signed-off-by: Avi Layani <alayani@redhat.com>
  • Loading branch information
Avilir committed Aug 9, 2021
1 parent 723bf03 commit c43584c
Show file tree
Hide file tree
Showing 6 changed files with 2,154 additions and 562 deletions.
148 changes: 66 additions & 82 deletions ocs_ci/ocs/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"""
import logging
import base64
import time
import json

from elasticsearch import Elasticsearch, helpers, exceptions as esexp
Expand All @@ -15,6 +14,7 @@
from ocs_ci.ocs.utils import get_pod_name_by_pattern
from ocs_ci.utility.utils import TimeoutSampler
from ocs_ci.helpers.performance_lib import run_command
from ocs_ci.helpers.helpers import create_pvc, wait_for_resource_state

log = logging.getLogger(__name__)

Expand All @@ -39,7 +39,9 @@ def elasticsearch_load(connection, target_path):
# define a function that will load a text file
def get_data_from_text_file(json_file):
"""
This function will return a list of docs stored in a text file
This function will return a list of docs stored in a text file.
The function works as a generator and yields the records
one at a time.
Args:
json_file (str): the file name to look for docs in
Expand All @@ -53,21 +55,17 @@ def get_data_from_text_file(json_file):
l.strip() for l in open(str(json_file), encoding="utf8", errors="ignore")
]
log.info(f"String docs length: {len(docs)}")
doc_list = []

for num, doc in enumerate(docs):
try:
dict_doc = json.loads(doc)
doc_list += [dict_doc]
yield dict_doc
except json.decoder.JSONDecodeError as err:
# print the errors
log.error(
f"ERROR for num: {num} -- JSONDecodeError: {err} for doc: {doc}"
)

log.info(f"Dict docs length: {len(doc_list)}")
return doc_list

all_files = run_command(f"ls {target_path}/results/", out_format="list")
if "Error in command" in all_files:
log.error("There is No data to load into ES server")
Expand All @@ -82,13 +80,11 @@ def get_data_from_text_file(json_file):
file_name = f"{target_path}/results/{ind}"
ind_name = ind.split(".")[0]
log.info(f"Loading the {ind} data into the ES server")
docs_list = get_data_from_text_file(file_name)

try:
log.info(
"Attempting to index the list of docs using helpers.bulk()"
resp = helpers.bulk(
connection, get_data_from_text_file(file_name), index=ind_name
)
resp = helpers.bulk(connection, docs_list, index=ind_name)
log.info(f"helpers.bulk() RESPONSE: {resp}")
except Exception as err:
log.error(f"Elasticsearch helpers.bulk() ERROR:{err}")
Expand All @@ -107,9 +103,8 @@ def __init__(self):
"""
log.info("Initializing the Elastic-Search environment object")
self.namespace = "elastic-system"
self.eck_file = "ocs_ci/templates/app-pods/eck.1.3.1-all-in-one.yaml"
self.eck_file = "ocs_ci/templates/app-pods/eck.1.6.0-all-in-one.yaml"
self.dumper_file = "ocs_ci/templates/app-pods/esclient.yaml"
self.pvc = "ocs_ci/templates/app-pods/es-pvc.yaml"
self.crd = "ocs_ci/templates/app-pods/esq.yaml"

# Creating some different types of OCP objects
Expand All @@ -131,22 +126,28 @@ def __init__(self):
self._deploy_es()

# Verify that ES is Up & Running
timeout = 600
while timeout > 0:
if self.get_health():
log.info("The ElasticSearch server is ready !")
break
else:
log.warning("The ElasticSearch server is not ready yet")
log.info("going to sleep for 30 sec. before next check")
time.sleep(30)
timeout -= 30
sample = TimeoutSampler(timeout=180, sleep=10, func=self.get_health)
if not sample.wait_for_func_status(True):
raise Exception("Elasticsearch deployment Failed")

# Deploy the elasticsearch dumper pod
self._deploy_data_dumper_client()

# Connect to the server
self.con = self._es_connect()

def _pod_is_found(self, pattern):
    """
    Check whether at least one pod matching the given pattern exists in
    the namespace stored on this object.

    Args:
        pattern (str): the pattern of the pod name to look for

    Returns:
        bool : True if at least one matching pod name was found,
            otherwise False

    """
    # The lookup returns a collection of matching pod names; an empty
    # collection means no such pod exists (yet).
    matching_pods = get_pod_name_by_pattern(pattern, self.namespace)
    return len(matching_pods) > 0

def _deploy_eck(self):
"""
Deploying the ECK environment for the Elasticsearch, and make sure it
Expand All @@ -157,16 +158,13 @@ def _deploy_eck(self):
log.info("Deploying the ECK environment for the ES cluster")
self.ocp.apply(self.eck_file)

for es_pod in TimeoutSampler(
300, 10, get_pod_name_by_pattern, "elastic-operator", self.namespace
):
try:
if es_pod[0] is not None:
self.eckpod = es_pod[0]
log.info(f"The ECK pod {self.eckpod} is ready !")
break
except IndexError:
log.info("ECK operator pod not ready yet")
sample = TimeoutSampler(
timeout=300, sleep=10, func=self._pod_is_found, pattern="elastic-operator"
)
if not sample.wait_for_func_status(True):
raise Exception("ECK deployment Failed")

log.info("The ECK pod is ready !")

def _deploy_data_dumper_client(self):
"""
Expand All @@ -178,16 +176,13 @@ def _deploy_data_dumper_client(self):
log.info("Deploying the es client for dumping all data")
self.ocp.apply(self.dumper_file)

for dmp_pod in TimeoutSampler(
300, 10, get_pod_name_by_pattern, "es-dumper", self.namespace
):
try:
if dmp_pod[0] is not None:
self.dump_pod = dmp_pod[0]
log.info(f"The dumper client pod {self.dump_pod} is ready !")
break
except IndexError:
log.info("Dumper pod not ready yet")
sample = TimeoutSampler(
timeout=300, sleep=10, func=self._pod_is_found, pattern="es-dumper"
)
if not sample.wait_for_func_status(True):
raise Exception("Dumper pod deployment Failed")
self.dump_pod = get_pod_name_by_pattern("es-dumper", self.namespace)[0]
log.info(f"The dumper client pod {self.dump_pod} is ready !")

def get_ip(self):
"""
Expand All @@ -211,22 +206,36 @@ def get_port(self):
return self.es.get()["spec"]["ports"][0]["port"]

def _deploy_es(self):
log.info("Deploy the PVC for the ElasticSearch cluster")
self.ocp.apply(self.pvc)
"""
Deploying the Elasticsearch server
"""

# Create the PVC for the Elasticsearch server and wait until it is bound
log.info("Creating 10 GiB PVC for the ElasticSearch cluster on")
self.pvc_obj = create_pvc(
sc_name=constants.CEPHBLOCKPOOL_SC,
namespace=self.namespace,
pvc_name="elasticsearch-data-quickstart-es-default-0",
access_mode=constants.ACCESS_MODE_RWO,
size="10Gi",
)
wait_for_resource_state(self.pvc_obj, constants.STATUS_BOUND)
self.pvc_obj.reload()

log.info("Deploy the ElasticSearch cluster")
self.ocp.apply(self.crd)

for es_pod in TimeoutSampler(
300, 20, get_pod_name_by_pattern, "quickstart-es-default", self.namespace
):
try:
if es_pod[0] is not None:
self.espod = es_pod[0]
log.info(f"The ElasticSearch pod {self.espod} Started")
break
except IndexError:
log.info("elasticsearch pod not ready yet")
sample = TimeoutSampler(
timeout=300,
sleep=10,
func=self._pod_is_found,
pattern="quickstart-es-default",
)
if not sample.wait_for_func_status(True):
raise Exception("The ElasticSearch pod deployment Failed")
self.espod = get_pod_name_by_pattern("quickstart-es-default", self.namespace)[0]
log.info(f"The ElasticSearch pod {self.espod} Started")

es_pod = OCP(kind="pod", namespace=self.namespace)
log.info("Waiting for ElasticSearch to Run")
Expand Down Expand Up @@ -271,7 +280,8 @@ def cleanup(self):
log.info("Deleting the es resource")
self.ocp.delete(yaml_file=self.crd)
log.info("Deleting the es project")
self.ns_obj.delete_project(project_name=self.namespace)
# self.ns_obj.delete_project(project_name=self.namespace)
self.ocp.delete(yaml_file=self.eck_file)
self.ns_obj.wait_for_delete(resource_name=self.namespace, timeout=180)

def _es_connect(self):
Expand Down Expand Up @@ -304,32 +314,6 @@ def get_indices(self):
results.append(ind)
return results

def _copy(self, es):
    """
    Copy the documents of every index from the internal ES server into
    the main ES server.

    **This function is deprecated** - dump the data to files and load
    those files into the main ES server instead.

    Each index is read with a single ``match_all`` search whose ``size``
    is 1000, so only the hits returned by that one request are copied.

    Args:
        es (obj): elasticsearch object which is connected to the main ES

    """
    match_all_query = {"size": 1000, "query": {"match_all": {}}}

    for index_name in self.get_indices():
        log.info(f"Reading {index_name} from internal ES server")
        try:
            response = self.con.search(index=index_name, body=match_all_query)
        except esexp.NotFoundError:
            # The index disappeared (or never existed) - skip it and
            # carry on with the remaining indices.
            log.warning(f"{index_name} Not found in the Internal ES.")
            continue

        log.debug(f"The results from internal ES for {index_name} are :{response}")
        log.info(f"Writing {index_name} into main ES server")

        # Re-index only the document payload (``_source``) of every hit.
        hits = response["hits"]["hits"]
        for hit in hits:
            log.debug(f"Going to write : {hit}")
            es.index(index=index_name, doc_type="_doc", body=hit["_source"])

def dumping_all_data(self, target_path):
"""
Dump All data from the internal ES server to .tgz file.
Expand Down

0 comments on commit c43584c

Please sign in to comment.