Skip to content

Commit

Permalink
Merge pull request #655 from ystia/bugfix/GH-654-fs-logs-events-enhan…
Browse files Browse the repository at this point in the history
…cement

Bugfix/gh 654 fs logs events enhancement
  • Loading branch information
stebenoist committed Jun 9, 2020
2 parents f3aa356 + 53a7c77 commit 55db4a0
Show file tree
Hide file tree
Showing 9 changed files with 297 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### ENHANCEMENTS

* Alien4Cloud download URL change ([GH-637](https://github.com/ystia/yorc/issues/637))
* Enhance logs and events long-polling performances on file storage ([GH-654](https://github.com/ystia/yorc/issues/654))

### BUG FIXES

Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,15 @@ func (dm DynamicMap) GetInt(name string) int {
return cast.ToInt(dm.Get(name))
}

// GetIntOrDefault returns the value of the given key casted into an int.
// The given default value is returned if not found.
func (dm DynamicMap) GetIntOrDefault(name string, defaultValue int) int {
if !dm.IsSet(name) {
return defaultValue
}
return cast.ToInt(dm.Get(name))
}

// GetInt64 returns the value of the given key casted into an int64.
// 0 is returned if not found.
func (dm DynamicMap) GetInt64(name string) int64 {
Expand Down
71 changes: 49 additions & 22 deletions doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1172,8 +1172,10 @@ Currently Yorc supports 3 store ``types``:
* ``Log``
* ``Event``
Yorc supports 3 store ``implementations``:
Yorc supports 5 store ``implementations``:
* ``consul``
* ``file``
* ``cipherFile``
* ``fileCache``
* ``cipherFileCache``
Expand Down Expand Up @@ -1253,55 +1255,53 @@ Log
Store that contains the applicative logs, also present in Alien4Cloud logs. ``consul`` is the default implementation for this store type.
If you face some Consul memory usage issues, you can choose ``fileCache`` or ``cipherFileCache`` as logs may contains private information.
If you face some Consul memory usage issues, you can choose ``file`` or ``cipherFile`` as logs may contains private information.
Cache is not useful for this kind of data as we use blocking queries and modification index to retrieve it.
Event
^^^^^
Store that contains the applicative events, also present in Alien4Cloud events. ``consul`` is the default implementation for this store type.
Same remarks as for Log as it's same kind of data and usage.
Store implementations
~~~~~~~~~~~~~~~~~~~~~
Currently Yorc provide 3 implementations with the following names:
Currently Yorc provide 5 implementations (in fact 2 real ones with combinations around file) described below but you're welcome to contribute and bring your own implementation, you just need to implement the Store interface
See `Storage interface <https://github.com/ystia/yorc/blob/develop/storage/store/store.go>`_.
consul
^^^^^^
This is the Consul KV store used by Yorc for main internal storage stuff. For example, the configuration of the stores is kept in the Consul KV.
As Consul is already configurable here: :ref:`Consul configuration<yorc_config_file_consul_section>`, no other configuration is provided in this section.
fileCache
^^^^^^^^^
file
^^^^
This is a file store with a cache system.
This is a file store without cache system. It can be used for logs and events as this data is retrieved via blocking queries and modification index which can be used with a cache system.
Here are specific properties for this implementation:
.. _storage_file_cache_props:
.. _storage_file_props:
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| Property Name | Description | Data Type | Required | Default |
+=====================================+====================================================+===========+==================+=================+
| ``root_dir`` | Root directory used for file storage | string | no | work/store |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``cache_num_counters`` | number of keys to track frequency of | int64 | no | 1e5 (100 000) |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``cache_max_cost`` | maximum cost of cache | int64 | no | 1e7 (10 M) |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``cache_buffer_items`` | number of keys per Get buffer | int64 | no | 64 |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``blocking_query_default_timeout`` | default timeout for blocking queries | string | no | 5m (5 minutes)|
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``concurrency_limit`` | Limit for concurrent operations | integer | no | 1000 |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
For more information on cache properties, you can refer to `Ristretto README <https://github.com/dgraph-io/ristretto>`_
cipherFileCache
^^^^^^^^^^^^^^^
cipherFile
^^^^^^^^^^
This is a file store with a cache system and file data encryption (AES-256 bits key) which requires a 32-bits length passphrase.
This is a file store with file data encryption (AES-256 bits key) which requires a 32-bits length passphrase.
Here are specific properties for this implementation in addition to ``fileCache`` properties:
Here are specific properties for this implementation in addition to ``file`` properties:
+----------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| Property Name | Description | Data Type | Required | Default |
Expand All @@ -1314,7 +1314,7 @@ Here are specific properties for this implementation in addition to ``fileCache`
``Passphrase`` can be set with ``Secret function`` and retrieved from Vault as explained in the Vault integration chapter.
Here is a JSON example of stores configuration with a cipherFileStore implementation for logs.
Here is a JSON example of stores configuration with a cipherFile store implementation for logs.
.. code-block:: JSON
Expand All @@ -1324,7 +1324,7 @@ Here is a JSON example of stores configuration with a cipherFileStore implementa
"stores": [
{
"name": "myCipherFileStore",
"implementation": "cipherFileCache",
"implementation": "cipherFile",
"migrate_data_from_consul": true,
"types": ["Log"],
"properties": {
Expand All @@ -1343,14 +1343,41 @@ The same sample in YAML
reset: false
stores:
- name: myCipherFileStore
implementation: cipherFileCache
implementation: cipherFile
migrate_data_from_consul: true
types:
- Log
properties:
root_dir: "/mypath/to/store"
passphrase: "myverystrongpasswordo32bitlength"
fileCache
^^^^^^^^^
This is a file store with a cache system.
Here are specific properties for this implementation:
.. _storage_file_cache_props:
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| Property Name | Description | Data Type | Required | Default |
+=====================================+====================================================+===========+==================+=================+
| ``cache_num_counters`` | number of keys to track frequency of | int64 | no | 1e5 (100 000) |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``cache_max_cost`` | maximum cost of cache | int64 | no | 1e7 (10 M) |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
| ``cache_buffer_items`` | number of keys per Get buffer | int64 | no | 64 |
+-------------------------------------+----------------------------------------------------+-----------+------------------+-----------------+
For more information on cache properties, you can refer to `Ristretto README <https://github.com/dgraph-io/ristretto>`_
cipherFileCache
^^^^^^^^^^^^^^^
This is a file store with a cache system and file data encryption (AES-256 bits key) which requires a 32-bits length passphrase.
.. _storage_reset_note:
Stores configuration is saved once when Yorc server starts. If you want to re-initialize storage, you have to set the ``reset`` property to True and restart Yorc.
Expand Down
3 changes: 0 additions & 3 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,6 @@ func getLogsOrEvents(ctx context.Context, deploymentID string, waitIndex uint64,

log.Debugf("Found %d %s before accessing index[%q]", len(kvps), data, strconv.FormatUint(lastIndex, 10))
for _, kvp := range kvps {
if kvp.LastModifyIndex <= waitIndex {
continue
}
logsOrEvents = append(logsOrEvents, kvp.RawValue)
}
log.Debugf("Found %d %s after index", len(logsOrEvents), data)
Expand Down
15 changes: 8 additions & 7 deletions storage/internal/consul/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,14 @@ func (c *consulStore) List(ctx context.Context, k string, waitIndex uint64, time
if err := c.codec.Unmarshal(kvp.Value, &value); err != nil {
return nil, 0, errors.Wrapf(err, "failed to unmarshal stored value: %q", string(kvp.Value))
}

values = append(values, store.KeyValueOut{
Key: kvp.Key,
LastModifyIndex: kvp.ModifyIndex,
Value: value,
RawValue: kvp.Value,
})
if kvp.ModifyIndex > waitIndex {
values = append(values, store.KeyValueOut{
Key: kvp.Key,
LastModifyIndex: kvp.ModifyIndex,
Value: value,
RawValue: kvp.Value,
})
}
}
return values, qm.LastIndex, nil
}
118 changes: 93 additions & 25 deletions storage/internal/file/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import (

const indexFileNotExist = uint64(1)

const defaultConcurrencyLimit = 1000

type fileStore struct {
id string
properties config.DynamicMap
Expand Down Expand Up @@ -76,7 +78,7 @@ func NewStore(cfg config.Configuration, storeID string, properties config.Dynami
fileLocks: make(map[string]*sync.RWMutex),
withCache: withCache,
withEncryption: withEncryption,
concurrencyLimit: cfg.UpgradeConcurrencyLimit,
concurrencyLimit: properties.GetIntOrDefault("concurrency_limit", defaultConcurrencyLimit),
}

// Instantiate cache if necessary
Expand Down Expand Up @@ -411,7 +413,11 @@ func (s *fileStore) GetLastModifyIndex(k string) (uint64, error) {

func (s *fileStore) List(ctx context.Context, k string, waitIndex uint64, timeout time.Duration) ([]store.KeyValueOut, uint64, error) {
if waitIndex == 0 {
return s.list(k)
index, err := s.GetLastModifyIndex(k)
if err != nil {
return nil, index, err
}
return s.list(ctx, k, waitIndex, index)
}

// Default timeout to 5 minutes if not set as param or as config property
Expand Down Expand Up @@ -445,10 +451,36 @@ func (s *fileStore) List(ctx context.Context, k string, waitIndex uint64, timeou
}
}

return s.list(k)
return s.list(ctx, k, waitIndex, index)
}

func (s *fileStore) list(k string) ([]store.KeyValueOut, uint64, error) {
func (s *fileStore) listFirstLevelTree(rootPath string, waitIndex, lastIndex uint64) ([]string, []store.KeyValueOut, error) {
// Retrieve all sub-directories to list keys in concurrency at first level
var subPaths []string
infos, err := ioutil.ReadDir(rootPath)
if err != nil {
return subPaths, nil, err
}

kvs := make([]store.KeyValueOut, 0)
for _, info := range infos {
pathFile := path.Join(rootPath, info.Name())
if info.IsDir() {
subPaths = append(subPaths, pathFile)
} else {
kv, err := s.addKeyValueToList(info, pathFile, waitIndex, lastIndex)
if err != nil {
return subPaths, nil, err
}
if kv != nil {
kvs = append(kvs, *kv)
}
}
}
return subPaths, kvs, err
}

func (s *fileStore) list(ctx context.Context, k string, waitIndex, lastIndex uint64) ([]store.KeyValueOut, uint64, error) {
rootPath := s.buildFilePath(k, false)
fInfo, err := os.Stat(rootPath)
if err != nil {
Expand All @@ -461,39 +493,75 @@ func (s *fileStore) list(k string) ([]store.KeyValueOut, uint64, error) {
if !fInfo.IsDir() {
return nil, 0, nil
}
var index, lastIndex uint64
// Fill kv collection recursively for the related directory
kvs := make([]store.KeyValueOut, 0)

err = filepath.Walk(rootPath, func(pathFile string, info os.FileInfo, err error) error {
// List first-level tree directories in order to walk them concurrently
subPaths, kvs, err := s.listFirstLevelTree(rootPath, waitIndex, lastIndex)

errGroup, ctx := errgroup.WithContext(ctx)
sem := make(chan struct{}, s.concurrencyLimit)

// Use a channel to provide kvs concurrently safe
c := make(chan store.KeyValueOut)
for _, subPath := range subPaths {
pathItem := subPath
sem <- struct{}{}
errGroup.Go(func() error {
defer func() {
<-sem
}()
return s.walk(c, pathItem, k, waitIndex, lastIndex)
})
}

go func() {
errGroup.Wait()
close(c)
}()

// Fill the kvs from channel once all goroutines are finished
for r := range c {
kvs = append(kvs, r)
}

return kvs, lastIndex, errGroup.Wait()
}

func (s *fileStore) walk(c chan store.KeyValueOut, pathItem, k string, waitIndex, lastIndex uint64) error {
err := filepath.Walk(pathItem, func(pathFile string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Add kv for a file
// determine lastIndex for a directory
if !info.IsDir() {
var raw []byte
var value map[string]interface{}
_, raw, err = s.getValueFromFile(pathFile, &value)
kv, err := s.addKeyValueToList(info, pathFile, waitIndex, lastIndex)
if err != nil {
return err
}

kv := store.KeyValueOut{
Key: s.extractKeyFromFilePath(pathFile, true),
LastModifyIndex: uint64(info.ModTime().UnixNano()),
Value: value,
RawValue: raw,
}
kvs = append(kvs, kv)
} else {
index = uint64(info.ModTime().UnixNano())
if index > lastIndex {
lastIndex = index
if kv != nil {
c <- *kv
}
}
return nil
})
return err
}

func (s *fileStore) addKeyValueToList(info os.FileInfo, pathFile string, waitIndex, lastIndex uint64) (*store.KeyValueOut, error) {
index := uint64(info.ModTime().UnixNano())
// Retrieve only files before last modification Index
if index > waitIndex && (lastIndex == 0 || index <= lastIndex) {
var value map[string]interface{}
_, raw, err := s.getValueFromFile(pathFile, &value)
if err != nil {
return nil, err
}

return kvs, lastIndex, err
return &store.KeyValueOut{
Key: s.extractKeyFromFilePath(pathFile, true),
LastModifyIndex: index,
Value: value,
RawValue: raw,
}, nil
}
return nil, nil
}

0 comments on commit 55db4a0

Please sign in to comment.