Data flows are orchestrated using Prefect.
- ECS runs Prefect Agent and Flow tasks
- Docker image stores Flow code and libraries
- Auto-scaling Prefect Agent based on CPU usage
- Private subnet shields tasks from the internet; only egress is needed
- Permissions according to the principle of least privilege.
We use two environments in this repo:
- A Python environment for writing Prefect Flows. Code is located in `src/`.
- A Node environment for the AWS infrastructure that Prefect needs to run Flows. Code is located in `.aws/`.
- docker
- PyCharm
Black will be installed as part of your dev dependencies. It's a great way to auto-format your Python code. You can even configure it as the default formatter in your IDE to format on save. Here is a link to instructions for VS Code: https://code.visualstudio.com/docs/python/editing#_formatting.
We will probably not add a Black formatting check to the PR process for this version of Prefect (v1). However, we may require it for developing and deploying flows on v2.
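For illustration, here is the kind of change Black makes automatically; the function below is a made-up example, not code from this repo:

```python
# Hypothetical example of Black's auto-formatting.

# Before running Black:
def get_rows(query,connection,limit = 100): return connection.execute(query ).fetchmany( limit)

# After running Black:
def get_rows(query, connection, limit=100):
    return connection.execute(query).fetchmany(limit)
```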
- Copy the `.env.example` file to a file in the same directory called `.env`. Change the values according to the instructions you find in that file.
  ⚠️ Do not put your credentials in `.env.example`, to prevent accidentally checking them into git. Modifying `.env` is safe because it's git-ignored.
- Set `PREFECT__CLOUD__API_KEY` to a Prefect API key that you can create on the API keys page.
- Set `SNOWFLAKE_PRIVATE_KEY` to your decrypted private key, as follows:
  - If you haven't already, create Snowflake development credentials. These are usually stored in `~/.snowflake-keys`.
  - Run `openssl rsa -in ~/.snowflake/rsa_key.p8` and enter the passphrase for this file when prompted.
  - Copy the value after (but not including) `-----BEGIN RSA PRIVATE KEY-----` and before (but not including) `-----END RSA PRIVATE KEY-----`.
  - In a text editor, remove all newlines (`\n`). Set `SNOWFLAKE_PRIVATE_KEY` to the resulting string (or use the sketch below).
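    If you prefer not to edit the key by hand, a small Python sketch like the one below does the same transformation; the input file path is an assumption.

    ```python
    # Hypothetical helper: turn the decrypted key into the single-line value for .env.
    # Assumes you saved the output of `openssl rsa -in ~/.snowflake/rsa_key.p8`
    # to a file; the path below is made up.
    pem = open("rsa_key_decrypted.pem").read()
    lines = [
        line.strip()
        for line in pem.splitlines()
        if line.strip() and "PRIVATE KEY" not in line  # drop the BEGIN/END markers
    ]
    print("".join(lines))  # paste the printed value into SNOWFLAKE_PRIVATE_KEY
    ```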
- Run `docker compose build && docker compose up` to check that you can start the Prefect agent. When the build is complete, you should see the agent start up and poll Prefect Cloud:

  ```
  prefect_1 | DEBUG:agent:No ready flow runs found.
  prefect_1 | [2022-02-03 23:18:53,127] DEBUG - agent | No ready flow runs found.
  prefect_1 | [2022-02-03 23:18:53,128] DEBUG - agent | Sleeping flow run poller for 2.0 seconds...
  ```
- Hit Ctrl+C to stop the Prefect agent.
- In PyCharm, right-click on the src directory > Mark Directory as > Sources Root
- In PyCharm, configure Docker Compose as a remote interpreter
⚠️ Even when flows are executed locally, they can affect production resources. Check whether the `AWS_PROFILE` variable in your `.env` file has write access in production, and if so, consider whether the flow you're going to run could have unintended consequences. Ask if you're not sure.

- Run `source <(maws -o awscli)` and choose the AWS account that matches the value of `AWS_PROFILE` in your `.env` file.
- Run the flow in PyCharm, for example by right-clicking on the corresponding file in the `src/flows/` directory and choosing 'Run' (see the sketch below for what a minimal flow file looks like).
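For reference, flows in `src/flows/` use the Prefect 1.x API. The sketch below shows roughly what a minimal flow file looks like; the task and flow names are made up, and the `__main__` guard is an assumption about how files in this repo are structured:

```python
# Minimal Prefect 1.x flow sketch (names are hypothetical, not from this repo).
from prefect import Flow, task


@task
def extract():
    return [1, 2, 3]


@task
def load(rows):
    print(f"Loaded {len(rows)} rows")


with Flow("example-flow") as flow:
    rows = extract()
    load(rows)

if __name__ == "__main__":
    # Running the file directly (e.g. via PyCharm's 'Run') executes the flow locally.
    flow.run()
```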
Libraries are managed using pipenv, to create a consistent run-time environment. Follow the steps below to install a new Python library.
- Run `docker compose up`.
- In a new terminal, run `docker compose exec prefect pipenv install pydantic`, replacing `pydantic` with the library you want to install.
  - Add `--dev` for development libraries that don't need to be installed in production, for example `docker compose exec prefect pipenv install --dev pytest`.
  - The output should look something like this:

    ```
    $ docker compose exec prefect pipenv install pydantic
    Installing pydantic...
    Adding pydantic to Pipfile's [packages]...
    ✔ Installation Succeeded
    Installing dependencies from Pipfile.lock (46b380)...
    🐍 ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 68/68 — 00:00:20
    ```

- Run `docker compose cp prefect:/Pipfile ./ && docker compose cp prefect:/Pipfile.lock ./` to copy the Pipenv files from your Docker container to your host.
  - Note: if you get `No such command: cp`, try upgrading Docker, or use `docker cp` instead.
- `Pipfile` and `Pipfile.lock` should have been changed. Commit those changes to git.
- Run `docker compose build` to rebuild your Docker image.
- node/npm and nvm (see How to set up a Node.js development environment)
- tfenv
- `cd .aws` to go to the .aws directory in the project root
- `nvm use` to use the right Node version
- `npm ci` to install packages from package-lock.json
Pushing to the dev branch (see 'Deployment' below) is an easy way to deploy infrastructure changes to Pocket-Dev.
The steps below are useful if you want to iterate more quickly over changes in the .aws/ directory.
- Run `$(maws)` and obtain write access to Pocket-Dev
- Run `tfenv use` to get the right version of Terraform
- Run `npm run build:dev`
- From the `.aws/` directory, run `cd cdktf.out/stacks/data-flows/`
- Run `terraform init` and choose 'Dev'
- Run `npm run build:dev && terraform apply`. Repeat this step when you want to apply changes.
Here are some things you'll want to do before running a flow in production:
- Get the flow into on-call alerts (instructions here)
- Log important metrics (for example the number of rows processed)
- Throw exceptions for invalid input (both are illustrated in the sketch after this list)
- Usually flows will run on a schedule
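As a hedged sketch of the logging and validation points above (the task, its parameter, and the validation rule are hypothetical):

```python
# Sketch of a production-minded task: logs a row count and fails fast on bad input.
import prefect
from prefect import task


@task
def load_rows(rows: list):
    logger = prefect.context.get("logger")

    if not rows:
        # Failing loudly lets Prefect mark the flow run as failed and trigger alerts.
        raise ValueError("Expected at least one row, got an empty input")

    logger.info("Loading %d rows", len(rows))
    return len(rows)
```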
- Pocket-Dev: `git push -f origin my-local-branch:dev`
- Production: get your PR approved, and merge it into the main branch
Deployments take about 15 minutes. You can monitor their progress in CircleCI and CodePipeline.
This section lists the manual steps that have to be taken when this service is deployed to an AWS environment for the first time.
Create a Prefect project whose name equals the name of the git branch that will trigger the deployment.
The following parameters need to be created in the SSM Parameter Store.
Replace `{Env}` with the environment name as defined in `.aws/src/config`. A sketch of reading one of these parameters at runtime follows the table.
| Name | Type | Description |
|---|---|---|
| `/DataFlows/{Env}/PREFECT_API_KEY` | SecureString | Prefect service account API key with 'user' permissions to the previously created project |
| `/DataFlows/{Env}/SNOWFLAKE_PRIVATE_KEY` | SecureString | Decrypted base64 Snowflake private key |
| `/DataFlows/{Env}/SNOWFLAKE_ACCOUNT` | String | Snowflake account id |
| `/DataFlows/{Env}/SNOWFLAKE_USER` | String | Snowflake username |
| `/DataFlows/{Env}/DBT_CLOUD_TOKEN` | SecureString | dbt service account token |
| `/DataFlows/{Env}/DBT_CLOUD_ACCOUNT_ID` | String | dbt account id that you can find in the dbt Cloud URL |
| `/DataFlows/{Env}/GCE_KEY` | SecureString | GCP key |
| `/DataFlows/{Env}/BRAZE_API_KEY` | SecureString | Braze API key with write access to 'User Data' and 'Subscription' |
| `/DataFlows/{Env}/BRAZE_REST_ENDPOINT` | String | Braze REST endpoint |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_ROLE` | String | Snowflake role for executing the Deleted User Account data deletions |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_WAREHOUSE` | String | Snowflake warehouse used for data deletions of Deleted User Accounts |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_DB` | String | Snowflake DB to record the deleted user accounts |
| `/DataFlows/{Env}/SNOWFLAKE_DATA_RETENTION_SCHEMA` | String | Snowflake schema to record the deleted user accounts |
| `/DataFlows/{Env}/SNOWFLAKE_SNOWPLOW_DB` | String | Snowflake DB that has the Snowplow raw events |
| `/DataFlows/{Env}/SNOWFLAKE_SNOWPLOW_SCHEMA` | String | Snowflake schema that has the Snowplow raw events |
| `/DataFlows/{Env}/SNOWFLAKE_RAWDATA_DB` | String | Snowflake DB that has the legacy raw events |
| `/DataFlows/{Env}/SNOWFLAKE_RAWDATA_FIREHOSE_SCHEMA` | String | Snowflake schema that has the legacy raw events |
| `/DataFlows/{Env}/SNOWFLAKE_SNAPSHOT_DB` | String | Snowflake DB that has the legacy raw Snapshot events |
| `/DataFlows/{Env}/SNOWFLAKE_SNAPSHOT_FIREHOSE_SCHEMA` | String | Snowflake schema that has the legacy raw Snapshot events |
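How these parameters reach the flows depends on the infrastructure defined in `.aws/`; they may, for example, be injected into the ECS task as environment variables. Purely as an illustration, reading one of them directly with boto3 would look like this (the parameter name uses a made-up environment):

```python
# Illustration only: reading an SSM parameter with boto3.
# The actual flows may instead receive these values as environment variables
# injected by the ECS task definition.
import boto3

ssm = boto3.client("ssm")
response = ssm.get_parameter(
    Name="/DataFlows/Dev/SNOWFLAKE_ACCOUNT",  # replace Dev with your environment
    WithDecryption=True,  # required for SecureString parameters
)
snowflake_account = response["Parameter"]["Value"]
```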
- Data validation
- Persist Prefect results to S3
- Automated integration tests
- Python linter
- Sentry integration
- Switch to the LocalDaskExecutor to allow tasks to be executed in parallel
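If we make that switch, configuring the executor in Prefect 1.x looks roughly like the sketch below (not code from this repo):

```python
# Sketch: enabling parallel task execution with the LocalDaskExecutor (Prefect 1.x).
from prefect import Flow
from prefect.executors import LocalDaskExecutor

with Flow("example-flow") as flow:
    ...  # tasks defined here can run in parallel where the task graph allows it

flow.executor = LocalDaskExecutor()  # defaults to a thread-based scheduler
```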
- Prefect works well with long-running tasks. When flow execution duration is not a concern, it's generally better to build simpler flows that take longer to run than to invest developer time in more efficient but more complex flows.
- When working with large data files:
  - Queue them to disk and process them in chunks or streams (see the sketch after this list).
  - Avoid reduce tasks, e.g. `foo.map(), bar.map(), reduce()`: all results have to be in memory at the same time, significantly increasing memory requirements.
  - Smaller source files are easier to work with than large source files. Large source files require downloading to disk and then chunking; small source files can be downloaded entirely into memory and loaded directly into data frames.
- File transfer between a 4 VCPU ECS task and S3 is fast. Defining flow worker thread counts to be greater than the VCPU count only provides minimal gains.
- Python's gzip compression defaults to the highest compression level (smallest output, maximum CPU). We want compression level 1 in most scenarios (see the sketch after this list).
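As a sketch of the chunking and compression points above, the snippet below streams a hypothetical CSV in chunks and writes gzip output at compression level 1; the file names, chunk size, and use of pandas are assumptions:

```python
# Sketch: process a large CSV in chunks and write gzip output with a low
# compression level (1) to trade file size for much less CPU time.
import gzip

import pandas as pd

with gzip.open("output.csv.gz", "wt", compresslevel=1) as out:
    for i, chunk in enumerate(pd.read_csv("large_input.csv", chunksize=100_000)):
        chunk.to_csv(out, index=False, header=(i == 0))
```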
- Experimental cloud account: https://cloud.prefect.io/mathijs-getpocket-com-s-account
- Running Prefect locally
- Running Prefect on AWS
