<a href="https://colab.research.google.com/github/jonwright13/py-mailer/blob/main/LetterCMailSender.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports

In [None]:
# Install packages
!pip install msal
!pip install python-docx
!pip install mammoth
!pip install dnspython

# Import packages
import msal
import requests
import pandas as pd
import markdown
import mammoth
import re
import dns.resolver
import smtplib
import csv

import docx
from docx.opc.constants import RELATIONSHIP_TYPE as RT

# Import google specific packages
from google.colab import userdata

from google.colab import drive

from google.colab import auth
auth.authenticate_user()

import gspread
from google.auth import default

# Functions

In [None]:
def authenticate():
  """Request an app-only token for Microsoft Graph.

  The tenant ID, client ID and client secret are read from Colab ``userdata``
  under the keys ``tenant_id``, ``client_id`` and ``app_secret``.

  Returns:
      The result of ``acquire_token_for_client`` for the
      ``https://graph.microsoft.com/.default`` scope. Callers in this file
      look for an ``access_token`` key in it.
  """
  tenant = userdata.get('tenant_id')
  app_id = userdata.get('client_id')
  secret = userdata.get('app_secret')

  client = msal.ConfidentialClientApplication(
      app_id,
      authority=f"https://login.microsoftonline.com/{tenant}",
      client_credential=secret,
  )

  return client.acquire_token_for_client(
      scopes=["https://graph.microsoft.com/.default"]
  )

def make_a_tag(url, text):
  """Return an HTML anchor that points at ``url`` and displays ``text``."""
  anchor_template = '<a href="{}">{}</a>'
  return anchor_template.format(url, text)

def make_social_html(url, src, alt, width="50px"):
  """Return a clickable image: an ``<img>`` wrapped in an ``<a>``.

  Args:
      url: Link target of the anchor.
      src: Image source URL.
      alt: Alternative text of the image.
      width: CSS width of the image; the height is always ``auto``.
  """
  image = '<img src="{}" alt="{}" style="width:{};height:auto;"/>'.format(src, alt, width)
  return '<a href="{}">{}</a>'.format(url, image)

def make_socials_html(site_html, socials_list):
  """Wrap the site logo and the social icons in a flex-column container.

  Args:
      site_html: HTML placed first inside the container.
      socials_list: HTML snippets concatenated inside a nested ``<div>``.
          Any ``style="margin: 0;"`` attribute in a snippet additionally
          gets ``align-self: center;``.

  Returns:
      The combined HTML string.
  """
  plain_margin = 'style="margin: 0;"'
  centred_margin = 'style="margin: 0; align-self: center;"'

  icons = ""
  for snippet in socials_list:
    icons += snippet.replace(plain_margin, centred_margin)

  container_open = '<div style="display: flex; flex-direction: column; margin-top: 20px; margin-left: 5px; gap: 1px;">'
  return f'{container_open}{site_html}<div>{icons}</div></div>'



def excel_boolean_to_python(excel_value):
    """
    Interpret a spreadsheet cell value as a Python boolean.

    Args:
        excel_value: The cell value; typically the text "TRUE"/"FALSE" or an
            actual boolean.

    Returns:
        For strings: True only when the text, ignoring surrounding whitespace
        and letter case, is "TRUE"; every other string (including "") is
        False. For any other value: its ordinary truthiness.
    """
    if isinstance(excel_value, str):
        return excel_value.strip().upper() == "TRUE"

    # Booleans pass through unchanged; other types fall back to truthiness.
    return bool(excel_value)

def get_emails_from_sheet(sheet_id):
  """Read the recipients flagged for inclusion from the "Final" worksheet.

  Args:
      sheet_id: ID of the Google Sheet (taken from its URL).

  Returns:
      A list of dicts with the keys ``first_name``, ``last_name``, ``email``
      and ``personalisation``, one per row whose ``Include`` cell is true.
      An empty list is returned when the worksheet has no rows at all.
  """
  creds, _ = default()
  gc = gspread.authorize(creds)

  # Open the Google Sheet by its ID (from the URL)
  sheet = gc.open_by_key(sheet_id)

  # Select the worksheet by title. A Spreadsheet object cannot be indexed
  # with ['Final']; worksheet() is the lookup-by-title call.
  worksheet = sheet.worksheet('Final')

  # Get all values from the worksheet
  data = worksheet.get_all_values()
  if not data:
    # No header row at all, so there is nothing to build a table from.
    return []

  # The first row is treated as the header
  df = pd.DataFrame(data[1:], columns=data[0])

  return [
      {
          'first_name': row['First Name'],
          'last_name': row['Last Name'],
          'email': row['Emails'],
          'personalisation': row['Personalisation'],
      }
      for _, row in df.iterrows()
      if excel_boolean_to_python(row['Include'])
  ]

