# Elastic Self-Managed GPU Acceleration Demo

## Create GKE Cluster + GPU Node Pool

In [44]:
%%bash
# Create a regional GKE cluster (one e2-standard-4 node per zone) plus a Spot
# GPU node pool (one NVIDIA L4 per g2-standard-4 node, one node per zone), then
# list every node together with its zone.
#
# Stop at the first failing command: otherwise a failed cluster creation would
# still run the node-pool and kubectl commands, and kubectl could end up listing
# nodes from whatever cluster the current kube context points at.
set -euo pipefail

readonly REGION=us-central1
readonly ZONES=us-central1-a,us-central1-b,us-central1-c

gcloud container clusters create gpu-demo \
    --region "$REGION" \
    --node-locations "$ZONES" \
    --num-nodes 1 \
    --machine-type e2-standard-4 \
    --disk-type pd-standard \
    --disk-size 50GB

gcloud container node-pools create gpu-pool \
    --cluster gpu-demo \
    --region "$REGION" \
    --node-locations "$ZONES" \
    --num-nodes 1 \
    --machine-type g2-standard-4 \
    --disk-type pd-ssd \
    --disk-size 100GB \
    --accelerator type=nvidia-l4,count=1,gpu-driver-version=latest \
    --location-policy ANY \
    --spot

kubectl get nodes -o custom-columns="NODE:.metadata.name,ZONE:.metadata.labels.topology\.kubernetes\.io/zone"

Note: Your Pod address range (`--cluster-ipv4-cidr`) can accommodate at most 1008 node(s).
Creating cluster gpu-demo in us-central1...
.................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................

NAME      LOCATION     MASTER_VERSION      MASTER_IP     MACHINE_TYPE   NODE_VERSION        NUM_NODES  STATUS   STACK_TYPE
gpu-demo  us-central1  1.33.5-gke.2019000  34.69.112.19  e2-standard-4  1.33.5-gke.2019000  3          RUNNING  IPV4


