Skip to content

Commit

Permalink
[CP][EWT-302]Patch Pool.DEFAULT_POOL_NAME in BaseOperator (apache#8587)…
Browse files Browse the repository at this point in the history
… (#49)

Open source commit id: b37ce29

Co-authored-by: Vishesh Jain <visheshj@twitter.com>
  • Loading branch information
vshshjn7 and Vishesh Jain committed May 13, 2020
1 parent d5d0a07 commit 9b58c88
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
22 changes: 17 additions & 5 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ under the License.
This file documents any backwards-incompatible changes in Airflow and
assists users migrating to a new version.

## CP

### Ability to patch Pool.DEFAULT_POOL_NAME in BaseOperator
It was not possible to patch pool in BaseOperator as the signature sets the default value of pool
as Pool.DEFAULT_POOL_NAME.
While using subdagoperator in unittest(without initializing the sqlite db), it was throwing the
following error:
```
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: slot_pool.
```
Fix for this, https://github.com/apache/airflow/pull/8587

## Airflow 1.10.4

### Python 2 support is going away
Expand All @@ -36,12 +48,12 @@ If you have a specific task that still requires Python 2 then you can use the Py

### Changes to GoogleCloudStorageHook

* the discovery-based api (`googleapiclient.discovery`) used in `GoogleCloudStorageHook` is now replaced by the recommended client based api (`google-cloud-storage`). To know the difference between both the libraries, read https://cloud.google.com/apis/docs/client-libraries-explained. PR: [#5054](https://github.com/apache/airflow/pull/5054)
* the discovery-based api (`googleapiclient.discovery`) used in `GoogleCloudStorageHook` is now replaced by the recommended client based api (`google-cloud-storage`). To know the difference between both the libraries, read https://cloud.google.com/apis/docs/client-libraries-explained. PR: [#5054](https://github.com/apache/airflow/pull/5054)
* as a part of this replacement, the `multipart` & `num_retries` parameters for `GoogleCloudStorageHook.upload` method have been deprecated.

The client library uses multipart upload automatically if the object/blob size is more than 8 MB - [source code](https://github.com/googleapis/google-cloud-python/blob/11c543ce7dd1d804688163bc7895cf592feb445f/storage/google/cloud/storage/blob.py#L989-L997). The client also handles retries automatically

* the `generation` parameter is deprecated in `GoogleCloudStorageHook.delete` and `GoogleCloudStorageHook.insert_object_acl`.
* the `generation` parameter is deprecated in `GoogleCloudStorageHook.delete` and `GoogleCloudStorageHook.insert_object_acl`.

Updating to `google-cloud-storage >= 1.16` changes the signature of the upstream `client.get_bucket()` method from `get_bucket(bucket_name: str)` to `get_bucket(bucket_or_name: Union[str, Bucket])`. This method is not directly exposed by the airflow hook, but any code accessing the connection directly (`GoogleCloudStorageHook().get_conn().get_bucket(...)` or similar) will need to be updated.

Expand Down Expand Up @@ -305,7 +317,7 @@ then you need to change it like this
@property
def is_active(self):
return self.active

### Support autodetected schemas to GoogleCloudStorageToBigQueryOperator

GoogleCloudStorageToBigQueryOperator is now support schema auto-detection is available when you load data into BigQuery. Unfortunately, changes can be required.
Expand All @@ -317,7 +329,7 @@ define a schema_fields:
gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
...
schema_fields={...})

or define a schema_object:

gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def __init__(
priority_weight=1, # type: int
weight_rule=WeightRule.DOWNSTREAM, # type: str
queue=configuration.conf.get('celery', 'default_queue'), # type: str
pool=Pool.DEFAULT_POOL_NAME, # type: str
pool=None, # type: str
sla=None, # type: Optional[timedelta]
execution_timeout=None, # type: Optional[timedelta]
on_failure_callback=None, # type: Optional[Callable]
Expand Down Expand Up @@ -351,7 +351,7 @@ def __init__(
self.retries = retries if retries is not None else \
int(configuration.conf.get('core', 'default_task_retries', fallback=0))
self.queue = queue
self.pool = pool
self.pool = Pool.DEFAULT_POOL_NAME if pool is None else pool
self.sla = sla
self.execution_timeout = execution_timeout
self.on_failure_callback = on_failure_callback
Expand Down

0 comments on commit 9b58c88

Please sign in to comment.