To run an ETL pipeline, some configuration is needed: for example, credentials to connect to data sources and default paths. This chapter covers the main setup configurations in Butterfree.
Some of the configurations can be set through environment variables, through the API, or both. Parameters passed at instantiation take priority, with the environment variables as a fallback. Check the following examples:
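For instance, here is a minimal sketch of this precedence rule, using the Cassandra settings covered later in this chapter (all values below are placeholders, set inline only for illustration):

```python
import os

from butterfree.configs.db import CassandraConfig

# Environment variables act as fallbacks; on a real cluster they would
# be set in the environment rather than inline like this.
os.environ["CASSANDRA_USERNAME"] = "default_user"
os.environ["CASSANDRA_PASSWORD"] = "default_password"
os.environ["CASSANDRA_HOST"] = "cassandra.mydomain.com"
os.environ["CASSANDRA_KEYSPACE"] = "feature_store"

# The explicit argument takes priority over CASSANDRA_USERNAME;
# password, host and keyspace fall back to the environment variables.
db_config = CassandraConfig(username="etl_user")
```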
Configurations for the KafkaReader:
- Kafka Consumer Connection String: the connection string used to connect to Kafka.
```python
from butterfree.extract.readers import KafkaReader

kafka_reader = KafkaReader(
    id="reader_id",
    topic="topic_name",
    value_schema=schema,
    connection_string="host1:port,host2:port",
)
```
Alternatively, set up the following environment variable in the Spark cluster or locally:
- KAFKA_CONSUMER_CONNECTION_STRING

After setting this variable, you can instantiate KafkaReader more easily:
```python
from butterfree.extract.readers import KafkaReader

kafka_reader = KafkaReader(
    id="reader_id",
    topic="topic_name",
    value_schema=schema,
)
```
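Either way, the reader is then plugged into the extract step of a pipeline. Here is a minimal sketch, assuming the kafka_reader from the example above and Butterfree's usual Source API, where each reader's id is registered as a temporary view for the query (the query itself is a placeholder):

```python
from butterfree.extract import Source

source = Source(
    readers=[kafka_reader],
    query="SELECT * FROM reader_id",  # placeholder query over the reader's view
)
```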
Configurations for the CassandraConfig, used by the OnlineFeatureStoreWriter:
- Cassandra Username: username to connect to CassandraDB.
- Cassandra Password: password to connect to CassandraDB.
- Cassandra Host: host to connect to CassandraDB.
- Cassandra Keyspace: keyspace to connect to CassandraDB.
- Stream Checkpoint Path: path on S3 to save the checkpoint for the streaming query. Only needed when performing streaming writes.
```python
from butterfree.configs.db import CassandraConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

db_config = CassandraConfig(
    username="username",
    password="password",
    host="host",
    keyspace="keyspace",
    stream_checkpoint_path="path",
)
writer = OnlineFeatureStoreWriter(db_config=db_config)
```
Alternatively, set up the following environment variables in the Spark cluster or locally:
- CASSANDRA_USERNAME
- CASSANDRA_PASSWORD
- CASSANDRA_HOST
- CASSANDRA_KEYSPACE
- STREAM_CHECKPOINT_PATH

After setting these variables, you can instantiate CassandraConfig and OnlineFeatureStoreWriter more easily:
```python
from butterfree.configs.db import CassandraConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

db_config = CassandraConfig()
writer = OnlineFeatureStoreWriter(db_config=db_config)

# or just
writer = OnlineFeatureStoreWriter()
```
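The writer itself is used in the load step of a pipeline. A minimal sketch, assuming the writer from the example above:

```python
from butterfree.load import Sink

# a sink groups one or more writers and is attached to the pipeline
sink = Sink(writers=[writer])
```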
Configurations for the KafkaConfig, used by the OnlineFeatureStoreWriter when writing to Kafka:
- Kafka Connection String: string with the hosts and ports to connect to.
- Stream Checkpoint Path: path on S3 to save the checkpoint for the streaming query. Only needed when performing streaming writes.
```python
from butterfree.configs.db import KafkaConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

db_config = KafkaConfig(
    kafka_connection_string="kafka_connection_string",
    stream_checkpoint_path="path",
)
writer = OnlineFeatureStoreWriter(db_config=db_config)
```
Alternatively, set up the following environment variables in the Spark cluster or locally:
- KAFKA_CONSUMER_CONNECTION_STRING
- STREAM_CHECKPOINT_PATH

After setting these variables, you can instantiate KafkaConfig and OnlineFeatureStoreWriter more easily:
```python
from butterfree.configs.db import KafkaConfig
from butterfree.load.writers import OnlineFeatureStoreWriter

kafka_config = KafkaConfig()
writer = OnlineFeatureStoreWriter(db_config=kafka_config)
```
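Following the precedence rule described at the beginning of this chapter, the two styles can also be mixed, e.g. taking the connection string from the environment while passing the checkpoint path explicitly (the path below is a placeholder):

```python
# assumes KAFKA_CONSUMER_CONNECTION_STRING is set in the environment
kafka_config = KafkaConfig(stream_checkpoint_path="s3://my-bucket/checkpoints")
```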
Configurations for the S3Config, used by the HistoricalFeatureStoreWriter:
- Feature Store S3 Bucket: bucket on S3 to use for the Historical Feature Store.
- Feature Store Historical Database: database in the Spark metastore in which to write the Historical Feature Store tables.
```python
from butterfree.configs.db import S3Config
from butterfree.load.writers import HistoricalFeatureStoreWriter

db_config = S3Config(bucket="bucket")
writer = HistoricalFeatureStoreWriter(
    db_config=db_config,
    database="database",
)
```
Alternatively, set up the following environment variables in the Spark cluster or locally:
- FEATURE_STORE_S3_BUCKET
- FEATURE_STORE_HISTORICAL_DATABASE

After setting these variables, you can instantiate S3Config and HistoricalFeatureStoreWriter more easily:
```python
from butterfree.configs.db import S3Config
from butterfree.load.writers import HistoricalFeatureStoreWriter

db_config = S3Config()
writer = HistoricalFeatureStoreWriter(db_config=db_config)

# or just
writer = HistoricalFeatureStoreWriter()
```
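With all of the environment variables above set, both writers can be assembled into a single sink in a few lines. A sketch, not a complete pipeline:

```python
from butterfree.load import Sink
from butterfree.load.writers import (
    HistoricalFeatureStoreWriter,
    OnlineFeatureStoreWriter,
)

# both writers pick up their configurations from the environment
sink = Sink(
    writers=[
        HistoricalFeatureStoreWriter(),
        OnlineFeatureStoreWriter(),
    ]
)
```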
For more detailed information on the API arguments, it is best to check each class's docstring.
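For instance, from any Python session:

```python
from butterfree.configs.db import CassandraConfig

help(CassandraConfig)  # prints the class docstring, including its parameters
```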
Finally, some common checks when troubleshooting the setup:
- Check if the Spark cluster machines have all the permissions to read, write, and modify the Historical Feature Store bucket.
- Check if the Spark cluster machines can reach all the configured hosts.
- Check if the configured credentials have all the needed permissions on CassandraDB.
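For the host reachability check, a plain TCP probe from one of the cluster machines can rule out network issues before digging into credentials. A small sketch (hosts are placeholders; 9042 and 9092 are the default Cassandra and Kafka ports):

```python
import socket


def can_reach(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


print(can_reach("cassandra.mydomain.com", 9042))  # default Cassandra port
print(can_reach("kafka.mydomain.com", 9092))      # default Kafka port
```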