# Spark2Elasticsearch
> Test the ES connector

## Input data

In [None]:
import os

EMB_PATH = "gs://test-project-bucket/testzone/20231015-rnd/"
ES_INDEX = "test-index-768"
# Credentials can be supplied through the ES_USER / ES_PSW environment variables so
# secrets need not live in the notebook; the defaults keep the original values.
ES_USER = os.environ.get("ES_USER", "my-elastic-user")
ES_PSW = os.environ.get("ES_PSW", "my-secret-password")
# IP of the LB of "Architecture 1"; it routes to all the ES data nodes. Used by "Indexing test 1".
ES_HOST = "10.10.0.5"
# IPs of the per-node LBs of "Architecture 2"; each IP routes to only one ES data node.
# Used by "Indexing test 2".
# NOTE(review): "10.10.3.234" is listed twice, so one node receives two entries and
# another node may receive none; confirm the intended IP for the second entry.
ES_HOSTS = ["10.10.3.234", "10.10.3.234", "10.10.3.239"]

## Load the data

In [None]:
# Load the Parquet dataset stored under EMB_PATH into a Spark DataFrame.
df = spark.read.format("parquet").load(EMB_PATH)

In [None]:
# Number of rows that will be indexed; the bare name on the last line makes the
# notebook display the value, exactly as the bare `df.count()` expression did.
row_count = df.count()
row_count

In [None]:
# Print column names and types, e.g. to check that the column later used as the
# ES "_id" (`id_es`) is present.
df.printSchema()

## Utilities

In [None]:
# Helper class to measure the wall-clock duration of the indexing runs.
import time


class Timer:
    """Minimal stopwatch: call ``start()``, then ``stop()``, then ``print()``.

    Uses ``time.perf_counter()``, a monotonic high-resolution clock, so the
    measurement is not skewed by system clock adjustments during a long run.
    """

    def __init__(self):
        self.start_time = None  # perf_counter() reading taken by start()
        self.end_time = None  # perf_counter() reading taken by stop()

    def start(self):
        """Record the start instant and forget any previous stop instant."""
        self.start_time = time.perf_counter()
        # Reset so a stale end_time from an earlier run cannot yield a bogus duration.
        self.end_time = None

    def stop(self):
        """Record the end instant."""
        self.end_time = time.perf_counter()

    def elapsed(self):
        """Return the number of seconds between ``start()`` and ``stop()``.

        Raises:
            RuntimeError: if ``start()`` or ``stop()`` has not been called yet.
        """
        if self.start_time is None or self.end_time is None:
            raise RuntimeError(
                "Timer.start() and Timer.stop() must be called before reading the elapsed time"
            )
        return self.end_time - self.start_time

    def print(self):
        """Print the elapsed time formatted as ``<h>h <m>m <s.ss>s``."""
        hours, rem = divmod(self.elapsed(), 3600)
        minutes, seconds = divmod(rem, 60)
        # Hours and minutes are whole numbers, so format them as integers ("1h", not "1.00h").
        # Note: `print` below is the builtin; method names are not in scope inside methods.
        print(f"Time elapsed: {int(hours)}h {int(minutes)}m {seconds:.2f}s")


timer = Timer()

In [None]:
# DataFrame column whose value becomes each document's ES "_id"
# (instead of letting Elasticsearch auto-generate the ids).
# More: https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
id_es: str = "emb_id"

---
## Indexing test1
> Index the data using the GKE Load Balancer

In [None]:
# Connector settings for writing through the single load-balancer endpoint ES_HOST.
es_write_options = {
    "es.nodes": ES_HOST,
    "es.nodes.wan.only": "true",
    "es.nodes.discovery": "false",  # use only the declared node, do not discover others
    "es.http.timeout": "5m",
    "es.net.http.auth.user": ES_USER,
    "es.net.http.auth.pass": ES_PSW,
    "es.write.operation": "create",
    "es.resource": ES_INDEX,
    "es.mapping.id": id_es,  # this column supplies the document "_id"
    "es.mapping.exclude": id_es,  # ...and is not stored again inside the document body
}

timer.start()
(
    df.write.format("org.elasticsearch.spark.sql")
    .options(**es_write_options)
    .mode("overwrite")
    .save()
)
timer.stop()
timer.print()

- Verify the indexed data:
    - count the indexed vectors:
        ```
            # Kibana code
            GET test-index-768/_count

            ===
            {
              "count": 1000000,
              "_shards": {
                "total": 3,
                "successful": 3,
                "skipped": 0,
                "failed": 0
              }
            }

        ```
    - query the index to test the kNN functionality:
        ```
            # Open a port-forward to your ES cluster
            $ python src/es-index-tester.py \
                                --es_host localhost:9200 \
                                --es_index test-index-768 \
                                --es_user my-elastic-user \
                                --es_psw my-secret-password \
                                --vector_dim 768
            INFO:elastic_transport.transport:HEAD http://localhost:9200/ [status:200 duration:0.108s]
            INFO:root:Normalizing the vector...
            INFO:root:Quering 100 KNN results from test-index-768...
            INFO:elastic_transport.transport:POST http://localhost:9200/test-index-768/_search [status:200 duration:5.002s]
            INFO:root:Query completed! Results:{'took': 4947, [...]
        ```

---

## Indexing test2
> Index the data by sending requests directly to every ES data node (one load balancer per node)

- Delete and re-create the ES `test-index-768` index before running the following cells:
    ```
        DELETE test-index-768
        PUT test-index-768 [...] # recreate the index using provided kibana code
    ```

In [None]:
# The connector's "es.nodes" option expects one comma-separated string, not a Python list.
ES_HOSTS_STR = ",".join(ES_HOSTS)

In [None]:
# Connector settings for writing directly to every per-node endpoint in ES_HOSTS_STR.
# Reference for all "es.*" keys:
# https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html
es_write_options = {
    "es.nodes": ES_HOSTS_STR,
    "es.nodes.wan.only": "true",
    "es.nodes.discovery": "false",  # use only the declared nodes, do not discover others
    "es.http.timeout": "5m",
    "es.net.http.auth.user": ES_USER,
    "es.net.http.auth.pass": ES_PSW,
    "es.write.operation": "create",
    "es.resource": ES_INDEX,
    "es.mapping.id": id_es,  # this column supplies the document "_id"
    "es.mapping.exclude": id_es,  # ...and is not stored again inside the document body
}

timer.start()
(
    df.write.format("org.elasticsearch.spark.sql")
    .options(**es_write_options)
    .mode("overwrite")
    .save()
)
timer.stop()
timer.print()

- Verify the indexed data:
    - count the indexed vectors:
        ```
            # Kibana code
            GET test-index-768/_count

            ===
            {
              "count": 1000000,
              "_shards": {
                "total": 3,
                "successful": 3,
                "skipped": 0,
                "failed": 0
              }
            }

        ```
    - query the index to test the kNN functionality:
        ```
            # Open a port-forward to your ES cluster
            $ python src/es-index-tester.py \
                                --es_host localhost:9200 \
                                --es_index test-index-768 \
                                --es_user my-elastic-user \
                                --es_psw my-secret-password \
                                --vector_dim 768
            INFO:elastic_transport.transport:HEAD http://localhost:9200/ [status:200 duration:0.108s]
            INFO:root:Normalizing the vector...
            INFO:root:Quering 100 KNN results from test-index-768...
            INFO:elastic_transport.transport:POST http://localhost:9200/test-index-768/_search [status:200 duration:0.089s]
            INFO:root:Query completed! Results:{'took': 4381, [...]
        ```