In [None]:
import requests

def fetch_quote():
    """Fetch today's quote from the ZenQuotes API.

    Returns:
        dict: ``{"quote": str, "author": str}`` (both stripped of surrounding
        whitespace) on success, otherwise ``{"error": str}`` describing the
        failure. The function does not raise for network, HTTP, JSON or
        payload-shape problems; they are all reported through the ``"error"``
        key.
    """
    url = "https://zenquotes.io/api/today"
    try:
        response = requests.get(url, timeout=10)

        # Anything other than a plain 200 is treated as a failed request.
        if response.status_code != 200:
            return {"error": f"Request failed with status code {response.status_code}"}

        # The body may not be valid JSON even when the status is 200.
        try:
            data = response.json()
        except ValueError:
            return {"error": "Malformed JSON response"}

        # Expected payload: a non-empty list whose first item is a dict.
        if not isinstance(data, list) or len(data) == 0 or not isinstance(data[0], dict):
            return {"error": "Unexpected data format"}

        entry = data[0]
        quote_text = entry.get("q")
        author = entry.get("a")

        # Reject non-string values up front so .strip() below cannot raise.
        if not isinstance(quote_text, str) or not isinstance(author, str):
            return {"error": "Missing quote or author in response"}

        # Strip before the emptiness check so whitespace-only values count as missing.
        quote_text = quote_text.strip()
        author = author.strip()
        if not quote_text or not author:
            return {"error": "Missing quote or author in response"}

        return {"quote": quote_text, "author": author}

    except requests.exceptions.RequestException as e:
        return {"error": f"Network error: {e}"}


# Fetch today's quote once and show either the quote or the reported error.
result = fetch_quote()

if "error" not in result:
    print(f'“{result["quote"]}” — {result["author"]}')
else:
    print("Error:", result["error"])


In [None]:
import logging
import os
import smtplib
import time
from datetime import datetime
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import psycopg2
import requests
from psycopg2 import sql

# =======================================================
# CONFIGURATION
# =======================================================
# Outgoing mail server and port.
SMTP_SERVER = "smtp.gmail.com"
SMTP_PORT = 587

# Sender credentials come from the environment; an unset or empty variable
# falls back to the literal on the right.
SENDER_EMAIL = os.environ.get("SENDER_EMAIL") or "kabirolawalemohammed@gmail.com"
SENDER_PASSWORD = os.environ.get("SENDER_PASSWORD") or "your_app_password"

# Recipient of the daily delivery summary.
ADMIN_EMAIL = "kabirwale@yahoo.com"

# Recipients are read from the `users` database table rather than a
# hard-coded list.

# All log records at INFO and above are written to this file.
LOG_FILE = "email_service.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    filename=LOG_FILE,
)


# Database connection
def get_connection():
    """Open a new PostgreSQL connection for the quote service.

    Connection settings can be overridden through the environment variables
    ``QUOTE_DB_HOST``, ``QUOTE_DB_NAME``, ``QUOTE_DB_USER`` and
    ``QUOTE_DB_PASSWORD``. When a variable is unset or empty the previous
    hard-coded value is used, so existing setups keep working.

    Returns:
        A new psycopg2 connection. The caller is responsible for closing it.
    """
    return psycopg2.connect(
        host=os.getenv("QUOTE_DB_HOST") or "localhost",
        database=os.getenv("QUOTE_DB_NAME") or "quote_task",
        user=os.getenv("QUOTE_DB_USER") or "airflow",
        password=os.getenv("QUOTE_DB_PASSWORD") or "airflow",
    )

# -----------------------------
# Create users table
# -----------------------------
def create_users_table():
    """Create the ``users`` table if it does not exist yet.

    The statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this
    repeatedly is safe. The connection is always closed, even when the
    statement or the commit fails; in that case the exception propagates to
    the caller.
    """
    create_table_query = """
    CREATE TABLE IF NOT EXISTS users (
        user_id SERIAL PRIMARY KEY,
        name VARCHAR(100) NOT NULL,
        email VARCHAR(255) UNIQUE NOT NULL,
        subscription_status VARCHAR(10) CHECK (subscription_status IN ('active', 'inactive')) DEFAULT 'active',
        email_frequency VARCHAR(10) CHECK (email_frequency IN ('daily', 'weekly')) DEFAULT 'daily',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );
    """
    conn = get_connection()
    try:
        # The cursor context manager closes the cursor on exit.
        with conn.cursor() as cur:
            cur.execute(create_table_query)
        conn.commit()
    finally:
        # Release the connection whether or not the statement succeeded.
        conn.close()
    print("Users table created successfully.")

