Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions internal/app/ingest/processor_assets.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,9 @@ func (p *AssetProcessor) createAssetFromCTIS(
ctisAsset *ctis.Asset,
tool *ctis.Tool,
) (*asset.Asset, error) {
assetType := mapCTISAssetType(ctisAsset.Type)
rawType := mapCTISAssetType(ctisAsset.Type)
// Resolve type aliases: e.g., "firewall" → type=network, sub_type=firewall
coreType, subType := asset.ResolveTypeAlias(rawType)
criticality := mapCTISCriticality(ctisAsset.Criticality)

name := getAssetName(ctisAsset)
Expand All @@ -1229,11 +1231,16 @@ func (p *AssetProcessor) createAssetFromCTIS(
name = name[:maxNameLength]
}

newAsset, err := asset.NewAsset(name, assetType, criticality)
newAsset, err := asset.NewAsset(name, coreType, criticality)
if err != nil {
return nil, err
}

// Set sub_type from TypeAliases or from properties
if subType != "" {
newAsset.SetSubType(subType)
}

newAsset.SetTenantID(tenantID)

// Set description (with length limit) - log if truncated
Expand Down Expand Up @@ -1265,6 +1272,14 @@ func (p *AssetProcessor) createAssetFromCTIS(
newAsset.AddTag(tag)
}

// Promote sub_type from properties if not already set via TypeAliases
if newAsset.SubType() == "" {
if st, ok := ctisAsset.Properties["sub_type"].(string); ok && st != "" {
newAsset.SetSubType(st)
delete(ctisAsset.Properties, "sub_type")
}
}

// Set discovery info
discoverySource := "agent"
discoveryTool := ""
Expand Down
2 changes: 0 additions & 2 deletions internal/infra/http/handler/asset_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ type AssetResponse struct {
FindingSeverityCounts *FindingSeverityResponse `json:"finding_severity_counts,omitempty"`
Description string `json:"description,omitempty"`
Tags []string `json:"tags,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
Properties map[string]any `json:"properties,omitempty"`
PrimaryOwner *OwnerBriefResponse `json:"primary_owner,omitempty"`

Expand Down Expand Up @@ -169,7 +168,6 @@ func toAssetResponse(a *asset.Asset) AssetResponse {
FindingCount: a.FindingCount(),
Description: a.Description(),
Tags: a.Tags(),
Metadata: a.Metadata(),
Properties: a.Properties(),

// Discovery
Expand Down
57 changes: 15 additions & 42 deletions internal/infra/postgres/asset_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@ func NewAssetRepository(db *DB) *AssetRepository {

// Create persists a new asset.
func (r *AssetRepository) Create(ctx context.Context, a *asset.Asset) error {
metadata, err := json.Marshal(a.Metadata())
if err != nil {
return fmt.Errorf("failed to marshal metadata: %w", err)
}

properties, err := json.Marshal(a.Properties())
if err != nil {
return fmt.Errorf("failed to marshal properties: %w", err)
Expand All @@ -46,14 +41,14 @@ func (r *AssetRepository) Create(ctx context.Context, a *asset.Asset) error {
INSERT INTO assets (
id, tenant_id, parent_id, owner_id, owner_ref, name, asset_type, sub_type, criticality, status,
scope, exposure, risk_score,
description, tags, metadata, properties,
description, tags, properties,
provider, external_id, classification, sync_status, last_synced_at, sync_error,
discovery_source, discovery_tool, discovered_at,
compliance_scope, data_classification, pii_data_exposed, phi_data_exposed, regulatory_owner_id,
is_internet_accessible, exposure_changed_at, last_exposure_level,
first_seen, last_seen, created_at, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37, $38)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29, $30, $31, $32, $33, $34, $35, $36, $37)
`

ownerRefVal := sql.NullString{String: a.OwnerRef(), Valid: a.OwnerRef() != ""}
Expand All @@ -73,7 +68,6 @@ func (r *AssetRepository) Create(ctx context.Context, a *asset.Asset) error {
a.RiskScore(),
a.Description(),
pq.Array(a.Tags()),
metadata,
properties,
a.Provider().String(),
nullString(a.ExternalID()),
Expand Down Expand Up @@ -370,7 +364,7 @@ func (r *AssetRepository) selectQuery() string {
COALESCE(fc.finding_medium, 0) as finding_medium,
COALESCE(fc.finding_low, 0) as finding_low,
COALESCE(fc.finding_info, 0) as finding_info,
a.description, a.tags, a.metadata, a.properties,
a.description, a.tags, a.properties,
a.provider, a.external_id, a.classification, a.sync_status, a.last_synced_at, a.sync_error,
a.discovery_source, a.discovery_tool, a.discovered_at,
a.compliance_scope, a.data_classification, a.pii_data_exposed, a.phi_data_exposed, a.regulatory_owner_id,
Expand All @@ -394,11 +388,6 @@ func (r *AssetRepository) selectQuery() string {

// Update updates an existing asset.
func (r *AssetRepository) Update(ctx context.Context, a *asset.Asset) error {
metadata, err := json.Marshal(a.Metadata())
if err != nil {
return fmt.Errorf("failed to marshal metadata: %w", err)
}

properties, err := json.Marshal(a.Properties())
if err != nil {
return fmt.Errorf("failed to marshal properties: %w", err)
Expand All @@ -408,13 +397,13 @@ func (r *AssetRepository) Update(ctx context.Context, a *asset.Asset) error {
UPDATE assets
SET parent_id = $2, owner_id = $3, owner_ref = $4, name = $5, asset_type = $6, sub_type = $7, criticality = $8, status = $9,
scope = $10, exposure = $11, risk_score = $12,
description = $13, tags = $14, metadata = $15, properties = $16,
provider = $17, external_id = $18, classification = $19, sync_status = $20, last_synced_at = $21, sync_error = $22,
discovery_source = $23, discovery_tool = $24, discovered_at = $25,
compliance_scope = $26, data_classification = $27, pii_data_exposed = $28, phi_data_exposed = $29, regulatory_owner_id = $30,
is_internet_accessible = $31, exposure_changed_at = $32, last_exposure_level = $33,
last_seen = $34, updated_at = $35
WHERE id = $1 AND tenant_id = $36
description = $13, tags = $14, properties = $15,
provider = $16, external_id = $17, classification = $18, sync_status = $19, last_synced_at = $20, sync_error = $21,
discovery_source = $22, discovery_tool = $23, discovered_at = $24,
compliance_scope = $25, data_classification = $26, pii_data_exposed = $27, phi_data_exposed = $28, regulatory_owner_id = $29,
is_internet_accessible = $30, exposure_changed_at = $31, last_exposure_level = $32,
last_seen = $33, updated_at = $34
WHERE id = $1 AND tenant_id = $35
`

updateOwnerRef := sql.NullString{String: a.OwnerRef(), Valid: a.OwnerRef() != ""}
Expand All @@ -433,7 +422,6 @@ func (r *AssetRepository) Update(ctx context.Context, a *asset.Asset) error {
a.RiskScore(),
a.Description(),
pq.Array(a.Tags()),
metadata,
properties,
a.Provider().String(),
nullString(a.ExternalID()),
Expand Down Expand Up @@ -620,7 +608,6 @@ func (r *AssetRepository) doScan(scan func(dest ...any) error) (*asset.Asset, er
findingInfo int
description sql.NullString
tags pq.StringArray
metadata []byte
properties []byte
provider sql.NullString
externalID sql.NullString
Expand Down Expand Up @@ -651,7 +638,7 @@ func (r *AssetRepository) doScan(scan func(dest ...any) error) (*asset.Asset, er
&idStr, &tenantIDStr, &parentIDStr, &ownerIDStr, &ownerRef, &name, &assetType, &subType, &criticality, &status,
&scope, &exposure, &riskScore, &findingCount,
&findingCritical, &findingHigh, &findingMedium, &findingLow, &findingInfo,
&description, &tags, &metadata, &properties,
&description, &tags, &properties,
&provider, &externalID, &classification, &syncStatus, &lastSyncedAt, &syncError,
&discoverySource, &discoveryTool, &discoveredAt,
&complianceScope, &dataClassification, &piiDataExposed, &phiDataExposed, &regulatoryOwnerIDStr,
Expand All @@ -665,7 +652,7 @@ func (r *AssetRepository) doScan(scan func(dest ...any) error) (*asset.Asset, er
a, err := r.reconstructAsset(
idStr, tenantIDStr, parentIDStr, ownerIDStr, ownerRef, name, assetType, subType.String, criticality, status,
scope, exposure, riskScore, findingCount,
description, tags, metadata, properties,
description, tags, properties,
provider, externalID, classification, syncStatus, lastSyncedAt, syncError,
discoverySource, discoveryTool, discoveredAt,
complianceScope, dataClassification, piiDataExposed, phiDataExposed, regulatoryOwnerIDStr,
Expand Down Expand Up @@ -695,7 +682,7 @@ func (r *AssetRepository) reconstructAsset(
riskScore, findingCount int,
description sql.NullString,
tags pq.StringArray,
metadataBytes, propertiesBytes []byte,
propertiesBytes []byte,
provider sql.NullString,
externalID, classification sql.NullString,
syncStatus sql.NullString,
Expand Down Expand Up @@ -744,13 +731,6 @@ func (r *AssetRepository) reconstructAsset(
parsedProvider := asset.ParseProvider(nullStringValue(provider))
parsedSyncStatus := asset.ParseSyncStatus(nullStringValue(syncStatus))

var metadata map[string]any
if len(metadataBytes) > 0 {
if err := json.Unmarshal(metadataBytes, &metadata); err != nil {
metadata = make(map[string]any)
}
}

var properties map[string]any
if len(propertiesBytes) > 0 {
if err := json.Unmarshal(propertiesBytes, &properties); err != nil {
Expand Down Expand Up @@ -804,7 +784,6 @@ func (r *AssetRepository) reconstructAsset(
findingCount,
desc,
[]string(tags),
metadata,
properties,
parsedProvider,
nullStringValue(externalID),
Expand Down Expand Up @@ -1103,12 +1082,12 @@ func (r *AssetRepository) UpsertBatch(ctx context.Context, assets []*asset.Asset
INSERT INTO assets (
id, tenant_id, parent_id, owner_id, name, asset_type, criticality, status,
scope, exposure, risk_score,
description, tags, metadata, properties,
description, tags, properties,
provider, external_id, classification, sync_status, last_synced_at, sync_error,
discovery_source, discovery_tool, discovered_at,
first_seen, last_seen, created_at, updated_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27)
ON CONFLICT (tenant_id, name) DO UPDATE SET
tags = (
SELECT array_agg(DISTINCT t)
Expand All @@ -1135,11 +1114,6 @@ func (r *AssetRepository) UpsertBatch(ctx context.Context, assets []*asset.Asset
defer stmt.Close()

for _, a := range assets {
metadata, err := json.Marshal(a.Metadata())
if err != nil {
return created, updated, fmt.Errorf("failed to marshal metadata: %w", err)
}

properties, err := json.Marshal(a.Properties())
if err != nil {
return created, updated, fmt.Errorf("failed to marshal properties: %w", err)
Expand All @@ -1160,7 +1134,6 @@ func (r *AssetRepository) UpsertBatch(ctx context.Context, assets []*asset.Asset
a.RiskScore(),
a.Description(),
pq.Array(a.Tags()),
metadata,
properties,
a.Provider().String(),
nullString(a.ExternalID()),
Expand Down
3 changes: 3 additions & 0 deletions migrations/000140_merge_metadata_into_properties.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Rollback: re-add an empty metadata column and its GIN index.
-- Values merged into properties by the up migration are not moved back.
ALTER TABLE assets ADD COLUMN IF NOT EXISTS metadata JSONB DEFAULT '{}'::jsonb;
CREATE INDEX IF NOT EXISTS idx_assets_metadata ON assets USING GIN (metadata);
17 changes: 17 additions & 0 deletions migrations/000140_merge_metadata_into_properties.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Migration 000140: Merge metadata into properties, drop metadata column
--
-- The assets table has both `properties` (scanner data) and `metadata` (user data).
-- UI already merges both: metadata = { ...properties, ...metadata }
-- Simplify: one column `properties` for everything.

-- Step 1: Merge metadata into properties (metadata wins on conflict)
UPDATE assets
SET properties = COALESCE(properties, '{}'::jsonb) || COALESCE(metadata, '{}'::jsonb),
updated_at = NOW()
WHERE metadata IS NOT NULL AND metadata != '{}'::jsonb;

-- Step 2: Drop metadata GIN index
DROP INDEX IF EXISTS idx_assets_metadata;

-- Step 3: Drop metadata column
ALTER TABLE assets DROP COLUMN IF EXISTS metadata;
2 changes: 2 additions & 0 deletions migrations/000141_promote_sub_type_from_properties.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
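-- Rollback: clear sub_type on every asset. Inferred values cannot be told apart
-- from manually set ones, so all of them are cleared. The sub_type keys removed
-- from properties by the up migration are not restored.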
UPDATE assets SET sub_type = NULL WHERE sub_type IS NOT NULL;
90 changes: 90 additions & 0 deletions migrations/000141_promote_sub_type_from_properties.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
-- Migration 000141: Promote sub_type from properties to column
--
-- Collectors send sub_type in properties JSONB. Ingest now promotes it,
-- but existing data needs backfill.
--
-- Two sources:
-- 1. properties->>'sub_type' (explicit)
-- 2. TypeAliases inference from asset_type (e.g., kubernetes assets with
-- properties containing namespace → sub_type = 'namespace')

-- ============================================================
-- Step 1: Promote explicit sub_type from properties
-- ============================================================
-- Move an explicit properties.sub_type into the sub_type column and drop the
-- key from properties, for rows that do not have a sub_type yet.
-- The emptiness test is parenthesized because AND binds tighter than OR:
-- unparenthesized, every row with a NULL sub_type would match and have its
-- updated_at bumped even when properties carries no sub_type key.
UPDATE assets
SET sub_type = properties->>'sub_type',
    properties = properties - 'sub_type',
    updated_at = NOW()
WHERE (sub_type IS NULL OR sub_type = '')
  AND properties->>'sub_type' IS NOT NULL
  AND properties->>'sub_type' != '';

-- ============================================================
-- Step 2: Infer sub_type from TypeAliases patterns
-- ============================================================

-- Kubernetes: cluster vs namespace
-- Order matters: the namespace statement runs first, so the cluster statement
-- below only sees kubernetes rows that it left without a sub_type.
-- A row counts as a namespace when its name contains '/' or its properties
-- carry a namespace key.
UPDATE assets SET sub_type = 'namespace', updated_at = NOW()
WHERE asset_type = 'kubernetes'
AND (sub_type IS NULL OR sub_type = '')
AND (name LIKE '%/%' OR properties->>'namespace' IS NOT NULL);

-- Remaining kubernetes rows whose name has no '/' are marked as clusters.
UPDATE assets SET sub_type = 'cluster', updated_at = NOW()
WHERE asset_type = 'kubernetes'
AND (sub_type IS NULL OR sub_type = '')
AND name NOT LIKE '%/%';

-- Network: infer from properties.type
UPDATE assets SET sub_type = properties->>'type', updated_at = NOW()
WHERE asset_type = 'network'
AND (sub_type IS NULL OR sub_type = '')
AND properties->>'type' IS NOT NULL
AND properties->>'type' IN ('firewall', 'load_balancer', 'switch', 'router',
'vpn_gateway', 'wireless_controller', 'ids', 'vpc', 'subnet');

-- Database: infer from properties.engine
-- Empty engine values are skipped, matching the non-empty check used when
-- promoting properties.sub_type, so sub_type is never set to '' and
-- updated_at is not bumped for rows that gain no information.
UPDATE assets SET sub_type = properties->>'engine', updated_at = NOW()
WHERE asset_type = 'database'
  AND (sub_type IS NULL OR sub_type = '')
  AND properties->>'engine' IS NOT NULL
  AND properties->>'engine' != '';

-- Storage: infer s3_bucket from properties.type
UPDATE assets SET sub_type = 's3_bucket', updated_at = NOW()
WHERE asset_type = 'storage'
AND (sub_type IS NULL OR sub_type = '')
AND (properties->>'type' = 's3' OR name LIKE 's3://%');

-- Container: no registry or property condition is applied here; every
-- container asset that still lacks a sub_type is defaulted to 'image'.
UPDATE assets SET sub_type = 'image', updated_at = NOW()
WHERE asset_type = 'container'
AND (sub_type IS NULL OR sub_type = '');

-- Identity: infer from properties.type
UPDATE assets SET sub_type = properties->>'type', updated_at = NOW()
WHERE asset_type = 'identity'
AND (sub_type IS NULL OR sub_type = '')
AND properties->>'type' IS NOT NULL
AND properties->>'type' IN ('iam_user', 'iam_role', 'service_account');

-- Service: infer from port/protocol
UPDATE assets SET sub_type = 'open_port', updated_at = NOW()
WHERE asset_type = 'service'
AND (sub_type IS NULL OR sub_type = '')
AND name ~ ':\d+:(tcp|udp)$';

-- Application: infer from properties or URL pattern
UPDATE assets SET sub_type = 'api', updated_at = NOW()
WHERE asset_type = 'application'
AND (sub_type IS NULL OR sub_type = '')
AND (name LIKE '%/api%' OR name LIKE '%api.%' OR properties->>'type' = 'api');

UPDATE assets SET sub_type = 'website', updated_at = NOW()
WHERE asset_type = 'application'
AND (sub_type IS NULL OR sub_type = '')
AND name LIKE 'https://%';

-- Cloud Account: infer provider as sub_type
-- Empty provider values are skipped so sub_type is never set to '' and
-- updated_at is not bumped for rows that gain no information.
UPDATE assets SET sub_type = properties->>'provider', updated_at = NOW()
WHERE asset_type = 'cloud_account'
  AND (sub_type IS NULL OR sub_type = '')
  AND properties->>'provider' IS NOT NULL
  AND properties->>'provider' != '';
Loading
Loading