In [1]:
import mysql.connector
import os
import requests
import json
import logging

# Setup logging: every record is appended to nyt_pipeline.log and echoed to
# the notebook output using the same format.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"

logging.basicConfig(
    filename="nyt_pipeline.log",
    level=logging.INFO,
    format=LOG_FORMAT,
    filemode="a")  # Append mode to retain logs

root_logger = logging.getLogger("")

# basicConfig() does nothing once the root logger has handlers, but
# addHandler() always adds: re-running this cell in a live kernel would attach
# one more console handler per run and print every message several times.
# Tag the console handler with a name and attach it only when it is absent.
CONSOLE_HANDLER_NAME = "nyt_pipeline_console"

if not any(h.get_name() == CONSOLE_HANDLER_NAME for h in root_logger.handlers):
    console = logging.StreamHandler()
    console.set_name(CONSOLE_HANDLER_NAME)
    console.setLevel(logging.INFO)
    formatter = logging.Formatter(LOG_FORMAT)
    console.setFormatter(formatter)
    root_logger.addHandler(console)

logging.info("Pipeline execution started.")

# Load credentials from environment variables
password = os.getenv("MYSQL_PASSWORD")
slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL")

# Ensure all required environment variables are set, and name exactly the ones
# that are missing (unset or empty) so the log entry is actionable on its own.
required_env = [
    ("MYSQL_PASSWORD", password),
    ("SLACK_WEBHOOK_URL", slack_webhook_url),
]
missing_env = [name for name, value in required_env if not value]

if missing_env:
    error_message = "Missing environment variables: " + ", ".join(missing_env)
    logging.error(error_message)
    raise ValueError(error_message)

try:
    # Smoke-test the MySQL credentials: the connection is opened and closed
    # again straight away.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        password=password,
        database="nyt_pipeline")

    logging.info("Connected to MySQL successfully.")
    conn.close()

except mysql.connector.Error as err:
    error_message = f"MySQL Connection Failed: {err}"
    logging.error(error_message)

    # Send Slack notification
    message = {
        "text": f":warning: *NYT Pipeline Alert* :warning:\n\n{error_message}"}

    try:
        # Without a timeout, requests waits indefinitely on a stalled connection.
        slack_response = requests.post(
            slack_webhook_url,
            data=json.dumps(message),
            headers={"Content-Type": "application/json"},
            timeout=10)
    except requests.RequestException as slack_err:
        # Alerting is best-effort: a failed notification must not replace the
        # MySQL error re-raised below. Only the exception type is logged
        # because the exception text may embed the secret webhook URL.
        logging.error(f"Failed to send Slack alert: {type(slack_err).__name__}")
    else:
        if slack_response.status_code == 200:
            logging.info("Slack notification sent successfully.")
        else:
            logging.error(f"Failed to send Slack alert: {slack_response.text}")

    # Re-raise the original MySQL error so the cell still fails visibly.
    raise


2025-02-17 14:40:49,357 - INFO - Pipeline execution started.
2025-02-17 14:40:49,364 - INFO - Connected to MySQL successfully.


In [2]:
# Fetch the "world" top stories from the NYT Top Stories API.
API_KEY = os.getenv("NYT_API_KEY")
URL = f"https://api.nytimes.com/svc/topstories/v2/world.json?api-key={API_KEY}"

# Whatever happens below, leave `data` with a 'results' list so code that
# reads data['results'] keeps working; a failed fetch simply yields no articles.
data = {"results": []}
response = None

if not API_KEY:
    # os.getenv returned None/empty: the URL would carry "api-key=None".
    print("Error: NYT_API_KEY environment variable is not set.")
else:
    try:
        # A timeout keeps the notebook from hanging on a stalled connection.
        response = requests.get(URL, timeout=30)
    except requests.RequestException as err:
        # Print only the exception type: the exception text may embed the
        # request URL, which contains the API key.
        print("Error: request to NYT API failed:", type(err).__name__)
    else:
        if response.status_code == 200:
            data = response.json()  # Convert response to JSON
            print("Fetched", len(data['results']), "articles.")
        else:
            print("Error:", response.status_code, response.text)

Fetched 38 articles.


In [3]:
def strip_byline_prefix(byline):
    """Return the author part of a byline string.

    Only a leading "By " is removed; any later "By " inside the text is left
    alone. A missing or empty byline (None or "") yields "".
    """
    if not byline:
        return ""
    prefix = "By "
    if byline.startswith(prefix):
        return byline[len(prefix):]
    return byline


# Extract relevant article details as tuples ordered
# (article_id, title, author, published_date, url, section).
articles = []
for article in (data.get('results') or []):
    article_id = article.get('uri', 'N/A')
    title = article.get('title', 'No Title')
    author = strip_byline_prefix(article.get('byline'))
    published_date = article.get('published_date', None)
    url = article.get('url', 'No URL')
    section = article.get('section', 'No Section')

    articles.append((article_id, title, author, published_date, url, section))

print("\nFirst 5 articles:")
for a in articles[:5]:
    print(a)


