Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
deb9f12
wip
Aug 7, 2023
13c1510
Merge branch 'main' into enhancement/organizations-in-opensearch-C-1978
Aug 8, 2023
d39e548
wip
Aug 8, 2023
fdbebde
Merge branch 'main' into enhancement/organizations-in-opensearch-C-1978
Aug 8, 2023
0a57be6
sync organizations
Aug 9, 2023
f73a70f
wip
Aug 10, 2023
5368d34
Merge branch 'main' into enhancement/organizations-in-opensearch-C-1978
Aug 10, 2023
de97285
fixes
Aug 10, 2023
e1d6966
fixes
Aug 10, 2023
e6afa7a
added last enriched at to the org index
Aug 10, 2023
ee14bc0
querying organizations with opensearch
epipav Aug 10, 2023
64ea737
sync all orgs convenience script
epipav Aug 10, 2023
df451ed
Merge branch 'main' into enhancement/organizations-in-opensearch-C-1978
epipav Aug 11, 2023
c73d2f9
Add member organizations to opensearch (#1329)
epipav Aug 11, 2023
f0063f8
Merge branch 'main' into enhancement/organizations-in-opensearch-C-1978
epipav Aug 11, 2023
6a6815e
added script code
Aug 11, 2023
cba7aea
Merge branch 'main' into enhancement/organizations-in-opensearch-C-1978
Aug 11, 2023
bfa6ba0
fixes for memberOrganizations and fieldTranslator
epipav Aug 11, 2023
c101537
Merge branch 'enhancement/organizations-in-opensearch-C-1978' of gith…
epipav Aug 11, 2023
88f35fa
fixed linting
Aug 11, 2023
90b51b7
fixed linting
Aug 11, 2023
aa08719
removed searchSyncedAt column from db results in the old organization…
Aug 11, 2023
ab06a5f
segment passed in organization service, empty filters excluded
epipav Aug 11, 2023
7502a67
Merge branch 'enhancement/organizations-in-opensearch-C-1978' of gith…
epipav Aug 11, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
alter table organizations
drop column "searchSyncedAt";
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- Add the column with NOT NULL and a default value
alter table organizations
add column "searchSyncedAt" timestamptz not null default now();

-- Then, make the column nullable
alter table organizations
alter column "searchSyncedAt" drop not null;
90 changes: 89 additions & 1 deletion backend/src/database/repositories/organizationRepository.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import lodash from 'lodash'
import { SyncStatus } from '@crowd/types'
import { FieldTranslatorFactory, OpensearchQueryParser } from '@crowd/opensearch'
import { PageData } from '@crowd/common'
import { OpenSearchIndex, SyncStatus } from '@crowd/types'
import Sequelize, { QueryTypes } from 'sequelize'
import SequelizeRepository from './sequelizeRepository'
import AuditLogRepository from './auditLogRepository'
Expand All @@ -9,6 +11,8 @@ import { IRepositoryOptions } from './IRepositoryOptions'
import QueryParser from './filters/queryParser'
import { QueryOutput } from './filters/queryTypes'
import OrganizationSyncRemoteRepository from './organizationSyncRemoteRepository'
import isFeatureEnabled from '@/feature-flags/isFeatureEnabled'
import { FeatureFlag } from '@/types/common'

const { Op } = Sequelize

Expand Down Expand Up @@ -460,6 +464,9 @@ class OrganizationRepository {
}
}

// compatibility issue
delete result.searchSyncedAt

return result
}

Expand Down Expand Up @@ -569,6 +576,87 @@ class OrganizationRepository {
})
}

static async findAndCountAllOpensearch(
{
filter = {} as any,
limit = 20,
offset = 0,
orderBy = 'joinedAt_DESC',
countOnly = false,
segments = [] as string[],
customSortFunction = undefined,
},
options: IRepositoryOptions,
): Promise<PageData<any>> {
const tenant = SequelizeRepository.getCurrentTenant(options)

const segmentsEnabled = await isFeatureEnabled(FeatureFlag.SEGMENTS, options)

const segment = segments[0]

const translator = FieldTranslatorFactory.getTranslator(OpenSearchIndex.ORGANIZATIONS)

const parsed = OpensearchQueryParser.parse(
{ filter, limit, offset, orderBy },
OpenSearchIndex.ORGANIZATIONS,
translator,
)

// add tenant filter to parsed query
parsed.query.bool.must.push({
term: {
uuid_tenantId: tenant.id,
},
})

if (segmentsEnabled) {
// add segment filter
parsed.query.bool.must.push({
term: {
uuid_segmentId: segment,
},
})
}

// exclude empty filters if any
parsed.query.bool.must = parsed.query.bool.must.filter((obj) => {
// Check if the object has a non-empty 'term' property
if (obj.term) {
return Object.keys(obj.term).length !== 0
}
return true
})

if (customSortFunction) {
parsed.sort = customSortFunction
}

const countResponse = await options.opensearch.count({
index: OpenSearchIndex.ORGANIZATIONS,
body: { query: parsed.query },
})

if (countOnly) {
return {
rows: [],
count: countResponse.body.count,
limit,
offset,
}
}

const response = await options.opensearch.search({
index: OpenSearchIndex.ORGANIZATIONS,
body: parsed,
})

const translatedRows = response.body.hits.hits.map((o) =>
translator.translateObjectToCrowd(o._source),
)

return { rows: translatedRows, count: countResponse.body.count, limit, offset }
}

