# BigQuery Concurrency Tester

## 1. Imports

In [None]:
import time
import random
import numpy as np
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.cloud import bigquery

## 2. Parameters

In [None]:
# Google Cloud project handed to the BigQuery client that runs the test queries.
project_id = "santi-demos"
# BigQuery location (region or multi-region) the test is meant to target.
location = "US"
# Size of the worker thread pool. It also sets the submission pace: the main
# loop sleeps 1 / concurrency seconds between two submissions.
concurrency = 20
# Length, in seconds, of the window during which new queries are submitted.
duration_seconds = 60

## 3. Queries

In [None]:
# Workload definition. Each entry pairs a SQL statement ("sql") with its
# relative weight ("percentage"); the weight is used as a repeat count when
# the random-selection pool is built, so a larger value is picked more often.

# Light query: a single full-table row count.
query1 = {
    "sql": """SELECT COUNT(*) FROM `bigquery-public-data.usa_names.usa_1910_2013`;""",
    "percentage": 60,
}

# Medium query: filtered aggregation with a top-10 ordering.
query2 = {
    "sql": """SELECT name, SUM(number) as total
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE gender = 'F'
GROUP BY name
ORDER BY total DESC
LIMIT 10;""",
    "percentage": 30,
}

# Heavy query: aggregated subquery joined back against the same table.
query3 = {
    "sql": """SELECT
    t1.name,
    t1.total,
    t2.year,
    t2.number AS number_in_year
FROM (
    SELECT name, SUM(number) as total
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE gender = 'M'
    GROUP BY name
    ORDER BY total DESC
    LIMIT 5
) AS t1
JOIN `bigquery-public-data.usa_names.usa_1910_2013` AS t2
ON t1.name = t2.name
WHERE t2.year > 2000
ORDER BY t2.year, t2.number DESC;""",
    "percentage": 10,
}

## 4. Helper Functions

In [None]:
def create_weighted_query_list(queries):
    """Build the pool that random selection draws SQL statements from.

    Every entry of ``queries`` is a mapping with a ``'sql'`` string and a
    ``'percentage'`` weight. The SQL text is repeated ``int(percentage)``
    times in the returned list, so picking uniformly from that list selects
    each statement in proportion to its weight. Fractional weights are
    truncated by ``int()``; a weight of zero or less contributes nothing.
    """
    pool = []
    for entry in queries:
        # Weight is read before the SQL text, one copy of the text per unit.
        repeats, statement = int(entry['percentage']), entry['sql']
        pool += [statement] * repeats
    return pool

def run_query(client, sql):
    """Run one query with the cache disabled and report how it went.

    Args:
        client: Object exposing ``query(sql, job_config=...)`` whose return
            value has a blocking ``result()`` method (the caller passes a
            ``bigquery.Client``).
        sql: SQL text to execute.

    Returns:
        A dict that always carries ``'success'`` (bool) and ``'duration'``
        (elapsed seconds, float). On success it also carries ``'rows'``
        (``total_rows`` of the result); on failure it carries ``'error'``
        (``str()`` of the exception). Exceptions raised while submitting or
        waiting are never propagated, so one failing query cannot abort the
        whole load test.
    """
    # Disable the result cache so every run actually executes the query.
    job_config = bigquery.QueryJobConfig(use_query_cache=False)
    # perf_counter() is monotonic: unlike time.time(), the measured duration
    # cannot be skewed (or go negative) when the system clock is adjusted.
    started = time.perf_counter()
    try:
        query_job = client.query(sql, job_config=job_config)
        results = query_job.result()  # Wait for the job to complete
        finished = time.perf_counter()
        return {
            'success': True,
            'duration': finished - started,
            'rows': results.total_rows
        }
    except Exception as e:
        # Deliberately broad: any failure is recorded as a test outcome.
        finished = time.perf_counter()
        return {
            'success': False,
            'duration': finished - started,
            'error': str(e)
        }

## 5. Main Logic

In [None]:
# Build the weighted pool that each submission draws a random statement from.
queries = [query1, query2, query3]
weighted_queries = create_weighted_query_list(queries)

# Honour the configured location instead of leaving the parameter unused.
client = bigquery.Client(project=project_id, location=location)

print("Starting BigQuery concurrency test...")
print(f"Project: {project_id}")
print(f"Concurrency: {concurrency}")
print(f"Duration: {duration_seconds} seconds")
print("-" * 30)

total_queries = 0
successful_queries = 0
failed_queries = 0
query_durations = []

