Skip to content

Commit

Permalink
refactor: notebook output tables (#40)
Browse files Browse the repository at this point in the history
* feat: refactor to new notebook output table API

* chore: bump version

* fix: support both resumable and non-resumable notebook uploads

* fix: resumable vs. standard upload

* add logging for debugging

* refactor: remove ability to pass string to notebook.create_output_Table

* fix: open file to be uploaded

* chore: final cleanup, remove debug logs
  • Loading branch information
imathews committed May 20, 2024
1 parent 6a4c717 commit 85a1126
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 51 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"pyarrow >= 13.0.0",
"requests >= 2.0.0",
"shapely >= 2.0.1",
"tqdm == 4.64.0",
"tqdm >= 4.64.0",
]

# What packages are optional?
Expand Down
2 changes: 1 addition & 1 deletion src/redivis/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.7"
__version__ = "0.14.8"
1 change: 0 additions & 1 deletion src/redivis/classes/Base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import json


class Base:
Expand Down
1 change: 0 additions & 1 deletion src/redivis/classes/Dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from .Query import Query
from .Base import Base
from urllib.parse import quote as quote_uri
import warnings

from ..common.api_request import make_request, make_paginated_request

Expand Down
1 change: 0 additions & 1 deletion src/redivis/classes/File.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ..common.api_request import make_request
from urllib.parse import quote as quote_uri