Note: Machines with GPUs have certain limitations which may affect your workflow. Learn more at https://cloud.google.com/kubernetes-engine/docs/how-to/gpus
Note: Starting in GKE 1.30.1-gke.115600, if you don't specify a driver version, GKE installs the default GPU driver for your node's GKE version.
Creating node pool gpu-pool...
....................................................................................................................................................................................................................................................................................................................done.
Created [https://container.googleapis.com/v1/projects/elastic-customer-eng/zones/us-central1/clusters/gpu-demo/nodePools/gpu-pool].


NAME      MACHINE_TYPE   DISK_SIZE_GB  NODE_VERSION
gpu-pool  g2-standard-4  100           1.33.5-gke.2019000
NODE                                      ZONE
gke-gpu-demo-default-pool-39f74ae9-38lq   us-central1-a
gke-gpu-demo-default-pool-6863c142-8wtn   us-central1-b
gke-gpu-demo-default-pool-ba602fca-gvpp   us-central1-c
gke-gpu-demo-gpu-pool-4de4e479-9vt9       us-central1-c
gke-gpu-demo-gpu-pool-855f47e1-gnt5       us-central1-a
gke-gpu-demo-gpu-pool-e436ad48-7cxs       us-central1-b


## Deploy Elastic Cluster

In [46]:
%%bash
# Install the ECK operator, deploy the Elasticsearch + Kibana manifests, wait
# until both are green and reachable, write the connection details used by the
# Python cells (.env, ca.crt), and show which zone/node each pod landed on.
set -uo pipefail

#######################################
# Run a command every 5 seconds until it succeeds or the timeout elapses.
# Arguments: $1 - timeout in seconds
#            $2 - description used in the timeout message
#            $3... - command to poll
# Returns:   0 once the command succeeds, 1 on timeout
#######################################
wait_until() {
  local timeout=$1 desc=$2
  shift 2
  local deadline=$((SECONDS + timeout))
  until "$@"; do
    if (( SECONDS >= deadline )); then
      printf 'Timed out after %ss waiting for %s\n' "$timeout" "$desc" >&2
      return 1
    fi
    sleep 5
  done
}

# Succeeds when both the Elasticsearch and Kibana resources report green health.
stack_is_green() {
  [[ "$(kubectl get elasticsearch -o=jsonpath='{.items[0].status.health}')" == "green" &&
     "$(kubectl get kibana -o=jsonpath='{.items[0].status.health}')" == "green" ]]
}

# Prints the external IP of a LoadBalancer service (empty until one is assigned).
lb_ip() {
  kubectl get svc "$1" -o jsonpath='{.status.loadBalancer.ingress[0].ip}'
}

# Succeeds once the given service has an external IP.
has_lb_ip() {
  [[ -n "$(lb_ip "$1")" ]]
}

# Creating the CRDs fails when they already exist (re-runs); that error is expected and ignored.
kubectl create -f https://download.elastic.co/downloads/eck/3.2.0/crds.yaml > /dev/null 2>&1
kubectl apply -f https://download.elastic.co/downloads/eck/3.2.0/operator.yaml > /dev/null 2>&1 \
  || { echo "Failed to install the ECK operator" >&2; exit 1; }

# Applying the manifests too early can fail: the new CRDs may not be served yet,
# or the operator (which hosts the admission webhook) may not be running yet.
kubectl wait --for=condition=Established --timeout=120s \
  crd/elasticsearches.elasticsearch.k8s.elastic.co crd/kibanas.kibana.k8s.elastic.co > /dev/null || exit 1
kubectl -n elastic-system rollout status statefulset/elastic-operator --timeout=300s > /dev/null || exit 1

kubectl apply -f manifests || exit 1
wait_until 1800 "Elasticsearch and Kibana health to be green" stack_is_green || exit 1
# The URLs below are built from the LoadBalancer IPs, which can lag behind pod health.
wait_until 600 "an external IP on elastic-es-http" has_lb_ip elastic-es-http || exit 1
wait_until 600 "an external IP on kibana-kb-http" has_lb_ip kibana-kb-http || exit 1

# .env holds the elastic superuser password: truncate it and restrict it to the
# current user before any secret is written into it.
: > .env && chmod 600 .env
cat >> .env << EOF
ELASTIC_USERNAME=elastic
ELASTIC_PASSWORD=$(kubectl get secret elastic-es-elastic-user -o go-template='{{.data.elastic | base64decode}}')
ELASTIC_URL=https://$(lb_ip elastic-es-http):9200
EOF

# Optional extra variables (presumably JINA_API_KEY, read by the Python cells);
# silently skipped when the file does not exist.
cat .api_keys >> .env 2>/dev/null
kubectl get secret elastic-es-http-certs-public -o jsonpath='{.data.ca\.crt}' | base64 --decode > ca.crt

echo
# Join each pod's node name to that node's zone label. This takes one kubectl call
# for all nodes plus one for all pods, instead of one node lookup per pod.
awk -F'\t' '
  NR == FNR { zone[$1] = $2; next }
  { printf "Pod: %s | Zone: %s | Node: %s\n", $1, zone[$2], $2 }
' <(kubectl get nodes -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.metadata.labels.topology\.kubernetes\.io/zone}{"\n"}{end}') \
  <(kubectl get pods -o jsonpath='{range .items[*]}{.metadata.name}{"\t"}{.spec.nodeName}{"\n"}{end}') \
  | column -t -s "|"
echo
echo "Kibana is available at: https://$(lb_ip kibana-kb-http):5601"

secret/eck-trial-license unchanged
elasticsearch.elasticsearch.k8s.elastic.co/elastic configured
kibana.kibana.k8s.elastic.co/kibana configured

Pod: elastic-es-data-node-0        Zone: us-central1-a    Node: gke-gpu-demo-gpu-pool-855f47e1-gnt5
Pod: elastic-es-data-node-1        Zone: us-central1-c    Node: gke-gpu-demo-gpu-pool-4de4e479-9vt9
Pod: elastic-es-data-node-2        Zone: us-central1-b    Node: gke-gpu-demo-gpu-pool-e436ad48-7cxs
Pod: elastic-es-master-node-0      Zone: us-central1-c    Node: gke-gpu-demo-default-pool-ba602fca-gvpp
Pod: elastic-es-master-node-1      Zone: us-central1-a    Node: gke-gpu-demo-default-pool-39f74ae9-38lq
Pod: elastic-es-master-node-2      Zone: us-central1-b    Node: gke-gpu-demo-default-pool-6863c142-8wtn
Pod: kibana-kb-68895889d5-wl9vk    Zone: us-central1-c    Node: gke-gpu-demo-default-pool-ba602fca-gvpp
Pod: kibana-kb-6d7cd49cd4-rzmdd    Zone: us-central1-b    Node: gke-gpu-demo-default-pool-6863c142-8wtn

