-
-
Notifications
You must be signed in to change notification settings - Fork 38
/
mailbox.py
207 lines (163 loc) · 6.75 KB
/
mailbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
# Copyright Kevin Deldycke <kevin@deldycke.com> and contributors.
# All Rights Reserved.
#
# This program is Free Software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
import inspect
import mailbox
from functools import partial
from pathlib import Path
from boltons.dictutils import FrozenDict
from boltons.iterutils import flatten
from . import logger
from .colorize import choice_style
from .mail import DedupMail
""" Patch and tweak Python's standard librabry mailboxes constructors to set
sane defaults. Also forces out our own message factories to add deduplication
tools and utilities. """
def build_box_constructors():
    """Yield a ``(box_type_id, constructor)`` pair for each mailbox format.

    Walks every class exposed by the standard library's ``mailbox`` module and,
    for each subclass of ``mailbox.Mailbox`` whose name does not start with an
    underscore, produces a constructor pre-configured with a message factory
    that mixes our ``DedupMail`` class into the format's own message class.
    """
    for _, klass in inspect.getmembers(mailbox, inspect.isclass):
        # Skip the ``mailbox.Mailbox`` interface itself, private classes and
        # anything which is not a mailbox.
        if (
            klass == mailbox.Mailbox
            or klass.__name__.startswith("_")
            or not issubclass(klass, mailbox.Mailbox)
        ):
            continue

        # The message class of a box is located by naming convention:
        # ``<BoxName>Message``.
        base_message = getattr(mailbox, f"{klass.__name__}Message")
        assert issubclass(base_message, mailbox.Message)

        # Build a dedicated message factory layering DedupMail on top of the
        # format's default message class.
        factory_attrs = {
            "__doc__": f"Extend the default message factory for {klass} with "
            "our own ``DedupMail`` class to add deduplication utilities."
        }
        dedup_factory = type(
            f"{klass.__name__}DedupMail",
            (DedupMail, base_message, object),
            factory_attrs,
        )

        # Pre-bind our factory, and default to never creating a missing box.
        box_constructor = partial(klass, factory=dedup_factory, create=False)

        # The lower-cased class name is the box type ID used in CLI parameters.
        yield klass.__name__.lower(), box_constructor
# Mapping between supported box type IDs and their constructors, built once at
# import time from the pairs yielded by ``build_box_constructors()``.
BOX_TYPES = FrozenDict(build_box_constructors())
# Categorize each box type ID under the kind of structure it is stored as
# (``file`` or ``folder``).
BOX_STRUCTURES = FrozenDict(
{
"file": {"mbox", "mmdf", "babyl"},
"folder": {"maildir", "mh"},
}
)
# Check we did not forget any box type: the IDs categorized above must match
# exactly the IDs of the constructors we built.
assert set(flatten(BOX_STRUCTURES.values())) == set(BOX_TYPES)
# Set of required sub-folders defining a properly structured maildir.
MAILDIR_SUBDIRS = frozenset(("cur", "new", "tmp"))
def autodetect_box_type(path):
    """Auto-detect the format of the mailbox located at the provided path.

    ``path`` is expected to be a ``pathlib.Path``-like object (``is_dir()``,
    ``is_file()`` and ``joinpath()`` are called on it).

    Returns a box type ID, as indexed in the ``BOX_TYPES`` dictionary.

    If the provided path is a folder and features all the expected
    sub-directories (see ``MAILDIR_SUBDIRS``), it is detected as a ``maildir``.
    Else, if the path is a file, it is considered as an ``mbox``.

    Raises ``ValueError`` if the path is a folder missing one of the maildir
    sub-directories, or if it is neither a folder nor a file.

    Future finer autodetection heuristics should be implemented here. Some ideas:

    * single mail from a maildir
    * plain text mail content
    * other mailbox formats supported in Python's std lib:

        * ``MH``
        * ``Babyl``
        * ``MMDF``
    """
    if path.is_dir():
        # Validates the folder is a maildir. Sub-directories are checked in
        # sorted order so that, when several are missing, the one reported in
        # the error is the same on every run instead of depending on the
        # iteration order of the set.
        for subdir in sorted(MAILDIR_SUBDIRS):
            if not path.joinpath(subdir).is_dir():
                raise ValueError(f"Missing sub-directory {subdir!r}")
        box_type = "maildir"

    elif path.is_file():
        # Any regular file is assumed to be an mbox; its content is not
        # inspected.
        box_type = "mbox"

    else:
        raise ValueError("Unrecognized mail source type.")

    logger.info(f"{choice_style(box_type)} detected.")
    return box_type
