
adds retain_s3_files and s3_file_naming_scheme #77

Conversation

aaronsteers

Resolves #76

This PR adds retain_s3_files and s3_file_naming_scheme as configuration options.

s3_file_naming_scheme (String)

  • Default: pipelinewise_{stream}_{timecode}.{ext}
  • Description: A parameterized string which specifies how each file should be named in S3. Variables allowed are: {stream}, {timecode}, and {ext}

retain_s3_files (Boolean)

  • Default: False
  • Description: Specify 'True' to keep files in S3 after importing is complete. Default behavior is False, which removes all files from S3 after loading.
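
For illustration, here is a minimal sketch of how these options fit into a target config and how the placeholders expand. The timecode format and the str.format-style substitution shown below are examples only, not necessarily the exact values the target generates:

```python
# Illustrative sketch: the timecode format and the str.format-style expansion
# below are examples, not necessarily what the target produces internally.
from datetime import datetime, timezone

config = {
    "retain_s3_files": True,  # keep files in S3 after loading
    "s3_file_naming_scheme": "pipelinewise_{stream}_{timecode}.{ext}",
}

timecode = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")  # example format

file_name = config["s3_file_naming_scheme"].format(
    stream="public-orders",
    timecode=timecode,
    ext="csv.gz",
)
print(file_name)  # e.g. pipelinewise_public-orders_20200604-101530.csv.gz
```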

@aaronsteers

@koszti - Let me know if everything looks okay here. I will post back again once I've completed end-to-end testing.

@aaronsteers

@koszti - Happy to report end-to-end testing on my side was successful. Is there anything further you'd like to see in terms of regression tests or changes?


koszti commented Jun 4, 2020

This looks cool. Can you please also add programmatic unit and integration tests covering the changes so we can test automatically?

  • For unit tests you can add something like test_put_to_stage into test_db_sync.py (see the sketch after this list).
    The put_to_stage function returns the generated filename, so it should be straightforward to test the output by calling it with a few different input parameters. However, you might need to patch the boto3 object and the s3.upload_file method. Alternatively, you could do some refactoring and create a clean function without the boto3 dependencies to make writing tests easier. Unfortunately this part of the code was not written with proper testing in mind :<

  • For integration tests you can add something like test_loading_tables_with_custom_s3_file_name into test_target_snowflake.py.
    You can set the new config parameters and test whether everything got loaded into Snowflake using the common assertions. You can use this as an example.
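
Something along these lines could work as a starting point for the unit test. Note this is only a sketch: the DbSync constructor arguments, the put_to_stage signature, and the patch target below are assumptions and may need to be adjusted to the real code in db_sync.py.

```python
# Sketch only: signatures, config keys and the patch target are assumptions
# and may need to be aligned with the real code in target_snowflake/db_sync.py.
import unittest
from unittest.mock import MagicMock, patch

from target_snowflake import db_sync


class TestPutToStage(unittest.TestCase):

    # Patch boto3 inside the db_sync module so no real S3 calls are made
    @patch("target_snowflake.db_sync.boto3")
    def test_put_to_stage_uses_custom_naming_scheme(self, mock_boto3):
        mock_boto3.client.return_value = MagicMock()

        config = {
            "s3_bucket": "fake-bucket",
            "s3_file_naming_scheme": "custom_{stream}_{timecode}.{ext}",
            "retain_s3_files": True,
            # ...plus whatever connection settings DbSync requires
        }
        target = db_sync.DbSync(config)  # assumed constructor signature

        # put_to_stage is assumed to return the generated S3 key
        s3_key = target.put_to_stage("/tmp/batch.csv", stream="orders", count=1)

        self.assertTrue(s3_key.startswith("custom_orders_"))
        self.assertTrue(s3_key.endswith(".csv"))


if __name__ == "__main__":
    unittest.main()
```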

@aaronsteers

@koszti - Thanks for this guidance. Yes, absolutely. Happy to add the unit tests you describe, and will post back here if I run into any blockers. Thanks again!

@aaronsteers

Just FYI - I'm using this PR fork in production successfully, but I haven't yet completed the additional tests. I'm still planning to produce those but have been focused recently on a couple of other internal projects.

@aaronsteers

Logging here that, per conversations in #105, there are limited applications for this PR until Parquet is also supported. While the target does 'work' just fine, the long-term value is limited due to Snowflake's reliance solely on ordinal position when reading CSV files.

aaronsteers marked this pull request as draft on November 17, 2020 17:04

arnisd commented Dec 16, 2020

Hi, what is the status of this PR? Is there any other option to retain the S3 files uploaded during staging?


aaronsteers commented Dec 16, 2020

Hi @arnisd. As I explained in #105, there are some challenges with creating long-term CSV storage in S3. Specifically, in our case we don't have a fully "stable" schema; we intentionally adapt and incorporate new columns as they are added. This creates a problem, however, since Snowflake does not use column headers at all, and from day to day each file may have a different column list or a different column ordering. (Your only option is to "skip" one or more header rows.)

The status of this PR is that we probably won't merge it as-is. I've been working on Parquet as an intermediate store, and since Parquet knows its own schema and is strongly typed, it would resolve the above issues.

That said, I do use this fork in my company's production environment. I just can't heartily recommend it, knowing the gap in schema introspection on historic data files.


arnisd commented Dec 16, 2020

@aaronsteers thanks for the info. If you need any help with the Parquet target I can help with that; I'm familiar with the format from the Spark world.

Also, what do you think of this as an option: instead of persisting those CSVs in the same folder that the target will use for future stages, could the files be copied to a separate historical folder (useful in a data lake architecture)? This would allow us to store incremental loads but move everything out of the staging folder, so Snowflake does not misinterpret schema changes.


aaronsteers commented Dec 16, 2020

> @aaronsteers thanks for the info. If you need any help with the Parquet target I can help with that; I'm familiar with the format from the Spark world.

I may take you up on that! Do you have a GitLab user ID, by chance? I'm building out a common framework based on some of the great work here from Pipelinewise, and on my own learnings on the Singer platform. One of the first sources I'm tackling there, as a sample, is Parquet (here). If you wanted to help contribute some code over there, you could start from here and help me expand the code base. My thought was that once we have a target-parquet implementation we can vouch for, that work and code could then be incorporated back here as a substituted backend (in place of S3).

> Also, what do you think of this as an option: instead of persisting those CSVs in the same folder that the target will use for future stages.

Yeah, I can see that as valuable. I believe the mapping to Snowflake on each ingestion is currently at the file level, so extra files are not accidentally pulled in. That said, failed imports can still leave remnants, so there could be some advantage to separate folders for "tmp" data versus "stored/landed" data.


koszti commented Dec 16, 2020

We'd like to move away from CSV completely and want to use Parquet in target-snowflake. Additionally, we'd like to add an option to keep the Parquet files on S3 and automatically create external tables in Snowflake on top of these Parquet files. We'll need to investigate the idea further, but it would be nice to save some money by not consuming compute credits when loading rarely used tables into real Snowflake tables.

So supporting Parquet files and keeping these Parquet files on S3 are things we're really looking at in the near future, and it would be nice if we could work together somehow.


aaronsteers commented Apr 22, 2021

@koszti - I was excited and surprised to see Parquet support has shipped! Congrats on that! That's huge! 👍
#149

When/if I or someone else gets around to it, that opens this up again as a viable feature addition, though likely as a Parquet-only feature.


koszti commented Apr 27, 2021

@aaronsteers yes, the Parquet file format is now fully supported in target-snowflake, but it seems the original expectations were too high.

Using Parquet files to load data into Snowflake tables is significantly slower than using CSV. Furthermore, leaving Parquet on S3 and selecting it directly from Snowflake via SQL is possible, but it's really, really slow. It doesn't matter that the data is already in columnar Parquet format; Snowflake does not benefit from it and does a full scan of every column even if you select only one of them. Snowflake works best by far with CSV files and requires loading everything into real Snowflake tables.

Based on our tests, loading data from Parquet is 30-50% slower than loading the same data from CSV, and selecting data directly from Parquet files on S3 is 100x slower than selecting the same data from real Snowflake tables.

Parquet file support is still relevant in Singer frameworks, but it seems like it should be implemented separately as target-parquet and not as part of target-snowflake :( wdyt?


aaronsteers commented Apr 28, 2021

@koszti - Thanks for sharing these lessons learned. I found this article which confirms some of the metrics you are reporting.

> Loading from Gzipped CSV is several times faster than loading from ORC and Parquet at an impressive 15 TB/Hour. While 5-6 TB/hour is decent if your data is originally in ORC or Parquet, don’t go out of your way to CREATE ORC or Parquet files from CSV in the hope that it will load Snowflake faster.

A few observations/thoughts...

In retrospect, I guess this shouldn't have been so surprising.

  1. Given that Snowflake re-processes input data into micro-partitions no larger than 16 MB, which are then recompressed into Snowflake's own proprietary columnar format, I guess it makes sense that Snowflake is able to deserialize and re-serialize data faster when the data is simply read serially from a compressed CSV one row at a time, especially given point (3) below.
  2. The Extract-Load use case which Taps and Targets are focused on is characterized by serial access to all stored columns. Parquet (like columnar formats in general) is better optimized for random access and access to a subset of columns.
  3. Your point about Snowflake not benefiting from selecting only a subset of columns at a time is likely a symptom of Snowflake's implementation choice of converting the entire row to a variant, which is then parsed afterwards in SQL. This sounds like an implementation problem on Snowflake's side, and the performance implications are likely a result of that design decision. That said, I fear Snowflake may actually be incentivized not to improve this, since Snowflake is incentivized towards ingesting the data into its own proprietary format rather than optimizing its parser to more effectively leverage the Parquet format, as Spark and other big data platforms already do today.

No perfect solution

I think the point of storing data in Parquet would be to invest in the data lake itself as an asset to be leveraged for interoperability, disaster recovery, and/or retroactive historical restatements. Those data lake use cases are not well served by the .csv.gz format because of:

  • (1) the nature of Singer, which is to land data dynamically generated according to a potentially ever-changing schema, and
  • (2) the design decision on the Snowflake side to support only ordinal column-number access to CSVs, without any support for column-name awareness (let alone data-type awareness).

Given the above, I still think it may be worth some amount of performance penalty if building out the data lake is an important priority within an organization. This was especially important at my last employer when dealing with the Salesforce tap, since (1) columns are added and removed on a monthly if not weekly basis and (2) the default "upsert" behavior causes irretrievable data loss if we later needed to analyze previous versions of a given record by its key.

Upshot

In the long run, I do hope the pressure will build for Snowflake to improve their performance profile for Parquet data access. If users in the meantime want to build out a data lake as a complement to their Snowflake environment, I think the Parquet route is probably the best available choice. If a user doesn't care about retaining a usable archive in S3, the CSV.GZ format would probably provide the best raw throughput. Columnar formats like Parquet are optimized for random selective access rather than a full one-time serial write and read. If Snowflake's CSV parsing were column-name aware, I would not necessarily be against CSV as a long-term storage format, but I don't know how it is viable with only ordinal column access.


koszti commented Jul 21, 2021

A similar feature was added by #178, #180 and #189 and released as part of 1.13.0:

archive_load_files: (Default: False) When enabled, the files loaded to Snowflake will also be stored in archive_load_files_s3_bucket under the key /{archive_load_files_s3_prefix}/{schema_name}/{table_name}/. All archived files will have tap, schema, table and archived-by as S3 metadata keys. When incremental replication is used, the archived files will also have the following S3 metadata keys: incremental-key, incremental-key-min and incremental-key-max.

archive_load_files_s3_prefix: (Default: "archive") When archive_load_files is enabled, the archived files will be placed in the archive S3 bucket under this prefix.

archive_load_files_s3_bucket: (Default: Value of s3_bucket) When archive_load_files is enabled, the archived files will be placed in this bucket.
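
Put together, a config using these options could look roughly like the sketch below. Bucket names and the non-archive settings are placeholders:

```python
# Placeholder values; only the archive_* keys are the options described above.
config = {
    "s3_bucket": "my-staging-bucket",
    "archive_load_files": True,
    "archive_load_files_s3_bucket": "my-archive-bucket",  # defaults to s3_bucket
    "archive_load_files_s3_prefix": "archive",            # default: "archive"
    # ...plus the usual Snowflake connection settings
}
# With these settings, each loaded file is also stored under:
#   s3://my-archive-bucket/archive/{schema_name}/{table_name}/
```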

koszti closed this on Jul 21, 2021