In [None]:
import sys
!{sys.executable} -m pip install spacy
!{sys.executable} -m spacy download en

In [21]:
# Import necessary libraries
from elasticsearch import Elasticsearch
import logging
import spacy
from spacytextblob.spacytextblob import SpacyTextBlob
from spacy.language import Language

# Define custom component to filter entities
@Language.component("filter_entities")
def filter_entities(doc):
    RELEVANT_LABELS = {'PERSON', 'ORG', 'GPE', 'LOC', 'DATE' , 'EVENT', 'MONEY', 'PRODUCT', 'WORK_OF_ART'}
    doc.ents = [ent for ent in doc.ents if ent.label_ in RELEVANT_LABELS]
    return doc

# Install and load NLP model
nlp = spacy.load("en_core_web_sm")

# Add SpacyTextBlob and custom filter component to spaCy pipeline
nlp.add_pipe("spacytextblob")
nlp.add_pipe("filter_entities", after="spacytextblob")

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ElasticsearchHook:
    def __init__(self, hosts=None):
        self.es = Elasticsearch(hosts or ["http://localhost:9200"])
        self.logger = logging.getLogger(__name__)
        self.logger.info("Initialized ElasticsearchHook with hosts: %s", hosts)

    def index_data(self, index, id, body):
        try:
            response = self.es.index(index=index, id=id, body=body)
            self.logger.info("Indexed document with ID: %s into index: %s", id, index)
            return response
        except Exception as e:
            self.logger.error(
                "Failed to index document with ID: %s into index: %s. Error: %s",
                id,
                index,
                e,
            )
            raise

    def exists(self, index, id):
        try:
            exists = self.es.exists(index=index, id=id)
            self.logger.info(
                "Checked existence of document with ID: %s in index: %s. Exists: %s",
                id,
                index,
                exists,
            )
            return exists
        except Exception as e:
            self.logger.error(
                "Failed to check existence of document with ID: %s in index: %s. Error: %s",
                id,
                index,
                e,
            )
            raise

    def get_new_articles(self, index, doc_type="_doc", size=10000):
        query = {"query": {"bool": {"must_not": {"exists": {"field": "nlp_processed"}}}}}
        try:
            result = self.es.search(index=index, body=query, size=size)
            return result["hits"]["hits"]
        except Exception as e:
            self.logger.error(
                "Failed to retrieve new articles from index: %s. Error: %s",
                index,
                e,
            )
            raise

    def update_article(self, index, id, body):
        try:
            self.es.update(index=index, id=id, body={"doc": body})
        except Exception as e:
            self.logger.error(
                "Failed to update article in index: %s, id: %s. Error: %s",
                index,
                id,
                e,
            )
            raise

def process_articles_with_nlp():
    es = ElasticsearchHook()
    try:
        articles = es.get_new_articles(index="rss_feeds")
        logger.info(f"Retrieved {len(articles)} new articles from Elasticsearch")
    except Exception as e:
        logger.error(f"Error retrieving articles from Elasticsearch: {e}")
        return

    for article in articles:
        try:
            doc = nlp(article['_source']['summary'])
            entities = [{'text': ent.text, 'label': ent.label_} for ent in doc.ents]
            sentiment = doc._.blob.sentiment.polarity  # Using spacytextblob for sentiment analysis

            enriched_data = {
                'nlp_processed': True,
                'entities': entities,
                'sentiment': sentiment
            }

            es.update_article(index="rss_feeds", id=article['_id'], body=enriched_data)
            logger.info(f"Enriched data: {enriched_data}")
            logger.info(f"Successfully processed and updated article: {article['_id']}")
        except Exception as e:
            logger.error(f"Error processing article ID: {article['_id']}, Error: {e}")

# Execute the function
process_articles_with_nlp()


INFO:__main__:Initialized ElasticsearchHook with hosts: None
  result = self.es.search(index=index, body=query, size=size)
INFO:elastic_transport.transport:POST http://localhost:9200/rss_feeds/_search [status:200 duration:0.064s]
  result = self.es.search(index=index, body=query, size=size)
INFO:__main__:Retrieved 2824 new articles from Elasticsearch
INFO:elastic_transport.transport:POST http://localhost:9200/rss_feeds/_update/https%3A%2F%2Finteraksyon.philstar.com%2Fpolitics-issues%2F2024%2F08%2F29%2F282529%2Ftrump-reposts-lewd-remark-about-harris-on-his-social-media-site%2F [status:200 duration:0.019s]
  self.es.update(index=index, id=id, body={"doc": body})
INFO:__main__:Enriched data: {'nlp_processed': True, 'entities': [{'text': 'Donald Trump', 'label': 'PERSON'}, {'text': 'Wednesday', 'label': 'DATE'}, {'text': 'Kamala Harris', 'label': 'PERSON'}, {'text': 'Truth Social', 'label': 'ORG'}, {'text': 'Harris', 'label': 'PERSON'}, {'text': 'Hillary Clinton', 'label': 'PERSON'}, {'tex