Main

In [28]:
%%writefile main.py
from api_call import api_call
from data_transformation import data_transformations
from data_export import export_data



# Pipeline entry point: fetch -> transform -> export. Each stage consumes the
# previous stage's return value, so the order of these statements matters.

# Stage 1: fetch the articles from the API via api_call().
all_articles = api_call()

# Stage 2: pass the fetched articles through data_transformations().
transformed_data = data_transformations(all_articles)

# Stage 3: hand the transformed data to export_data().
export_data(transformed_data)

Writing main.py


API Call

In [25]:
%%writefile api_call.py

import requests
from bs4 import BeautifulSoup
import time
import pandas as pd
import os
import json



# API Query with pagination control (ability to iterate through all pages/records)

# API base URL and static parameters
def api_call():
    """Fetch every article matching the fixed query, following cursor pagination.

    Pages are requested one after another (with a 2 second pause between
    requests) until the server returns no documents, returns no next cursor,
    or returns a cursor that did not advance. Progress and the raw payload of
    each page are printed to stdout.

    Any failure (network error, non-200 status, invalid JSON) stops the loop
    and the articles collected so far are returned instead of raising.

    Returns:
        list[dict]: one dict per article with the keys "Title", "Summary",
        "URL" and "Date". Missing or null fields are replaced by the
        placeholders "No title", "No summary", "No URL" and "No date".
    """
    api_url = "https://my.intelligence2day.com/components/api/search.cfc"
    request_timeout = 60  # seconds; without a timeout requests.get can block forever

    # SECURITY: credentials should not live in source control. They can be
    # supplied through the environment variables below; the literal fallbacks
    # only keep existing runs working and should be rotated and removed.
    params = {
        "method": "query",
        "APIid": os.environ.get("I2D_API_ID", "I2DE_4880557FFC6ABA165C916880849F9CAC"),
        "authKey": os.environ.get("I2D_AUTH_KEY", "c51e7492-ab7f-46d8-9d10-edd4e434d2c1"),
        "customerGUID": os.environ.get("I2D_CUSTOMER_GUID", "b6150206-d9b1-4963-8907-22b7695c0477"),
        "accessGroups": "8329",
        "returnFields": "*",
        # Use "*:*" as queryString to fetch all records regardless of date/topic.
        "queryString": "dateline:[NOW-2MONTHS TO NOW] AND topicId:135576",  # Last two months of one topic
        "maxRows": 500,  # Page size
        "sort": "dateline desc",  # Newest first
    }

    def _field(article, key, placeholder):
        # Treat a key that is present but null the same as a missing key.
        value = article.get(key)
        return placeholder if value is None else value

    cursor = "*"  # "*" asks the server for the first page
    total_articles = 0
    page = 1
    all_articles = []  # Accumulates the article dicts across all pages

    while True:
        # Throttle between pages; no need to wait before the very first request.
        if page > 1:
            time.sleep(2)
        print(f"\n--- Fetching Page {page} ---")

        params["cursorMark"] = cursor

        # NOTE(review): verify=False disables TLS certificate verification.
        # Kept as-is to avoid breaking the current environment; confirm
        # whether it is really required.
        try:
            response = requests.get(api_url, params=params, verify=False, timeout=request_timeout)
        except requests.RequestException as exc:
            print(f"Request failed: {exc}")
            break

        print(f"Status Code: {response.status_code}")

        if response.status_code != 200:
            print(f"Request failed with status code {response.status_code}")
            break

        try:
            data = response.json()
        except ValueError:
            print("Error: Response is not valid JSON.")
            break

        print("Returned Data:")
        print(json.dumps(data, indent=4))  # Raw JSON response, pretty-printed

        articles = data.get("docs", [])
        next_cursor = data.get("nextCursormark", None)

        if not articles:
            print("No more articles returned.")
            break

        print(f"Retrieved {len(articles)} articles on page {page}.")

        for i, article in enumerate(articles, 1):
            title = _field(article, "headline", "No title")
            summary = _field(article, "summary", "No summary")
            url = _field(article, "attachmenturl", "No URL")
            date = _field(article, "dateline", "No date")

            all_articles.append({"Title": title, "Summary": summary, "URL": url, "Date": date})

            print(f"\nArticle {total_articles + i}")
            print(f"Title   : {title}")
            print(f"Summary : {summary}")
            print(f"URL     : {url}")
            print(f"Date    : {date}")

        total_articles += len(articles)
        page += 1

        # Stop when there is no next cursor, or when the cursor did not move:
        # re-sending the same cursor would only fetch the same page again.
        if not next_cursor or next_cursor == cursor:
            break
        cursor = next_cursor

    print(f"\n✅ Total articles fetched: {total_articles}")
    return all_articles



