Flow:
1) Fetch 5000 entries from the database
2) Clean the dataset, keeping only English entries
3) Insert the cleaned entries into a new index (index 1) in OpenSearch

In OpenSearch Dashboards:
1) Load the model on a node
2) Create a new index (index 2) to work with the newly loaded model
3) Reindex the documents from index 1 into index 2
4) Run the predictions
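
The reindex step in the list above maps onto OpenSearch's `_reindex` API, which copies documents from a source index into a destination index. Below is a minimal sketch of the request body, built in Python. `papers_meta_index` is the index name used in this notebook; the destination name `papers_meta_nlp_index` and the helper `build_reindex_body` are hypothetical placeholders.

```python
import json

def build_reindex_body(source_index: str, dest_index: str) -> dict:
    """Return the JSON body for POST /_reindex."""
    return {
        "source": {"index": source_index},
        "dest": {"index": dest_index},
    }

# "papers_meta_nlp_index" is a hypothetical name for index 2.
body = build_reindex_body("papers_meta_index", "papers_meta_nlp_index")
print(json.dumps(body, indent=2))
```

The printed body can be pasted after `POST /_reindex` in the Dev Tools console.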


In [6]:
import psycopg2
import time
from psycopg2.extras import RealDictCursor
from langdetect import DetectorFactory, detect

conn_params = {
    "host": "127.0.0.1",
    "port": "5432",
    "database": "citation_aggregated_model",
    "user": "admin",
    "password": "admin"
}
query = '''SELECT cat.corpus_id,
               cat.abstract,
               cpm.title,
               cpm.venue,
               cpm.year,
               string_agg(car.name, ', ') as authors_name
        FROM citation_abstract cat
                 JOIN citation_papers_meta cpm on cat.corpus_id = cpm.corpus_id
                 JOIN citation_author car on cat.corpus_id = car.corpus_id
        group by
            cat.corpus_id,
            cpm.title,
            cpm.venue,
            cpm.year
        LIMIT 5000;
'''

start = time.perf_counter()
with psycopg2.connect(**conn_params) as conn:
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(query)
        rows = cursor.fetchall()
end = time.perf_counter()
print(f"Successfully fetched {len(rows)} rows in {end-start} seconds")

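start = time.perf_counter()
DetectorFactory.seed = 0  # fixed seed makes langdetect results deterministic
cleaned_rows = []
for row in rows:
    title = row.get('title')
    # Skip rows with a missing or empty title; detect() raises on such input
    if title and detect(title) == 'en':
        cleaned_rows.append(dict(row))
end = time.perf_counter()
print(f"Successfully filtered {len(cleaned_rows)} in {end-start} seconds")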

Successfully fetched 5000 rows in 0.6091839189175516 seconds
Successfully filtered 4838 in 28.0349865029566 seconds
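
The filtering loop can also be written as a function that takes the detector as a parameter, which makes it easy to check without a database and to tolerate titles the detector cannot handle. This is a sketch under assumptions: `filter_english` and `fake_detect` are hypothetical names, and `fake_detect` is a stand-in stub, not a real language detector.

```python
from typing import Callable, Iterable, List

def filter_english(rows: Iterable[dict], detect_fn: Callable[[str], str]) -> List[dict]:
    """Keep rows whose title is detected as English ('en')."""
    kept = []
    for row in rows:
        title = row.get("title")
        if not title:
            continue  # missing or empty title: nothing to detect
        try:
            if detect_fn(title) == "en":
                kept.append(dict(row))
        except Exception:
            # Detection failed (e.g. a title with no letters); skip the row
            continue
    return kept

# Stub detector used only for illustration; not a real language detector
def fake_detect(text: str) -> str:
    if not any(ch.isalpha() for ch in text):
        raise ValueError("no features in text")
    return "en" if text.isascii() else "xx"

sample = [
    {"corpus_id": 1, "title": "Graph neural networks"},
    {"corpus_id": 2, "title": "Réseaux de neurones"},
    {"corpus_id": 3, "title": None},
    {"corpus_id": 4, "title": "12345"},
]
english_rows = filter_english(sample, fake_detect)
print([r["corpus_id"] for r in english_rows])
```

In the notebook, `detect_fn` would be `langdetect.detect`, called after setting `DetectorFactory.seed`.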


In [7]:
cleaned_rows[0]

{'corpus_id': 5,
 'abstract': 'Purpose: Corneal lymphatic vessels are clinically invisible because of their thin walls and clear lymph fluid. There is no easy and established method for in vivo imaging of corneal lymphatic vessels so far. In this study, we present a novel approach to visualize corneal lymphatic vessels in vivo by injecting intrastromal fluorescein sodium. Methods: Six- to eight-week-old female BALB/c mice were used in the mouse model of suture-induced corneal neovascularization. Two weeks after the suture placement, fluorescein sodium was injected intrastromally. The fluorescein, taken up by the presumed lymphatic vessels, was then tracked using a clinically used Spectralis HRA + OCT device. Immunohistochemistry staining with specific lymphatic marker LYVE-1 and pan-endothelial marker CD31 was used to confirm the indirect lymphangiography findings. Results: By injecting fluorescein intrastromally, both corneal blood and lymphatic vessels were detected. While the lympha

## Ingestion in OpenSearch

```
PUT /papers_meta_index
{
  "settings": {
    "number_of_shards": 1,
    "number_of_replicas": 0
  },
  "mappings": {
    "properties": {
      "corpus_id": {
        "type": "text"
      },
      "abstract": {
        "type": "text"
      },
      "title": {
        "type": "text"
      },
      "venue": {
        "type": "text"
      },
      "year": {
        "type": "text"
      },
      "authors_name": {
        "type": "text"
      }
    }
  }
}

GET /papers_meta_index/_count
```
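
The mapping above stores every field as `text`, including `corpus_id`, which holds an integer in the fetched rows (for example `'corpus_id': 5`). As a sketch of one possible alternative, not the mapping used in this notebook, the body below types `corpus_id` and `year` as `integer` and `venue` as `keyword` for exact filtering. These type choices are assumptions about the data and about how the fields will be queried, and `build_index_body` is a hypothetical helper.

```python
import json

# Assumed field types; the notebook itself maps every field as "text".
FIELD_TYPES = {
    "corpus_id": "integer",
    "abstract": "text",
    "title": "text",
    "venue": "keyword",
    "year": "integer",
    "authors_name": "text",
}

def build_index_body(field_types: dict, shards: int = 1, replicas: int = 0) -> dict:
    """Build the body for PUT /<index> from a field-name -> type map."""
    return {
        "settings": {"number_of_shards": shards, "number_of_replicas": replicas},
        "mappings": {
            "properties": {name: {"type": t} for name, t in field_types.items()}
        },
    }

index_body = build_index_body(FIELD_TYPES)
print(json.dumps(index_body, indent=2))
```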

In [19]:
import json
import requests
from requests.auth import HTTPBasicAuth

def bulk_upload_records(records: list):
    """Insert a list of dict records into OpenSearch through the bulk API."""
    print(f"Start inserting {len(records)} records in opensearch")
    os_url = 'http://localhost:9200/'
    index_name = 'papers_meta_index'

    # Build the OpenSearch bulk payload: an action line followed by the
    # document itself, one JSON object per line, ending with a newline
    lines = []
    for doc in records:
        lines.append(json.dumps({"index": {"_index": index_name}}))
        lines.append(json.dumps(doc))
    bulk_data = '\n'.join(lines) + '\n'

    headers = {'Content-Type': 'application/json'}

    # Insert the data using the OpenSearch bulk API
    response = requests.post(f'{os_url}_bulk', auth=HTTPBasicAuth('admin', 'admin'),
                             headers=headers, data=bulk_data)

    # A 200 status only means the request was accepted; individual documents
    # can still fail, which the "errors" flag in the response body reports
    if response.status_code != 200 or response.json().get('errors'):
        print(f'Error inserting data: {response.content}')
        return
    print(f"Completed inserting {len(records)} records in opensearch")
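
The bulk API reports one result per document in an `items` array and sets a top-level `errors` flag when any of them failed, so a 200 status alone does not guarantee every document was indexed. Below is a minimal sketch of extracting the failed documents from such a response. `failed_bulk_items` is a hypothetical helper, and the sample response is hand-written for illustration rather than captured from a server.

```python
def failed_bulk_items(bulk_response: dict) -> list:
    """Return (position, status, reason) for each failed item in a bulk response."""
    failures = []
    if not bulk_response.get("errors"):
        return failures
    for position, item in enumerate(bulk_response.get("items", [])):
        # Each item is keyed by its action name ("index", "create", ...)
        for action, result in item.items():
            if "error" in result:
                reason = result["error"].get("reason", "")
                failures.append((position, result.get("status"), reason))
    return failures

# Hand-written example shaped like a bulk response; not real server output
example_response = {
    "took": 3,
    "errors": True,
    "items": [
        {"index": {"_index": "papers_meta_index", "status": 201}},
        {"index": {"_index": "papers_meta_index", "status": 400,
                   "error": {"type": "mapper_parsing_exception",
                             "reason": "failed to parse"}}},
    ],
}
failures = failed_bulk_items(example_response)
print(failures)
```

The position in each tuple is the document's offset within the submitted batch, which is enough to look up and retry the original record.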

In [25]:
import multiprocessing as mp

chunk_size = 500
chunked_cleaned_list = [cleaned_rows[i:i + chunk_size] for i in range(0, len(cleaned_rows), chunk_size)]

# Upload the chunks in parallel, one bulk request per chunk
with mp.Pool() as pool:
    results = pool.map(bulk_upload_records, chunked_cleaned_list)

Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 500 records in opensearch
Start inserting 338 records in opensearch
Completed inserting 338 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
Completed inserting 500 records in opensearch
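
The output shows nine batches of 500 records and one of 338, which follows from slicing the 4838 filtered rows into chunks of 500. Below is a small sketch that reproduces that arithmetic; the helper name `chunk` is hypothetical, and a list of integers stands in for the real rows.

```python
def chunk(items: list, size: int) -> list:
    """Split items into consecutive slices of at most `size` elements."""
    if size <= 0:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]

# 4838 is the number of rows left after the English filter
chunks = chunk(list(range(4838)), 500)
sizes = [len(c) for c in chunks]
print(len(chunks), sizes[-1])  # prints: 10 338
```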