static async findAndCountAll(
{
filter = {} as any,
Expand Down
22 changes: 20 additions & 2 deletions backend/src/services/organizationService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import Plans from '../security/plans'
import telemetryTrack from '../segment/telemetryTrack'
import { IServiceOptions } from './IServiceOptions'
import { enrichOrganization } from './helpers/enrichment'
import { getSearchSyncWorkerEmitter } from '@/serverless/utils/serviceSQS'

export default class OrganizationService extends LoggerBase {
options: IServiceOptions
Expand Down Expand Up @@ -120,6 +121,9 @@ export default class OrganizationService extends LoggerBase {

await SequelizeRepository.commitTransaction(transaction)

const searchSyncEmitter = await getSearchSyncWorkerEmitter()
await searchSyncEmitter.triggerOrganizationSync(this.options.currentTenant.id, record.id)

return record
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
Expand Down Expand Up @@ -151,6 +155,9 @@ export default class OrganizationService extends LoggerBase {
await SequelizeRepository.commitTransaction(transaction)
}

const searchSyncEmitter = await getSearchSyncWorkerEmitter()
await searchSyncEmitter.triggerOrganizationSync(this.options.currentTenant.id, record.id)

return record
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
Expand All @@ -177,6 +184,12 @@ export default class OrganizationService extends LoggerBase {
}

await SequelizeRepository.commitTransaction(transaction)

const searchSyncEmitter = await getSearchSyncWorkerEmitter()

for (const id of ids) {
await searchSyncEmitter.triggerRemoveOrganization(this.options.currentTenant.id, id)
}
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
throw error
Expand Down Expand Up @@ -204,8 +217,8 @@ export default class OrganizationService extends LoggerBase {
const orderBy = data.orderBy
const limit = data.limit
const offset = data.offset
return OrganizationRepository.findAndCountAll(
{ advancedFilter, orderBy, limit, offset },
return OrganizationRepository.findAndCountAllOpensearch(
{ filter: advancedFilter, orderBy, limit, offset, segments: data.segments },
this.options,
)
}
Expand All @@ -224,6 +237,11 @@ export default class OrganizationService extends LoggerBase {
)

await SequelizeRepository.commitTransaction(transaction)

const searchSyncEmitter = await getSearchSyncWorkerEmitter()
for (const id of ids) {
await searchSyncEmitter.triggerRemoveOrganization(this.options.currentTenant.id, id)
}
} catch (error) {
await SequelizeRepository.rollbackTransaction(transaction)
throw error
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/modules/organization/config/filters/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const organizationSearchFilter: SearchFilterConfig = {
return [
{
or: [
{ name: { textContains: value } },
{ displayName: { textContains: value } },
],
},
];
Expand Down
14 changes: 12 additions & 2 deletions services/apps/data_sink_worker/src/service/member.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ export default class MemberService extends LoggerBase {

if (organizationIds.length > 0) {
await this.nodejsWorkerEmitter.enrichMemberOrganizations(tenantId, id, organizationIds)
for (const orgId of organizationIds) {
await this.searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, orgId)
}
}

return id
Expand All @@ -112,7 +115,7 @@ export default class MemberService extends LoggerBase {
fireSync = true,
): Promise<void> {
try {
const updated = await this.store.transactionally(async (txStore) => {
const { updated, organizationIds } = await this.store.transactionally(async (txStore) => {
const txRepo = new MemberRepository(txStore, this.log)
const txMemberAttributeService = new MemberAttributeService(txStore, this.log)
const dbIdentities = await txRepo.getIdentities(id, tenantId)
Expand Down Expand Up @@ -176,12 +179,19 @@ export default class MemberService extends LoggerBase {
}
}

return updated
return { updated, organizationIds }
})

if (updated && fireSync) {
await this.searchSyncWorkerEmitter.triggerMemberSync(tenantId, id)
}

if (organizationIds.length > 0) {
await this.nodejsWorkerEmitter.enrichMemberOrganizations(tenantId, id, organizationIds)
for (const orgId of organizationIds) {
await this.searchSyncWorkerEmitter.triggerOrganizationSync(tenantId, orgId)
}
}
} catch (err) {
this.log.error(err, { memberId: id }, 'Error while updating a member!')
throw err
Expand Down
5 changes: 5 additions & 0 deletions services/apps/search_sync_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
"script:update-mappings": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/update-mappings.ts",
"script:sync-all-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-all-members.ts",
"script:sync-all-activities": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-all-activities.ts",
"script:sync-all-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-all-organizations.ts",
"script:sync-member": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-member.ts",
"script:sync-activity": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-activity.ts",
"script:sync-organization": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-organization.ts",
"script:sync-tenant-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-tenant-members.ts",
"script:sync-tenant-activities": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-tenant-activities.ts",
"script:sync-tenant-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/sync-tenant-organizations.ts",
"script:cleanup-tenant-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/cleanup-tenant-members.ts",
"script:cleanup-tenant-activities": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/cleanup-tenant-activities.ts",
"script:cleanup-tenant-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/cleanup-tenant-organizations.ts",
"script:cleanup-all-members": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/cleanup-all-members.ts",
"script:cleanup-all-activities": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/cleanup-all-activities.ts",
"script:cleanup-all-organizations": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/cleanup-all-activities.ts",
"script:delete-index": "SERVICE=script TS_NODE_TRANSPILE_ONLY=true node -r tsconfig-paths/register -r ts-node/register src/bin/delete-index.ts"
},
"dependencies": {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { DB_CONFIG } from '@/conf'
import { InitService } from '@/service/init.service'
import { OpenSearchService } from '@/service/opensearch.service'
import { OrganizationSyncService } from '@/service/organization.sync.service'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'

const log = getServiceLogger()

setImmediate(async () => {
const openSearchService = new OpenSearchService(log)
await openSearchService.initialize()

const dbConnection = getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)

const service = new OrganizationSyncService(store, openSearchService, log)

const pageSize = 100
let results = await service.getAllIndexedTenantIds(pageSize)

while (results.data.length > 0) {
for (const id of results.data) {
if (id !== InitService.FAKE_TENANT_ID) {
await service.cleanupOrganizationIndex(id)
}
}
if (results.afterKey) {
results = await service.getAllIndexedTenantIds(pageSize, results.afterKey)
} else {
results = { data: [] }
}
}

process.exit(0)
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { SQS_CONFIG } from '@/conf'
import { getServiceLogger } from '@crowd/logging'
import { SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: tenantId')
process.exit(1)
}

const tenantId = processArguments[0]

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new SearchSyncWorkerEmitter(sqsClient, log)
await emitter.init()

await emitter.triggerOrganizationCleanup(tenantId)
process.exit(0)
})
27 changes: 27 additions & 0 deletions services/apps/search_sync_worker/src/bin/sync-all-organizations.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { DB_CONFIG } from '@/conf'
import { OpenSearchService } from '@/service/opensearch.service'
import { DbStore, getDbConnection } from '@crowd/database'
import { getServiceLogger } from '@crowd/logging'
import { OrganizationRepository } from '@/repo/organization.repo'
import { OrganizationSyncService } from '@/service/organization.sync.service'

const log = getServiceLogger()

setImmediate(async () => {
const openSearchService = new OpenSearchService(log)
await openSearchService.initialize()

const dbConnection = getDbConnection(DB_CONFIG())
const store = new DbStore(log, dbConnection)

const repo = new OrganizationRepository(store, log)

const tenantIds = await repo.getTenantIds()

const service = new OrganizationSyncService(store, openSearchService, log)

for (const tenantId of tenantIds) {
await service.syncTenantOrganizations(tenantId, 500)
}
process.exit(0)
})
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { SQS_CONFIG } from '@/conf'
import { getServiceLogger } from '@crowd/logging'
import { SearchSyncWorkerEmitter, getSqsClient } from '@crowd/sqs'

const log = getServiceLogger()

const processArguments = process.argv.slice(2)

if (processArguments.length !== 1) {
log.error('Expected 1 argument: tenantId')
process.exit(1)
}

const tenantId = processArguments[0]

setImmediate(async () => {
const sqsClient = getSqsClient(SQS_CONFIG())
const emitter = new SearchSyncWorkerEmitter(sqsClient, log)
await emitter.init()

await emitter.triggerTenantOrganizationSync(tenantId)
process.exit(0)
})
Loading