From fe190b3aecf56a558eb0de375be0773bcef40e99 Mon Sep 17 00:00:00 2001
From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com>
Date: Mon, 22 Apr 2024 08:28:52 +0200
Subject: [PATCH] feat(python): add timestamp time travel in delta scan/read
 (#15813)

---
 py-polars/polars/io/delta.py          | 43 ++++++++++++++-----
 py-polars/pyproject.toml              |  2 +-
 py-polars/requirements-dev.txt        |  2 +-
 py-polars/tests/unit/io/test_delta.py | 59 ++++++++++++++++++++++++++-
 4 files changed, 92 insertions(+), 14 deletions(-)

diff --git a/py-polars/polars/io/delta.py b/py-polars/polars/io/delta.py
index f132fb589944..9255cd71a6e7 100644
--- a/py-polars/polars/io/delta.py
+++ b/py-polars/polars/io/delta.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from datetime import datetime
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
@@ -17,7 +18,7 @@
 def read_delta(
     source: str,
     *,
-    version: int | None = None,
+    version: int | str | datetime | None = None,
     columns: list[str] | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
@@ -34,7 +35,7 @@ def read_delta(
         Note: For Local filesystem, absolute and relative paths are supported but
         for the supported object storages - GCS, Azure and S3 full URI must be provided.
     version
-        Version of the Delta lake table.
+        Numerical version or timestamp version of the Delta lake table.
 
         Note: If `version` is not provided, the latest version of delta lake
         table is read.
@@ -77,6 +78,12 @@
 
     >>> pl.read_delta(table_path, version=1)  # doctest: +SKIP
 
+    Time travel a delta table from local filesystem using a timestamp version.
+
+    >>> pl.read_delta(
+    ...     table_path, version=datetime(2020, 1, 1, tzinfo=timezone.utc)
+    ... )  # doctest: +SKIP
+
     Reads a Delta table from AWS S3.
     See a list of supported storage options for S3 `here `__.
@@ -142,7 +149,7 @@
 def scan_delta(
     source: str,
     *,
-    version: int | None = None,
+    version: int | str | datetime | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
     pyarrow_options: dict[str, Any] | None = None,
@@ -158,7 +165,7 @@
         Note: For Local filesystem, absolute and relative paths are supported but
         for the supported object storages - GCS, Azure and S3 full URI must be provided.
     version
-        Version of the Delta lake table.
+        Numerical version or timestamp version of the Delta lake table.
 
         Note: If `version` is not provided, the latest version of delta lake
         table is read.
@@ -199,6 +206,12 @@
 
     >>> pl.scan_delta(table_path, version=1).collect()  # doctest: +SKIP
 
+    Time travel a delta table from local filesystem using a timestamp version.
+
+    >>> pl.scan_delta(
+    ...     table_path, version=datetime(2020, 1, 1, tzinfo=timezone.utc)
+    ... ).collect()  # doctest: +SKIP
+
     Creates a scan for a Delta table from AWS S3.
     See a list of supported storage options for S3 `here `__.
@@ -281,7 +294,7 @@ def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:
 
 def _get_delta_lake_table(
     table_path: str,
-    version: int | None = None,
+    version: int | str | datetime | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
 ) -> deltalake.DeltaTable:
@@ -298,12 +311,20 @@
     if delta_table_options is None:
         delta_table_options = {}
 
-    dl_tbl = deltalake.DeltaTable(
-        table_path,
-        version=version,
-        storage_options=storage_options,
-        **delta_table_options,
-    )
+    if not isinstance(version, (str, datetime)):
+        dl_tbl = deltalake.DeltaTable(
+            table_path,
+            version=version,
+            storage_options=storage_options,
+            **delta_table_options,
+        )
+    else:
+        dl_tbl = deltalake.DeltaTable(
+            table_path,
+            storage_options=storage_options,
+            **delta_table_options,
+        )
+        dl_tbl.load_as_version(version)
 
     return dl_tbl
 
diff --git a/py-polars/pyproject.toml b/py-polars/pyproject.toml
index 9a7a08e2db83..08a325f3f1a9 100644
--- a/py-polars/pyproject.toml
+++ b/py-polars/pyproject.toml
@@ -43,7 +43,7 @@ adbc = ["adbc_driver_manager", "adbc_driver_sqlite"]
 async = ["nest_asyncio"]
 cloudpickle = ["cloudpickle"]
 connectorx = ["connectorx >= 0.3.2"]
-deltalake = ["deltalake >= 0.14.0"]
+deltalake = ["deltalake >= 0.15.0"]
 fastexcel = ["fastexcel >= 0.9"]
 fsspec = ["fsspec"]
 gevent = ["gevent"]
diff --git a/py-polars/requirements-dev.txt b/py-polars/requirements-dev.txt
index e14a4b0daeff..22eb31b567fb 100644
--- a/py-polars/requirements-dev.txt
+++ b/py-polars/requirements-dev.txt
@@ -42,7 +42,7 @@ openpyxl
 pyxlsb
 xlsx2csv
 XlsxWriter
-deltalake>=0.14.0
+deltalake>=0.15.0
 pyiceberg>=0.5.0
 # Csv
 zstandard
diff --git a/py-polars/tests/unit/io/test_delta.py b/py-polars/tests/unit/io/test_delta.py
index 423c3b4eaa72..811ed746c79e 100644
--- a/py-polars/tests/unit/io/test_delta.py
+++ b/py-polars/tests/unit/io/test_delta.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
-from datetime import datetime
+import os
+from datetime import datetime, timezone
 from pathlib import Path
 
 import pyarrow as pa
@@ -33,6 +34,34 @@ def test_scan_delta_version(delta_table_path: Path) -> None:
     assert_frame_not_equal(df1, df2)
 
 
+@pytest.mark.write_disk()
+def test_scan_delta_timestamp_version(tmp_path: Path) -> None:
+    df_sample = pl.DataFrame({"name": ["Joey"], "age": [14]})
+    df_sample.write_delta(tmp_path, mode="append")
+
+    df_sample2 = pl.DataFrame({"name": ["Ivan"], "age": [34]})
+    df_sample2.write_delta(tmp_path, mode="append")
+
+    log_dir = tmp_path / "_delta_log"
+    log_mtime_pair = [
+        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
+        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
+    ]
+    for file_name, dt_epoch in log_mtime_pair:
+        file_path = log_dir / file_name
+        os.utime(str(file_path), (dt_epoch, dt_epoch))
+
+    df1 = pl.scan_delta(
+        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
+    ).collect()
+    df2 = pl.scan_delta(
+        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
+    ).collect()
+
+    assert_frame_equal(df1, df_sample)
+    assert_frame_equal(df2, pl.concat([df_sample, df_sample2]), check_row_order=False)
+
+
 def test_scan_delta_columns(delta_table_path: Path) -> None:
     ldf = pl.scan_delta(str(delta_table_path), version=0).select("name")
 
@@ -78,6 +107,34 @@ def test_read_delta_version(delta_table_path: Path) -> None:
     assert_frame_not_equal(df1, df2)
 
 
+@pytest.mark.write_disk()
+def test_read_delta_timestamp_version(tmp_path: Path) -> None:
+    df_sample = pl.DataFrame({"name": ["Joey"], "age": [14]})
+    df_sample.write_delta(tmp_path, mode="append")
+
+    df_sample2 = pl.DataFrame({"name": ["Ivan"], "age": [34]})
+    df_sample2.write_delta(tmp_path, mode="append")
+
+    log_dir = tmp_path / "_delta_log"
+    log_mtime_pair = [
+        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
+        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
+    ]
+    for file_name, dt_epoch in log_mtime_pair:
+        file_path = log_dir / file_name
+        os.utime(str(file_path), (dt_epoch, dt_epoch))
+
+    df1 = pl.read_delta(
+        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
+    )
+    df2 = pl.read_delta(
+        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
+    )
+
+    assert_frame_equal(df1, df_sample)
+    assert_frame_equal(df2, pl.concat([df_sample, df_sample2]), check_row_order=False)
+
+
 def test_read_delta_columns(delta_table_path: Path) -> None:
     df = pl.read_delta(str(delta_table_path), version=0, columns=["name"])
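
A minimal usage sketch of the behavior this patch enables (not part of the
diff; the table path here is hypothetical, and deltalake >= 0.15.0 is
assumed to be installed):

    from datetime import datetime, timezone

    import polars as pl

    table_path = "/tmp/delta/my_table"  # hypothetical local Delta table

    # Integer versions keep their existing meaning: an exact table version.
    df_v0 = pl.read_delta(table_path, version=0)

    # A datetime (or, per the new type hint, a timestamp string) is routed
    # through deltalake's DeltaTable.load_as_version, loading the table
    # state as of that point in time.
    df_then = pl.scan_delta(
        table_path, version=datetime(2024, 1, 1, tzinfo=timezone.utc)
    ).collect()

As in the doctests and unit tests above, the datetime passed as `version`
is timezone-aware.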