# BiqQuery to Cloud Storage

This notebook shows how to execute a query against bigquery and store the results in BigQuery.

In [33]:
# Libraries ... 
from google.cloud import bigquery
from google.cloud import storage
import datetime
import time
import os
import pandas as pd

In [31]:
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = r'/your_GCP_creds/credentials.json'

In [8]:
todaysDate = datetime.date.today().strftime('%Y-%m-%d');

In [29]:
# Query Parameters
DATASET = 'bigquery-public-data:crypto_bitcoin'
TABLE = 'outputs'
SQL_PATH = 'sql/bitcoin-balance-query.sql'

# Storage Parameters
PROJECT_ID = 'data-science-on-gcp-323609'
LOCATION = 'UK'
BUCKET_URI = 'gs://blockchain-exploration'

extra = '/queries/' + todaysDate + '-' + 'bitcoin-balance-query.csv'

In [12]:
# Import SQL Statement
QUERY = open(SQL_PATH).read()

In [15]:
client = bigquery.Client()

In [18]:
%%bigquery df
WITH double_entry_book AS (
   -- debits
   SELECT
    array_to_string(inputs.addresses, ",") as address
   , inputs.type
   , -inputs.value as value
   FROM `bigquery-public-data.crypto_bitcoin.inputs` as inputs
   UNION ALL
   -- credits
   SELECT
    array_to_string(outputs.addresses, ",") as address
   , outputs.type
   , outputs.value as value
   FROM `bigquery-public-data.crypto_bitcoin.outputs` as outputs
)
SELECT
   address
,   type   
,   sum(value) as balance
FROM double_entry_book
GROUP BY 1,2
ORDER BY balance DESC
LIMIT 1000

Query complete after 0.01s: 100%|██████████| 7/7 [00:00<00:00, 4336.16query/s]                        
Downloading: 100%|██████████| 1000/1000 [00:01<00:00, 573.27rows/s]


In [19]:
df.head()

Unnamed: 0,address,type,balance
0,34xp4vRoCGJym3xR7yCVPFHoCNxv4Twseo,scripthash,28812621066262.0
1,bc1qgdjqv0av3q56jvd82tkdjpy7gdp9ut8tlqmgrpmv24...,witness_v0_scripthash,17801098544936.0
2,1P5ZEDWTKTFGxQjZphgWPQUpe554WKDfHQ,pubkeyhash,10807757445252.0
3,37XuVSEpWW4trkfmvWzegTHQt7BdktSKUs,scripthash,9450511406719.0
4,38UmuUqPCrFmQo4khkomQwZ4VbY2nZMJ67,scripthash,9356602192816.0


In [32]:
client = storage.Client()
bucket = client.get_bucket(BUCKET_URI)

NotFound: 404 GET https://storage.googleapis.com/storage/v1/b/gs://blockchain-exploration?projection=noAcl&prettyPrint=false: Not Found

In [None]:
bucket.blob('upload_test/test.csv').upload_from_string(df.to_csv(), 'text/csv')

In [23]:
# Write query results to a new table
job_config = bigquery.QueryJobConfig()

In [24]:
table_ref = client.dataset(DATASET).table(TABLE)

In [27]:
job_config.destination = table_ref

In [26]:
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

In [28]:
query_job = client.query(
    QUERY,
    job_config=job_config)

BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/data-science-on-gcp-323609/jobs?prettyPrint=false: Invalid dataset ID "bigquery-public-data:crypto_bitcoin". Dataset IDs must be alphanumeric (plus underscores and dashes) and must be at most 1024 characters long.

(job ID: 267391b8-2b74-4699-837a-121e52ddde7f)

                   -----Query Job SQL Follows-----                   

    |    .    |    .    |    .    |    .    |    .    |    .    |
   1:WITH double_entry_book AS (
   2:   -- debits
   3:   SELECT
   4:    array_to_string(inputs.addresses, ",") as address
   5:   , inputs.type
   6:   , -inputs.value as value
   7:   FROM `bigquery-public-data.crypto_bitcoin.inputs` as inputs
   8:   UNION ALL
   9:   -- credits
  10:   SELECT
  11:    array_to_string(outputs.addresses, ",") as address
  12:   , outputs.type
  13:   , outputs.value as value
  14:   FROM `bigquery-public-data.crypto_bitcoin.outputs` as outputs
  15:)
  16:SELECT
  17:   address
  18:,   type   
  19:,   sum(value) as balance
  20:FROM double_entry_book
  21:GROUP BY 1,2
  22:ORDER BY balance DESC
  23:LIMIT 1000
    |    .    |    .    |    .    |    .    |    .    |    .    |

In [22]:
job_config.destination = table_ref
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

query_job = client.query(
    QUERY,
    job_config=job_config)
rows = list(query_job)  # Waits for the query to finish

BadRequest: 400 POST https://bigquery.googleapis.com/bigquery/v2/projects/data-science-on-gcp-323609/jobs?prettyPrint=false: Invalid dataset ID "bigquery-public-data:crypto_bitcoin". Dataset IDs must be alphanumeric (plus underscores and dashes) and must be at most 1024 characters long.

(job ID: 9cba1500-c275-4176-af44-49887e686622)

                   -----Query Job SQL Follows-----                   

    |    .    |    .    |    .    |    .    |    .    |    .    |
   1:WITH double_entry_book AS (
   2:   -- debits
   3:   SELECT
   4:    array_to_string(inputs.addresses, ",") as address
   5:   , inputs.type
   6:   , -inputs.value as value
   7:   FROM `bigquery-public-data.crypto_bitcoin.inputs` as inputs
   8:   UNION ALL
   9:   -- credits
  10:   SELECT
  11:    array_to_string(outputs.addresses, ",") as address
  12:   , outputs.type
  13:   , outputs.value as value
  14:   FROM `bigquery-public-data.crypto_bitcoin.outputs` as outputs
  15:)
  16:SELECT
  17:   address
  18:,   type   
  19:,   sum(value) as balance
  20:FROM double_entry_book
  21:GROUP BY 1,2
  22:ORDER BY balance DESC
  23:LIMIT 1000
    |    .    |    .    |    .    |    .    |    .    |    .    |

In [None]:
# Libraries ... 
from google.cloud import bigquery
import sys


def query_to_storage(PROJECT_ID, DATASET, TABLE, LOCATION, SQL_PATH, BUCKET_URI):
    QUERY = open(SQL_PATH).read()

    # bq client
    client = bigquery.Client()

    # Write query results to a new table
    job_config = bigquery.QueryJobConfig()
    table_ref = client.dataset(DATASET).table(TABLE)
    job_config.destination = table_ref
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    query_job = client.query(
        QUERY,
        location=LOCATION, # Location must match dataset
        job_config=job_config)
    rows = list(query_job)  # Waits for the query to finish


    # Export table to GCS
    destination_uri = BUCKET_URI
    dataset_ref = client.dataset(DATASET, project=PROJECT_ID)
    table_ref = dataset_ref.table(TABLE)

    extract_job = client.extract_table(
        table_ref,
        destination_uri,
        location=LOCATION)
    extract_job.result()  # Waits for job to complete


if __name__ == "__main__":
    args = sys.argv
    # args[0] = current file
    # args[1] = function name
    # args[2:] = function args : (*unpacked)
    globals()[args[1]](*args[2:])