# **Post-U.S. Presidential Election Media Coverage Analysis ETL Pipeline**
Tenzin Lhamo
---



# Data Extraction

In [19]:
#load libraries
import os

import pandas as pd
import requests

#the NewsAPI key is read from the NEWSAPI_KEY environment variable so the
#secret is not stored in the notebook; it falls back to an empty string when
#the variable is unset
api_key = os.environ.get("NEWSAPI_KEY", "")
base_url = "https://newsapi.org/v2/everything"

#fetch api
def fetch_news(query, from_date, to_date, language = 'en', timeout = 30):
    """Request articles matching *query* from ``base_url`` and return the JSON body.

    Args:
        query: search phrase sent as the ``q`` parameter.
        from_date: value sent as the ``from`` parameter.
        to_date: value sent as the ``to`` parameter.
        language: value sent as the ``language`` parameter (default ``'en'``).
        timeout: seconds to wait for the server before giving up; without it
            ``requests.get`` may block indefinitely on an unresponsive host.

    Returns:
        The decoded JSON response.

    Raises:
        requests.exceptions.RequestException: on connection problems, timeouts
            or a non-success HTTP status; the error is printed, then re-raised.
    """
    params = {
        'q': query,
        'from': from_date,
        'to': to_date,
        'language': language,
        'apiKey': api_key
    }
    try:
        response = requests.get(base_url, params = params, timeout = timeout)
        #turn 4xx/5xx responses into exceptions
        response.raise_for_status()
        return response.json()
    except requests.exceptions.RequestException as e:
        #report the failure, then re-raise with the original traceback
        print(f"Error fetching data: {e}")
        raise

#fetch post-election coverage and put the returned articles in a dataframe
election_data = fetch_news(
    query="US Presidential Election",
    from_date="2024-11-06",
    to_date="2024-11-20",
)
#a response without an 'articles' key yields an empty list
articles = election_data.get('articles', [])
election_df = pd.DataFrame(articles)

#show the raw API response
print(election_data)

