# Elasticsearch data catalog into Dataplex

## Get data from ES

In [338]:
# Display the version string of the interpreter running this notebook.
import sys
sys.version

'3.10.13 | packaged by conda-forge | (main, Oct 26 2023, 18:07:37) [GCC 12.3.0]'

In [None]:
# Install the Dataplex client library (provides google.cloud.dataplex_v1).
pip install google-cloud-dataplex --user

In [None]:
# Install the Elasticsearch Python client.
pip install elasticsearch

In [399]:
from elasticsearch import Elasticsearch
import pandas as pd
from google.cloud import dataplex_v1
from google.cloud.dataplex_v1.types import Entry
import time
import google.protobuf

In [453]:
# Elasticsearch Connection Details
# ES_ENDPOINT is passed to the Elasticsearch client and is also reused as the
# description of the Dataplex entry group / entry type.
ES_ENDPOINT = "https://number.us-central1.gcp.cloud.es.io:443"
# API key id and secret, passed together as the client's ``api_key`` tuple.
# Left blank here; fill in before running.
API_KEY_ID = "" 
API_KEY = "" 
# Google Cloud project and location used to build the Dataplex resource paths.
PROJECT_ID = ""
LOCATION_ID = "us-central1"
# Default number of documents returned by get_index_sample().
SAMPLE_SIZE = 5

In [451]:
# Shared Elasticsearch client used by every helper below; authenticates with
# the API key pair and keeps TLS certificate verification enabled.
es_client = Elasticsearch(
    hosts=[ES_ENDPOINT],
    verify_certs=True,
    api_key=(API_KEY_ID, API_KEY),
)

In [342]:
def get_indices():
    """Return summary settings for every non-hidden Elasticsearch index.

    Asks ``es_client`` for all indices matching ``*,-.*`` and keeps those
    whose name does not start with a dot.

    Returns:
        A list of dicts, one per index, with the keys ``index``, ``shards``,
        ``creation_date`` and ``blocks_write``. Values are taken from the
        index settings as returned by Elasticsearch. ``blocks_write`` falls
        back to ``"false"`` when the settings contain no ``blocks.write``
        entry.
    """
    indices = es_client.indices.get(index="*,-.*")
    user_created_indices = []
    for name, details in indices.items():
        if name.startswith("."):
            continue
        index_settings = details["settings"]["index"]
        user_created_indices.append({
            "index": name,
            "shards": index_settings["number_of_shards"],
            "creation_date": index_settings["creation_date"],
            # The "blocks" section is absent unless a block was configured on
            # the index, so it must not be indexed unconditionally.
            "blocks_write": index_settings.get("blocks", {}).get("write", "false"),
        })
    return user_created_indices

In [343]:
def get_index_sample(index_name, size=SAMPLE_SIZE):
    """Fetch up to ``size`` documents from ``index_name`` as a DataFrame.

    Runs a ``match_all`` search through ``es_client`` and builds the frame
    from the ``_source`` of each returned hit.
    """
    query = {"query": {"match_all": {}}, "size": size}
    response = es_client.search(index=index_name, body=query)

    # One row per hit, keeping only the stored document body.
    rows = []
    for hit in response['hits']['hits']:
        rows.append(hit['_source'])
    return pd.DataFrame(rows)

In [344]:
def get_index_mapping(index_name):
    """Return the top-level fields and their types for ``index_name``.

    Args:
        index_name: Index name (or pattern) passed to the get-mapping API.

    Returns:
        A list of dicts with the keys ``index_name``, ``field_name`` and
        ``type``, one per top-level property of every index in the response.
        Indices without any mapped properties contribute no rows.
    """
    mappings = es_client.indices.get_mapping(index=index_name)
    mappings_output = []
    # A distinct loop name keeps the ``index_name`` argument intact.
    for mapped_index, index_mapping in mappings.items():
        # An index with no mapped fields has no "properties" section.
        properties = index_mapping.get("mappings", {}).get("properties", {})
        for field_name, field_mapping in properties.items():
            mappings_output.append({
                "index_name": mapped_index,
                "field_name": field_name,
                # Object fields carry nested "properties" and omit "type".
                "type": field_mapping.get("type", "object"),
            })
    return mappings_output

## Test

In [345]:
# Fetch the index summaries once; the ingestion loop further down iterates
# over this list.
indices = get_indices()
indices

[{'index': 'products',
  'shards': '1',
  'creation_date': '1721414017156',
  'blocks_write': 'false'},
 {'index': 'security_events',
  'shards': '1',
  'creation_date': '1721414299237',
  'blocks_write': 'true'}]

