In [None]:
%reload_ext autoreload
%autoreload 2
import os, sys


# Ragging using elastic

Step 1 is to run elastic image:

```md

docker pull docker.elastic.co/elasticsearch/elasticsearch-wolfi:8.16.1

RUN Elastic:

    docker network create demonet
    mkir ~/data/elastic

    export ESI='-p 9200:9200 docker.elastic.co/elasticsearch/elasticsearch:8.16.1'
    export ESP='-e ELASTICSEARCH_PASSWORD=elastic -e ELASTICSEARCH_USERNAME=elastic'
    export ESS='-e xpack.security.enabled=false -e discovery.type=single-node'
    export ESV='-v ~/data/elastic:/usr/share/elasticsearch/data'
    docker run --rm -it --name es01 --network=demonet ${ESS} ${ESP} ${ESI}

# Now connect using url http://localhost:9200
```

You may want to adjust some parameters

Login to docker as root and install vi

```sh
[ ]
    docker exec -u 0 -it <container es01 /bin/bash
    apt-get update
    apt-get install vim
```

# Elastic DB Load

In [35]:
%%writefile ../db_elastic.py
#!/usr/bin/env python 

import os, sys, logging, argparse, glob
sys.path.append(os.path.expanduser("~/.django") )
sys.path.append(os.path.expanduser("gpt") )

from importlib import metadata
from langchain_ollama import OllamaEmbeddings
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever
from langchain_elasticsearch import (
    BM25Strategy,
    DenseVectorStrategy,
    ElasticsearchStore,
)

from elasticsearch import Elasticsearch
from mangorest.mango import webapi
from pdf_parser_tools import pdf_parser
from dataframe_tools import merge_records,metadata_chunks,chunk_dict_to_list, chunks_to_doc_obj

logger = logging.getLogger( "gpt" )

# You can set these in your ~/.django/my_config

ES_URL, ES_USER, ES_PW  = "http://localhost:9200", "elastic", "elastic"
ES_CNX= dict(es_url= ES_URL, es_user= ES_USER, es_password=ES_PW)

if (os.path.exists(os.path.expanduser("~/.django/my_config.py"))):
    import my_config
    try:
        from my_config import ES_URL, ES_USER, ES_PW
    except:
        pass


_ES_STARTEGIES = {
    "hnsw":     DenseVectorStrategy(), 
    "bm25":     BM25Strategy(),
    "hybrid":   DenseVectorStrategy(hybrid=True, rrf=False),
    "sparse":   None,
    "exact":    None,
}
# ---------------------------------------------------------------------------------------
def esDeleteIndex(index):
    esclient = Elasticsearch(ES_URL, basic_auth = (ES_USER, ES_PW))
    esclient.info()
    try:
        esclient.indices.delete(index=index)
    except:
        pass
# ---------------------------------------------------------------------------------------
def esCreateIndex(index):
    esclient = Elasticsearch(ES_URL, basic_auth = (ES_USER, ES_PW))
    esclient.indices.create(index=index)
# ---------------------------------------------------------------------------------------
def getEmbedding(model="llama3.2",base_url = "http://127.0.0.1:11434/"):
    e = OllamaEmbeddings( model = model, base_url =base_url )
    return e
# ---------------------------------------------------------------------------------------
def add_to_es( docs: list[Document], es_cnx: dict, index: str, embed, strategy= "hnsw" ):
    strat = _ES_STARTEGIES[strategy]
    vectorstore = None
    for i in range(0, len(docs), 20000):
        vectorstore = ElasticsearchStore.from_documents(
            documents=docs[i : min(i + 20000, len(docs))],
            embedding=embed,
            **es_cnx,
            index_name=index,
            bulk_kwargs={
                "chunk_size": 100,
            },
            strategy=strat,
        )
    return vectorstore