def convert_word_to_html(docx_path):
    """Convert the Word document at ``docx_path`` to an HTML string.

    Failures are not raised: the function returns a human-readable error
    message instead of HTML when the file is missing or conversion fails.
    """
    try:
        with open(docx_path, "rb") as source:
            return mammoth.convert_to_html(source).value
    except FileNotFoundError:
        return f"Error: File '{docx_path}' not found."
    except Exception as exc:
        return f"An error occurred: {exc}"

def is_valid_email(email):
    """Return True when ``email`` looks like ``local@domain.tld``.

    This is a format check only; it does not prove the mailbox exists.
    The whole string must match, so leading/trailing whitespace or a
    trailing newline makes the address invalid. Non-string values
    (e.g. ``None`` from an empty cell) are reported as invalid rather
    than raising.
    """
    if not isinstance(email, str):
        return False

    # fullmatch instead of match + '$': '$' also matches just before a
    # trailing newline, which would let "user@example.com\n" through.
    pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    return re.fullmatch(pattern, email) is not None

def check_mx_records(email):
    """Return True when the domain of ``email`` publishes at least one MX record.

    Args:
        email: Address whose part after the last ``@`` is looked up.

    Returns:
        True if the MX query yields an answer; False when the lookup fails
        for any DNS-related reason (no answer, unknown domain, timeout,
        unreachable name servers, or a syntactically invalid domain name).
    """
    domain = email.split('@')[-1]
    try:
        mx_records = dns.resolver.resolve(domain, 'MX')
        return bool(mx_records)
    except dns.exception.DNSException:
        # Base class of the dnspython errors. Catching it also covers
        # NoNameservers and name-syntax errors (e.g. "example..com"), which
        # previously escaped and aborted the caller's whole loop.
        return False

def check_smtp(email, timeout=10):
    """Return True when the domain's mail server accepts an SMTP connection.

    The MX record with the lowest preference value (the preferred exchanger)
    is contacted. Only the connection and greeting are tested; no mail
    transaction is attempted.

    Args:
        email: Address whose part after the last ``@`` is checked.
        timeout: Seconds to wait for the SMTP connection before giving up.
            Without a timeout an unreachable server could block forever.

    Returns:
        True if the connection was established, False on any failure.
    """
    domain = email.split('@')[-1]
    try:
        mx_records = dns.resolver.resolve(domain, 'MX')
        preferred = min(mx_records, key=lambda record: record.preference)
        mail_server = str(preferred.exchange).rstrip('.')
        if not mail_server:
            # An exchange of "." is a null MX: the domain accepts no mail.
            return False

        # The context manager sends QUIT and closes the socket even if
        # something fails after the connection is established.
        with smtplib.SMTP(mail_server, timeout=timeout):
            pass
        return True
    except Exception:
        # Deliberately best-effort: any DNS, socket or SMTP problem simply
        # means the address is treated as unreachable.
        return False

def send_mass_email(subject, mailers, access_token, cc_email=None):
    """Send one HTML email per recipient, skipping undeliverable addresses.

    Each address must pass, in order, the format check, the MX lookup and the
    SMTP connection check; the first failing check prints a message and the
    recipient is skipped. Results are reported with ``print`` only.

    Args:
        subject: Subject line used for every message.
        mailers: Iterable of dicts with the keys ``email`` and ``body``.
        access_token: Bearer token placed in the ``Authorization`` header.
        cc_email: Accepted but currently unused (the CC field is disabled).
    """
    endpoint = userdata.get("email_endpoint")
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    for recipient in mailers:
        address = recipient['email']
        content = recipient['body']

        if not is_valid_email(address):
            print(f"Email not valid. Aborting: {address}")
            continue

        if not check_mx_records(address):
            print(f"Mail server does not exist. Aborting: {address}")
            continue

        if not check_smtp(address):
            print(f"SMTP response failed. Aborting: {address}")
            continue

        # One recipient per message, carrying that recipient's own body.
        payload = {
            "message": {
                "subject": subject,
                "body": {"contentType": "HTML", "content": content},
                "toRecipients": [{"emailAddress": {"address": address}}],
            },
        }

        reply = requests.post(endpoint, headers=headers, json=payload)

        # 202 is the only status treated as success.
        if reply.status_code != 202:
            print(f"Failed to send email to {address}. Status code: {reply.status_code}, Response: {reply.text}")
            continue

        print(f"Email sent successfully to: {address}")



# Process

In [None]:
# Build Email List
# Reads the "Final" worksheet and records, for every row that is not
# unsubscribed, the recipient details plus the outcome of three checks.
creds, _ = default()
gc = gspread.authorize(creds)

