feat(python): add timestamp time travel in delta scan/read (#15813)
ion-elgreco committed Apr 22, 2024
1 parent 1834aea commit fe190b3
Showing 4 changed files with 92 additions and 14 deletions.
43 changes: 32 additions & 11 deletions py-polars/polars/io/delta.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+from datetime import datetime
 from pathlib import Path
 from typing import TYPE_CHECKING, Any
 from urllib.parse import urlparse
@@ -17,7 +18,7 @@
 def read_delta(
     source: str,
     *,
-    version: int | None = None,
+    version: int | str | datetime | None = None,
     columns: list[str] | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
@@ -34,7 +35,7 @@ def read_delta(
         Note: For Local filesystem, absolute and relative paths are supported but
         for the supported object storages - GCS, Azure and S3 full URI must be provided.
     version
-        Version of the Delta lake table.
+        Numerical version or timestamp version of the Delta lake table.
 
         Note: If `version` is not provided, the latest version of delta lake
         table is read.
@@ -77,6 +78,12 @@ def read_delta(
     >>> pl.read_delta(table_path, version=1)  # doctest: +SKIP
 
+    Time travel a delta table from local filesystem using a timestamp version.
+
+    >>> pl.read_delta(
+    ...     table_path, version=datetime(2020, 1, 1, tzinfo=timezone.utc)
+    ... )  # doctest: +SKIP
+
     Reads a Delta table from AWS S3.
     See a list of supported storage options for S3 `here
     <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants>`__.
@@ -142,7 +149,7 @@ def read_delta(
 def scan_delta(
     source: str,
     *,
-    version: int | None = None,
+    version: int | str | datetime | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
     pyarrow_options: dict[str, Any] | None = None,
@@ -158,7 +165,7 @@ def scan_delta(
         Note: For Local filesystem, absolute and relative paths are supported but
         for the supported object storages - GCS, Azure and S3 full URI must be provided.
     version
-        Version of the Delta lake table.
+        Numerical version or timestamp version of the Delta lake table.
 
         Note: If `version` is not provided, the latest version of delta lake
         table is read.
@@ -199,6 +206,12 @@ def scan_delta(
     >>> pl.scan_delta(table_path, version=1).collect()  # doctest: +SKIP
 
+    Time travel a delta table from local filesystem using a timestamp version.
+
+    >>> pl.scan_delta(
+    ...     table_path, version=datetime(2020, 1, 1, tzinfo=timezone.utc)
+    ... ).collect()  # doctest: +SKIP
+
     Creates a scan for a Delta table from AWS S3.
     See a list of supported storage options for S3 `here
     <https://docs.rs/object_store/latest/object_store/aws/enum.AmazonS3ConfigKey.html#variants>`__.
@@ -281,7 +294,7 @@ def _resolve_delta_lake_uri(table_uri: str, *, strict: bool = True) -> str:
 
 def _get_delta_lake_table(
     table_path: str,
-    version: int | None = None,
+    version: int | str | datetime | None = None,
     storage_options: dict[str, Any] | None = None,
     delta_table_options: dict[str, Any] | None = None,
 ) -> deltalake.DeltaTable:
@@ -298,12 +311,20 @@
     if delta_table_options is None:
         delta_table_options = {}
 
-    dl_tbl = deltalake.DeltaTable(
-        table_path,
-        version=version,
-        storage_options=storage_options,
-        **delta_table_options,
-    )
+    if not isinstance(version, (str, datetime)):
+        dl_tbl = deltalake.DeltaTable(
+            table_path,
+            version=version,
+            storage_options=storage_options,
+            **delta_table_options,
+        )
+    else:
+        dl_tbl = deltalake.DeltaTable(
+            table_path,
+            storage_options=storage_options,
+            **delta_table_options,
+        )
+        dl_tbl.load_as_version(version)
 
     return dl_tbl
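Worth spelling out what the new branch in `_get_delta_lake_table` relies on: integer versions are pinned in the `DeltaTable` constructor as before, while `str`/`datetime` versions are resolved after construction via `DeltaTable.load_as_version` — the deltalake API that, as far as I can tell, first shipped in deltalake 0.15.0, which would explain the dependency bumps below. A minimal sketch of the two branches in isolation (the table path is hypothetical):

from datetime import datetime, timezone

import deltalake

# Integer version: pinned directly at construction time.
dl_tbl = deltalake.DeltaTable("/tmp/delta_table", version=1)

# str/datetime version: construct at the latest version, then time travel.
dl_tbl = deltalake.DeltaTable("/tmp/delta_table")
dl_tbl.load_as_version(datetime(2020, 1, 1, tzinfo=timezone.utc))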
2 changes: 1 addition & 1 deletion py-polars/pyproject.toml
@@ -43,7 +43,7 @@ adbc = ["adbc_driver_manager", "adbc_driver_sqlite"]
 async = ["nest_asyncio"]
 cloudpickle = ["cloudpickle"]
 connectorx = ["connectorx >= 0.3.2"]
-deltalake = ["deltalake >= 0.14.0"]
+deltalake = ["deltalake >= 0.15.0"]
 fastexcel = ["fastexcel >= 0.9"]
 fsspec = ["fsspec"]
 gevent = ["gevent"]
2 changes: 1 addition & 1 deletion py-polars/requirements-dev.txt
@@ -42,7 +42,7 @@ openpyxl
 pyxlsb
 xlsx2csv
 XlsxWriter
-deltalake>=0.14.0
+deltalake>=0.15.0
 pyiceberg>=0.5.0
 # Csv
 zstandard
59 changes: 58 additions & 1 deletion py-polars/tests/unit/io/test_delta.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
-from datetime import datetime
+import os
+from datetime import datetime, timezone
 from pathlib import Path
 
 import pyarrow as pa
@@ -33,6 +34,34 @@ def test_scan_delta_version(delta_table_path: Path) -> None:
     assert_frame_not_equal(df1, df2)
 
 
+@pytest.mark.write_disk()
+def test_scan_delta_timestamp_version(tmp_path: Path) -> None:
+    df_sample = pl.DataFrame({"name": ["Joey"], "age": [14]})
+    df_sample.write_delta(tmp_path, mode="append")
+
+    df_sample2 = pl.DataFrame({"name": ["Ivan"], "age": [34]})
+    df_sample2.write_delta(tmp_path, mode="append")
+
+    log_dir = tmp_path / "_delta_log"
+    log_mtime_pair = [
+        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
+        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
+    ]
+    for file_name, dt_epoch in log_mtime_pair:
+        file_path = log_dir / file_name
+        os.utime(str(file_path), (dt_epoch, dt_epoch))
+
+    df1 = pl.scan_delta(
+        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
+    ).collect()
+    df2 = pl.scan_delta(
+        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
+    ).collect()
+
+    assert_frame_equal(df1, df_sample)
+    assert_frame_equal(df2, pl.concat([df_sample, df_sample2]), check_row_order=False)
+
+
 def test_scan_delta_columns(delta_table_path: Path) -> None:
     ldf = pl.scan_delta(str(delta_table_path), version=0).select("name")
 
@@ -78,6 +107,34 @@ def test_read_delta_version(delta_table_path: Path) -> None:
     assert_frame_not_equal(df1, df2)
 
 
+@pytest.mark.write_disk()
+def test_read_delta_timestamp_version(tmp_path: Path) -> None:
+    df_sample = pl.DataFrame({"name": ["Joey"], "age": [14]})
+    df_sample.write_delta(tmp_path, mode="append")
+
+    df_sample2 = pl.DataFrame({"name": ["Ivan"], "age": [34]})
+    df_sample2.write_delta(tmp_path, mode="append")
+
+    log_dir = tmp_path / "_delta_log"
+    log_mtime_pair = [
+        ("00000000000000000000.json", datetime(2010, 1, 1).timestamp()),
+        ("00000000000000000001.json", datetime(2024, 1, 1).timestamp()),
+    ]
+    for file_name, dt_epoch in log_mtime_pair:
+        file_path = log_dir / file_name
+        os.utime(str(file_path), (dt_epoch, dt_epoch))
+
+    df1 = pl.read_delta(
+        str(tmp_path), version=datetime(2010, 1, 1, tzinfo=timezone.utc)
+    )
+    df2 = pl.read_delta(
+        str(tmp_path), version=datetime(2024, 1, 1, tzinfo=timezone.utc)
+    )
+
+    assert_frame_equal(df1, df_sample)
+    assert_frame_equal(df2, pl.concat([df_sample, df_sample2]), check_row_order=False)
+
+
 def test_read_delta_columns(delta_table_path: Path) -> None:
     df = pl.read_delta(str(delta_table_path), version=0, columns=["name"])
 
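A note on the test setup: for a local table, deltalake derives each commit's timestamp from the modification time of its `_delta_log` JSON entry, so the `os.utime` calls pin the two commits to 2010 and 2024 and make the timestamp lookup deterministic. The new annotation also admits `str`, which none of the tests exercise; presumably an ISO 8601 timestamp string is forwarded to `load_as_version` the same way a `datetime` is. A hedged sketch with a hypothetical path:

import polars as pl

# Assumption: an ISO 8601 timestamp string is accepted for `version` and
# forwarded to deltalake's load_as_version, just like a datetime would be.
df = pl.read_delta("/tmp/delta_table", version="2020-01-01T00:00:00Z")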
