This repository has been archived by the owner on Aug 25, 2023. It is now read-only.

Commit

Merge b805417 into e903baf
jarekdrabek committed Sep 3, 2018
2 parents e903baf + b805417 commit 6ae261c
Showing 18 changed files with 282 additions and 100 deletions.
1 change: 1 addition & 0 deletions src/commons/big_query/big_query.py
@@ -180,6 +180,7 @@ def __sync_query(self, query, timeout=30000, use_legacy_sql=False):

@google_http_error_retry(tries=1, delay=1, backoff=2)
def get_table(self, project_id, dataset_id, table_id, log_table=True):
logging.info("getting table %s", BigQueryTable(project_id, dataset_id, table_id))
try:
table = self.service.tables().get(
projectId=project_id, datasetId=dataset_id, tableId=table_id
Empty file.
60 changes: 60 additions & 0 deletions src/commons/big_query/streaming/data_streamer.py
@@ -0,0 +1,60 @@
import datetime
import json
import logging
import uuid

from apiclient.errors import Error
import httplib2
import googleapiclient.discovery
from oauth2client.client import GoogleCredentials

from src.commons.big_query.big_query_table import BigQueryTable
from src.commons.decorators.retry import retry
from src.commons.error_reporting import ErrorReporting


class DataStreamer(object):

@staticmethod
def _create_http():
return httplib2.Http(timeout=60)

def __init__(self, project_id, dataset_id, table_id):
self.big_query_table = BigQueryTable(project_id, dataset_id, table_id)
self.service = googleapiclient.discovery.build(
'bigquery',
'v2',
credentials=GoogleCredentials.get_application_default(),
http=self._create_http()
)

def stream_stats(self, rows, insert_id=None):
insert_id = insert_id or uuid.uuid4()
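        # Note: every row in this request shares the same insertId; BigQuery
        # uses insertId for best-effort deduplication of streaming inserts.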
        insert_all_data = {
            'rows': [{
                'json': data,
                'insertId': str(insert_id)
            } for data in rows]
        }
logging.info("Streaming data to table %s (insertId:%s)",
self.big_query_table, insert_id)
insert_all_response = self._stream_metadata(insert_all_data)
if 'insertErrors' in insert_all_response:
logging.debug("Sent json: \n%s", json.dumps(insert_all_data))
error_message = "Error during streaming metadata to BigQuery: \n{}"\
.format(json.dumps(insert_all_response['insertErrors']))
logging.error(error_message)
ErrorReporting().report(error_message)
else:
logging.debug("Stats have been sent successfully to %s table",
self.big_query_table)

@retry(Error, tries=2, delay=2, backoff=2)
def _stream_metadata(self, insert_all_data):
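        # Target today's daily partition via BigQuery's "table$YYYYMMDD"
        # partition decorator.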
partition = datetime.datetime.now().strftime("%Y%m%d")
return self.service.tabledata().insertAll(
projectId=self.big_query_table.get_project_id(),
datasetId=self.big_query_table.get_dataset_id(),
tableId='{}${}'.format(self.big_query_table.get_table_id(),
partition),
body=insert_all_data).execute(num_retries=3)
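
A minimal usage sketch for the new DataStreamer (the project, dataset, and row values below are illustrative, not taken from this commit; application-default credentials must be available):

    from src.commons.big_query.streaming.data_streamer import DataStreamer

    streamer = DataStreamer('my-project', 'SLI_history', 'SLI_backup_creation_latency')
    # Each row dict must match the target table's schema.
    streamer.stream_stats([{'projectId': 'p1', 'datasetId': 'd1', 'tableId': 't1'}])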
22 changes: 16 additions & 6 deletions src/slo/x_days_sli/sli_results_streamer.py
@@ -1,12 +1,22 @@
 import logging

+from src.commons.big_query.streaming.data_streamer import DataStreamer
+from src.commons.config.configuration import configuration

-class SLIResultsStreamer(object):
-
-    def __init__(self, big_query):
-        self.big_query = big_query
+class SLIResultsStreamer(object):
+    def __init__(self,
+                 project_id=configuration.backup_project_id,
+                 dataset_id="SLI_history",
+                 table_id="SLI_backup_creation_latency"
+                 ):
+        self.data_streamer = DataStreamer(project_id=project_id, dataset_id=dataset_id, table_id=table_id)

     def stream(self, sli_results):
-        logging.info("TODO here results should be streamed into BQ")
-        logging.info("SLI results: %s", sli_results)
-        pass
+        if len(sli_results) == 0:
+            logging.info("Nothing to stream")
+            return
+
+        logging.info("Streaming SLI results: %s", sli_results)
+        self.data_streamer.stream_stats(sli_results)
+        logging.info("SLI results streamed")
2 changes: 1 addition & 1 deletion src/slo/x_days_sli/x_days_sli_service.py
@@ -13,7 +13,7 @@ def __init__(self, x_days):
self.x_days = x_days
big_query = BigQuery()
self.querier = SLIViewQuerier(big_query)
-        self.streamer = SLIResultsStreamer(big_query)
+        self.streamer = SLIResultsStreamer()
self.filter = SLITableExistsFilter(big_query)

def recalculate_sli(self):
14 changes: 7 additions & 7 deletions terraform/SLI_3_days_views.tf
@@ -1,6 +1,6 @@
resource "google_bigquery_table" "census_data_3_days_ago_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "census_data_3_days_ago"

view {
@@ -37,13 +37,13 @@ resource "google_bigquery_table" "census_data_3_days_ago_view" {
use_legacy_sql = true
}

depends_on = ["google_bigquery_dataset.SLO_views_legacy_dataset"]
depends_on = ["google_bigquery_dataset.SLI_views_legacy_dataset"]
}


resource "google_bigquery_table" "SLI_3_days_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "SLI_3_days"

view {
@@ -59,12 +59,12 @@ resource "google_bigquery_table" "SLI_3_days_view" {
IFNULL(last_backups.backup_created, MSEC_TO_TIMESTAMP(0)) as backup_created,
IFNULL(last_backups.backup_last_modified, MSEC_TO_TIMESTAMP(0)) as backup_last_modified
FROM
-    [${var.slos_views_destination_project}.SLO_views_legacy.census_data_3_days_ago] AS census
+    [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.census_data_3_days_ago] AS census
LEFT JOIN (
SELECT
backup_created, backup_last_modified, source_project_id, source_dataset_id, source_table_id, source_partition_id
FROM
-    [${var.datastore_export_project}.datastore_export_views_legacy.last_available_backup_for_every_table_entity]
+    [${var.datastore_export_project}.${var.datastore_export_views_dataset}.last_available_backup_for_every_table_entity]
) AS last_backups
ON
census.projectId=last_backups.source_project_id AND
14 changes: 7 additions & 7 deletions terraform/SLI_4_days_views.tf
@@ -1,6 +1,6 @@
resource "google_bigquery_table" "census_data_4_days_ago_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "census_data_4_days_ago"

view {
@@ -37,13 +37,13 @@ resource "google_bigquery_table" "census_data_4_days_ago_view" {
use_legacy_sql = true
}

depends_on = ["google_bigquery_dataset.SLO_views_legacy_dataset"]
depends_on = ["google_bigquery_dataset.SLI_views_legacy_dataset"]
}


resource "google_bigquery_table" "SLI_4_days_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "SLI_4_days"

view {
@@ -59,12 +59,12 @@ resource "google_bigquery_table" "SLI_4_days_view" {
IFNULL(last_backups.backup_created, MSEC_TO_TIMESTAMP(0)) as backup_created,
IFNULL(last_backups.backup_last_modified, MSEC_TO_TIMESTAMP(0)) as backup_last_modified
FROM
-    [${var.slos_views_destination_project}.SLO_views_legacy.census_data_4_days_ago] AS census
+    [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.census_data_4_days_ago] AS census
LEFT JOIN (
SELECT
backup_created, backup_last_modified, source_project_id, source_dataset_id, source_table_id, source_partition_id
FROM
-    [${var.datastore_export_project}.datastore_export_views_legacy.last_available_backup_for_every_table_entity]
+    [${var.datastore_export_project}.${var.datastore_export_views_dataset}.last_available_backup_for_every_table_entity]
) AS last_backups
ON
census.projectId=last_backups.source_project_id AND
14 changes: 7 additions & 7 deletions terraform/SLI_5_days_views.tf
@@ -1,6 +1,6 @@
resource "google_bigquery_table" "census_data_5_days_ago_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "census_data_5_days_ago"

view {
@@ -37,13 +37,13 @@ resource "google_bigquery_table" "census_data_5_days_ago_view" {
use_legacy_sql = true
}

depends_on = ["google_bigquery_dataset.SLO_views_legacy_dataset"]
depends_on = ["google_bigquery_dataset.SLI_views_legacy_dataset"]
}


resource "google_bigquery_table" "SLI_5_days_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "SLI_5_days"

view {
@@ -59,12 +59,12 @@ resource "google_bigquery_table" "SLI_5_days_view" {
IFNULL(last_backups.backup_created, MSEC_TO_TIMESTAMP(0)) as backup_created,
IFNULL(last_backups.backup_last_modified, MSEC_TO_TIMESTAMP(0)) as backup_last_modified
FROM
-    [${var.slos_views_destination_project}.SLO_views_legacy.census_data_5_days_ago] AS census
+    [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.census_data_5_days_ago] AS census
LEFT JOIN (
SELECT
backup_created, backup_last_modified, source_project_id, source_dataset_id, source_table_id, source_partition_id
FROM
-    [${var.datastore_export_project}.datastore_export_views_legacy.last_available_backup_for_every_table_entity]
+    [${var.datastore_export_project}.${var.datastore_export_views_dataset}.last_available_backup_for_every_table_entity]
) AS last_backups
ON
census.projectId=last_backups.source_project_id AND
14 changes: 7 additions & 7 deletions terraform/SLI_7_days_views.tf
@@ -1,6 +1,6 @@
resource "google_bigquery_table" "census_data_7_days_ago_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "census_data_7_days_ago"

view {
@@ -37,13 +37,13 @@ resource "google_bigquery_table" "census_data_7_days_ago_view" {
use_legacy_sql = true
}

depends_on = ["google_bigquery_dataset.SLO_views_legacy_dataset"]
depends_on = ["google_bigquery_dataset.SLI_views_legacy_dataset"]
}


resource "google_bigquery_table" "SLI_7_days_view" {
project = "${var.slos_views_destination_project}"
dataset_id = "SLO_views_legacy"
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_views_legacy_dataset}"
table_id = "SLI_7_days"

view {
@@ -59,12 +59,12 @@ resource "google_bigquery_table" "SLI_7_days_view" {
IFNULL(last_backups.backup_created, MSEC_TO_TIMESTAMP(0)) as backup_created,
IFNULL(last_backups.backup_last_modified, MSEC_TO_TIMESTAMP(0)) as backup_last_modified
FROM
-    [${var.slos_views_destination_project}:SLO_views_legacy.census_data_7_days_ago] AS census
+    [${var.SLI_views_destination_project}:${var.SLI_views_legacy_dataset}.census_data_7_days_ago] AS census
LEFT JOIN (
SELECT
backup_created, backup_last_modified, source_project_id, source_dataset_id, source_table_id, source_partition_id
FROM
-    [${var.datastore_export_project}:datastore_export_views_legacy.last_available_backup_for_every_table_entity]
+    [${var.datastore_export_project}:${var.datastore_export_views_dataset}.last_available_backup_for_every_table_entity]
) AS last_backups
ON
census.projectId=last_backups.source_project_id AND
13 changes: 13 additions & 0 deletions terraform/SLI_filtered_table.tf
@@ -0,0 +1,13 @@
resource "google_bigquery_table" "SLI_filtered_table" {
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_history_dataset}"
table_id = "SLI_backup_creation_latency"

time_partitioning {
type = "DAY"
}

  schema = "${file("SLI_filtered_table_schema.json")}"

depends_on = ["google_bigquery_dataset.SLI_history_dataset"]
}
62 changes: 62 additions & 0 deletions terraform/SLI_filtered_table_schema.json
@@ -0,0 +1,62 @@
[
{
"name": "snapshotTime",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "Where the data was filtered and put into the table"
},
{
"name": "projectId",
"type": "STRING",
"mode": "REQUIRED",
"description": "ProjectId of table that could not be backed up"
},
{
"name": "datasetId",
"type": "STRING",
"mode": "REQUIRED",
"description": "DatasetId of table that could not be backed up"
},
{
"name": "tableId",
"type": "STRING",
"mode": "REQUIRED",
"description": "TableId of table that could not be backed up"
},
{
"name": "partitionId",
"type": "STRING",
"mode": "NULLABLE",
"description": "PartitionId of table that could not be backed up"
},
{
"name": "creationTime",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "CreationTime of table that could not be backed up"
},
{
"name": "lastModifiedTime",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "LastModifiedTime of table that could not be backed up"
},
{
"name": "backupCreated",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "date of backup table creation (copyJob end)"
},
{
"name": "backupLastModified",
"type": "TIMESTAMP",
"mode": "REQUIRED",
"description": "date of last modification that will be included in the Backup (copyJob start time - as it is atomic operation and every change before that point is included in copy)"
},
{
"name": "xDays",
"type": "STRING",
"mode": "REQUIRED",
"description": "the number of days, the table was not backed up"
}
]
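
For reference, a row conforming to this schema, as it might be passed to DataStreamer.stream_stats, could look like the sketch below (all values are illustrative; BigQuery streaming also accepts timestamps as epoch seconds):

    row = {
        "snapshotTime": "2018-09-03 12:00:00 UTC",
        "projectId": "my-project",
        "datasetId": "my_dataset",
        "tableId": "my_table",
        "partitionId": "20180901",  # NULLABLE; omit for non-partitioned tables
        "creationTime": "2018-08-01 10:00:00 UTC",
        "lastModifiedTime": "2018-09-01 09:30:00 UTC",
        "backupCreated": "2018-08-28 02:00:00 UTC",
        "backupLastModified": "2018-08-28 02:00:00 UTC",
        "xDays": "3"
    }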
5 changes: 5 additions & 0 deletions terraform/SLI_history_dataset.tf
@@ -0,0 +1,5 @@
resource "google_bigquery_dataset" "SLI_history_dataset" {
project = "${var.SLI_views_destination_project}"
dataset_id = "${var.SLI_history_dataset}"
location = "${var.SLI_views_location}"
}
41 changes: 41 additions & 0 deletions terraform/SLI_views_for_datastudio_dataset_with_views.tf
@@ -0,0 +1,41 @@
resource "google_bigquery_dataset" "SLI_views_for_datastudio_legacy_dataset" {
dataset_id = "SLI_views_for_datastudio"
project = "${var.SLI_views_destination_project}"
location = "${var.SLI_views_location}"
}

resource "google_bigquery_table" "SLI_X_days_view" {
project = "${var.SLI_views_destination_project}"
dataset_id = "SLI_views_for_datastudio"
table_id = "SLI_X_days"

view {
query = <<EOF
#legacySQL
SELECT * FROM
(select *, "3" as days from [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.SLI_3_days]),
(select *, "4" as days from [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.SLI_4_days]),
(select *, "5" as days from [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.SLI_5_days]),
(select *, "7" as days from [${var.SLI_views_destination_project}.${var.SLI_views_legacy_dataset}.SLI_7_days])
EOF
use_legacy_sql = true
}

depends_on = ["google_bigquery_table.SLI_3_days_view", "google_bigquery_table.SLI_4_days_view", "google_bigquery_table.SLI_5_days_view", "google_bigquery_table.SLI_7_days_view"]
}

resource "google_bigquery_table" "SLI_days_by_count_view" {
project = "${var.SLI_views_destination_project}"
dataset_id = "SLI_views_for_datastudio"
table_id = "SLI_days_by_count"

view {
query = <<EOF
#legacySQL
SELECT days, count(*) as count FROM [SLI_views_for_datastudio.SLI_X_days] GROUP BY days
EOF
use_legacy_sql = true
}

depends_on = ["google_bigquery_table.SLI_X_days_view", "google_bigquery_dataset.SLI_views_for_datastudio_legacy_dataset"]
}