with ThreadPoolExecutor(max_workers=concurrency) as executor:
    futures = []
    # Monotonic clock: the loop bound and the elapsed time below are immune
    # to system clock adjustments.
    start_time = time.monotonic()
    while time.monotonic() - start_time < duration_seconds:
        sql = random.choice(weighted_queries)
        futures.append(executor.submit(run_query, client, sql))
        total_queries += 1
        time.sleep(1 / concurrency) # To avoid submitting all queries at once

    # Submission is over; drain every outstanding query and tally outcomes.
    for future in as_completed(futures):
        result = future.result()
        if result['success']:
            successful_queries += 1
            query_durations.append(result['duration'])
            print(f"Query successful in {result['duration']:.2f}s, rows: {result['rows']}")
        else:
            failed_queries += 1
            print(f"Query failed in {result['duration']:.2f}s, error: {result['error']}")

# Queries queued during the submission window keep running after it closes,
# so the real test length is measured here, once the pool has drained.
elapsed_seconds = time.monotonic() - start_time

print("-" * 30)
print("Test finished.")
print(f"Total queries submitted: {total_queries}")
print(f"Successful queries: {successful_queries}")
print(f"Failed queries: {failed_queries}")
print(f"Elapsed time (submission + drain): {elapsed_seconds:.2f}s")

if successful_queries > 0:
    avg_duration = np.mean(query_durations)
    p95_duration = np.percentile(query_durations, 95)
    p99_duration = np.percentile(query_durations, 99)
    # Divide by the measured elapsed time, not the nominal submission window;
    # otherwise throughput is overstated whenever the pool falls behind.
    throughput = successful_queries / elapsed_seconds

    print(f"Average query duration: {avg_duration:.2f}s")
    print(f"P95 query duration: {p95_duration:.2f}s")
    print(f"P99 query duration: {p99_duration:.2f}s")
    print(f"Average throughput: {throughput:.2f} queries/sec")

## 6. Analyze Results

The cells below use the BigQuery magic to query `INFORMATION_SCHEMA` and analyze the results of the concurrency test. For jobs created in the last 15 minutes, the query aggregates slot usage and total bytes processed per reservation, project, and user, and then estimates what those queries cost under BigQuery editions (slot-based) pricing and under on-demand pricing.

In [None]:
%load_ext bigquery_magics

In [None]:
%%bigquery
-- Estimates the cost of recently created jobs per reservation, project and user.
-- The cell magic must appear exactly once, on the first line of the cell: a
-- second "%%bigquery" line would be sent to BigQuery as part of the query text.

with slots as (
-- One row per (period_start, reservation, project, user): slot usage and bytes
-- from the jobs timeline, plus the reservation capacity for the same minute.
SELECT
  res.period_start,
  res.reservation_id,
  jobs.project_id,
  jobs.user_email,
  SUM(jobs.period_slot_ms) / 1000 / 60 AS period_slot_minutes,
  -- NOTE(review): the jobs timeline has several rows per job; if
  -- total_bytes_processed is a per-job total repeated on each row, this SUM
  -- over-counts bytes (and on_demand_cost below) -- confirm against the docs.
  SUM(jobs.total_bytes_processed) AS total_bytes_processed,
  ANY_VALUE(res.slots_assigned) AS slots_assigned,
  ANY_VALUE(res.slots_max_assigned) AS slots_max_assigned,
  ANY_VALUE(res.autoscale.current_slots) as slots_autoscaling
FROM
  `region-us`.INFORMATION_SCHEMA.JOBS_TIMELINE_BY_ORGANIZATION jobs
JOIN
  `region-us`.INFORMATION_SCHEMA.RESERVATIONS_TIMELINE res
  -- Job periods are truncated to the minute to match the reservation rows.
  ON TIMESTAMP_TRUNC(jobs.period_start, MINUTE) = res.period_start
     AND jobs.reservation_id = res.reservation_id
WHERE
  -- Only jobs created in the last 15 minutes, i.e. the test that just ran.
  (jobs.job_creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 MINUTE))
  -- Skip SCRIPT parent rows; keep rows whose statement type is unknown.
  AND (jobs.statement_type != "SCRIPT" OR jobs.statement_type IS NULL)
GROUP BY 1,2,3, 4

ORDER BY
  period_start asc,
  reservation_id
)
-- editions_cost: summed per-period slot counts / 60 * 0.06
--   (presumably slot-hours times a USD price per slot-hour -- confirm the rate).
-- on_demand_cost: bytes converted to TiB * 6.25
--   (presumably the USD price per TiB scanned -- confirm the rate).
select reservation_id, project_id, user_email, (sum (slots.slots_autoscaling) + sum (slots.slots_assigned)) / 60 * 0.06 as editions_cost,
  (SUM(total_bytes_processed) / 1024 / 1024 / 1024 / 1024) * 6.25 AS on_demand_cost
from slots group by 1,2,3