Skip to content

Commit

Permalink
Merge pull request blockscout#22 from mantlenetworkio/albert/fix_memory_leak
Browse files Browse the repository at this point in the history

try to fix memory leak
  • Loading branch information
guoshijiang committed Nov 22, 2022
2 parents d96dda2 + 247c71e commit fdd0281
Show file tree
Hide file tree
Showing 6 changed files with 7,398 additions and 6,917 deletions.
2 changes: 2 additions & 0 deletions data-sync-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"@nestjs/config": "^2.2.0",
"@nestjs/core": "^8.0.0",
"@nestjs/platform-express": "^8.0.0",
"@nestjs/platform-socket.io": "6.10.14",
"@nestjs/schedule": "^2.1.0",
"@nestjs/swagger": "^5.2.0",
"@nestjs/typeorm": "^8.0.3",
Expand All @@ -37,6 +38,7 @@
"dotenv": "^16.0.0",
"ethers": "^5.7.0",
"mysql2": "^2.3.3",
"nest-status-monitor": "^0.1.4",
"pg": "^8.7.3",
"postgres": "^1.0.2",
"reflect-metadata": "^0.1.13",
Expand Down
3 changes: 3 additions & 0 deletions data-sync-service/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ import entities from './typeorm';
import { ScheduleModule } from '@nestjs/schedule';
import { L1IngestionModule } from './l1Ingestion/l1Ingestion.module';
import { L2IngestionModule } from './l2Ingestion/l2Ingestion.module';
import { StatusMonitorModule } from 'nest-status-monitor';
import statusMonitorConfig from './config/statusMonitor';