Writing api_call.py


Data Transformations

In [26]:
%%writefile data_transformation.py

import re
from datetime import datetime
import pandas as pd
from bs4 import BeautifulSoup

def remove_html_and_script(text):
    """Return the plain text of an HTML fragment.

    The fragment is parsed with BeautifulSoup's "html.parser"; every
    <script> and <style> element is dropped together with its contents, and
    the text of what remains is returned without whitespace stripping.
    """
    parsed = BeautifulSoup(text, "html.parser")

    # Script and style bodies are code, not readable text: remove them entirely.
    for unwanted in parsed.find_all(["script", "style"]):
        unwanted.decompose()

    plain_text = parsed.get_text(strip=False)
    return plain_text

# Literal "\uXXXX" escape sequences (keys use lower-case hex) mapped to the
# text that should replace them. An empty string means "drop the symbol".
_UNICODE_ESCAPE_MAP = {
    r'\u2018': '‘',  # Left single quote
    r'\u2019': '’',  # Right single quote
    r'\u201c': '“',  # Left double quote
    r'\u201d': '”',  # Right double quote
    r'\u201e': '',   # Low double quote (dropped)
    r'\u2013': '–',  # En dash
    r'\u2014': '—',  # Em dash
    r'\u2022': '•',  # Bullet
    r'\u2026': '…',  # Ellipsis
    r'\u00a0': ' ',  # Non-breaking space
    r'\u00b7': '·',  # Middle dot
    r'\u00e9': 'é',  # e acute
    r'\u00e2': 'â',  # a circumflex
    r'\u00e0': 'à',  # a grave
    r'\u00e8': 'è',  # e grave
    r'\u00e7': 'ç',  # c cedilla
    r'\u00f4': 'ô',  # o circumflex
    r'\u00fb': 'û',  # u circumflex
    r'\u00ee': 'î',  # i circumflex
    r'\u00ef': 'ï',  # i diaeresis
    r'\u00e4': 'ä',  # a umlaut
    r'\u00f6': 'ö',  # o umlaut
    r'\u00fc': 'ü',  # u umlaut
    r'\u00df': 'ß',  # sharp s
    r'\u2082': '₂',  # subscript 2
    r'\u2083': '₃',  # subscript 3
    r'\u2122': '™',  # Trade mark sign
    r'\u20ac': '€',  # Euro sign
    r'\u0130': '',   # Capital I with dot above (dropped)
    r'\u267b': '',   # Recycling symbol (dropped)
    r'\ufe0f': '',   # Variation selector (dropped)
    r'\u2744': '',   # Snowflake (dropped)
    r'\u27a1': '',   # Right arrow (dropped)
    r'\u23f0': '',   # Alarm clock (dropped)
    r'\u26a1': '',   # High voltage sign (dropped)
    r'\u25b6': '',   # Black right-pointing triangle (dropped)
    r'\u2b05': '',   # Left arrow (dropped)
}

# A complete escape: backslash, "u", exactly four hex digits.
_COMPLETE_ESCAPE = re.compile(r'\\u[0-9a-fA-F]{4}')

