In [None]:
! pip install google-cloud-bigquery --quiet

In [None]:
# with SA
import os
from google.cloud import bigquery

# Locate the service-account key in the working directory. Sorting makes the
# choice deterministic when several *.json files are present, since the order
# returned by os.listdir is arbitrary.
json_files = sorted(x for x in os.listdir('.') if x.endswith('.json'))
if not json_files:
    # Fail with an actionable message instead of a bare IndexError.
    raise FileNotFoundError(
        "No service-account *.json key file found in the current directory; "
        "upload one before running this cell."
    )
sa_file_name = json_files[0]

# Point GOOGLE_APPLICATION_CREDENTIALS at the key file for the client cells below.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = sa_file_name

In [None]:
# Column listing in "name type" form, one column per comma-separated entry.
table_str = """sessionid int64,
    sessionseqnum int64,
    nodeid int64,
    profileid int64,
    resourceid int64,
    startdatetime timestamp,
    enddatetime timestamp,
    qindex int64,
    gmtoffset int64,
    ringtime int64,
    talktime int64,
    holdtime int64,
    worktime int64,
    callwrapupdata string,
    callresult int64,
    dialinglistid int64,
    extractdatetime timestamp """

# Turn every entry into a [name, type] pair: drop the line break and padding,
# spell int64 as integer, then split on the separating space.
columns_list = []
for column_def in table_str.split(','):
  normalized = column_def.replace('\n', '').strip().replace('int64', 'integer')
  columns_list.append(normalized.split(' '))

# One SchemaField per column, in listing order.
schema_list = [bigquery.SchemaField(column[0], column[1]) for column in columns_list]



In [None]:
from google.cloud import bigquery

# Destination table (dataset.table) and the source file in GCS.
table_name = "uccx_22042_brazil.agent_connection_detail_dump"
file_uri = "gs://gcs_file_to_bq_new/AgentConnectionDetail_brazil_test.csv"

client = bigquery.Client()

# Load settings: pipe-delimited CSV with one header row, appended to the table
# using the schema built in the previous cell.
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.field_delimiter = '|'
job_config.skip_leading_rows = 1
job_config.schema = schema_list
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

load_job = client.load_table_from_uri(file_uri, table_name, job_config=job_config)

# Wait for the load job to finish.
load_job.result()

<google.cloud.bigquery.job.LoadJob at 0x7fc5da01f5c0>

In [None]:
! pip install google-cloud-storage --quiet

In [None]:
from google.cloud import storage

bucket_name = "gcs_file_to_bq_new_cloud_func"
file_uri = "gs://gcs_file_to_bq_new_cloud_func/AgentConnectionDetail_brazil_test.csv"
archive_folder_name = 'archive'

# The object name is the last path segment of the URI.
file_name = file_uri.rsplit('/', 1)[-1]

client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob = bucket.get_blob(file_name)

# Draft of the archive move, left disabled while the Blob API is explored below.
# NOTE(review): `bucket.delete(blob)` looks like it targets the bucket rather
# than the object - confirm against the Bucket API before enabling this.
# try:
#   bucket.copy_blob(blob, bucket, new_name="{}/{}".format(archive_folder_name, file_name))

#   bucket.delete(blob)
# except Exception as e:
#   raise Exception("When to copy file from GCS to archive with error: {}".format(e))

# List the attributes and methods available on the fetched blob.
print(dir(blob))