# ---------------------------------------------------------------------------------------
def es_retriever( es_cnx: dict, index: str, embed, strategy="hnsw", k= 10 ):
    strat = _ES_STARTEGIES[strategy]

    v = ElasticsearchStore( **es_cnx, embedding=embed, index_name=index, strategy=strat)
    return v.as_retriever(search_kwargs={"k": k})

def esVectorSearch( retreiver, q, k=10):
        ret = retreiver.as_retriever(search_kwargs={"k": k}).invoke(q)
        
        h = {r.page_content:r for r in ret}
        if len(h) != len(ret):
            ret = [v for v in h.values()]
            
        return ret

@webapi("/gpt/esSearchIndex/")
def esSearchIndex(request, index_name, query, model="llama3.2", user="", es_url="", 
                    es_user="", es_pass="", k=10, rank=1, **kwargs):

    print(f""" 
        {locals()}
    """)
        
    if (not es_url):
        es = dict(es_url= ES_URL, es_user=ES_USER, es_password=ES_PW)
    else:
        es = dict(es_url= es_url, es_user=es_user, es_password=es_pass)

    #model = "llama3.2" #lets force the embedding for now
    embed = getEmbedding(model=model) 

    
    if ( rank):
        v = es_retriever(es, index=index_name, embed=embed, k=k*2)
        docs = v.invoke(query)
        if (len(docs)):
            ranked = rerank( query, docs)
            docs = [Document(page_content=r['text'], metadata=r['metadata']) for r in ranked[0:k]]
    else:
        v = es_retriever(es, index=index_name, embed=embed, k=k)
        docs = v.invoke(query)

    h = {r.page_content: r for r in docs}
    if len(h) != len(docs):
        docs = [v for v in h.values()]
    
    ret = []
    for d in docs:
        ret.append(dict(page_content=d.page_content, metadata=d.metadata))
    return ret



def esTextSearch(q, k=10, index="test", url = ES_URL, user=ES_USER, pw= ES_PW):
    esclient = Elasticsearch(url, basic_auth = (user, pw))
    res = esclient.search(index=index,  q=q, size=k)

    ret = []
    for i,r in enumerate(res['hits']['hits']):
        pc = r['_source']['text']
        mt = r['_source']['metadata']
        ret.append(Document(page_content = pc, metadata=mt))
        #print(i, " ==>", )
    return ret

# ---------------------------------------------------------------------------------------
def rerank(q, ret):
    from flashrank import (Ranker, RerankRequest,)

    ranker = Ranker("ms-marco-MiniLM-L-12-v2", os.path.expanduser("~/.cache/RERANKER/"))
    rerankrequest = RerankRequest(
        query=q, passages=[{"text": d.page_content, "metadata": d.metadata} for d in ret]
    )
    reranked = ranker.rerank(rerankrequest)
    return reranked

# ---------------------------------------------------------------------------------------
def getchunksFromPDF(filename):
    filename = os.path.expanduser(filename)
    
    record = pdf_parser(filename)
    # Seems like there is not data excepts figures and tables in the pdf
    if ( not record ):
        logger.info("Hmmmm not records found in PDF file!")
        return [] 

    merged = merge_records(record)

    docName = os.path.basename(filename)
    chunk_dict = metadata_chunks(merged,docName)
    chunks = chunk_dict_to_list(chunk_dict)
    docs = chunks_to_doc_obj(chunks, docName )
    return docs
# ---------------------------------------------------------------------------------------
# This is standing by itself - should be called by indexFromFolder
# can be multi tasked 
def loadES( model="llama3.2", index="", filename = '~/data/gpt/test-files/HS4_SGS1_V1S7.pdf',
           es_url=ES_URL , es_user=ES_USER, es_password=ES_PW ):
    
    docs = []
    if ( filename.endswith(".pdf")):
        docs = getchunksFromPDF(filename)
    else:
        logger.info("Indexing only PDF files now!! :)")

    if (not docs):
        return docs
    embed= getEmbedding(model)
    es = dict(es_url=es_url , es_user=es_user, es_password=es_password)
    v = add_to_es(docs, es, index=index, embed=embed)

    return docs