@Module({
imports: [
StatusMonitorModule.setUp(statusMonitorConfig),
ConfigModule.forRoot({ isGlobal: true }),
TypeOrmModule.forRootAsync({
imports: [ConfigModule],
Expand Down
31 changes: 31 additions & 0 deletions data-sync-service/src/config/statusMonitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Configuration object for nest-status-monitor; consumed by
// StatusMonitorModule.setUp(statusMonitorConfig) in app.module.ts.
// Drives the live metrics dashboard (CPU, memory, load, response time,
// requests/sec, HTTP status codes) served by the monitor module.
export default {
  // Title shown on the dashboard page. NOTE(review): trailing space in the
  // string is preserved as-is — confirm whether it is intentional.
  pageTitle: 'data sync service Monitoring ',
  // Port of the monitored HTTP server — presumably the app's listen port;
  // verify against the bootstrap code, which is not visible here.
  port: 3000,
  // Route at which the dashboard is served. Original note: remember to
  // account for the global route prefix '/api' when accessing it.
  path: '/status',
  // Requests whose path starts with this prefix are excluded from the
  // collected metrics (health-check noise).
  ignoreStartsWith: '/health/alive',
  // Sampling windows: each span samples at `interval` seconds and keeps
  // `retention` datapoints in memory (so 60s, 5min, and 15min views).
  spans: [
    {
      interval: 1, // Every second
      retention: 60, // Keep 60 datapoints in memory
    },
    {
      interval: 5, // Every 5 seconds
      retention: 60,
    },
    {
      interval: 15, // Every 15 seconds
      retention: 60,
    },
  ],
  // Toggle individual charts on the dashboard; all enabled.
  chartVisibility: {
    cpu: true,
    mem: true,
    load: true,
    responseTime: true,
    rps: true,
    statusCodes: true,
  },
  // No custom health-check endpoints registered.
  healthChecks: [],
};
180 changes: 121 additions & 59 deletions data-sync-service/src/l1Ingestion/l1Ingestion.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,13 @@ export class L1IngestionService {
},
} = item;
const { timestamp } = await this.web3.eth.getBlock(blockNumber);

const dataSource = getConnection();
const queryRunner = dataSource.createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction()
try {
const savedResult = await this.entityManager.save(TxnBatches, {
const savedResult = await queryRunner.manager.save(TxnBatches, {
batch_index: _batchIndex,
block_number: blockNumber.toString(),
hash: transactionHash,
Expand All @@ -179,12 +184,16 @@ export class L1IngestionService {
timestamp: new Date(Number(timestamp) * 1000).toISOString(),
inserted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
});
})
result.push(savedResult);
await queryRunner.commitTransaction()
} catch (error) {
this.logger.error(
`l1 createTxnBatchesEvents blocknumber:${blockNumber} ${error}`,
);
await queryRunner.rollbackTransaction()
} finally {
await queryRunner.release()
}
}
return result;
Expand All @@ -208,8 +217,13 @@ export class L1IngestionService {
},
} = item;
const { timestamp } = await this.web3.eth.getBlock(blockNumber);

const dataSource = getConnection();
const queryRunner = dataSource.createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction()
try {
const savedResult = await this.entityManager.save(StateBatches, {
const savedResult = await queryRunner.manager.save(StateBatches, {
batch_index: _batchIndex,
block_number: blockNumber.toString(),
hash: transactionHash,
Expand All @@ -221,12 +235,16 @@ export class L1IngestionService {
timestamp: new Date(Number(timestamp) * 1000).toISOString(),
inserted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
});
})
result.push(savedResult);
await queryRunner.commitTransaction()
} catch (error) {
this.logger.error(
`l1 createStateBatchesEvents blocknumber:${blockNumber} ${error}`,
);
await queryRunner.rollbackTransaction()
} finally {
await queryRunner.release()
}
}
return result;
Expand All @@ -241,8 +259,12 @@ export class L1IngestionService {
returnValues: { target, sender, message, messageNonce, gasLimit },
signature,
} = item;
const dataSource = getConnection();
const queryRunner = dataSource.createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction()
try {
const savedResult = await this.entityManager.save(L1SentMessageEvents, {
const savedResult = await queryRunner.manager.save(L1SentMessageEvents, {
tx_hash: transactionHash,
block_number: blockNumber.toString(),
target,
Expand All @@ -253,12 +275,16 @@ export class L1IngestionService {
signature,
inserted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
});
})
result.push(savedResult);
await queryRunner.commitTransaction()
} catch (error) {
this.logger.error(
`l1 createSentEvents blocknumber:${blockNumber} ${error}`,
);
await queryRunner.rollbackTransaction()
} finally {
await queryRunner.release()
}
}
return result;
Expand All @@ -276,8 +302,13 @@ export class L1IngestionService {
returnValues: { msgHash },
signature,
} = item;

const dataSource = getConnection();
const queryRunner = dataSource.createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction()
try {
const savedResult = await this.entityManager.save(
const savedResult = await queryRunner.manager.save(
L1RelayedMessageEvents,
{
tx_hash: transactionHash,
Expand All @@ -287,12 +318,16 @@ export class L1IngestionService {
inserted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
},
);
)
result.push(savedResult);
await queryRunner.commitTransaction()
} catch (error) {
this.logger.error(
`l1 createRelayedEvents blocknumber:${blockNumber} ${error}`,
);
await queryRunner.rollbackTransaction()
} finally {
await queryRunner.release()
}
}
return result;
Expand Down Expand Up @@ -323,30 +358,46 @@ export class L1IngestionService {
} else {
continue;
}
await this.entityManager.save(L1ToL2, {
hash: tx_hash,
l2_hash: l2_hash,
block: Number(block_number),
timestamp: relayedResult.timestamp,
tx_origin: sender,
queue_index: Number(message_nonce.toString()),
target: sender,
gas_limit: gas_limit,
inserted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
});
await this.entityManager
.createQueryBuilder()
.update(L1SentMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: item.tx_hash })
.execute();
await this.entityManager
.createQueryBuilder()
.update(L2RelayedMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: relayedResult.tx_hash })
.execute();
const dataSource = getConnection();
const queryRunner = dataSource.createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction()
try {
// execute some operations on this transaction:
await queryRunner.manager.save(L1ToL2, {
hash: tx_hash,
l2_hash: l2_hash,
block: Number(block_number),
timestamp: relayedResult.timestamp,
tx_origin: sender,
queue_index: Number(message_nonce.toString()),
target: sender,
gas_limit: gas_limit,
inserted_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
})
await queryRunner.manager
.createQueryBuilder()
.update(L1SentMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: item.tx_hash })
.execute();
await queryRunner.manager
.createQueryBuilder()
.update(L2RelayedMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: relayedResult.tx_hash })
.execute();
// commit transaction now:
await queryRunner.commitTransaction()
} catch (err) {
// since we have errors let's rollback changes we made
await queryRunner.rollbackTransaction()
} finally {
// you need to release query runner which is manually created:
await queryRunner.release()
}

}
}
async createL2L1Relation() {
Expand All @@ -361,38 +412,49 @@ export class L1IngestionService {
messageNonce: sentList[i].message_nonce.toString(),
});
const relayedResult = await this.getRelayedEventByMsgHash(msgHash);
if (relayedResult) {
console.log('relayedResult.tx_hash', relayedResult.tx_hash);
await this.entityManager
.createQueryBuilder()
.update(L2ToL1)
.set({ hash: relayedResult.tx_hash, status: 'Relayed' })
.where('l2_hash = :l2_hash', { l2_hash: sentList[i].tx_hash })
.execute();
await this.entityManager
.createQueryBuilder()
.update(L2SentMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: sentList[i].tx_hash })
.execute();
await this.entityManager
.createQueryBuilder()
.update(L1RelayedMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: relayedResult.tx_hash })
.execute();
} else {
const totalElements = await this.getSccTotalElements();
const ltimestamp = Number(sentList[i].timestamp) / 1000
if (totalElements > sentList[i].block_number && ltimestamp + FraudProofWindow >= timestamp) {
await this.entityManager
const dataSource = getConnection();
const queryRunner = dataSource.createQueryRunner()
await queryRunner.connect()
await queryRunner.startTransaction()
try {
if (relayedResult) {
console.log('relayedResult.tx_hash', relayedResult.tx_hash);
await queryRunner.manager
.createQueryBuilder()
.update(L2ToL1)
.set({ status: 'Ready for Relay' })
.set({ hash: relayedResult.tx_hash, status: 'Relayed' })
.where('l2_hash = :l2_hash', { l2_hash: sentList[i].tx_hash })
.andWhere('status = :status', { status: 'Waiting' })
.execute();
await queryRunner.manager
.createQueryBuilder()
.update(L2SentMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: sentList[i].tx_hash })
.execute();
await queryRunner.manager
.createQueryBuilder()
.update(L1RelayedMessageEvents)
.set({ is_merge: true })
.where('tx_hash = :tx_hash', { tx_hash: relayedResult.tx_hash })
.execute();
} else {
const totalElements = await this.getSccTotalElements();
const ltimestamp = Number(sentList[i].timestamp) / 1000
if (totalElements > sentList[i].block_number && ltimestamp + FraudProofWindow >= timestamp) {
await queryRunner.manager
.createQueryBuilder()
.update(L2ToL1)
.set({ status: 'Ready for Relay' })
.where('l2_hash = :l2_hash', { l2_hash: sentList[i].tx_hash })
.andWhere('status = :status', { status: 'Waiting' })
.execute();
}
}
await queryRunner.commitTransaction()
} catch (error) {
await queryRunner.rollbackTransaction()
} finally {
await queryRunner.release()
}
}
}
Expand Down

0 comments on commit fdd0281

Please sign in to comment.