Copyright [2018] [Google]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

In [0]:
!pip install google-cloud-bigquery-reservation

# Configure Project Variables

In [0]:
# Fill these in before running the rest of the notebook.
admin_project_id = ''  # Project that owns the slot capacity
project_id = ''        # BigQuery Project to use slots
region = 'US'          # Location used for the commitment and reservation

# Authenticate

In [0]:
# Authenticate this runtime using the google.colab helper.
from google.colab import auth
auth.authenticate_user()

# Point the gcloud CLI at the project configured in `project_id`.
# Creating/managing projects:
# https://cloud.google.com/resource-manager/docs/creating-managing-projects
!gcloud config set project {project_id}

# On Demand Query

In [0]:
from google.cloud import bigquery
from google.cloud.bigquery.job import QueryJobConfig
import time

def run_query(prefix, dry_run=True):
  """Run the sample geospatial query and print timing / bytes statistics.

  Args:
    prefix: String passed to BigQuery as `job_id_prefix` for the query job.
    dry_run: When True (the default) the job is submitted as a dry run and
      only the estimated bytes are printed. When False the query is executed
      and the elapsed time, row count and bytes processed are printed.

  Returns:
    None. All results are printed.
  """
  client = bigquery.Client(project=project_id)
  # Cache is disabled so repeated runs measure real execution.
  config = QueryJobConfig(use_query_cache=False,
                          use_legacy_sql=False,
                          dry_run=dry_run)

  query = '''
    SELECT source_id_mf, single_date, frame_id, count(*) OVER () as total
    FROM `bigquery-public-data.wise_all_sky_data_release.mep_wise` a
    INNER JOIN
  	  (
        SELECT state_geom 
    	  FROM `bigquery-public-data.geo_us_boundaries.states` 
    	  WHERE state = 'VA'
  	  ) b
  	  ON ST_WITHIN(a.point, b.state_geom)
    LIMIT 10000'''

  start = time.time()
  job = client.query(query, job_config=config, job_id_prefix=prefix)

  GB = 2**30

  def to_gib(byte_count):
    # The job statistics may be missing (None); report those as 0 instead of
    # failing on `None / GB`.
    return (byte_count or 0) / GB

  if not dry_run:
    # Wait for completion and fetch a single row only: every row carries the
    # windowed `total`, so there is no need to download the full result set.
    rows = job.result(max_results=1)
    end = time.time()
    first_row = next(iter(rows), None)
    # No rows at all means the window count never materialised: report 0.
    row_count = first_row['total'] if first_row is not None else 0
    print('query execution: ~{} seconds elapsed'.format((end - start)))
    print('query result is {} rows'.format(row_count))
    print('GiB processed {}'.format(to_gib(job.total_bytes_processed)))
  else:
    print('dry_run estimated GiB processed {}'.format(to_gib(job.total_bytes_processed)))

  print('GiB billed {}'.format(to_gib(job.total_bytes_billed)))
  

In [0]:
# Dry run only: prints the estimated bytes without executing the query.
run_query("on-demand-", dry_run=True)

# Build Reservations API

In [0]:
# Import only the names this notebook uses instead of a wildcard import.
# Newer releases of google-cloud-bigquery-reservation expose the package as
# `google.cloud.bigquery_reservation_v1`; older releases used the
# `google.cloud.bigquery.reservation_v1` path, kept here as a fallback.
try:
  from google.cloud.bigquery_reservation_v1 import (
      Assignment,
      CapacityCommitment,
      Reservation,
      ReservationServiceClient,
  )
except ImportError:
  from google.cloud.bigquery.reservation_v1 import (
      Assignment,
      CapacityCommitment,
      Reservation,
      ReservationServiceClient,
  )
from google.api_core import retry

# Client for the Reservation API and the parent resource path
# (admin project + location) under which commitments and reservations live.
res_api = ReservationServiceClient()
parent_arg = "projects/{}/locations/{}".format(
    admin_project_id, region)