# -----------------------------
# Add a new user
# -----------------------------
def add_user(name, email, frequency="daily"):
    """Insert a subscriber into the ``users`` table.

    Args:
        name: Display name of the subscriber.
        email: Email address; an existing address is left untouched because
            the insert uses ``ON CONFLICT (email) DO NOTHING``.
        frequency: Value stored in ``email_frequency``; defaults to "daily".

    The connection is always closed, even when the insert fails (the
    exception then propagates). The printed message distinguishes a real
    insert from a skipped duplicate.
    """
    query = """
        INSERT INTO users (name, email, email_frequency)
        VALUES (%s, %s, %s)
        ON CONFLICT (email) DO NOTHING;
    """
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, (name, email, frequency))
            # A row count of 0 means the ON CONFLICT clause skipped the insert.
            skipped = cur.rowcount == 0
        conn.commit()
    finally:
        conn.close()

    if skipped:
        print(f"User already exists, not added: {name} <{email}>")
    else:
        print(f"User added: {name} <{email}>")

# -----------------------------
# Update subscription status
# -----------------------------
def update_subscription(email, status):
    """Set ``subscription_status`` for the user with the given email.

    Args:
        email: Email address identifying the user row.
        status: New value for ``subscription_status``.

    The connection is always closed, even when the update fails (the
    exception then propagates). If no row matches ``email`` the printed
    message says so instead of reporting a successful change.
    """
    query = "UPDATE users SET subscription_status = %s WHERE email = %s;"
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, (status, email))
            # A row count of 0 means no user has this email address.
            matched = cur.rowcount != 0
        conn.commit()
    finally:
        conn.close()

    if matched:
        print(f"Subscription status for {email} set to {status}")
    else:
        print(f"No user found with email {email}; subscription status unchanged")

# -----------------------------
# Fetch active users
# -----------------------------
def get_active_users(frequency="daily"):
    """Return the active subscribers for one email frequency.

    Args:
        frequency: Value matched against ``email_frequency``; defaults to
            "daily".

    Returns:
        list[dict]: One ``{"name": ..., "email": ...}`` dict per user whose
        ``subscription_status`` is 'active'. Empty when nobody matches.

    The connection is always closed, even when the query fails (the
    exception then propagates).
    """
    query = "SELECT name, email FROM users WHERE subscription_status = 'active' AND email_frequency = %s;"
    conn = get_connection()
    try:
        with conn.cursor() as cur:
            cur.execute(query, (frequency,))
            rows = cur.fetchall()
    finally:
        conn.close()
    return [{"name": name, "email": email} for name, email in rows]


# =======================================================
# QUOTE FETCH FUNCTION
# =======================================================
def fetch_quote():
    """Fetch today's quote from the ZenQuotes API for the email workflow.

    Returns:
        tuple: ``(quote, author)`` as stripped strings on success, or
        ``(None, None)`` when anything goes wrong. Failures are never raised
        to the caller; the reason is written to the log instead.
    """
    url = "https://zenquotes.io/api/today"
    try:
        response = requests.get(url, timeout=10)
        if response.status_code != 200:
            raise ValueError(f"Status code {response.status_code}")

        data = response.json()

        # Check the container shape first so the field lookups below are safe.
        if not isinstance(data, list) or len(data) == 0 or not isinstance(data[0], dict):
            raise ValueError("Unexpected data format")

        raw_quote = data[0].get("q")
        raw_author = data[0].get("a")

        # The quote must be a non-blank string.
        if not isinstance(raw_quote, str) or not raw_quote.strip():
            raise ValueError("Malformed response structure")
        # The author must be present as a string (it may be empty).
        if not isinstance(raw_author, str):
            raise ValueError("Missing author in response")

        quote, author = raw_quote.strip(), raw_author.strip()
        logging.info(f"Quote fetched successfully: “{quote}” — {author}")
        return quote, author
    except Exception as e:
        # Deliberate catch-all: callers rely on (None, None) for any failure.
        logging.error(f"Quote fetch failed: {e}")
        return None, None

# =======================================================
# EMAIL SENDING FUNCTION (with retries)
# =======================================================
def send_email(to_email, subject, body, retries=3, delay=5):
    """Send a plain-text email over SMTP with STARTTLS, retrying on failure.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        retries: Maximum number of send attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        bool: True as soon as one attempt succeeds, False when every attempt
        failed. Exceptions from building or sending the message are logged
        and never raised to the caller.
    """
    for attempt in range(1, retries + 1):
        try:
            msg = MIMEMultipart()
            msg["From"] = SENDER_EMAIL
            msg["To"] = to_email
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
                server.starttls()
                server.login(SENDER_EMAIL, SENDER_PASSWORD)
                server.send_message(msg)

            logging.info(f"Email sent to {to_email}")
            return True
        except Exception as e:
            logging.warning(f"Attempt {attempt} failed for {to_email}: {e}")
            # Only wait when another attempt follows; sleeping after the last
            # failure would just delay the caller for nothing.
            if attempt < retries:
                time.sleep(delay)
    logging.error(f"Failed to send email to {to_email} after {retries} retries")
    return False

