13 changes: 13 additions & 0 deletions README.md
@@ -139,6 +139,19 @@ rows = [
inserted = client.push_rows('dataset', 'table', rows, 'id')
```

# Import data from Google Cloud Storage
```python
schema = [{"name": "username", "type": "string", "mode": "nullable"}]
job = client.import_data_from_uris(['gs://mybucket/mydata.json'],
                                   'dataset',
                                   'table',
                                   schema,
                                   source_format=JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON)

job = client.wait_for_job(job, timeout=60)
print(job)
```
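
The CSV-only options (`field_delimiter`, `quote`, `skip_leading_rows`, `allow_jagged_rows`, `allow_quoted_newlines`) can be combined with `source_format=JOB_SOURCE_FORMAT_CSV`. A minimal sketch, assuming the same `client` as above and a hypothetical `gs://mybucket/mydata.csv`:
```python
schema = [{"name": "username", "type": "string", "mode": "nullable"}]
job = client.import_data_from_uris(['gs://mybucket/mydata.csv'],
                                   'dataset',
                                   'table',
                                   schema,
                                   source_format=JOB_SOURCE_FORMAT_CSV,
                                   write_disposition=JOB_WRITE_TRUNCATE,
                                   field_delimiter=',',
                                   skip_leading_rows=1)

job = client.wait_for_job(job, interval=10, timeout=120)
```

Passing any of these options with a different `source_format` raises an exception, so they are only worth setting for CSV loads.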

# Managing Datasets

The client provides an API for listing, creating, deleting, updating and patching datasets.
1 change: 0 additions & 1 deletion bigquery/__init__.py
@@ -3,4 +3,3 @@

logger = logging.getLogger('bigquery')
logger.setLevel(logging.DEBUG)

209 changes: 204 additions & 5 deletions bigquery/client.py
@@ -1,6 +1,8 @@
import calendar
from collections import defaultdict
from datetime import datetime
from time import sleep, time
from hashlib import sha256
import json

from apiclient.discovery import build
@@ -11,9 +13,21 @@
from bigquery.errors import UnfinishedQueryException
from bigquery.schema_builder import schema_from_record


BIGQUERY_SCOPE = 'https://www.googleapis.com/auth/bigquery'
BIGQUERY_SCOPE_READ_ONLY = 'https://www.googleapis.com/auth/bigquery.readonly'

JOB_CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'
JOB_CREATE_NEVER = 'CREATE_NEVER'
JOB_SOURCE_FORMAT_NEWLINE_DELIMITED_JSON = 'NEWLINE_DELIMITED_JSON'
JOB_SOURCE_FORMAT_DATASTORE_BACKUP = 'DATASTORE_BACKUP'
JOB_SOURCE_FORMAT_CSV = 'CSV'
JOB_WRITE_TRUNCATE = 'WRITE_TRUNCATE'
JOB_WRITE_APPEND = 'WRITE_APPEND'
JOB_WRITE_EMPTY = 'WRITE_EMPTY'
JOB_ENCODING_UTF_8 = 'UTF-8'
JOB_ENCODING_ISO_8859_1 = 'ISO-8859-1'


def get_client(project_id, credentials=None, service_account=None,
               private_key=None, readonly=True):
@@ -67,11 +81,11 @@ def _get_bq_service(credentials=None, service_account=None, private_key=None,
def _credentials():
    """Import and return SignedJwtAssertionCredentials class"""
    from oauth2client.client import SignedJwtAssertionCredentials

    return SignedJwtAssertionCredentials


class BigQueryClient(object):

    def __init__(self, bq_service, project_id):
        self.bigquery = bq_service
        self.project_id = project_id
@@ -309,6 +323,191 @@ def get_tables(self, dataset_id, app_id, start_time, end_time):

        return self._filter_tables_by_time(app_tables, start_time, end_time)

    def import_data_from_uris(
            self,
            source_uris,
            dataset,
            table,
            schema=None,
            job=None,
            source_format=None,
            create_disposition=None,
            write_disposition=None,
            encoding=None,
            ignore_unknown_values=None,
            # [Review comment from Owner] I realize the None values are being
            # used in a falsey way, but the defaults should probably be False
            # to be more explicit to users that these are bool values. Same
            # goes for the other bool kwargs.
            max_bad_records=None,
            allow_jagged_rows=None,
            allow_quoted_newlines=None,
            field_delimiter=None,
            quote=None,
            skip_leading_rows=None,
    ):
"""
Imports data into a BigQuery table from cloud storage.
Args:
source_uris: required string or list of strings representing
the uris on cloud storage of the form:
gs://bucket/filename
dataset: required string id of the dataset
table: required string id of the table
job: optional string identifying the job (a unique jobid
is automatically generated if not provided)
schema: optional list representing the bigquery schema
source_format: optional string
(one of the JOB_SOURCE_FORMAT_* constants)
create_disposition: optional string
(one of the JOB_CREATE_* constants)
write_disposition: optional string
(one of the JOB_WRITE_* constants)
encoding: optional string default
(one of the JOB_ENCODING_* constants)
ignore_unknown_values: optional boolean
max_bad_records: optional boolean
allow_jagged_rows: optional boolean for csv only
allow_quoted_newlines: optional boolean for csv only
field_delimiter: optional string for csv only
quote: optional string the quote character for csv only
skip_leading_rows: optional int for csv only

Optional arguments with value None are determined by
BigQuery as described:
https://developers.google.com/bigquery/docs/reference/v2/jobs

Returns:
dict, a BigQuery job resource or None on failure
"""
        source_uris = source_uris if isinstance(source_uris, list) \
            else [source_uris]

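        # Build the load configuration; only options explicitly passed by
        # the caller are set, so BigQuery applies its documented defaults
        # for anything left as None.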
        configuration = {
            "destinationTable": {
                "projectId": self.project_id,
                "tableId": table,
                "datasetId": dataset
            },
            "sourceUris": source_uris,
        }

        if max_bad_records:
            configuration['maxBadRecords'] = max_bad_records

        if ignore_unknown_values:
            configuration['ignoreUnknownValues'] = ignore_unknown_values

        if create_disposition:
            configuration['createDisposition'] = create_disposition

        if write_disposition:
            configuration['writeDisposition'] = write_disposition

        if encoding:
            configuration['encoding'] = encoding

        if schema:
            configuration['schema'] = schema

        if source_format:
            configuration['sourceFormat'] = source_format

        if not job:
            # Derive a default job id from the source URIs and the current
            # time (renamed from `hex`, which shadowed the builtin).
            digest = sha256(":".join(source_uris) + str(time())).hexdigest()
            job = "{dataset}-{table}-{digest}".format(
                dataset=dataset,
                table=table,
                digest=digest
            )

        if source_format == JOB_SOURCE_FORMAT_CSV:
            if field_delimiter:
                configuration['fieldDelimiter'] = field_delimiter

            if allow_jagged_rows:
                configuration['allowJaggedRows'] = allow_jagged_rows

            if allow_quoted_newlines:
                configuration['allowQuotedNewlines'] = allow_quoted_newlines

            if quote:
                configuration['quote'] = quote

            if skip_leading_rows:
                configuration['skipLeadingRows'] = skip_leading_rows

        elif field_delimiter or allow_jagged_rows \
                or allow_quoted_newlines or quote or skip_leading_rows:
            all_values = dict(field_delimiter=field_delimiter,
                              allow_jagged_rows=allow_jagged_rows,
                              allow_quoted_newlines=allow_quoted_newlines,
                              skip_leading_rows=skip_leading_rows,
                              quote=quote)
            non_null_values = dict((k, v) for k, v
                                   in all_values.items()
                                   if v)
            raise Exception("Parameters field_delimiter, allow_jagged_rows, "
                            "allow_quoted_newlines, quote and "
                            "skip_leading_rows are only allowed when "
                            "source_format=JOB_SOURCE_FORMAT_CSV: %s"
                            % non_null_values)

        body = {
            "configuration": {
                'load': configuration
            },
            "jobReference": {
                "projectId": self.project_id,
                "jobId": job
            }
        }

        try:
            logger.info("Creating load job %s" % body)
            job_resource = self.bigquery.jobs() \
                .insert(projectId=self.project_id, body=body) \
                .execute()
            return job_resource
        except Exception as e:
            logger.error("Failed while starting uri import job: {0}"
                         .format(e))
            return None

    def wait_for_job(self, job, interval=5, timeout=None):
        """
        Waits until the given job is done or has failed.

        Args:
            job: dict representing a BigQuery job resource, or a string jobId
            interval: optional float polling interval in seconds, default = 5
            timeout: optional float timeout in seconds, default = None
        Returns:
            dict, final state of the job resource, as described here:
            https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/python/latest/bigquery_v2.jobs.html#get
        Raises:
            standard exceptions on HTTP / auth failures (you must retry)
        """
        if isinstance(job, dict):  # job is a job resource
            complete = job.get('jobComplete')
            job_id = job['jobReference']['jobId']
        else:  # job is the jobId
            complete = False
            job_id = job
        job_resource = None

        start_time = time()
        elapsed_time = 0
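        # Poll the job status until it reports DONE, or until the timeout
        # (if any) has elapsed.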
        while not (complete
                   or (timeout is not None and elapsed_time > timeout)):
            sleep(interval)
            request = self.bigquery.jobs().get(projectId=self.project_id,
                                               jobId=job_id)
            job_resource = request.execute()
            error = job_resource.get('error')
            if error:
                raise Exception("{message} ({code}). Errors: {errors}"
                                .format(**error))
            complete = job_resource.get('status').get('state') == u'DONE'
            elapsed_time = time() - start_time

        return job_resource

    def push_rows(self, dataset, table, rows, insert_id_key=None):
        """Upload rows to BigQuery table.

@@ -471,8 +670,8 @@ def _in_range(self, start_time, end_time, time):
        ONE_MONTH = 2764800  # 32 days

        return start_time <= time <= end_time or \
            time <= start_time <= time + ONE_MONTH or \
            time <= end_time <= time + ONE_MONTH

    def _get_query_results(self, job_collection, project_id, job_id,
                           offset=None, limit=None):
@@ -594,8 +793,8 @@ def create_dataset(self, dataset_id, friendly_name=None, description=None,
            datasets.insert(projectId=self.project_id,
                            body=dataset_data).execute()
            return True
        except Exception as e:
            logger.error('Cannot create dataset %s, %s' % (dataset_id, e))
            return False

def get_datasets(self):
1 change: 0 additions & 1 deletion bigquery/schema_builder.py
@@ -102,4 +102,3 @@ def bigquery_type(o, timestamp_parser=default_timestamp_parser):
    else:
        raise Exception(
            "Invalid object for BigQuery schema: {0} ({1})".format(o, t))
