<a href="https://colab.research.google.com/github/uanish91/ETL/blob/main/ETL_project.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

#**Build a News ETL Data Pipeline Using Python and SQLite**

# 1. Import the Libraries and Connect to NewsAPI

In [2]:
import pandas as pd
import sqlite3 as sql
!pip install newsapi-python
from newsapi import NewsApiClient
import logging


Collecting newsapi-python
  Downloading newsapi_python-0.2.7-py2.py3-none-any.whl (7.9 kB)
Installing collected packages: newsapi-python
Successfully installed newsapi-python-0.2.7


In [3]:
#Use news-api python library
#Get your API key by heading over to NEWS API registration page
news_api_key = "5bba0ebc778d42b2a040d45d25187f00"
news_api = NewsApiClient(api_key = news_api_key)

# 2. Retrieve and print News Articles

In [4]:
# The news_api pulls json file with sections status, total_results and articles
#Define a function to extract the articles related tp AI.
def extract_news_data():
  try:
    result = news_api.get_everything(q = 'AI', language = 'en', sort_by='publishedAt')
    logging.info("Connection is successful")
    return result["articles"]
  except:
      logging.error("Connection is unsuccessful")
      return None

articles = extract_news_data()
print(articles[:5])


#ItemIterate over the list for getting the keys - Optional
seen_keys = set()
for item in articles:
    # Iterate over each key in the dictionary
    for key in item.keys():
        # Split the key on underscore (_) if it contains one, otherwise use the key as is
        first_word = key.split(":")[0]
        if first_word not in seen_keys:
          print(first_word)
          seen_keys.add(first_word)


