In [None]:
from elasticsearch import Elasticsearch
import json
import os
from dotenv import load_dotenv
from datasets import load_dataset
from nltk.tokenize import sent_tokenize
from tqdm import tqdm

In [2]:
# Connection settings are read from the environment; load_dotenv() runs first
# so they can be supplied through a .env file.
load_dotenv()
ELASTIC_CLOUD_ID = os.getenv('ELASTIC_CLOUD_ID')
# Note: the password is read from ELASTIC_CLOUD_PASSWORD, not ELASTIC_PASSWORD.
ELASTIC_PASSWORD = os.getenv('ELASTIC_CLOUD_PASSWORD')
INDEX_NAME = 'sampled_redpajama'  # name of the Elasticsearch index the documents are written to

# os.getenv returns None for unset variables; stop here with a clear message
# instead of handing None to the Elasticsearch client.
_missing_settings = [
    env_name
    for env_name, value in (
        ('ELASTIC_CLOUD_ID', ELASTIC_CLOUD_ID),
        ('ELASTIC_CLOUD_PASSWORD', ELASTIC_PASSWORD),
    )
    if not value
]
if _missing_settings:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_settings)}"
    )

In [3]:
# Client for the Elastic Cloud deployment identified by ELASTIC_CLOUD_ID,
# authenticating with basic auth as user "elastic".
es = Elasticsearch(
    cloud_id=ELASTIC_CLOUD_ID,
    basic_auth=("elastic", ELASTIC_PASSWORD),
)

In [None]:
# Open the 'c4' configuration of RedPajama in streaming mode.
# already_indexed_count is the number of leading records the indexing loop
# skips; modify it if the main loop was interrupted.
already_indexed_count = 17_045_000
subset_name = "togethercomputer/RedPajama-Data-1T"
subset_type = 'c4'
dataset = load_dataset(path=subset_name, name=subset_type, streaming=True)

In [None]:
# Stream the remaining training records into Elasticsearch through the bulk
# API, stopping once this run has queued max_data_size_bytes of JSON-encoded
# documents. Records whose text is empty after stripping are skipped.

# Flush threshold, compared against len(bulk_data). Every document appends two
# entries (action line + source), so one request carries batch_size // 2 documents.
batch_size = 20000
bulk_data = []

# Byte budget for this run: 120 GiB, chosen because of the size limits of the
# free Elastic tier (120 because an earlier run was interrupted).
max_data_size_bytes = 120 * 1024 * 1024 * 1024
current_data_size_bytes = 0

# Bind the skipped stream to its own name rather than overwriting `dataset`,
# so this cell can be re-run without first re-running the loading cell.
train_stream = dataset["train"].skip(already_indexed_count)


def _send_bulk(operations):
    """Send one bulk request and raise if Elasticsearch rejected any item.

    The bulk response reports item failures through its `errors` flag and the
    per-item `error` entries; without this check rejected documents would
    still be counted as indexed.

    Raises:
        RuntimeError: if the response flags at least one failed item.
    """
    response = es.bulk(body=operations)
    if response["errors"]:
        failures = [
            result["error"]
            for item in response["items"]
            for result in item.values()
            if "error" in result
        ]
        raise RuntimeError(
            f"Bulk request rejected {len(failures)} of {len(response['items'])} documents; "
            f"first error: {failures[0] if failures else 'unknown'}"
        )


# next_offset: stream index of the next record to read.
# flushed_offset: stream index up to which every record has been covered by a
# successful bulk request, i.e. the value to use for already_indexed_count.
next_offset = already_indexed_count
flushed_offset = already_indexed_count

try:
    for record in train_stream:
        text = record["text"].strip()
        if text:
            doc = {"text": text}
            doc_size = len(json.dumps(doc).encode('utf-8'))  # size of the document as UTF-8 JSON
            if current_data_size_bytes + doc_size > max_data_size_bytes:
                print("Data size limit reached. Stopping indexing.")
                break

            bulk_data.append({"index": {"_index": INDEX_NAME}})
            bulk_data.append(doc)
            current_data_size_bytes += doc_size
        next_offset += 1

        if len(bulk_data) >= batch_size:
            _send_bulk(bulk_data)
            print(f'Sent {current_data_size_bytes // (1024 * 1024)} Mb to es cloud')
            bulk_data = []
            flushed_offset = next_offset

        # Exact-hit case: the budget is used up without a document overflowing it.
        if current_data_size_bytes >= max_data_size_bytes:
            break

    # Send whatever is left from the last, partially filled batch.
    if bulk_data:
        _send_bulk(bulk_data)
        bulk_data = []
    flushed_offset = next_offset
