Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 52 additions & 19 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -639,16 +639,16 @@ To add message headers to the message, use the headers parameter. Headers can he
```python
memphis = Memphis()

await memphis.connect(...)
await memphis.connect(...)

await memphis.produce(
station_name = "some_station",
producer_name = "temp_producer",
message = {'some':'message'},
headers = {
'trace_header': 'track_me_123'
}
)
await memphis.produce(
station_name = "some_station",
producer_name = "temp_producer",
message = {'some':'message'},
headers = {
'trace_header': 'track_me_123'
}
)
```

### Producing to a partition
Expand All @@ -658,28 +658,28 @@ Lastly, memphis can produce to a specific partition in a station. To do so, use
```python
memphis = Memphis()

await memphis.connect(...)
await memphis.connect(...)

await memphis.produce(
station_name = "some_station",
producer_name = "temp_producer",
message = {'some':'message'},
producer_partition_key = "2nd_partition"
)
await memphis.produce(
station_name = "some_station",
producer_name = "temp_producer",
message = {'some':'message'},
producer_partition_key = "2nd_partition"
)
```

Or, alternatively, use the producer_partition_number parameter:
```python
memphis = Memphis()

await memphis.connect(...)
await memphis.connect(...)

await memphis.produce(
await memphis.produce(
station_name = "some_station",
producer_name = "temp_producer",
message = {'some':'message'},
producer_partition_number = 2
)
)
```

