Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions correlation/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM ubuntu:22.04
RUN apt update
RUN apt install -y ca-certificates git
FROM ubuntu:24.04
RUN apt-get update
RUN apt-get install -y ca-certificates git wget
COPY correlation /app/
COPY docs/swagger.json /app/docs/
COPY docs/swagger.yaml /app/docs/
Expand All @@ -9,4 +9,13 @@ COPY run.sh /
RUN chmod +x /app/correlation
RUN chmod +x /run.sh
RUN update-ca-certificates
RUN wget -O /app/asn-blocks-v4.csv https://cdn.utmstack.com/geoip/asn-blocks-v4.csv
RUN wget -O /app/asn-blocks-v6.csv https://cdn.utmstack.com/geoip/asn-blocks-v6.csv
RUN wget -O /app/blocks-v4.csv https://cdn.utmstack.com/geoip/blocks-v4.csv
RUN wget -O /app/blocks-v6.csv https://cdn.utmstack.com/geoip/blocks-v6.csv
RUN wget -O /app/locations-en.csv https://cdn.utmstack.com/geoip/locations-en.csv
RUN wget -O /app/ip_blocklist.list https://intelligence.threatwinds.com/feeds/public/ip/cumulative.list
RUN wget -O /app/domain_blocklist.list https://intelligence.threatwinds.com/feeds/public/domain/cumulative.list
RUN wget -O /app/hostname_blocklist.list https://intelligence.threatwinds.com/feeds/public/hostname/cumulative.list

ENTRYPOINT [ "/run.sh" ]
2 changes: 2 additions & 0 deletions correlation/api/newLogHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"encoding/json"
"fmt"
"github.com/utmstack/UTMStack/correlation/ti"
"io"
"log"
"net/http"
Expand Down Expand Up @@ -74,6 +75,7 @@ func NewLog(c *gin.Context) {
}

