Skip to content
This repository was archived by the owner on Oct 11, 2022. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions api/models/thread.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,27 @@ export const getThreadsByChannel = (channelId: string, options: PaginationOption
};

// prettier-ignore
export const getThreadsByChannels = (channelIds: Array<string>, options: PaginationOptions): Promise<Array<DBThread>> => {
const { first, after } = options

type GetThreadsByChannelPaginationOptions = {
first: number,
after: number,
sort: 'new' | 'trending'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we want these options to be defined and expand over time? Is trending the same as popular? Can we ever sort by date? e.g. trending this week/month/year

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably won't be able to do trending this week/month/year with this current setup since we only store the score right now. I think other than that we probably want "TOP", but that's a different calculation altogether.

Not sure, any thoguhts/ideas?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure either, just trying to think ahead a bit for what other constrains might enter the mix. But also don't want to get too caught up to prevent this from moving forward :)

};

export const getThreadsByChannels = (
channelIds: Array<string>,
options: GetThreadsByChannelPaginationOptions
): Promise<Array<DBThread>> => {
const { first, after, sort } = options;

let order = [db.desc('lastActive'), db.desc('createdAt')];
// If we want the top threads, first sort by the score and then lastActive
if (sort === 'trending') order.unshift(db.desc('score'));

return db
.table('threads')
.getAll(...channelIds, { index: 'channelId' })
.filter(thread => db.not(thread.hasFields('deletedAt')))
.orderBy(db.desc('lastActive'), db.desc('createdAt'))
.orderBy(...order)
.skip(after || 0)
.limit(first || 999999)
.run();
Expand Down Expand Up @@ -447,10 +460,10 @@ export const setThreadLock = (threadId: string, value: boolean, userId: string,
.run()
.then(async () => {
const thread = await getThreadById(threadId)
const event = value
? byModerator
? events.THREAD_LOCKED_BY_MODERATOR

const event = value
? byModerator
? events.THREAD_LOCKED_BY_MODERATOR
: events.THREAD_LOCKED
: byModerator
? events.THREAD_UNLOCKED_BY_MODERATOR
Expand Down
10 changes: 9 additions & 1 deletion api/mutations/message/addMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { trackUserThreadLastSeenQueue } from 'shared/bull/queues';
import type { FileUpload } from 'shared/types';
import { events } from 'shared/analytics';
import { isAuthedResolver as requireAuth } from '../../utils/permissions';
import { trackQueue } from 'shared/bull/queues';
import { trackQueue, calculateThreadScoreQueue } from 'shared/bull/queues';

type Input = {
message: {
Expand Down Expand Up @@ -344,6 +344,14 @@ export default requireAuth(async (_: any, args: Input, ctx: GraphQLContext) => {
timestamp: Date.now(),
});

calculateThreadScoreQueue.add(
{
threadId: message.threadId,
},
{
jobId: message.threadId,
}
);
return {
...dbMessage,
contextPermissions,
Expand Down
12 changes: 11 additions & 1 deletion api/mutations/message/deleteMessage.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { getUserPermissionsInChannel } from '../../models/usersChannels';
import { getUserPermissionsInCommunity } from '../../models/usersCommunities';
import { events } from 'shared/analytics';
import { isAuthedResolver as requireAuth } from '../../utils/permissions';
import { trackQueue } from 'shared/bull/queues';
import { trackQueue, calculateThreadScoreQueue } from 'shared/bull/queues';

type Input = {
id: string,
Expand Down Expand Up @@ -116,5 +116,15 @@ export default requireAuth(async (_: any, args: Input, ctx: GraphQLContext) => {
message.senderId
);
})
.then(() => {
return calculateThreadScoreQueue.add(
{
threadId: message.threadId,
},
{
jobId: message.threadId,
}
);
})
.then(() => true);
});
15 changes: 12 additions & 3 deletions api/mutations/thread/addThreadReaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { GraphQLContext } from '../../';
import { addThreadReaction } from '../../models/threadReaction';
import { getThreadById } from '../../models/thread';
import { isAuthedResolver as requireAuth } from '../../utils/permissions';
import { calculateThreadScoreQueue } from 'shared/bull/queues';

type Input = {
input: {
Expand All @@ -13,8 +14,16 @@ type Input = {

export default requireAuth(
async (_: any, args: Input, { user, loaders }: GraphQLContext) => {
return await addThreadReaction(args.input, user.id).then(
async () => await getThreadById(args.input.threadId)
);
return await addThreadReaction(args.input, user.id).then(() => {
calculateThreadScoreQueue.add(
{
threadId: args.input.threadId,
},
{
jobId: args.input.threadId,
}
);
return getThreadById(args.input.threadId);
});
}
);
15 changes: 12 additions & 3 deletions api/mutations/thread/removeThreadReaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { GraphQLContext } from '../../';
import { removeThreadReaction } from '../../models/threadReaction';
import { getThreadById } from '../../models/thread';
import { isAuthedResolver as requireAuth } from '../../utils/permissions';
import { calculateThreadScoreQueue } from 'shared/bull/queues';

type Input = {
input: {
Expand All @@ -12,8 +13,16 @@ type Input = {

export default requireAuth(
async (_: any, args: Input, { user, loaders }: GraphQLContext) => {
return await removeThreadReaction(args.input.threadId, user.id).then(
async () => await getThreadById(args.input.threadId)
);
return await removeThreadReaction(args.input.threadId, user.id).then(() => {
calculateThreadScoreQueue.add(
{
threadId: args.input.threadId,
},
{
jobId: args.input.threadId,
}
);
return getThreadById(args.input.threadId);
});
}
);
14 changes: 10 additions & 4 deletions api/queries/community/threadConnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,15 @@ import {
import { getThreadsByChannels } from '../../models/thread';
import { canViewCommunity } from '../../utils/permissions';

export type CommunityThreadConnectionPaginationOptions = {
after: string,
first: number,
sort: 'new' | 'trending',
};

// prettier-ignore
export default async (root: DBCommunity, args: PaginationOptions, ctx: GraphQLContext) => {
const { first = 10, after } = args
export default async (root: DBCommunity, args: CommunityThreadConnectionPaginationOptions, ctx: GraphQLContext) => {
const { first = 10, after, sort = 'new' } = args
const { user, loaders } = ctx
const { id } = root

Expand All @@ -29,7 +35,7 @@ export default async (root: DBCommunity, args: PaginationOptions, ctx: GraphQLCo
// Get the index from the encoded cursor, asdf234gsdf-2 => ["-2", "2"]
const lastDigits = cursor.match(/-(\d+)$/);
const lastThreadIndex =
lastDigits && lastDigits.length > 0 && parseInt(lastDigits[1], 10);
lastDigits && lastDigits.length > 0 && parseInt(lastDigits[1], 10) || 0;
const currentUser = user;

// if the user is signed in, only return stories for the channels
Expand All @@ -44,10 +50,10 @@ export default async (root: DBCommunity, args: PaginationOptions, ctx: GraphQLCo
channels = await getPublicChannelsByCommunity(id);
}

// $FlowFixMe
const threads = await getThreadsByChannels(channels, {
first,
after: lastThreadIndex,
sort,
});

return {
Expand Down
18 changes: 18 additions & 0 deletions api/queries/thread/rootThread.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// @flow
import { calculateThreadScoreQueue } from 'shared/bull/queues';
import type { GraphQLContext } from '../../';

export default async (
Expand Down Expand Up @@ -50,6 +51,23 @@ export default async (
(!communityPermissions || !communityPermissions.isMember)
)
return null;

// If the threads score hasn't been updated in the past
// 24 hours add a new job to the queue to update it
if (
(!thread.score && !thread.scoreUpdatedAt) ||
(thread.scoreUpdatedAt &&
Date.now() > new Date(thread.scoreUpdatedAt).getTime() + 86400000)
) {
calculateThreadScoreQueue.add(
{
threadId: thread.id,
},
{
jobId: thread.id,
}
);
}
return thread;
}
};
6 changes: 6 additions & 0 deletions api/types/Community.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ const Community = /* GraphQL */ `
message: String
}

enum CommunityThreadConnectionSort {
new
trending
}

type Community {
id: ID!
createdAt: Date
Expand All @@ -153,6 +158,7 @@ const Community = /* GraphQL */ `
threadConnection(
first: Int = 10
after: String
sort: CommunityThreadConnectionSort = new
): CommunityThreadsConnection @cost(complexity: 2, multiplier: "first")
metaData: CommunityMetaData @cost(complexity: 10)
invoices: [Invoice] @cost(complexity: 1)
Expand Down
1 change: 1 addition & 0 deletions mercury/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

// queues
export const PROCESS_REPUTATION_EVENT = 'process reputation event';
export const CALCULATE_THREAD_SCORE = 'calculate thread score';

// reputation event types
export const THREAD_CREATED = 'thread created';
Expand Down
4 changes: 3 additions & 1 deletion mercury/index.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
const debug = require('debug')('mercury');
const createWorker = require('../shared/bull/create-worker');
import processReputationEvent from './queues/processReputationEvent';
import { PROCESS_REPUTATION_EVENT } from './constants';
import calculateThreadScore from './queues/calculateThreadScore';
import { PROCESS_REPUTATION_EVENT, CALCULATE_THREAD_SCORE } from './constants';

const PORT = process.env.PORT || 3005;

Expand All @@ -10,6 +11,7 @@ debug('Logging with debug enabled!');

const server = createWorker({
[PROCESS_REPUTATION_EVENT]: processReputationEvent,
[CALCULATE_THREAD_SCORE]: calculateThreadScore,
});

debug(
Expand Down
102 changes: 101 additions & 1 deletion mercury/models/thread.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,109 @@
// @flow
const { db } = require('./db');
import type { DBThread } from 'shared/types';

export const getThread = (id: string): Promise<Object> => {
export const getThread = (id: string): Promise<DBThread> => {
return db
.table('threads')
.get(id)
.run();
};

export const getParticipantCount = (threadId: string): Promise<number> => {
return db
.table('usersThreads')
.getAll(threadId, { index: 'threadId' })
.filter({ isParticipant: true })
.count()
.default(0)
.run();
};

const timeRanges = {
hourly: { start: 3600, end: 0 },
daily: { start: 86400, end: 3600 },
weekly: { start: 604800, end: 86400 },
rest: { start: Date.now(), end: 604800 },
};

export const getParticipantCountByTime = (
threadId: string,
range: 'hourly' | 'daily' | 'weekly' | 'rest'
): Promise<number> => {
return db
.table('messages')
.getAll(threadId, { index: 'threadId' })
.filter(
db.row
.hasFields('deletedAt')
.not()
.and(
db
.row('timestamp')
.ge(db.now().sub(timeRanges[range].start))
.and(db.row('timestamp').le(db.now().sub(timeRanges[range].end)))
)
)
.map(rec => rec('senderId'))
.distinct()
.count()
.default(0)
.run();
};

export const getReactionCountByTime = (
threadId: string,
range: 'hourly' | 'daily' | 'weekly' | 'rest'
): Promise<number> => {
return db
.table('threadReactions')
.getAll(threadId, { index: 'threadId' })
.filter(
db.row
.hasFields('deletedAt')
.not()
.and(
db
.row('createdAt')
.ge(db.now().sub(timeRanges[range].start))
.and(db.row('createdAt').le(db.now().sub(timeRanges[range].end)))
)
)
.count()
.default(0)
.run();
};

export const getMessageCount = (threadId: string): Promise<number> => {
return db
.table('messages')
.getAll(threadId, { index: 'threadId' })
.filter(db.row.hasFields('deletedAt').not())
.count()
.default(0)
.run();
};

export const getReactionCount = (threadId: string): Promise<number> => {
return db
.table('threadReactions')
.getAll(threadId, { index: 'threadId' })
.filter(row => row.hasFields('deletedAt').not())
.count()
.default(0)
.run();
};

export const storeThreadScore = (
threadId: string,
score: number
): Promise<any> => {
return db
.table('threads')
.get(threadId)
.update({
score,
scoreUpdatedAt: new Date(),
})
.run();
};
Loading