In [None]:
!pip install elasticsearch

In [None]:
from elasticsearch import Elasticsearch

# Create the client against the local node (default address: localhost:9200)
es = Elasticsearch(hosts=["http://localhost:9200"])

# Probe the node once and report whether it answered; the result of ping()
# is only used as a truthy/falsy flag here.
is_reachable = es.ping()
print("Connected to Elasticsearch!" if is_reachable else "Failed to connect.")


In [None]:
# Example data
document = {
    "title": "Elasticsearch Python Example",
    "content": "This is a tutorial on Elasticsearch Python API.",
    "timestamp": "2024-12-31T10:00:00"
}

# Index the document into the 'my_index' index under ID 1.
# refresh="wait_for" makes the call return only once the document is visible
# to search. Without it, the search cells that follow can run (e.g. under
# "Run All") before the index has refreshed and come back with no hits.
response = es.index(index="my_index", id=1, document=document, refresh="wait_for")
print("Index Response:", response)


In [None]:
# Retrieve every document in 'my_index' using a match_all query and show the
# raw hit list from the response.
response = es.search(
    index="my_index",
    query={"match_all": {}},
)
print("Search Results:", response["hits"]["hits"])


In [None]:
# Full-text match: find documents whose 'content' field matches 'Python'
query = {"match": {"content": "Python"}}

response = es.search(
    index="my_index",
    query=query,
)
print("Search Results:", response["hits"]["hits"])


In [None]:
# Boolean query: documents must match 'Python' in 'content', and are filtered
# to those whose 'timestamp' is on or after 2024-01-01 (a range filter, not an
# exact timestamp).
content_clause = {"match": {"content": "Python"}}
timestamp_clause = {"range": {"timestamp": {"gte": "2024-01-01"}}}

query = {
    "bool": {
        "must": [content_clause],
        "filter": [timestamp_clause],
    }
}

response = es.search(index="my_index", query=query)
print("Filtered Search Results:", response["hits"]["hits"])


In [None]:
# Partially update the document with ID 1: only the fields listed under "doc"
# are sent as changes.
update_doc = {
    "doc": {
        "title": "Updated Title: Elasticsearch Python API"
    }
}

# Pass the partial document through the `doc` keyword instead of the
# deprecated catch-all `body` parameter, matching the keyword style
# (`document=`, `query=`) used by the other cells in this notebook.
response = es.update(index="my_index", id=1, doc=update_doc["doc"])
print("Update Response:", response)


In [None]:
# Delete document with ID 1.
# refresh="wait_for" waits until the deletion is visible to search, so the
# query-based operations in later cells do not still see the stale document
# (which, for delete_by_query, may surface as a version conflict).
# NOTE: this call raises if the document does not exist, e.g. when the cell
# is re-run after a successful delete.
response = es.delete(index="my_index", id=1, refresh="wait_for")
print("Delete Response:", response)


In [None]:
# Delete all documents in 'my_index'.
# The query is passed via the `query` keyword rather than the deprecated
# `body` parameter, consistent with the search cells in this notebook.
# refresh=True refreshes the affected shards once the request completes so
# that subsequent searches no longer return the deleted documents.
response = es.delete_by_query(index="my_index", query={"match_all": {}}, refresh=True)
print("Delete All Response:", response)


In [None]:
# Page through search results, two hits per page
query = {"match_all": {}}
page_size = 2
page_number = 1  # 1-based; bump this to fetch later pages

# Translate the 1-based page number into the zero-based offset of its first hit
offset = (page_number - 1) * page_size

response = es.search(
    index="my_index",
    query=query,
    size=page_size,
    from_=offset,
)
print("Paginated Results:", response["hits"]["hits"])