## Ingest into Dataplex

In [452]:
# Identifiers and labels for the Dataplex catalog objects created below.
# Id of the entry group that holds one entry per Elasticsearch index.
DATAPLEX_GROUP_ID = "elasticsearch"
# Id of the entry type assigned to every index entry.
DATAPLEX_ENTRY_TYPE_ID = "elasticsearch-index"
# Passed to dataplex_elastic_setup() as ``entry_type_name``.
DATAPLEX_ENTRY_TYPE_NAME = "Elasticsearch Index"
# Source system name recorded on the entry type.
DATAPLEX_SYSTEM = "Elasticsearch"
# Display name of the entry type.
DATAPLEX_TYPE_DISPLAY = "Elasticsearch index"
# The cluster endpoint doubles as the description of the group and type.
DATAPLEX_TYPE_DESCRIPTION = ES_ENDPOINT
# Id of the aspect type describing an index (fields, shards, dates, blocks).
DATAPLEX_ASPECT_TYPE_ID = "index-schema"

In [354]:
# Resource paths used by every Dataplex call below, plus the shared client.
parent_project = f"projects/{PROJECT_ID}/locations/{LOCATION_ID}"
# The entry group id comes from DATAPLEX_GROUP_ID; no ``entry_group_id``
# variable exists at module level.
parent_group = f"{parent_project}/entryGroups/{DATAPLEX_GROUP_ID}"
dplx_client = dataplex_v1.CatalogServiceClient()

### Dataplex setup for Elasticsearch catalog items

### Aspect type

<google.api_core.operation.Operation at 0x7f8f2e74e7a0>

In [444]:
def dataplex_elastic_setup(parent_project, entry_group_id, 
                           entry_type_id, entry_type_name, 
                           type_system, type_display, type_description,
                           aspect_type_id):
    """Create the Dataplex objects needed to catalog Elasticsearch indices.

    Creates, in order, an aspect type (the index schema template), an entry
    group (container for index entries) and an entry type that requires the
    aspect type. Each creation waits for its long-running operation to
    finish. A resource that already exists is reported and skipped; any
    other API error propagates to the caller.

    Args:
        parent_project: ``projects/{project}/locations/{location}`` path
            under which the entry group is created.
        entry_group_id: Id of the entry group.
        entry_type_id: Id of the entry type.
        entry_type_name: Accepted for interface compatibility; not used.
        type_system: Source system recorded on the entry type.
        type_display: Display name of the entry type.
        type_description: Description of the entry group and entry type.
        aspect_type_id: Id of the aspect type to create and to require on
            the entry type.
    """
    # Local import keeps this cell self-contained; the Dataplex client raises
    # its API errors using these exception classes.
    from google.api_core import exceptions as gapi_exceptions

    MetadataTemplate = dataplex_v1.types.AspectType.MetadataTemplate
    # Aspect types and entry types are created in the "global" location.
    global_parent = f"projects/{PROJECT_ID}/locations/global"

    def required_field(name, type_, index, **extra):
        # Build one mandatory template field.
        return MetadataTemplate(
            name=name,
            type_=type_,
            index=index,
            constraints=MetadataTemplate.Constraints(required=True),
            **extra,
        )

    def create_or_skip(label, create, **request):
        # Run a create call and block until its operation completes.
        try:
            create(**request).result()
        except gapi_exceptions.AlreadyExists:
            print(f"{label} exists, skipping...")

    # Aspect type: an "Index" record with a list of attributes plus settings.
    attribute_fields = required_field(
        "Attribute", "record", 1,
        record_fields=[
            required_field("Name", "string", 1),
            required_field("DataType", "string", 2),
        ],
    )
    index_fields = [
        required_field("Attributes", "array", 1, array_items=attribute_fields),
        required_field("Shards", "string", 2),
        required_field("CreationDate", "string", 3),
        required_field("BlocksWrite", "string", 4),
    ]
    aspect_type = dataplex_v1.types.AspectType(
        metadata_template=MetadataTemplate(
            name="Index",
            type_="record",
            record_fields=index_fields,
        )
    )
    # Use the caller-supplied id so it matches the one the entry type
    # requires below.
    create_or_skip(
        "Aspect Type",
        dplx_client.create_aspect_type,
        parent=global_parent,
        aspect_type_id=aspect_type_id,
        aspect_type=aspect_type,
    )

    # Entry Group - Container for indexes
    entry_group = dataplex_v1.types.EntryGroup(description=type_description)
    create_or_skip(
        "Entry Group",
        dplx_client.create_entry_group,
        parent=parent_project,
        entry_group_id=entry_group_id,
        entry_group=entry_group,
    )

    # Entry type, to map an index into the catalog
    entry_type = dataplex_v1.types.EntryType(
        type_aliases=["TABLE"],
        display_name=type_display,
        system=type_system,
        description=type_description,
        required_aspects=[
            dataplex_v1.types.EntryType.AspectInfo(
                type_=f"{global_parent}/aspectTypes/{aspect_type_id}"
            )
        ],
    )
    create_or_skip(
        "Entry Type",
        dplx_client.create_entry_type,
        parent=global_parent,
        entry_type_id=entry_type_id,
        entry_type=entry_type,
    )


