Skip to content

Commit

Permalink
Merge pull request #405 from machbase/779-mqttnats-bridge-subscriber
Browse files Browse the repository at this point in the history
779 mqttnats bridge subscriber
  • Loading branch information
kevin-lee0604 committed Jun 20, 2024
2 parents 19f5bc1 + 939f07e commit d489f09
Show file tree
Hide file tree
Showing 29 changed files with 1,340 additions and 187 deletions.
113 changes: 95 additions & 18 deletions src/api/repository/bridge.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
import request from '@/api/core';

export type BridgeType = 'SQLite' | 'PostgreSql' | 'Mysql' | 'MSSQL' | 'MQTT';
export type BridgeType = 'SQLite' | 'PostgreSql' | 'Mysql' | 'MSSQL' | 'MQTT' | 'NATS';
interface SubrItemType {
name: string;
autoStart: boolean;
state: string;
task: string;
bridge: string;
topic: string;
type?: string;
QoS?: string;
queue?: string;
}
interface SUBR_RES_TYPE extends RES_COMM {
data: SubrItemType[];
}
interface RES_COMM {
elapse: string;
reason: string;
success: boolean;
}
export interface BridgeItemType {
name: string;
type: BridgeType;
path: string;
childs?: SubrItemType[];
}
interface BridgeListResType {
interface BridgeListResType extends RES_COMM {
data: BridgeItemType[];
elapse: string;
reason: string;
success: boolean;
}
export interface GenKeyResType {
export interface GenKeyResType extends RES_COMM {
[key: string]: string | boolean | undefined;
success: boolean;
elapse: string;
reason: string;
// TOKEN_INFO
certificate: string;
privateKey: string;
Expand All @@ -31,16 +45,10 @@ export interface CreatePayloadType {
type: BridgeType | '';
path: string;
}
interface DelResType {
elapse: string;
reason: string;
success: boolean;
}
interface DelResType extends RES_COMM {}
export type CommandBridgeStateType = 'test' | 'exec' | 'query';
interface CommandRedType {
elapse: string;
reason: string;
success: boolean;
export type CommandSubrStateType = 'start' | 'stop';
interface CommandRedType extends RES_COMM {
data?: {
column: string[];
rows: string[][];
Expand Down Expand Up @@ -104,3 +112,72 @@ export const commandBridge = (aState: CommandBridgeStateType, aBridgeName: strin
data: sPayload,
});
};

/** Subscriber */

/**
* Get subr list
* @returns subr list
*/
export const getSubr = (): Promise<SUBR_RES_TYPE> => {
return request({
method: 'GET',
url: `/api/subscribers`,
});
};
/**
* Get subr item
* @returns subr info
*/
export const getSubrItem = (aSubrName: string): Promise<SubrItemType> => {
return request({
method: 'GET',
url: `/api/subscribers/${aSubrName}`,
});
};

/**
* Gen subr
* @autostart '--autostart' makes the subscriber starts along with machbase-neo starts. Ommit this to start/stop manually.
* @name 'nats_subr' the name of the subscriber.
* @bridge 'my_nats' the name of the bridge that the subscriber is going to use.
* @topic 'iot.sensor' subject name to subscribe. it should be in NATS subject syntax.
* @task 'db/append/EXAMPLE:csv' writing descriptor, it means the incoming data is in CSV format and writing data into the table EXAMPLE in append mode.
* @autostart makes the subscriber will start automatically when machbase-neo starts. If the subscriber is not autostart mode, you can make it start and stop manually by subscriber start and subscriber stop commands.
* @QoS if the bridge is MQTT type, it specifies the QoS level of the subscription to the topic. It supports 0, 1 and the default is 0 if it is not specified.
* @queue if the bridge is NATS type, it specifies the Queue Group.
*/
export const genSubr = (aData: SUBR_RES_TYPE): Promise<RES_COMM> => {
return request({
method: 'POST',
url: `/api/subscribers`,
data: aData,
});
};

/**
* Delete subr
* @TargetName string
* @return status
*/
export const delSubr = (aTargetName: string): Promise<RES_COMM> => {
return request({
method: 'DELETE',
url: `/api/subscribers/${aTargetName}`,
});
};

/**
* Command Subr
* @param aState
* @param aSubrName
* @param aCommand
* @returns
*/
export const commandSubr = (aState: CommandSubrStateType, aSubrName: string): Promise<RES_COMM> => {
return request({
method: 'POST',
url: `/api/subscribers/${aSubrName}/state`,
data: { state: aState },
});
};
10 changes: 10 additions & 0 deletions src/api/repository/timer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ export const getTimer = (): Promise<TimerListResType> => {
url: `/api/timers`,
});
};
/**
* Get timer item
* @returns target item
*/
export const getTimerItem = (aTimerName: string): Promise<TimerItemType> => {
return request({
method: 'GET',
url: `/api/timers/${aTimerName}`,
});
};

/**
* Gen timer
Expand Down
3 changes: 1 addition & 2 deletions src/components/ShellManage/index.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ export const ShellManage = ({ pCode }: { pCode: ShellItemType }) => {
<ExtensionTab.Input
pCallback={(event: React.FormEvent<HTMLInputElement>) => handlePayload('label', event)}
pValue={sPayload.label}
pWidth={'300px'}
pAutoFocus
/>
</ExtensionTab.ContentBlock>
Expand All @@ -214,7 +213,7 @@ export const ShellManage = ({ pCode }: { pCode: ShellItemType }) => {
<ExtensionTab.Input
pCallback={(event: React.FormEvent<HTMLInputElement>) => handlePayload('command', event)}
pValue={sPayload.command}
pWidth={'100%'}
pWidth={'400px'}
/>
</ExtensionTab.ContentBlock>
<ExtensionTab.ContentBlock>
Expand Down
3 changes: 2 additions & 1 deletion src/components/bridge/content.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/** SELECTE */
export const SELECTE_TYPE = ['SQLite', 'PostgreSql', 'Mysql', 'MSSQL', 'MQTT'];
export const SELECTE_TYPE = ['SQLite', 'PostgreSql', 'Mysql', 'MSSQL', 'MQTT', 'NATS'];
export const SUBSCRIBER_TYPE = ['mqtt', 'nats'];

/** HINT */
11 changes: 5 additions & 6 deletions src/components/bridge/createBridge.tsx
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { useEffect, useRef, useState } from 'react';
import { ExtensionTab } from '../extension/ExtensionTab';
import { useSetRecoilState } from 'recoil';
import { useRecoilState, useSetRecoilState } from 'recoil';
import { Pane, SashContent } from 'split-pane-react';
import SplitPane from 'split-pane-react/esm/SplitPane';
import { gBoardList, gBridgeList } from '@/recoil/recoil';
import { VscWarning } from 'react-icons/vsc';
import { IconButton } from '../buttons/IconButton';
import { LuFlipVertical } from 'react-icons/lu';
import { BridgeItemType, CreatePayloadType, genBridge, getBridge } from '@/api/repository/bridge';
import { BridgeItemType, CreatePayloadType, genBridge } from '@/api/repository/bridge';
import { SELECTE_TYPE } from './content';

export const CreateBridge = () => {
const setList = useSetRecoilState<BridgeItemType[]>(gBridgeList);
const [sBridgeList, setBridgeList] = useRecoilState<BridgeItemType[]>(gBridgeList);
const sBodyRef: any = useRef(null);
const [sGroupWidth, setGroupWidth] = useState<any[]>(['50', '50']);
const [sResErrMessage, setResErrMessage] = useState<string | undefined>(undefined);
Expand All @@ -27,10 +27,9 @@ export const CreateBridge = () => {
/** create item */
const createItem = async () => {
const sRes = await genBridge(sCreatePayload);

if (sRes.success) {
const sResList = await getBridge();
if (sResList.success) setList(sResList.data);
else setList([]);
setBridgeList([...sBridgeList, { ...sCreatePayload, type: sCreatePayload.type.toLowerCase() }] as any);
handleSavedCode(true);
setResErrMessage(undefined);
} else {
Expand Down
66 changes: 57 additions & 9 deletions src/components/bridge/index.tsx
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { ExtensionTab } from '@/components/extension/ExtensionTab';
import { useRecoilState, useSetRecoilState } from 'recoil';
import { gActiveBridge, gBoardList, gBridgeList } from '@/recoil/recoil';
import { gActiveBridge, gActiveSubr, gBoardList, gBridgeList, gSelectedTab } from '@/recoil/recoil';
import { Pane, SashContent } from 'split-pane-react';
import SplitPane from 'split-pane-react/esm/SplitPane';
import { BridgeItemType, commandBridge, delBridge, getBridge } from '@/api/repository/bridge';
import { BridgeItemType, commandBridge, delBridge } from '@/api/repository/bridge';
import { CreateBridge } from './createBridge';
import { useEffect, useRef, useState } from 'react';
import { IconButton } from '../buttons/IconButton';
import { LuFlipVertical } from 'react-icons/lu';
import { ConfirmModal } from '../modal/ConfirmModal';
import { getCommandState } from '@/utils/bridgeCommandHelper';
import { SUBSCRIBER_TYPE } from './content';
import { generateUUID } from '@/utils';

export const Bridge = ({ pCode }: { pCode: BridgeItemType }) => {
const [sBoardList, setBoardList] = useRecoilState<any[]>(gBoardList);
Expand All @@ -25,19 +27,17 @@ export const Bridge = ({ pCode }: { pCode: BridgeItemType }) => {
command: { message: '', data: undefined },
test: { message: '', data: undefined },
});
const setResList = useSetRecoilState<BridgeItemType[] | undefined>(gBridgeList);
const [sBridgeList, setBridgeList] = useRecoilState<BridgeItemType[] | undefined>(gBridgeList);
const TYPE = 'bridge';
const [sIsDeleteModal, setIsDeleteModal] = useState<boolean>(false);
const setActiveSubrName = useSetRecoilState<any>(gActiveSubr);
const setSelectedTab = useSetRecoilState<any>(gSelectedTab);

/** delete item */
const deleteItem = async () => {
const sRes = await delBridge(pCode.name);
if (sRes.success) {
const sBridgeList = await getBridge();
if (sBridgeList.success) setResList(sBridgeList?.data || []);
else setResList([]);

const sTempList = sBridgeList.data ? sBridgeList.data.filter((aInfo: any) => aInfo.name !== pCode.name) : [];
const sTempList = sBridgeList ? sBridgeList.filter((aInfo: any) => aInfo.name !== pCode.name) : [];
if (sTempList && sTempList.length > 0) {
setActiveName(sTempList[0].name);
const aTarget = sBoardList.find((aBoard: any) => aBoard.type === TYPE);
Expand Down Expand Up @@ -71,6 +71,7 @@ export const Bridge = ({ pCode }: { pCode: BridgeItemType }) => {
});
});
}
setBridgeList(sTempList);
}
setIsDeleteModal(false);
};
Expand All @@ -83,7 +84,50 @@ export const Bridge = ({ pCode }: { pCode: BridgeItemType }) => {
sTmp.command = (e.target as HTMLInputElement).value;
setPayload(sTmp);
};
const checkExistTab = (aType: string) => {
const sResut = sBoardList.reduce((prev: boolean, cur: any) => {
return prev || cur.type === aType;
}, false);
return sResut;
};
/** Open subr create page */
const handleNewSubr = () => {
const sExistKeyTab = checkExistTab('subscriber');
setActiveSubrName(undefined);

if (sExistKeyTab) {
const aTarget = sBoardList.find((aBoard: any) => aBoard.type === 'subscriber');
setBoardList((aBoardList: any) => {
return aBoardList.map((aBoard: any) => {
if (aBoard.id === aTarget.id) {
return {
...aTarget,
name: `SUBR: create`,
code: { bridge: pCode, subr: {} },
savedCode: false,
};
}
return aBoard;
});
});
setSelectedTab(aTarget.id);
return;
} else {
const sId = generateUUID();
setBoardList([
...sBoardList,
{
id: sId,
type: 'subscriber',
name: `SUBR: create`,
code: { bridge: pCode, subr: {} },
savedCode: false,
},
]);
setSelectedTab(sId);
return;
}
};
const handleCommand = async (aState: 'test' | 'command') => {
let sCommand: any = undefined;
let sState: any = undefined;
Expand Down Expand Up @@ -159,12 +203,15 @@ export const Bridge = ({ pCode }: { pCode: BridgeItemType }) => {
<div style={{ marginTop: '8px' }}>
<ExtensionTab.TextButton pText="Delete" pType="DELETE" pCallback={handleDelete} />
<ExtensionTab.TextButton pText="Test" pType="CREATE" pCallback={() => handleCommand('test')} />
{SUBSCRIBER_TYPE.includes(sPayload.type) && (
<ExtensionTab.TextButton pWidth="120px" pText="New subscriber" pType="CREATE" pCallback={handleNewSubr} />
)}
</div>
{sCommandRes.test?.data && sCommandRes.test?.data?.success && <ExtensionTab.TextResSuccess pText={'success'} />}
{sCommandRes.test.message !== '' && <ExtensionTab.TextResErr pText={sCommandRes.test.message} />}
</ExtensionTab.ContentBlock>

{sPayload.type !== 'mqtt' && (
{!SUBSCRIBER_TYPE.includes(sPayload.type) && (
<>
{/* Command */}
<ExtensionTab.ContentBlock>
Expand All @@ -186,6 +233,7 @@ export const Bridge = ({ pCode }: { pCode: BridgeItemType }) => {
</Pane>
<Pane>
<ExtensionTab.Header>
<div />
<div style={{ display: 'flex' }}>
<IconButton pIcon={<LuFlipVertical style={{ transform: 'rotate(90deg)' }} />} pIsActive={isVertical} onClick={() => setIsVertical(true)} />
<IconButton pIcon={<LuFlipVertical />} pIsActive={!isVertical} onClick={() => setIsVertical(false)} />
Expand Down
32 changes: 32 additions & 0 deletions src/components/bridge/subscriber/content.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/** TABLE */
export const SUBR_OPTIONS_TABLE = {
columns: ['Name', 'Default', 'Description'],
rows: [
['timeformat ', 'ns', 'Time format: s, ms, us, ns'],
['tz', 'UTC', 'Time Zone: UTC, Local and location spec'],
['delimiter', ',', 'CSV delimiter, ignored if content is not csv'],
['heading', 'false', 'If CSV contains header line, set true to skip the first line'],
],
};
export const SUBR_OPTIONS_EXAMPLE_TABLE = {
columns: ['', ''],
rows: [['db/append/EXAMPLE:csv?timeformat=s&heading=true'], ['db/write/EXAMPLE:csv:gzip?timeformat=s'], ['db/append/EXAMPLE:json?timeformat=2&pendingMsgLimit=1048576']],
};
export const SUBR_METHOD_TABLE = {
columns: ['', '', ''],
rows: [
['append', 'writing data in append mode'],
['write', 'writing data with INSERT sql statement'],
],
};
export const SUBR_FORMAT_TABLE = {
columns: ['', '', ''],
rows: [
['json', '(default)'],
['csv', ''],
],
};
export const SUBR_TQL_SCRIPT_TABLE = {
columns: ['NAME', 'BRIDGE', 'TOPIC', 'DESTINATION', 'AUTOSTART', 'STATE'],
rows: [['NATS_SUBR', 'my_nats', 'test.topic', '/test.tql', 'true', 'RUNNING']],
};
Loading

0 comments on commit d489f09

Please sign in to comment.