class File(Base):
def __init__(
self,
Expand Down
93 changes: 55 additions & 38 deletions src/redivis/classes/Notebook.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
from .Table import Table
from .Base import Base
from ..common.api_request import make_request
from tqdm.auto import tqdm

import pathlib
import uuid
import os
import pyarrow as pa
import pyarrow.dataset as pa_dataset
import pyarrow.parquet as pa_parquet
import pandas as pd
from ..common.retryable_upload import perform_resumable_upload, perform_standard_upload

class Notebook(Base):
def __init__(
Expand All @@ -17,46 +19,61 @@ def __init__(
self.current_notebook_job_id = current_notebook_job_id

def create_output_table(self, data=None, *, name=None, append=False, geography_variables=None):
if type(data) == str:
temp_file_path = str
temp_file_path = f"/tmp/redivis/out/{uuid.uuid4()}"
pathlib.Path(temp_file_path).parent.mkdir(exist_ok=True, parents=True)

import geopandas
import pandas as pd

if isinstance(data, geopandas.GeoDataFrame):
if geography_variables is None:
geography_variables = list(data.select_dtypes('geometry'))
data.to_wkt().to_parquet(path=temp_file_path, index=False)
elif isinstance(data, pd.DataFrame):
data.to_parquet(path=temp_file_path, index=False)
elif isinstance(data, pa_dataset.Dataset):
pa_dataset.write_dataset(data, temp_file_path, format='parquet', basename_template='part-{i}.parquet', max_partitions=1)
temp_file_path = f'{temp_file_path}/part-0.parquet'
elif isinstance(data, pa.Table):
pa_parquet.write_table(data, temp_file_path)
else:
temp_file_path = f"/tmp/redivis/out/{uuid.uuid4()}"
pathlib.Path(temp_file_path).parent.mkdir(exist_ok=True, parents=True)

import geopandas

if isinstance(data, geopandas.GeoDataFrame):
if geography_variables is None:
geography_variables = list(data.select_dtypes('geometry'))
data.to_wkt().to_parquet(path=temp_file_path, index=False)
elif isinstance(data, pd.DataFrame):
data.to_parquet(path=temp_file_path, index=False)
elif isinstance(data, pa_dataset.Dataset):
pa_dataset.write_dataset(data, temp_file_path, format='parquet', basename_template='part-{i}.parquet', max_partitions=1)
temp_file_path = f'{temp_file_path}/part-0.parquet'
elif isinstance(data, pa.Table):
pa_parquet.write_table(data, temp_file_path)
# importing polars is causing an IllegalInstruction error on ARM + Docker. Import inline to avoid crashes elsewhwere
# TODO: revert once fixed upstream
import polars
if isinstance(data, polars.LazyFrame):
data.sink_parquet(temp_file_path)
elif isinstance(data, polars.DataFrame):
data.write_parquet(temp_file_path)
else:
# importing polars is causing an IllegalInstruction error on ARM + Docker. Import inline to avoid crashes elsewhwere
# TODO: revert once fixed upstream
import polars
if isinstance(data, polars.LazyFrame):
data.sink_parquet(temp_file_path)
elif isinstance(data, polars.DataFrame):
data.write_parquet(temp_file_path)
else:
raise Exception('Unknown datatype provided to notebook.create_output_table. Must either by a file path, or an instance of pandas.DataFrame, pyarrow.Dataset, pyarrow.Table, dask.DataFrame, polars.LazyFrame, or polars.DataFrame')
raise Exception('Unknown datatype provided to notebook.create_output_table. Must either by a file path, or an instance of pandas.DataFrame, pyarrow.Dataset, pyarrow.Table, dask.DataFrame, polars.LazyFrame, or polars.DataFrame')

size = os.stat(temp_file_path).st_size

pbar_bytes = tqdm(total=size, unit='B', leave=False, unit_scale=True)

res = make_request(
method="POST",
path=f"/notebookJobs/{self.current_notebook_job_id}/tempUploads",
payload={"tempUploads": [{"size": size, "resumable": size > 1e8}]},
)
temp_upload = res["results"][0]

with open(temp_file_path, 'rb') as f:
res = make_request(
method="PUT",
path=f"/notebookJobs/{self.current_notebook_job_id}/outputTable",
query={"name": name, "append": append, "geographyVariables": geography_variables},
payload=f,
parse_payload=False
)

if type(data) != str:
os.remove(temp_file_path)
if temp_upload["resumable"]:
perform_resumable_upload(data=f, progressbar=pbar_bytes,
temp_upload_url=temp_upload["url"])
else:
perform_standard_upload(data=f, temp_upload_url=temp_upload["url"],
progressbar=pbar_bytes)

pbar_bytes.close()
os.remove(temp_file_path)

res = make_request(
method="PUT",
path=f"/notebookJobs/{self.current_notebook_job_id}/outputTable",
payload={"name": name, "append": append, "geographyVariables": geography_variables, "tempUploadId": temp_upload["id"]},
)


return Table(name=res["name"], properties=res)
4 changes: 3 additions & 1 deletion src/redivis/classes/Query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import time
import warnings
import pyarrow as pa
import pandas as pd

from ..common.api_request import make_request
from ..common.list_rows import list_rows
Expand Down Expand Up @@ -107,6 +106,8 @@ def to_pandas_dataframe(self, max_results=None, *, geography_variable="", progre
batch_preprocessor=batch_preprocessor
)

import pandas as pd

if dtype_backend == 'numpy_nullable':
df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
pa.int64(): pd.Int64Dtype(),
Expand Down Expand Up @@ -146,6 +147,7 @@ def to_geopandas_dataframe(self, max_results=None, *, geography_variable="", pro
coerce_schema=False,
batch_preprocessor=batch_preprocessor
)
import pandas as pd

if dtype_backend == 'numpy_nullable':
df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
Expand Down
1 change: 0 additions & 1 deletion src/redivis/classes/Row.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import copy
import operator
from typing import Any, Dict, Iterable, Tuple


Expand Down
5 changes: 4 additions & 1 deletion src/redivis/classes/Table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import concurrent.futures
import pathlib
import pyarrow as pa
import pandas as pd
from tqdm.auto import tqdm
from threading import Event

Expand Down Expand Up @@ -394,6 +393,8 @@ def to_pandas_dataframe(self, max_results=None, *, variables=None, progress=True
batch_preprocessor=batch_preprocessor
)

import pandas as pd

if dtype_backend == 'numpy_nullable':
df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
pa.int64(): pd.Int64Dtype(),
Expand Down Expand Up @@ -429,6 +430,8 @@ def to_geopandas_dataframe(self, max_results=None, *, variables=None, geography_
batch_preprocessor=batch_preprocessor
)

import pandas as pd

if dtype_backend == 'numpy_nullable':
df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
pa.int64(): pd.Int64Dtype(),
Expand Down
15 changes: 11 additions & 4 deletions src/redivis/classes/Upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import warnings
import io
import pyarrow as pa
import pandas as pd
from tqdm.auto import tqdm

from ..common.util import get_geography_variable, get_warning
Expand All @@ -15,7 +14,7 @@
from .Base import Base
from .Variable import Variable
from ..common.api_request import make_request, make_paginated_request
from ..common.retryable_upload import perform_resumable_upload
from ..common.retryable_upload import perform_resumable_upload, perform_standard_upload

class Upload(Base):
def __init__(
Expand Down Expand Up @@ -68,10 +67,16 @@ def create(
res = make_request(
method="POST",
path=f"{self.table.uri}/tempUploads",
payload={"tempUploads": [{"size": size, "name": self.name, "resumable": True}]},
payload={"tempUploads": [{"size": size, "name": self.name, "resumable": size > 1e7}]},
)
temp_upload = res["results"][0]
perform_resumable_upload(data=data, temp_upload_url=temp_upload["url"], progressbar=pbar_bytes)
if temp_upload["resumable"]:
perform_resumable_upload(data=data, progressbar=pbar_bytes,
temp_upload_url=temp_upload["url"])
else:
perform_standard_upload(data=data, temp_upload_url=temp_upload["url"],
progressbar=pbar_bytes)

if progress:
pbar_bytes.close()

Expand Down Expand Up @@ -257,6 +262,7 @@ def to_pandas_dataframe(self, max_results=None, *, variables=None, progress=True
coerce_schema=True,
batch_preprocessor=batch_preprocessor
)
import pandas as pd

if dtype_backend == 'numpy_nullable':
df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
Expand Down Expand Up @@ -291,6 +297,7 @@ def to_geopandas_dataframe(self, max_results=None, *, variables=None, geography_
coerce_schema=True,
batch_preprocessor=batch_preprocessor
)
import pandas as pd

if dtype_backend == 'numpy_nullable':
df = arrow_table.to_pandas(self_destruct=True, date_as_object=date_as_object, types_mapper={
Expand Down
1 change: 0 additions & 1 deletion src/redivis/common/retryable_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import requests
import os
import logging
from ..common.api_request import make_request
from tqdm.utils import CallbackIOWrapper


Expand Down

0 comments on commit 85a1126

Please sign in to comment.