Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sqlmesh/core/engine_adapter/snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ def query_factory() -> Query:
elif isinstance(df, pd.DataFrame):
from snowflake.connector.pandas_tools import write_pandas

ordered_df = df[list(source_columns_to_types)]
ordered_df = df[list(source_columns_to_types)].reset_index(drop=True)

# Workaround for https://github.com/snowflakedb/snowflake-connector-python/issues/1034
# The above issue has already been fixed upstream, but we keep the following
Expand Down
2 changes: 1 addition & 1 deletion sqlmesh/core/model/seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def read(self, batch_size: t.Optional[int] = None) -> t.Generator[pd.DataFrame,
batch_size = batch_size or df.size
batch_start = 0
while batch_start < df.shape[0]:
yield df.iloc[batch_start : batch_start + batch_size, :]
yield df.iloc[batch_start : batch_start + batch_size, :].copy()
batch_start += batch_size

def _get_df(self) -> pd.DataFrame:
Expand Down
20 changes: 20 additions & 0 deletions tests/core/engine_adapter/test_snowflake.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,26 @@ def test_df_to_source_queries_use_schema(
assert 'USE SCHEMA "other_catalog"."other_db"' in to_sql_calls(adapter)


def test_df_to_source_queries_reset_non_default_index(
    make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
):
    """Check the frame handed to ``write_pandas`` carries a fresh 0-based index.

    The input DataFrame is built with index ``[1, 2]``; the test asserts that
    the DataFrame passed as the second positional argument to the patched
    ``write_pandas`` has a ``RangeIndex(0, 2)`` and unchanged column values.
    """
    # Make the adapter believe the target table does not exist.
    mocker.patch(
        "sqlmesh.core.engine_adapter.snowflake.SnowflakeEngineAdapter.table_exists",
        return_value=False,
    )
    # Intercept the upload so the DataFrame it receives can be inspected.
    write_pandas_mock = mocker.patch(
        "snowflake.connector.pandas_tools.write_pandas", return_value=None
    )
    snowflake_adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter)

    # Source frame whose index deliberately does not start at zero.
    source_df = pd.DataFrame({"a": [2, 3], "b": [5, 6]}, index=[1, 2])
    column_types = {
        "a": exp.DataType.build("INT"),
        "b": exp.DataType.build("INT"),
    }
    snowflake_adapter.replace_query("other_db.test_table", source_df, column_types)

    # write_pandas receives the DataFrame as its second positional argument.
    positional_args = write_pandas_mock.call_args.args
    sent_df = positional_args[1]

    expected_index = pd.RangeIndex(start=0, stop=2, step=1)
    assert sent_df.index.equals(expected_index)
    assert sent_df.to_dict("list") == {"a": [2, 3], "b": [5, 6]}


def test_create_managed_table(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture):
adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter)

Expand Down
15 changes: 15 additions & 0 deletions tests/core/test_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ def test_read_custom_settings():
pd.testing.assert_frame_equal(next(dfs), expected_df)


def test_read_returns_independent_batches():
    """Check that mutating one yielded batch leaves everything else untouched.

    Reads a two-row seed one row at a time, overwrites a cell in the first
    batch, then asserts that the second batch and a subsequent full read
    still return the original values.
    """
    csv_text = "key,value\n1,one\n2,two\n"
    reader = Seed(content=csv_text).reader()

    # Materialise every single-row batch before mutating any of them.
    row_batches = list(reader.read(batch_size=1))
    first_batch = row_batches[0]
    first_batch.at[0, "value"] = "changed"

    # Only the batch that was edited reflects the change.
    values_per_batch = [batch["value"].tolist() for batch in row_batches]
    assert values_per_batch == [["changed"], ["two"]]

    # A fresh read still sees the original values.
    reread_df = next(reader.read())
    assert reread_df["value"].tolist() == ["one", "two"]


def test_column_hashes():
content = """key,value,ds
1,one,2022-01-01
Expand Down
Loading