In [445]:
# Create the aspect type, entry group and entry type using the constants
# defined above.
dataplex_elastic_setup(parent_project=parent_project, entry_group_id=DATAPLEX_GROUP_ID, 
                       entry_type_id=DATAPLEX_ENTRY_TYPE_ID, entry_type_name=DATAPLEX_ENTRY_TYPE_NAME,
                        type_system=DATAPLEX_SYSTEM, type_display=DATAPLEX_TYPE_DISPLAY,
                       type_description=DATAPLEX_TYPE_DESCRIPTION, 
                       aspect_type_id=DATAPLEX_ASPECT_TYPE_ID)

Aspect Type exists, skipping...
Entry Group exists, skipping...
Entry Type exists, skipping...


### List entries

In [439]:
# Collect the last path segment of every entry already in the entry group,
# leaving out entries whose type's last segment is "entrygroup".
current_dplx_entries = [
    item.name[item.name.rindex("/") + 1:]
    for item in dplx_client.list_entries(parent=parent_group)
    if item.entry_type[item.entry_type.rindex("/") + 1:] != "entrygroup"
]

print(current_dplx_entries)

[]


### Create entry

In [446]:
def create_dataplex_entry(entry_id, aspects, system, description, entry_type_id):
    """Create one Dataplex entry inside ``parent_group``.

    The entry is created with the id ``elasticsearch.cloud.es/{entry_id}``.
    An entry that already exists is reported and skipped; any other API
    error propagates to the caller.

    Args:
        entry_id: Short identifier of the entry (the index name); also used
            as its display name.
        aspects: Mapping of aspect key to ``Aspect`` attached to the entry.
        system: Source system recorded in the entry source.
        description: Description recorded in the entry source.
        entry_type_id: Id of the entry type in the project's global location.
    """
    # Local import keeps this cell self-contained; the Dataplex client raises
    # its API errors using these exception classes.
    from google.api_core import exceptions as gapi_exceptions

    entry_name = f"elasticsearch.cloud.es/{entry_id}"

    entry_source = dataplex_v1.types.EntrySource(
        system=system,
        display_name=entry_id,
        description=description,
    )

    entry = dataplex_v1.types.Entry(
        name=entry_id,
        entry_type=f"projects/{PROJECT_ID}/locations/global/entryTypes/{entry_type_id}",
        entry_source=entry_source,
        aspects=aspects,
    )

    try:
        dplx_client.create_entry(
            parent=parent_group,
            entry_id=entry_name,
            entry=entry,
        )
    except gapi_exceptions.AlreadyExists:
        # Only a genuine duplicate is skipped; other failures must surface.
        print(f"Entry {entry_id} exists, skipping...")

In [447]:
# Create one Dataplex entry per Elasticsearch index that is not yet cataloged.
from google.protobuf import struct_pb2

for index in indices:
    index_name = index["index"]

    # Skip early so no mapping is fetched for indices already in the catalog.
    if index_name in current_dplx_entries:
        continue

    # Aspect payload laid out like the "Index" aspect type template:
    # a list of {Name, DataType} attributes plus the index settings.
    # Struct.update() converts the nested dicts and lists itself.
    aspect_struct = struct_pb2.Struct()
    aspect_struct.update({
        "Attributes": [
            {"Name": attr["field_name"], "DataType": attr["type"]}
            for attr in get_index_mapping(index_name)
        ],
        "Shards": index["shards"],
        "CreationDate": index["creation_date"],
        "BlocksWrite": index["blocks_write"],
    })

    aspects = {
        f"{PROJECT_ID}.global.{DATAPLEX_ASPECT_TYPE_ID}":
            dataplex_v1.types.Aspect(data=aspect_struct)
    }

    print(f"##### {index_name} #####")
    create_dataplex_entry(entry_id=index_name, aspects=aspects,
                          system=DATAPLEX_SYSTEM,
                          description=index["creation_date"],
                          entry_type_id=DATAPLEX_ENTRY_TYPE_ID)
    current_dplx_entries.append(index_name)