<a href="https://colab.research.google.com/github/shivajshankar/elasticsearch_proj1/blob/main/pythonscripts/colab/elastic_search_connect.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [10]:
!pip install elasticsearch



In [11]:
import requests
import json
from typing import Dict, Any, Optional

class ElasticsearchREST:
    """Small Elasticsearch client built directly on the HTTP REST API.

    Every call goes through a single ``requests.Session`` that sends JSON
    ``Content-Type``/``Accept`` headers and, when both ``username`` and
    ``password`` are supplied, attaches them as the session's auth tuple.

    Args:
        host: Hostname or IP address of the Elasticsearch node.
        port: TCP port of the REST endpoint.
        scheme: URL scheme used to build the base URL (e.g. ``"http"``).
        username: Optional user name; only used together with ``password``.
        password: Optional password; only used together with ``username``.
        timeout: Seconds to wait for the server on each request. Pass
            ``None`` to wait indefinitely (the behaviour of ``requests``
            when no timeout is given).
    """

    def __init__(self, host: str, port: int = 9200, scheme: str = "http",
                 username: Optional[str] = None, password: Optional[str] = None,
                 timeout: Optional[float] = 30.0):
        self.base_url = f"{scheme}://{host}:{port}"
        self.timeout = timeout
        self.session = requests.Session()
        # Credentials are attached only when both halves are present.
        if username and password:
            self.session.auth = (username, password)
        self.session.headers.update({
            "Content-Type": "application/json",
            "Accept": "application/json"
        })

    @staticmethod
    def _quote_id(doc_id: Any) -> str:
        """Percent-encode a document ID so it occupies exactly one URL path segment."""
        # Function-scope import keeps this class self-contained within its cell.
        from urllib.parse import quote
        # safe="" also encodes "/", which would otherwise split the path.
        return quote(str(doc_id), safe="")

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send one HTTP request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path (starting with ``/``) appended to the base URL.
            **kwargs: Forwarded to ``Session.request``. A ``timeout`` given
                here overrides the client-wide default.

        Returns:
            The parsed JSON response, or ``{}`` when the body is empty.

        Raises:
            requests.exceptions.RequestException: Re-raised after the error
                (and the response body, if any) has been printed.
        """
        url = f"{self.base_url}{endpoint}"
        # Without an explicit timeout a stalled server would block forever.
        kwargs.setdefault("timeout", self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json() if response.text else {}
        except requests.exceptions.RequestException as e:
            print(f"Error making request to {url}: {e}")
            if getattr(e, "response", None) is not None:
                print(f"Response: {e.response.text}")
            raise

    def info(self) -> Dict[str, Any]:
        """Return the JSON served at the cluster root endpoint (``GET /``)."""
        return self._request("GET", "/")

    def index_document(self, index: str, document: Dict, doc_id: Optional[str] = None) -> Dict[str, Any]:
        """Index ``document`` into ``index``.

        Args:
            index: Target index name.
            document: JSON-serialisable document body.
            doc_id: Raw (not pre-encoded) document ID. When falsy, the
                request is sent to ``/{index}/_doc`` without an ID.

        Returns:
            The JSON response of the index request.
        """
        endpoint = f"/{index}/_doc"
        if doc_id:
            endpoint += f"/{self._quote_id(doc_id)}"
        return self._request("POST", endpoint, json=document)

    def search(self, index: str, query: Dict) -> Dict[str, Any]:
        """Send ``query`` as the body of ``GET /{index}/_search`` and return the raw response."""
        return self._request("GET", f"/{index}/_search", json=query)

    def get_document(self, index: str, doc_id: str) -> Dict[str, Any]:
        """Fetch one document by its raw (not pre-encoded) ID and return the raw response."""
        return self._request("GET", f"/{index}/_doc/{self._quote_id(doc_id)}")

# Example usage:
if __name__ == "__main__":
    # Client for the demo cluster; scheme and credentials keep their defaults.
    es = ElasticsearchREST("shivajshankar1.duckdns.org", 9200)

    # Connectivity check: pretty-print whatever the root endpoint reports.
    print("Testing connection...")
    print(json.dumps(es.info(), indent=2))

    # Store one sample document; no ID is passed to index_document.
    print("\nIndexing test document...")
    doc = dict(
        title="Test Document",
        content="This is a test document from Colab",
        timestamp="2025-07-23T11:00:00Z",
    )
    result = es.index_document("test-index", doc)
    print("Document indexed:", result)

    # Run a match_all query on the same index and dump the raw response.
    # NOTE(review): no refresh happens between indexing and searching here,
    # so the new document may not appear in this result - confirm.
    print("\nSearching for documents...")
    search_results = es.search(
        "test-index",
        dict(query=dict(match_all={})),
    )
    print("Search results:", json.dumps(search_results, indent=2))

Testing connection...
{
  "name": "elasticsearch-6f8b5c7c77-688jn",
  "cluster_name": "docker-cluster",
  "cluster_uuid": "tVYyf9FSSLuAHdC3TE8Rrw",
  "version": {
    "number": "8.13.4",
    "build_flavor": "default",
    "build_type": "docker",
    "build_hash": "da95df118650b55a500dcc181889ac35c6d8da7c",
    "build_date": "2024-05-06T22:04:45.107454559Z",
    "build_snapshot": false,
    "lucene_version": "9.10.0",
    "minimum_wire_compatibility_version": "7.17.0",
    "minimum_index_compatibility_version": "7.0.0"
  },
  "tagline": "You Know, for Search"
}

Indexing test document...
Document indexed: {'_index': 'test-index', '_id': 'iZf0NZgBYnrZjdTtk19b', '_version': 1, 'result': 'created', '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 0, '_primary_term': 1}

Searching for documents...
Search results: {
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "v

In [13]:
import requests
import json
from typing import Dict, Any, Optional, List

class ElasticsearchREST:
    """Small Elasticsearch client built directly on the HTTP REST API.

    Every call goes through a single ``requests.Session`` that sends JSON
    ``Content-Type``/``Accept`` headers and, when both ``username`` and
    ``password`` are supplied, attaches them as the session's auth tuple.

    Args:
        host: Hostname or IP address of the Elasticsearch node.
        port: TCP port of the REST endpoint.
        scheme: URL scheme used to build the base URL (e.g. ``"http"``).
        username: Optional user name; only used together with ``password``.
        password: Optional password; only used together with ``username``.
        timeout: Seconds to wait for the server on each request. Pass
            ``None`` to wait indefinitely (the behaviour of ``requests``
            when no timeout is given).
    """

    def __init__(self, host: str, port: int = 9200, scheme: str = "http",
                 username: Optional[str] = None, password: Optional[str] = None,
                 timeout: Optional[float] = 30.0):
        self.base_url = f"{scheme}://{host}:{port}"
        self.timeout = timeout
        self.session = requests.Session()
        # Credentials are attached only when both halves are present.
        if username and password:
            self.session.auth = (username, password)
        self.session.headers.update({
            "Content-Type": "application/json",
            "Accept": "application/json"
        })

    @staticmethod
    def _quote_id(doc_id: Any) -> str:
        """Percent-encode a document ID so it occupies exactly one URL path segment."""
        # Function-scope import keeps this class self-contained within its cell.
        from urllib.parse import quote
        # safe="" also encodes "/", which would otherwise split the path.
        return quote(str(doc_id), safe="")

    @staticmethod
    def _sources(response: Dict[str, Any]) -> List[Dict]:
        """Extract the ``_source`` of every hit from a raw search response."""
        return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]

    def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
        """Send one HTTP request and return the decoded JSON body.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            endpoint: Path (starting with ``/``) appended to the base URL.
            **kwargs: Forwarded to ``Session.request``. A ``timeout`` given
                here overrides the client-wide default.

        Returns:
            The parsed JSON response, or ``{}`` when the body is empty.

        Raises:
            requests.exceptions.RequestException: Re-raised after the error
                (and the response body, if any) has been printed.
        """
        url = f"{self.base_url}{endpoint}"
        # Without an explicit timeout a stalled server would block forever.
        kwargs.setdefault("timeout", self.timeout)
        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()
            return response.json() if response.text else {}
        except requests.exceptions.RequestException as e:
            print(f"Error making request to {url}: {e}")
            if getattr(e, "response", None) is not None:
                print(f"Response: {e.response.text}")
            raise

    def info(self) -> Dict[str, Any]:
        """Return the JSON served at the cluster root endpoint (``GET /``)."""
        return self._request("GET", "/")

    def index_document(self, index: str, document: Dict, doc_id: Optional[str] = None,
                      refresh: bool = True) -> Dict[str, Any]:
        """Index ``document`` into ``index``.

        Args:
            index: Target index name.
            document: JSON-serialisable document body.
            doc_id: Raw (not pre-encoded) document ID. When falsy, the
                request is sent to ``/{index}/_doc`` without an ID.
            refresh: When true, ``refresh=true`` is added to the query
                string of the index request.

        Returns:
            The JSON response of the index request.
        """
        endpoint = f"/{index}/_doc"
        if doc_id:
            endpoint += f"/{self._quote_id(doc_id)}"
        params = {"refresh": "true"} if refresh else {}
        return self._request("POST", endpoint, json=document, params=params)

    def search(self, index: str, query: Dict) -> Dict[str, Any]:
        """Send ``query`` as the body of ``GET /{index}/_search`` and return the raw response."""
        return self._request("GET", f"/{index}/_search", json=query)

    def match_all(self, index: str, size: int = 10) -> List[Dict]:
        """Return the ``_source`` of up to ``size`` documents from ``index``.

        This issues a single ``match_all`` search, so it does not page
        through the index: at most ``size`` documents come back.
        """
        response = self.search(index, {
            "query": {"match_all": {}},
            "size": size
        })
        return self._sources(response)

    def search_by_field(self, index: str, field: str, value: Any, size: int = 10) -> List[Dict]:
        """Return the ``_source`` of up to ``size`` documents whose ``field`` matches ``value``.

        Args:
            index: Index to search.
            field: Field name used in the ``match`` query.
            value: Value to match against ``field``.
            size: Maximum number of hits to return.
        """
        response = self.search(index, {
            "query": {
                "match": {
                    field: value
                }
            },
            "size": size
        })
        return self._sources(response)

    def get_document(self, index: str, doc_id: str) -> Optional[Dict]:
        """Return the ``_source`` of one document, or ``None`` if it does not exist.

        ``doc_id`` is the raw (not pre-encoded) ID. A 404 from the server is
        converted to ``None``; any other HTTP error propagates. Note that
        ``_request`` prints the error details for every failed request,
        including the 404 that is turned into ``None`` here.
        """
        try:
            response = self._request("GET", f"/{index}/_doc/{self._quote_id(doc_id)}")
            return response.get("_source")
        except requests.exceptions.HTTPError as e:
            if e.response is not None and e.response.status_code == 404:
                return None
            raise

    def refresh_index(self, index: str) -> Dict[str, Any]:
        """Refresh an index to make recent changes searchable (``POST /{index}/_refresh``)."""
        return self._request("POST", f"/{index}/_refresh")

# Example usage:
if __name__ == "__main__":
    # Client for the demo cluster; scheme and credentials keep their defaults.
    es = ElasticsearchREST("shivajshankar1.duckdns.org", 9200)

    # Show what the cluster root endpoint reports.
    print("Cluster Info:")
    print(json.dumps(es.info(), indent=2))

    # Store one sample document; no ID is passed to index_document.
    print("\nIndexing test document...")
    doc = dict(
        title="Hello from Colab",
        content="This document was indexed from Google Colab",
        timestamp="2025-07-23T11:00:00Z",
    )
    result = es.index_document("my-index", doc)
    print("Document indexed:", result)

    # Explicit refresh so the new document is searchable. index_document
    # already defaults to refresh=True, so this also demonstrates the method.
    es.refresh_index("my-index")

    # List what match_all returns (it uses its default size).
    print("\nAll documents in 'my-index':")
    all_docs = es.match_all("my-index")
    for i, doc in enumerate(all_docs, start=1):
        print(f"\nDocument {i}:", json.dumps(doc, indent=2), sep="\n")

    # Match query on the "content" field.
    print("\nSearching for documents containing 'Colab':")
    search_results = es.search_by_field("my-index", "content", "Colab")
    for i, doc in enumerate(search_results, start=1):
        print(f"\nMatching document {i}:", json.dumps(doc, indent=2), sep="\n")

Cluster Info:
{
  "name": "elasticsearch-6f8b5c7c77-688jn",
  "cluster_name": "docker-cluster",
  "cluster_uuid": "tVYyf9FSSLuAHdC3TE8Rrw",
  "version": {
    "number": "8.13.4",
    "build_flavor": "default",
    "build_type": "docker",
    "build_hash": "da95df118650b55a500dcc181889ac35c6d8da7c",
    "build_date": "2024-05-06T22:04:45.107454559Z",
    "build_snapshot": false,
    "lucene_version": "9.10.0",
    "minimum_wire_compatibility_version": "7.17.0",
    "minimum_index_compatibility_version": "7.0.0"
  },
  "tagline": "You Know, for Search"
}

Indexing test document...
Document indexed: {'_index': 'my-index', '_id': 'i5f3NZgBYnrZjdTt1F9x', '_version': 1, 'result': 'created', 'forced_refresh': True, '_shards': {'total': 2, 'successful': 1, 'failed': 0}, '_seq_no': 1, '_primary_term': 1}

All documents in 'my-index':

Document 1:
{
  "title": "Hello from Colab",
  "content": "This document was indexed from Google Colab",
  "timestamp": "2025-07-23T11:00:00Z"
}

Document 2:
{
  