In [59]:
import os
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
import re
import json

In [60]:
def stem_text(text, ps):
    stemmed = ' '.join([ps.stem(word) for word in text.split()])
    return stemmed

In [61]:
def update_text_map_with_links(inlinks_file, outlinks_file):
    with open(inlinks_file, 'r') as inlinks_json, open(outlinks_file, 'r') as outlinks_json:
        inlinks_data = json.load(inlinks_json)
        outlinks_data = json.load(outlinks_json)
        
        for doc_id, inlinks in inlinks_data.items():
            if doc_id in text_map:
                text_map[doc_id]['inlinks'] = inlinks
        for doc_id, outlinks in outlinks_data.items():
            if doc_id in text_map:
                text_map[doc_id]['outlinks'] = outlinks

In [62]:
text_map = {}
ps = PorterStemmer()

def parse_file(file_path):
    current_doc_no = None
    reading_text = False
    doc_text = ""
    count = 0
    # parse the doc to get doc no and corresponding text
    with open(file_path, 'r', encoding='ISO-8859-1', errors='ignore') as file:
        lines = file.readlines()
    for line in lines:
        if "<DOCNO>" in line:
            current_doc_no = line.strip().replace('<DOCNO>', '').replace('</DOCNO>', '')
        elif "<HEAD>" in line:
            doc_title = line.strip().replace('<HEAD>', '').replace('</HEAD>', '')
        elif "<TEXT>" in line:
            reading_text = True
            if "</TEXT>" in line:
                    doc_text += line.split("<TEXT>")[1].split("</TEXT>")[0].strip() + ' '
                    reading_text = False
        elif "</TEXT>" in line:
            reading_text = False
        elif reading_text:
            doc_text += line.strip() + ' '
        elif "</DOC>" in line:
            if current_doc_no is not None:
                stemmed_text = stem_text(doc_text, ps)
                text_map[current_doc_no.strip()] = {'title': doc_title, 'text': stemmed_text.strip()}
                doc_text = ""
                doc_title = ""

In [63]:
file_path = "hurricane_data_final.txt"

parse_file(file_path)
print("Parsing completed")

Parsing completed


In [64]:
inlinks_file_path = "hurricane_inlinks_final.json"
outlinks_file_path = "hurricane_outlinks_final.json"
update_text_map_with_links(inlinks_file_path, outlinks_file_path)

print("Inlinks and outlinks added")

Inlinks and outlinks added


In [65]:
#total no of docs
print(len(text_map))

30000


In [66]:
#save doc id into a list
docnos = list(text_map.keys())
first_doc_id = docnos[0]

first_doc_value = text_map.get(first_doc_id)

first_doc_value

{'title': 'Cyclone Imogen - Wikipedia',
 'text': "tropic cyclon imogen wa a weak but damag tropic cyclon that affect part of northern queensland. the sixth tropic low, and the first cyclon of the 2020â\x80\x9321 australian region cyclon season, imogen origin from a tropic low that form in the western gulf of carpentaria. imogen caus minim destruct on the northern part of queensland as a categori 2 cyclone, caus approxim $10 million in damages.[1] on 1 january, the bureau of meteorolog (bom) report that a tropic low had form near groot eylandt in the western gulf of carpentaria, locat about 635 km (395 mi) east-southeast of darwin.[2] the system wa assign the identifi code 05u by the bom.[3] environment condit were assess as be favour for tropic cyclogenesis, characteris by veri warm sea surfac temperatur of up to 31 â°c (88 â°f), low to moder vertic wind shear and an establish poleward outflow channel in the upper levels.[4] flare convect began to develop around the consolid low-level 

In [67]:
sw_path = "../config/stoplistnltk.txt"

with open(sw_path) as file:
    stopwords = file.read().splitlines()

print(len(stopwords))

418


In [68]:
import string

def process_content(text):
    text = ' '.join([word.lower() for word in text.split() if word.lower() not in stopwords])
    text = text.translate(str.maketrans("", "", string.punctuation))
    return text

In [69]:
author = "Pramatha"

processed_text_map = {doc_id: {'text': process_content(content['text']), 'author': author, **content} for doc_id, content in text_map.items()}

docnos = list(processed_text_map.keys())
first_doc_id = docnos[0]
first_doc_processed_content = processed_text_map[first_doc_id]

print(f"Document ID: {first_doc_id}")
print(f"Processed Content of the First Document: {first_doc_processed_content}")

Document ID: http://en.wikipedia.org/wiki/Cyclone_Imogen


# Merging elastic cloud index 

In [77]:
# Merging Indexes
from elasticsearch7 import Elasticsearch
import os
from concurrent.futures import ThreadPoolExecutor
import time
import logging
import json

# Create an Index in Elasticsearch
def createIndex(index_name, stopwords) :
    if(es.indices.exists(index=index_name)):
        return
    configurations = {
        "settings" : {
            "number_of_shards": 1,
            "number_of_replicas": 1,
            "max_result_window" : 100000,
            "analysis": {
                "filter": {
                    "english_stop": {
                        "type": "stop",
                        "stopwords": stopwords
                    },
                },
                "analyzer": {
                    "stopped": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": [
                            "lowercase",
                            "english_stop",
                            "porter_stem"
                        ]
                    }
                }
            }
        },
        "mappings": {
            "properties": {
                "text": {
                    "type": "text",
                    "fielddata": True,
                    "analyzer": "stopped",
                    "index_options": "positions"
                },
                "title": {
                    "type": "text"
                },
                "author": {
                    "type": "text"
                },
                "inlinks": {
                    "type": "text"
                },
                "outlinks": {
                    "type": "text"
                },
                
                
            }
        }
    }
    es.indices.create(index=index_name, body=configurations)
    logging.info(f"Index with name : {index_name} has been created in Elastic Search")
    