{'status': 'ok', 'totalResults': 5792, 'articles': [{'source': {'id': 'the-verge', 'name': 'The Verge'}, 'author': 'Wes Davis', 'title': 'Bluesky crosses the 15 million user mark', 'description': 'Bluesky crossed the 15 million user mark today as signups surge in the wake up the US presidential election', 'url': 'https://www.theverge.com/2024/11/13/24295484/bluesky-15-million-users-social-media-x-musk', 'urlToImage': 'https://cdn.vox-cdn.com/thumbor/nQopgfz8pctT4cExam1iwTffjPs=/0x0:2040x1360/1200x628/filters:focal(1020x680:1021x681)/cdn.vox-cdn.com/uploads/chorus_asset/file/25728926/STK133_BLUESKY__C.jpg', 'publishedAt': '2024-11-13T14:48:20Z', 'content': 'Bluesky crosses the 15 million user mark\r\nBluesky crosses the 15 million user mark\r\n / The site has grown quickly as people continue to search for X alternatives.\r\nByWes Davis, a weekend editor who c… [+1362 chars]'}, {'source': {'id': None, 'name': 'BBC News'}, 'author': None, 'title': 'No plans to join Bluesky yet, Keir Starm

# Data Transformation


In [20]:
def clean_data(election_df):
    """Return a cleaned, chronologically ordered copy of the article dataframe.

    Keeps only the source, author, title, description, url, publishedAt and
    content columns; replaces each ``source`` dict with its ``name`` (None
    when that is not possible); renders ``publishedAt`` as a
    ``YYYY-MM-DD HH:MM:SS`` string; fills missing authors and descriptions
    with placeholders; drops ``[Removed]`` titles and title/url duplicates.

    Raises:
        ValueError: when any of the kept columns is absent from the input.
        Any other exception raised while transforming is printed and re-raised.
    """
    wanted = ['source', 'author', 'title', 'description', 'url', 'publishedAt', 'content']

    def _source_name(entry):
        #the API nests the outlet as {'id': ..., 'name': ...}
        if isinstance(entry, dict) and 'name' in entry:
            return entry['name']
        return None

    try:
        #make sure columns of interest are present
        absent = [name for name in wanted if name not in election_df.columns]
        if absent:
            raise ValueError(f"Missing columns: {', '.join(absent)}")

        #work on a copy so the caller's dataframe is untouched
        frame = election_df[wanted].copy()
        frame['source'] = frame['source'].apply(_source_name)

        #parse timestamps (unparseable ones become missing), then format for MySQL
        timestamps = pd.to_datetime(frame['publishedAt'], errors='coerce')
        frame['publishedAt'] = timestamps.dt.strftime('%Y-%m-%d %H:%M:%S')

        #placeholders for missing text fields
        frame['author'] = frame['author'].fillna('Unknown')
        frame['description'] = frame['description'].fillna('No Description')

        #drop withdrawn articles, then repeated title/url pairs
        kept = frame[frame['title'] != '[Removed]']
        kept = kept.drop_duplicates(subset=['title', 'url'], keep='first')

        #order chronologically
        return kept.sort_values(by='publishedAt', ascending=True)

    except Exception as err:
        print(f"Error in data transformation: {err}")
        raise

#apply to clean_df
clean_df = clean_data(election_df)
#check clean df
print(clean_df.head())
#DataFrame.info() prints its own report and returns None, so it is called
#directly; wrapping it in print() would add a stray "None" line
clean_df.info()


              source                                           author  \
61          ABC News   Beatrice Peterson, Leah Sarnoff, Josh Margolin   
17  Business Insider                   htan@insider.com (Huileng Tan)   
25  Business Insider  insider@insider.com (Geoff Weiss,Bryan Metzger)   
51  Business Insider      bgriffiths@insider.com (Brent D. Griffiths)   
63               TPM                                      John Gruber   

                                                title  \
61  Capitol Police warn of post-election violence ...   
17    The Trump trade is taking off on election night   
25         Republicans win back control of the Senate   
51  Harris' chances are still alive after she won ...   
63                             Overnight Status Check   

                                          description  \
61  A Capitol Police intelligence assessment obtai...   
17  The US dollar, Treasury yields, and stock futu...   
25  Republicans are projected to win a majority

# Data Loading

In [21]:
#install and load libraries
!pip install mysql-connector-python
import mysql.connector

#JawsDB connection info is read from the environment so credentials are not
#stored in the notebook; a missing variable raises KeyError here, before any
#connection attempt is made
db_config = {
        'host': os.environ['MYSQL_HOST'],
        'user': os.environ['MYSQL_USER'],
        'password': os.environ['MYSQL_PASSWORD'],
        'database': os.environ['MYSQL_DATABASE']
}

#connect to database
conn = mysql.connector.connect(**db_config)
cursor = conn.cursor()

#check connection by asking the server which database is selected
cursor.execute('SELECT DATABASE();')
print(cursor.fetchone())

('wfizyuc5pvcg05f1',)


In [22]:
#create table in JawsDB mySQL
#IF NOT EXISTS lets this cell be re-run without error; id is an auto-increment
#key and the remaining columns match the fields kept by clean_data
create_table_query = """
CREATE TABLE IF NOT EXISTS election_data (
    id INT AUTO_INCREMENT PRIMARY KEY,
    source VARCHAR(255),
    author VARCHAR(255),
    title TEXT,
    description TEXT,
    url TEXT,
    publishedAt DATETIME,
    content TEXT
);
"""

#execute the DDL on the open cursor and commit on the connection
cursor.execute(create_table_query)
conn.commit()

In [23]:
#load data into JawsDB mySQL table
#the statement does not change per row, so it is built once
insert_query = """
    INSERT INTO election_data (source, author, title, description, url, publishedAt, content)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
insert_columns = ['source', 'author', 'title', 'description', 'url', 'publishedAt', 'content']

#pandas marks missing values with NaN/None; map both to None so they are sent
#as SQL NULL instead of a float NaN
insert_rows = [
    tuple(None if pd.isna(value) else value for value in record)
    for record in clean_df[insert_columns].itertuples(index=False, name=None)
]

#send all rows through one executemany call instead of one execute per row
if insert_rows:
    cursor.executemany(insert_query, insert_rows)

#commit changes
conn.commit()

In [24]:
#check data: read back the first five stored rows and display them
cursor.execute("SELECT * FROM election_data LIMIT 5;")
preview_rows = cursor.fetchall()
print(preview_rows)

[(1, 'BBC News', 'Unknown', 'When will we know whether Harris or Trump won?', 'Once polls close on Tuesday, a winner may not be projected for several hours, days or even weeks.', 'https://www.bbc.com/news/articles/cde7ng85jwgo', datetime.datetime(2024, 11, 4, 5, 29, 31), 'Americans are voting for their next president in a general election that culminates on Tuesday 5 November.\r\nOnce polls close, depending on how close the contest is, it is possible a winner will not b… [+7919 chars]'), (2, 'BBC News', 'Unknown', 'Ten reasons why either Trump or Harris might win', 'Polls suggest the race is deadlocked - but if one candidate pulls away, these factors could explain why.', 'https://www.bbc.com/news/articles/cj0jq134y91o', datetime.datetime(2024, 11, 4, 7, 35, 16), 'Ten reasons why either Trump or Harris might win\r\nWith just one day to go, the race for the White House is deadlocked - both at the national level and in the all-important battleground states.\r\nThe p… [+6608 chars]'), (3, 

In [25]:
#close connection
#the cursor is closed first, then the connection it was created from
cursor.close()
conn.close()

In [26]:
#load data into Pandas dataframe
#reestablish connection
conn = mysql.connector.connect(**db_config)
try:
    query = "SELECT * FROM election_data;"
    #NOTE(review): pandas documents SQLAlchemy connectables as the supported
    #input for read_sql; a raw mysql.connector connection is passed here, so
    #consider switching to an engine — TODO confirm
    df = pd.read_sql(query, conn)
finally:
    #close the connection even when the query fails
    conn.close()
print(df.head())

  df = pd.read_sql(query, conn)


   id            source            author  \
0   1          BBC News           Unknown   
1   2          BBC News           Unknown   
2   3  Business Insider    Bradley Saacks   
3   4          BBC News           Unknown   
4   5  Business Insider  Natalie Musumeci   

                                               title  \
0     When will we know whether Harris or Trump won?   
1   Ten reasons why either Trump or Harris might win   
2       Who Wall Street thinks will win the election   
3  Our undecided US voters have (mostly) made up ...   
4  Inside the litigation election, where lawyers ...   

                                         description  \
0  Once polls close on Tuesday, a winner may not ...   
1  Polls suggest the race is deadlocked - but if ...   
2  A survey of professional investors on the plat...   
3  The BBC speaks again to a group who have strug...   
4  While preelection litigation might not feel es...   

                                                 url   

# Automation Using Prefect

In [28]:
#download and load prefect
!pip install prefect
!pip install slack_sdk
!pip install pymysql

import asyncio
from prefect import task, flow
from prefect.server.schemas.schedules import IntervalSchedule
from sqlalchemy import create_engine
from slack_sdk import WebClient
from slack_sdk.errors import SlackApiError
import os

#fetch and clean data
@task
def fetch_and_clean_data(query="US Presidential Election",
                         from_date="2024-11-06",
                         to_date="2024-11-20"):
    """Fetch articles with ``fetch_news`` and return them cleaned by ``clean_data``.

    Args:
        query: search phrase passed to ``fetch_news``.
        from_date: start of the date window passed to ``fetch_news``.
        to_date: end of the date window passed to ``fetch_news``.

    The defaults reproduce the post-election window this pipeline was written
    for, so calling the task without arguments behaves as before.

    Returns:
        The cleaned dataframe produced by ``clean_data``.

    Raises:
        Any exception from fetching or cleaning; it is reported through
        ``notify_slack`` first and then re-raised unchanged.
    """
    try:
        #fetch data from API
        election_data = fetch_news(query, from_date, to_date)
        #a response without an 'articles' key yields an empty list
        articles = election_data.get('articles', [])
        election_df = pd.DataFrame(articles)
        clean_df = clean_data(election_df)
        return clean_df
    #report the failure to Slack, then let it propagate
    except Exception as e:
        notify_slack(f"Error fetching and cleaning data: {e}")
        raise

#load data into JawsDB mySQL
@task
def load_data_to_mysql(clean_data):
    """Append the given dataframe to the ``news_table`` MySQL table.

    Connection settings are read from the MYSQL_USER, MYSQL_PASSWORD,
    MYSQL_HOST and MYSQL_DATABASE environment variables so that no
    credentials are stored in the source.

    Args:
        clean_data: dataframe to append. Inside this function the name
            shadows the module-level ``clean_data`` function.

    Raises:
        Any exception raised while connecting or writing (including KeyError
        for a missing environment variable); it is reported through
        ``notify_slack`` first and then re-raised unchanged.
    """
    from urllib.parse import quote_plus

    try:
        #escape user and password so special characters cannot break the URL
        user = quote_plus(os.environ['MYSQL_USER'])
        password = quote_plus(os.environ['MYSQL_PASSWORD'])
        host = os.environ['MYSQL_HOST']
        database = os.environ['MYSQL_DATABASE']
        engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')
        try:
            clean_data.to_sql('news_table', con=engine, if_exists='append', index=False)
        finally:
            #release pooled connections whether or not the write succeeded
            engine.dispose()
        print('Data loaded successfully.')
    #report the failure to Slack, then let it propagate
    except Exception as e:
        notify_slack(f"Error in loading data to MySQL: {e}")
        raise

#create slack notification for failures to my Slack channel
@task
def notify_slack(message):
    """Post *message* to the Slack user ``U05SESH2V1V``.

    The bot token is read from the SLACK_BOT_TOKEN environment variable so it
    is not stored in the source. ``os.environ.get`` is used so that a missing
    variable does not raise KeyError here.

    Args:
        message: text of the Slack message.

    Raises:
        SlackApiError: when Slack rejects the request; the error code is
            printed and the exception is re-raised.
    """
    try:
        slack_client = WebClient(token=os.environ.get('SLACK_BOT_TOKEN'))
        user_id = 'U05SESH2V1V'
        slack_client.chat_postMessage(channel=user_id, text=message)
    #print the Slack error code, then let the failure propagate
    except SlackApiError as e:
        print(f"Slack API Error: {e.response['error']}")
        raise

#define prefect flow
@flow
def etl_flow():
    """Run the pipeline: fetch and clean the articles, then load them into MySQL.

    Any exception from either step is reported through ``notify_slack`` with
    an "ETL pipeline failed" message and then re-raised.
    """
    try:
        cleaned_articles = fetch_and_clean_data()
        load_data_to_mysql(cleaned_articles)
    except Exception as exc:
        notify_slack(f'ETL pipeline failed: {exc!s}')
        raise
#build a once-a-day interval schedule
#NOTE(review): nothing in this cell attaches `schedule` to etl_flow, so only
#the direct call below triggers a run — confirm whether a deployment was intended
one_day = pd.Timedelta(days=1)
schedule = IntervalSchedule(interval=one_day)

#run the flow once when executed as a script
if __name__ == "__main__":
    etl_flow()



Data loaded successfully.
