Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
The diff you're trying to view is too large. We only load the first 3000 changed files.
24 changes: 24 additions & 0 deletions Nodejs12.16-COSAnalyzeInventory/config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"serverless-cloud-function-application": {
"Chinese": {
"name": "COS 清单文件简易分析",
"description": "本函数将源 bucket 中的 COS 清单文件进行简易分析,处理后将结果上传到指定 bucket",
"attention": "该函数模板仅供 COS 控制台使用,请勿自行创建",
"author": {
"name": "腾讯云"
}
},
"English": {
"name": "CosAnalyzeInventory",
"description": "This function will download cos inventory files from source bucket, analyze them, and upload result file to target bucket",
"attention": "This function template is only provided to COS console, please do not use it by yourself",
"author": {
"name": "Tencent Cloud"
}
},
"runtime": "Nodejs12.16",
"readme": "https://github.com/tencentyun/serverless-demo/tree/master/Nodejs12.16-COSAnalyzeInventory",
"version": "1.0.8",
"tags": ["Nodejs12.16", "analyze inventory", "cos analyze inventory"]
}
}
12 changes: 12 additions & 0 deletions Nodejs12.16-COSAnalyzeInventory/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
component: scf
name: ap-guangzhou_default_CosAnalyzeInventory
inputs:
name: CosAnalyzeInventory
src:
src: ./src
handler: index.main_handler
runtime: Nodejs12.16
namespace: default
region: ap-guangzhou
memorySize: 512
timeout: 86400
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/* eslint-disable no-param-reassign */
const { streamPipelinePromise } = require('./utils');
const { Readable, PassThrough } = require('stream');
const Input = require('./Input/index');
const Filter = require('./Filter/index');
const Output = require('./Output/index');

class CosAnalyzeInventoryProcessTask {
constructor({
cosSdkInstance,
cosUpload,
bucket,
region,
key,
analyzeConfig,
sourceList,
}) {
const analyzeConfigColumns = analyzeConfig.columns.map(item => item.key);
sourceList = sourceList.map((item) => {
const {
type,
fileFormat = 'CSV',
fileSchema,
fileHeaderInfo = 'NONE',
fieldDelimiter,
} = item;
let inputConfig = {};
if (fileFormat === 'CSV') {
let columns = [];
if (fileSchema) {
columns = fileSchema
.split(',')
.map(key => key.trim())
.map(key => (key === 'Key' ? 'EncodedKey' : key));
} else {
columns = analyzeConfigColumns;
}
const unsetKeys = analyzeConfigColumns.filter(key => !columns.includes(key)
&& !['Prefix', 'PrefixDepth', 'Key', 'EncodedKey'].includes(key));
if (unsetKeys.length > 0) {
throw new Error(`${unsetKeys.join(', ')} has not found in file`);
}
if (analyzeConfig.inputExtractor === 'CosSelect') {
inputConfig = {
extractor: 'CosSelect',
params: {
Expression: `Select ${columns
.map((item, index) => `_${index + 1} as ${item === 'Key' ? 'EncodedKey' : item}`)
.join(', ')} from COSObject`,
ExpressionType: 'SQL',
InputSerialization: {
CompressionType: type === 'gzip' ? 'GZIP' : 'NONE',
CSV: {
FileHeaderInfo: fileHeaderInfo,
RecordDelimiter: '\n',
FieldDelimiter:
fieldDelimiter || analyzeConfig.colDelimiter || ',',
QuoteCharacter: '"',
QuoteEscapeCharacter: '"',
AllowQuotedRecordDelimiter: 'TRUE',
},
},
OutputSerialization: {
JSON: {
RecordDelimiter: '\n',
},
},
...(analyzeConfig.cosSelect || {}),
},
};
} else {
inputConfig = {
extractor: 'CosNormalCsv',
params: {
InputSerialization: {
RecordDelimiter: '\n',
FieldDelimiter:
fieldDelimiter || analyzeConfig.colDelimiter || ',',
Columns: analyzeConfig.columns || columns.map(key => ({ key })),
CompressionType: type === 'gzip' ? 'GZIP' : 'NONE',
},
},
};
}
}
return {
...item,
inputConfig,
};
});
Object.assign(this, {
cosSdkInstance,
cosUpload,
bucket,
region,
key,
analyzeConfig,
sourceList,
});
}
async runTask() {
const input = new Input({
cosSdkInstance: this.cosSdkInstance,
sourceList: this.sourceList,
});
const readStream = input.getReadStream();
const filter = new Filter({
filter: 'InventoryAnalyzeTransformStream',
params: {
...this.analyzeConfig,
destroySource: () => readStream.end(),
},
});
const output = new Output({
cosUpload: this.cosUpload,
consumer: 'COS',
params: {
objectConfig: {
Bucket: this.bucket,
Region: this.region,
Key: this.key,
},
extraConfig: {},
},
});
const passThrough = new PassThrough();
await this.targetSetHeader(passThrough);
const { streamList, promiseList } = output.getWriteStreamListAndPromiseList();
const result = await Promise.all([
streamPipelinePromise([
readStream,
filter.getTransformStream(),
passThrough,
]),
streamPipelinePromise([passThrough, ...streamList]),
...promiseList,
]);
return result.pop();
}
async targetSetHeader(passThrough) {
const {
select,
targetRowDelimiter = '\n',
targetColDelimiter = ',',
targetSetHeader = true,
} = this.analyzeConfig;
if (!targetSetHeader) {
return;
}
const headerStr = select.map(({ label, key }) => label || key).join(targetColDelimiter)
+ targetRowDelimiter;
Readable.from([headerStr]).pipe(passThrough, { end: false });
}
}