['_CHUNK_SIZE_MULTIPLE', '_STORAGE_CLASSES', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_acl', '_changes', '_chunk_size', '_do_download', '_do_multipart_upload', '_do_resumable_upload', '_do_upload', '_encryption_headers', '_encryption_key', '_get_content_type', '_get_download_url', '_get_transport', '_get_upload_arguments', '_get_writable_metadata', '_initiate_resumable_upload', '_patch_property', '_properties', '_query_params', '_require_client', '_set_properties', 'acl', 'bucket', 'cache_control', 'chunk_size', 'client', 'component_count', 'compose', 'content_disposition', 'content_encoding', 'content_language', 'content_type', 'crc32c', 'create_resumable_upload_session', 'delete', 'dow

In [None]:
# let's make it into cloud function, cloud function has to be in main.py
%%writefile main.py
from google.cloud import bigquery
from google.cloud import storage

storage_client = storage.Client()
bigquery_client = bigquery.Client()


def get_table_schema():
  """Build the BigQuery schema for the agent connection detail table.

  Returns:
    list of bigquery.SchemaField, one per "name type" entry of the column
    listing below, in listing order.
  """
  table_str = """sessionid int64,
    sessionseqnum int64,
    nodeid int64,
    profileid int64,
    resourceid int64,
    startdatetime timestamp,
    enddatetime timestamp,
    qindex int64,
    gmtoffset int64,
    ringtime int64,
    talktime int64,
    holdtime int64,
    worktime int64,
    callwrapupdata string,
    callresult int64,
    dialinglistid int64,
    extractdatetime timestamp """

  schema_list = []
  for column_def in table_str.split(','):
    # Drop the line break and padding, spell int64 as integer, then split the
    # entry into its name and type.
    parts = column_def.replace('\n', '').strip().replace('int64', 'integer').split(' ')
    schema_list.append(bigquery.SchemaField(parts[0], parts[1]))

  return schema_list


def move_gcs_file_into_archive(file_uri, archive_folder_name="archive"):
  """Move a GCS object into an archive folder of the same bucket.

  The trigger is bucket-wide, so a loaded file is moved out of the way to avoid
  running the load job for it again.

  Args:
    file_uri: object location in the form gs://<bucket>/<object name>.
    archive_folder_name: prefix the object is copied under.

  Raises:
    Exception: if the object is not found, or the copy/delete fails.
  """
  uri_parts = file_uri.split('/')
  bucket_name = uri_parts[2]
  # Everything after the bucket is the object name (it may itself contain '/').
  file_name = '/'.join(uri_parts[3:])

  bucket = storage_client.get_bucket(bucket_name)
  blob = bucket.get_blob(file_name)
  if blob is None:
    # Report a missing object explicitly rather than passing None to copy_blob.
    raise Exception("When to copy file from GCS to archive with error: {}".format(
        "file {} not found in bucket {}".format(file_name, bucket_name)))

  try:
    bucket.copy_blob(blob, bucket, new_name="{}/{}".format(archive_folder_name, file_name))

    # Remove only the source object. Bucket.delete() deletes the bucket itself,
    # so it must not be used for this.
    bucket.delete_blob(file_name)

    print("File: {} has been moved into archive folder:{}".format(file_name, archive_folder_name))
  except Exception as e:
    raise Exception("When to copy file from GCS to archive with error: {}".format(e)) from e

  
def load_file_into_bq(event, context):
    """Background function for google.storage.object.finalize: load the new CSV into BigQuery.

    Args:
        event: dict payload of the storage event. Its "bucket" and "name" keys
            identify the finalized object; when either is missing, the sample
            file below is loaded instead.
        context: event metadata supplied by the runtime (unused).
    """
    table_name = "uccx_22042_brazil.agent_connection_detail_dump"
    default_file_uri = "gs://gcs_file_to_bq_new/AgentConnectionDetail_brazil_test.csv"
    archive_prefix = "archive/"

    event = event or {}
    object_name = event.get("name")
    source_bucket = event.get("bucket")

    if object_name and source_bucket:
        # The archive copy is written to the same bucket and fires this trigger
        # again; skip those objects so a file is not processed twice.
        if object_name.startswith(archive_prefix):
            print("Skip archived object: {}".format(object_name))
            return
        file_uri = "gs://{}/{}".format(source_bucket, object_name)
    else:
        file_uri = default_file_uri

    schema_list = get_table_schema()

    # config job config
    job_config = bigquery.LoadJobConfig(
        schema=schema_list,
        skip_leading_rows=1,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter='|'
    )

    # Reuse the module-level client instead of creating one per invocation.
    load_job = bigquery_client.load_table_from_uri(file_uri, table_name, job_config=job_config)

    # wait to finish
    load_job.result()

    print("start to do archive logic")
    move_gcs_file_into_archive(file_uri)

    print("Load action has finished")


Overwriting main.py


In [None]:
# have to add a requirements.txt file, otherwise couldn't find the package
%%writefile requirements.txt
google-cloud-bigquery
google-cloud-storage

Writing requirements.txt


In [None]:
# set project for cloud function
! gcloud config set project cloudtutorial-296001

Updated property [core/project].


In [None]:
# need login
! gcloud auth login

Go to the following link in your browser:

    https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=32555940559.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=openid+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fuserinfo.email+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcloud-platform+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fappengine.admin+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fcompute+https%3A%2F%2Fwww.googleapis.com%2Fauth%2Faccounts.reauth&state=26z7z1T4EU9SeaYsZDGupdsbqpQ6XJ&prompt=consent&access_type=offline&code_challenge=RoHacIGMApU_djOam9Qkz12ZNbdmbvypFEPfbMGh7Vw&code_challenge_method=S256

Enter verification code: 4/1AY0e-g6WkrCBN7iAQOAfyzOhIs7DvlX0RCYWXoHjyekpQcIABjuiuvPOJkg

You are now logged in as [gqianglu1990@gmail.com].
Your current project is [cloudtutorial-296001].  You can change this setting by running:
  $ gcloud config set project PROJECT_ID


In [None]:
# deploy the code
%%bash
# Deploy load_file_into_bq on the python37 runtime, triggered by
# google.storage.object.finalize events on the gcs_file_to_bq_new bucket.
gcloud functions deploy load_file_into_bq \
--runtime python37 \
--trigger-resource gcs_file_to_bq_new \
--trigger-event google.storage.object.finalize

availableMemoryMb: 256
buildId: efd6d87b-99e8-4a1c-8fe1-8aac4229eb65
entryPoint: load_file_into_bq
eventTrigger:
  eventType: google.storage.object.finalize
  failurePolicy: {}
  resource: projects/_/buckets/gcs_file_to_bq_new
  service: storage.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/cloudtutorial-296001/locations/us-central1/functions/load_file_into_bq
runtime: python37
serviceAccountEmail: cloudtutorial-296001@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-0d10e295-9b7c-4042-a1ec-11c8a01089fa/6b5a291e-4d2f-4e99-9169-ea6503a222ec.zip?GoogleAccessId=service-299658701686@gcf-admin-robot.iam.gserviceaccount.com&Expires=1605669701&Signature=gcLyQ1aVNogwC3Q1fZJXlO8FdRVGb0rrhQiB54n09TzH6PNLrdsGLxk77bD1cTmvggtqTJcqaeDVzRSsBNO62oaYRXsSC9ZguyNpbZA0PTDNRFMDji5PPSZS4EIu24cZuare8JySPDw%2Bp%2BzVWREgBy64Q%2BgGV22daEse3D5B%2BYY22vmiCw0LiwT2i73U49h38COtD3ayoktWzpsvdJct0pBBT4OrjTdrlSsDX3ECBQVb7Y

Deploying function (may take a while - up to 2 minutes)...
..
For Cloud Build Stackdriver Logs, visit: https://console.cloud.google.com/logs/viewer?project=cloudtutorial-296001&advancedFilter=resource.type%3Dbuild%0Aresource.labels.build_id%3Defd6d87b-99e8-4a1c-8fe1-8aac4229eb65%0AlogName%3Dprojects%2Fcloudtutorial-296001%2Flogs%2Fcloudbuild
.............................done.


### Storage to pubsub then Cloud functions

A Cloud Function can be triggered by any change in a bucket, but not by a specific file, so let's try a storage **notification** that publishes to Pub/Sub and trigger the function from that topic instead.

In [None]:
# make notification
! gsutil notification create -t gcs-to-bq-cloud -f json gs://gcs_file_to_bq_new_cloud_func

Created Cloud Pub/Sub topic projects/cloudtutorial-296001/topics/gcs-to-bq-cloud
Created notification config projects/_/buckets/gcs_file_to_bq_new_cloud_func/notificationConfigs/1


In [None]:
%%writefile main.py
import base64
import json

from google.cloud import bigquery
from google.cloud import storage

# BigQuery dataset to load into and GCS bucket to read from.
dataset_name = "uccx_22042_brazil"
bucket_name = "gcs_file_to_bq_new_cloud_func"

# Clients and the bucket handle are created once, when the module is imported.
storage_client = storage.Client()
bigquery_client = bigquery.Client()

bucket = storage_client.get_bucket(bucket_name)


def check_blob_exist_or_not(file_name):
  """Report whether an object called file_name is present in the module-level bucket.

  Returns:
    True when the lookup yields a blob and that blob reports it exists,
    otherwise False.
  """
  blob = bucket.get_blob(file_name)

  if blob and blob.exists():
    return True
  return False


def get_table_schema():
  """Return the BigQuery schema of the agent connection detail table.

  Returns:
    list of bigquery.SchemaField built from the "name type" entries of the
    column listing below, in listing order.
  """
  table_str = """sessionid int64,
    sessionseqnum int64,
    nodeid int64,
    profileid int64,
    resourceid int64,
    startdatetime timestamp,
    enddatetime timestamp,
    qindex int64,
    gmtoffset int64,
    ringtime int64,
    talktime int64,
    holdtime int64,
    worktime int64,
    callwrapupdata string,
    callresult int64,
    dialinglistid int64,
    extractdatetime timestamp """

  def to_field(column_def):
    # Strip the line break and padding, spell int64 as integer, and split the
    # entry into name and type.
    name_and_type = column_def.replace('\n', '').strip().replace('int64', 'integer').split(' ')
    return bigquery.SchemaField(name_and_type[0], name_and_type[1])

  return [to_field(column_def) for column_def in table_str.split(',')]


def move_gcs_file_into_archive(file_uri, archive_folder_name="archive"):
  """Copy a GCS object under the archive folder of its bucket, then delete the original.

  The notification is bucket-wide, so loaded files are moved away to avoid
  running the load job for them again.

  Args:
    file_uri: object location; the bucket is the third '/'-separated segment
      and the file name is the last one.
    archive_folder_name: folder prefix the copy is written under.

  Returns:
    None. When the file is not found, a message is printed and nothing is moved.

  Raises:
    Exception: wrapping any error raised by the copy or the delete.
  """
  uri_parts = file_uri.split('/')
  source_bucket_name = uri_parts[2]
  file_name = uri_parts[-1]

  if not check_blob_exist_or_not(file_name):
    print("When try to use movement logic, File not exist")
    return None

  source_bucket = storage_client.get_bucket(source_bucket_name)
  source_blob = source_bucket.get_blob(file_name)

  try:
    # Both the copy and the delete modify the bucket, so each of them still
    # triggers another notification message.
    archive_path = "{}/{}".format(archive_folder_name, file_name)
    source_bucket.copy_blob(source_blob, source_bucket, new_name=archive_path)
    source_bucket.delete_blob(file_name)

    print("File: {} has been moved into archive folder:{}".format(file_name, archive_folder_name))
  except Exception as e:
    raise Exception("When to copy file from GCS to archive with error: {}".format(e))

  
def load_file_into_bq(file_name, table_name):
    """Load one pipe-delimited CSV from the configured bucket into a BigQuery table.

    Args:
        file_name: object name inside the module-level bucket_name.
        table_name: table inside the module-level dataset_name.

    Returns:
        A message string when the file is not found in the bucket, otherwise None.
    """
    destination_table = "{}.{}".format(dataset_name, table_name)
    file_uri = "gs://{}/{}".format(bucket_name, file_name)

    # Nothing to load when the object is not in the bucket.
    if not check_blob_exist_or_not(file_name):
        print("We don't find the file, so couldn't load file into BQ!")
        return "We don't find the file, so couldn't load file into BQ"

    # Load settings: CSV with one header row and '|' separator, appended to the table.
    load_options = {
        "schema": get_table_schema(),
        "skip_leading_rows": 1,
        "write_disposition": bigquery.WriteDisposition.WRITE_APPEND,
        "source_format": bigquery.SourceFormat.CSV,
        "field_delimiter": '|',
    }
    job_config = bigquery.LoadJobConfig(**load_options)

    print("Get file uri:", file_uri)

    load_job = bigquery_client.load_table_from_uri(file_uri, destination_table, job_config=job_config)

    # Wait for the job to finish before archiving the source file.
    load_result = load_job.result()
    if load_result.state != 'DONE':
        return

    print("start to do archive logic AS load has finished")
    move_gcs_file_into_archive(file_uri)

    print("Load action has finished")


def sub_storage_pub(event, context):
  """Pub/Sub-triggered entry point: load the notified GCS object into BigQuery.

  Args:
    event: dict; when it has a "data" key, the value is base64-encoded JSON
      whose "name" field is the object name.
    context: provides event_id and timestamp, used for logging only.
  """
  print("Function is triggerred by messageID:{} published at {}".format(context.event_id, context.timestamp))

  if "data" not in event:
    print("Not good message.")
    return

  event_data = json.loads(base64.b64decode(event['data']).decode('utf-8'))
  file_name = event_data['name']

  # Both file families are loaded into the same table. A single check keeps a
  # matching file from also reaching the "no match" branch, and the notified
  # object itself is loaded rather than a fixed sample name.
  known_prefixes = ('AgentConnectionDetail_brazil', 'AgentConnectionDetail_india')
  if file_name.startswith(known_prefixes):
    load_file_into_bq(file_name=file_name, table_name="agent_connection_detail_dump")
  else:
    print("We don't find statisfy files.")
  
  

Overwriting main.py


In [None]:
%%bash
# Deploy sub_storage_pub on the python37 runtime, triggered by messages
# published to the gcs-to-bq-cloud Pub/Sub topic.
gcloud functions deploy sub_storage_pub \
--runtime python37 \
--trigger-topic gcs-to-bq-cloud

availableMemoryMb: 256
buildId: d3833476-8877-4fde-9638-568ea27a707c
entryPoint: sub_storage_pub
eventTrigger:
  eventType: google.pubsub.topic.publish
  failurePolicy: {}
  resource: projects/cloudtutorial-296001/topics/gcs-to-bq-cloud
  service: pubsub.googleapis.com
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/cloudtutorial-296001/locations/us-central1/functions/sub_storage_pub
runtime: python37
serviceAccountEmail: cloudtutorial-296001@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-0d10e295-9b7c-4042-a1ec-11c8a01089fa/079e7cde-e116-4134-87bc-460eb83490ac.zip?GoogleAccessId=service-299658701686@gcf-admin-robot.iam.gserviceaccount.com&Expires=1605841801&Signature=CscOiVaH35RYJJfL1SKoeGaSWSn%2F7jwWpnucp%2BnLDh50U8yII5wuCybVAvShT3O%2F%2F7wK9%2FZIphMdELwJWiKm%2F4D8rbehwU7cayktqivVhCuJMcc8lJC9FEO%2FPpn8wRuZZyfCDFCjvov1OI7%2B1JH%2BYTxfQ1edjVTg4BxKp5b1tY4fBn4ZlIBhGHzrSVx8HDbAlZcQmzCSfm3%2BLdD%2FFuOC0COwF

Deploying function (may take a while - up to 2 minutes)...
..
For Cloud Build Stackdriver Logs, visit: https://console.cloud.google.com/logs/viewer?project=cloudtutorial-296001&advancedFilter=resource.type%3Dbuild%0Aresource.labels.build_id%3Dd3833476-8877-4fde-9638-568ea27a707c%0AlogName%3Dprojects%2Fcloudtutorial-296001%2Flogs%2Fcloudbuild
......................................done.


In [None]:
! gsutil cp *.csv gs://gcs_file_to_bq_new_cloud_func

Copying file://AgentConnectionDetail_brazil_test.csv [Content-Type=text/csv]...
Copying file://AgentConnectionDetail_india.csv [Content-Type=text/csv]...
-
Operation completed over 2 objects/22.2 KiB.                                     


### Cloud function with **PUSH** logic by http trigger

In [None]:
%%writefile main.py
import json

def hello_pubsub(request):
    """HTTP entry point: greet the name carried by a Pub/Sub push message or a plain request.

    The name is looked up in this order:
      1. JSON body with message.data: base64-encoded JSON holding a "name" field.
      2. "name" in the JSON body, then in the form fields, then in the query string.
      3. The default 'world'.

    Args:
        request: the incoming request object; get_json(), form and args are read.

    Returns:
        The greeting text, so the HTTP caller receives a response body.
    """
    import base64

    request_data = request.get_json(silent=True)
    print("Get requst:", request_data)

    name = None
    message = request_data.get('message') if isinstance(request_data, dict) else None

    if isinstance(message, dict) and 'data' in message:
        # Push envelope: the payload is base64-encoded JSON.
        event_data = json.loads(base64.b64decode(message['data']).decode('utf-8'))
        name = event_data.get('name')
        print("Get name", name)
    elif isinstance(request_data, dict) and 'name' in request_data:
        name = request_data['name']
    else:
        # Not JSON (or no name in it): fall back to form fields, then query args.
        for source in (request.form, request.args):
            if source and 'name' in source:
                name = source['name']
                break

    if name is None:
        name = 'world'

    greeting = 'Hello {}!'.format(name)
    print(greeting)
    return greeting

Overwriting main.py


In [None]:
! gcloud functions deploy hello_pubsub --trigger-http --runtime python37


For Cloud Build Stackdriver Logs, visit: https://console.cloud.google.com/logs/viewer?project=cloudtutorial-296001&advancedFilter=resource.type%3Dbuild%0Aresource.labels.build_id%3Da0f5d0c8-79b3-448d-b372-478eda0d1774%0AlogName%3Dprojects%2Fcloudtutorial-296001%2Flogs%2Fcloudbuild
availableMemoryMb: 256
buildId: a0f5d0c8-79b3-448d-b372-478eda0d1774
entryPoint: hello_pubsub
httpsTrigger:
  url: https://us-central1-cloudtutorial-296001.cloudfunctions.net/hello_pubsub
ingressSettings: ALLOW_ALL
labels:
  deployment-tool: cli-gcloud
name: projects/cloudtutorial-296001/locations/us-central1/functions/hello_pubsub
runtime: python37
serviceAccountEmail: cloudtutorial-296001@appspot.gserviceaccount.com
sourceUploadUrl: https://storage.googleapis.com/gcf-upload-us-central1-0d10e295-9b7c-4042-a1ec-11c8a01089fa/5c82f515-a3a5-479f-928c-e65d6b5d809b.zip?GoogleAccessId=service-299658701686@gcf-admin-robot.iam.gserviceaccount.com&Expires=1605854283&Signature=dkykeBRA0heVbd2%2BDe8o9gvEnHC%2BNPUgOZr80

In [None]:
%%bash
curl -X POST "https://us-central1-cloudtutorial-296001.cloudfunctions.net/hello_pubsub" -H "Content-Type:application/json" --data '{"name":"lugq"}'


OK

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100    17  100     2  100    15     11     87 --:--:-- --:--:-- --:--:--   100


In [None]:
# make a post to our cloud function
import requests
import json

url = "https://us-central1-cloudtutorial-296001.cloudfunctions.net/hello_pubsub"

data = {"name": 'lugq'}

# POST the payload to the deployed function via the `data=` argument; the
# response object is the cell's output.
requests.post(url, data=data)

<Response [200]>