def open_box(path, box_type=False, force_unlock=False):
    """Open the mailbox located at ``path``, along with its sub-folders.

    When ``box_type`` is truthy, the box is opened in that format (it must be
    a key of ``BOX_TYPES``). Otherwise the format is guessed by the (crude)
    ``autodetect_box_type()``.

    ``force_unlock`` is passed down to the locking step of each box.

    Returns a list of boxes, one per sub-folder, all of them locked and ready
    for operations.
    """
    # The path is logged as provided by the caller, before being normalized.
    logger.info(f"\nOpening {choice_style(path)} ...")
    path = Path(path)

    if box_type:
        logger.warning(f"Forcing {box_type} format.")
    else:
        box_type = autodetect_box_type(path)

    # Never let the constructor create a new mailbox if none is found.
    root_box = BOX_TYPES[box_type](path, create=False)
    return open_subfolders(root_box, force_unlock)
def lock_box(box, force_unlock):
    """Lock an opened box, optionally forcing our way through an existing lock.

    If locking fails with ``mailbox.ExternalClashError`` and ``force_unlock``
    is truthy, the existing lock is removed and the box is locked again.
    Without ``force_unlock``, the error is propagated to the caller.

    Returns the very same box, locked.
    """
    logger.debug("Locking box...")
    try:
        box.lock()
    except mailbox.ExternalClashError:
        logger.error("Box already locked!")

        # Nothing we are allowed to do about it: let the error propagate.
        if not force_unlock:
            raise

        logger.warning("Forcing removal of lock...")
        # Tamper with the box's private state to make it believe it holds the
        # lock -- presumably so that ``unlock()`` does not skip the release.
        box._locked = True
        box.unlock()
        box.lock()

    logger.debug("Box opened.")
    return box
def open_subfolders(box, force_unlock):
    """Recursively walk the sub-folder tree of a box, locking each box found.

    Returns a flat list of opened and locked boxes: the provided ``box`` first,
    followed by the boxes of its sub-folders, depth-first.
    """
    opened_boxes = [lock_box(box, force_unlock)]

    # Box types without a ``list_folders`` method have no sub-folders.
    if not hasattr(box, "list_folders"):
        return opened_boxes

    for folder_id in box.list_folders():
        logger.info(f"Opening subfolder {folder_id} ...")
        subfolder = box.get_folder(folder_id)
        opened_boxes.extend(open_subfolders(subfolder, force_unlock))
    return opened_boxes
def create_box(path, box_type=False):
    """Create a brand new box from scratch.

    ``path`` is the location of the box to create, as a ``Path`` or anything
    accepted by the ``Path`` constructor (like a string).

    ``box_type`` selects the format of the new box and must be a key of
    ``BOX_TYPES``; any other value (including the default) fails at the
    ``BOX_TYPES`` lookup.

    Returns the newly created box, locked.

    Raises ``FileExistsError`` if something already exists at ``path``.
    """
    # Normalize the path instead of asserting on its type: assertions are
    # stripped when Python runs with ``-O``, and ``open_box()`` accepts
    # strings the same way.
    path = Path(path)

    logger.info(
        f"Creating new {choice_style(box_type)} box at {choice_style(str(path))} ..."
    )

    if path.exists():
        raise FileExistsError(path)

    constructor = BOX_TYPES[box_type]
    # Allow the constructor to create a new mail box as we already double-checked
    # beforehand it does not exist.
    box = constructor(path, create=True)

    logger.debug("Locking box...")
    box.lock()
    return box