# Open the Google Sheet by its ID (from the URL)
sheet = gc.open_by_key(userdata.get("sheet_id_prod"))    # Prod emails
# sheet = gc.open_by_key(userdata.get("sheet_id_dev"))    # Test Emails

worksheet = sheet.worksheet("Final")

# First row is the header; the remaining rows are the data
data = worksheet.get_all_values()
df = pd.DataFrame(data[1:], columns=data[0])

# Enable for dev
# df = df.head()

email_list = []
for index, row in df.iterrows():
  if excel_boolean_to_python(row['Unsubscribed']):
    continue

  email = row['Email']

  # Each check only runs when the previous one passed; a skipped check
  # is recorded as False.
  email_valid = is_valid_email(email)
  mx_valid = check_mx_records(email) if email_valid else False
  smtp_check = check_smtp(email) if mx_valid else False

  email_list.append({
      'first_name': row['First Name'],
      'email': email,
      'personalisation': row['Personalisation'],
      "language": row["Language"],
      "email_valid": email_valid,
      "mx_valid": mx_valid,
      "smtp_check": smtp_check,
  })

In [None]:
# Print every collected recipient record (including the validation flags),
# one per line, for manual inspection before sending.
for item in email_list:
  print(item)

In [None]:
# Prepare HTML Email
# Mount Google Drive so files stored there can be opened by path.
drive.mount('/content/drive')

# Link that replaces the <promolink> placeholder in the template
promo_link = "https://youtu.be/f_phPlJVM1w"
promo_link_text = "Promo"
promo_html = make_a_tag(promo_link, promo_link_text)

# Icon images and link targets for the footer
yt_png = "https://img.icons8.com/?size=100&id=19318&format=png&color=000000"
yt_link = "https://www.youtube.com/@Letter-C-Cordee"
site_png = "https://i.imgur.com/coQa7jT.png"
# The scheme is required: without it the href is a relative URL and the
# link does not lead to the website.
site_link = "https://www.letter-c.com"
li_png = "https://img.icons8.com/?size=100&id=13930&format=png&color=000000"
li_link = "https://www.linkedin.com/company/letter-c-cordee"
yt_html = make_social_html(yt_link, yt_png, "YouTube")
site_html = make_social_html(site_link, site_png, "Letter-C", "100px")
li_html = make_social_html(li_link, li_png, "LinkedIn")

# Site logo first, then the social icons
socials_htmls = make_socials_html(site_html, [yt_html, li_html])

unsubscribe_link = "https://sites.google.com/view/letter-c-unsubscribe/home"

# Per-language settings: template path (read from userdata), subject line,
# signature HTML and unsubscribe HTML.
# The signature links use the mailto: scheme; a bare address in href is
# treated as a relative URL and does not open a mail client.
languages = {
    "English": {
        "path": userdata.get("doc_path_en"),
        "subject": "Letter C: Humanitarian Aid YouTube Channel Launch",
        "signature": '<p>Emery Brusset<br>Founder</p><p><a href="mailto:emery.brusset@letter-c.com">emery.brusset@letter-c.com</a><br><a href="mailto:brusset@social-terrain.com">brusset@social-terrain.com</a><br><a href="mailto:management@letter-c.com">management@letter-c.com</a></p>',
        "unsubscribe": f"""
          <div>
              <p>If you would like to unsubscribe, click the button below:</p>
              <a href="{unsubscribe_link}" style="text-decoration:underline; display:inline-block;">
                  Unsubscribe
              </a>
          </div>
          """
    },
    "French": {
        "path": userdata.get("doc_path_fr"),
        "subject": "Lettre C : Lancement de la chaîne YouTube sur l'aide humanitaire",
        "signature": '<p>Emery Brusset<br>Fondateur</p><p><a href="mailto:emery.brusset@letter-c.com">emery.brusset@letter-c.com</a><br><a href="mailto:brusset@social-terrain.com">brusset@social-terrain.com</a><br><a href="mailto:management@letter-c.com">management@letter-c.com</a></p>',
        "unsubscribe": f"""
          <div>
              <p>Si vous souhaitez vous désabonner, cliquez sur le bouton ci-dessous:</p>
              <a href="{unsubscribe_link}" style="text-decoration:underline; display:inline-block;">
                  Désinscrire
              </a>
          </div>
          """
    }
}

# Build one ready-to-personalise HTML body per configured language.
email_htmls = {}