### Non-blocking Produce with Task Limits
Expand All @@ -695,6 +695,39 @@ await producer.produce(

You may read more about this [here](https://memphis.dev/blog/producing-messages-at-warp-speed-best-practices-for-optimizing-your-producers/) on the memphis.dev blog.

### Produce to multiple stations

Producing to multiple stations can be done by creating a producer with multiple stations and then calling produce on that producer.

```python
memphis = Memphis()

await memphis.connect(...)

producer = await memphis.producer(
station_name = ["station_1", "station_2"],
producer_name = "new_producer"
)

await producer.produce(
message = "some message"
)
```

Alternatively, it also possible to produce to multiple stations using the connection:

```python
memphis = Memphis()

await memphis.connect(...)

await memphis.produce(
station_name = ["station_1", "station_2"],
producer_name = "new_producer",
message = "some message"
)
```

### Destroying a Producer

```python
Expand Down
2 changes: 2 additions & 0 deletions memphis/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,11 @@ async def destroy(self, timeout_retries=5):
}
consumer_name = json.dumps(
destroy_consumer_req, indent=2).encode("utf-8")
# pylint: disable=protected-access
res = await self.connection._request(
"$memphis_consumer_destructions", consumer_name, 5, timeout_retries
)
# pylint: enable=protected-access
error = res.data.decode("utf-8")
if error != "" and not "not exist" in error:
raise MemphisError(error)
Expand Down
101 changes: 91 additions & 10 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import copy
import json
import ssl
from typing import Iterable, Union
from typing import Iterable, Union, List
import uuid
import base64
import re
Expand All @@ -37,7 +37,7 @@
from memphis.partition_generator import PartitionGenerator

app_id = str(uuid.uuid4())

# pylint: disable=too-many-lines
class Memphis:
MAX_BATCH_SIZE = 5000
MEMPHIS_GLOBAL_ACCOUNT_NAME = "$memphis"
Expand Down Expand Up @@ -416,14 +416,14 @@ async def _request(self, subject, payload, timeout, timeout_retries=5):

async def producer(
self,
station_name: str,
station_name: Union[str, List[str]],
producer_name: str,
generate_random_suffix: bool = False,
timeout_retries=5,
):
"""Creates a producer.
Args:
station_name (str): station name to produce messages into.
station_name (Union[str, List[str]]): station name to produce messages into.
producer_name (str): name for the producer.
generate_random_suffix (bool): Deprecated: will be stopped to be supported after November 1'st, 2023. false by default, if true concatenate a random suffix to producer's name
Raises:
Expand All @@ -434,16 +434,32 @@ async def producer(
try:
if not self.is_connection_active:
raise MemphisError("Connection is dead")
if not isinstance(station_name, str) and not isinstance(station_name, list):
raise MemphisError("station_name should be either string or list of strings")
real_name = producer_name.lower()
internal_station_name = get_internal_name(station_name)
if generate_random_suffix:
warnings.warn("Deprecation warning: generate_random_suffix will be stopped to be supported after November 1'st, 2023.")
producer_name = self.__generate_random_suffix(producer_name)
if isinstance(station_name, str):
return await self._single_station_producer(station_name, producer_name, real_name, timeout_retries)
else:
map_key = internal_station_name + "_" + producer_name.lower()
producer = None
if map_key in self.producers_map:
return self.producers_map[map_key]
return await self._multi_station_producer(station_name, producer_name, real_name)
except Exception as e:
raise MemphisError(str(e)) from e

async def _single_station_producer(
self,
station_name: str,
producer_name: str,
real_name: str,
timeout_retries: int,
):
try:
internal_station_name = get_internal_name(station_name)
map_key = internal_station_name + "_" + producer_name.lower()
producer = None
if map_key in self.producers_map:
return self.producers_map[map_key]

create_producer_req = {
"name": producer_name,
Expand Down Expand Up @@ -495,6 +511,15 @@ async def producer(
except Exception as e:
raise MemphisError(str(e)) from e


async def _multi_station_producer(
self,
station_names: List[str],
producer_name: str,
real_name: str
):
return Producer(self, producer_name, station_names, real_name)

def update_schema_data(self, station_name):
internal_station_name = get_internal_name(station_name)
if self.schema_updates_data[internal_station_name] != {}:
Expand Down Expand Up @@ -763,7 +788,7 @@ async def consumer(

async def produce(
self,
station_name: str,
station_name: Union[str, List[str]],
producer_name: str,
message,
generate_random_suffix: bool = False,
Expand All @@ -789,6 +814,30 @@ async def produce(
Raises:
Exception: _description_
"""
try:
if not isinstance(station_name, str) and not isinstance(station_name, list):
raise MemphisError("station_name should be either string or list of strings")
if isinstance(station_name, str):
await self._single_station_produce(station_name, producer_name, message, generate_random_suffix, ack_wait_sec, headers, async_produce, msg_id, producer_partition_key, producer_partition_number)
else:
await self._multi_station_produce(station_name, producer_name, message, generate_random_suffix, ack_wait_sec, headers, async_produce, msg_id, producer_partition_key, producer_partition_number)
except Exception as e:
raise MemphisError(str(e)) from e


async def _single_station_produce(
self,
station_name: str,
producer_name: str,
message,
generate_random_suffix: bool = False,
ack_wait_sec: int = 15,
headers: Union[Headers, None] = None,
async_produce: bool = False,
msg_id: Union[str, None] = None,
producer_partition_key: Union[str, None] = None,
producer_partition_number: Union[int, -1] = -1
):
try:
internal_station_name = get_internal_name(station_name)
map_key = internal_station_name + "_" + producer_name.lower()
Expand All @@ -813,6 +862,38 @@ async def produce(
except Exception as e:
raise MemphisError(str(e)) from e

async def _multi_station_produce(
self,
station_names: List[str],
producer_name: str,
message,
generate_random_suffix: bool = False,
ack_wait_sec: int = 15,
headers: Union[Headers, None] = None,
async_produce: bool = False,
msg_id: Union[str, None] = None,
producer_partition_key: Union[str, None] = None,
producer_partition_number: Union[int, -1] = -1
):
try:
producer = await self.producer(
station_name=station_names,
producer_name=producer_name,
generate_random_suffix=generate_random_suffix,
)
await producer.produce(
message=message,
ack_wait_sec=ack_wait_sec,
headers=headers,
async_produce=async_produce,
msg_id=msg_id,
producer_partition_key=producer_partition_key,
producer_partition_number=producer_partition_number
)
except Exception as e:
raise MemphisError(str(e)) from e


async def fetch_messages(
self,
station_name: str,
Expand Down
Loading