1+ import { MINUTE , SECOND } from '@sogebot/ui-helpers/constants' ;
12import { validateOrReject } from 'class-validator' ;
3+ import * as cronparser from 'cron-parser' ;
24import { cloneDeep } from 'lodash' ;
35import merge from 'lodash/merge' ;
46
57import type { Node } from '../d.ts/src/plugins' ;
68import { Plugin , PluginVariable } from './database/entity/plugins' ;
79import { isValidationError } from './helpers/errors' ;
810import { eventEmitter } from './helpers/events' ;
11+ import { error } from './helpers/log' ;
912import { adminEndpoint } from './helpers/socket' ;
1013import { processes , processNode } from './plugins/index' ;
1114
1215import Core from '~/_interface' ;
1316import { onStartup } from '~/decorators/on' ;
1417
18+ const cronTriggers = new Map < string , Node > ( ) ;
19+ const plugins : Plugin [ ] = [ ] ;
20+
1521const twitchChatMessage = {
1622 sender : {
1723 userName : 'string' ,
@@ -72,6 +78,14 @@ class Plugins extends Core {
7278 category : 'registry' , name : 'plugins' , id : 'registry/plugins' , this : null ,
7379 } ) ;
7480
81+ this . updateCache ( ) ;
82+ setInterval ( ( ) => {
83+ this . updateAllCrons ( ) ;
84+ } , MINUTE ) ;
85+ setInterval ( ( ) => {
86+ this . triggerCrons ( ) ;
87+ } , SECOND ) ;
88+
7589 eventEmitter . on ( 'tip' , async ( data ) => {
7690 const users = ( await import ( './users' ) ) . default ;
7791 const user = {
@@ -88,16 +102,75 @@ class Plugins extends Core {
88102 } ) ;
89103 }
90104
105+ async updateCache ( ) {
106+ const _plugins = await Plugin . find ( ) ;
107+ while ( plugins . length > 0 ) {
108+ plugins . shift ( ) ;
109+ }
110+ for ( const plugin of _plugins ) {
111+ plugins . push ( plugin ) ;
112+ }
113+ await this . updateAllCrons ( ) ;
114+ }
115+
116+ async updateAllCrons ( ) {
117+ // we will generate at least 2 minutes of span of crons
118+ // e.g. if we have cron every 1s -> 120 crons
119+ // 10s -> 12 crons
120+ // 10m -> 1 cron
121+ const cron = await this . process ( 'cron' , '' , null , { } ) ;
122+
123+ cronTriggers . clear ( ) ;
124+ for ( const { plugin, listeners } of cron ) {
125+ for ( const node of listeners ) {
126+ try {
127+ const cronParsed = cronparser . parseExpression ( node . data . value ) ;
128+
129+ const currentTime = Date . now ( ) ;
130+ let lastTime = new Date ( ) . toISOString ( ) ;
131+ const intervals : string [ ] = [ ] ;
132+ while ( currentTime + ( 2 * MINUTE ) > new Date ( lastTime ) . getTime ( ) ) {
133+ lastTime = cronParsed . next ( ) . toISOString ( ) ;
134+ intervals . push ( lastTime ) ;
135+ }
136+
137+ for ( const interval of intervals ) {
138+ cronTriggers . set ( `${ plugin . id } |${ interval } ` , node ) ;
139+ }
140+ } catch ( e ) {
141+ error ( e ) ;
142+ }
143+ }
144+ }
145+ }
146+
147+ async triggerCrons ( ) {
148+ for ( const [ pluginId , timestamp ] of [ ...cronTriggers . keys ( ) ] . map ( o => o . split ( '|' ) ) ) {
149+ if ( new Date ( timestamp ) . getTime ( ) < Date . now ( ) ) {
150+ const plugin = plugins . find ( o => o . id === pluginId ) ;
151+ const node = cronTriggers . get ( `${ pluginId } |${ timestamp } ` ) ;
152+ if ( plugin && node ) {
153+ const workflow = Object . values (
154+ JSON . parse ( plugin . workflow ) . drawflow . Home . data
155+ ) as Node [ ] ;
156+ this . processPath ( pluginId , workflow , node , { } , { } , null ) ;
157+ }
158+ cronTriggers . delete ( `${ pluginId } |${ timestamp } ` ) ;
159+ }
160+ }
161+ }
162+
91163 sockets ( ) {
92164 adminEndpoint ( '/core/plugins' , 'generic::getAll' , async ( cb ) => {
93- cb ( null , await Plugin . find ( ) ) ;
165+ cb ( null , plugins ) ;
94166 } ) ;
95167 adminEndpoint ( '/core/plugins' , 'generic::getOne' , async ( id , cb ) => {
96- cb ( null , await Plugin . findOne ( id ) ) ;
168+ cb ( null , plugins . find ( o => o . id === id ) ) ;
97169 } ) ;
98170 adminEndpoint ( '/core/plugins' , 'generic::deleteById' , async ( id , cb ) => {
99171 await Plugin . delete ( { id } ) ;
100172 await PluginVariable . delete ( { pluginId : id } ) ;
173+ await this . updateCache ( ) ;
101174 cb ( null ) ;
102175 } ) ;
103176 adminEndpoint ( '/core/plugins' , 'generic::validate' , async ( data , cb ) => {
@@ -121,6 +194,7 @@ class Plugins extends Core {
121194 merge ( itemToSave , item ) ;
122195 await validateOrReject ( itemToSave ) ;
123196 await itemToSave . save ( ) ;
197+ await this . updateCache ( ) ;
124198 cb ( null , itemToSave ) ;
125199 } catch ( e ) {
126200 if ( e instanceof Error ) {
@@ -136,7 +210,7 @@ class Plugins extends Core {
136210 } ) ;
137211 }
138212
139- async processPath ( pluginId : string , workflow : Node [ ] , currentNode : Node , parameters : Record < string , any > , variables : Record < string , any > , userstate : { userName : string ; userId : string } ) {
213+ async processPath ( pluginId : string , workflow : Node [ ] , currentNode : Node , parameters : Record < string , any > , variables : Record < string , any > , userstate : { userName : string ; userId : string } | null ) {
140214 parameters = cloneDeep ( parameters ) ;
141215 variables = cloneDeep ( variables ) ;
142216
@@ -174,22 +248,27 @@ class Plugins extends Core {
174248 }
175249 }
176250
177- async process ( type : keyof typeof this . listeners , message : string , userstate : { userName : string , userId : string } , params ?: Record < string , any > ) {
178- const plugins = await Plugin . find ( { enabled : true } ) ;
179- const pluginsWithListener : Plugin [ ] = [ ] ;
180- for ( const plugin of plugins ) {
251+ async process ( type : keyof typeof this . listeners | 'cron' , message : string , userstate : { userName : string , userId : string } | null , params ?: Record < string , any > ) {
252+ const pluginsEnabled = plugins . filter ( o => o . enabled ) ;
253+ const pluginsWithListener : { plugin : Plugin , listeners : Node [ ] } [ ] = [ ] ;
254+ for ( const plugin of pluginsEnabled ) {
181255 // explore drawflow
182256 const workflow = Object . values (
183257 JSON . parse ( plugin . workflow ) . drawflow . Home . data
184258 ) as Node [ ] ;
185259
186- const listeners = workflow . filter ( ( o : any ) => {
260+ const listeners = workflow . filter ( ( o : Node ) => {
187261 params ??= { } ;
188262 const isListener = o . name === 'listener' ;
263+ const isCron = o . name === 'cron' && type === 'cron' ;
189264 const isType = o . data . value === type ;
190265
191266 params . message = message ;
192267
268+ if ( isCron ) {
269+ return true ;
270+ }
271+
193272 if ( isListener && isType ) {
194273 switch ( type ) {
195274 case 'twitchCommand' : {
@@ -239,7 +318,7 @@ class Plugins extends Core {
239318 } ) ;
240319
241320 if ( listeners . length > 0 ) {
242- pluginsWithListener . push ( plugin ) ;
321+ pluginsWithListener . push ( { plugin, listeners } ) ;
243322 }
244323 }
245324 return pluginsWithListener ;
0 commit comments