CLN: Use to_dataframe to download query results. (#247)
* CLN: Use `to_dataframe` to download query results.

This allows us to remove logic for parsing the schema and align with
google-cloud-bigquery.

* Bump the minimum google-cloud-bigquery version to 1.9.0, since we need the new `dtypes` argument to `to_dataframe`.

* Cast to the correct dtype in empty dataframes.

* Improve the conda CI build so that it actually installs dependencies from conda rather than pip. Add pydata-google-auth to the conda dependencies.
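
For context, a minimal sketch (not part of this commit) of the `to_dataframe` pattern that `read_gbq` now delegates to. It assumes google-cloud-bigquery>=1.9.0, where `RowIterator.to_dataframe()` accepts a `dtypes` argument; the project ID and query are placeholders.

    # Illustrative sketch only -- not code from this commit.
    from google.cloud import bigquery

    client = bigquery.Client(project="my-project")  # placeholder project ID
    rows_iter = client.query(
        "SELECT 1 AS x, CAST(NULL AS TIMESTAMP) AS ts"
    ).result()

    # Passing ``dtypes`` forces null-safe dtypes (float, datetime64[ns], ...)
    # for selected columns instead of relying on pandas's default inference.
    df = rows_iter.to_dataframe(dtypes={"ts": "datetime64[ns]"})
    print(df.dtypes)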
tswast committed Feb 23, 2019
1 parent f729a44 commit ebcbfbe
Showing 12 changed files with 178 additions and 102 deletions.
16 changes: 16 additions & 0 deletions benchmark/README.md
@@ -0,0 +1,16 @@
# pandas-gbq benchmarks

This directory contains a few scripts that are useful for performance testing
the pandas-gbq library. Use cProfile to time a script and see details about
where the time is spent. To avoid timing how long BigQuery takes to execute
the query, run each benchmark twice so that the second run reads cached query
results.

## `read_gbq`

Read a small table (a few KB).

python -m cProfile --sort=cumtime read_gbq_small_results.py

Read a large-ish table (100+ MB).

python -m cProfile --sort=cumtime read_gbq_large_results.py
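
As an aside (not part of the commit), the profile can also be written to a file and inspected with pstats. The sketch below mirrors the small-results benchmark; it assumes default credentials with a default project configured.

    # Illustrative sketch only -- not part of this commit.
    # Profile a read_gbq call and print the ten most expensive calls by
    # cumulative time.
    import cProfile
    import pstats

    import pandas_gbq

    cProfile.run(
        'pandas_gbq.read_gbq('
        '"SELECT * FROM `bigquery-public-data.utility_us.country_code_iso`", '
        'dialect="standard")',
        "read_gbq_small.prof",
    )
    pstats.Stats("read_gbq_small.prof").sort_stats("cumtime").print_stats(10)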
8 changes: 8 additions & 0 deletions benchmark/read_gbq_large_results.py
@@ -0,0 +1,8 @@
import pandas_gbq

# Select 163 MB worth of data, to time how long it takes to download large
# result sets.
df = pandas_gbq.read_gbq(
"SELECT * FROM `bigquery-public-data.usa_names.usa_1910_2013`",
dialect="standard",
)
7 changes: 7 additions & 0 deletions benchmark/read_gbq_small_results.py
@@ -0,0 +1,7 @@
import pandas_gbq

# Select a few KB worth of data, to time downloading small result sets.
df = pandas_gbq.read_gbq(
"SELECT * FROM `bigquery-public-data.utility_us.country_code_iso`",
dialect="standard",
)
2 changes: 1 addition & 1 deletion ci/requirements-2.7.pip
@@ -2,5 +2,5 @@ mock
pandas==0.17.1
google-auth==1.4.1
google-auth-oauthlib==0.0.1
google-cloud-bigquery==0.32.0
google-cloud-bigquery==1.9.0
pydata-google-auth==0.1.2
2 changes: 1 addition & 1 deletion ci/requirements-3.5.pip
@@ -1,5 +1,5 @@
pandas==0.19.0
google-auth==1.4.1
google-auth-oauthlib==0.0.1
google-cloud-bigquery==0.32.0
google-cloud-bigquery==1.9.0
pydata-google-auth==0.1.2
5 changes: 2 additions & 3 deletions ci/requirements-3.6-0.20.1.conda
@@ -1,6 +1,5 @@
google-auth
google-auth-oauthlib
google-cloud-bigquery==0.32.0
pydata-google-auth
google-cloud-bigquery==1.9.0
pytest
pytest-cov
codecov
2 changes: 1 addition & 1 deletion ci/run_conda.sh
@@ -21,7 +21,7 @@ fi

REQ="ci/requirements-${PYTHON}-${PANDAS}"
conda install -q --file "$REQ.conda";
python setup.py develop
python setup.py develop --no-deps

# Run the tests
$DIR/run_tests.sh
18 changes: 18 additions & 0 deletions docs/source/changelog.rst
@@ -1,6 +1,24 @@
Changelog
=========

.. _changelog-0.10.0:

0.10.0 / TBD
------------

Dependency updates
~~~~~~~~~~~~~~~~~~

- Update the minimum version of ``google-cloud-bigquery`` to 1.9.0.
(:issue:`247`)

Internal changes
~~~~~~~~~~~~~~~~

- Use ``to_dataframe()`` from ``google-cloud-bigquery`` in the ``read_gbq()``
function. (:issue:`247`)


.. _changelog-0.9.0:

0.9.0 / 2019-01-11
82 changes: 49 additions & 33 deletions pandas_gbq/gbq.py
@@ -1,11 +1,9 @@
import logging
import time
import warnings
from collections import OrderedDict
from datetime import datetime

import numpy as np
from pandas import DataFrame

from pandas_gbq.exceptions import AccessDenied

@@ -37,7 +35,7 @@ def _check_google_client_version():
raise ImportError("Could not import pkg_resources (setuptools).")

# https://github.com/GoogleCloudPlatform/google-cloud-python/blob/master/bigquery/CHANGELOG.md
bigquery_minimum_version = pkg_resources.parse_version("0.32.0")
bigquery_minimum_version = pkg_resources.parse_version("1.9.0")
BIGQUERY_INSTALLED_VERSION = pkg_resources.get_distribution(
"google-cloud-bigquery"
).parsed_version
@@ -482,15 +480,16 @@ def run_query(self, query, **kwargs):
rows_iter = query_reply.result()
except self.http_error as ex:
self.process_http_error(ex)
result_rows = list(rows_iter)
total_rows = rows_iter.total_rows
schema = {
"fields": [field.to_api_repr() for field in rows_iter.schema]
}

logger.debug("Got {} rows.\n".format(total_rows))
schema_fields = [field.to_api_repr() for field in rows_iter.schema]
nullsafe_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
df = rows_iter.to_dataframe(dtypes=nullsafe_dtypes)

if df.empty:
df = _cast_empty_df_dtypes(schema_fields, df)

return schema, result_rows
logger.debug("Got {} rows.\n".format(rows_iter.total_rows))
return df

def load_data(
self,
@@ -638,45 +637,62 @@ def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
table.create(table_id, table_schema)


def _parse_schema(schema_fields):
# see:
def _bqschema_to_nullsafe_dtypes(schema_fields):
# Only specify dtype when the dtype allows nulls. Otherwise, use pandas's
# default dtype choice.
#
# See:
# http://pandas.pydata.org/pandas-docs/dev/missing_data.html
# #missing-data-casting-rules-and-indexing
dtype_map = {
"FLOAT": np.dtype(float),
# Even though TIMESTAMPs are timezone-aware in BigQuery, pandas doesn't
# support datetime64[ns, UTC] as dtype in DataFrame constructors. See:
# https://github.com/pandas-dev/pandas/issues/12513
"TIMESTAMP": "datetime64[ns]",
"TIME": "datetime64[ns]",
"DATE": "datetime64[ns]",
"DATETIME": "datetime64[ns]",
"BOOLEAN": bool,
"INTEGER": np.int64,
}

dtypes = {}
for field in schema_fields:
name = str(field["name"])
if field["mode"].upper() == "REPEATED":
yield name, object
else:
dtype = dtype_map.get(field["type"].upper())
yield name, dtype
continue

dtype = dtype_map.get(field["type"].upper())
if dtype:
dtypes[name] = dtype

return dtypes

def _parse_data(schema, rows):

column_dtypes = OrderedDict(_parse_schema(schema["fields"]))
df = DataFrame(data=(iter(r) for r in rows), columns=column_dtypes.keys())
def _cast_empty_df_dtypes(schema_fields, df):
"""Cast any columns in an empty dataframe to correct type.
for column in df:
dtype = column_dtypes[column]
null_safe = (
df[column].notnull().all()
or dtype == float
or dtype == "datetime64[ns]"
In an empty dataframe, pandas cannot choose a dtype unless one is
explicitly provided. The _bqschema_to_nullsafe_dtypes() function only
provides dtypes when the dtype safely handles null values. This means
that empty int64 and boolean columns are incorrectly classified as
``object``.
"""
if not df.empty:
raise ValueError(
"DataFrame must be empty in order to cast non-nullsafe dtypes"
)
if dtype and null_safe:
df[column] = df[column].astype(
column_dtypes[column], errors="ignore"
)

dtype_map = {"BOOLEAN": bool, "INTEGER": np.int64}

for field in schema_fields:
column = str(field["name"])
if field["mode"].upper() == "REPEATED":
continue

dtype = dtype_map.get(field["type"].upper())
if dtype:
df[column] = df[column].astype(dtype)

return df


@@ -825,8 +841,8 @@ def read_gbq(
credentials=credentials,
private_key=private_key,
)
schema, rows = connector.run_query(query, configuration=configuration)
final_df = _parse_data(schema, rows)

final_df = connector.run_query(query, configuration=configuration)

# Reindex the DataFrame on the provided column
if index_col is not None:
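A brief aside on the dtype handling above (not part of the diff): numpy's int64 and bool dtypes cannot represent nulls, which is why `_bqschema_to_nullsafe_dtypes` omits INTEGER and BOOLEAN and why `_cast_empty_df_dtypes` only fixes them up for empty results. A small sketch of the underlying pandas behaviour:

    # Illustrative sketch only -- not part of the diff.
    import numpy as np
    import pandas as pd

    # An integer column containing a null cannot stay int64: pandas falls
    # back to float64, and the null becomes NaN.
    with_nulls = pd.Series([1, None])
    print(with_nulls.dtype)  # float64

    # Casting a column that still contains nulls to int64 fails.
    try:
        with_nulls.astype(np.int64)
    except (TypeError, ValueError) as exc:
        print("cannot cast:", exc)

    # An empty column has no nulls, so casting it to the exact BigQuery
    # type is safe -- this is what _cast_empty_df_dtypes relies on.
    empty = pd.Series([], dtype=object).astype(np.int64)
    print(empty.dtype)  # int64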
2 changes: 1 addition & 1 deletion setup.py
@@ -22,7 +22,7 @@ def readme():
"pydata-google-auth",
"google-auth",
"google-auth-oauthlib",
"google-cloud-bigquery>=0.32.0",
"google-cloud-bigquery>=1.9.0",
]

extras = {"tqdm": "tqdm>=4.23.0"}
86 changes: 58 additions & 28 deletions tests/system/test_gbq.py
@@ -6,11 +6,12 @@

import google.oauth2.service_account
import numpy as np
import pandas
import pandas.util.testing as tm
import pytest
import pytz
from pandas import DataFrame, NaT, compat
from pandas.compat import range, u
import pytest
import pytz

from pandas_gbq import gbq

@@ -138,14 +139,6 @@ def test_should_be_able_to_get_a_bigquery_client(self, gbq_connector):
bigquery_client = gbq_connector.get_client()
assert bigquery_client is not None

def test_should_be_able_to_get_schema_from_query(self, gbq_connector):
schema, pages = gbq_connector.run_query("SELECT 1")
assert schema is not None

def test_should_be_able_to_get_results_from_query(self, gbq_connector):
schema, pages = gbq_connector.run_query("SELECT 1")
assert pages is not None


def test_should_read(project, credentials):
query = 'SELECT "PI" AS valid_string'
@@ -319,7 +312,8 @@ def test_should_properly_handle_timestamp_unix_epoch(self, project_id):
tm.assert_frame_equal(
df,
DataFrame(
{"unix_epoch": [np.datetime64("1970-01-01T00:00:00.000000Z")]}
{"unix_epoch": ["1970-01-01T00:00:00.000000Z"]},
dtype="datetime64[ns]",
),
)

@@ -334,19 +328,46 @@ def test_should_properly_handle_arbitrary_timestamp(self, project_id):
tm.assert_frame_equal(
df,
DataFrame(
{
"valid_timestamp": [
np.datetime64("2004-09-15T05:00:00.000000Z")
]
}
{"valid_timestamp": ["2004-09-15T05:00:00.000000Z"]},
dtype="datetime64[ns]",
),
)

def test_should_properly_handle_datetime_unix_epoch(self, project_id):
query = 'SELECT DATETIME("1970-01-01 00:00:00") AS unix_epoch'
df = gbq.read_gbq(
query,
project_id=project_id,
credentials=self.credentials,
dialect="legacy",
)
tm.assert_frame_equal(
df,
DataFrame(
{"unix_epoch": ["1970-01-01T00:00:00"]}, dtype="datetime64[ns]"
),
)

def test_should_properly_handle_arbitrary_datetime(self, project_id):
query = 'SELECT DATETIME("2004-09-15 05:00:00") AS valid_timestamp'
df = gbq.read_gbq(
query,
project_id=project_id,
credentials=self.credentials,
dialect="legacy",
)
tm.assert_frame_equal(
df,
DataFrame(
{"valid_timestamp": [np.datetime64("2004-09-15T05:00:00")]}
),
)

@pytest.mark.parametrize(
"expression, type_",
[
("current_date()", "<M8[ns]"),
("current_timestamp()", "<M8[ns]"),
("current_timestamp()", "datetime64[ns]"),
("current_datetime()", "<M8[ns]"),
("TRUE", bool),
("FALSE", bool),
@@ -378,7 +399,19 @@ def test_should_properly_handle_null_timestamp(self, project_id):
credentials=self.credentials,
dialect="legacy",
)
tm.assert_frame_equal(df, DataFrame({"null_timestamp": [NaT]}))
tm.assert_frame_equal(
df, DataFrame({"null_timestamp": [NaT]}, dtype="datetime64[ns]")
)

def test_should_properly_handle_null_datetime(self, project_id):
query = "SELECT CAST(NULL AS DATETIME) AS null_datetime"
df = gbq.read_gbq(
query,
project_id=project_id,
credentials=self.credentials,
dialect="standard",
)
tm.assert_frame_equal(df, DataFrame({"null_datetime": [NaT]}))

def test_should_properly_handle_null_boolean(self, project_id):
query = "SELECT BOOLEAN(NULL) AS null_boolean"
@@ -549,17 +582,14 @@ def test_zero_rows(self, project_id):
credentials=self.credentials,
dialect="legacy",
)
page_array = np.zeros(
(0,),
dtype=[
("title", object),
("id", np.dtype(int)),
("is_bot", np.dtype(bool)),
("ts", "M8[ns]"),
],
)
empty_columns = {
"title": pandas.Series([], dtype=object),
"id": pandas.Series([], dtype=np.dtype(int)),
"is_bot": pandas.Series([], dtype=np.dtype(bool)),
"ts": pandas.Series([], dtype="datetime64[ns]"),
}
expected_result = DataFrame(
page_array, columns=["title", "id", "is_bot", "ts"]
empty_columns, columns=["title", "id", "is_bot", "ts"]
)
tm.assert_frame_equal(df, expected_result, check_index_type=False)
