Skip to content
This repository has been archived by the owner on Aug 31, 2023. It is now read-only.

Commit

Permalink
Fix for #9, BigQuery views are now automatically created
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-kolda committed Nov 6, 2017
1 parent 973907d commit 39f0de2
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 35 deletions.
11 changes: 11 additions & 0 deletions bq_schemas/bigquery_views_legacy/table_metadata_v0_1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- This view aggregates all data from last 2 days and deduplicates it based on table reference.
-- Deleted table can be returned by this query up to 2 days.
SELECT * FROM (
SELECT projectId, datasetId, tableId, creationTime, lastModifiedTime, location, numBytes, numLongTermBytes, numRows, type, timePartitioning.type AS partitioningType,
row_number() OVER (PARTITION BY projectId, datasetId, tableId ORDER BY snapshotTime DESC) AS rownum,
COUNT(partition.partitionId) WITHIN RECORD AS partitionCount
FROM [bigquery.table_metadata_v0_1]
WHERE
_PARTITIONTIME BETWEEN TIMESTAMP(UTC_USEC_TO_DAY(NOW() - 48 * 60 * 60 * 1000000))
AND TIMESTAMP(UTC_USEC_TO_DAY(CURRENT_TIMESTAMP())))
WHERE rownum = 1
87 changes: 62 additions & 25 deletions gcp_census/model/model_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from oauth2client.client import GoogleCredentials
from google.appengine.api import app_identity

from gcp_census.model.table import Table
from gcp_census.model.view import View


class ModelCreator(object):
def __init__(self, model_directory):
Expand All @@ -24,18 +27,30 @@ def __init__(self, model_directory):
def _create_http():
return httplib2.Http(timeout=10)

def list_models(self):
def __list_tables(self):
for table in self.__list_files('.json'):
with open(table[2]) as json_file:
json_dict = json.load(json_file)
yield Table(table[0], table[1], json_dict)

def __list_views(self):
for view in self.__list_files('.sql'):
with open(view[2]) as view_file:
content = view_file.readlines()
yield View(view[0], view[1], content)

def __list_files(self, extension):
for group_dir in os.listdir(self.model_directory):
subdirectory = os.path.join(self.model_directory, group_dir)
if os.path.isdir(subdirectory):
for model_file in os.listdir(subdirectory):
model_name = os.path.splitext(model_file)[0]
with open(os.path.join(self.model_directory, group_dir,
model_file)) as json_file:
json_dict = json.load(json_file)
yield Model(group_dir, model_name, json_dict)
if model_file.endswith(extension):
model_name = os.path.splitext(model_file)[0]
filename = os.path.join(self.model_directory, group_dir,
model_file)
yield group_dir, model_name, filename

def list_groups(self):
def __list_groups(self):
for group_dir in os.listdir(self.model_directory):
subdirectory = os.path.join(self.model_directory, group_dir)
if os.path.isdir(subdirectory):
Expand All @@ -44,7 +59,7 @@ def list_groups(self):
def create_missing_datasets(self):
project_id = self.__get_project_id()
location = os.getenv('BIGQUERY_LOCATION')
for dataset_id in self.list_groups():
for dataset_id in self.__list_groups():
self.__create_dataset_if_missing(project_id, dataset_id, location)

def __create_dataset_if_missing(self, project_id, dataset_id, location):
Expand All @@ -68,30 +83,60 @@ def __create_dataset_if_missing(self, project_id, dataset_id, location):
else:
raise e

def create_missing_models(self):
def create_missing_tables(self):
project_id = self.__get_project_id()
for model in self.list_models():
for table in self.__list_tables():
logging.debug("Creating BQ table %s:%s.%s",
project_id, model.group, model.name)
project_id, table.group, table.name)
body = {
'tableReference': {
'projectId': project_id,
'datasetId': model.group,
'tableId': model.name
'datasetId': table.group,
'tableId': table.name
}
}
body.update(model.json_dict)
body.update(table.json_dict)
try:
self.service.tables().insert(
projectId=project_id, datasetId=model.group,
projectId=project_id, datasetId=table.group,
body=body
).execute()
logging.info("Table %s:%s.%s created successfully", project_id,
model.group, model.name)
table.group, table.name)
except HttpError as e:
if e.resp.status == 409:
logging.info("Table %s:%s.%s already exists", project_id,
model.group, model.name)
table.group, table.name)
else:
raise e

