-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
76 lines (60 loc) · 2.31 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import json
import base64
import os
from google.cloud import storage
def move_image(event, context):
    """Entry point: move a classified image into its prediction folder.

    Reads the ``prediction_threshold`` and ``prediction_bucket`` environment
    variables, decodes the message carried in ``event["data"]``, derives the
    destination blob name from the prediction, and moves the image from the
    inbound bucket into the prediction bucket.

    Args:
        event: Mapping whose ``"data"`` entry is the base64-encoded JSON
            message understood by ``decode_data``.
        context: Object exposing ``event_id`` and ``timestamp``; both are
            only used for logging.

    Raises:
        ValueError: If either environment variable is unset or empty, or if
            ``prediction_threshold`` cannot be parsed as a float.
    """
    print(
        f"This Function was triggered by messageId {context.event_id} published at {context.timestamp}"
    )

    raw_threshold = os.environ.get("prediction_threshold")
    prediction_bucket = os.environ.get("prediction_bucket")

    # Fail fast with a message naming the missing setting(s), instead of an
    # opaque float(None) TypeError or a late failure inside the storage call.
    missing = [
        name
        for name, value in (
            ("prediction_threshold", raw_threshold),
            ("prediction_bucket", prediction_bucket),
        )
        if not value
    ]
    if missing:
        raise ValueError(
            "Missing required environment variable(s): " + ", ".join(missing)
        )

    prediction_threshold = float(raw_threshold)

    print("Got Env Variables:")
    print("prediction threshold= ", prediction_threshold)
    print("prediction bucket = ", prediction_bucket)

    inbound_bucket, pic_name, prediction_label, prediction_score = decode_data(
        event["data"]
    )
    new_blob_name = get_new_blob_name(
        pic_name, prediction_label, prediction_score, prediction_threshold
    )
    print("new blob = ", new_blob_name)
    mv_blob(inbound_bucket, pic_name, prediction_bucket, new_blob_name)
def decode_data(data):
    """Decode a base64-encoded JSON message into its prediction fields.

    Args:
        data: Base64-encoded, UTF-8 JSON object containing the keys
            ``bucket_name``, ``image_name``, ``prediction_label`` and
            ``prediction_score``.

    Returns:
        A tuple ``(bucket_name, image_name, prediction_label,
        prediction_score)`` taken from the decoded message.

    Raises:
        KeyError: If the decoded message lacks one of the expected keys.
    """
    print("Start decoding...")
    print("Received data: ", data)

    raw_bytes = base64.b64decode(data)
    print("base64 decoded:, ", raw_bytes)

    message = json.loads(raw_bytes.decode("utf-8"))
    print("msg dict: ", message)

    # Order matters: callers unpack the result positionally.
    wanted_keys = (
        "bucket_name",
        "image_name",
        "prediction_label",
        "prediction_score",
    )
    return tuple(message[key] for key in wanted_keys)
def get_new_blob_name(
    blob_name, prediction_label, prediction_score, prediction_threshold
):
    """Build the destination blob name as ``<folder>/<blob_name>``.

    The folder is chosen by ``get_blob_folder_by_prediction`` from the
    prediction label, score and threshold.

    Args:
        blob_name: Name of the blob being moved.
        prediction_label: Label predicted for the image.
        prediction_score: Score attached to the prediction.
        prediction_threshold: Threshold passed through to the folder choice.

    Returns:
        The folder name and the blob name joined by a single ``/``.
    """
    target_folder = get_blob_folder_by_prediction(
        prediction_label, prediction_score, prediction_threshold
    )
    print("new folder name: ", target_folder)
    return "/".join((target_folder, blob_name))
def get_blob_folder_by_prediction(
    prediction_label, prediction_score, prediction_threshold
):
    """Choose the folder an image belongs in based on its prediction.

    Args:
        prediction_label: Label predicted for the image.
        prediction_score: Score attached to the prediction.
        prediction_threshold: Minimum score for the label to be trusted.

    Returns:
        ``"unclear"`` when the score is strictly below the threshold,
        otherwise ``prediction_label`` unchanged.
    """
    is_below_threshold = prediction_score < prediction_threshold
    return "unclear" if is_below_threshold else prediction_label
def mv_blob(bucket_name, blob_name, new_bucket_name, new_blob_name):
    """Move a blob by copying it to the destination, then deleting the source.

    Args:
        bucket_name: Name of the bucket currently holding the blob.
        blob_name: Name of the blob to move.
        new_bucket_name: Name of the destination bucket.
        new_blob_name: Name the blob gets in the destination bucket.

    Returns:
        None. When source and destination are the same object, nothing is
        copied or deleted.
    """
    print(f"Start moving {blob_name} to {new_blob_name}")

    # Copy-then-delete onto the very same object would remove the only copy,
    # so treat a move onto itself as a no-op.
    if bucket_name == new_bucket_name and blob_name == new_blob_name:
        print(f"Source and destination are identical; leaving {blob_name} in place")
        return

    storage_client = storage.Client()
    source_bucket = storage_client.get_bucket(bucket_name)
    source_blob = source_bucket.blob(blob_name)
    destination_bucket = storage_client.get_bucket(new_bucket_name)

    # copy to new destination
    source_bucket.copy_blob(source_blob, destination_bucket, new_blob_name)
    # delete in old destination (only reached if the copy did not raise)
    source_blob.delete()

    print(f"File moved from {source_blob} to {new_blob_name}")