In [None]:
from contextlib import contextmanager
import os
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import win32com.client as wc
from typing import List, Optional
from datetime import datetime
import calendar

# Constants and variables
output_filepath: Path = Path("./output/test.xlsx")
mail_from: str = "sender@mail.com"
mail_to: List[str] = ["recipient1@mail.com;", "recipient2@mail.com;"]
mail_subject: str = "Test email"
mail_body: str = "<p>This is a test email.</p>"
mail_attachments: List[str] = [str(output_filepath.resolve())]
mail_cc: List[str] = ["cc1@mail.com;", "cc2@mail.com;"] # Optional

# Load environment variables
def load_env_vars() -> str:
    try:
        current_directory = (
            Path(__file__).resolve().parent if "__file__" in locals() else Path.cwd()
        )
        environment_variables = current_directory / ".env"
        load_dotenv(environment_variables)
        USER = os.getenv("USER", "default_user")
        PASSWORD = os.getenv("PASSWORD", "default_password")
        HOST = os.getenv("HOST", "default_host")
        PORT = os.getenv("PORT", "default_port")
        DATABASE = os.getenv("SERVICE_NAME", "default_service_name")

        if not all([USER, PASSWORD, HOST, PORT, DATABASE]):
            raise ValueError("One or more environment variables are not set.")

        connection_string = f"oracle+oracledb://{USER}:{PASSWORD}@{HOST}:{PORT}/{DATABASE}"
        print("Environmental variables loaded.")
        return connection_string
    except Exception as error:
            print(f"An error occurred: {error}")
            return str(None)
    
# Create database connection
@contextmanager
def create_db_connection(connection_string: str):
    engine: Engine = create_engine(connection_string)
    connection: Connection = engine.connect()
    try:
        yield connection
    except SQLAlchemyError as error:
        print(f"An error occurred: {error}")
        return None

# Get last Friday of the current month   
def last_friday_current_month(date: datetime) -> str:
    year = date.year
    month = date.month
    _, last_day = calendar.monthrange(year, month)
    for day in range(last_day, 0, -1):
        if calendar.weekday(year, month, day) == calendar.FRIDAY:
            date = datetime(year, month, day)
            return date.strftime("%d-%b-%y") 
    return str(None)
        
# Get last Friday of the previous month
def last_friday_previous_month(date: datetime) -> str:
    year = date.year
    month = date.month - 1
    _, last_day = calendar.monthrange(year, month)
    for day in range(last_day, 0, -1):
        if calendar.weekday(year, month, day) == calendar.FRIDAY:
            date = datetime(year, month, day)
            return date.strftime("%d-%b-%y")
    return str(None)

# Read SQL script
def read_sql_script(sql_path: str) -> str:
    try:
        with open(sql_path) as file:
            query = file.read()
            print("SQL query read.")
            return query
    except Exception as error:
        print(f"An error occurred: {error}")
        return str(None)

# Execute query
def execute_query(connection: Connection, query: str, params: dict) -> pd.DataFrame:
    try:
        data = pd.read_sql(text(query), connection, params=params)
        print("Data returned.")
        return data
    except Exception as error:
        print(f"An error occurred: {error}")
        return pd.DataFrame(None)

# Output data to spreadsheet
def write_to_excel(data: pd.DataFrame, output_filepath: str) -> None:
    with pd.ExcelWriter(
        output_filepath,
        engine="xlsxwriter",
        datetime_format="dd mmm yyyy hh:mm:ss",
        date_format="dd mmm yyyy",
    ) as writer:
        data.to_excel(writer, sheet_name="Sheet1", index=False, header=False, startrow=1,)
        workbook = writer.book
        worksheet = writer.sheets["Sheet1"]
        header_format = workbook.add_format(
            {
                "bold": True,
                "text_wrap": True,
                "valign": "top",
                "fg_color": "#9FC5E8",
                "border": 1,
            }
        )
        for col_num, value in enumerate(data.columns):
            worksheet.write(0, col_num, value, header_format)
        print("Spreadsheet created.")

# Email function
def send_email(mail_to: List[str], subject: str, mail_body: str, mail_attachments: List[str], mail_cc: Optional[List[str]] = None, mail_from: Optional[str] = None) -> None:
    try:
        outlook = wc.Dispatch("Outlook.Application")
        mail = outlook.CreateItem(0)
        if mail_from is not None:
           mail.SentOnBehalfOfName = mail_from
        mail.To = ", ".join(mail_to)
        if mail_cc is not None:
            mail.CC = ", ".join(mail_cc)
        mail.Subject = subject
        mail.HTMLBody = mail_body
        for attachment in mail_attachments:
            mail.Attachments.Add(attachment)
        mail.Display()
        print("Email created.")
    except Exception as error:
        print(f"An error occurred: {error}")
    
# Main program
def main() -> None:
    print("Starting program.")
    connection_string: str = load_env_vars()
    with create_db_connection(connection_string) as connection:
        sql_path: Path = Path("./sql/query.sql")
        query = read_sql_script(str(sql_path))
        if query:
            date: datetime = datetime.now()
            end_date: str = last_friday_current_month(date)
            start_date: str = last_friday_previous_month(date)
            params: Dict[str, str] = {"start_date": f"{start_date}", "end_date": f"{end_date}"}
            data: pd.DataFrame = execute_query(connection, query, params)
            write_to_excel(data, str(output_filepath))
            send_email(mail_to, mail_subject, mail_body, mail_attachments, mail_cc, mail_from)
    print("Program finished.")
if __name__ == "__main__":
    main()