Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
c277126
minor fix to sandbox agent shutdown
iakkus Mar 30, 2021
1ab1632
minor fix to sandbox frontend to handle shutdown properly
iakkus Mar 30, 2021
c05b490
fix connect loop in DataLayerClient and LocalQueueClient
iakkus Mar 30, 2021
1b9b1cd
minor separate test cases for interaction latency
iakkus Apr 2, 2021
fd49b40
replace loopback with unix domain socket for local queue with redis
iakkus Apr 2, 2021
09a4ff2
Merge pull request #113 from knix-microfunctions/feature/redis_domain…
iakkus Apr 6, 2021
372e771
fix(management): python deployWorkflow workflownameforlabel contained…
manuelstein Apr 6, 2021
03011eb
Simplify and unify function worker object creation (#115)
iakkus Apr 8, 2021
f5a277d
minor fix to redis Makefile
iakkus Apr 21, 2021
fd35c90
create execution entry in sandbox frontend for async request to make …
iakkus May 3, 2021
2d39e56
distinguish between regular async calls and special async calls (e.g.…
iakkus May 4, 2021
9e37e57
Triggersfrontend, adding Cargo.lock to lock versions of dependencies
paarijaat May 4, 2021
ac7d35e
move the java request handler to the end of the docker build
iakkus May 7, 2021
f347d3e
allow specifying custom knative resource and annotation specification…
paarijaat May 11, 2021
d16e6e5
update ansible readme; fixes #117
iakkus May 14, 2021
e35fa1e
function worker: fix addressable function stopping when blocking to g…
iakkus May 17, 2021
17cfe55
local queue: parameterize redis version, not listen to default port
iakkus May 20, 2021
f1cae2d
test utils: print workflow endpoints when deployed
iakkus May 20, 2021
17c9916
java request handler: lazy initialization of the api connection
iakkus May 21, 2021
231e691
prometheus metrics - expose per workflow sandbox prometheus metrics v…
paarijaat May 26, 2021
f21206f
prometheus metrics - adding a sample podmonitor yaml file, showing ho…
paarijaat May 26, 2021
3fdd230
function worker: add retry logic for failed remote session update mes…
iakkus May 27, 2021
ee439b1
adding more prometheus metrics to monitor detailed network counters, …
paarijaat May 31, 2021
632783a
Feature/handle remote results (#118)
iakkus Jun 8, 2021
eb336ed
gui: show multiple endpoints if available, but copy a random one
iakkus Jun 10, 2021
56a3832
sdk: fix get encoding
iakkus Jun 17, 2021
6983c3d
fixing Map state problems for low maxConcurrency values
ksatzke Jun 17, 2021
5cff7b3
reduce per execution logging
paarijaat Jun 17, 2021
6a9bf03
re enabling mfn_backups
paarijaat Jun 21, 2021
595597f
one more __mfn_backup log
iakkus Jun 21, 2021
de5131a
DataLayerService: timing logs for riak access
iakkus Jun 21, 2021
0d3bd8f
sandbox frontend: log input backups instead of storing in the datalayer
iakkus Jul 6, 2021
f35c97c
function worker: store results to data layer only for async executions
iakkus Jul 8, 2021
c2f38ae
sandbox frontend: checkpoints according to the workflow parameter
iakkus Jul 15, 2021
34d3d57
sandbox agent: stabilize shutdown when a child process fails; functio…
iakkus Jul 16, 2021
95f2a96
frontend logging updated. can now set log levels
paarijaat Jul 16, 2021
7022bc1
loglevel can now be set per workflow
paarijaat Jul 16, 2021
fbe523e
sandbox frontend: fix signalling to sandbox agent about start
iakkus Jul 16, 2021
e9490da
missing comma in kservice.json
paarijaat Jul 16, 2021
826ca84
management, adding logs to debug env variables update issue
paarijaat Jul 19, 2021
b94a67d
env variable update issue, changing to equalities
paarijaat Jul 19, 2021
7ea1d4f
frontend, logging request body as debug message
paarijaat Jul 19, 2021
672f0b9
Function worker: debug messages for sending remote messages
iakkus Jul 20, 2021
e6680c4
function worker to pick level from env variable LOG_LEVEL. Defaults t…
paarijaat Jul 21, 2021
ed66e7d
minor spelling mistake
paarijaat Jul 21, 2021
09540f7
add debug logs to publish output and session utils
paarijaat Jul 22, 2021
f95c344
k8s, setting hostname = container name. This was not the case, hence …
paarijaat Jul 22, 2021
a5ea5f2
allow retrieving and passing session function metadata to avoid datal…
paarijaat Jul 27, 2021
c32c6bd
session metadata passing optimization, adding a debug log
paarijaat Jul 28, 2021
a8d7d72
sandbox agent: minor fix to graceful shutdown
iakkus Aug 4, 2021
a8be8c6
ansible: update readme
iakkus Aug 11, 2021
6267b21
deploy: ansible update sandbox.yaml
iakkus Aug 11, 2021
4066efe
Merge branch 'develop' of github.com:knix-microfunctions/knix into de…
iakkus Aug 11, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ public void connect (Map<String,Integer> riakNodes) {
try {
Namespace bucket = new Namespace(BUCKET_TYPE_DEFAULT, MFN_KEYSPACES);
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(NUM_NODES).build();

long t_start = System.currentTimeMillis();
client.execute(props);
this.logExecutionTime("connect(bucketProperties)", System.currentTimeMillis() - t_start);

this.initiateBucketToTypeMapping();
} catch (Exception e) {
Expand Down Expand Up @@ -176,6 +179,11 @@ public void close() {
LOGGER.info("Riak client shutdown.");
}

private void logExecutionTime(String commandName, long duration)
{
LOGGER.info(commandName + " execution time: " + duration + " ms.");
}

private boolean detectInvalidName (String str) {
if (str == null) {
return false;
Expand Down Expand Up @@ -203,7 +211,11 @@ public boolean createKeyspace (String keyspace, Metadata metadata) {
try {
Namespace bucket = new Namespace(BUCKET_TYPE_DEFAULT, keyspace);
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(NUM_NODES).build();

long t_start = System.currentTimeMillis();
client.execute(props);
this.logExecutionTime("createKeyspace(bucketProperties)", System.currentTimeMillis() - t_start);

LOGGER.info("createKeyspace() Keyspace: " + keyspace + " Metadata: replication factor: " + Integer.toString(replicationFactor));
return this.insertRow(MFN_KEYSPACES, null, keyspace, ByteBuffer.allocate(Integer.BYTES).putInt(replicationFactor));
} catch (Exception e) {
Expand Down Expand Up @@ -394,7 +406,11 @@ private boolean createTableWithType (String keyspace, String table, String table

Namespace bucket = new Namespace(tableType, keyspace + ";" + table);
StoreBucketProperties props = new StoreBucketProperties.Builder(bucket).withNVal(replicationFactor).withW(replicationFactor).withR(replicationFactor).withNotFoundOk(false).build();

long t_start = System.currentTimeMillis();
client.execute(props);
this.logExecutionTime("createTableWithType(bucketProperties)", System.currentTimeMillis() - t_start);

LOGGER.info("createTableWithType() Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType);
boolean success = this.insertRow(keyspace, null, table, ByteBuffer.wrap(tableType.getBytes(StandardCharsets.UTF_8)));
if (success) {
Expand Down Expand Up @@ -534,7 +550,11 @@ public boolean insertRow (String keyspace, String table, String key, ByteBuffer
Location location = new Location(bucket, key);
RiakObject object = new RiakObject().setContentType(Constants.CTYPE_OCTET_STREAM).setValue(BinaryValue.unsafeCreate(value.array()));
StoreValue store = new StoreValue.Builder(object).withLocation(location).build();

long t_start = System.currentTimeMillis();
client.execute(store);
this.logExecutionTime("insertRow()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("insertRow() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand Down Expand Up @@ -570,7 +590,10 @@ public AbstractMap.SimpleEntry<String, ByteBuffer> selectRow (String keyspace, S

Location location = new Location(bucket, key);
FetchValue fetch = new FetchValue.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchValue.Response response = client.execute(fetch);
this.logExecutionTime("selectRow()", System.currentTimeMillis() - t_start);

RiakObject object = response.getValue(RiakObject.class);
if (object == null || object.getValue() == null) {
Expand Down Expand Up @@ -623,13 +646,20 @@ public boolean updateRow (String keyspace, String table, String key, ByteBuffer

Location location = new Location(bucket, key);
FetchValue fetch = new FetchValue.Builder(location).withOption(FetchValue.Option.DELETED_VCLOCK, true).build();

long t_start = System.currentTimeMillis();
FetchValue.Response response = client.execute(fetch);
this.logExecutionTime("updateRow(fetch)", System.currentTimeMillis() - t_start);

RiakObject object = response.getValue(RiakObject.class);
object.setValue(BinaryValue.unsafeCreate(value.array()));

StoreValue store = new StoreValue.Builder(object).withLocation(location).build();

t_start = System.currentTimeMillis();
client.execute(store);
this.logExecutionTime("updateRow(store)", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("updateRow() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand Down Expand Up @@ -669,7 +699,11 @@ private boolean deleteRowWithType (String keyspace, String table, String key, St

Location location = new Location(bucket, key);
DeleteValue delete = new DeleteValue.Builder(location).build();

long t_start = System.currentTimeMillis();
client.execute(delete);
this.logExecutionTime("deleteRowWithType()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("deleteRowWithType() failed. Keyspace: " + keyspace + " Table: " + table + " TableType: " + tableType, e);
Expand Down Expand Up @@ -708,7 +742,10 @@ private List<String> selectKeysWithType (String keyspace, String table, int star
}

ListKeys list = new ListKeys.Builder(bucket).build();

long t_start = System.currentTimeMillis();
ListKeys.Response response = client.execute(list);
this.logExecutionTime("selectKeysWithType()", System.currentTimeMillis() - t_start);

List<String> keys = new ArrayList<String>();
for (Location location: response) {
Expand Down Expand Up @@ -744,8 +781,11 @@ private List<String> selectAllKeysWithType (String keyspace, String table, Strin
}

ListKeys list = new ListKeys.Builder(bucket).build();
ListKeys.Response response = client.execute(list);

long t_start = System.currentTimeMillis();
ListKeys.Response response = client.execute(list);
this.logExecutionTime("selectAllKeysWithType()", System.currentTimeMillis() - t_start);

List<String> keys = new ArrayList<String>();
for (Location location: response) {
keys.add(location.getKeyAsString());
Expand Down Expand Up @@ -787,7 +827,11 @@ public AbstractMap.SimpleEntry<String, Long> getCounter (String keyspace, String
Location location = new Location(bucket, counterName);

FetchCounter fetch = new FetchCounter.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchCounter.Response response = client.execute(fetch);
this.logExecutionTime("getCounter()", System.currentTimeMillis() - t_start);

RiakCounter counter = response.getDatatype();
Long counterValue = counter.view();
return new AbstractMap.SimpleEntry<String, Long>(counterName, counterValue);
Expand Down Expand Up @@ -815,7 +859,11 @@ public AbstractMap.SimpleEntry<String, Long> incrementCounter (String keyspace,

CounterUpdate delta = new CounterUpdate(increment);
UpdateCounter update = new UpdateCounter.Builder(location, delta).withReturnDatatype(true).build();

long t_start = System.currentTimeMillis();
UpdateCounter.Response response = client.execute(update);
this.logExecutionTime("incrementCounter()", System.currentTimeMillis() - t_start);

RiakCounter counter = response.getDatatype();
Long counterValue = counter.view();
return new AbstractMap.SimpleEntry<String, Long>(counterName, counterValue);
Expand Down Expand Up @@ -880,7 +928,11 @@ public AbstractMap.SimpleEntry<String, Set<String>> retrieveSet (String keyspace
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("retrieveSet()", System.currentTimeMillis() - t_start);

RiakSet rSet = response.getDatatype();
Set<BinaryValue> binarySet = rSet.view();

Expand All @@ -907,7 +959,10 @@ public boolean addItemToSet (String keyspace, String table, String setName, Stri

SetUpdate item = new SetUpdate().add(setItem);
UpdateSet update = new UpdateSet.Builder(location, item).build();
long t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("addItemToSet()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("addItemToSet() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -926,12 +981,20 @@ public boolean removeItemFromSet (String keyspace, String table, String setName,
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("removeItemFromSet(fetch)", System.currentTimeMillis() - t_start);

Context context = response.getContext();

SetUpdate item = new SetUpdate().remove(setItem);
UpdateSet update = new UpdateSet.Builder(location, item).withContext(context).build();

t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("removeItemFromSet(update)", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("removeItemFromSet() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -950,7 +1013,11 @@ public boolean containsItemInSet (String keyspace, String table, String setName,
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("containsItemInSet()", System.currentTimeMillis() - t_start);

RiakSet rSet = response.getDatatype();
Set<BinaryValue> binarySet = rSet.view();
return binarySet.contains(BinaryValue.create(setItem));
Expand Down Expand Up @@ -995,7 +1062,11 @@ public int getSizeOfSet (String keyspace, String table, String setName) {
Location location = new Location(bucket, setName);

FetchSet fetch = new FetchSet.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchSet.Response response = client.execute(fetch);
this.logExecutionTime("getSizeOfSet()", System.currentTimeMillis() - t_start);

RiakSet rSet = response.getDatatype();
Set<BinaryValue> binarySet = rSet.view();
return binarySet.size();
Expand Down Expand Up @@ -1039,7 +1110,11 @@ public AbstractMap.SimpleEntry<String, Set<String>> retrieveKeysetFromMap (Strin
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("retrieveKeysetFromMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();

Expand All @@ -1065,7 +1140,11 @@ public AbstractMap.SimpleEntry<String, Map<String, ByteBuffer>> retrieveAllEntri
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("retrieveAllEntriesFromMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();

Expand Down Expand Up @@ -1096,7 +1175,11 @@ public boolean putEntryToMap (String keyspace, String table, String mapName, Str
RegisterUpdate register = new RegisterUpdate(BinaryValue.unsafeCreate(entryValue.array()));
MapUpdate entry = new MapUpdate().update(entryKey, register);
UpdateMap update = new UpdateMap.Builder(location, entry).build();

long t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("putEntryToMap()", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("putEntryToMap() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -1115,7 +1198,11 @@ public AbstractMap.SimpleEntry<String, ByteBuffer> getEntryFromMap (String keysp
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("getEntryFromMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
RiakRegister rRegister = rMap.getRegister(entryKey);
ByteBuffer entryValue = ByteBuffer.wrap(rRegister.view().unsafeGetValue());
Expand All @@ -1137,12 +1224,20 @@ public boolean removeEntryFromMap (String keyspace, String table, String mapName
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("removeEntryFromMap(fetch)", System.currentTimeMillis() - t_start);

Context context = response.getContext();

MapUpdate entry = new MapUpdate().removeRegister(entryKey);
UpdateMap update = new UpdateMap.Builder(location, entry).withContext(context).build();

t_start = System.currentTimeMillis();
client.execute(update);
this.logExecutionTime("removeEntryFromMap(update)", System.currentTimeMillis() - t_start);

return true;
} catch (Exception e) {
LOGGER.error("removeEntryFromMap() failed. Keyspace: " + keyspace + " Table: " + table, e);
Expand All @@ -1161,7 +1256,11 @@ public boolean containsKeyInMap (String keyspace, String table, String mapName,
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("containsKeyInMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
return entries.containsKey(BinaryValue.create(entryKey));
Expand Down Expand Up @@ -1206,7 +1305,11 @@ public int getSizeOfMap (String keyspace, String table, String mapName) {
Location location = new Location(bucket, mapName);

FetchMap fetch = new FetchMap.Builder(location).build();

long t_start = System.currentTimeMillis();
FetchMap.Response response = client.execute(fetch);
this.logExecutionTime("getSizeOfMap()", System.currentTimeMillis() - t_start);

RiakMap rMap = response.getDatatype();
Map<BinaryValue, List<RiakDatatype>> entries = rMap.view();
return entries.size();
Expand Down
5 changes: 4 additions & 1 deletion FunctionWorker/python/DataLayerClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ def __init__(self, locality=1, sid=None, wid=None, suid=None, is_wf_private=Fals
#print("Creating datalayer client in keyspace=%s, tablename=%s, maptablename=%s, settablename=%s, countertablename=%s" % (self.keyspace,self.tablename, self.maptablename, self.settablename, self.countertablename))
self.locality = locality

self._is_running = True

self.connect()

if init_tables:
Expand Down Expand Up @@ -105,7 +107,7 @@ def _drop_keyspace(self):

def connect(self):
retry = 0.5 #s
while True:
while self._is_running:
try:
host, port = self.dladdress.split(':')
self.socket = TSocket.TSocket(host, int(port))
Expand Down Expand Up @@ -607,6 +609,7 @@ def listKeys(self, start, count, tableName=None):
return keys_response

def shutdown(self):
self._is_running = False
try:
self.transport.close()
except Thrift.TException as exc:
Expand Down
Loading