# Opensearch Query Notebook

## Notebook Setup

Import Libraries

In [19]:
import pandas as pd
import pprint

from datetime import date
from time import perf_counter
from typing import List, Optional, Union, Tuple

from opensearch_dsl import Q, Search, connections, Index
from pydantic import BaseModel
import openpyxl


Connect to opensearch (rerun this cell anytime there are updates to an index)

In [20]:
os_client = connections.create_connection("default", timeout=20, http_auth=("admin", "admin"), hosts=[{"host": "localhost", "port": 9200}], use_ssl=False)
policy_index = Index("policy")
corpus_index = Index("corpus")
policy_index.refresh()
corpus_index.refresh()

{'_shards': {'total': 1, 'successful': 1, 'failed': 0}}

Retrieve corpora available to seach

In [22]:
s = Search(using=os_client, index="corpus")
r = s.execute()
hit_list = [hit.to_dict() for hit in r]
available_corpora = [h for h in hit_list] 
corpus_df = pd.DataFrame.from_records(available_corpora)
corpus_df

Unnamed: 0,name,n_docs,n_ingested,n_error,n_cac_required
0,fam,970,970,0,0


Define functions and SearchResult class

In [23]:
class SearchResult(BaseModel):
    score: Optional[float]
    id: str
    corpus: str
    num: str
    title: str
    frags: Optional[List]
    n_pages: int
    word_count: int
    pub_date: Optional[date]
    last_update: str
    url: str

# this could maybe be turned into a class?
def custom_search(os_client, corpora: Union[List[str], None], query: Union[str, None], slop: int, minimum_should_match: int, include_title=False):
    global available_corpora
    
    s = Search(using=os_client, index="policy")

    # filter by corpus
    if corpora is not None:
        # # below code that checks if corpus is valid is commented out until reimplemented
        # valid_corpus = {c: c in available_corpora for c in corpora}
        # if False in valid_corpus.values():
        #     invalid_corpus = [c for c in valid_corpus if valid_corpus[c] is False]
        #     print(f"{len(invalid_corpus)} corpus value(s) not found: {invalid_corpus}")
        if len(corpora) == 1:
            s = s.filter("term", corpus=corpora[0])
        else:
            corpus = list(set(corpora))
            s = s.filter("terms", corpus=corpus)
    
    # match query on keyword value
    if query is not None:
        if include_title is False:
            q_string = Q(
            "query_string",
            query=query,
            fields=["text_by_page"],
            # fuzziness="AUTO",
            # fuzzy_prefix_length=2,
            phrase_slop=slop,
            minimum_should_match=minimum_should_match
            )
        elif include_title is True:
            q_string = Q(
                "query_string",
                query=query,
                fields=["title", "text_by_page"],
                # fuzziness="AUTO",
                # fuzzy_prefix_length=2,
                phrase_slop=slop,
                minimum_should_match=minimum_should_match
                )
            s = s.highlight("title", number_of_fragments=1)
        
        s = s.highlight("text_by_page", fragment_size=150, number_of_fragments=3)
        s = s.highlight_options(order="score")
        
        s = s.query(q_string)

    s = s.extra(track_scores=True)

    return s

# find page in which text occurs
def find_pages(n_pages, frag_list, text_by_page) -> List[Tuple[str, int]]:
    results = []
    if n_pages > 1:
        for frag in frag_list:
            text = frag.replace("<em>", "").replace("</em>", "")
            page = [text_by_page.index(page) for page in text_by_page if text in page]
            if len(page) == 1:
                page = page[0] + 1
                if frag.endswith(".") is False:
                    frag = frag + "..."
                results.append((frag, page))
    elif n_pages == 1:
        results = frag_list
    return results

# scan using the search, returning results and elapsed time
def do_search(s:  Search):
    start = perf_counter()
    r = s.scan()
    
    results = []
    for h in r:
        result = SearchResult(
            score=h.meta.score,
            id=h.meta.id,
            corpus=h.corpus,
            num=h.num,
            title=h.title,
            n_pages=h.n_pages,
            word_count=h.word_count,
            last_update=str(h.access_time),
            url=h.url
        )
        try:
            result.pub_date = h.pub_date
        except Exception:
            result.pub_date = None

        if hasattr(h.meta,"highlight"):
            frags = []
            if hasattr(h.meta.highlight, "text_by_page"):
                page_frags = find_pages(
                h.n_pages, [i for i in h.meta.highlight.text_by_page], h.text_by_page
                )
                frags.append(page_frags)
            if hasattr(h.meta.highlight, "title"):
                title_frag = h.meta.highlight.title
                frags.append(title_frag)
                
            result.frags = frags


        results.append(result)
    elapsed_time = perf_counter() - start
    return results, elapsed_time


