Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions emailer_lib/egress.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,31 +284,33 @@ def send_intermediate_email_with_smtp(
msg_alt.attach(MIMEText(i_email.text, "plain"))

# Attach inline images
for image_name, image_base64 in i_email.inline_attachments.items():
img_bytes = base64.b64decode(image_base64)
img = MIMEImage(img_bytes, _subtype="png", name=f"{image_name}")
if i_email.inline_attachments:
for image_name, image_base64 in i_email.inline_attachments.items():
img_bytes = base64.b64decode(image_base64)
img = MIMEImage(img_bytes, _subtype="png", name=f"{image_name}")

img.add_header("Content-ID", f"<{image_name}>")
img.add_header("Content-Disposition", "inline", filename=f"{image_name}")
img.add_header("Content-ID", f"<{image_name}>")
img.add_header("Content-Disposition", "inline", filename=f"{image_name}")

msg.attach(img)
msg.attach(img)

# Attach external files (any type)
for filename in i_email.external_attachments:
with open(filename, "rb") as f:
file_data = f.read()

# Guess MIME type based on file extension
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = "application/octet-stream"
main_type, sub_type = mime_type.split("/", 1)

part = MIMEBase(main_type, sub_type)
part.set_payload(file_data)
encoders.encode_base64(part)
part.add_header("Content-Disposition", "attachment", filename=filename)
msg.attach(part)
if i_email.external_attachments:
for filename in i_email.external_attachments:
with open(filename, "rb") as f:
file_data = f.read()

# Guess MIME type based on file extension
mime_type, _ = mimetypes.guess_type(filename)
if mime_type is None:
mime_type = "application/octet-stream"
main_type, sub_type = mime_type.split("/", 1)

part = MIMEBase(main_type, sub_type)
part.set_payload(file_data)
encoders.encode_base64(part)
part.add_header("Content-Disposition", "attachment", filename=filename)
msg.attach(part)

# Send via SMTP with appropriate security protocol
if security == "ssl":
Expand Down
32 changes: 18 additions & 14 deletions emailer_lib/ingress.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
from __future__ import annotations
from base64 import b64encode
import json
import re

from email.message import EmailMessage
from mjml import mjml2html

from .structs import IntermediateEmail
from .mjml import MJMLTag
from .mjml.image_processor import _process_mjml_images
import warnings

__all__ = [
"redmail_to_intermediate_email",
Expand Down Expand Up @@ -43,31 +45,33 @@ def yagmail_to_intermediate_email():
pass


def mjml_to_intermediate_email(mjml_content: str) -> IntermediateEmail:
def mjml_to_intermediate_email(
mjml_content: str | MJMLTag,
) -> IntermediateEmail:
"""
Convert MJML markup to an IntermediateEmail

Parameters
------
----------
mjml_content
MJML markup string
MJML markup string or MJMLTag object

Returns
------
An Intermediate Email object

"""
email_content = mjml2html(mjml_content)
# Handle MJMLTag objects by preprocessing images
if isinstance(mjml_content, MJMLTag):
processed_mjml, inline_attachments = _process_mjml_images(mjml_content)
mjml_markup = processed_mjml._to_mjml()
else:
# String-based MJML, no preprocessing needed
warnings.warn("MJMLTag not detected; treating input as plaintext MJML markup", UserWarning)
mjml_markup = mjml_content
inline_attachments = {}

# Find all <img> tags and extract their src attributes
pattern = r'<img[^>]+src="([^"\s]+)"[^>]*>'
matches = re.findall(pattern, email_content)
inline_attachments = {}
for src in matches:
# in theory, retrieve the externally hosted images and save to bytes
# the user would need to pass CID-referenced images directly somehow,
# as mjml doesn't handle them
raise NotImplementedError("mj-image tags are not yet supported")
email_content = mjml2html(mjml_markup)

i_email = IntermediateEmail(
html=email_content,
Expand Down
1 change: 1 addition & 0 deletions emailer_lib/mjml/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
navbar_link,
social_element,
)
# _process_mjml_images is called internally by mjml_to_intermediate_email

__all__ = (
"MJMLTag",
Expand Down
114 changes: 77 additions & 37 deletions emailer_lib/mjml/_core.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
# MJML core classes adapted from py-htmltools

## TODO: make sure Ending tags are rendered as needed
# https://documentation.mjml.io/#ending-tags
# https://documentation.mjml.io/#ending-tags

from typing import Dict, Mapping, Optional, Sequence, Union
import warnings
from io import BytesIO
from mjml import mjml2html


# Types for MJML
TagAttrValue = Union[str, float, bool, None]

TagAttrValue = Union[str, float, bool, None, bytes, BytesIO]
TagAttrs = Union[Dict[str, TagAttrValue], "TagAttrDict"]
TagChild = Union["MJMLTag", str, float, None, Sequence["TagChild"]]


class TagAttrDict(Dict[str, str]):
class TagAttrDict(Dict[str, Union[str, bytes, BytesIO]]):
"""
MJML attribute dictionary. All values are stored as strings.
MJML attribute dictionary. Most values are stored as strings, but bytes/BytesIO are preserved.
"""

def __init__(
self, *args: Mapping[str, TagAttrValue]
) -> None:
def __init__(self, *args: Mapping[str, TagAttrValue]) -> None:
super().__init__()
for mapping in args:
for k, v in mapping.items():
if v is not None:
self[k] = str(v)
# Preserve bytes and BytesIO objects as-is for image processing
if isinstance(v, (bytes, BytesIO)):
self[k] = v
else:
self[k] = str(v)

def update(self, *args: Mapping[str, TagAttrValue]) -> None:
for mapping in args:
for k, v in mapping.items():
if v is not None:
self[k] = str(v)
# Preserve bytes and BytesIO objects as-is for image processing
if isinstance(v, (bytes, BytesIO)):
self[k] = v
else:
self[k] = str(v)


class MJMLTag:
Expand All @@ -52,7 +60,7 @@ def __init__(
self.attrs = TagAttrDict()
self.children = []
self._is_leaf = _is_leaf

# Runtime validation for leaf tags
if self._is_leaf:
# For leaf tags, treat the first positional argument as content if provided
Expand All @@ -64,56 +72,67 @@ def __init__(
self.content = args[0]
else:
self.content = content

# Validate content type
if self.content is not None and not isinstance(self.content, (str, int, float)):
if self.content is not None and not isinstance(
self.content, (str, int, float)
):
raise TypeError(
f"<{tagName}> content must be a string, int, or float, "
f"got {type(self.content).__name__}"
)

# Validate attributes parameter type
if attributes is not None and not isinstance(attributes, (dict, TagAttrDict)):
if attributes is not None and not isinstance(
attributes, (dict, TagAttrDict)
):
raise TypeError(
f"attributes must be a dict or TagAttrDict, got {type(attributes).__name__}."
)

# Process attributes
if attributes is not None:
self.attrs.update(attributes)
else:
# For container tags
self.content = content

# Validate attributes parameter type
if attributes is not None and not isinstance(attributes, (dict, TagAttrDict)):
if attributes is not None and not isinstance(
attributes, (dict, TagAttrDict)
):
raise TypeError(
f"attributes must be a dict or TagAttrDict, got {type(attributes).__name__}. "
f"If you meant to pass children, use positional arguments for container tags."
)

# Collect children (for non-leaf tags only)
for arg in args:
if (
isinstance(arg, (str, float)) or arg is None or isinstance(arg, MJMLTag)
isinstance(arg, (str, float))
or arg is None
or isinstance(arg, MJMLTag)
):
self.children.append(arg)
elif isinstance(arg, Sequence) and not isinstance(arg, str):
self.children.extend(arg)

# Process attributes
if attributes is not None:
self.attrs.update(attributes)

# TODO: confirm if this is the case... I don't think it is
# # If content is provided, children should be empty
# if self.content is not None:
# self.children = []

def render_mjml(self, indent: int = 0, eol: str = "\n") -> str:
def _to_mjml(self, indent: int = 0, eol: str = "\n") -> str:
"""
Render MJMLTag and its children to MJML markup.
Ported from htmltools Tag rendering logic.

Note: BytesIO/bytes in image src attributes are not supported by _to_mjml().
Pass the MJMLTag directly to mjml_to_intermediate_email() instead.
"""

def _flatten(children):
Expand All @@ -125,6 +144,16 @@ def _flatten(children):
elif isinstance(c, (str, float)):
yield c

# Check for BytesIO/bytes in mj-image tags and raise clear error
if self.tagName == "mj-image" and "src" in self.attrs:
src_value = self.attrs["src"]
if isinstance(src_value, (bytes, BytesIO)):
raise ValueError(
"Cannot render MJML with BytesIO/bytes in image src attribute. "
"Pass the MJMLTag object directly to mjml_to_intermediate_email() instead of calling _to_mjml() first. "
"Example: i_email = mjml_to_intermediate_email(doc)"
)

# Build attribute string
attr_str = ""
if self.attrs:
Expand All @@ -138,7 +167,7 @@ def _flatten(children):
child_strs = []
for child in _flatten(self.children):
if isinstance(child, MJMLTag):
child_strs.append(child.render_mjml(indent + 2, eol))
child_strs.append(child._to_mjml(indent + 2, eol))
else:
child_strs.append(str(child))
if child_strs:
Expand All @@ -152,53 +181,64 @@ def _flatten(children):
return f"{pad}<{self.tagName}{attr_str}></{self.tagName}>"

def _repr_html_(self):
return self.to_html()
from ..ingress import mjml_to_intermediate_email
return mjml_to_intermediate_email(self)._repr_html_()

# TODO: make something deliberate
def __repr__(self) -> str:
return self.render_mjml()

def to_html(self, **mjml2html_kwargs):
warnings.warn(
f"__repr__ not yet fully implemented for MJMLTag({self.tagName})",
UserWarning,
stacklevel=2,
)
return f"<MJMLTag({self.tagName})>"

# warning explain that they are not to pass this to intermediate email
def to_html(self, **mjml2html_kwargs) -> str:
"""
Render MJMLTag to HTML using mjml2html.

If this is not a top-level <mjml> tag, it will be automatically wrapped
in <mjml><mj-body>...</mj-body></mjml> with a warning.


Note: This method embeds all images as inline data URIs in the HTML.
For email composition with inline attachments, use mjml_to_intermediate_email() instead.

Parameters
----------
**mjml2html_kwargs
Additional keyword arguments to pass to mjml2html

Returns
-------
str
Result from mjml2html containing html content
Result from `mjml-python.mjml2html()` containing html content
"""
if self.tagName == "mjml":
# Already a complete MJML document
mjml_markup = self.render_mjml()
mjml_markup = self._to_mjml()
elif self.tagName == "mj-body":
# Wrap only in mjml tag
warnings.warn(
"to_html() called on <mj-body> tag. "
"Automatically wrapping in <mjml>...</mjml>. "
"For full control, create a complete MJML document with the mjml() tag.",
UserWarning,
stacklevel=2
stacklevel=2,
)
wrapped = MJMLTag("mjml", self)
mjml_markup = wrapped.render_mjml()
mjml_markup = wrapped._to_mjml()
else:
# Warn and wrap in mjml/mj-body
warnings.warn(
f"to_html() called on <{self.tagName}> tag. "
"Automatically wrapping in <mjml><mj-body>...</mj-body></mjml>. "
"For full control, create a complete MJML document with the mjml() tag.",
UserWarning,
stacklevel=2
stacklevel=2,
)
# Wrap in mjml and mj-body
wrapped = MJMLTag("mjml", MJMLTag("mj-body", self))
mjml_markup = wrapped.render_mjml()
mjml_markup = wrapped._to_mjml()

return mjml2html(mjml_markup, **mjml2html_kwargs)
Loading