In [9]:
from mongoengine import DynamicDocument, QuerySet
from dotenv import load_dotenv
from db_connection import connect_db
import json
from typing import Generator
from datetime import datetime, timedelta
import aiohttp
import asyncio

from email_sender import email_sender


# Load key/value pairs from a local .env file into the process environment.
# NOTE(review): `email_sender` is imported above, before this call runs. If that
# module reads its Mailgun settings at import time, the .env values would not be
# visible yet — confirm, and move this call ahead of the import if so.
load_dotenv()
# Open the database connection through the project's db_connection helper
# (presumably what lets the mongoengine documents below run queries — confirm).
connect_db()

ValueError: Undefined Mailgun domain or API key

In [None]:
class User(DynamicDocument):
    """Schemaless mongoengine document mapped to the ``user`` collection."""

    meta = {"collection": "user"}

In [None]:
def get_all_users() -> Generator[str, None, None]:
    """Lazily yield every ``User`` document serialised as a JSON string.

    This is a best-effort generator: any exception raised while querying or
    serialising is printed and ends the iteration instead of propagating, so
    callers may receive fewer items than exist (possibly none).
    """
    try:
        yield from (record.to_json() for record in User.objects())
    except Exception as e:
        print(f"An error occurred: {e}")

### Scheduled Daily Activity Email Report
The following code is scheduled to run at 10 am IST every day. It emails you a report with the number of newly created users and the number of daily active users over the past 24 hours.

In [None]:
"""
@SCHEDULE
cron: 0 30 4 * * *
"""
try:
    data_list = [json.loads(data) for data in get_all_users()]
    past_day = datetime.now() - timedelta(days=1)

    # Filter out all users created within the past 24 hours
    new_users = [
        user for user in data_list if datetime.fromtimestamp(user['created_at']['$date'] / 1000) > past_day
    ]

    # Daily active users
    active_users = [
        user for user in data_list if datetime.fromtimestamp(user['last_login']['$date'] / 1000) > past_day
    ]

    num_new_users = len(new_users)
    num_active_users = len(active_users)

    subject = f"Daily User Report: {num_new_users} new users, {num_active_users} daily active users"
    body = f"""<html><body>
               Total new users in the past 24 hours: {num_new_users}<br>
               Total daily active users in the past 24 hours: {num_active_users}
               </body></html>"""
    
    email_sender.send_email(
        source="ReportBot<report-bot@mydomain.com>",
        recipients=["your_email@example.com"],
        subject=subject,
        body=body
    )
    print("Sent report email")
except Exception as e:
    print("Error: ", e)

### Scheduled Health Checks
The following code is run every 3 minutes to perform a scheduled health check and send an alert email if the site is down or returns an unexpected status code.  
If a check fails, it is repeated (3 attempts in total, 10 seconds apart) before the alert is sent, to make sure the website is actually down.

In [14]:
"""
@SCHEDULE
interval: 3m
"""
async def scheduled_health_check():
    website_url = "https://www.yourwebsite.com"
    retries = 3
    retry_interval = 10  # in seconds

    async with aiohttp.ClientSession() as session:  # Ensuring the session context stays open for retries
        for i in range(retries):
            try:
                async with session.get(website_url) as response:
                    if response.status == 200:
                        print(f"{datetime.now()} - Website is up.")
                        return
                    else:
                        print(
                            f"{datetime.now()} - Unexpected response: {response.status}"
                        )
            except aiohttp.ClientError as e:
                print(f"{datetime.now()} - Request failed. Error: {e}")

            if i < retries:  # No need to sleep after the last try
                await asyncio.sleep(retry_interval)

        print(f"{datetime.now()} - All retries failed, sending failure report")
        try:
            subject = "URGENT: Your website is down!"
            body = """<html><body>Your website is currently down. All retries failed.</body></html>"""
            email_sender.send_email(
                source="ReportBot<report-bot@mydomain.com>",
                recipients=["your_email@example.com"],
                subject=subject,
                body=body,
            )
            print(f"{datetime.now()} - Sent alert email.")
        except Exception as e:  # A more specific exception is preferable
            print(f"{datetime.now()} - Error in sending email: {e}")