Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# formatted using black & isort
b2692e213c7ef62882a1b9b7c95affff3246b036
9 changes: 9 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
repos:
- repo: https://github.com/psf/black
rev: 23.1.0
hooks:
- id: black
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
11 changes: 11 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
PRE_COMMIT = pre-commit
PRE_COMMIT_RUN_ARGS = --all-files
PRE_COMMIT_INSTALL_ARGS = --install-hooks

.PHONY: lint
lint:
$(PRE_COMMIT) run $(PRE_COMMIT_RUN_ARGS)

.PHONY: pre-commit-install
pre-commit-install:
$(PRE_COMMIT) install $(PRE_COMMIT_INSTALL_ARGS)
19 changes: 14 additions & 5 deletions examples/consumer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from memphis import Memphis, MemphisError, MemphisConnectError, MemphisHeaderError

from memphis import Memphis, MemphisConnectError, MemphisError, MemphisHeaderError


async def main():
Expand All @@ -17,11 +18,18 @@ async def msg_handler(msgs, error, context):

try:
memphis = Memphis()
await memphis.connect(host="<memphis-host>", username="<application type username>", connection_token="<broker-token>")
await memphis.connect(
host="<memphis-host>",
username="<application type username>",
connection_token="<broker-token>",
)

consumer = await memphis.consumer(
station_name="<station-name>", consumer_name="<consumer-name>", consumer_group="")

station_name="<station-name>",
consumer_name="<consumer-name>",
consumer_group="",
)

consumer.set_context({"key": "value"})
consumer.consume(msg_handler)
# Keep your main thread alive so the consumer will keep receiving data
Expand All @@ -33,5 +41,6 @@ async def msg_handler(msgs, error, context):
finally:
await memphis.close()

if __name__ == '__main__':

if __name__ == "__main__":
asyncio.run(main())
36 changes: 29 additions & 7 deletions examples/producer.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,46 @@
import asyncio
from memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError

from memphis import (
Headers,
Memphis,
MemphisConnectError,
MemphisError,
MemphisHeaderError,
MemphisSchemaError,
)


async def main():
try:
memphis = Memphis()
await memphis.connect(host="<memphis-host>", username="<application type username>", connection_token="<broker-token>")
await memphis.connect(
host="<memphis-host>",
username="<application type username>",
connection_token="<broker-token>",
)

producer = await memphis.producer(
station_name="<station-name>", producer_name="<producer-name>")
station_name="<station-name>", producer_name="<producer-name>"
)
headers = Headers()
headers.add("key", "value")
headers.add("key", "value")
for i in range(5):
await producer.produce(bytearray('Message #'+str(i)+': Hello world', 'utf-8'), headers=headers) # you can send the message parameter as dict as well
await producer.produce(
bytearray("Message #" + str(i) + ": Hello world", "utf-8"),
headers=headers,
) # you can send the message parameter as dict as well

except (MemphisError, MemphisConnectError, MemphisHeaderError, MemphisSchemaError) as e:
except (
MemphisError,
MemphisConnectError,
MemphisHeaderError,
MemphisSchemaError,
) as e:
print(e)

finally:
await memphis.close()

if __name__ == '__main__':

if __name__ == "__main__":
asyncio.run(main())
9 changes: 8 additions & 1 deletion memphis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from memphis.memphis import Memphis, Headers, MemphisError, MemphisConnectError, MemphisSchemaError, MemphisHeaderError
import memphis.retention_types
import memphis.storage_types
from memphis.memphis import (
Headers,
Memphis,
MemphisConnectError,
MemphisError,
MemphisHeaderError,
MemphisSchemaError,
)
Loading