-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.js
133 lines (121 loc) · 3.76 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
/**
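* @fileoverview Google Cloud Function that takes a random number (1 to 3) of synthetic FHIR patient bundles
* from a GCS bucket, loads each one into a Google Healthcare FHIR store, and then deletes it from the bucket.
* Cloud Scheduler invokes it through a Pub/Sub trigger (the exported handler takes (event, context)).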
* @author Joey Whelan <joey.whelan@gmail.com>
*/
const {Storage} = require('@google-cloud/storage');
const storage = new Storage();
const {google} = require('googleapis');
const healthcare = google.healthcare('v1beta1');
// Resource path of the target FHIR store, built from the PROJECT_ID, LOCATION, DATASET and
// DATASTORE environment variables. It is passed as `parent` when a bundle is executed.
const BASE_URL = [
    `projects/${process.env.PROJECT_ID}`,
    `locations/${process.env.LOCATION}`,
    `datasets/${process.env.DATASET}`,
    `fhirStores/${process.env.DATASTORE}`,
].join('/');
/**
 * Lists files in the GCS bucket named by the PATIENTS_BUCKET environment variable.
 * @param {number} numBundles - upper bound on the number of files requested (sent as `maxResults`)
 * @return {Promise} resolves with the result of `getFiles()`; the caller takes the file list
 * from its first element
 * @throws {Error} propagates exceptions
 */
async function fetchBundle(numBundles) {
    const bucket = storage.bucket(process.env.PATIENTS_BUCKET);
    const listing = await bucket.getFiles({maxResults: numBundles});
    return listing;
}
/**
 * Reads a FHIR bundle from GCS and then loads it to a Google Healthcare FHIR store.
 * @param {File} file - GCS File object; only `createReadStream()` is used
 * @return {Promise} resolves with the `statusText` of the executeBundle response
 * @throws {Error} rejects on stream errors, on a bundle that is not valid JSON (SyntaxError),
 * and on any failure of the auth lookup or the executeBundle call
 */
async function loadBundle(file) {
    const auth = await google.auth.getClient({
        scopes: ['https://www.googleapis.com/auth/cloud-platform'],
    });
    // Sets library-wide defaults, so the FHIR content type applies to the request below.
    google.options({
        auth,
        headers: {
            'Content-Type': 'application/fhir+json'
        }
    });

    // Collect raw chunks and decode once at the end: decoding chunk by chunk corrupts any
    // multi-byte UTF-8 character that happens to straddle a chunk boundary.
    const raw = await new Promise((resolve, reject) => {
        const chunks = [];
        file.createReadStream()
            .on('data', (chunk) => {
                chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : chunk);
            })
            .on('end', () => {
                resolve(Buffer.concat(chunks).toString('utf8'));
            })
            .on('error', reject);
    });

    // Parsing here (not inside a stream callback) turns malformed JSON into a rejection of
    // this function instead of an unhandled rejection with a promise that never settles.
    const request = {
        parent: BASE_URL,
        type: 'Bundle',
        requestBody: JSON.parse(raw),
    };
    const response = await healthcare.projects.locations.datasets.fhirStores.fhir.executeBundle(request);
    return response.statusText;
}
/**
 * Removes a FHIR bundle file from GCS.
 * @param {File} file - GCS File object whose `delete()` method is invoked
 * @return {Promise} resolves with whatever `file.delete()` resolves with
 * @throws {Error} propagates exceptions
 */
async function deleteBundle(file) {
    const outcome = await file.delete();
    return outcome;
}
/**
 * Works through up to `numBundles` FHIR Patient bundles, one at a time: each bundle is loaded
 * into the Google Healthcare FHIR store and then deleted from GCS before the next one starts.
 * Progress is written to the console.
 * @param {number} numBundles - number of bundles to be fetched, loaded, and deleted
 * @return {Promise} resolves with the number of bundles that were fetched
 * @throws {Error} propagates exceptions; the first failure stops the remaining bundles
 */
async function processPatientBundles(numBundles) {
    const listing = await fetchBundle(numBundles);
    const bundles = listing[0];
    console.log(`${bundles.length} bundles fetched`);

    for (let i = 0; i < bundles.length; i += 1) {
        const bundle = bundles[i];
        const name = bundle.name;

        console.log(`Loading bundle ${name}`);
        const status = await loadBundle(bundle);
        console.log(`${name} bundle loaded to FHIR store with status ${status}`);

        await deleteBundle(bundle);
        console.log(`${name} bundle deleted`);
    }

    return bundles.length;
}
/**
* GCF Pub/Sub trigger function.
*/
exports.generator = async (event, context) => {
try {
const numBundles = Math.floor(Math.random() * 3) + 1; //random num between 1 and 3
const count = await processPatientBundles(numBundles);
console.log(`${count} records processed.`);
}
catch(err) {
console.error(err);
}
};
/* test script */
/*
(async () => {
const numBundles = Math.floor(Math.random() * 3) + 1; //random num between 1 and 3
try {
const count = await processPatientBundles(numBundles);
console.log(`${count} bundles processed.`);
}
catch(err) {
console.error(err);
}
})();
*/