Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Queue #1592

Closed
edneijunior opened this issue Apr 17, 2021 · 10 comments
Closed

Queue #1592

edneijunior opened this issue Apr 17, 2021 · 10 comments

Comments

@edneijunior
Copy link

edneijunior commented Apr 17, 2021

Describe the bug
it is not a bug, but a cry for help!

create() code

//w2api - Version 0.0.2
Array.prototype.find = function (...args)
{
    let index = this.findIndex(...args);
    if (index >= 0)
        return index >= 0 ? this[index] : void 0;
};

global.openWA = require('@open-wa/wa-automate');
const fs = require('fs');
const async = require("async");
const request = require('request');
const moment = require('moment');
const mime = require('mime-types');
const PQueue  = require("p-queue");
const queue = new PQueue({ concurrency: 2, timeout: 1000 });
global.uaOverride = 'WhatsApp/2.16.352 Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/13.0.3 Safari/605.1.15';


global.WA_CLIENT = {};

/*
 * Function to read files as base64 string
 */
function base64Encode(file) {
    var body = fs.readFileSync(file);
    return body.toString('base64');
}
;

/*
 * WhatsApp API SUPER CLASS
 * Personal regards to:
 * Mohhamed Shah (openWA) - 
 * Peter Sírka (TotalJS) - 
 * This library was built using openWA and pieces of 
 */
function WHATS_API(USER_ID) {
    console.log("\n====================================================");
    console.log("@@Creating WhatsApp connection for: " + USER_ID);
    console.log("====================================================\n");
    this.QR_CODE = "";
    this.WEBHOOK = "";
    this.TOKEN = "";
    this.INSTANCE = USER_ID;
    this.CONNECTION = {};
}
;

/*
 * Sanitizing the type of ack response i want on webhook POST request
 * you can edit this method but pay attention to documentation.
 * ACK EVENTS:
 * 1 - send 
 * 2 - delivered
 * 3 - viewed
 * 4 - listened
 */


var SANITIZE_ACK = function (instanceID, data) {
    return JSON.stringify({
        ack: [{
                id: data.id.toString(),
                chatId: data.id,
                status: data.ack,
                body: data.body
            }],
        instanceId: instanceID
    });
};

/*
 * Sanitizing the type of message response i want on webhook POST request
 * you can edit this method but pay attention to documentation.
 */
var SANITIZE_MSG = function (instanceID, data) {
    return JSON.stringify({
        messages: [{
                id: data.id,
                body: data.body,
                filelink: (data.filelink ? data.filelink : null),
                fromMe: data.fromMe,
                self: data.self,
                isForwarded: data.isForwarded,
                author: data.from,
                time: data.t,
                lat: data.lat,
                lng: data.lng,
                ack: data.ack,
                chatId: data.chat.id,
                type: data.type,
                senderName: data.sender.pushname,
                caption: (data.caption ? data.caption : null),
                quotedMsgBody: (data.quotedMsgObj ? data.quotedMsgObj : null),
                chatName: data.sender.formattedName,
            }],
        instanceId: instanceID
    });
};

/*
 * Creating an prototype of messages to send information and control flow over webhook
 * you can edit this method but pay attention to documentation.
 */
WHATS_API.prototype.PROCESS_MESSAGE = function (data) {
    var that = this;
    var SANITIZED = SANITIZE_MSG(that.INSTANCE, data);
    request({
        method: 'POST',
        url: that.WEBHOOK,
        headers: {'Content-Type': 'application/json'},
        body: SANITIZED
    }, function (err, response, body) {
        if (err) {
            ERROR_CATCHER(err);
        } else {
            if (response.statusCode != 200) {
                ERROR_CATCHER("Status Code error: " + response.statusCode, response);
            } else {
                console.log(SANITIZED);
            }
        }
    });
};

/*
 * Creating an prototype of ack events to send information and control flow over webhook
 * you can edit this method but pay attention to documentation.
 */