# ---------------------------------------------------------------------------------------
def indexFromFolderOLD(folder="", force=0, index="test", url=ES_URL, user=ES_USER, pw= ES_PW, model="llama3.2"):
    import extract_text

    folder = os.path.expanduser(folder) + "/*"
    files = [f for f in glob.glob(folder, recursive=0) if os.path.isfile(f)]

    embed= getEmbedding(model)
    es = dict(es_url=url , es_user=user, es_password=pw)

    for f in files:
        marker = f".{f}.{index}.indexed"
        if f.endswith(".indexed") or (os.path.exists( marker) and not force):
            continue;

        logger.info(f"Indexing {f}")        
        try:
            docs = extract_text.getChunks(f)
            v = add_to_es(docs, es, index=index, embed=embed)
            open(marker, "w").write("")
        except Exception as e:
            logger.error(f"{f} failed to index")
            pass

    return files

# ---------------------------------------------------------------------------------------
def indexFromFolder(folder="", force=0, index="test", url=ES_URL, user=ES_USER, pw= ES_PW, model="llama3.2"):
    folder = os.path.expanduser(folder) + "/*"
    files = [f for f in glob.glob(folder, recursive=0) if os.path.isfile(f)]

    embed= getEmbedding(model)
    es = dict(es_url=url , es_user=user, es_password=pw)

    logger.info(f"Indexing files from {folder}: found {len(files)} files.")        

    iFiles = []
    for f in files:
        bn = os.path.basename(f)
        dn = os.path.dirname(f)
        marker = f"{dn}/.{bn}.{index}.indexed"

        if f.endswith(".indexed") or (os.path.exists( marker) and not force):
            continue;

        logger.info(f"Indexing {f}")        
        try:
            loadES(model, index, f, url, user, pw)
            open(marker, "w").write("")
            iFiles.append(f)
        except Exception as e:
            logger.error(f"{f} failed to index {e}")
            pass

    return iFiles


#-----------------------------------------------------------------------------------
sysargs=None
def addargs(argv=sys.argv):
    global sysargs
    p = argparse.ArgumentParser(f"{os.path.basename(argv[0])}:")
    p.add_argument('-p', '--path',   type=str, required=True, help="where files are located to index")
    p.add_argument('-i', '--index',  type=str, required=True, help="Elastic Search index")
    p.add_argument('-m', '--model',  type=str, required=False, default="all-minilm:L6-v2", help="embedding model")
    p.add_argument('-e', '--es_url', type=str, required=False, default=ES_URL,  help="elastic URL")
    p.add_argument('-u', '--es_user',type=str, required=False, default=ES_USER, help="elastic user")
    p.add_argument('-w', '--es_pass',type=str, required=False, default=ES_PW,   help="elastic password")
    p.add_argument('-f', '--force',  required=False, default=False, action='store_true', help="force")

    sysargs=p.parse_args(argv[1:])
    return sysargs

from colabexts import utils as colabexts_utils
if __name__ == '__main__' and not colabexts_utils.inJupyter():
    a = addargs()
    logger.info(f"Indexing  {sysargs}")

    indexFromFolder(folder=a.path, force=a.force, index=a.index, url=a.es_url, 
                        user=a.es_user, pw= a.es_pass, model=a.model)

#    indexFromFolder(sys.argv[1])
# index, model = "test2", "all-minilm:L6-v2"
# index, model = "test3", "llama3.2:latest"

# esDeleteIndex(index)
# esCreateIndex(index)

# loadES(model, index);


Overwriting ../db_elastic.py


In [97]:
index, model = "test2", "all-minilm:L6-v2"
index, model = "test1", "llama3.2:latest"

#esDeleteIndex(index)
#esCreateIndex(index)

loadES(model, index, filename = '~/data/gpt/test-files/HS4_SGS1_V1S7.pdf');


