From 58aaf9c8c5c77c46b169ffb8cc88588ab6023e3d Mon Sep 17 00:00:00 2001 From: John Peters Date: Fri, 20 Oct 2023 21:42:28 -0500 Subject: [PATCH 1/2] "Python readme done. Could use some small changes. TO:DO item marked as well for review" --- README.md | 630 +++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 528 insertions(+), 102 deletions(-) diff --git a/README.md b/README.md index 845366d..9d84d97 100644 --- a/README.md +++ b/README.md @@ -93,7 +93,112 @@ if __name__ == '__main__': asyncio.run(main()) ``` -Once connected, the entire functionalities offered by Memphis are available. +```python + async def connect( + self, + host: str, + username: str, + account_id: int = 1, # Cloud use only, ignored otherwise + connection_token: str = "", # JWT token given when creating client accounts + password: str = "", # For password-based connections + port: int = 6666, + reconnect: bool = True, + max_reconnect: int = 10, + reconnect_interval_ms: int = 1500, + timeout_ms: int = 2000, + # For TLS connections: + cert_file: str = "", + key_file: str = "", + ca_file: str = "", + ) +``` + +The connect function in the Memphis class allows for the connection to Memphis. Connecting to Memphis (cloud or open-source) will be needed in order to use any of the other functionality of the Memphis class. Upon connection, all of Memphis' features are available. + +What arguments are used with the Memphis.connect function change depending on the type of connection being made. + +For details on deploying memphis open-source with different types of connections see the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security). + +The default connection type (for cloud) is using a JWT token-based connection. Here is an example of using memphis.connect with a JWT token: + +```python + # Imports hidden. 
See other examples +async def main(): + try: + memphis = Memphis() + await memphis.connect( + host = "localhost", + username = "user", + connection_token = "token", + # port = 6666, default port + # reconnect = True, default reconnect setting + # max_reconnect = 10, default number of reconnect attempts + # reconnect_interval_ms = 1500, default reconnect interval + # timeout_ms = 2000, default duration of time for the connection to timeout + ) + except Exception as e: + print(e) + finally: + await memphis.close() + +if __name__ == '__main__': + asyncio.run(main()) +``` + +The token will be presented when creating new users. + +A password-based connection would look like this (using the defualt root memphis login with Memphis open-source): + +```python + # Imports hidden. See other examples +async def main(): + try: + memphis = Memphis() + await memphis.connect( + host = "localhost", + username = "root", + password = "memphis", + # port = 6666, default port + # reconnect = True, default reconnect setting + # max_reconnect = 10, default number of reconnect attempts + # reconnect_interval_ms = 1500, default reconnect interval + # timeout_ms = 2000, default duration of time for the connection to timeout + ) + except Exception as e: + print(e) + finally: + await memphis.close() + +if __name__ == '__main__': + asyncio.run(main()) +``` + +> For the rest of the examples, the try-except statement and the asyncio runtime call will be withheld to assist with the succinctness of the examples. + +A TLS based connection would look like this: + +```python + # Imports hidden. 
See other examples + + try: + memphis = Memphis() + await memphis.connect( + host = "localhost", + username = "user", + key_file = "~/tls_file_path.key", + cert_file = "~/tls_cert_file_path.crt", + ca_file = "~/tls_ca_file_path.crt", + # port = 6666, default port + # reconnect = True, default reconnect setting + # max_reconnect = 10, default number of reconnect attempts + # reconnect_interval_ms = 1500, default reconnect interval + # timeout_ms = 2000, default duration of time for the connection to timeout + ) + except Exception as e: + print(e) + finally: +``` +Memphis needs to configured for these use cases. To configure memphis to use TLS see the [docs](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/production-best-practices#memphis-metadata-tls-connection-configuration). ### Disconnecting from Memphis @@ -104,64 +209,218 @@ await memphis.close() ``` ### Creating a Station -**Unexist stations will be created automatically through the SDK on the first producer/consumer connection with default values.**

-_If a station already exists nothing happens, the new configuration will not be applied_ - -```python -station = memphis.station( - name="", - schema_name="", # defaults to "" (no schema) - retention_type=Retention.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES/ACK_BASED(cloud only). Defaults to MAX_MESSAGE_AGE_SECONDS - retention_value=604800, # defaults to 604800 - storage_type=Storage.DISK, # Storage.DISK/Storage.MEMORY. Defaults to DISK - replicas=1, # defaults to 1 - idempotency_window_ms=120000, # defaults to 2 minutes - send_poison_msg_to_dls=True, # defaults to true - send_schema_failed_msg_to_dls=True, # defaults to true - tiered_storage_enabled=False, # defaults to false - partitions_number=1, # defaults to 1 - dls_station="" # defaults to "" (no DLS station) - If selected DLS events will be sent to selected station as well -) +**A station will be automatically created for the user when a consumer or producer is used if no stations with the given station name exist.**

+_If the station trying to be created exists when this function is called, nothing will change with the existing station_
+
+```python
+    async def station(
+        self,
+        name: str,
+        retention_type: Retention = Retention.MAX_MESSAGE_AGE_SECONDS, # MAX_MESSAGE_AGE_SECONDS/MESSAGES/BYTES/ACK_BASED(cloud only). Defaults to MAX_MESSAGE_AGE_SECONDS
+        retention_value: int = 604800, # defaults to 604800
+        storage_type: Storage = Storage.DISK, # Storage.DISK/Storage.MEMORY. Defaults to DISK
+        replicas: int = 1,
+        idempotency_window_ms: int = 120000, # defaults to 2 minutes
+        schema_name: str = "", # defaults to "" (no schema)
+        send_poison_msg_to_dls: bool = True, # defaults to true
+        send_schema_failed_msg_to_dls: bool = True, # defaults to true
+        tiered_storage_enabled: bool = False, # defaults to false
+        partitions_number: int = 1, # defaults to 1
+        dls_station: str = "", # defaults to "" (no DLS station). If given, both poison and schema failed events will be sent to the DLS
+    )
+```
+
+The station function is used to create a station. Using the different arguments, one can programmatically create many different types of stations. The Memphis UI can also be used to create stations to the same effect.
+
+A minimal example, using all default values, would simply create a station with the given name:
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    await memphis.station(
+        name = "my_station"
+    )
+```
+
+To change what criteria the station uses to decide if a message should be retained in the station, change the retention type. The different types of retention are documented [here](https://github.com/memphisdev/memphis.py#retention-types) in the python README.
+
+The unit of the retention value will vary depending on the retention_type. The [previous link](https://github.com/memphisdev/memphis.py#retention-types) also describes what units will be used.
+ +Here is an example of a station which will only hold up to 10 messages: + +```python + memphis = Memphis() + + await memphis.connect(...) + + await memphis.station( + name = "my_station", + retention_type = Retention.MESSAGES, + retention_value = 10 + ) +``` + +Memphis stations can either store Messages on disk or in memory. A comparison of those types of storage can be found [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#tier-1-local-storage). + +Here is an example of how to create a station that uses Memory as its storage type: + +```python + memphis = Memphis() + + await memphis.connect(...) + + await memphis.station( + name = "my_station", + storage_type = Storage.MEMORY + ) +``` + +In order to make a station more redundant, replicas can be used. Read more about replicas [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#replicas-mirroring). Note that replicas are only available in cluster mode. Cluster mode can be enabled in the [Helm settings](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/1-installation#appendix-b-helm-deployment-options) when deploying Memphis with Kubernetes. + +Here is an example of creating a station with 3 replicas: + +```python + memphis = Memphis() + + await memphis.connect(...) + + await memphis.station( + name = "my_station", + replicas = 3 + ) +``` + +Idempotency defines how Memphis will prevent duplicate messages from being stored or consumed. The duration of time the message ID's will be stored in the station can be set with idempotency_window_ms. If the environment Memphis is deployed in has unreliably connection and/or a lot of latency, increasing this value might be desiriable. The default duration of time is set to two minutes. Read more about idempotency [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/idempotency). 
+
+Here is an example of changing the idempotency window to 3 minutes:
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    await memphis.station(
+        name = "my_station",
+        idempotency_window_ms = 180000
+    )
+```
+
+The schema name is used to set a schema to be enforced by the station. The default value of "" ensures that no schema is enforced. Here is an example of changing the schema to a defined schema in schemaverse called "sensor_logs":
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    await memphis.station(
+        name = "my_station",
+        schema_name = "sensor_logs"
+    )
 ```
 
+There are two parameters for sending messages to the [dead-letter station(DLS)](https://docs.memphis.dev/memphis/memphis-broker/concepts/dead-letter#terminology). These are send_poison_msg_to_dls and send_schema_failed_msg_to_dls.
+
+Here is an example of sending poison messages to the DLS but not messages which fail to conform to the given schema.
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    await memphis.station(
+        name = "my_station",
+        schema_name = "sensor_logs",
+        send_poison_msg_to_dls = True,
+        send_schema_failed_msg_to_dls = False
+    )
+```
+
+When either of the DLS flags is set to True, a station can also be chosen to receive these events. To set the station to which schema-failed or poison messages will be sent, use the dls_station parameter:
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    await memphis.station(
+        name = "my_station",
+        schema_name = "sensor_logs",
+        send_poison_msg_to_dls = True,
+        send_schema_failed_msg_to_dls = False,
+        dls_station = "bad_sensor_messages_station"
+    )
+```
+
+When the retention value is met, Memphis by default will delete old messages. If tiered storage is set up, Memphis can instead move messages to tier 2 storage. Read more about tiered storage [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#storage-tiering).
Enable this setting with the respective flag: + +```python + memphis = Memphis() + + await memphis.connect(...) + + await memphis.station( + name = "my_station", + tiered_storage_enabled = True + ) +``` + +[Partitioning](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) might be useful for a station. To have a station partitioned, simply change the partitions number: + +```python + memphis = Memphis() + + await memphis.connect(...) + + await memphis.station( + name = "my_station", + partitions_number = 3 + ) +``` + + ### Retention types -Memphis currently supports the following types of retention: +Retention types define the methodology behind how a station behaves with its messages. Memphis currently supports the following retention types: ```python memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS ``` -Means that every message persists for the value set in retention value field (in seconds) +When the retention type is set to MAX_MESSAGE_AGE_SECONDS, messages will persist in the station for the number of seconds specified in the retention_value. + ```python memphis.types.Retention.MESSAGES ``` -Means that after max amount of saved messages (set in retention value), the oldest messages will be deleted +When the retention type is set to MESSAGES, the station will only hold up to retention_value messages. The station will delete the oldest messsages to maintain a retention_value number of messages. ```python memphis.types.Retention.BYTES ``` -Means that after max amount of saved bytes (set in retention value), the oldest messages will be deleted +When the retention type is set to BYTES, the station will only hold up to retention_value BYTES. The oldest messages will be deleted in order to maintain at maximum retention_vlaue BYTES in the station. ```python memphis.types.Retention.ACK_BASED # for cloud users only ``` -Means that after a message is getting acked by all interested consumer groups it will be deleted from the Station. 
+When the retention type is set to ACK_BASED, messages in the station will be deleted after they are acked by all subscribed consumer groups. -### Retention Values -The `retention values` are directly related to the `retention types` mentioned above, where the values vary according to the type of retention chosen. +### Retention Values -All retention values are of type `int` but with different representations as follows: +The unit of the `retention_value` changes depending on the `retention_type` specified. -`memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS` is represented **in seconds**, `memphis.types.Retention.MESSAGES` in a **number of messages**, `memphis.types.Retention.BYTES` in a **number of bytes** and finally and finally `memphis.ACK_BASED` is not using the retentionValue param at all. +All retention values are of type `int`. The following units are used based on the respective retention type: -After these limits are reached oldest messages will be deleted. +`memphis.types.Retention.MAX_MESSAGE_AGE_SECONDS` is **in seconds**,
+`memphis.types.Retention.MESSAGES` is a **number of messages**,
+`memphis.types.Retention.BYTES` is a **number of bytes**,
+With `memphis.ACK_BASED`, the `retention_type` is ignored ### Storage types @@ -170,23 +429,22 @@ Memphis currently supports the following types of messages storage: ```python memphis.types.Storage.DISK ``` - -Means that messages persist on disk +When storage is set to DISK, messages are stored on disk. ```python memphis.types.Storage.MEMORY ``` - -Means that messages persist on the main memory +When storage is set to MEMORY, messages are stored in the system memory. ### Station partitions -Memphis station is created with 1 patition by default -You can change the patitions number as you wish in order to scale your stations +Memphis stations are created with 1 patition by default. +You can change the patitions number as you wish in order to scale your stations. See the [documentation](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) for more details on partitions. + ### Destroying a Station -Destroying a station will remove all its resources (producers/consumers) +Destroying a station will remove all its resources (including producers/consumers) ```python station.destroy() @@ -202,7 +460,20 @@ Current available schema types - Protobuf / JSON schema / GraphQL schema / Avro ### Enforcing a Schema on an Existing Station ```python -await memphis.enforce_schema("", "") +async def enforce_schema(self, name, station_name) +``` + +To add a schema to an already created station, enforce_schema can be used. Here is an example using enforce_schmea to add a schema to a station: + +```python + memphis = Memphis() + + await memphis.connect(...) 
+
+    await memphis.enforce_schema(
+        name = "my_schema",
+        station_name = "my_station"
+    )
 ```
 
 ### Deprecated - Attaching a Schema, use enforce_schema instead
@@ -214,7 +485,19 @@ await memphis.attach_schema("", "")
 ### Detaching a Schema from Station
 
 ```python
-await memphis.detach_schema("")
+    async def detach_schema(self, station_name)
+```
+
+To remove a schema from an already created station, detach_schema can be used. Here is an example of removing a schema from a station:
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    await memphis.detach_schema(
+        station_name = "my_station"
+    )
 ```
 
 
@@ -224,25 +507,56 @@ The most common client operations are `produce` to send messages and `consume` t
 receive messages. Messages are published to a station and consumed from it by
 creating a consumer.
 
-Consumers are pull based and consume all the messages in a station unless you are using a consumers group, in this case messages are spread across all members in this group.
+Consumers are pull-based and consume all the messages in a station unless you are using a consumer group. When using a consumer group, messages are spread across the consumers in the group, so each message is processed by only one member of the group.
 
-Memphis messages are payload agnostic. Payloads are `bytearray`.
+Memphis messages are payload agnostic. Payloads are always `bytearray`s.
 
-In order to stop getting messages, you have to call `consumer.destroy()`. Destroy will terminate regardless
-of whether there are messages in flight for the client.
+In order to stop getting messages, you have to call `consumer.destroy()`. Destroy will terminate the consumer even if messages are currently being sent to the consumer.
 
-If a station is created with more than one partition, produce and consume bill be perform in a Round Robin fasion
+If a station is created with more than one partition, producing to and consuming from the station will happen in a round robin fashion.
### Creating a Producer
 
 ```python
-producer = await memphis.producer(station_name="", producer_name="")
+    async def producer(
+        self,
+        station_name: str,
+        producer_name: str,
+        generate_random_suffix: bool = False, # Deprecated
+    )
+```
+
+Use the Memphis producer function to create a producer. Here is an example of creating a producer for a given station:
+
+```python
+    memphis = Memphis()
+
+    await memphis.connect(...)
+
+    producer = await memphis.producer(
+        station_name = "my_station",
+        producer_name = "new_producer"
+    )
 ```
 
 ### Producing a message
-Without creating a producer.
-In cases where extra performance is needed the recommended way is to create a producer first
-and produce messages by using the produce function of it
+
+```python
+async def produce(
+    self,
+    message,
+    ack_wait_sec: int = 15,
+    headers: Union[Headers, None] = None,
+    async_produce: Union[bool, None] = None,
+    nonblocking: bool = False,
+    msg_id: Union[str, None] = None,
+    concurrent_task_limit: Union[int, None] = None,
+    producer_partition_key: Union[str, None] = None,
+    producer_partition_number: int = -1
+):
+```
+Without creating a producer.
+ ```python await memphis.produce(station_name='test_station_py', producer_name='prod_py', message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or bytearray/dict (schema validated station - avro schema) @@ -250,56 +564,98 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py', headers=headers, # default to {} nonblocking=False, #defaults to false msg_id="123", - producer_partition_key="key", #default to None - producer_partition_number=-1, #default to -1 + producer_partition_key="key" #default to None ) ``` +With creating a producer. Creating a producer and calling produce on it will increase the performance of producing messages as it reduces the overhead of pulling created producers from the cache. -With creating a producer ```python await producer.produce( message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) or or bytearray/dict (schema validated station - avro schema) ack_wait_sec=15) # defaults to 15 ``` -### Add headers +Here is an example of a produce function call that waits up to 30 seconds for an acknowledgement from memphis and does so in an nonblocking manner: ```python -headers= Headers() -headers.add("key", "value") -await producer.produce( - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - 
graphql schema) or or bytearray/dict (schema validated station - avro schema) - headers=headers) # default to {} + memphis = Memphis() + + await memphis.connect(...) + + await memphis.produce( + station_name = "some_station", + producer_name = "temp_producer", + message = {'some':'message'}, + ack_wait_sec = 30, + nonblocking = True + ) ``` -### Non-blocking Produce -For better performance, the client won't block requests while waiting for an acknowledgment. +### Producing with idempotency + +As discussed before in the station section, idempotency is an important feature of memphis. To achieve idempotency, an id must be assigned to messages that are being produced. Use the msg_id parameter for this purpose. ```python -await producer.produce( - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) - headers={}, nonblocking=True) + memphis = Memphis() + + await memphis.connect(...) + + await memphis.produce( + station_name = "some_station", + producer_name = "temp_producer", + message = {'some':'message'}, + msg_id = '42' + ) ``` -### Produce using partition key -Use any string to produce messages to a specific partition +### Producing with headers +To add message headers to the message, use the headers parameter. Headers can help with observability when using certain 3rd party to help monitor the behavior of memphis. See [here](https://docs.memphis.dev/memphis/memphis-broker/comparisons/aws-sqs-vs-memphis#observability) for more details. 
```python -await producer.produce( - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) - producer_partition_key="key", #default to None -) + memphis = Memphis() + + await memphis.connect(...) + + await memphis.produce( + station_name = "some_station", + producer_name = "temp_producer", + message = {'some':'message'}, + headers = { + 'trace_header': 'track_me_123' + } + ) ``` -### Produce using partition number -Use number of partition to produce messages to a specific partition +### Producing to a partition + +Lastly, memphis can produce to a specific partition in a station. To do so, use the producer_partition_key parameter: ```python -await producer.produce( - message='bytearray/protobuf class/dict/string/graphql.language.ast.DocumentNode', # bytearray / protobuf class (schema validated station - protobuf) or bytearray/dict (schema validated station - json schema) or string/bytearray/graphql.language.ast.DocumentNode (schema validated station - graphql schema) - producer_partition_number=-1 #default to -1 -) + memphis = Memphis() + + await memphis.connect(...) + + await memphis.produce( + station_name = "some_station", + producer_name = "temp_producer", + message = {'some':'message'}, + producer_partition_key = "2nd_partition" + ) +``` + +Or, alternatively use the producer_partition_number parameter: +```python + memphis = Memphis() + + await memphis.connect(...) 
+ + await memphis.produce( + station_name = "some_station", + producer_name = "temp_producer", + message = {'some':'message'}, + producer_partition_number = 2 + ) ``` ### Non-blocking Produce with Task Limits @@ -313,18 +669,6 @@ await producer.produce( headers={}, nonblocking=True, limit_concurrent_tasks=500) ``` - -### Message ID -Stations are idempotent by default for 2 minutes (can be configured), Idempotency achieved by adding a message id - -```python -await producer.produce( - message='bytearray/protobuf class/dict', # bytes / protobuf class (schema validated station - protobuf) or bytes/dict (schema validated station - json schema) - headers={}, - async_produce=True, - msg_id="123") -``` - ### Destroying a Producer ```python @@ -348,6 +692,99 @@ consumer = await memphis.consumer( ) ``` +Here is an example on how to create a consumer with all of the default options: + +```python + memphis = Memphis() + + await memphis.connect(...) + + consumer = await Memphis.consumer( + station_name = "my_station", + consumer_name: "new_consumer", + ) +``` + +To create a consumer in a consumer group, add the consumer_group parameter: + +```python + memphis = Memphis() + + await memphis.connect(...) + + consumer = await Memphis.consumer( + station_name = "my_station", + consumer_name: "new_consumer", + consumer_group: "consumer_group_1" + ) +``` + +When using Consumer.consume, the consumer will continue to consume in an infinite loop. To change the rate at which the consumer polls, change the pull_interval_ms parameter: + +```python + memphis = Memphis() + + await memphis.connect(...) + + consumer = await Memphis.consumer( + station_name = "my_station", + consumer_name = "new_consumer", + pull_interval_ms = 2000 + ) +``` + +Every time the consumer polls, the consumer will try to take batch_size number of elements from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. 
In this case, the consumer will continue to wait until either batch_size messages are gathered or the time in milliseconds specified by batch_max_time_to_wait_ms is reached. + +Here is an example of a consumer that will try to poll 100 messages every 10 seconds while waiting up to 15 seconds for all messages to reach the consumer. + +```python + memphis = Memphis() + + await memphis.connect(...) + + consumer = await Memphis.consumer( + station_name = "my_station", + consumer_name = "new_consumer", + pull_interval_ms = 10000, + batch_size = 100, + batch_max_time_to_wait_ms = 15000 + ) +``` + +The max_msg_deliveries parameter allows the user how many messages the consumer is able to consume before consuming more. The max_ack_time_ms Here is an example where the consumer will only hold up to one batch of messages at a time: + +```python + memphis = Memphis() + + await memphis.connect(...) + + consumer = await Memphis.consumer( + station_name = "my_station", + consumer_name = "new_consumer", + pull_interval_ms = 10000, + batch_size = 100, + batch_max_time_to_wait_ms = 15000, + max_msg_deliveries = 100 + ) +``` +### Consume using a partition key +The key will be used to consume from a specific partition + +```python +consumer.consume(msg_handler, + consumer_partition_key = "key" #consume from a specific partition + ) +``` + +### Consume using a partition number +The number will be used to consume from a specific partition + +```python +consumer.consume(msg_handler, + consumer_partition_number = -1 #consume from a specific partition + ) +``` + ### Setting a context for message handler function ```python @@ -368,6 +805,7 @@ async def msg_handler(msgs, error, context): print(error) consumer.consume(msg_handler) ``` + #### Processing schema deserialized messages To get messages deserialized, use `msg.get_data_deserialized()`. 
@@ -381,27 +819,16 @@ async def msg_handler(msgs, error, context): consumer.consume(msg_handler) ``` -if you have ingested data into station in one format, afterwards you apply a schema on the station, the consumer won't deserialize the previously ingested data. For example, you have ingested string into the station and attached a protobuf schema on the station. In this case, consumer won't deserialize the string. +There may be some instances where you apply a schema *after* a station has received some messages. In order to consume those messages you must use get_data_deserialized so that the consumer does not try to deserialize the given payload into the data format that is set in the schema and instead keeps the payload in its encoded format. -### Consume using a partition key -The key will be used to consume from a specific partition +TO:DO: Compare these paragraphs. I'm not sure if I captured the meaning exactly. It reads very confusingly. -```python -consumer.consume(msg_handler, - consumer_partition_key = "key" #consume from a specific partition - ) -``` +if you have ingested data into station in one format, afterwards you apply a schema on the station, the consumer won't deserialize the previously ingested data. For example, you have ingested string into the station and attached a protobuf schema on the station. In this case, consumer won't deserialize the string. -### Consume using a partition number -The number will be used to consume from a specific partition +### Fetch a single batch of messages -```python -consumer.consume(msg_handler, - consumer_partition_number = -1 #consume from a specific partition - ) -``` +Using fetch_messages or fetch will allow the user to remove a specific number of messages from a given station. This behavior could be beneficial if the user does not want to have a consumer actively poll from a station indefinetly. 
-### Fetch a single batch of messages ```python msgs = await memphis.fetch_messages( station_name="", @@ -465,7 +892,6 @@ sequence_number = msg.get_sequence_number() consumer.destroy() ``` - ### Check connection status ```python From 8abf960e6f248aef95d9dd485213042fbdfbb9cd Mon Sep 17 00:00:00 2001 From: John Peters Date: Mon, 23 Oct 2023 10:40:31 -0500 Subject: [PATCH 2/2] "Python readme changes made. I re-read everything to make sure it all sounds good, and addressed the changes in the PR that Yaniv made" --- README.md | 112 ++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 9d84d97..6f0d3fd 100644 --- a/README.md +++ b/README.md @@ -119,17 +119,17 @@ What arguments are used with the Memphis.connect function change depending on th For details on deploying memphis open-source with different types of connections see the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security). -The default connection type (for cloud) is using a JWT token-based connection. Here is an example of using memphis.connect with a JWT token: +A password-based connection would look like this (using the defualt root memphis login with Memphis open-source): ```python # Imports hidden. See other examples async def main(): - try: + try: memphis = Memphis() await memphis.connect( host = "localhost", - username = "user", - connection_token = "token", + username = "root", + password = "memphis", # port = 6666, default port # reconnect = True, default reconnect setting # max_reconnect = 10, default number of reconnect attempts @@ -145,19 +145,44 @@ if __name__ == '__main__': asyncio.run(main()) ``` -The token will be presented when creating new users. - -A password-based connection would look like this (using the defualt root memphis login with Memphis open-source): +If you wanted to connect to Memphis cloud instead, simply add your account ID and change the host. 
The host and account_id can be found on the Overview page in the Memphis cloud UI under your name at the top. Here is an example of connecting to a cloud broker that is located in US East:
 
 ```python
     # Imports hidden. See other examples
 async def main():
     try:
+        memphis = Memphis()
+        await memphis.connect(
+            host = "aws-us-east-1.cloud.memphis.dev",
+            username = "my_client_username",
+            password = "my_client_password",
+            account_id = 123456789
+            # port = 6666, default port
+            # reconnect = True, default reconnect setting
+            # max_reconnect = 10, default number of reconnect attempts
+            # reconnect_interval_ms = 1500, default reconnect interval
+            # timeout_ms = 2000, default duration of time for the connection to timeout
+        )
+    except Exception as e:
+        print(e)
+    finally:
+        await memphis.close()
+
+if __name__ == '__main__':
+    asyncio.run(main())
+```
+
+It is possible to use a token-based connection to memphis as well, where multiple users can share the same token to connect to memphis. Here is an example of using memphis.connect with a token:
+
+```python
+    # Imports hidden. See other examples
+async def main():
+    try:
         memphis = Memphis()
         await memphis.connect(
             host = "localhost",
-            username = "root",
-            password = "memphis",
+            username = "user",
+            connection_token = "token",
             # port = 6666, default port
             # reconnect = True, default reconnect setting
             # max_reconnect = 10, default number of reconnect attempts
@@ -173,6 +198,10 @@ if __name__ == '__main__':
     asyncio.run(main())
 ```
 
+The token will be presented when creating new users.
+
+Memphis needs to be configured to use a token-based connection. See the [docs](https://docs.memphis.dev/memphis/memphis-broker/concepts/security) for help doing this.
+
 > For the rest of the examples, the try-except statement and the asyncio runtime call will be withheld to assist with the succinctness of the examples. 
A TLS based connection would look like this: @@ -209,6 +238,9 @@ await memphis.close() ``` ### Creating a Station + +Stations are distributed units that store messages. Producers add messages to stations and Consumers take messages from them. Each station stores messages until their retention policy causes them to either delete the messages or move them to [remote storage](https://docs.memphis.dev/memphis/integrations-center/storage/s3-compatible). + **A station will be automatically created for the user when a consumer or producer is used if no stations with the given station name exist.**

_If the station trying to be created exists when this function is called, nothing will change with the exisitng station_
 
@@ -232,7 +264,7 @@ _If the station trying to be created exists when this function is called, nothin
 
 The station function is used to create a station. Using the different arguemnts, one can programically create many different types of stations. The Memphis UI can also be used to create stations to the same effect. 
 
-A minimal example, using all default values would simply create a station with the given name:
+Creating a station with just a name would create a station with that name and containing the default options provided above:
 
 ```python
     memphis = Memphis()
@@ -244,6 +276,8 @@ A minimal example, using all default values would simply create a station with t
     )
 ```
 
+### Stations with Retention
+
 To change what criteria the station uses to decide if a message should be retained in the station, change the retention type. The different types of retention are documented [here](https://github.com/memphisdev/memphis.py#retention-types) in the python README.
 
 The unit of the rentention value will vary depending on the retention_type. The [previous link](https://github.com/memphisdev/memphis.py#retention-types) also describes what units will be used.
 
@@ -262,6 +296,8 @@ Here is an example of a station which will only hold up to 10 messages:
     )
 ```
 
+### Station storage types
+
 Memphis stations can either store Messages on disk or in memory. A comparison of those types of storage can be found [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#tier-1-local-storage).
 
 Here is an example of how to create a station that uses Memory as its storage type:
 
@@ -277,6 +313,8 @@ Here is an example of how to create a station that uses Memory as its storage ty
     )
 ```
 
+### Station Replicas
+
 In order to make a station more redundant, replicas can be used. 
Read more about replicas [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#replicas-mirroring). Note that replicas are only available in cluster mode. Cluster mode can be enabled in the [Helm settings](https://docs.memphis.dev/memphis/open-source-installation/kubernetes/1-installation#appendix-b-helm-deployment-options) when deploying Memphis with Kubernetes. Here is an example of creating a station with 3 replicas: @@ -292,6 +330,8 @@ Here is an example of creating a station with 3 replicas: ) ``` +### Station idempotency + Idempotency defines how Memphis will prevent duplicate messages from being stored or consumed. The duration of time the message ID's will be stored in the station can be set with idempotency_window_ms. If the environment Memphis is deployed in has unreliably connection and/or a lot of latency, increasing this value might be desiriable. The default duration of time is set to two minutes. Read more about idempotency [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/idempotency). Here is an example of changing the idempotency window to 3 seconds: @@ -307,6 +347,8 @@ Here is an example of changing the idempotency window to 3 seconds: ) ``` +### Enforcing a schema + The schema name is used to set a schema to be enforced by the station. The default value of "" ensures that no schema is enforced. Here is an example of changing the schema to a defined schema in schemaverse called "sensor_logs": ```python @@ -320,6 +362,8 @@ The schema name is used to set a schema to be enforced by the station. The defau ) ``` +### Dead Letter Stations + There are two parameters for sending messages to the [dead-letter station(DLS)](https://docs.memphis.dev/memphis/memphis-broker/concepts/dead-letter#terminology). These are send_poison_msg_to_dls and send_schema_failed_msg_to_dls. Here is an example of sending poison messages to the DLS but not messages which fail to conform to the given schema. 
@@ -353,6 +397,8 @@ When either of the DLS flags are set to True, a station can also be set to handl ) ``` +### Station Tiered Storage + When the retention value is met, Mempihs by default will delete old messages. If tiered storage is setup, Memphis can instead move messages to tier 2 storage. Read more about tiered storage [here](https://docs.memphis.dev/memphis/memphis-broker/concepts/storage-and-redundancy#storage-tiering). Enable this setting with the respective flag: ```python @@ -366,6 +412,8 @@ When the retention value is met, Mempihs by default will delete old messages. If ) ``` +### Station Partitions + [Partitioning](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) might be useful for a station. To have a station partitioned, simply change the partitions number: ```python @@ -409,8 +457,6 @@ memphis.types.Retention.ACK_BASED # for cloud users only When the retention type is set to ACK_BASED, messages in the station will be deleted after they are acked by all subscribed consumer groups. - - ### Retention Values The unit of the `retention_value` changes depending on the `retention_type` specified. @@ -436,12 +482,6 @@ memphis.types.Storage.MEMORY ``` When storage is set to MEMORY, messages are stored in the system memory. -### Station partitions - -Memphis stations are created with 1 patition by default. -You can change the patitions number as you wish in order to scale your stations. See the [documentation](https://docs.memphis.dev/memphis/memphis-broker/concepts/station#partitions) for more details on partitions. - - ### Destroying a Station Destroying a station will remove all its resources (including producers/consumers) @@ -500,14 +540,14 @@ To remove a schema from an already created station, detach_schema can be used. 
H ) ``` - ### Produce and Consume messages -The most common client operations are `produce` to send messages and `consume` to +The most common client operations are using `produce` to send messages and `consume` to receive messages. -Messages are published to a station and consumed from it by creating a consumer. -Consumers are poll based and consume all the messages in a station unless you are using a consumer group. When using a consumer group, all consumers in the group will receive each message. +Messages are published to a station with a Producer and consumed from it by a Consumer. + +Consumers are poll based and consume all the messages in a station. Consumers can also be grouped into consumer groups. When consuming with a consumer group, all consumers in the group will receive each message. Memphis messages are payload agnostic. Payloads are always `bytearray`s. @@ -540,7 +580,6 @@ Use the Memphis producer function to create a producer. Here is an example of cr ``` ### Producing a message - ```python async def produce( self, @@ -555,7 +594,7 @@ async def produce( producer_partition_number: Union[int, -1] = -1 ): ``` -Without creating a producer.
+Both producers and connections can use the produce function. To produce a message from a connection, simply call `memphis.produce`. This function will create a producer if none with the given name exists, otherwise it will pull the producer from a cache and use it to produce the message. ```python await memphis.produce(station_name='test_station_py', producer_name='prod_py', @@ -568,7 +607,7 @@ await memphis.produce(station_name='test_station_py', producer_name='prod_py', ) ``` -With creating a producer. Creating a producer and calling produce on it will increase the performance of producing messages as it reduces the overhead of pulling created producers from the cache. +Creating a producer and calling produce on it will increase the performance of producing messages as it removes the overhead of pulling created producers from the cache. ```python await producer.produce( @@ -610,6 +649,7 @@ As discussed before in the station section, idempotency is an important feature ``` ### Producing with headers + To add message headers to the message, use the headers parameter. Headers can help with observability when using certain 3rd party to help monitor the behavior of memphis. See [here](https://docs.memphis.dev/memphis/memphis-broker/comparisons/aws-sqs-vs-memphis#observability) for more details. ```python @@ -644,7 +684,7 @@ Lastly, memphis can produce to a specific partition in a station. To do so, use ) ``` -Or, alternatively use the producer_partition_number parameter: +Or, alternatively, use the producer_partition_number parameter: ```python memphis = Memphis() @@ -659,9 +699,9 @@ Or, alternatively use the producer_partition_number parameter: ``` ### Non-blocking Produce with Task Limits + For better performance, the client won't block requests while waiting for an acknowledgment. 
-If you are producing a large number of messages and see timeout errors, then you may need to
-limit the number of concurrent tasks like so:
+If you are producing a large number of messages very quickly, there may be some timeout errors. In that case, you may need to limit the number of concurrent tasks to get around this:
 
 ```python
 await producer.produce(
@@ -669,6 +709,8 @@ await producer.produce(
     headers={}, nonblocking=True, limit_concurrent_tasks=500)
 ```
 
+You may read more about this [here](https://memphis.dev/blog/producing-messages-at-warp-speed-best-practices-for-optimizing-your-producers/) on the memphis.dev blog.
+
 ### Destroying a Producer
 
 ```python
@@ -692,7 +734,7 @@ consumer = await memphis.consumer(
 )
 ```
 
-Here is an example on how to create a consumer with all of the default options:
+Consumers are used to pull messages from a station. Here is how to create a consumer with all of the default parameters:
 
 ```python
     memphis = Memphis()
@@ -719,7 +761,7 @@ To create a consumer in a consumer group, add the consumer_group parameter:
     )
 ```
 
-When using Consumer.consume, the consumer will continue to consume in an infinite loop. To change the rate at which the consumer polls, change the pull_interval_ms parameter:
+When using Consumer.consume, the consumer will continue to consume in an infinite loop. To change the rate at which the consumer polls the station for new messages, change the pull_interval_ms parameter:
 
 ```python
     memphis = Memphis()
 
     await memphis.connect(...)
 
     consumer = await Memphis.consumer(
         station_name = "my_station",
         consumer_name = "new_consumer",
         pull_interval_ms = 2000
     )
 ```
 
-Every time the consumer polls, the consumer will try to take batch_size number of elements from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. In this case, the consumer will continue to wait until either batch_size messages are gathered or the time in milliseconds specified by batch_max_time_to_wait_ms is reached. 
+Every time the consumer pulls from the station, the consumer will try to take batch_size number of elements from the station. However, sometimes there are not enough messages in the station for the consumer to consume a full batch. In this case, the consumer will continue to wait until either batch_size messages are gathered or the time in milliseconds specified by batch_max_time_to_wait_ms is reached. 
 
 Here is an example of a consumer that will try to poll 100 messages every 10 seconds while waiting up to 15 seconds for all messages to reach the consumer.
 
@@ -794,7 +836,7 @@ consumer.set_context(context)
 
 ### Processing messages
 
-Once all the messages in the station were consumed the msg_handler will receive error: `Memphis: TimeoutError`.
+To use a consumer to process messages, use the consume function. The consume function will have a consumer poll a station for new messages as discussed in previous sections. The consumer will stop polling the station once all the messages in the station were consumed, and the msg_handler will receive a `Memphis: TimeoutError`.
 
 ```python
 async def msg_handler(msgs, error, context):
@@ -819,11 +861,7 @@ async def msg_handler(msgs, error, context):
     consumer.consume(msg_handler)
 ```
 
-There may be some instances where you apply a schema *after* a station has received some messages. In order to consume those messages you must use get_data_deserialized so that the consumer does not try to deserialize the given payload into the data format that is set in the schema and instead keeps the payload in its encoded format.
-
-TO:DO: Compare these paragraphs. I'm not sure if I captured the meaning exactly. It reads very confusingly.
-
-if you have ingested data into station in one format, afterwards you apply a schema on the station, the consumer won't deserialize the previously ingested data. For example, you have ingested string into the station and attached a protobuf schema on the station. 
In this case, consumer won't deserialize the string.
+There may be some instances where you apply a schema *after* a station has received some messages. In order to consume those messages, get_data_deserialized may be used to consume the messages without trying to apply the schema to them. As an example, if you produced a string to a station and then attached a protobuf schema, using get_data_deserialized will not try to deserialize the string as a protobuf-formatted message.
 
 ### Fetch a single batch of messages