Kibana is available at: https://34

## Create Data Set

In [None]:
import os
import json
from dotenv import load_dotenv
from faker import Faker
#import requests 
#from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import time
import tqdm
from elasticsearch import Elasticsearch

# Total number of documents to generate, and how many paragraphs go into each
# embedding request.
DATASET_SIZE = 10000
BATCH_SIZE = 100

# Connection details come from the .env file written by the deployment cell;
# override=True lets a re-run pick up a freshly regenerated .env.
load_dotenv(override=True)
es = Elasticsearch(
    hosts=os.getenv("ELASTIC_URL"),
    ca_certs="./ca.crt",
    basic_auth=(os.getenv("ELASTIC_USERNAME"), os.getenv("ELASTIC_PASSWORD")),
    retry_on_timeout=True,
    max_retries=10,
)

# Recreate the Jina AI text-embedding inference endpoint from scratch.
# A 404 on delete just means the endpoint did not exist yet.
es.options(ignore_status=[404]).inference.delete(inference_id="jina-embeddings-v3")
response = es.inference.put(
    inference_id="jina-embeddings-v3",
    task_type="text_embedding",
    body=dict(
        service="jinaai",
        service_settings=dict(
            api_key=os.getenv("JINA_API_KEY"),
            model_id="jina-embeddings-v3",
        ),
    ),
)

def get_jina_embeddings(batch):
    """Embed a batch of strings with the "jina-embeddings-v3" inference endpoint.

    Args:
        batch: list of input strings sent in a single inference request.

    Returns:
        A list with the `embedding` of each entry in the response's
        `text_embedding` array, in response order.
    """
    result = es.inference.inference(inference_id="jina-embeddings-v3", input=batch)
    embeddings = []
    for entry in result['text_embedding']:
        embeddings.append(entry['embedding'])
    return embeddings