def create_missing_views(self):
project_id = self.__get_project_id()
for view in self.__list_views():
logging.debug("Creating BQ view %s:%s.%s",
project_id, view.group, view.name)
body = {
'tableReference': {
'projectId': project_id,
'datasetId': view.group,
'tableId': view.name
},
"view": {
"query": view.query
},
"description": view.description
}
try:
self.service.tables().insert(
projectId=project_id, datasetId=view.group,
body=body
).execute()
logging.info("View %s:%s.%s created successfully", project_id,
view.group, view.name)
except HttpError as e:
if e.resp.status == 409:
logging.info("View %s:%s.%s already exists", project_id,
view.group, view.name)
else:
raise e

Expand All @@ -101,11 +146,3 @@ def __get_project_id():
app_identity.get_application_id())


class Model(object):
def __init__(self, group, name, json_dict):
self.group = group
self.name = name
self.json_dict = json_dict

def __str__(self):
return "Model(group={}, name={})".format(self.group, self.name)
3 changes: 2 additions & 1 deletion gcp_census/model/model_creator_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ def __init__(self, request=None, response=None):
def get(self):
model_creator = ModelCreator("bq_schemas")
model_creator.create_missing_datasets()
model_creator.create_missing_models()
model_creator.create_missing_tables()
model_creator.create_missing_views()
8 changes: 8 additions & 0 deletions gcp_census/model/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class Table(object):
def __init__(self, group, name, json_dict):
self.group = group
self.name = name
self.json_dict = json_dict

def __str__(self):
return "Table(group={}, name={})".format(self.group, self.name)
18 changes: 18 additions & 0 deletions gcp_census/model/view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
class View(object):
def __init__(self, group, name, file_content):
self.group = group
self.name = name
self.file_content = file_content

def __str__(self):
return "View(group={}, name={})".format(self.group, self.name)

@property
def query(self):
return "".join(self.file_content)

@property
def description(self):
desc_lines = [line[2:] for line in self.file_content
if line.startswith('--')]
return "".join(desc_lines)
9 changes: 6 additions & 3 deletions tests/model_creator_handler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@ def tearDown(self):
patch.stopall()

@patch.object(ModelCreator, 'create_missing_datasets')
@patch.object(ModelCreator, 'create_missing_models')
def test_happy_path(self, create_missing_models, create_missing_datasets):
@patch.object(ModelCreator, 'create_missing_tables')
@patch.object(ModelCreator, 'create_missing_views')
def test_happy_path(self, create_missing_views, create_missing_tables,
create_missing_datasets):
# when
response = self.under_test.get(
'/createModels'
)
# then
self.assertEqual(response.status_int, 200)
create_missing_models.assert_called_once()
create_missing_views.assert_called_once()
create_missing_tables.assert_called_once()
create_missing_datasets.assert_called_once()
79 changes: 73 additions & 6 deletions tests/model_creator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def test_should_create_dataset(self, _create_http):
http_mock = Mock(wraps=HttpMockSequence([
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_test_schema.json')),
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_datasets_insert_200.json')),
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_datasets_insert_200.json'))
]))
Expand All @@ -37,14 +39,16 @@ def test_should_create_dataset(self, _create_http):

# then
calls = http_mock.mock_calls
self.assertEqual(2, len(calls))
self.assertEqual(3, len(calls))

@patch.object(ModelCreator, '_create_http')
def test_should_ignore_dataset_already_exists_error(self, _create_http):
# given
http_mock = Mock(wraps=HttpMockSequence([
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_test_schema.json')),
({'status': '409'}, test_utils.content(
'tests/json_samples/bigquery_v2_datasets_insert_409.json')),
({'status': '409'}, test_utils.content(
'tests/json_samples/bigquery_v2_datasets_insert_409.json'))
]))
Expand All @@ -56,7 +60,7 @@ def test_should_ignore_dataset_already_exists_error(self, _create_http):