## Create query, run search and save outputs

In [53]:
sample_query_a = '"zero trust architecture" (saas "software as a service") (iaas "infrastructure as a service") (paas "platform as a service") (iot "internet of things") "information technology" "operational technology" "digital infrastructure" (scrm "supply chain risk management")'
sample_query_b = '(saas "software as a service") (iaas "infrastructure as a service") (paas "platform as a service") (iot "internet of things")'
sample_cyber_scrm_query = '"cyber scrm" "cyber supply chain risk management" "c scrm" "c-scrm"'

class query(object):
    global query_list
    query_list = []
    def custom_query(self, query_counter):
        if query_counter == 0:
            query_ask = input(str("Do you have a query you wish to search for? If so, please enter the entire term exactly as it" + 
                                  " should be searched. \nEnter Yes or No:")).lower()
        else:
            query_ask = input(str("Do you have another query you wish to search for? Enter Yes or No:")).lower()
        if query_ask != 'yes' and query_ask != 'no':
            self.custom_query(query_counter)
        if query_ask == 'yes':
            custom_query = input(str("Please enter the query: ")).lower()
            query_list.append("" + custom_query + "")
            query_counter += 1
            print("Terms in query: {t}".format(t=query_list))
            self.custom_query(query_counter)
query = query()
query.custom_query(0)
print(query_list)
#global str_query_list
#str_query_list = ''
#if query_ask == 'no':
#str_query_list += str_query_list + '"'
#str_query_list = print('" "'.join(query_list))

Terms in query: ['zero trust architecture']
Terms in query: ['zero trust architecture', 'hdfuig']
Terms in query: ['zero trust architecture', 'hdfuig', 'test']
['zero trust architecture', 'hdfuig', 'test']


Pass in search parameters

In [30]:
s = custom_search(
    os_client=os_client,
    corpora=None,
    query=str(query_list).replace('[','').replace(']',''),
    slop=0,
    minimum_should_match=1,
    include_title=True
)
pprint.pprint(s.to_dict())


{'highlight': {'fields': {'text_by_page': {'fragment_size': 150,
                                           'number_of_fragments': 3},
                          'title': {'number_of_fragments': 1}},
               'order': 'score'},
 'query': {'query_string': {'fields': ['title', 'text_by_page'],
                            'minimum_should_match': 1,
                            'phrase_slop': 0,
                            'query': "'zero trust architecture', 'saas', "
                                     "'software as a service', 'iaas', "
                                     "'infrastructure as a service'"}},
 'track_scores': True}


Perform search

In [31]:
search_results, elapsed_time = do_search(s)
sg_or_pl = "result" if len(search_results) == 1 else "results"

print(f"{len(search_results)} search {sg_or_pl} in {round(elapsed_time,2)} seconds\n")
#print(search_results)

961 search results in 4.88 seconds



Convert list of SearchResult objects into DataFrame, sort by score high to low, and show top 10 results

In [32]:
list_of_dicts = [i.__dict__ for i in search_results]
df = pd.DataFrame(list_of_dicts)
df = df.sort_values(by='score', ascending=False)
df.head(20)