finally:
    # Printed on success, on error and on KeyboardInterrupt alike. The offset is
    # conservative: a request that was in flight when the run stopped is not
    # counted, so resuming may re-send at most one batch.
    print(f"Resume point: already_indexed_count = {flushed_offset}")

print(f"Indexed data size: {current_data_size_bytes / (1024 * 1024 * 1024):.2f} GB")
print("Indexing complete!")

Sent 20 Mb to es cloud
Sent 41 Mb to es cloud
Sent 63 Mb to es cloud
Sent 83 Mb to es cloud
Sent 105 Mb to es cloud
Sent 126 Mb to es cloud
Sent 147 Mb to es cloud
Sent 168 Mb to es cloud
Sent 189 Mb to es cloud
Sent 210 Mb to es cloud
Sent 231 Mb to es cloud
Sent 252 Mb to es cloud
Sent 274 Mb to es cloud
Sent 295 Mb to es cloud
Sent 317 Mb to es cloud
Sent 338 Mb to es cloud
Sent 359 Mb to es cloud
Sent 380 Mb to es cloud
Sent 401 Mb to es cloud
Sent 422 Mb to es cloud
Sent 443 Mb to es cloud
Sent 464 Mb to es cloud
Sent 484 Mb to es cloud
Sent 506 Mb to es cloud
Sent 527 Mb to es cloud
Sent 547 Mb to es cloud
Sent 569 Mb to es cloud
Sent 590 Mb to es cloud
Sent 611 Mb to es cloud
Sent 632 Mb to es cloud
Sent 653 Mb to es cloud
Sent 674 Mb to es cloud
Sent 695 Mb to es cloud
Sent 716 Mb to es cloud
Sent 738 Mb to es cloud
Sent 759 Mb to es cloud
Sent 779 Mb to es cloud
Sent 800 Mb to es cloud
Sent 821 Mb to es cloud
Sent 841 Mb to es cloud
Sent 862 Mb to es cloud
Sent 884 Mb to es cl

KeyboardInterrupt: 

In [None]:
# Show the field mapping Elasticsearch currently holds for the index.
try:
    mapping = es.indices.get_mapping(index=INDEX_NAME)
    print(f"Mapping for index '{INDEX_NAME}':")
    index_entry = mapping[INDEX_NAME]
    print(index_entry['mappings'])
except Exception as err:
    # Best-effort check: report the problem instead of stopping the notebook.
    print(f"Error retrieving mapping: {err}")

Mapping for index 'sampled_redpajama':
{'properties': {'text': {'type': 'text', 'fields': {'keyword': {'type': 'keyword', 'ignore_above': 256}}}}}


In [None]:
# Smoke-test the indexed data with two searches of up to 10 hits each:
# a match_all query and a full-text match for "cost" on the `text` field.
try:
    match_all_query = {"match_all": {}}
    match_query = {"match": {"text": "cost"}}

    checks = (
        ("Match All Query Results:", match_all_query),
        ("\nMatch Query Results:", match_query),
    )
    for heading, query in checks:
        search_response = es.search(index=INDEX_NAME, query=query, size=10)

        print(heading)
        for hit in search_response["hits"]["hits"]:
            print(hit["_source"])

except Exception as err:
    # Any failure (connection, missing index, unexpected response) is reported, not raised.
    print(f"An error occurred during search: {err}")

Match All Query Results:
{'text': 'When Todd Canter earned an MBA with honors at Cleveland State in 1995, his education helped launch an international career in finance and real estate. He recently said thanks with a gift to establish the Todd Canter Real Estate Research Endowment Fund.\nCanter serves as the Global Portfolio Strategist for the asset management platform of the National Australia Bank (nabSecurities, LLC). Previously, he worked for LaSalle Investment Management as a global real estate investment manager for 21 years. While at LaSalle, he served as the Global Strategist, Global Head of Product Development and CEO of Asia Pacific.\nHe is an active member of the American Real Estate Society, an award-winning author, and has lectured at universities around the world, including Johns Hopkins University, UC Berkeley, Hong Kong University and Nanjing University in Mainland China. Last fall, he was the keynote speaker at the annual real estate conference sponsored by CSU’s Paul 