# Commitments, Reservations, and Assignments
A Commitment is a purchase of slot capacity (Flex slots in this case).<br/>
A Reservation is a named allocation of slots.<br/>
Assignments give Organizations, Folders, and Projects access to a Reservation.

In [0]:

def purchase_commitment(slots=500):
  """Purchase a FLEX capacity commitment under `parent_arg`.

  Args:
    slots: Number of slots to commit to (defaults to 500).

  Returns:
    The `name` of the created commitment, as returned by the API.
  """
  purchased = res_api.create_capacity_commitment(
      parent=parent_arg,
      capacity_commitment=CapacityCommitment(slot_count=slots, plan='FLEX'),
  )
  print(purchased)
  return purchased.name

In [0]:
def create_reservation(reservation_name, slots=500):
  """Create a reservation with the given ID under `parent_arg`.

  Args:
    reservation_name: ID to give the new reservation.
    slots: Slot capacity of the reservation (defaults to 500).

  Returns:
    The `name` of the created reservation, as returned by the API.
  """
  created = res_api.create_reservation(
      parent=parent_arg,
      reservation_id=reservation_name,
      reservation=Reservation(ignore_idle_slots=False, slot_capacity=slots),
  )
  print(created)
  return created.name

In [0]:
def create_assignment(reservation_id, user_project):
  """Assign a project's QUERY jobs to a reservation.

  Args:
    reservation_id: Resource name of the reservation; used as the parent of
      the new assignment.
    user_project: ID of the project to assign; formatted as
      `projects/<user_project>`.

  Returns:
    The `name` of the created assignment, as returned by the API.
  """
  assignee_path = 'projects/{}'.format(user_project)
  created = res_api.create_assignment(
      parent=reservation_id,
      assignment=Assignment(assignee=assignee_path, job_type='QUERY'),
  )
  print(created)
  return created.name

In [0]:
def cleanup(assignment_id, reservation_id, commit_id):
  """Delete the assignment, the reservation and the commitment, in that order.

  Args:
    assignment_id: Resource name of the assignment to delete.
    reservation_id: Resource name of the reservation to delete.
    commit_id: Resource name of the capacity commitment to delete.
  """
  res_api.delete_assignment(name=assignment_id)
  res_api.delete_reservation(name=reservation_id)

  # `predicate` must be a callable taking the raised exception and returning
  # a bool; passing the `Exception` class itself only retried by accident
  # (constructing an exception instance is truthy). Retry on any exception
  # for up to 90 seconds, waiting at most 2 seconds between attempts.
  # NOTE(review): presumably the commitment cannot be deleted until the
  # reservation removal has taken effect - confirm.
  commit_retry = retry.Retry(predicate=retry.if_exception_type(Exception),
                             maximum=2,
                             deadline=90)
  res_api.delete_capacity_commitment(name=commit_id, retry=commit_retry)

# Use Reserved Slots
The project that owns the commitment is billed for as long as the Flex Slots commitment exists, so delete it as soon as you are done.

In [0]:
# End-to-end demo: buy Flex slots, reserve them, assign the project, run the
# query on the reserved slots, then tear everything down.
start = time.time()
slots = 500
reservation_name = 'sample-reservation'

commit_id = purchase_commitment(slots)
res_id = create_reservation(reservation_name, slots)
assign_id = create_assignment(res_id, project_id)

# Always tear the resources down, even if the wait is interrupted or the
# query fails; otherwise the commitment would be left in place.
# NOTE: a failure while creating the reservation or assignment above still
# leaves the commitment behind and needs manual cleanup.
try:
  time.sleep(150) # assignment takes a few minutes to attach
  run_query("reserved-", False)
finally:
  cleanup(assign_id, res_id, commit_id)

end = time.time()
print("reservation ran for ~{} seconds".format((end - start)))