Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
134 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,58 +1,97 @@ | ||
import * as mongo from 'mongodb'; | ||
import Xev from 'xev'; | ||
|
||
const ev = new Xev(); | ||
import Meta, { IMeta } from './models/meta'; | ||
|
||
type ID = string | mongo.ObjectID; | ||
|
||
function publish(channel: string, type: string, value?: any): void { | ||
const message = type == null ? value : value == null ? | ||
{ type: type } : | ||
{ type: type, body: value }; | ||
class Publisher { | ||
private ev: Xev; | ||
private meta: IMeta; | ||
|
||
ev.emit(channel, message); | ||
} | ||
constructor() { | ||
this.ev = new Xev(); | ||
|
||
export function publishUserStream(userId: ID, type: string, value?: any): void { | ||
publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
setInterval(async () => { | ||
this.meta = await Meta.findOne({}); | ||
}, 5000); | ||
} | ||
|
||
export function publishDriveStream(userId: ID, type: string, value?: any): void { | ||
publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
public getMeta = async () => { | ||
if (this.meta != null) return this.meta; | ||
|
||
export function publishNoteStream(noteId: ID, type: string): void { | ||
publish(`note-stream:${noteId}`, null, noteId); | ||
} | ||
this.meta = await Meta.findOne({}); | ||
return this.meta; | ||
} | ||
|
||
export function publishUserListStream(listId: ID, type: string, value?: any): void { | ||
publish(`user-list-stream:${listId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
private publish = (channel: string, type: string, value?: any): void => { | ||
const message = type == null ? value : value == null ? | ||
{ type: type } : | ||
{ type: type, body: value }; | ||
|
||
export function publishMessagingStream(userId: ID, otherpartyId: ID, type: string, value?: any): void { | ||
publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
this.ev.emit(channel, message); | ||
} | ||
|
||
export function publishMessagingIndexStream(userId: ID, type: string, value?: any): void { | ||
publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
public publishUserStream = (userId: ID, type: string, value?: any): void => { | ||
this.publish(`user-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
export function publishReversiStream(userId: ID, type: string, value?: any): void { | ||
publish(`reversi-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
public publishDriveStream = (userId: ID, type: string, value?: any): void => { | ||
this.publish(`drive-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
export function publishReversiGameStream(gameId: ID, type: string, value?: any): void { | ||
publish(`reversi-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
public publishNoteStream = (noteId: ID, type: string): void => { | ||
this.publish(`note-stream:${noteId}`, null, noteId); | ||
} | ||
|
||
export function publishLocalTimelineStream(note: any): void { | ||
publish('local-timeline', null, note); | ||
} | ||
public publishUserListStream = (listId: ID, type: string, value?: any): void => { | ||
this.publish(`user-list-stream:${listId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
export function publishHybridTimelineStream(userId: ID, note: any): void { | ||
publish(userId ? `hybrid-timeline:${userId}` : 'hybrid-timeline', null, note); | ||
} | ||
public publishMessagingStream = (userId: ID, otherpartyId: ID, type: string, value?: any): void => { | ||
this.publish(`messaging-stream:${userId}-${otherpartyId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
public publishMessagingIndexStream = (userId: ID, type: string, value?: any): void => { | ||
this.publish(`messaging-index-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
public publishReversiStream = (userId: ID, type: string, value?: any): void => { | ||
this.publish(`reversi-stream:${userId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
public publishReversiGameStream = (gameId: ID, type: string, value?: any): void => { | ||
this.publish(`reversi-game-stream:${gameId}`, type, typeof value === 'undefined' ? null : value); | ||
} | ||
|
||
export function publishGlobalTimelineStream(note: any): void { | ||
publish('global-timeline', null, note); | ||
public publishLocalTimelineStream = async (note: any): Promise<void> => { | ||
const meta = await this.getMeta(); | ||
if (meta.disableLocalTimeline) return; | ||
this.publish('local-timeline', null, note); | ||
} | ||
|
||
public publishHybridTimelineStream = async (userId: ID, note: any): Promise<void> => { | ||
const meta = await this.getMeta(); | ||
if (meta.disableLocalTimeline) return; | ||
this.publish(userId ? `hybrid-timeline:${userId}` : 'hybrid-timeline', null, note); | ||
} | ||
|
||
public publishGlobalTimelineStream = (note: any): void => { | ||
this.publish('global-timeline', null, note); | ||
} | ||
} | ||
|
||
const publisher = new Publisher(); | ||
|
||
export default publisher; | ||
|
||
export const publishUserStream = publisher.publishUserStream; | ||
export const publishDriveStream = publisher.publishDriveStream; | ||
export const publishNoteStream = publisher.publishNoteStream; | ||
export const publishUserListStream = publisher.publishUserListStream; | ||
export const publishMessagingStream = publisher.publishMessagingStream; | ||
export const publishMessagingIndexStream = publisher.publishMessagingIndexStream; | ||
export const publishReversiStream = publisher.publishReversiStream; | ||
export const publishReversiGameStream = publisher.publishReversiGameStream; | ||
export const publishLocalTimelineStream = publisher.publishLocalTimelineStream; | ||
export const publishHybridTimelineStream = publisher.publishHybridTimelineStream; | ||
export const publishGlobalTimelineStream = publisher.publishGlobalTimelineStream; |