Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@ async def main():
reconnect=True, # defaults to True
max_reconnect=3, # defaults to 3
reconnect_interval_ms=1500, # defaults to 1500
timeout_ms=1500 # defaults to 1500
timeout_ms=1500, # defaults to 1500
# for TLS connection:
key_file='<key-client.pem>',
cert_file='<cert-client.pem>',
ca_file='<rootCA.pem>'
)
...
except Exception as e:
Expand Down
40 changes: 32 additions & 8 deletions memphis/memphis.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import random
import json
import ssl
import time
from graphql import build_schema as build_graphql_schema, parse as parse_graphql, validate as validate_graphql
import graphql
Expand Down Expand Up @@ -83,7 +84,7 @@ async def configurations_listener(self):
except Exception as err:
raise MemphisError(err)

async def connect(self, host, username, connection_token, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000):
async def connect(self, host, username, connection_token, port=6666, reconnect=True, max_reconnect=10, reconnect_interval_ms=1500, timeout_ms=15000, cert_file = '', key_file='', ca_file=''):
"""Creates connection with Memphis.
Args:
host (str): memphis host.
Expand All @@ -94,6 +95,9 @@ async def connect(self, host, username, connection_token, port=6666, reconnect=T
max_reconnect (int, optional): The reconnect attempt. Defaults to 3.
reconnect_interval_ms (int, optional): Interval in miliseconds between reconnect attempts. Defaults to 200.
timeout_ms (int, optional): connection timeout in miliseconds. Defaults to 15000.
key_file (string): path to tls key file.
cert_file (string): path to tls cert file.
ca_file (string): path to tls ca file.
"""
self.host = self.__normalize_host(host)
self.username = username
Expand All @@ -105,13 +109,33 @@ async def connect(self, host, username, connection_token, port=6666, reconnect=T
self.timeout_ms = timeout_ms
self.connection_id = self.__generateConnectionID()
try:
self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port),
allow_reconnect=self.reconnect,
reconnect_time_wait=self.reconnect_interval_ms/1000,
connect_timeout=self.timeout_ms/1000,
max_reconnect_attempts=self.max_reconnect,
token=self.connection_token,
name=self.connection_id + "::" + self.username)
if (cert_file != '' or key_file != '' or ca_file != ''):
if cert_file == '':
raise MemphisConnectError("Must provide a TLS cert file")
if key_file == '':
raise MemphisConnectError("Must provide a TLS key file")
if ca_file == '':
raise MemphisConnectError("Must provide a TLS ca file")
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
ssl_ctx.load_verify_locations(ca_file)
ssl_ctx.load_cert_chain(certfile=cert_file, keyfile=key_file)
self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port),
allow_reconnect=self.reconnect,
reconnect_time_wait=self.reconnect_interval_ms/1000,
connect_timeout=self.timeout_ms/1000,
max_reconnect_attempts=self.max_reconnect,
token=self.connection_token,
name=self.connection_id + "::" + self.username,
tls=ssl_ctx,
tls_hostname=self.host)
else :
self.broker_manager = await broker.connect(servers=self.host+":"+str(self.port),
allow_reconnect=self.reconnect,
reconnect_time_wait=self.reconnect_interval_ms/1000,
connect_timeout=self.timeout_ms/1000,
max_reconnect_attempts=self.max_reconnect,
token=self.connection_token,
name=self.connection_id + "::" + self.username)

await self.configurations_listener()
self.broker_connection = self.broker_manager.jetstream()
Expand Down