Skip to content

Commit

Permalink
remove redundant swap logic
Browse files Browse the repository at this point in the history
  • Loading branch information
DeaconDesperado committed Nov 25, 2015
1 parent 2db757a commit 6359592
Showing 1 changed file with 5 additions and 15 deletions.
20 changes: 5 additions & 15 deletions luigi/contrib/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import logging
import luigi.target
import time
import random

logger = logging.getLogger('luigi-interface')

Expand Down Expand Up @@ -284,10 +283,8 @@ def copy(self,


class BigqueryTarget(luigi.target.Target):
def __init__(self, project_id, dataset_id, table_id, client=None, tmp_dataset_id=None):
def __init__(self, project_id, dataset_id, table_id, client=None):
self.table = BQTable(project_id=project_id, dataset_id=dataset_id, table_id=table_id)
tmp_table_id = "_" + table_id + "_%09d" % random.randrange(0, 1e10)
self.tmp_table = BQTable(project_id=project_id, dataset_id=tmp_dataset_id or "_incoming", table_id=tmp_table_id)
self.client = client or BigqueryClient()

@classmethod
Expand Down Expand Up @@ -384,9 +381,9 @@ def run(self):
'configuration': {
'load': {
'destinationTable': {
'projectId': output.tmp_table.project_id,
'datasetId': output.tmp_table.dataset_id,
'tableId': output.tmp_table.table_id,
'projectId': output.table.project_id,
'datasetId': output.table.dataset_id,
'tableId': output.table.table_id,
},
'sourceFormat': self.source_format,
'writeDisposition': self.write_disposition,
Expand All @@ -398,14 +395,7 @@ def run(self):
if self.schema:
job['configuration']['load']['schema'] = {'fields': self.schema}

logger.info('Loading data to temporary table %s.%s', output.tmp_table.dataset_id, output.tmp_table.table_id)
bq_client.run_job(output.tmp_table.project_id, job, dataset=output.tmp_table.dataset)
logger.info('Moving temporary table %s.%s to destination table %s.%s',
output.tmp_table.dataset_id, output.tmp_table.table_id,
output.table.dataset_id, output.table.table_id)
bq_client.copy(output.tmp_table, output.table)
logger.info('Removing temporary table %s.%s', output.tmp_table.dataset_id, output.tmp_table.table_id)
bq_client.delete_table(output.tmp_table)
bq_client.run_job(output.table.project_id, job, dataset=output.table.dataset)


class BigqueryRunQueryTask(MixinBigqueryBulkComplete, luigi.Task):
Expand Down

0 comments on commit 6359592

Please sign in to comment.