# The truncated fragments "\u201" and "\u2" that the original mapping removed.
# The look-aheads make sure they are only removed when they are NOT the prefix
# of a complete four-digit escape.
_TRUNCATED_ESCAPE = re.compile(r'\\u201(?![0-9a-fA-F])|\\u2(?![0-9a-fA-F]{3})')


def unicode_handling(text):
    r"""Replace known literal ``\uXXXX`` escape sequences in ``text``.

    Every complete escape (backslash, ``u``, four hex digits) is looked up in
    ``_UNICODE_ESCAPE_MAP``, ignoring the case of the hex digits. Known
    escapes are replaced by their mapped text; unknown ones are left untouched
    so that a later clean-up step can still recognise them as whole escapes.
    Finally the truncated fragments ``\u201`` and ``\u2`` are removed when
    they are not part of a complete escape.

    Matching whole escapes (instead of replacing substrings key by key) is
    what prevents a short key such as ``\u2`` from eating the first
    characters of a longer escape and leaving stray hex digits behind.

    Parameters:
        text (str): Text that may contain literal backslash-u sequences.

    Returns:
        str: The text with known escapes substituted.
    """
    def _translate(match):
        escape = match.group(0)
        return _UNICODE_ESCAPE_MAP.get(escape.lower(), escape)

    text = _COMPLETE_ESCAPE.sub(_translate, text)
    return _TRUNCATED_ESCAPE.sub('', text)




def remove_matches(text):
    r"""Delete every literal Unicode escape sequence from ``text``.

    Two forms are removed: ``\u`` followed by exactly four hex digits and
    ``\U`` followed by exactly eight hex digits. Anything else, including
    shorter (incomplete) sequences, is left as it is.
    """
    escape_re = re.compile(r'\\u[0-9a-fA-F]{4}|\\U[0-9a-fA-F]{8}')
    return escape_re.sub('', text)




def extract_date_ddmmyyyy(iso_datetime: str) -> str:
    """
    Extract the date portion of an ISO 8601 datetime string.

    Note: despite the function name, the result is formatted as
    'yyyy-mm-dd' (the "%Y-%m-%d" pattern), not 'dd-mm-yyyy'.

    Parameters:
        iso_datetime (str): An ISO 8601 datetime string
            (e.g., '2025-04-16T04:31:13Z'). A trailing 'Z' is rewritten to
            '+00:00' before parsing because datetime.fromisoformat does not
            accept 'Z' on older Python versions.

    Returns:
        str: The date in 'yyyy-mm-dd' format. A value that is not a string,
        or a string that cannot be parsed (for example a "No date"
        placeholder), is returned unchanged instead of raising, so a single
        bad record does not abort the whole transformation.
    """
    if not isinstance(iso_datetime, str):
        return iso_datetime

    try:
        dt = datetime.fromisoformat(iso_datetime.strip().replace("Z", "+00:00"))
    except ValueError:
        return iso_datetime

    return dt.strftime("%Y-%m-%d")



def data_transformations(all_articles):
    """Clean the fetched articles and return them as a DataFrame.

    The "Title" and "Summary" columns are passed, in this order, through
    remove_html_and_script, unicode_handling and remove_matches; the "Date"
    column is passed through extract_date_ddmmyyyy. Other columns are left
    untouched.

    Parameters:
        all_articles: Records with the keys "Title", "Summary", "URL" and
            "Date" (anything pandas.DataFrame accepts as row data).

    Returns:
        pandas.DataFrame: The cleaned articles. When no articles were
        supplied, an empty frame with the four expected columns is returned
        instead of failing on the missing columns.
    """
    print("Starting data transformations")
    expected_columns = ["Title", "Summary", "URL", "Date"]

    df = pd.DataFrame(all_articles)
    if df.empty:
        # Nothing was fetched: a bare DataFrame has no columns, so indexing
        # df['Title'] below would raise KeyError.
        print("No articles to transform")
        return pd.DataFrame(columns=expected_columns)

    # Same cleaning chain for both free-text columns.
    for column in ("Title", "Summary"):
        df[column] = (
            df[column]
            .apply(remove_html_and_script)
            .apply(unicode_handling)
            .apply(remove_matches)
        )

    df['Date'] = df['Date'].apply(extract_date_ddmmyyyy)

    print("Data transformations completed")

    return df