Unnamed: 0,score,id,corpus,num,title,frags,n_pages,word_count,pub_date,last_update,url
154,16.943058,fam_foreign_affairs_manual_1 FAM 270,fam,1 FAM 270,BUREAU OF INFORMATION RESOURCE MANAGEMENT (IRM...,[[and command and control center in the\r\neve...,1,20128,2022-02-17,2022-03-31T18:23:33,https://fam.state.gov/FAM/01FAM/01FAM0270.html
786,16.803432,fam_foreign_affairs_manual_12 FAM 010,fam,12 FAM 010,"SCOPE AND AUTHORITY, 12 FAM 010","[[<em>trust</em>. , It does not include\r\nper...",1,12687,2021-04-21,2022-03-31T18:23:02,https://fam.state.gov/FAM/12FAM/12FAM0010.html
953,16.642393,fam_foreign_affairs_manual_5 FAH-8 H-110,fam,5 FAH-8 H-110,"WEB DEVELOPMENT, 5 FAH-8 H-110",[[The\r\nformat of an IP address is <em>a</em>...,1,2787,2017-07-24,2022-03-31T18:23:20,https://fam.state.gov/FAM/05FAH08/05FAH080110....
170,14.942364,fam_foreign_affairs_manual_4 FAH-2 H-820,fam,4 FAH-2 H-820,"CASHIER BANK ACCOUNTS, 4 FAH-2 H-820","[[Any change in the financial\r\nenvironment, ...",1,1655,2013-07-23,2022-03-31T18:23:23,https://fam.state.gov/FAM/04FAH02/04FAH020820....
804,14.862165,fam_foreign_affairs_manual_5 FAM 150,fam,5 FAM 150,"SERVICE AGREEMENTS, 5 FAM 150","[[MOUS, MOAS, <em>IAAS</em>, AND ISAS (CT:IM-2...",1,1810,2018-11-13,2022-03-31T18:23:37,https://fam.state.gov/FAM/05FAM/05FAM0150.html
476,14.665928,fam_foreign_affairs_manual_7 FAM 500 Appendix A,fam,7 FAM 500 Appendix A,"internal revenue service, 7 FAM 500 Appendix A","[[(including <em>a</em> passport, national ide...",1,3193,2021-04-22,2022-03-31T18:23:15,https://fam.state.gov/FAM/07FAM/07FAM0500apA.html
504,14.198364,fam_foreign_affairs_manual_5 FAM 110,fam,5 FAM 110,"IT MANAGEMENT, 5 FAM 110",[[Responsibility\r\nfor Internal Control; (12)...,1,3508,2020-05-26,2022-03-31T18:23:37,https://fam.state.gov/FAM/05FAM/05FAM0110.html
925,12.572518,fam_foreign_affairs_manual_5 FAH-3 H-510,fam,5 FAH-3 H-510,"Using PROGRAM TAGS (“K” TAGS), 5 FAH-3 H-510","[[<em>a</em> group or in part, or the\r\narea ...",1,11616,2021-07-14,2022-03-31T18:23:08,https://fam.state.gov/FAM/05FAH03/05FAH030510....
910,12.523376,fam_foreign_affairs_manual_7 FAM 1030,fam,7 FAM 1030,"UNITED STATES AS PROTECTING POWER, 7 FAM 1030","[[<em>As</em> <em>a</em> general rule, <em>a</...",1,1892,2021-04-22,2022-03-31T18:23:12,https://fam.state.gov/FAM/07FAM/07FAM1030.html
146,12.240341,fam_foreign_affairs_manual_1 FAM 260,fam,1 FAM 260,"BUREAU OF DIPLOMATIC SECURITY (DS), 1 FAM 260",[[Government agencies to produce <em>a</em>\r\...,1,28987,2021-10-20,2022-03-31T18:23:34,https://fam.state.gov/FAM/01FAM/01FAM0260.html


Iterate over and extract queried information by P-Tag:

In [100]:
from parsel import Selector
import requests

def parse_doc_page():
    target_urls = ['/05FAM/', '/12FAM/']
    #target_urls.append(input(str(Please enter any other url you wish to query:))) -> future-proofing
    html_url = ''
    output = []
    for id in range(0, len(df['url'])):
            for target_url in target_urls:
                    if target_url in df['url'][id]:
                        html_url = df['url'][id]
                        response = requests.get(html_url)
                        sel = Selector(response.text)
                        doc_text = sel.xpath("//p/text()").getall()
                        #print(doc_text)
                        for ptag in doc_text:
                            #check if keyword is in p-tag
                            #if keyword in p-tag, append ptag to output list
                            for query in query_list:
                                if query in ptag:
                                    output.append(ptag)
                                    break
    with open("extracted_text.txt", "w") as extracted_text:
        extracted_text.write(str(output).replace(r'\r', ' ').replace(r'\xa0', '').replace(r'\n', '').replace(r'\x80', '').replace(r'\x99', ''))

    #convert output to downloadable text file
                          
parse_doc_page()


Access opensearch document 

In [None]:
s = Search(using=os_client, index="policy")
s = s.filter("sample_keyword", _id="sample_document_id") 
'''
Step 1: iterate over each keyword and each document with a non-zero ontological score and with correct fam/fah designation
Step 2: Either a) fuzzy match to return output of keyword in each document within x amount of words, b) return entire 
document for client to manually parse, or c) use xpath id in the website to segregate my policy section and return that

'''

        # extracted_info = response.selector.xpath(//p/)
r = s.scan()
pprint.pprint([h.to_dict() for h in r])

Save DataFrame to Excel

In [10]:
filename = "dmsms.xlsx"
df.to_excel(filename)