def merge_indexes(index_name, processed_text_map) :
    print(es.ping())
    visited_urls = set()
    total_files = 1
    total_docs = 0
    for path in processed_text_map:
        total_docs += 1
        logging.info(f'Adding Data to Index : {path}')
        try:
            search_result = search_index(index_name, path)
            logging.info(f'Search Result for {path} completed')
            if exists_in_index(search_result) :
                logging.info(f'Merging started for {path}')
                authors = search_result["hits"]["hits"][0]["_source"]["author"]
                inlinks = set(search_result["hits"]["hits"][0]['_source']['inlinks'])
                outlinks = set(search_result["hits"]["hits"][0]['_source']['outlinks'])
                updated_authors = authors + "," + processed_text_map[path]['author']
                updated_inlinks = inlinks.union(processed_text_map[path]['inlinks'])
                updated_outlinks = outlinks.union(processed_text_map[path]['outlinks'])
                data_update = {
                    'doc' : {
                        'inlinks' : list(updated_inlinks),
                        'outlinks' : list(updated_outlinks),
                        'author' : updated_authors
                        }
                    }
                logging.info(f'Data Update for {path} started')
                es.update(index=index_name, id=path, body=data_update)
                logging.info(f'Data Update for {path} finished')
            else:
                es.index(index=index_name, id=path, body=processed_text_map[path])
                logging.info(f'Data Add for {path} finished')
            visited_urls.add(path)
        except Exception as e:
            logging.error(f'Failed to index {path} due to {e}')
        visited_urls.add(path)
        logging.info(f'Finished Adding data for {path}')
        logging.info(f'Completed Files : {total_files}, Remaining Files : {len(processed_text_map) - total_files}, Total Docs : {len(visited_urls)}')
        total_files +=1
    logging.info(f'Total Set : {len(visited_urls)}')
    return visited_urls
            

def search_index(index_name, docID) :    
    response = es.search(index=index_name, query={"match": {"_id": docID}})
    return response

def exists_in_index(response) :
    return response['hits']['total']['value'] > 0

In [78]:
cloud_id = 'webcrawler:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlvOjQ0MyRhNWRhNGY1MTQ3Yjk0YmYwODAyMWJhMTQ0NDM5ZDU1NiQ3NWNiNjM4ZjM1ODA0ZmRjOWU1NTA0N2FhM2U3YWJiZA=='
es = Elasticsearch(cloud_id = cloud_id, request_timeout=3000, http_auth = ('elastic', 'bZRNn0IN9wzKsH3VacukXFXW'))
index_name = 'recent_hurricanes'
logging.basicConfig(
            level=logging.INFO,
            format="%(asctime)s [%(levelname)s] %(message)s\n",
            filename="index_merger_logs.txt",
            filemode="a"
)
start_time = time.time()
# createIndex(index_name, [])
visited_urls = merge_indexes(index_name, processed_text_map)
end_time = time.time()
print(f'Ended indexing the data in {end_time - start_time} seconds')

True


  es.update(index=index_name, id=path, body=data_update)
  es.index(index=index_name, id=path, body=processed_text_map[path])


Ended indexing the data in 6257.730765342712 seconds