WHATS_API.prototype.PROCESS_ACK = function (data) {
    var that = this;
    var SANITIZED = SANITIZE_ACK(that.INSTANCE, data);
    request({
        method: 'POST',
        url: that.WEBHOOK,
        headers: {'Content-Type': 'application/json'},
        body: SANITIZED
    }, function (err, response, body) {
        if (err) {
            ERROR_CATCHER(err);
        } else {
            if (response.statusCode != 200) {
                ERROR_CATCHER("Status Code WRONG: " + response.statusCode, response);
            } else {
                console.log(SANITIZED);
            }
        }
    });
};

/*
 * to-do - Creating webhook events to inform when something goes wrong with API
 * if you have any knowleadge about it - help me to improve
 */
WHATS_API.prototype.PROCESS_STATE = function (data) {
    console.log("[STATE CHANGED] -", data);
};

/*
 * Prototype configuration for setup events incoming from openWA module
 * keep your hands away from this
 */


    WHATS_API.prototype.SETUP = function (CLIENT, WEBHOOK_INPUT, TOKEN_INPUT) {
    var that = this;
    that.WEBHOOK = WEBHOOK_INPUT;
    that.TOKEN = TOKEN_INPUT;
    that.CONNECTION = CLIENT;

    CLIENT.onMessage(message => {
        if (message.quotedMsgObj && message.quotedMsgObj.mimetype) {
            let m = message.quotedMsgObj;
            const mediaData = openWA.decryptMedia(m, uaOverride).then(function (DECRYPTED_DATA) {
                var filename = `${message.t}.${mime.extension(m.mimetype)}`;
                fs.writeFile('/var/www/w2api/public/cdn/' + filename, Buffer.from(DECRYPTED_DATA, 'base64'), 'base64', function (err) {
                    if (err) {
                        console.log("#Error on saving file");
                        m['body'] = `data:${m.mimetype};base64,${m['body']}`;
                        m['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    } else {
                        m['body'] = `data:${m.mimetype};base64,${base64Encode('/var/www/w2api/public/cdn/' + filename)}`;
                        m['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    }
                });
            });
        } else if (message.mimetype) {
            const mediaData = openWA.decryptMedia(message, uaOverride).then(function (DECRYPTED_DATA) {
                var filename = `${message.t}.${mime.extension(message.mimetype)}`;
                fs.writeFile('/var/www/w2api/public/cdn/' + filename, Buffer.from(DECRYPTED_DATA, 'base64'), 'base64', function (err) {
                    if (err) {
                        console.log("#Error on saving file");
                        message['body'] = `data:${message.mimetype};base64,${message['body']}`;
                        message['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    } else {
                        message['body'] = `data:${message.mimetype};base64,${base64Encode('/var/www/w2api/public/cdn/' + filename)}`;
                        message['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    }
                });
            });
        } else {
            that.PROCESS_MESSAGE(message);
        }
    });
    CLIENT.onAck(ack => {
        that.PROCESS_ACK(ack);
    });
    CLIENT.onStateChanged(state => {
        that.PROCESS_STATE(state);
    });
};
WHATS_API.prototype.SET_QRCODE = function (code) {
    var that = this;
    if (qrCodeManager) {
        qrCodeManager.send({qr: code});
    }
    ;
    that.QR_CODE = code;
};

module.exports = WHATS_API;

ON('ready', function () {

    /*
     * Creating Connection for WhatsApp and expose conection to WA_CLIENT global var
     * Pay attention to instance id configured on /config file
     */
    WA_CLIENT = new WHATS_API(F.config['instance']);

    /*
     * Declare event getter for when qrcode is available from openWA-api
     */
    openWA.ev.on('qr.**', function (qrcode, sessionId) {
        //SETTING QRCODE AVAILABLE ON address/qrCode
        WA_CLIENT.SET_QRCODE(qrcode);
    });

    /*
     * Finally creating connection and start headless webBrowser
     * Attention to headless param
     */
    openWA.create({
        sessionId: "/whatsSessions/" + F.config['instance'],
        useChrome: false,
        headless: true,
        throwErrorOnTosBlock: true,
        qrTimeout: 0, //set to 0 to wait forever for a qr scan
        authTimeout: 0, //set to 0 to wait forever for connection to phone
        autoRefresh: true, //default to true
        safeMode: false,
        disableSpins: true

    }).then(function (client) {
         if (qrCodeManager) {
            qrCodeManager.send({connected: true});
        }
        WA_CLIENT.SETUP(client, F.config['webhook'], F.config['token']);
    });

});


Expected behavior
an implemented queue

@smashah
Copy link
Member

smashah commented Apr 17, 2021

hey, @edneijunior I got your email and was working on a long term solution today.

The solution I came up with was to implement an optional queue on listener (onMessage, onAnyMessage, onAck, etc.) callbacks (the function you provide as the main parameter of these functions). This way, you can control your outflow of messages implicitly by controlling the inflow.

Here is an example of the upcoming changes:

image

In the above example, 1 message will be processed every 10 seconds via onAnyMessage whereas onMessage will remain 'real-time'. This is achieved simply by providing PQueue.options as the second parameter when setting these listeners.

Here is the corresponding console output from the above example:

image

This p-queue implementation will be unopinionated, meaning you will HAVE TO provide the p-queue options object as the second parameter in order to use the p-queue at all. This is because those options will be subjective to the desired behaviour of your automation and your desired throughput.

Hopefully, this will satisfy your needs for a p-queue. Please let me know if there are any other considerations.

Thanks

@smashah
Copy link
Member

smashah commented Apr 17, 2021

@github-actions run

⚡ Release! ⚡
(async () => {
function exec(cmd) {
  console.log(execSync(cmd).toString());
}

// Config
const gitUserEmail = "github-actions[bot]@users.noreply.github.com";
const gitUserName = "github-actions[bot]";

exec(`echo "//registry.npmjs.org/:_authToken=$NPM_TOKEN" > .npmrc`);
exec(`git config --global user.email "${gitUserEmail}"`);
exec(`git config --global user.name "${gitUserName}"`);
exec(`npm i -D`);
exec(`npm run release-ci minor`);

//comment on the issue
var result = execSync(`npx auto-changelog -o ./tempchangelog.txt --commit-limit false --template ./compact-keepachangelog.hbs --stdout`).toString();

    await postComment(result);

//create changelog image
exec(`npm run release-image`);
exec(`git commit -a -m 'updated release-image'`);
exec(`git push --force`);
  })();

@smashah
Copy link
Member

smashah commented Apr 17, 2021

Changelog

🚀 Release 3.11.0 (2021-04-17)

@smashah
Copy link
Member

smashah commented Apr 17, 2021

@edneijunior if you share your desired pqueue behaviour I can reply with the code that will work in the latest version of the library

@lesmo
Copy link

lesmo commented Apr 18, 2021

... what about outgoing messages when they're not sent in response to incoming messages? 🤔

@smashah
Copy link
Member

smashah commented Apr 18, 2021

@lesmo for those you can implement a PQueue manually

@edneijunior
Copy link
Author

Bro, you are awesome.
Thank you very much for so much consideration in your response.

@edneijunior
Copy link
Author

edneijunior commented Apr 19, 2021

Sem título

WA Version 3.11.1

CLIENT.onMessage(message => {
        if (message.quotedMsgObj && message.quotedMsgObj.mimetype) {
            let m = message.quotedMsgObj;
            const mediaData = openWA.decryptMedia(m, uaOverride).then(function (DECRYPTED_DATA) {
                var filename = `${message.t}.${mime.extension(m.mimetype)}`;
                fs.writeFile('/var/www/w2api/public/cdn/' + filename, Buffer.from(DECRYPTED_DATA, 'base64'), 'base64', function (err) {
                    if (err) {
                        console.log("#Error on saving file");
                        m['body'] = `data:${m.mimetype};base64,${m['body']}`;
                        m['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    } else {
                        m['body'] = `data:${m.mimetype};base64,${base64Encode('/var/www/w2api/public/cdn/' + filename)}`;
                        m['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    }
                });
            });
        } else if (message.mimetype) {
            const mediaData = openWA.decryptMedia(message, uaOverride).then(function (DECRYPTED_DATA) {
                var filename = `${message.t}.${mime.extension(message.mimetype)}`;
                fs.writeFile('/var/www/w2api/public/cdn/' + filename, Buffer.from(DECRYPTED_DATA, 'base64'), 'base64', function (err) {
                    if (err) {
                        console.log("#Error on saving file");
                        message['body'] = `data:${message.mimetype};base64,${message['body']}`;
                        message['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    } else {
                        message['body'] = `data:${message.mimetype};base64,${base64Encode('/var/www/w2api/public/cdn/' + filename)}`;
                        message['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    }
                });
            });
        } else {
            that.PROCESS_MESSAGE(message);
        }
    },{
        interval:10000,
        concurrency: 1,
        intervalCap:1
    });
    ```

@edneijunior
Copy link
Author

@edneijunior if you share your desired pqueue behaviour I can reply with the code that will work in the latest version of the library

I would like to send messages following the best practices. I want to send messages without overloading the session.

@smashah
Copy link
Member

smashah commented Apr 19, 2021

Sem título

WA Version 3.11.1

CLIENT.onMessage(message => {
        if (message.quotedMsgObj && message.quotedMsgObj.mimetype) {
            let m = message.quotedMsgObj;
            const mediaData = openWA.decryptMedia(m, uaOverride).then(function (DECRYPTED_DATA) {
                var filename = `${message.t}.${mime.extension(m.mimetype)}`;
                fs.writeFile('/var/www/w2api/public/cdn/' + filename, Buffer.from(DECRYPTED_DATA, 'base64'), 'base64', function (err) {
                    if (err) {
                        console.log("#Error on saving file");
                        m['body'] = `data:${m.mimetype};base64,${m['body']}`;
                        m['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    } else {
                        m['body'] = `data:${m.mimetype};base64,${base64Encode('/var/www/w2api/public/cdn/' + filename)}`;
                        m['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    }
                });
            });
        } else if (message.mimetype) {
            const mediaData = openWA.decryptMedia(message, uaOverride).then(function (DECRYPTED_DATA) {
                var filename = `${message.t}.${mime.extension(message.mimetype)}`;
                fs.writeFile('/var/www/w2api/public/cdn/' + filename, Buffer.from(DECRYPTED_DATA, 'base64'), 'base64', function (err) {
                    if (err) {
                        console.log("#Error on saving file");
                        message['body'] = `data:${message.mimetype};base64,${message['body']}`;
                        message['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    } else {
                        message['body'] = `data:${message.mimetype};base64,${base64Encode('/var/www/w2api/public/cdn/' + filename)}`;
                        message['filelink'] = 'cdn/' + filename;
                        that.PROCESS_MESSAGE(message);
                    }
                });
            });
        } else {
            that.PROCESS_MESSAGE(message);
        }
    },{
        interval:10000,
        concurrency: 1,
        intervalCap:1
    });
    ```

can you explain what's going on here and what is your question?

Regarding setting up to follow best practices, you should implement a setup that works for your use case. If you're not getting that many incoming messages then maybe you don't need to implement a queue. Based on community discussions, people get away with high message throughput without any issues.

The example provided is a bit extreme (1 message every 10 seconds) so I'll give you a starting point that should be safe but also not cause a huge backlog of queued messages:

  1. Every 5 seconds, process at most 10 messages, process 2 messages at a time.
{
interval: 5000,
intervalCap: 10,
concurrency: 2,
carryoverConcurrencyCount: true //<=== important to set this to true so you don't miss messages!
}
  1. Every 2 seconds, process at most 5 messages, process one message at a time.
{
interval: 2000,
intervalCap: 5,
concurrency: 1,
carryoverConcurrencyCount: true
}
  1. Every second, process at most 1 message, (concurrency at 1 is redundant, but still good practice to add it)
{
interval: 1000,
intervalCap: 1,
concurrency: 1,
carryoverConcurrencyCount: true
}

if you have any more discussion around p-queue, please join the discord (click the badge in the readme) and ask in the #pq channel

thanks

@smashah smashah closed this as completed Apr 19, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants