Skip to content

Commit

Permalink
feat(server): add app network traffic metering (#1892)
Browse files Browse the repository at this point in the history
feat(server): add app network traffic metering (#1892)

---------

Co-authored-by: HUAHUAI23 <lim@outlook.com>
  • Loading branch information
HUAHUAI23 and HUAHUAI23 committed Mar 7, 2024
1 parent c0ba9ea commit ef30cd9
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 11 deletions.
7 changes: 7 additions & 0 deletions server/src/billing/billing-creation-task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export class BillingCreationTaskService {
}

// If last tick is less than 1 minute ago, return
// Limit concurrency? But then there's only ever one task, even with 3 server replicas. !!!
if (Date.now() - this.lastTick.getTime() < 1000 * 60) {
this.logger.debug(
`Skip billing creation task due to last tick time ${this.lastTick.toISOString()}`,
Expand Down Expand Up @@ -109,6 +110,7 @@ export class BillingCreationTaskService {

const meteringData = await this.billing.getMeteringData(
app,
latestBillingTime,
nextMeteringTime,
)
if (meteringData.cpu === 0 && meteringData.memory === 0) {
Expand Down Expand Up @@ -188,6 +190,10 @@ export class BillingCreationTaskService {
usage: priceInput.dedicatedDatabase.capacity,
amount: priceResult.dedicatedDatabase.capacity,
},
networkTraffic: {
usage: priceInput.networkTraffic,
amount: priceResult.networkTraffic,
},
},
startAt: startAt,
endAt: nextMeteringTime,
Expand Down Expand Up @@ -257,6 +263,7 @@ export class BillingCreationTaskService {
dto.memory = meteringData.memory
dto.storageCapacity = bundle.resource.storageCapacity
dto.databaseCapacity = bundle.resource.databaseCapacity
dto.networkTraffic = meteringData.networkTraffic || 0

dto.dedicatedDatabase = {
cpu: bundle.resource.dedicatedDatabase?.limitCPU || 0,
Expand Down
73 changes: 69 additions & 4 deletions server/src/billing/billing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,18 @@ import { ApplicationBilling } from './entities/application-billing'
import { CalculatePriceDto } from './dto/calculate-price.dto'
import { BillingQuery } from './interface/billing-query.interface'
import { PrometheusDriver } from 'prometheus-query'
import { Application } from 'src/application/entities/application'
import {
Application,
ApplicationState,
} from 'src/application/entities/application'
import { RegionService } from 'src/region/region.service'
import { Traffic } from './entities/network-traffic'
import { TrafficDatabase } from 'src/system-database'

@Injectable()
export class BillingService {
private readonly db = SystemDatabase.db
private readonly trafficDB = TrafficDatabase.db

constructor(
private readonly resource: ResourceService,
Expand Down Expand Up @@ -213,6 +219,11 @@ export class BillingService {
'dedicated database replicas option not found',
)

assert(
groupedOptions[ResourceType.NetworkTraffic],
'network traffic not found',
)

// calculate cpu price
const cpuOption = groupedOptions[ResourceType.CPU]
const cpuPrice = new Decimal(cpuOption.price).mul(dto.cpu)
Expand Down Expand Up @@ -257,12 +268,18 @@ export class BillingService {

const ddbTotalPrice = ddbCPUPrice.add(ddbMemoryPrice).add(ddbCapacityPrice)

const networkTrafficOption = groupedOptions[ResourceType.NetworkTraffic]
const networkTrafficPrice = new Decimal(networkTrafficOption.price).mul(
dto.networkTraffic || 0,
)

// calculate total price
const totalPrice = cpuPrice
.add(memoryPrice)
.add(storagePrice)
.add(databasePrice)
.add(ddbTotalPrice)
.add(networkTrafficPrice)

return {
cpu: cpuPrice.toNumber(),
Expand All @@ -274,34 +291,82 @@ export class BillingService {
memory: ddbMemoryPrice.toNumber(),
capacity: ddbCapacityPrice.toNumber(),
},
networkTraffic: networkTrafficPrice.toNumber(),
total: totalPrice.toNumber(),
}
}

async getMeteringData(app: Application, time: Date) {
async getMeteringData(app: Application, startAt: Date, endAt: Date) {
const region = await this.region.findOne(app.regionId)

const prom = new PrometheusDriver({
endpoint: region.prometheusConf.apiUrl,
})

const cpuTask = prom
.instantQuery(`laf:billing:cpu{appid="${app.appid}"}`, time)
.instantQuery(`laf:billing:cpu{appid="${app.appid}"}`, endAt)
.then((res) => res.result[0])
.then((res) => Number(res.value.value))

const memoryTask = prom
.instantQuery(`laf:billing:memory{appid="${app.appid}"}`, time)
.instantQuery(`laf:billing:memory{appid="${app.appid}"}`, endAt)
.then((res) => res.result[0])
.then((res) => Number(res.value.value))

const [cpu, memory] = await Promise.all([cpuTask, memoryTask]).catch(() => {
return [0, 0]
})

const networkTraffic = await this.getAppTrafficUsage(app, startAt, endAt)

return {
cpu,
memory,
networkTraffic,
}
}

async getAppTrafficUsage(
app: Application,
startAt: Date,
endAt: Date,
): Promise<number> {
if (!this.trafficDB) {
return 0
}

// If the application stops during the current hour, traffic for the current hour is still billed
const twoHoursAgo = new Date(Date.now() - 2 * 60 * 60 * 1000)
if (app.state === ApplicationState.Stopped && app.updatedAt < twoHoursAgo) {
return 0
}

const aggregationPipeline = [
{
$match: {
'traffic_meta.pod_type_name': app.appid,
timestamp: { $gte: startAt, $lt: endAt },
},
},
{
$group: {
_id: null,
totalSentBytes: { $sum: '$sent_bytes' },
},
},
]

// get app network usage
const result = await this.trafficDB
.collection<Traffic>('traffic')
.aggregate(aggregationPipeline)
.toArray()

// [ { _id: null, totalSentBytes: Long('70204946') } ]
const totalSentBytes = result[0]?.totalSentBytes || 0
const bytesPerMegabyte = 1024 * 1024
const totalSentMegabytes = Math.ceil(totalSentBytes / bytesPerMegabyte)

return totalSentMegabytes
}
}
10 changes: 9 additions & 1 deletion server/src/billing/dto/calculate-price.dto.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import { ApiProperty, OmitType } from '@nestjs/swagger'
import { IsNotEmpty, IsString } from 'class-validator'
import { IsNotEmpty, IsNumber, IsOptional, IsString } from 'class-validator'
import { UpdateApplicationBundleDto } from 'src/application/dto/update-application.dto'

export class CalculatePriceDto extends OmitType(UpdateApplicationBundleDto, [
'validate',
]) {
@ApiProperty({ example: 0.036 })
@IsOptional()
@IsNumber({}, { message: 'networkTraffic must be a number' })
networkTraffic?: number

@ApiProperty()
@IsNotEmpty()
@IsString()
Expand All @@ -18,6 +23,9 @@ export class CalculatePriceResultDto {
@ApiProperty({ example: 0.036 })
memory: number

@ApiProperty({ example: 0.036, required: false })
networkTraffic?: number

@ApiProperty({ example: 0.036 })
storageCapacity: number

Expand Down
27 changes: 27 additions & 0 deletions server/src/billing/entities/network-traffic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { ObjectId } from 'mongodb'

export class Traffic {
_id?: ObjectId

timestamp: Date

traffic_meta: TrafficMeta

recv_bytes: number

sent_bytes: number
}

export class TrafficMeta {
pod_address: string

pod_name: string

pod_namespace: string

pod_type: number

pod_type_name: string

traffic_tag: string
}
11 changes: 7 additions & 4 deletions server/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ export class ServerConfig {
return process.env.DATABASE_URL
}

static get TRAFFIC_DATABASE_URL() {
if (!process.env.TRAFFIC_DATABASE_URL) {
return
}
return process.env.TRAFFIC_DATABASE_URL
}

static get NOTIFICATION_CENTER_URL() {
return process.env.NOTIFICATION_CENTER_URL
}
Expand Down Expand Up @@ -47,10 +54,6 @@ export class ServerConfig {
return process.env.DISABLED_GATEWAY_TASK === 'true'
}

static get DISABLED_BUCKET_DOMAIN_TASK() {
return process.env.DISABLED_BUCKET_DOMAIN_TASK === 'true'
}

static get DISABLED_TRIGGER_TASK() {
return process.env.DISABLED_TRIGGER_TASK === 'true'
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/gateway/bucket-domain-task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export class BucketDomainTaskService {

@Cron(CronExpression.EVERY_SECOND)
async tick() {
if (ServerConfig.DISABLED_BUCKET_DOMAIN_TASK) return
if (ServerConfig.DISABLED_GATEWAY_TASK) return

// Phase `Creating` -> `Created`
this.handleCreatingPhase().catch((err) => {
Expand Down
6 changes: 5 additions & 1 deletion server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@ import { SwaggerModule, DocumentBuilder } from '@nestjs/swagger'
import { ValidationPipe, VersioningType } from '@nestjs/common'
import { ServerConfig } from './constants'
import { InitializerService } from './initializer/initializer.service'
import { SystemDatabase } from './system-database'
import { SystemDatabase, TrafficDatabase } from './system-database'
import * as helmet from 'helmet'
import * as bodyParser from 'body-parser'

async function bootstrap() {
await SystemDatabase.ready

if (ServerConfig.TRAFFIC_DATABASE_URL) {
await TrafficDatabase.ready
}

const app = await NestFactory.create(AppModule, {
logger: ['error', 'warn', 'log', 'debug', 'verbose'],
})
Expand Down
31 changes: 31 additions & 0 deletions server/src/system-database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,34 @@ export class SystemDatabase {
}
}
}

export class TrafficDatabase {
private static readonly logger = new Logger(TrafficDatabase.name)
private static _client: MongoClient
static ready = this.initialize()

static get client() {
return this._client
}

static get db() {
return this.client?.db()
}

static async initialize() {
if (!ServerConfig.TRAFFIC_DATABASE_URL) {
this.logger.log('no traffic database connect url')
return
}
this._client = new MongoClient(ServerConfig.TRAFFIC_DATABASE_URL)
try {
const client = await this._client.connect()
this.logger.log('Connected to traffic database')
return client
} catch (err) {
this.logger.error('Failed to connect to traffic database')
this.logger.error(err)
process.exit(1)
}
}
}

0 comments on commit ef30cd9

Please sign in to comment.