Writing data_transformation.py


Export Data

In [31]:
%%writefile data_export.py

import pandas as pd
import os
from datetime import datetime
from openpyxl import load_workbook

def new_records(df):
    """Write ``df`` to a dated workbook named ``updated_<YYYY-MM-DD>.xlsx``.

    The frame is written to "Sheet1" (without the index) using the xlsxwriter
    engine and the written range is then registered as an Excel table named
    "Table1", with one table column per DataFrame column.

    The table is only added when the frame has at least one row and one
    column; otherwise the computed range would be header-only or would end at
    column -1, which is not a usable table range. The workbook itself is
    still written in that case.

    Parameters:
        df (pandas.DataFrame): The data to export.
    """
    current_date = datetime.now().strftime("%Y-%m-%d")

    # File name carries the current date, e.g. updated_2025-04-16.xlsx
    filename = f"updated_{current_date}.xlsx"

    row_count, column_count = df.shape

    with pd.ExcelWriter(filename, engine="xlsxwriter") as writer:
        df.to_excel(writer, sheet_name="Sheet1", index=False, startrow=0)

        if row_count > 0 and column_count > 0:
            worksheet = writer.sheets["Sheet1"]
            column_settings = [{"header": col} for col in df.columns]

            # Row 0 is the header, so data occupies rows 1..row_count and
            # columns 0..column_count - 1.
            worksheet.add_table(0, 0, row_count, column_count - 1, {
                "columns": column_settings,
                "name": "Table1",
            })

def records(df):
    """Append ``df`` to the cumulative workbook ``records.xlsx``.

    If the file does not exist it is created and the data is written to
    "Sheet1" with a header row. If it exists, the rows are appended below the
    last used row of "Sheet1" without repeating the header.

    The append position is taken from "Sheet1" itself, the sheet that is
    written to, rather than from whichever sheet happens to be active in the
    workbook. If "Sheet1" is missing or completely blank, the data is written
    from the top with a header.

    Parameters:
        df (pandas.DataFrame): The rows to append.
    """
    file_path = "records.xlsx"
    sheet_name = "Sheet1"

    # Defaults describe the "create a new file" case.
    start_row = 0
    mode = "w"
    writer_options = {}
    write_header = True

    if os.path.exists(file_path):
        mode = "a"
        # 'if_sheet_exists' is only accepted in append mode.
        writer_options = {"if_sheet_exists": "overlay"}

        workbook = load_workbook(file_path)
        try:
            if sheet_name in workbook.sheetnames:
                sheet = workbook[sheet_name]
                # openpyxl reports max_row == 1 for a sheet with no content,
                # so check the single cell to tell "blank" from "header only".
                sheet_is_blank = (
                    sheet.max_row == 1
                    and sheet.max_column == 1
                    and sheet["A1"].value is None
                )
                if not sheet_is_blank:
                    # max_row is 1-based while startrow is 0-based, so
                    # max_row is exactly the first free row index.
                    start_row = sheet.max_row
                    write_header = False
        finally:
            workbook.close()

    with pd.ExcelWriter(file_path, engine="openpyxl", mode=mode, **writer_options) as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=False, startrow=start_row, header=write_header)

    print(f"Data written to {file_path}")

def export_data(df):
    """Export ``df`` through both writers, in order.

    new_records runs first (dated ``updated_<date>.xlsx`` workbook), then
    records (``records.xlsx``).
    """
    for write_output in (new_records, records):
        write_output(df)

Overwriting data_export.py