# then
calls = http_mock.mock_calls
self.assertEqual(2, len(calls))
self.assertEqual(3, len(calls))

@patch.object(ModelCreator, '_create_http')
def test_should_propagate_dataset_500_error(self, _create_http):
Expand All @@ -79,7 +83,7 @@ def test_should_propagate_dataset_500_error(self, _create_http):
self.assertEqual(500, context.exception.resp.status)

@patch.object(ModelCreator, '_create_http')
def test_should_create_table(self, _create_http):
def test_should_create_tables(self, _create_http):
# given
http_mock = Mock(wraps=HttpMockSequence([
({'status': '200'}, test_utils.content(
Expand All @@ -95,7 +99,7 @@ def test_should_create_table(self, _create_http):
under_test = ModelCreator("bq_schemas")

# when
under_test.create_missing_models()
under_test.create_missing_tables()

# then
calls = http_mock.mock_calls
Expand All @@ -118,7 +122,7 @@ def test_should_ignore_table_already_exists_error(self, _create_http):
under_test = ModelCreator("bq_schemas")

# when
under_test.create_missing_models()
under_test.create_missing_tables()

# then
calls = http_mock.mock_calls
Expand All @@ -137,9 +141,72 @@ def test_should_propagate_table_500_error(self, _create_http):

# when
with self.assertRaises(HttpError) as context:
under_test.create_missing_models()
under_test.create_missing_tables()

# then
calls = http_mock.mock_calls
self.assertEqual(2, len(calls))
self.assertEqual(500, context.exception.resp.status)

@patch.object(ModelCreator, '_create_http')
def test_should_create_views(self, _create_http):
# given
http_mock = Mock(wraps=HttpMockSequence([
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_test_schema.json')),
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_tables_insert_200.json')),
]))
_create_http.return_value = http_mock
under_test = ModelCreator("bq_schemas")

# when
under_test.create_missing_views()

# then
calls = http_mock.mock_calls
self.assertEqual(2, len(calls))
json_request = test_utils.get_body_from_http_request(
calls[1])
self.assertTrue('description' in json_request)
self.assertTrue('query' in json_request['view'])

@patch.object(ModelCreator, '_create_http')
def test_should_ignore_view_already_exists_error(self, _create_http):
# given
http_mock = Mock(wraps=HttpMockSequence([
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_test_schema.json')),
({'status': '409'}, test_utils.content(
'tests/json_samples/bigquery_v2_tables_insert_409.json'))
]))
_create_http.return_value = http_mock
under_test = ModelCreator("bq_schemas")

# when
under_test.create_missing_views()

# then
calls = http_mock.mock_calls
self.assertEqual(2, len(calls))

@patch.object(ModelCreator, '_create_http')
def test_should_propagate_view_500_error(self, _create_http):
# given
http_mock = Mock(wraps=HttpMockSequence([
({'status': '200'}, test_utils.content(
'tests/json_samples/bigquery_v2_test_schema.json')),
({'status': '500'}, '')
]))
_create_http.return_value = http_mock
under_test = ModelCreator("bq_schemas")

# when
with self.assertRaises(HttpError) as context:
under_test.create_missing_views()

# then
calls = http_mock.mock_calls
self.assertEqual(2, len(calls))
self.assertEqual(500, context.exception.resp.status)

14 changes: 14 additions & 0 deletions tests/model_view_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import unittest

from gcp_census.model.view import View


class TestModelCreator(unittest.TestCase):
def test_should_parse_view(self):
# given
view = View('group', 'name',
['--desc1\n', '--desc2\n', 'SELECT * FROM\n', 'table'])

# when & then
self.assertEqual(view.description, 'desc1\ndesc2\n')
self.assertEqual(view.query, '--desc1\n--desc2\nSELECT * FROM\ntable')
6 changes: 6 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import os
import json


def content(filename):
if not os.path.exists(filename):
raise Exception("File not found: {0}".format(filename))
with open(filename, 'r') as f:
return f.read()


def get_body_from_http_request(call):
payload = call[1][2]
return json.loads(payload) if payload else None

0 comments on commit 39f0de2

Please sign in to comment.