Skip to content

Commit

Permalink
feat(Email Trigger (IMAP) Node): Migrate from imap-simple to `@n8n/…
Browse files Browse the repository at this point in the history
…imap` (#8899)

Co-authored-by: Jonathan Bennetts <jonathan.bennetts@gmail.com>
  • Loading branch information
netroy and Joffcom committed Apr 9, 2024
1 parent 2826104 commit 9f87cc2
Show file tree
Hide file tree
Showing 16 changed files with 616 additions and 89 deletions.
4 changes: 2 additions & 2 deletions package.json
Expand Up @@ -26,8 +26,8 @@
"start:tunnel": "./packages/cli/bin/n8n start --tunnel",
"start:windows": "cd packages/cli/bin && n8n",
"test": "turbo run test",
"test:backend": "pnpm --filter=!@n8n/chat --filter=!n8n-design-system --filter=!n8n-editor-ui --filter=!n8n-nodes-base test",
"test:nodes": "pnpm --filter=n8n-nodes-base test",
"test:backend": "pnpm --filter=!@n8n/chat --filter=!n8n-design-system --filter=!n8n-editor-ui --filter=!n8n-nodes-base --filter=!@n8n/n8n-nodes-langchain test",
"test:nodes": "pnpm --filter=n8n-nodes-base --filter=@n8n/n8n-nodes-langchain test",
"test:frontend": "pnpm --filter=@n8n/chat --filter=n8n-design-system --filter=n8n-editor-ui test",
"watch": "turbo run watch --parallel",
"webhook": "./packages/cli/bin/n8n webhook",
Expand Down
15 changes: 15 additions & 0 deletions packages/@n8n/imap/.eslintrc.js
@@ -0,0 +1,15 @@
const sharedOptions = require('@n8n_io/eslint-config/shared');

/**
* @type {import('@types/eslint').ESLint.ConfigData}
*/
module.exports = {
extends: ['@n8n_io/eslint-config/base'],

...sharedOptions(__dirname),

rules: {
'@typescript-eslint/consistent-type-imports': 'error',
'n8n-local-rules/no-plain-errors': 'off',
},
};
2 changes: 2 additions & 0 deletions packages/@n8n/imap/jest.config.js
@@ -0,0 +1,2 @@
/** @type {import('jest').Config} */
module.exports = require('../../../jest.config');
34 changes: 34 additions & 0 deletions packages/@n8n/imap/package.json
@@ -0,0 +1,34 @@
{
"name": "@n8n/imap",
"version": "0.1.0",
"scripts": {
"clean": "rimraf dist .turbo",
"dev": "pnpm watch",
"typecheck": "tsc",
"build": "tsc -p tsconfig.build.json",
"format": "prettier --write . --ignore-path ../../../.prettierignore",
"lint": "eslint . --quiet",
"lintfix": "eslint . --fix",
"watch": "tsc -p tsconfig.build.json --watch",
"test": "echo \"Error: no test created yet\""
},
"main": "dist/index.js",
"module": "src/index.ts",
"types": "dist/index.d.ts",
"files": [
"dist/**/*"
],
"dependencies": {
"iconv-lite": "0.6.3",
"imap": "0.8.19",
"quoted-printable": "1.0.1",
"utf8": "3.0.0",
"uuencode": "0.0.4"
},
"devDependencies": {
"@types/imap": "^0.8.40",
"@types/quoted-printable": "^1.0.2",
"@types/utf8": "^3.0.3",
"@types/uuencode": "^0.0.3"
}
}
235 changes: 235 additions & 0 deletions packages/@n8n/imap/src/ImapSimple.ts
@@ -0,0 +1,235 @@
/* eslint-disable @typescript-eslint/no-use-before-define */
import { EventEmitter } from 'events';
import type Imap from 'imap';
import { type ImapMessage } from 'imap';
import * as qp from 'quoted-printable';
import * as iconvlite from 'iconv-lite';
import * as utf8 from 'utf8';
import * as uuencode from 'uuencode';

import { getMessage } from './helpers/getMessage';
import type { Message, MessagePart } from './types';

const IMAP_EVENTS = ['alert', 'mail', 'expunge', 'uidvalidity', 'update', 'close', 'end'] as const;

export class ImapSimple extends EventEmitter {
/** flag to determine whether we should suppress ECONNRESET from bubbling up to listener */
private ending = false;

constructor(private readonly imap: Imap) {
super();

// pass most node-imap `Connection` events through 1:1
IMAP_EVENTS.forEach((event) => {
this.imap.on(event, this.emit.bind(this, event));
});

// special handling for `error` event
this.imap.on('error', (e: Error & { code?: string }) => {
// if .end() has been called and an 'ECONNRESET' error is received, don't bubble
if (e && this.ending && e.code?.toUpperCase() === 'ECONNRESET') {
return;
}
this.emit('error', e);
});
}

/** disconnect from the imap server */
end(): void {
// set state flag to suppress 'ECONNRESET' errors that are triggered when .end() is called.
// it is a known issue that has no known fix. This just temporarily ignores that error.
// https://github.com/mscdex/node-imap/issues/391
// https://github.com/mscdex/node-imap/issues/395
this.ending = true;

// using 'close' event to unbind ECONNRESET error handler, because the node-imap
// maintainer claims it is the more reliable event between 'end' and 'close'.
// https://github.com/mscdex/node-imap/issues/394
this.imap.once('close', () => {
this.ending = false;
});

this.imap.end();
}

/**
* Search the currently open mailbox, and retrieve the results
*
* Results are in the form:
*
* [{
* attributes: object,
* parts: [ { which: string, size: number, body: string }, ... ]
* }, ...]
*
* See node-imap's ImapMessage signature for information about `attributes`, `which`, `size`, and `body`.
* For any message part that is a `HEADER`, the body is automatically parsed into an object.
*/
async search(
/** Criteria to use to search. Passed to node-imap's .search() 1:1 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
searchCriteria: any[],
/** Criteria to use to fetch the search results. Passed to node-imap's .fetch() 1:1 */
fetchOptions: Imap.FetchOptions,
) {
return await new Promise<Message[]>((resolve, reject) => {
this.imap.search(searchCriteria, (e, uids) => {
if (e) {
reject(e);
return;
}

if (uids.length === 0) {
resolve([]);
return;
}

const fetch = this.imap.fetch(uids, fetchOptions);
let messagesRetrieved = 0;
const messages: Message[] = [];

const fetchOnMessage = async (message: Imap.ImapMessage, seqNo: number) => {
const msg: Message = await getMessage(message);
msg.seqNo = seqNo;
messages[seqNo] = msg;

messagesRetrieved++;
if (messagesRetrieved === uids.length) {
resolve(messages.filter((m) => !!m));
}
};

const fetchOnError = (error: Error) => {
fetch.removeListener('message', fetchOnMessage);
fetch.removeListener('end', fetchOnEnd);
reject(error);
};

const fetchOnEnd = () => {
fetch.removeListener('message', fetchOnMessage);
fetch.removeListener('error', fetchOnError);
};

fetch.on('message', fetchOnMessage);
fetch.once('error', fetchOnError);
fetch.once('end', fetchOnEnd);
});
});
}

/** Download a "part" (either a portion of the message body, or an attachment) */
async getPartData(
/** The message returned from `search()` */
message: Message,
/** The message part to be downloaded, from the `message.attributes.struct` Array */
part: MessagePart,
) {
return await new Promise<string>((resolve, reject) => {
const fetch = this.imap.fetch(message.attributes.uid, {
bodies: [part.partID],
struct: true,
});

const fetchOnMessage = async (msg: ImapMessage) => {
const result = await getMessage(msg);
if (result.parts.length !== 1) {
reject(new Error('Got ' + result.parts.length + ' parts, should get 1'));
return;
}

const data = result.parts[0].body as string;

const encoding = part.encoding.toUpperCase();

if (encoding === 'BASE64') {
resolve(Buffer.from(data, 'base64').toString());
return;
}

if (encoding === 'QUOTED-PRINTABLE') {
if (part.params?.charset?.toUpperCase() === 'UTF-8') {
resolve(Buffer.from(utf8.decode(qp.decode(data))).toString());
} else {
resolve(Buffer.from(qp.decode(data)).toString());
}
return;
}

if (encoding === '7BIT') {
resolve(Buffer.from(data).toString('ascii'));
return;
}

if (encoding === '8BIT' || encoding === 'BINARY') {
const charset = part.params?.charset ?? 'utf-8';
resolve(iconvlite.decode(Buffer.from(data), charset));
return;
}

if (encoding === 'UUENCODE') {
const parts = data.toString().split('\n'); // remove newline characters
const merged = parts.splice(1, parts.length - 4).join(''); // remove excess lines and join lines with empty string
resolve(uuencode.decode(merged));
return;
}

// if it gets here, the encoding is not currently supported
reject(new Error('Unknown encoding ' + part.encoding));
};

const fetchOnError = (error: Error) => {
fetch.removeListener('message', fetchOnMessage);
fetch.removeListener('end', fetchOnEnd);
reject(error);
};

const fetchOnEnd = () => {
fetch.removeListener('message', fetchOnMessage);
fetch.removeListener('error', fetchOnError);
};

fetch.once('message', fetchOnMessage);
fetch.once('error', fetchOnError);
fetch.once('end', fetchOnEnd);
});
}

/** Adds the provided flag(s) to the specified message(s). */
async addFlags(
/** The messages uid */
uid: number[],
/** The flags to add to the message(s). */
flags: string | string[],
) {
return await new Promise<void>((resolve, reject) => {
this.imap.addFlags(uid, flags, (e) => (e ? reject(e) : resolve()));
});
}

/** Returns a list of mailboxes (folders). */
async getBoxes() {
return await new Promise<Imap.MailBoxes>((resolve, reject) => {
this.imap.getBoxes((e, boxes) => (e ? reject(e) : resolve(boxes)));
});
}

/** Open a mailbox */
async openBox(
/** The name of the box to open */
boxName: string,
): Promise<Imap.Box> {
return await new Promise((resolve, reject) => {
this.imap.openBox(boxName, (e, result) => (e ? reject(e) : resolve(result)));
});
}

/** Close a mailbox */
async closeBox(
/** If autoExpunge is true, any messages marked as Deleted in the currently open mailbox will be removed @default true */
autoExpunge = true,
) {
return await new Promise<void>((resolve, reject) => {
this.imap.closeBox(autoExpunge, (e) => (e ? reject(e) : resolve()));
});
}
}
27 changes: 27 additions & 0 deletions packages/@n8n/imap/src/errors.ts
@@ -0,0 +1,27 @@
export abstract class ImapError extends Error {}

/** Error thrown when a connection attempt has timed out */
export class ConnectionTimeoutError extends ImapError {
constructor(
/** timeout in milliseconds that the connection waited before timing out */
readonly timeout?: number,
) {
let message = 'connection timed out';
if (timeout) {
message += `. timeout = ${timeout} ms`;
}
super(message);
}
}

export class ConnectionClosedError extends ImapError {
constructor() {
super('Connection closed unexpectedly');
}
}

export class ConnectionEndedError extends ImapError {
constructor() {
super('Connection ended unexpectedly');
}
}
53 changes: 53 additions & 0 deletions packages/@n8n/imap/src/helpers/getMessage.ts
@@ -0,0 +1,53 @@
import {
parseHeader,
type ImapMessage,
type ImapMessageBodyInfo,
type ImapMessageAttributes,
} from 'imap';
import type { Message, MessageBodyPart } from '../types';

/**
* Given an 'ImapMessage' from the node-imap library, retrieves the `Message`
*/
export async function getMessage(
/** an ImapMessage from the node-imap library */
message: ImapMessage,
): Promise<Message> {
return await new Promise((resolve) => {
let attributes: ImapMessageAttributes;
const parts: MessageBodyPart[] = [];

const messageOnBody = (stream: NodeJS.ReadableStream, info: ImapMessageBodyInfo) => {
let body: string = '';

const streamOnData = (chunk: Buffer) => {
body += chunk.toString('utf8');
};

stream.on('data', streamOnData);
stream.once('end', () => {
stream.removeListener('data', streamOnData);

parts.push({
which: info.which,
size: info.size,
body: /^HEADER/g.test(info.which) ? parseHeader(body) : body,
});
});
};

const messageOnAttributes = (attrs: ImapMessageAttributes) => {
attributes = attrs;
};

const messageOnEnd = () => {
message.removeListener('body', messageOnBody);
message.removeListener('attributes', messageOnAttributes);
resolve({ attributes, parts });
};

message.on('body', messageOnBody);
message.once('attributes', messageOnAttributes);
message.once('end', messageOnEnd);
});
}

0 comments on commit 9f87cc2

Please sign in to comment.