[{'source': {'id': None, 'name': 'Samsung.com'}, 'author': 'Samsung Newsroom', 'title': 'Samsung Showcases Innovative AI TV Technologies at 2024 Southeast Asia Tech Seminar', 'description': 'Samsung to meet with media and industry professionals to share company’s AI-enhanced lineup of Neo QLED 8K, OLED and lifestyle products and discuss latest industry trends', 'url': 'https://news.samsung.com/global/samsung-showcases-innovative-ai-tv-technologies-at-2024-southeast-asia-tech-seminar', 'urlToImage': 'https://img.global.news.samsung.com/global/wp-content/uploads/2024/04/SEA-Tech-Seminar-2024_thumb728.jpg', 'publishedAt': '2024-04-24T02:51:54Z', 'content': 'Samsung Electronics, the worlds leading TV manufacturer for the 18th consecutive year, commenced its 2024 Southeast Asia Tech Seminar in Bangkok, Thailand. From April 23 to 24, Samsung will showcase … [+3213 chars]'}, {'source': {'id': None, 'name': 'Wccftech'}, 'author': 'Rohail Saleem', 'title': 'Elon Musk Says That Tesla’s AI Traini

#  3. Clean Author Column

In [4]:
#Define a function to format the authors name into title case
#Split the name at comma and make first name title cased
def clean_author_column(text):
  try:
    return text.split(",")[0].title()
  except AttributeError:
    return "No Author"


#4. Transform News Data

In [5]:
#This is the transform stage from ETL
#The extracted data has to be transformed to be further loaded into a dataframe for processing in staging area.
#Replace the value for source key with name's key
def transform_news_data(articles):
  article_list = []
  for i in articles:
    article_list.append([value.get("name",0) if key == "source"
                         else
                         value for key, value in i.items() if key in ["author", "title", "publishedAt", "content", "url", "source"]])
    #Convert the dictionary into dataframe
    df = pd.DataFrame(article_list, columns = ["source", "Author Name", "News Title", "URL", "Date Published", "Content"])
    #change the format of date published column
    df["Date Published"] = pd.to_datetime(df["Date Published"]).dt.strftime('%Y-%m-%d %H:%M:%S')
    #Apply the function to title case the author column
    df["Author Name"] = df["Author Name"].apply(clean_author_column)
    return df

transformed_data = transform_news_data(articles)
print(transformed_data)


           source Author Name  \
0  MobiHealthNews   No Author   

                                          News Title  \
0  GE HealthCare partners with Elekta for radiati...   

                                                 URL       Date Published  \
0  https://www.mobihealthnews.com/news/ge-healthc...  2024-04-23 17:25:13   

                                             Content  
0  GE HealthCare has partnered with radiation the...  


# 5. Load the Data into SQLite DB

In [6]:
#Load the data into the sql database.
#Define load_news_data to load the processed data into sql database
def load_news_data(data):
  #Use sqlite3.connect() to establish a connection with sqlite3 database
  #Name the table as news_table.db
  with sql.connect('news_table.db') as connection:
    #Establish a cursor object to execute queries
    cursor = connection.cursor()
    cursor.execute('''
    CREATE TABLE IF NOT EXISTS news_table(
      'source' VARCHAR (30),
      'Author Name' TEXT,
      'News Title' TEXT,
      'URL' TEXT,
      'Date Published' TEXT,
      'Content' TEXT)''')
  data.to_sql(name= "news_table", con = connection, index = False, if_exists = "append")

result = load_news_data(transformed_data)



# 5a. Check if the data is loaded into sqlite database (Optional)

In [7]:
#Check if the data is loaded(Optional)
def check_data_loaded():
  with sql.connect('news_table.db') as connection:
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM news_table")
    rows = cursor.fetchall()
    for row in rows:
      print(row)

print(check_data_loaded())

('Theinventory.com', 'The Inventory Bot', 'Leadaybetter 2 HEPA + 4 Foam & Felt Filters for Shark Navigator Lift-Away NV350, Now 15% Off', 'https://theinventory.com/leadaybetter-2-hepa-4-foam-felt-filters-for-shark-n-1851428897', '2024-04-23 16:37:04', "Whether you're looking to simply replace your shark Navigator Lift-Away NV350, NV351, NV352 series vacuum cleaner filters or seeking to optimize your vacuum's efficiency, the Leadybetter 2 HEPA + 4 F… [+1984 chars]")
('MobiHealthNews', 'No Author', 'GE HealthCare partners with Elekta for radiation therapy', 'https://www.mobihealthnews.com/news/ge-healthcare-partners-elekta-radiation-therapy', '2024-04-23 17:25:13', 'GE HealthCare has partnered with radiation therapy company Elekta to develop new software to improve clinicians experience and enable greater precision treatment.\xa0\r\nElekta will use GE HealthCares MIM… [+2256 chars]')
None


# 6. Automate the steps with Apache Airflow by Initializing the DAG object.

In [10]:
#Install the apache-airflow in colab
#!pip install --ignore-installed apache-airflow

Collecting apache-airflow
  Using cached apache_airflow-2.9.0-py3-none-any.whl (13.3 MB)
Collecting alembic<2.0,>=1.13.1 (from apache-airflow)
  Using cached alembic-1.13.1-py3-none-any.whl (233 kB)
Collecting argcomplete>=1.10 (from apache-airflow)
  Using cached argcomplete-3.3.0-py3-none-any.whl (42 kB)
Collecting asgiref (from apache-airflow)
  Using cached asgiref-3.8.1-py3-none-any.whl (23 kB)
Collecting attrs>=22.1.0 (from apache-airflow)
  Using cached attrs-23.2.0-py3-none-any.whl (60 kB)
Collecting blinker>=1.6.2 (from apache-airflow)
  Using cached blinker-1.7.0-py3-none-any.whl (13 kB)
Collecting colorlog<5.0,>=4.0.2 (from apache-airflow)
  Using cached colorlog-4.8.0-py2.py3-none-any.whl (10 kB)
Collecting configupdater>=3.1.1 (from apache-airflow)
  Using cached ConfigUpdater-3.2-py2.py3-none-any.whl (34 kB)
Collecting connexion[flask]<3.0,>=2.10.0 (from apache-airflow)
  Using cached connexion-2.14.2-py2.py3-none-any.whl (95 kB)
Collecting cron-descriptor>=1.2.24 (from a

In [8]:
!airflow db init

DB: sqlite:////root/airflow/airflow.db
[[34m2024-04-24T17:26:03.402+0000[0m] {[34mmigration.py:[0m216} INFO[0m - Context impl [01mSQLiteImpl[22m.[0m
[[34m2024-04-24T17:26:03.403+0000[0m] {[34mmigration.py:[0m219} INFO[0m - Will assume [01mnon-transactional[22m DDL.[0m
INFO  [alembic.runtime.migration] Context impl SQLiteImpl.
INFO  [alembic.runtime.migration] Will assume non-transactional DDL.
INFO  [alembic.runtime.migration] Running stamp_revision  -> 1949afb29106
WARNI [airflow.models.crypto] empty cryptography key - values will not be stored encrypted.
Initialization done


In [9]:
import airflow

In [10]:
#ensure the executables are present in the path environement used by colab.
#Add the path where dependencies are installed.
import sys
print(sys.version)
print(sys.executable)
print(sys.path)
sys.path.append('/usr/bin/python3')
sys.path.append('/usr/local/lib/python3.10')
print(sys.path)

3.10.12 (main, Nov 20 2023, 15:14:05) [GCC 11.4.0]
/usr/bin/python3
['/content', '/env/python', '/usr/lib/python310.zip', '/usr/lib/python3.10', '/usr/lib/python3.10/lib-dynload', '', '/usr/local/lib/python3.10/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.10/dist-packages/IPython/extensions', '/root/.ipython', '/root/airflow/dags', '/root/airflow/config', '/root/airflow/plugins']
['/content', '/env/python', '/usr/lib/python310.zip', '/usr/lib/python3.10', '/usr/lib/python3.10/lib-dynload', '', '/usr/local/lib/python3.10/dist-packages', '/usr/lib/python3/dist-packages', '/usr/local/lib/python3.10/dist-packages/IPython/extensions', '/root/.ipython', '/root/airflow/dags', '/root/airflow/config', '/root/airflow/plugins', '/usr/bin/python3', '/usr/local/lib/python3.10']


In [13]:
#Import DAG, datetime and timedelta
from airflow import DAG
from datetime import datetime, timedelta, time
from airflow.operators.python import PythonOperator

In [14]:
# Define variables for scheduling the DAG
to_date = datetime.now().date()
from_date = to_date - timedelta(days=1)

In [16]:
#Initialize DAG object
dag= DAG(dag_id="news_etl", default_args = {'start_date':datetime.combine(from_date, time(0,0)), 'retries':1}, schedule = 'daily',)

In [17]:
#Replace the extract_news_data() function to make it comaptible with Airflow
def extract_news_data(**kwargs):
    try:
        result = news_api.get_everything(q="AI", language="en", from_param=from_date, to=to_date)
        logging.info("Connection is successful.")
        # Push the result to the XCom
        kwargs['task_instance'].xcom_push(key='extract_result', value=result["articles"])
    except:
        logging.error("Connection is unsuccessful.")


In [18]:
#Replace the transform_news_data() function to make it comaptible with Xcoms within a DAG
def transform_news_data(**kwargs):
    article_list = []
    # Add the XCom pull logic to pull data from the XCom
    data = kwargs['task_instance'].xcom_pull(task_ids='extract_news', key='extract_result')

    # Logging message after the XCom pull
    logging.info("Data pulled successfully")

    for i in data:
        article_list.append([value.get("name", 0) if key == "source" else value for key, value in i.items() if key in ["author", "title", "publishedAt", "content", "url", "source"]])

    df = pd.DataFrame(article_list, columns=["Source", "Author Name", "News Title", "URL", "Date Published", "Content"])

    df["Date Published"] = pd.to_datetime(df["Date Published"]).dt.strftime('%Y-%m-%d %H:%M:%S')

    df["Author Name"] = df["Author Name"].apply(clean_author_column)

    #Add the XCom push logic to push data to the XCom
    kwargs['task_instance'].xcom_push(key='transform_df', value=df.to_json())
    logging.info("Transformed data pushed to XCom successfully.")


In [34]:
#Replace the load_news_data() function to make it comaptible with Xcoms within a DAG
def load_news_data(**kwargs):
  with sql.connect('news_table.db') as connection:
        # Create a cursor within the context manager
        cursor = connection.cursor()

        # Create a table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS news_table (
                "Source" VARCHAR(30),
                "Author Name" TEXT,
                "News Title" TEXT,
                "URL" TEXT,
                "Date Published" TEXT,
                "Content" TEXT
            )
        ''')

        # Pull data from XCom
        data = kwargs['task_instance'].xcom_pull(task_ids='transform_news', key='transform_df')

        # Convert data into a DataFrame
        df = pd.read_json(data)

        # Logging message before loading data
        logging.info("Ready to load data into the database.")

        df.to_sql(name="news_table", con=connection, index=False, if_exists="append")

        # Logging message after loading data
        logging.info("Data successfully loaded into the database.")

In [29]:
# Create Python operators
_extract_news_data = PythonOperator(
    task_id = "extract_news",
    python_callable = extract_news_data,
    provide_context = True,
    dag = dag
)

_transform_news_data = PythonOperator(
    task_id = "transform_news",
    python_callable = transform_news_data,
    provide_context = True,
    dag = dag
)

_load_news_data = PythonOperator(
    task_id = "load_news",
    python_callable = load_news_data,
    provide_context = True,
    dag = dag
)

In [30]:
# Create dependencies
_extract_news_data >> _transform_news_data >> _load_news_data

<Task(PythonOperator): load_news>

In [32]:
with sql.connect("news_table.db") as connection:
    df = pd.read_sql("SELECT * FROM news_table;", connection)
df.head()

Unnamed: 0,source,Author Name,News Title,URL,Date Published,Content
0,Theinventory.com,The Inventory Bot,Leadaybetter 2 HEPA + 4 Foam & Felt Filters fo...,https://theinventory.com/leadaybetter-2-hepa-4...,2024-04-23 16:37:04,Whether you're looking to simply replace your ...
1,MobiHealthNews,No Author,GE HealthCare partners with Elekta for radiati...,https://www.mobihealthnews.com/news/ge-healthc...,2024-04-23 17:25:13,GE HealthCare has partnered with radiation the...