cache.AddToCache(l)
ti.Enqueue(l)
search.AddToQueue(l)
response["status"] = "queued"
c.JSON(http.StatusOK, response)
Expand Down
42 changes: 21 additions & 21 deletions correlation/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@ import (

const bufferSize int = 1000000

var cacheStorageMutex = &sync.RWMutex{}
var storageMutex = &sync.RWMutex{}

var CacheStorage []string
var storage []string

func Status() {
for {
log.Printf("Logs in cache: %v", len(CacheStorage))
if len(CacheStorage) != 0 {
est := gjson.Get(CacheStorage[0], "@timestamp").String()
log.Printf("Logs in cache: %v", len(storage))
if len(storage) != 0 {
est := gjson.Get(storage[0], "@timestamp").String()
log.Printf("Old document in cache: %s", est)
}
time.Sleep(60 * time.Second)
Expand All @@ -31,8 +31,8 @@ func Status() {

func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
var elements []string
cacheStorageMutex.RLock()
defer cacheStorageMutex.RUnlock()
storageMutex.RLock()
defer storageMutex.RUnlock()

cToBreak := 0
ait := time.Now().UTC().Unix() - func() int64 {
Expand All @@ -43,8 +43,8 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
return seconds
}
}()
for i := len(CacheStorage) - 1; i >= 0; i-- {
est := gjson.Get(CacheStorage[i], "@timestamp").String()
for i := len(storage) - 1; i >= 0; i-- {
est := gjson.Get(storage[i], "@timestamp").String()
eit, err := time.Parse(time.RFC3339Nano, est)
if err != nil {
log.Printf("Could not parse @timestamp: %v", err)
Expand All @@ -61,23 +61,23 @@ func Search(allOf []rules.AllOf, oneOf []rules.OneOf, seconds int64) []string {
var allCatch bool
var oneCatch bool
for _, of := range oneOf {
oneCatch = evalElement(CacheStorage[i], of.Field, of.Operator, of.Value)
oneCatch = evalElement(storage[i], of.Field, of.Operator, of.Value)
if oneCatch {
break
}
}
for _, af := range allOf {
allCatch = evalElement(CacheStorage[i], af.Field, af.Operator, af.Value)
allCatch = evalElement(storage[i], af.Field, af.Operator, af.Value)
if !allCatch {
break
}
}
if (len(allOf) == 0 || allCatch) && (len(oneOf) == 0 || oneCatch) {
elements = append(elements, CacheStorage[i])
elements = append(elements, storage[i])
}
}
}

return elements
}

Expand All @@ -97,9 +97,9 @@ func ProcessQueue() {
go func() {
for {
l := <-logs
cacheStorageMutex.Lock()
CacheStorage = append(CacheStorage, l)
cacheStorageMutex.Unlock()
storageMutex.Lock()
storage = append(storage, l)
storageMutex.Unlock()
}
}()
}
Expand All @@ -109,11 +109,11 @@ func Clean() {
for {
var clean bool

if len(CacheStorage) > 1 {
if len(storage) > 1 {
if utils.AssignedMemory >= 80 {
clean = true
} else {
old := gjson.Get(CacheStorage[0], "@timestamp").String()
old := gjson.Get(storage[0], "@timestamp").String()
oldTime, err := time.Parse(time.RFC3339Nano, old)
if err != nil {
log.Printf("Could not parse old log timestamp. Cleaning up")
Expand All @@ -129,9 +129,9 @@ func Clean() {
}

if clean {
cacheStorageMutex.Lock()
CacheStorage = CacheStorage[1:]
cacheStorageMutex.Unlock()
storageMutex.Lock()
storage = storage[1:]
storageMutex.Unlock()
} else {
time.Sleep(5 * time.Second)
}
Expand Down
13 changes: 5 additions & 8 deletions correlation/cache/cache_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
package cache_test
package cache

import (
"testing"


"github.com/utmstack/UTMStack/correlation/cache"
"github.com/utmstack/UTMStack/correlation/rules"
"testing"
)

func TestSearch(t *testing.T) {
Expand All @@ -16,7 +13,7 @@ func TestSearch(t *testing.T) {
`{"@timestamp":"2022-01-01T00:00:03.000Z","field1":"value1","field2":"value2"}`,
`{"@timestamp":"2022-01-01T00:00:04.000Z","field1":"value1","field2":"value2"}`,
}
cache.CacheStorage = cacheStorage
storage = cacheStorage
allOf := []rules.AllOf{
{Field: "field1", Operator: "==", Value: "value1"},
}
Expand All @@ -31,7 +28,7 @@ func TestSearch(t *testing.T) {
`{"@timestamp":"2022-01-01T00:00:01.000Z","field1":"value1","field2":"value2"}`,
`{"@timestamp":"2022-01-01T00:00:00.000Z","field1":"value1","field2":"value2"}`,
}
result := cache.Search(allOf, oneOf, int64(seconds))
result := Search(allOf, oneOf, int64(seconds))
if len(result) != len(expected) {
t.Errorf("Expected %d elements, but got %d", len(expected), len(result))
}
Expand All @@ -40,4 +37,4 @@ func TestSearch(t *testing.T) {
t.Errorf("Expected %s, but got %s", expected[i], r)
}
}
}
}
60 changes: 29 additions & 31 deletions correlation/cache/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,14 @@ import (

func inCIDR(addr, network string) (bool, error) {
_, subnet, err := net.ParseCIDR(network)
if err == nil {
ip := net.ParseIP(addr)
if ip != nil {
if subnet.Contains(ip) {
return true, nil
}
}
if err != nil {
return false, fmt.Errorf("invalid CIDR")
}
ip := net.ParseIP(addr)
if ip == nil {
return false, fmt.Errorf("invalid IP address")
}
return false, err
return subnet.Contains(ip), nil
}

func equal(val1, val2 string) bool {
Expand Down Expand Up @@ -54,25 +52,25 @@ func endWith(str, suff string) bool {
return strings.HasSuffix(str, suff)
}

func expresion(exp, str string) (bool, error) {
func expression(exp, str string) (bool, error) {
re, err := regexp.Compile(exp)
if err == nil {
if re.MatchString(str) {
return true, nil
}
if err != nil {
return false, err
}
return false, err
return re.MatchString(str), nil
}

func parseFloats(val1, val2 string) (float64, float64, error) {
f1, err1 := strconv.ParseFloat(val1, 64)
if err1 != nil {
return 0, 0, err1
f1, err := strconv.ParseFloat(val1, 64)
if err != nil {
return 0, 0, err
}
f2, err2 := strconv.ParseFloat(val2, 64)
if err2 != nil {
return 0, 0, err2

f2, err := strconv.ParseFloat(val2, 64)
if err != nil {
return 0, 0, err
}

return f1, f2, nil
}

Expand Down Expand Up @@ -105,17 +103,17 @@ func compare(operator, val1, val2 string) bool {
case "not end with":
return !endWith(val1, val2)
case "regexp":
matched, err := expresion(val2, val1)
matched, err := expression(val2, val1)
if err != nil {
return false
}
return matched
case "not regexp":
matched, err := expresion(val2, val1)
matched, err := expression(val2, val1)
if err != nil {
return false
}
return matched
return !matched
case "<":
f1, f2, err := parseFloats(val1, val2)
if err != nil {
Expand Down Expand Up @@ -144,24 +142,24 @@ func compare(operator, val1, val2 string) bool {
return true
case "in cidr":
matched, err := inCIDR(val1, val2)
if err == nil {
return matched
if err != nil {
return false
}
return false
return matched
case "not in cidr":
matched, err := inCIDR(val1, val2)
if err == nil {
return !matched
if err != nil {
return false
}
return false
return !matched
default:
return false
}
}

func evalElement(elem, field, operator, value string) bool {
if gjson.Get(elem, field).Exists() {
return compare(operator, gjson.Get(elem, field).String(), value)
if elem := gjson.Get(elem, field); elem.Exists() {
return compare(operator, elem.String(), value)
} else if operator == "not exist" {
return true
}
Expand Down
1 change: 0 additions & 1 deletion correlation/config.yml.prod
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
rulesFolder: /app/rulesets/
geoipFolder: /app/geosets/
elasticsearch: "http://ELASTICSEARCH_HOST:ELASTICSEARCH_PORT"
postgresql:
server: POSTGRESQL_HOST
Expand Down
Loading