for lang, config in languages.items():

  # Convert the language's Word template to HTML
  docx_path = config['path']
  print("Opening", docx_path)
  with open(docx_path, "rb") as docx_file:
      result = mammoth.convert_to_html(docx_file)
  html_body = result.value

  signature = config['signature']
  unsubscribe = config['unsubscribe']

  # Each placeholder is a paragraph of its own in the converted template.
  # The first two replacements are shared, the last two are language
  # dependent; they are applied in this order.
  substitutions = (
      ("<p>&lt;promolink&gt;</p>", promo_html),
      ("<p>&lt;socials&gt;</p>", socials_htmls),
      ("<p>&lt;signature&gt;</p>", signature),
      ("<p>&lt;unsubscribe&gt;</p>", unsubscribe),
  )
  for placeholder, replacement in substitutions:
    html_body = html_body.replace(placeholder, replacement)

  email_htmls[lang] = {"subject": config['subject'], "body": html_body}



In [None]:
# Notebook preview: display the "English" entry of email_htmls for a visual check.
email_htmls['English']

In [None]:
# Build the final per-recipient messages (address, personalised HTML, subject).
mailing_list = []
for person in email_list:
  # Only recipients that passed every deliverability check are mailed.
  if not (person['email_valid'] and person['mx_valid'] and person['smtp_check']):
    continue

  name = person['first_name']
  email = person['email']
  language = person['language']
  personalisation = person['personalisation']
  print(person)

  # Fall back to English only when no template exists for the recipient's
  # language. The previous test (!= "English" or != "French") was true for
  # every value, so French recipients always received the English email.
  if language not in email_htmls:
    language = "English"

  html = email_htmls[language]['body']
  subject = email_htmls[language]['subject']

  # NOTE(review): the greeting is "Bonjour" for every language, including
  # English - confirm this is intended.
  html = html.replace("&lt;name&gt;", f"Bonjour {name}")

  # An empty personalisation cell simply removes the placeholder.
  if personalisation:
    html = html.replace("&lt;personalisation&gt;", f'\n{personalisation}')
  else:
    html = html.replace("&lt;personalisation&gt;", "")

  mailing_list.append({"email": email, "html": html, "subject": subject})



In [None]:
# Notebook preview: display the HTML of the second prepared message.
# Raises IndexError when mailing_list holds fewer than two entries.
mailing_list[1]['html']

In [None]:
# Function to read log and check if an email was already sent
def email_already_sent(email):
    """Return True when the log shows ``email`` was sent successfully.

    The log (``LOG_FILE``) is a CSV whose rows are ``address, status``.
    Rows with the status "Failed" do not count as sent, so those addresses
    are retried on the next run. Rows that carry no status column are
    treated as sent, to avoid mailing someone twice. Blank rows are ignored.

    Args:
        email: Address to look up in the first column of the log.

    Returns:
        True if a matching successful (or status-less) row exists; False
        otherwise, including when the log file does not exist yet.
    """
    try:
        with open(LOG_FILE, mode="r", newline="") as file:
            for row in csv.reader(file):
                if not row or row[0] != email:
                    continue
                if len(row) < 2 or row[1] == "Success":
                    return True
    except FileNotFoundError:
        return False  # Log file doesn't exist yet, so no emails have been sent
    return False

# Function to log email status
def log_email(email, status):
    """Append one ``email, status`` row to the CSV log at ``LOG_FILE``.

    The file is opened in append mode, so it is created on first use and
    earlier rows are kept.
    """
    with open(LOG_FILE, mode="a", newline="") as log_handle:
        csv.writer(log_handle).writerow((email, status))

In [None]:
# Send every prepared message once, recording each outcome in LOG_FILE so
# the cell can be re-run without mailing the same address twice.
auth = authenticate()
LOG_FILE = "email_log.csv"

sent_count = 0
skip_count = 0
fail_count = 0

if "access_token" in auth:
    access_token = auth["access_token"]

    endpoint = userdata.get('email_endpoint')
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }

    for recipient in mailing_list:
        email = recipient['email']
        body = recipient['html']
        subject = recipient['subject']

        # Skip email if already sent
        if email_already_sent(email):
            print(f"Skipping {email}, already sent.")
            skip_count += 1
            continue

        # One recipient per message, carrying the personalised body
        message = {
            "message": {
                "subject": subject,
                "body": {"contentType": "HTML", "content": body},
                "toRecipients": [{"emailAddress": {"address": email}}],
            },
        }

        response = requests.post(endpoint, headers=headers, json=message)

        # 202 is the only status treated as success
        if response.status_code == 202:
            print(f"Email sent successfully to: {email}")
            log_email(email, "Success")
            sent_count += 1
        else:
            print(f"Failed to send email to {email}. Status code: {response.status_code}, Response: {response.text}")
            log_email(email, "Failed")
            fail_count += 1
else:
    # Without this branch a failed token request was indistinguishable from
    # an empty mailing list: the summary just showed zeros.
    print(f"Authentication failed, no emails were sent: {auth.get('error')}: {auth.get('error_description')}")

print()
print(f"Completed mail automation:\nSent: {sent_count}\nSkipped: {skip_count}\nFailed: {fail_count}")