# Disabled alternative: an earlier get_jina_embeddings that called the Jina REST
# API directly with `requests`, using tenacity to retry with exponential backoff
# when the API answers HTTP 429. It sits inside a bare string literal, so it never
# executes; the `requests`/`tenacity` imports it needs are commented out at the
# top of this cell. The active implementation above goes through the
# Elasticsearch inference endpoint instead.
'''
class RateLimitError(Exception):
    pass

@retry(
        wait=wait_exponential(multiplier=1, min=4, max=60),
        stop=stop_after_attempt(5),
        retry=retry_if_exception_type(RateLimitError)
)
def get_jina_embeddings(batch):
    url = "https://api.jina.ai/v1/embeddings"
    headers = { 
        "Authorization": f"Bearer {os.getenv('JINA_API_KEY')}", 
        "Content-Type": "application/json" 
    }
    payload = { 
        "model": "jina-embeddings-v3", 
        "task": "text-matching",
        "input": batch
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code == 429:
        raise RateLimitError("Rate limit exceeded")
    response.raise_for_status()
    return [item['embedding'] for item in response.json()['data']]
'''
def create_data_file():
    """Generate data.jsonl: DATASET_SIZE fake paragraphs with their Jina embeddings.

    Paragraphs come from a Faker instance with a fixed seed, so re-runs produce
    the same text. They are embedded BATCH_SIZE at a time; if DATASET_SIZE is not
    a multiple of BATCH_SIZE, the last batch is smaller, so exactly DATASET_SIZE
    lines are written. Each line is a JSON object with "paragraph" and
    "embedding" keys.

    The data is written to a temporary file and only renamed to data.jsonl once
    complete. An interrupted run therefore never leaves a truncated data.jsonl
    that the `os.path.exists("data.jsonl")` guard would mistake for a finished
    dataset.

    Raises:
        RuntimeError: if the inference endpoint returns a different number of
            embeddings than paragraphs sent (zip would otherwise drop documents).
    """
    fake = Faker()
    fake.seed_instance(12345)

    # Full batches first, then the remainder (if any) as a final partial batch.
    batch_sizes = [BATCH_SIZE] * (DATASET_SIZE // BATCH_SIZE)
    if DATASET_SIZE % BATCH_SIZE:
        batch_sizes.append(DATASET_SIZE % BATCH_SIZE)

    tmp_path = "data.jsonl.tmp"
    try:
        with open(tmp_path, "w") as f:
            for batch_no, size in enumerate(tqdm.tqdm(batch_sizes)):
                paragraphs = fake.paragraphs(nb=size)
                embeddings = get_jina_embeddings(paragraphs)
                if len(embeddings) != len(paragraphs):
                    raise RuntimeError(
                        f"Expected {len(paragraphs)} embeddings, got {len(embeddings)}"
                    )
                for paragraph, embedding in zip(paragraphs, embeddings):
                    doc = {"paragraph": paragraph, "embedding": embedding}
                    f.write(json.dumps(doc) + "\n")
                # Pause between inference calls; no need to wait after the last one.
                if batch_no < len(batch_sizes) - 1:
                    time.sleep(1)
        os.replace(tmp_path, "data.jsonl")
    except BaseException:
        # Also covers KeyboardInterrupt when the cell is stopped mid-run.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

# Generate the dataset only when there is no cached copy from a previous run.
if not os.path.exists("data.jsonl"):
    create_data_file()

# One JSON document per line; count lines by streaming instead of loading the
# (large) file into memory.
with open("data.jsonl", "r") as data_file:
    line_count = sum(1 for _line in data_file)
print(f'{line_count} documents on file.')

## Index Data Set

In [None]:
from elasticsearch import helpers

INDEX_NAME = "test_index"
# Reuses the `es` client created in the "Create Data Set" cell.

# 3 primary shards with 1 replica each. The paragraph text is a plain text field;
# the embedding is a 1024-dimension dense_vector indexed with int8-quantized HNSW.
settings = {
    "index": {
        "number_of_shards": 3,
        "number_of_replicas": 1
    }
}
mappings = {
    "properties": {
        "paragraph": { "type": "text" },
        "embedding": {
            "type": "dense_vector",
            "dims": 1024,
            "index": True,
            "index_options": {
                "type": "int8_hnsw"
            }
        }
    }
}

# Start from an empty index on every run; a 404 just means it did not exist yet.
es.options(ignore_status=[404]).indices.delete(index=INDEX_NAME)
# Pass settings/mappings as explicit API parameters rather than a raw `body`.
es.indices.create(index=INDEX_NAME, settings=settings, mappings=mappings)

def gen_data(path="data.jsonl"):
    """Yield each non-blank line of a JSONL file as a pre-serialized bulk document.

    `helpers.bulk` accepts JSON strings as actions and sends them as `index`
    operations. Blank lines (such as a stray trailing newline) are skipped
    because they would turn into empty, invalid documents.

    Args:
        path: JSONL file to read; defaults to the generated "data.jsonl".

    Yields:
        The stripped JSON text of each non-blank line.
    """
    with open(path, "r") as f:
        for line in f:
            doc = line.strip()
            if doc:
                yield doc
            
# Stream the JSONL documents into the index. The bulk client gets a 60s request
# timeout because each document carries a 1024-float embedding.
ok, result = helpers.bulk(
    es.options(request_timeout=60),
    gen_data(),
    index=INDEX_NAME,
)
print(f"{ok} documents indexed.")

## Reindex Speed-Test Functions

In [None]:
import uuid
import time
import warnings
warnings.filterwarnings("ignore", message=".*technical preview.*")

def monitor_reindex(task_id, poll_interval=2, client=None):
    """Poll an asynchronous reindex task until it completes and report its speed.

    Args:
        task_id: task id returned by `reindex(..., wait_for_completion=False)`.
        poll_interval: seconds to sleep between task status checks (default 2).
        client: Elasticsearch client to poll; defaults to the module-level `es`.

    Returns:
        (latency_sec, throughput): the task's `took` converted to seconds, and the
        number of documents processed (`total`) per second. Throughput is 0.0
        when `took` is 0, which avoids a division by zero.

    Raises:
        RuntimeError: if the task finished with an error or reported failures.
            Such a run is not a valid benchmark sample.
    """
    client = es if client is None else client
    while True:
        task = client.tasks.get(task_id=task_id)
        if task.get('completed', False):
            break
        time.sleep(poll_interval)

    if task.get('error'):
        raise RuntimeError(f"Reindex task {task_id} failed: {task['error']}")
    response = task['response']
    if response.get('failures'):
        raise RuntimeError(
            f"Reindex task {task_id} reported failures: {response['failures']}"
        )

    latency_sec = response['took'] / 1000
    total_docs = response['total']
    throughput = total_docs / latency_sec if latency_sec > 0 else 0.0
    return latency_sec, throughput
            
def speed_test(tests=10):
    """Benchmark reindexing INDEX_NAME into a fresh scratch index `tests` times.

    Each run does the following:
    - clears the source index caches;
    - creates a destination index with the same mappings and shard/replica
      counts as the source;
    - starts an async `_reindex` with automatic slicing and waits for it via
      `monitor_reindex`;
    - deletes the destination index, even when the run fails.

    Runs are separated by a 5 second pause.

    Args:
        tests: number of reindex runs.

    Returns:
        (latencies, throughputs): per-run latency in seconds and documents per
        second, as reported by `monitor_reindex`.
    """
    # _reindex does not copy the source's mappings or settings. Without an
    # explicitly created destination, the target would be auto-created through
    # dynamic mapping, and `embedding` might not get the dense_vector index
    # options the benchmark is meant to exercise. Read them once, outside the loop.
    source_mappings = es.indices.get_mapping(index=INDEX_NAME)[INDEX_NAME]['mappings']
    source_index_settings = es.indices.get_settings(index=INDEX_NAME)[INDEX_NAME]['settings']['index']
    dest_settings = {
        "number_of_shards": source_index_settings['number_of_shards'],
        "number_of_replicas": source_index_settings['number_of_replicas'],
    }

    latencies = []
    throughputs = []
    for i in tqdm.tqdm(range(tests)):
        es.indices.clear_cache(index=INDEX_NAME)
        dest = f"{INDEX_NAME}_{uuid.uuid4()}"
        es.indices.create(index=dest, mappings=source_mappings, settings=dest_settings)
        try:
            response = es.reindex(
                source={"index": INDEX_NAME},
                dest={"index": dest},
                slices="auto",
                wait_for_completion=False,
            )
            latency, throughput = monitor_reindex(response['task'])
        finally:
            # Always drop the scratch index so failed runs don't accumulate indices.
            es.options(ignore_status=[404]).indices.delete(index=dest)
        latencies.append(latency)
        throughputs.append(throughput)
        if i < tests - 1:
            time.sleep(5)
    return latencies, throughputs


## CPU Test


In [None]:
# Baseline run with GPU indexing still disabled.
cpu_latencies, cpu_throughputs = speed_test()
for label, values in (('Latencies:', cpu_latencies), ('Throughputs:', cpu_throughputs)):
    print(label, values)

## Reconfigure for GPU

In [None]:
%%bash
# Enable GPU-accelerated vector indexing on the "data-node" nodeSet. Then block
# until ECK has rolled the change out, so the GPU benchmark does not start
# against nodes that still run the old configuration.
set -uo pipefail

# A JSON merge patch (--type=merge) replaces the entire spec.nodeSets array. That
# would drop the other nodeSets and every other field of this one (count,
# podTemplate, ...). Instead, look up the data-node entry and add just this one
# setting with a JSON patch (RFC 6902).
idx=$(kubectl get elasticsearch elastic \
  -o jsonpath='{range .spec.nodeSets[*]}{.name}{"\n"}{end}' \
  | awk '$0 == "data-node" && found == "" { found = NR - 1 } END { print found }')
if [[ -z "$idx" ]]; then
  echo "nodeSet 'data-node' not found in elasticsearch/elastic" >&2
  exit 1
fi

# "add" on /config/<key> needs /config to exist; otherwise create it with the setting.
if [[ -n "$(kubectl get elasticsearch elastic -o jsonpath="{.spec.nodeSets[$idx].config}")" ]]; then
  patch='[{"op":"add","path":"/spec/nodeSets/'"$idx"'/config/vectors.indexing.use_gpu","value":true}]'
else
  patch='[{"op":"add","path":"/spec/nodeSets/'"$idx"'/config","value":{"vectors.indexing.use_gpu":true}}]'
fi
kubectl patch elasticsearch elastic --type=json -p "$patch" || exit 1

# Wait until the operator has observed this spec generation and reports the
# cluster Ready and green again. Assumes the installed ECK reports
# status.observedGeneration and status.phase; confirm for other ECK versions.
target=$(kubectl get elasticsearch elastic -o jsonpath='{.metadata.generation}')
deadline=$((SECONDS + 1800))
observed='' phase='' health=''
while :; do
  IFS='|' read -r observed phase health < <(kubectl get elasticsearch elastic \
    -o jsonpath='{.status.observedGeneration}|{.status.phase}|{.status.health}{"\n"}')
  if [[ "$observed" == "$target" && "$phase" == "Ready" && "$health" == "green" ]]; then
    break
  fi
  if (( SECONDS >= deadline )); then
    echo "Timed out waiting for the rolling restart (phase=$phase, health=$health)" >&2
    exit 1
  fi
  sleep 10
done
echo "GPU indexing enabled; cluster is $phase and $health."

## Check GPU is Detected

In [None]:
%%bash
# Check that every Elasticsearch data node (not just data-node-0) logged GPU
# detection. Matching lines are printed per pod; the cell fails if any data pod
# lacks the line or has no readable logs.
set -uo pipefail

pods=$(kubectl get pods -l elasticsearch.k8s.elastic.co/statefulset-name=elastic-es-data-node \
  -o jsonpath='{range .items[*]}{.metadata.name}{"\n"}{end}')
if [[ -z "$pods" ]]; then
  echo "No data-node pods found" >&2
  exit 1
fi

status=0
while IFS= read -r pod; do
  if matches=$(kubectl logs "$pod" -c elasticsearch | grep "Found compatible GPU"); then
    printf '%s: %s\n' "$pod" "$matches"
  else
    printf '%s: no "Found compatible GPU" log line\n' "$pod" >&2
    status=1
  fi
done <<< "$pods"
exit "$status"

## GPU Test

In [None]:
# Same benchmark with GPU vector indexing enabled on the data nodes.
gpu_latencies, gpu_throughputs = speed_test()
# Report the GPU results (previously this printed the CPU numbers by mistake).
print('Latencies:', gpu_latencies)
print('Throughputs:', gpu_throughputs)

## Results

In [None]:
import pandas as pd 
import matplotlib.pyplot as plt

# Compare the CPU and GPU benchmark runs side by side: P95 reindex latency
# (lower is better) and mean throughput (higher is better).
df_latencies = pd.DataFrame({
    'CPU': cpu_latencies,
    'GPU': gpu_latencies
})
df_throughputs = pd.DataFrame({
    'CPU': cpu_throughputs,
    'GPU': gpu_throughputs
})

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))
colors = ['blue', 'green']

df_latencies.quantile(0.95).plot(kind='bar', ax=ax1, color=colors, rot=0)
ax1.set_title('Latency (P95)\nLower is better', fontweight='bold')
ax1.set_ylabel('Seconds')

df_throughputs.mean().plot(kind='bar', ax=ax2, color=colors, rot=0)
ax2.set_title('Throughput (Mean)\nHigher is better', fontweight='bold')
# Throughput values are documents reindexed per second, not requests.
ax2.set_ylabel('Documents per Second')

plt.tight_layout()
plt.show()

## Destroy Environment

In [1]:
%%bash
# Remove the local files written by the deployment cell (.env with the
# credentials, and the cluster CA certificate), then delete the GKE cluster
# without an interactive confirmation prompt.
rm -f .env ca.crt
gcloud container clusters delete gpu-demo --region us-central1 --quiet

Deleting cluster gpu-demo...
...........................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................................