diff --git a/sqlmesh/core/engine_adapter/snowflake.py b/sqlmesh/core/engine_adapter/snowflake.py index 09c530b8f3..d589b5d15b 100644 --- a/sqlmesh/core/engine_adapter/snowflake.py +++ b/sqlmesh/core/engine_adapter/snowflake.py @@ -407,7 +407,7 @@ def query_factory() -> Query: elif isinstance(df, pd.DataFrame): from snowflake.connector.pandas_tools import write_pandas - ordered_df = df[list(source_columns_to_types)] + ordered_df = df[list(source_columns_to_types)].reset_index(drop=True) # Workaround for https://github.com/snowflakedb/snowflake-connector-python/issues/1034 # The above issue has already been fixed upstream, but we keep the following diff --git a/sqlmesh/core/model/seed.py b/sqlmesh/core/model/seed.py index 9fd57fe6d3..ff12085690 100644 --- a/sqlmesh/core/model/seed.py +++ b/sqlmesh/core/model/seed.py @@ -113,7 +113,7 @@ def read(self, batch_size: t.Optional[int] = None) -> t.Generator[pd.DataFrame, batch_size = batch_size or df.size batch_start = 0 while batch_start < df.shape[0]: - yield df.iloc[batch_start : batch_start + batch_size, :] + yield df.iloc[batch_start : batch_start + batch_size, :].copy() batch_start += batch_size def _get_df(self) -> pd.DataFrame: diff --git a/tests/core/engine_adapter/test_snowflake.py b/tests/core/engine_adapter/test_snowflake.py index dcb6820297..085c51098b 100644 --- a/tests/core/engine_adapter/test_snowflake.py +++ b/tests/core/engine_adapter/test_snowflake.py @@ -469,6 +469,26 @@ def test_df_to_source_queries_use_schema( assert 'USE SCHEMA "other_catalog"."other_db"' in to_sql_calls(adapter) +def test_df_to_source_queries_reset_non_default_index( + make_mocked_engine_adapter: t.Callable, mocker: MockerFixture +): + mocker.patch( + "sqlmesh.core.engine_adapter.snowflake.SnowflakeEngineAdapter.table_exists", + return_value=False, + ) + write_pandas = mocker.patch("snowflake.connector.pandas_tools.write_pandas", return_value=None) + adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter) + + df = pd.DataFrame({"a": [2, 3], "b": [5, 6]}, index=[1, 2]) + adapter.replace_query( + "other_db.test_table", df, {"a": exp.DataType.build("INT"), "b": exp.DataType.build("INT")} + ) + + uploaded_df = write_pandas.call_args.args[1] + assert uploaded_df.index.equals(pd.RangeIndex(start=0, stop=2, step=1)) + assert uploaded_df.to_dict("list") == {"a": [2, 3], "b": [5, 6]} + + def test_create_managed_table(make_mocked_engine_adapter: t.Callable, mocker: MockerFixture): adapter = make_mocked_engine_adapter(SnowflakeEngineAdapter) diff --git a/tests/core/test_seed.py b/tests/core/test_seed.py index a22805cbd2..b6f335b0c6 100644 --- a/tests/core/test_seed.py +++ b/tests/core/test_seed.py @@ -58,6 +58,21 @@ def test_read_custom_settings(): pd.testing.assert_frame_equal(next(dfs), expected_df) +def test_read_returns_independent_batches(): + content = """key,value +1,one +2,two +""" + seed = Seed(content=content) + seed_reader = seed.reader() + + batches = list(seed_reader.read(batch_size=1)) + batches[0].at[0, "value"] = "changed" + + assert [df["value"].tolist() for df in batches] == [["changed"], ["two"]] + assert next(seed_reader.read())["value"].tolist() == ["one", "two"] + + def test_column_hashes(): content = """key,value,ds 1,one,2022-01-01