2024-12-25 09:15:18,619 elastic_transport.transport INFO: GET http://localhost:9200/ [status:200 duration:0.006s]
2024-12-25 09:15:18,625 elastic_transport.transport INFO: HEAD http://localhost:9200/test1 [status:404 duration:0.003s]
2024-12-25 09:15:20,220 httpx INFO: HTTP Request: POST http://127.0.0.1:11434/api/embed "HTTP/1.1 200 OK"
2024-12-25 09:15:20,399 elastic_transport.transport INFO: PUT http://localhost:9200/test1 [status:200 duration:0.177s]
2024-12-25 09:16:20,755 httpx INFO: HTTP Request: POST http://127.0.0.1:11434/api/embed "HTTP/1.1 200 OK"
2024-12-25 09:16:21,175 elastic_transport.transport INFO: PUT http://localhost:9200/_bulk?refresh=true [status:200 duration:0.159s]
2024-12-25 09:16:21,342 elastic_transport.transport INFO: PUT http://localhost:9200/_bulk?refresh=true [status:200 duration:0.125s]
2024-12-25 09:16:21,494 elastic_transport.transport INFO: PUT http://localhost:9200/_bulk?refresh=true [status:200 duration:0.120s]
2024-12-25 09:16:21,629 elastic_transpo

In [31]:
# ---------------------------------------------------------------------------------------
def getchunks(filename):
    filename = os.path.expanduser(filename)
    
    record = pdf_parser(filename)
    merged = merge_records(record)

    docName = os.path.basename(filename)
    chunk_dict = metadata_chunks(merged,docName)
    chunks = chunk_dict_to_list(chunk_dict)
    docs = chunks_to_doc_obj(chunks, docName )
    return docs
# ---------------------------------------------------------------------------------------
# This is standing by itself - should be called by indexFromFolder
# can be multi tasked 
def loadES( model="llama3.2", index="", filename = '~/data/gpt/test-files/HS4_SGS1_V1S7.pdf',
           es_url=ES_URL , es_user=ES_USER, es_password=ES_PW ):
    
    docs = getchunks(filename)
    embed= getEmbedding(model)
    es = dict(es_url=es_url , es_user=es_user, es_password=es_password)
    v = add_to_es(docs, es, index=index, embed=embed)

    return docs

filename="/Users/e346104/data/gpt/test-files/HS4SGS1v3s3_table.pdf"
#docs = getchunks(filename)
record = pdf_parser(filename)



In [32]:
record

[]

In [94]:
query = "What is the rotation axis of the solar arrays?"
index, model = "test2", "all-minilm:L6-v2"
index, model = "test3", "llama3.2"
#model ="llama3.2"
esSearchIndex(None, model=model, index_name=index, rank=1, query=query, k=5)

2024-12-25 01:04:40,307 elastic_transport.transport INFO: GET http://localhost:9200/ [status:200 duration:0.004s]


 
        {'request': None, 'index_name': 'test3', 'query': 'What is the rotation axis of the solar arrays?', 'model': 'llama3.2', 'user': '', 'es_url': '', 'es_user': '', 'es_pass': '', 'k': 5, 'rank': 1, 'kwargs': {}}
    


2024-12-25 01:04:40,454 httpx INFO: HTTP Request: POST http://127.0.0.1:11434/api/embed "HTTP/1.1 200 OK"
2024-12-25 01:04:40,465 elastic_transport.transport INFO: POST http://localhost:9200/test3/_search?_source_includes=metadata,text [status:200 duration:0.010s]