def send_email_admin(to_email, subject, body, attachments=None, retries=3, delay=5):
    """Send a plain-text email with optional file attachments, retrying on failure.

    Args:
        to_email: Recipient address.
        subject: Subject line.
        body: Plain-text message body.
        attachments: Optional list of file paths. Paths that are not existing
            files are skipped with a warning in the log.
        retries: Maximum number of send attempts.
        delay: Seconds to wait between two attempts.

    Returns:
        bool: True as soon as one attempt succeeds, False when every attempt
        failed. Exceptions from building or sending the message are logged
        and never raised to the caller.
    """
    for attempt in range(1, retries + 1):
        try:
            msg = MIMEMultipart()
            msg["From"] = SENDER_EMAIL
            msg["To"] = to_email
            msg["Subject"] = subject
            msg.attach(MIMEText(body, "plain"))

            # Attach files (if any) as base64-encoded binary parts.
            for file_path in attachments or []:
                if not os.path.isfile(file_path):
                    logging.warning(f"Attachment not found: {file_path}")
                    continue
                with open(file_path, "rb") as f:
                    part = MIMEBase("application", "octet-stream")
                    part.set_payload(f.read())
                encoders.encode_base64(part)
                # Passing filename as a parameter lets the email package quote
                # names that contain spaces or other special characters.
                part.add_header(
                    "Content-Disposition",
                    "attachment",
                    filename=os.path.basename(file_path),
                )
                msg.attach(part)

            with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
                server.starttls()
                server.login(SENDER_EMAIL, SENDER_PASSWORD)
                server.send_message(msg)

            logging.info(f"Email sent to {to_email}")
            return True
        except Exception as e:
            logging.warning(f"Attempt {attempt} failed for {to_email}: {e}")
            # Only wait when another attempt follows.
            if attempt < retries:
                time.sleep(delay)
    logging.error(f"Failed to send email to {to_email} after {retries} retries")
    return False

# =======================================================
# MAIN WORKFLOW
# =======================================================
def main():
    """Run the daily quote workflow end to end.

    Steps:
        1. Ensure the ``users`` table exists and seed the test subscribers.
        2. Fetch today's quote; log an error and stop if none is available.
        3. Email the quote to every active daily subscriber.
        4. Email a delivery summary, with the log file attached, to the admin.

    Returns:
        None. Progress and failures are written to the log.
    """
    # Seed data used for testing.
    create_users_table()
    add_user("Kabir Mohammed", "kabirolawlemohammed@yahoo.com", "daily")
    add_user("Bala Usman", "skyfortcafe@gmail.com", "weekly")
    add_user("Sanusi Badaru", "skyfortcafe@yahoo.com", "daily")
    add_user("Abubakar Mohammed", "skyfortglobalresourcesltd@yahoo.com", "daily")
    update_subscription("skyfortcafe@gmail.com", "inactive")

    quote, author = fetch_quote()

    if not quote:
        logging.error("Aborting: No quote available today.")
        return

    active_users = get_active_users(frequency="daily")

    # The subject is the same for every recipient, so build it once.
    subject = f"Your Daily Quote - {author}"
    failed_recipients = []

    for user in active_users:
        personalized_body = (
            f"Hi {user['name']},\n\n"
            f"Today's quote:\n“{quote}” — {author}\n\n"
            "Stay inspired,\nYour Daily Quote Service"
        )
        # send_email retries internally; False means every attempt failed.
        if not send_email(user["email"], subject, personalized_body):
            failed_recipients.append(user["email"])

    total_users = len(active_users)
    fail_count = len(failed_recipients)
    success_count = total_users - fail_count

    # Prepare daily summary for admin
    summary = (
        f"Daily Quote Summary - {datetime.now().strftime('%Y-%m-%d')}\n\n"
        f"Total users: {total_users}\n"
        f"Emails sent successfully: {success_count}\n"
        f"Failed deliveries: {fail_count}\n"
        f"Retried users: {', '.join(failed_recipients) if failed_recipients else 'None'}\n"
        f"\nQuote: “{quote}” — {author}"
    )

    # Send summary email (with the log file attached) to admin
    summary_sent = send_email_admin(
        ADMIN_EMAIL,
        "Daily Quote Service Summary (with Logs)",
        summary,
        attachments=[LOG_FILE]
    )

    # Only claim delivery in the log when the send actually succeeded.
    if summary_sent:
        logging.info(f"Daily summary sent to admin {ADMIN_EMAIL}")
    else:
        logging.error(f"Daily summary could not be delivered to admin {ADMIN_EMAIL}")
    print("Email delivery process completed. Check log for details.")

# Run the daily quote workflow when this code is executed as the main module.
if __name__ == "__main__":
    main()


In [None]:



# -----------------------------
# Example Usage
# -----------------------------
if __name__ == "__main__":
    # Make sure the table exists, then register two sample subscribers.
    create_users_table()
    add_user(name="Kabir Mohammed", email="kabir@example.com", frequency="daily")
    add_user(name="Aisha Bello", email="aisha@example.com", frequency="weekly")

    # Deactivate one of them so the listing below filters on status.
    update_subscription(email="aisha@example.com", status="inactive")

    # Show who would receive the daily email.
    active_daily_users = get_active_users(frequency="daily")
    print("Active daily subscribers:", active_daily_users)