First 5 articles:
('nyt://article/66414f3d-3004-57d9-a3fa-4b972b9785f6', 'Rubio Meets Saudi Crown Prince for Talks on Gaza and Ukraine', 'Patrick Kingsley and Ismaeel Naar', '2025-02-17T05:17:28-05:00', 'https://www.nytimes.com/2025/02/17/world/middleeast/rubio-saudi-arabia-gaza-ukraine.html', 'world')
('nyt://article/775f9786-1284-54e2-a88c-34fe85bb5d34', 'European Leaders Meet in Paris as U.S. Pushes Ahead With Ukraine Plan', 'Catherine Porter and Steven Erlanger', '2025-02-17T06:26:39-05:00', 'https://www.nytimes.com/2025/02/17/world/europe/europe-paris-ukraine-talks.html', 'world')
('nyt://article/c24514eb-d6de-5791-9fe1-4914957f37cb', 'Trump’s Threats Against Canada Upend Conservative’s Playbook', 'Norimitsu Onishi', '2025-02-17T05:02:28-05:00', 'https://www.nytimes.com/2025/02/17/world/americas/trump-canada-pierre-poilievre.html', 'world')
('nyt://article/259ead3a-7b18-5592-8291-deb62e58e3de', 'Israel Says It Will Keep Troops ‘Temporarily’ in 5 Points in Lebanon', 'Patrick Kings

In [4]:
# Connection settings for the local nyt_pipeline database; the password comes
# from the `password` variable set earlier in the notebook.
db_settings = {
    "host": "localhost",
    "user": "root",
    "password": password,
    "database": "nyt_pipeline",
}

# Open the connection and create the cursor used to run SQL statements.
conn = mysql.connector.connect(**db_settings)
cursor = conn.cursor()

In [5]:
# Query to insert only new articles into nyt_staging
insert_query = """
INSERT IGNORE INTO nyt_staging (article_id, title, author, published_date, url, section)
VALUES (%s, %s, %s, %s, %s, %s);
"""

if not articles:
    # An empty list means nothing was extracted (e.g. the fetch failed);
    # do not report an insert that never happened.
    logging.warning("No articles to insert into nyt_staging; skipping.")
    print("No articles to insert into nyt_staging.")
else:
    try:
        cursor.executemany(insert_query, articles)
        conn.commit()
    except mysql.connector.Error as err:
        # Record the failure in the pipeline log, then discard the open
        # transaction so the shared connection is clean if the cell is re-run.
        logging.error(f"Insert into nyt_staging failed: {err}")
        conn.rollback()
        raise

    print("New articles inserted into nyt_staging successfully!")

New articles inserted into nyt_staging successfully!


In [6]:
# SQL query to copy articles from nyt_staging into nyt_articles, skipping any
# article_id that nyt_articles already holds (rows stay in nyt_staging).
insert_final_query = """
INSERT INTO nyt_articles (article_id, title, author, published_date, url, section)
SELECT s.article_id, s.title, s.author, s.published_date, s.url, s.section
FROM nyt_staging s
WHERE NOT EXISTS (
    SELECT 1 FROM nyt_articles a
    WHERE a.article_id = s.article_id);
"""

logging.info(f"Executing SQL: {insert_final_query}")
try:
    cursor.execute(insert_final_query)
    conn.commit()
except mysql.connector.Error as err:
    # Record the failure in the pipeline log, then discard the open
    # transaction so the shared connection is clean if the cell is re-run.
    logging.error(f"Insert into nyt_articles failed: {err}")
    conn.rollback()
    raise

# rowcount reports the rows affected by the INSERT ... SELECT just executed.
logging.info(f"Rows added to nyt_articles: {cursor.rowcount}")
print("Data moved to nyt_articles successfully!")

2025-02-17 14:40:50,412 - INFO - Executing SQL: 
INSERT INTO nyt_articles (article_id, title, author, published_date, url, section)
SELECT s.article_id, s.title, s.author, s.published_date, s.url, s.section
FROM nyt_staging s
WHERE NOT EXISTS (
    SELECT 1 FROM nyt_articles a 
    WHERE a.article_id = s.article_id);



Data moved to nyt_articles successfully!


In [7]:
# Re-fetch the "world" top stories and preview the first five headlines.
API_KEY = os.getenv("NYT_API_KEY")
URL = f"https://api.nytimes.com/svc/topstories/v2/world.json?api-key={API_KEY}"

# A timeout keeps the notebook from hanging on a stalled connection.
response = requests.get(URL, timeout=30)

if response.status_code == 200:
    data = response.json()
else:
    # An error response has no usable 'results' list; report it and fall back
    # to an empty result set instead of failing on data['results'] below.
    print("Error:", response.status_code, response.text)
    data = {"results": []}

print("Fetched", len(data['results']), "articles")
for article in data['results'][:5]:
    print(article['title'], "-", article['published_date'])

Fetched 38 articles
Rubio Meets Saudi Crown Prince for Talks on Gaza and Ukraine - 2025-02-17T05:17:28-05:00
European Leaders Meet in Paris as U.S. Pushes Ahead With Ukraine Plan - 2025-02-17T06:26:39-05:00
Trump’s Threats Against Canada Upend Conservative’s Playbook - 2025-02-17T05:02:28-05:00
Israel Says It Will Keep Troops ‘Temporarily’ in 5 Points in Lebanon - 2025-02-17T10:05:07-05:00
South African Imam and Gay Rights Advocate Is Shot Dead - 2025-02-17T13:29:35-05:00


In [8]:
# Send a test message to the Slack webhook to confirm alerting works.
slack_webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if not slack_webhook_url:
    print("Slack Webhook URL not found.")
else:
    message = {
        "text": ":warning: *Test Alert!* :warning:\n\nThis is a test message from your NYT Pipeline."
    }
    try:
        # Without a timeout, requests waits indefinitely on a stalled connection.
        response = requests.post(
            slack_webhook_url,
            data=json.dumps(message),
            headers={"Content-Type": "application/json"},
            timeout=10)
    except requests.RequestException as err:
        # Print only the exception type: the exception text may embed the
        # secret webhook URL.
        print(f"Failed to send Slack alert: {type(err).__name__}")
    else:
        if response.status_code == 200:
            print("Slack alert sent successfully!")
        else:
            print(f"Failed to send Slack alert: {response.text}")

Slack alert sent successfully!