module.exports = CosAnalyzeInventoryProcessTask;
149 changes: 149 additions & 0 deletions Nodejs12.16-COSAnalyzeInventory/src/common/CosAnalyzeInventoryTask.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/* eslint-disable arrow-body-style */
/* eslint-disable no-unused-vars */
/* eslint-disable no-param-reassign */
const ScfInvokeTask = require('./ScfInvokeTask');
const CosAnalyzeInventoryProcessTask = require('./CosAnalyzeInventoryProcessTask');
const {
validateAnalyzeConfig,
fixAnalyzeConfig,
} = require('./Filter/utils/analyzeUtils');

class CosAnalyzeInventoryTask {
constructor({
cosSdkInstance,
scfSdkInstance,
cosUpload,
parentRequestId,
context,
functionName,
sourceList,
bucket,
region,
key,
prefix,
preAnalyzeConfig,
analyzeConfig,
}) {
Object.assign(this, {
cosSdkInstance,
scfSdkInstance,
cosUpload,
parentRequestId,
context,
functionName,
sourceList,
bucket,
region,
key: key || `${prefix}${functionName}/${context.request_id}/result.csv`,
prefix,
preAnalyzeConfig: preAnalyzeConfig
? fixAnalyzeConfig(preAnalyzeConfig)
: preAnalyzeConfig,
analyzeConfig: fixAnalyzeConfig(analyzeConfig),
});
}
async runTask() {
let result;
let error;
try {
let { sourceList } = this;
if (
!this.parentRequestId
&& this.preAnalyzeConfig
&& this.preAnalyzeConfig.columns
&& this.preAnalyzeConfig.columns.length
) {
const partNumber = 20;
const scfInvokeTask = new ScfInvokeTask({
scfSdkInstance: this.scfSdkInstance,
parallel: partNumber,
paramsList: this.getSplitList({
sourceList,
partNumber,
}).map((items, index) => ({
Region: this.context.tencentcloud_region,
FunctionName: this.context.function_name,
Namespace: this.context.namespace,
InvocationType: 'Event',
Qualifier: '$DEFAULT',
ClientContext: JSON.stringify({
bucket: this.bucket,
region: this.region,
key: `${this.prefix}${this.functionName}/${this.context.request_id}/tmp/${index}`,
analyzeConfig: {
...this.preAnalyzeConfig,
targetSetHeader: false,
},
sourceList: items,
parentRequestId: this.context.request_id,
}),
})),
});
const scfInvokeTaskResults = await scfInvokeTask.runTask();
sourceList = scfInvokeTaskResults.map(({ params }) => {
const { ClientContext } = params;
const { bucket, region, key } = JSON.parse(ClientContext);
return {
bucket,
region,
key,
url: this.cosSdkInstance.getObjectUrl({
Bucket: bucket,
Region: region,
Key: key,
Sign: false,
}),
fileFormat: 'CSV',
fileSchema: this.analyzeConfig.columns
.map(item => item.key)
.join(', '),
};
});
}
const cosAnalyzeInventoryProcessTask = new CosAnalyzeInventoryProcessTask({
cosSdkInstance: this.cosSdkInstance,
cosUpload: this.cosUpload,
bucket: this.bucket,
region: this.region,
key: this.key,
analyzeConfig: this.analyzeConfig,
sourceList,
});
result = await cosAnalyzeInventoryProcessTask.runTask();
} catch (err) {
error = err;
}
return {
params: {
parentRequestId: this.parentRequestId,
context: this.context,
functionName: this.functionName,
sourceListLength: this.sourceList.length,
bucket: this.bucket,
region: this.region,
key: this.key,
prefix: this.prefix,
preAnalyzeConfig: this.preAnalyzeConfig,
analyzeConfig: this.analyzeConfig,
},
result,
error,
};
}
getSplitList({ sourceList, partNumber = 10 }) {
if (sourceList.length <= partNumber) {
return sourceList.map(item => [item]);
}
const list = [...sourceList];
const results = Array(partNumber)
.fill()
.map(() => []);
for (let i = 0, len = list.length; i < len; i++) {
const item = list[i];
results[i % partNumber].push(item);
}
return results;
}
}

module.exports = CosAnalyzeInventoryTask;
33 changes: 33 additions & 0 deletions Nodejs12.16-COSAnalyzeInventory/src/common/Filter/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
const InventoryAnalyzeTransformStream = require('./types/InventoryAnalyzeTransformStream');
const CustomTransformStream = require('./types/CustomTransformStream');
const { getCamelCaseData } = require('../utils');

const { PassThrough } = require('stream');

class Filter {
constructor({ filter = 'PassThrough', params = {} }) {
Object.assign(this, {
filter,
params,
camelCaseParams: getCamelCaseData(params),
});
}
getTransformStream() {
const { filter } = this;
if (filter === 'PassThrough') {
return new PassThrough({
readableObjectMode: true,
writableObjectMode: true,
});
}
if (filter === 'Custom') {
return new CustomTransformStream(this.params);
}
if (filter === 'InventoryAnalyzeTransformStream') {
return new InventoryAnalyzeTransformStream(this.params);
}
throw new Error(`unknown filter: ${filter}`);
}
}

module.exports = Filter;
Loading