[{'page_content': 'Document Name: HS4_SGS1_V1S7.pdf\nChapter: 7 ELECTRICAL POWER SUBSYSTEM\nSection: 7.2 CONVENTIONS\nSubSection: 7.2.2 EPS Conventions\n\nAs seen above the CSADAs are not aligned that way. The North CSADA Angle in the S/C frame starts from S/C’s –X axis (midnight) and the positive direction is clockwise viewed from the North. The primary POT dead band location on the S/C frame is 266 degrees, and the redundant POT dead band location on the S/C frame is 260 degrees. When the SA follows the Sun in Normal Mode, the POT voltage increases.',
  'metadata': {'source': 'HS4_SGS1_V1S7.pdf', 'page': 6}},
 {'page_content': 'Document Name: HS4_SGS1_V1S7.pdf\nChapter: 7 ELECTRICAL POWER SUBSYSTEM\nSection: 7.5 UNIT DESCRIPTIONS\nSubSection: 7.5.5 Scalable Power Regulation Unit (SPRU)\nSubSubSection: 7.5.5.4.3.1 Function\n\nThe auxiliary Shunt function is housed on the CAE board, and acts as a 25th shunt to maintain bus voltage regulation by applying a load to the 70V bus in the eve

In [96]:
esclient = Elasticsearch(ES_URL, basic_auth = (ES_USER, ES_PW))
esclient.info()
esclient.indices.delete(index="test1")


2024-12-25 09:14:42,709 elastic_transport.transport INFO: GET http://localhost:9200/ [status:200 duration:0.009s]
2024-12-25 09:14:42,714 elastic_transport.transport INFO: DELETE http://localhost:9200/test1 [status:404 duration:0.004s]


NotFoundError: NotFoundError(404, 'index_not_found_exception', 'no such index [test1]', test1, index_or_alias)

# TEST

In [None]:
# ---------------------------------------------------------------------------------------
# **** TEST ****
#
# Add documents to index
# ---------------------------------------------------------------------------------------
def getTestDocs():
    docs= [ 
        Document(page_content= "Hello world!",  metadata=dict(source= "src")),
        Document(page_content= "day11 world!",  metadata=dict(source= "src")),
        Document(page_content= "day21 world!",  metadata=dict(source= "src")),
        Document(page_content= "day31 world!",  metadata=dict(source= "src")),
        Document(page_content= "day41 world!",  metadata=dict(source= "src")),
    ]
    return docs

# Index documents 
def test1(docs=None):
    if (docs is None):
        docs = getTestDocs()
        
    embed= getEmbedding()

    # STEP 1. lets delete the test index 
    print("Deleteing test index")
    esDeleteIndex("test")
    esCreateIndex("test")

    es = dict(es_url= ES_URL, es_user=ES_USER, es_password=ES_PW)

    print("Add to elastic")
    v = add_to_es(docs, es, index="test", embed=embed)
    return v
# ---------------------------------------------------------------------------------------
# Retrieve documents
# 
def test2(v, q="hello"):
    print("==> TEST Vector Retriever ==>")
    ret = esVectorSearch( v, q, 3)
    for r in ret:   
        print(f"===> {str(r)[0:64]}")

    print("==> RERANK Vector Retriever ==>")
    ret = rerank( q, ret)
    for r in ret:   
        print(f"===> {str(r)[0:64]}")

    print("==> TEST keyword Retriever ==>")
    ret = esTextSearch(  q, 3)
    for r in ret:   
        print(f"===> {str(r)[0:64]}")

    print("==> RERANK KW Retriever ==>")
    #ret = rerank( q, ret)
    #for r in ret:   
    #    print(r)

    return ret

from . import extract_text
def test():
    docs = extract_text.getChunks("~/Desktop/data/LLM/sample.pdf")
    v = test1(docs)
    ret = test2(v,"day11")
    return v

v = test()
    

In [None]:
q="Arianespace has processing and launch"
ret = esVectorSearch( v, q, 3)
print("***==>  Vector Retriever ==>")
for r in ret:   
    print(f"===> {str(r)[0:64]}")
    #print(r)

'''print("\n\n***==> RERANK Vector Retriever ==>")
ret = rerank( q, ret)
for r in ret:   
    print(f"===> {str(r)[0:64]}")

print("\n\n***==> TEST keyword Retriever ==>")
ret = esTextSearch(  q, 3)
for r in ret:   
    print(f"===> {str(r)[0:64]}")
''';