## Feed Parser and Data Extraction:
- Create a script that reads the provided list of RSS feeds.
- Parse each feed and extract relevant information from each news article,
including title, content, publication date, and source URL.
- Ensure handling of duplicate articles from the same feed.

In [1]:
import feedparser
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime


In [2]:
Base = declarative_base()

class NewsArticle(Base):
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    content = Column(Text)
    pub_date = Column(DateTime)
    source_url = Column(String, unique=True, nullable=False)

The code defines a SQLAlchemy ORM model for a database table named news_articles, representing news articles. It uses declarative_base() to create a base class for model definitions. 

The NewsArticle class includes several columns: an id as the primary key (integer), a title (string) that cannot be null, a content field (text) for the article's body, a pub_date (datetime) for when the article was published, and a source_url (string) that is unique and also cannot be null. 

This structure allows for easy management of news article data within a database, facilitating operations like adding and querying articles.

In [3]:
def parse_rss_feed(feed_url, processed_titles):
    feed = feedparser.parse(feed_url)

    articles = []

    for entry in feed.entries:
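        # Use .get() so an entry missing a title or link is skipped instead of raising
        title = entry.get('title', '').strip()
        source_url = entry.get('link', '')
        if not title or not source_url:
            continue
        if title not in processed_titles:
            # Prefer the summary, then the description; prefer the published date, then the updated date
            content = entry.get('summary', entry.get('description', ''))
            pub_date = entry.get('published_parsed', entry.get('updated_parsed', None))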

            articles.append({
                'title': title,
                'content': content,
                'pub_date': pub_date,
                'source_url': source_url
            })

            processed_titles.add(title)

    return articles

The parse_rss_feed function retrieves and processes articles from the RSS feed specified by feed_url. It takes a processed_titles set that is shared across calls, so an article whose title has already been seen, in this feed or an earlier one, is skipped.

The function parses the feed and initializes an empty list for storing articles. For each entry in the feed, it extracts the article's title, trimming any whitespace. If the title hasn't been processed before, it retrieves the content (using either the summary or description), publication date, and source URL. 

The article's details are then stored in a dictionary, which is appended to the articles list. The title is added to the processed_titles set to avoid duplicates in future calls. Finally, the function returns the list of newly parsed articles.
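
Matching on titles alone treats two different stories with the same headline as duplicates, and misses a story that is republished under an edited headline. One possible alternative is to key on a normalized link and fall back to the title. The sketch below is illustrative only: dedup_key and its normalization rules (lowercased host, no query string, no trailing slash) are assumptions, not part of the original script, and dropping the query string would be wrong for sites that put the article ID there.

```python
from urllib.parse import urlsplit, urlunsplit


def dedup_key(title, link):
    """Build a comparison key for an article (hypothetical helper).

    Prefers the link, normalized by lowercasing the scheme and host and
    dropping the query string, fragment, and trailing slash. Falls back to
    the whitespace-collapsed, case-folded title when there is no link.
    """
    if link:
        parts = urlsplit(link.strip())
        path = parts.path.rstrip('/')
        return urlunsplit((parts.scheme.lower(), parts.netloc.lower(), path, '', ''))
    return ' '.join(title.split()).casefold()


seen = set()
entries = [
    ('Story A', 'https://Example.com/news/a/?utm_source=rss'),
    ('Story A (updated)', 'https://example.com/news/a'),  # same link after normalization
    ('Story B', ''),
    ('story  b', ''),                                      # same title after normalization
]

unique = []
for title, link in entries:
    key = dedup_key(title, link)
    if key not in seen:
        seen.add(key)
        unique.append(title)

print(unique)  # ['Story A', 'Story B']
```

In parse_rss_feed, the same key could replace title in the membership test and in processed_titles.add().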


In [4]:
rss_feeds = [
    'http://rss.cnn.com/rss/cnn_topstories.rss',
    'http://qz.com/feed',
    'http://feeds.foxnews.com/foxnews/politics',
    'http://feeds.reuters.com/reuters/businessNews',
    'http://feeds.feedburner.com/NewshourWorld',
    'https://feeds.bbci.co.uk/news/world/asia/india/rss.xml',
]



The variable rss_feeds is a list of RSS feed URLs from well-known news outlets: CNN (top stories), Quartz, Fox News (politics), Reuters (business news), PBS NewsHour (world), and BBC News (India).

The next cell initializes processed_titles as an empty set. It tracks the article titles that have already been processed, so that duplicates are skipped across all of the feeds.

In [5]:
processed_titles = set()

for feed_url in rss_feeds:
    articles = parse_rss_feed(feed_url, processed_titles)
    print(f"Articles from {feed_url}:\n")
    for article in articles:
        print(f"Title: {article['title']}")
        print(f"Content: {article['content']}")
        print(f"Publication Date: {article['pub_date']}")
        print(f"Source URL: {article['source_url']}")
        print("\n" + "-"*50 + "\n")

Articles from http://rss.cnn.com/rss/cnn_topstories.rss:

Title: Some on-air claims about Dominion Voting Systems were false, Fox News acknowledges in statement after deal is announced
Content: 
Publication Date: time.struct_time(tm_year=2023, tm_mon=4, tm_mday=19, tm_hour=12, tm_min=44, tm_sec=51, tm_wday=2, tm_yday=109, tm_isdst=0)
Source URL: https://www.cnn.com/business/live-news/fox-news-dominion-trial-04-18-23/index.html

--------------------------------------------------

Title: Dominion still has pending lawsuits against election deniers such as Rudy Giuliani and Sidney Powell
Content: 
Publication Date: None
Source URL: https://www.cnn.com/business/live-news/fox-news-dominion-trial-04-18-23/h_8d51e3ae2714edaa0dace837305d03b8

--------------------------------------------------

Title: Here are the 20 specific Fox broadcasts and tweets Dominion says were defamatory
Content: • Fox-Dominion trial delay 'is not unusual,' judge says
• Fox News' defamation battle isn't stopping Trump

Articles from http://qz.com/feed:

Title: Elon Musk's Tesla robotaxi event is today. Here's what to know
Content: <img class="type:primaryImage" src="https://i.kinja-img.com/image/upload/c_fit,q_80,w_636/03fa1ed5ceb673b8dac3131c138f24b9.jpg" /><p>Tesla (<a class="sc-1out364-0 dPMosf sc-145m8ut-0 lcFFec js_link" href="https://qz.com/quote/TSLA" rel="noopener noreferrer" target="_blank">TSLA</a>) CEO Elon Musk is primed to unveil his company’s efforts to produce a self-driving “robotaxi,” either delivering a massive win for investors that justifies sacrificing a widely anticipated car — or a major disappointment.<br /></p><p><a href="https://qz.com/tesla-robotaxi-we-robot-elon-musk-ai-fsd-product-launch-1851669136">Read more...</a></p>
Publication Date: time.struct_time(tm_year=2024, tm_mon=10, tm_mday=10, tm_hour=9, tm_min=44, tm_sec=0, tm_wday=3, tm_yday=284, tm_isdst=0)
Source URL: https://qz.com/tesla-robotaxi-we-robot-elon-musk-ai-fsd-product-launch-1851669136

---------------------

Articles from http://feeds.foxnews.com/foxnews/politics:

Title: Arizona begins in-person and absentee voting, here's what you need to know
Content: Arizona kicks off 2024 election with early voting, absentee voting on Wednesday.
Publication Date: time.struct_time(tm_year=2024, tm_mon=10, tm_mday=9, tm_hour=9, tm_min=0, tm_sec=15, tm_wday=2, tm_yday=283, tm_isdst=0)
Source URL: https://www.foxnews.com/politics/arizona-begins-in-person-absentee-voting-heres-what-you-need-know

--------------------------------------------------

Title: Haitian migration into US becomes major political issue as election looms
Content: The impact of Haitian migration into the United States has become a top 2024 political issue, after former President Trump put the issue into the spotlight.
Publication Date: time.struct_time(tm_year=2024, tm_mon=10, tm_mday=10, tm_hour=10, tm_min=0, tm_sec=9, tm_wday=3, tm_yday=284, tm_isdst=0)
Source URL: https://www.foxnews.com/politics/haitian-migration-us-becomes-major-

Articles from http://feeds.feedburner.com/NewshourWorld:

Title: Xi Jinping celebrates China’s rising power — and his own
Content: President Xi Jinping opened China’s twice-per decade Communist Party Congress on Wednesday hailing the reforms he put in place during his first five-year term and sharing his vision for where he hopes to take the nation. William Brangham reports on the congress as it prepares to announce Xi’s successor and how new leadership may transform China’s role as a global economic partner.
Publication Date: time.struct_time(tm_year=2017, tm_mon=10, tm_mday=18, tm_hour=22, tm_min=35, tm_sec=21, tm_wday=2, tm_yday=291, tm_isdst=0)
Source URL: http://www.pbs.org/newshour/bb/xi-jinping-celebrates-chinas-rising-power/

--------------------------------------------------

Title: The battle for Mosul is over, but this hidden ISIS danger could lurk for years
Content: Iraq may have ousted Islamic States militants from the city of Mosul over the summer, but the major task of f

Articles from https://feeds.bbci.co.uk/news/world/asia/india/rss.xml:

Title: Starbucks, Tetley, Jaguar Land Rover: Remembering Ratan Tata's global ambitions
Content: The former Tata boss's audacious risk-taking saw his firm swallow up a host of iconic foreign brands.
Publication Date: time.struct_time(tm_year=2024, tm_mon=10, tm_mday=10, tm_hour=8, tm_min=48, tm_sec=43, tm_wday=3, tm_yday=284, tm_isdst=0)
Source URL: https://www.bbc.com/news/articles/c5y8jp386lno

--------------------------------------------------

Title: Obituary: Ratan Tata, the 'modest' Indian tycoon
Content: The businessman led a conglomerate of more than 100 companies, employing some 660,000 people.
Publication Date: time.struct_time(tm_year=2024, tm_mon=10, tm_mday=9, tm_hour=19, tm_min=40, tm_sec=39, tm_wday=2, tm_yday=283, tm_isdst=0)
Source URL: https://www.bbc.com/news/articles/cd11lz4vpr7o

--------------------------------------------------

Title: India's Ratan Tata: in his own words
Content: The legendary

The code iterates over the feed URLs in rss_feeds and processes each one with parse_rss_feed, sharing a single processed_titles set so that a title seen in one feed is not reported again in a later one. For each feed URL, it prints a header naming the feed.

Then, for each article, it prints the title, content, publication date, and source URL, followed by a line of dashes that separates one article from the next.

As the output shows, the fields arrive unnormalized: publication dates are raw time.struct_time values (or None when the feed supplies no date), and content may be empty or contain HTML markup.
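
The time.struct_time values printed above cannot be stored in a DateTime column directly. A minimal sketch of one way to convert them follows, assuming (as feedparser's documentation describes) that the parsed tuples are in UTC. The helper name struct_time_to_datetime is hypothetical.

```python
import calendar
import time
from datetime import datetime, timezone


def struct_time_to_datetime(parsed):
    """Convert a UTC time.struct_time (or None) to an aware datetime (hypothetical helper)."""
    if parsed is None:
        return None
    # calendar.timegm treats the tuple as UTC, unlike time.mktime (local time)
    return datetime.fromtimestamp(calendar.timegm(parsed), tz=timezone.utc)


print(struct_time_to_datetime(time.gmtime(0)))  # 1970-01-01 00:00:00+00:00
print(struct_time_to_datetime(None))            # None
```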


## Database Storage:
- Design a database schema to store the extracted news article data.
- Implement logic to store new articles in the database without duplicates.


In [6]:
from sqlalchemy import create_engine, Column, String, DateTime, Integer, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class Article(Base):
    __tablename__ = 'articles'
    id = Column(Integer, primary_key=True)
    # MySQL requires an explicit length for VARCHAR (String) columns;
    # the lengths below are illustrative
    title = Column(String(512))
    content = Column(Text)  # summaries can be long, so use TEXT
    published = Column(DateTime)
    link = Column(String(512), unique=True)
    category = Column(String(64))

engine = create_engine('mysql+pymysql://root:vijvig123@127.0.0.1/my_database')
Base.metadata.create_all(engine)


The code sets up a SQLAlchemy ORM model for a database table named articles, which will store information about news articles. It begins by importing the necessary components from SQLAlchemy and creating a base class using declarative_base().

The Article class defines the structure of the articles table with several columns: id (an integer serving as the primary key), title (string), content (text), published (datetime), link (a unique string for the article's URL), and category (string for the article's classification). The string columns carry explicit lengths because MySQL cannot create a VARCHAR column without one.

An SQLAlchemy engine is then created to connect to a MySQL database using the provided credentials, and the Base.metadata.create_all(engine) command is executed to create the articles table in the specified database. This setup enables structured storage and retrieval of article data within a MySQL database.
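
The connection string above embeds the database password in the notebook. One common alternative is to assemble the URL from environment variables. The sketch below assumes hypothetical variable names (NEWS_DB_USER, NEWS_DB_PASSWORD, NEWS_DB_HOST, NEWS_DB_NAME) and percent-encodes the password so that characters such as @ or / do not break the URL.

```python
import os
from urllib.parse import quote


def build_database_url(env=None):
    """Assemble the MySQL URL from environment variables (names are hypothetical)."""
    env = os.environ if env is None else env
    user = env.get('NEWS_DB_USER', 'root')
    # Raises KeyError if the password is not set, rather than falling back silently
    password = quote(env['NEWS_DB_PASSWORD'], safe='')
    host = env.get('NEWS_DB_HOST', '127.0.0.1')
    name = env.get('NEWS_DB_NAME', 'my_database')
    return f'mysql+pymysql://{user}:{password}@{host}/{name}'


# A plain dict stands in for os.environ in this demonstration
print(build_database_url({'NEWS_DB_PASSWORD': 'p@ss/word'}))
# mysql+pymysql://root:p%40ss%2Fword@127.0.0.1/my_database
```

The result can be passed to create_engine() in place of the literal string.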

In [7]:
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import IntegrityError

articles = [
    {
        'title': 'Sample Title 1',
        'content': 'Sample Content 1',
        'published': '2022-01-01T12:00:00',
        'link': 'http://example.com/article1',
    },
    {
        'title': 'Sample Title 2',
        'content': 'Sample Content 2',
        'published': '2022-01-02T14:30:00',
        'link': 'http://example.com/article2',
    },
]

def save_articles(articles):
    Session = sessionmaker(bind=engine)
    session = Session()
    for article in articles:
        # Check for duplicates based on the link
        existing_article = session.query(Article).filter_by(link=article['link']).first()
        if existing_article:
            # Optionally update existing article's fields or log
            print(f"Duplicate article found: {article['link']}")
            continue  # Skip adding this article
        
        # Create a new Article object and add it to the session
        news_article = Article(**article)
        session.add(news_article)

    try:
        session.commit()
    except IntegrityError:
        session.rollback()  # Roll back the transaction on error
        print("An error occurred while saving articles.")
    finally:
        session.close()  # Return the connection to the pool

save_articles(articles)


Duplicate article found: http://example.com/article1
Duplicate article found: http://example.com/article2


The code defines a function save_articles that saves a list of articles to a database using SQLAlchemy. It begins by creating a session with the database engine using sessionmaker.

For each article in the provided list, the function checks for duplicates by querying the database for existing articles with the same link. If a duplicate is found, it logs a message and skips adding that article. If no duplicate exists, it creates a new Article object using the article data and adds it to the session.

After processing all articles, the function attempts to commit the session to save the changes to the database. If an IntegrityError occurs during the commit (potentially due to a duplicate entry), it rolls back the transaction and prints an error message. This setup ensures that only unique articles are saved while handling potential database errors gracefully.
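
The query-then-insert check can still let a duplicate through if two processes handle the same link at the same moment, which is why the unique constraint on link matters as a backstop. The sketch below illustrates that principle with the standard-library sqlite3 module and an in-memory database, swapped in for MySQL and SQLAlchemy so that it runs without a server. The table layout is a simplified assumption.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE articles ('
    ' id INTEGER PRIMARY KEY,'
    ' title TEXT,'
    ' link TEXT UNIQUE)'
)

rows = [
    ('Sample Title 1', 'http://example.com/article1'),
    ('Sample Title 2', 'http://example.com/article2'),
    ('Sample Title 1 again', 'http://example.com/article1'),  # duplicate link
]

for title, link in rows:
    # INSERT OR IGNORE (SQLite syntax) skips rows that would violate the UNIQUE constraint
    conn.execute(
        'INSERT OR IGNORE INTO articles (title, link) VALUES (?, ?)',
        (title, link),
    )
conn.commit()

stored = conn.execute('SELECT COUNT(*) FROM articles').fetchone()[0]
print(stored)  # 2
```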


## Task Queue and News Processing:
- Set up a Celery queue to manage asynchronous processing of new articles.
- Configure the parser script to send extracted articles to the queue upon arrival.
- Create a Celery worker that consumes articles from the queue and performs
further processing:
  - Category classification: Utilize NLTK or spaCy to classify each article into
the provided categories.
  - Update the database with the assigned category for each article.

In [8]:
from celery import Celery
from celery.signals import worker_process_init
from sqlalchemy import create_engine, Column, String, Text, DateTime, Integer
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords


In [10]:
from celery import Celery
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
from nltk.corpus import stopwords

# Initialize Celery
app = Celery('news_processing', broker='pyamqp://guest:guest@localhost//')

# Set of English stop words
stop_words = set(stopwords.words('english'))

# Set up SQLAlchemy engine and session
DATABASE_URL = 'mysql+pymysql://root:vijvig123@127.0.0.1/my_database'
engine = create_engine(DATABASE_URL)

# Create a session factory
Session = sessionmaker(bind=engine)

# Define the base class for SQLAlchemy models
Base = declarative_base()


The code initializes a Celery application named news_processing, configured to use a RabbitMQ message broker for task management. It also builds a set of English stop words using the Natural Language Toolkit (NLTK) for later text processing. The stop-word list requires the NLTK stopwords corpus, which can be fetched once with nltk.download('stopwords').

The code establishes a connection to a MySQL database using SQLAlchemy, with a specified database URL, and creates an engine for managing database interactions. A session factory is then defined using sessionmaker, which allows for the creation of database sessions.

Additionally, a base class for SQLAlchemy models is defined using declarative_base(), laying the groundwork for the model and the Celery task defined in the following cells.

In [11]:
class NewsArticle(Base):
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    content = Column(Text)
    pub_date = Column(DateTime)
    source_url = Column(String, unique=True, nullable=False)
    category = Column(String)


In [12]:
def classify_category(content):
    # Your category classification logic using NLTK or spaCy goes here
    return 'Uncategorized'
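
The placeholder above always returns 'Uncategorized'. As a rough, dependency-free stand-in for an NLTK or spaCy classifier, the sketch below scores each category by counting keyword matches. The categories and keyword sets are hypothetical, since the provided category list does not appear in this notebook, and the function name classify_by_keywords is likewise an assumption.

```python
import re
from collections import Counter

# Hypothetical categories and keywords; replace with the categories provided for the task
CATEGORY_KEYWORDS = {
    'Politics': {'election', 'vote', 'voting', 'senate', 'president'},
    'Business': {'market', 'stock', 'investors', 'company', 'ceo'},
    'Disaster': {'earthquake', 'flood', 'hurricane', 'wildfire'},
}


def classify_by_keywords(content, default='Uncategorized'):
    """Return the category whose keywords occur most often, or the default if none match."""
    tokens = re.findall(r'[a-z]+', (content or '').lower())
    counts = Counter(tokens)
    scores = {
        category: sum(counts[word] for word in words)
        for category, words in CATEGORY_KEYWORDS.items()
    }
    best = max(scores, key=scores.get)
    return best if scores[best] > 0 else default


print(classify_by_keywords(
    'Arizona kicks off 2024 election with early voting, absentee voting on Wednesday.'
))  # Politics
print(classify_by_keywords(''))  # Uncategorized
```

A tokenizer such as NLTK's word_tokenize, together with the stop_words set defined earlier, could replace the regular expression once the corpora are installed.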


In [13]:
import logging

@app.task
def process_article(article):
    session = Session()

    try:
        existing_article = session.query(NewsArticle).filter_by(source_url=article['source_url']).first()

        if not existing_article:
            category = classify_category(article['content'])

            # pub_date may be a Unix timestamp, a feedparser time tuple
            # (a list after JSON serialization), or None
            raw_date = article.get('pub_date')
            if isinstance(raw_date, (int, float)):
                pub_date = datetime.utcfromtimestamp(raw_date)
            elif raw_date:
                pub_date = datetime(*raw_date[:6])
            else:
                pub_date = None

            new_article = NewsArticle(
                title=article['title'],
                content=article['content'],
                pub_date=pub_date,
                source_url=article['source_url'],
                category=category
            )
            session.add(new_article)
            session.commit()

    except Exception as e:
        logging.error(f"Error processing article: {str(e)}")

    finally:
        session.close()


The process_article function is a Celery task designed to handle the processing and storage of a news article. It begins by creating a new database session using the previously defined session factory. 

The function checks if an article with the same source_url already exists in the database. If no such article is found, it proceeds to classify the article's category based on its content.

A new NewsArticle object is then created with the provided details, including title, content, publication date (converted from a timestamp), source URL, and the classified category. This new article is added to the session, and the session is committed to save the changes to the database. 

In case of any errors during the process, the error is logged, and regardless of success or failure, the session is closed to ensure proper resource management. This function enables efficient and organized storage of articles in the database while handling potential issues effectively.
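
The notebook does not show the step in which the parser hands articles to the queue. A minimal sketch of that step follows, under these assumptions: the helper names to_payload and enqueue_articles are hypothetical, the publication date is sent as a Unix timestamp (the form that utcfromtimestamp expects), and entries without a date are sent as None, which the task needs to tolerate. The send argument would be process_article.delay in a real run; a plain list stands in for it here so the example works without a broker.

```python
import calendar
import json
import time


def to_payload(article):
    """Return a JSON-serializable copy of a parsed article (hypothetical helper)."""
    parsed = article.get('pub_date')
    return {
        'title': article['title'],
        'content': article['content'],
        # Unix timestamp in UTC, or None when the feed gave no date
        'pub_date': calendar.timegm(parsed) if parsed else None,
        'source_url': article['source_url'],
    }


def enqueue_articles(articles, send):
    """Send each article through `send` and return how many were queued."""
    queued = 0
    for article in articles:
        payload = to_payload(article)
        json.dumps(payload)  # raises TypeError early if a field is not serializable
        send(payload)
        queued += 1
    return queued


sent = []  # stand-in for the queue
sample = [{
    'title': 'Sample Title',
    'content': 'Sample Content',
    'pub_date': time.gmtime(0),
    'source_url': 'http://example.com/article1',
}]
print(enqueue_articles(sample, sent.append), sent[0]['pub_date'])  # 1 0
```

With a running broker, the call would look like enqueue_articles(parse_rss_feed(feed_url, processed_titles), process_article.delay).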


In [14]:
@worker_process_init.connect
def configure_workers(**kwargs):
    global Session
    Session = sessionmaker(bind=engine)
    

## Logging and Error Handling:
- Implement proper logging throughout the application to track events and potential
errors.
- Handle parsing errors and network connectivity issues gracefully.

In [15]:
import logging

app = Celery('news_processing', broker='pyamqp://guest:guest@localhost//')

stop_words = set(stopwords.words('english'))

engine = create_engine('mysql+pymysql://root:vijvig123@127.0.0.1/my_database')
Session = sessionmaker(bind=engine)


This cell repeats the setup from the previous section and adds import logging, which process_article uses to report errors. It initializes a Celery application named news_processing with RabbitMQ as its message broker, and builds a set of English stop words with the Natural Language Toolkit (NLTK) for filtering out common words during text processing.

It then creates a SQLAlchemy engine for the MySQL database at the specified URL and a session factory with sessionmaker, from which the task obtains its database sessions.

In [16]:
Base = declarative_base()

class NewsArticle(Base):
    __tablename__ = 'news_articles'
    id = Column(Integer, primary_key=True)
    title = Column(String, nullable=False)
    content = Column(Text)
    pub_date = Column(DateTime)
    source_url = Column(String, unique=True, nullable=False)
    category = Column(String)


In [17]:
def classify_category(content):
    # Your category classification logic using NLTK or spaCy goes here
    return 'Uncategorized'


In [18]:
@app.task
def process_article(article):
    session = Session()

    try:
        existing_article = session.query(NewsArticle).filter_by(source_url=article['source_url']).first()

        if not existing_article:
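            category = classify_category(article['content'])

            # pub_date may be a Unix timestamp, a feedparser time tuple
            # (a list after JSON serialization), or None
            raw_date = article.get('pub_date')
            if isinstance(raw_date, (int, float)):
                pub_date = datetime.utcfromtimestamp(raw_date)
            elif raw_date:
                pub_date = datetime(*raw_date[:6])
            else:
                pub_date = None

            new_article = NewsArticle(
                title=article['title'],
                content=article['content'],
                pub_date=pub_date,
                source_url=article['source_url'],
                category=category
            )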
            session.add(new_article)
            session.commit()

    except Exception as e:
        session.rollback()  # Rollback the session on error
        logging.error(f"Error processing article '{article['source_url']}': {str(e)}")
    finally:
        session.close()  # Ensure the session is closed

The process_article function is a Celery task designed to process and store individual news articles in a database. It begins by creating a new session using the session factory. Within a try block, the function checks if an article with the same source_url already exists in the database.

If it doesn't find a duplicate, it classifies the article's category based on its content. A new NewsArticle object is then created using the article's title, content, publication date (converted from a timestamp), source URL, and the classified category. This object is added to the session and committed to the database.

In the event of an exception, the function rolls back the session to prevent partial data changes and logs an error message indicating the issue. Regardless of the outcome, the session is closed in the finally block, ensuring proper resource management. This function enables efficient and error-resilient storage of news articles in the database.
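
The section's second requirement, handling parsing errors and network connectivity issues, can be approached with a small retry wrapper around the feed parser. The sketch below is one possible shape, not the notebook's implementation: safe_parse, its parameters, and the logger name are hypothetical. It relies on the bozo and bozo_exception attributes that feedparser sets on a result when a feed is malformed or could not be fetched, and takes the parser as an argument so that the demonstration can use a fake one.

```python
import logging
import time
from types import SimpleNamespace

logger = logging.getLogger('feed_parser')


def safe_parse(feed_url, parse, attempts=3, delay=1.0, sleep=time.sleep):
    """Call parse(feed_url) with retries and exponential backoff (hypothetical helper).

    Returns the parsed result, or None if every attempt fails.
    """
    for attempt in range(1, attempts + 1):
        try:
            result = parse(feed_url)
        except Exception as exc:
            logger.warning('Attempt %d for %s failed: %s', attempt, feed_url, exc)
        else:
            if not getattr(result, 'bozo', False):
                return result
            logger.warning('Feed %s is malformed: %s', feed_url,
                           getattr(result, 'bozo_exception', None))
            if getattr(result, 'entries', None):
                return result  # malformed, but some entries were still recovered
        if attempt < attempts:
            sleep(delay * 2 ** (attempt - 1))
    logger.error('Giving up on %s after %d attempts', feed_url, attempts)
    return None


# Fake parser that fails twice, then succeeds
calls = []

def flaky_parse(url):
    calls.append(url)
    if len(calls) < 3:
        raise OSError('network unreachable')
    return SimpleNamespace(bozo=False, entries=['entry'])


result = safe_parse('http://example.com/feed', flaky_parse, sleep=lambda seconds: None)
print(len(calls), result.entries)  # 3 ['entry']
```

In the parser script, the call would be safe_parse(feed_url, feedparser.parse), with a None result skipped and logged rather than iterated.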
