In [22]:
from hashlib import sha256
from typing import Any, Dict, List, NamedTuple, Optional, Union
import json
import pandas as pd


# interface
# get_query_ids() 
    # returns a DF of source_path, query_id, query_hash - the idea here is that you
    # (or someone) can check for existing queries based on path. I guess I could also
    # do that in the store - i.e. don't add a new one if the hash is the same, 
    # just overwrite with the new details. Hmm. Maybe you don't need to create a query_id. 
    # I could just do this checking in the data layer comparing source_path and 
    # source_index with existing values. LMK what you think.
#
# add_queries(queries: List[Dict[as described above]])
# add_kql_properties(query_id, properties: Dict[Liam's dict])
# get_filter_lists() - will return a dictionary of lists of unique values of various properties for the UI filtering
#     I could also return lists of unique query names and paths
# find_queries(**kwargs) - this is going to be an interesting one given that we have a flexible set of properties to search on.
#     kwargs lets us specify a flexible list of conditions, examples:
#         source_path="/some/path - exact string match (prob case insensitive)
#         query_name={matches: regex} - match based on a pandas operator like regex, startswith, contains
#         table=["table1", "table2"] - intersection of queries that use both these tables
#     it will return a DF of query_id + basic properties.
# get_query(query_id) - find_queries will return a list, to get all the props for a query, you'd need to call this.
# get_schema(table) 

QueryDict = Dict[str, Union[str, int, Dict[str, Any]]]
QueryList = List[QueryDict]


class QueryUpdate(NamedTuple):
    query: QueryDict
    action: str
    query_id: Optional[str] = None


class DataStore:

    _ATTRIB_INDEXES = {
        "tactics": list,
        "techniques": list
    }
    _KQL_INDEXES = {
        "tables": list,
        "operators": list,
        "fields": list,
        "functions": list,
        "tactics": list,
        "techniques": list,
    }

    def __init__(self, json_path: Optional[str] = None, json_data: Optional[Union[str, Dict]] = None):
        self._source_path = json_path
        if json_path or json_data:
            self._data = self._read_data(json_path or json_data).set_index("query_id")
            self._data["query_hash"] = pd.util.hash_pandas_object(self._data["query"]
            self.attributes = self._extract_attributes()
            self._indexes = {}
            self._new_queries: pd.DataFrame = None
    
    def get_filter_lists(self, categories: List[str]) -> Dict[str, List[str]]:
        """Return unique lists of values for each category."""
        pass

    def get_query_ids(self) -> pd.DataFrame:
        return self._data[["source_path", "query_name" "query_hash"]]

    def add_queries(self, queries: QueryList):
        query_batch = [
            self._check_existing_query(self, query: QueryDict)
            for query in queries
        ]
        self._add_or_update_queries(query_batch)

    def add_query(self, query: QueryDict):
        query_batch = [self._check_existing_query(self, query: QueryDict)]
        self._add_or_update_queries(query_batch)

    def add_kql_properties

    def _check_existing_query(self, query: QueryDict):
        existing_query = self._data[
            (self._data["source_path"] == query["source_path"])
            &
            (self._data["source_index"] == query["source_index"])
        ]
        if not self._data[existing_query].empty:
            return QueryUpdate(query, "update", self._data[existing_query].query_id)
        else:
            return QueryUpdate(query, "add", None)

    def _add_or_update_queries(self, queries: List[QueryUpdate]):
        update_queries = [query.query_id for query in queries]
        new_data = self._data[ ~(self._data.index.isin(update_queries))]
        self._new_queries = pd.DataFrame([query.query for query in queries])
        self._update_query_data()

    def _update_query_data(self):
        if self._new_queries is not None:
            new_df = pd.concat([self._data, self._new_queries])
            attribs = self._extract_attributes()
            kql_props = self._extract_kql_props()
            prev_data = self._data
            prev_attributes = self._attributes
            prev_kql_props = self._kql_props
            try:
                self._data = new_df
                self._attributes = new_df
                self._kql_props = kql_props
            except Exception as err:
                self._data = prev_data
                self._attributes = prev_attributes
                self._kql_props = prev_kql_props
            


    @staticmethod
    def _read_data(json_data: str):
        return pd.read_json(json_data)

    def _extract_attributes(self):
        return self._data.apply(lambda x: pd.Series(x.attributes), axis=1)

    def _create_attrib_indexes(self):
        for key, type in self._ATTRIB_INDEXES.items():
            if isinstance(type, list):
                self._indexes[key] = self._create_list_index(
                    data=self.attributes,
                    key_col=key,
                )

    @staticmethod
    def _create_list_index(data, key_col):
        return data[[key_col]].explode(key_col).reset_index().set_index([key_col])



ds = DataStore(json_data=json.dumps(queries))

In [43]:
ds._data.head()
"8017527f-bdd6-44cb-8512-b8984cdc61b7" in ds._data.index

True

In [40]:
attr_df = ds._data.apply(lambda x: pd.Series(x.attributes), axis=1)
tactics_df = attr_df[["tactics"]].explode("tactics").reset_index()
display(tactics_df)
tactics_df = tactics_df.set_index(["tactics"])
display(tactics_df)
# tactics_df.iloc[:,"Exploitation"]
tactics_df.loc["Compromise"]

Unnamed: 0,query_id,tactics
0,8017527f-bdd6-44cb-8512-b8984cdc61b7,Exploitation
1,8017527f-bdd6-44cb-8512-b8984cdc61b7,Compromise
2,707a7a6c-7f3f-4927-9975-00e0c4b77ef5,Exploitation
3,707a7a6c-7f3f-4927-9975-00e0c4b77ef5,Compromise
4,6860c851-0758-4b73-a460-a542b4017b98,Exploitation
5,6860c851-0758-4b73-a460-a542b4017b98,Compromise
6,62450409-d53b-4f06-8e38-69c024fb74d5,Exploitation
7,62450409-d53b-4f06-8e38-69c024fb74d5,Compromise
8,b3812073-0cd4-47bc-b0c6-d1b3fc4f1c1e,Exploitation
9,b3812073-0cd4-47bc-b0c6-d1b3fc4f1c1e,Compromise


Unnamed: 0_level_0,query_id
tactics,Unnamed: 1_level_1
Exploitation,8017527f-bdd6-44cb-8512-b8984cdc61b7
Compromise,8017527f-bdd6-44cb-8512-b8984cdc61b7
Exploitation,707a7a6c-7f3f-4927-9975-00e0c4b77ef5
Compromise,707a7a6c-7f3f-4927-9975-00e0c4b77ef5
Exploitation,6860c851-0758-4b73-a460-a542b4017b98
Compromise,6860c851-0758-4b73-a460-a542b4017b98
Exploitation,62450409-d53b-4f06-8e38-69c024fb74d5
Compromise,62450409-d53b-4f06-8e38-69c024fb74d5
Exploitation,b3812073-0cd4-47bc-b0c6-d1b3fc4f1c1e
Compromise,b3812073-0cd4-47bc-b0c6-d1b3fc4f1c1e


Unnamed: 0_level_0,query_id
tactics,Unnamed: 1_level_1
Compromise,8017527f-bdd6-44cb-8512-b8984cdc61b7
Compromise,707a7a6c-7f3f-4927-9975-00e0c4b77ef5
Compromise,6860c851-0758-4b73-a460-a542b4017b98
Compromise,62450409-d53b-4f06-8e38-69c024fb74d5
Compromise,b3812073-0cd4-47bc-b0c6-d1b3fc4f1c1e


## Test data generation

```
source_path: github_path,          # we can prob make up a path for API-sourced queries
source_type: text, markdown, api...,
source_index: 0,                           # if there are multiple queries in a file
name: query_name,                     # either filename, filename_index, or a name from metadata
query: query_text,
context: text,                               # e.g. text from markdown
attributes: DICT {}
```

In [8]:
json_query_data = """
{
    "query_id": "1234291720927310",
    "source_path": "/github.com/foo",
    "source_type": "text",
    "source_index": 0,
    "name": "query_1",
    "query": "SecurityAlert\\n| Where foo == bar",
    "context": "text from markdown",
    "attributes": {
        "description": "Query one description",
        "tactics": ["Exploitation", "Compromise"],
        "techniques": ["T.1055", "T.1345"]
    }
}
"""
template_dict = json.loads(json_query_data)
print(template_dict["query"])

json_kql_parse = """
{
    "FunctionCalls":["count","tostring","make_list","toreal"],
    "Joins":["rightsemi","leftouter"],
    "Operators":["where","extend","summarize","mv-expand","project-away","project"],
    "Tables":["SigninLogs"]
}
"""

json.loads(json_query_data)

SecurityAlert
| Where foo == bar


In [17]:
table_names = ['AADB2CRequestLogs', 'AADDomainServicesAccountLogon',
       'AADDomainServicesAccountManagement',
       'AADDomainServicesDirectoryServiceAccess',
       'AADDomainServicesLogonLogoff', 'AADDomainServicesPolicyChange',
       'AADDomainServicesPrivilegeUse', 'AADManagedIdentitySignInLogs',
       'AADNonInteractiveUserSignInLogs', 'AADProvisioningLogs',
       'AADRiskyServicePrincipals', 'AADRiskyUsers',
       'AADServicePrincipalRiskEvents', 'AADServicePrincipalSignInLogs',
       'AADUserRiskEvents', 'ADFSSignInLogs', 'AlertEvidence',
       'Anomalies', 'AppServiceIPSecAuditLogs',
       'AppServiceServerlessSecurityPluginData', 'ASimDnsActivityLogs',
       'AuditLogs', 'AWSCloudTrail', 'AWSGuardDuty', 'AWSVPCFlow',
       'AZFWApplicationRule', 'AZFWApplicationRuleAggregation',
       'AZFWDnsQuery', 'AZFWIdpsSignature',
       'AZFWInternalFqdnResolutionFailure', 'AZFWNatRule',
       'AZFWNatRuleAggregation', 'AZFWNetworkRule',
       'AZFWNetworkRuleAggregation', 'AZFWThreatIntel', 'AzureActivity',
       'AzureDiagnostics', 'BehaviorAnalytics', 'CloudAppEvents',
       'CommonSecurityLog', 'ConfidentialWatchlist', 'DeviceEvents',
       'DeviceFileCertificateInfo', 'DeviceFileEvents',
       'DeviceImageLoadEvents', 'DeviceInfo', 'DeviceLogonEvents',
       'DeviceNetworkEvents', 'DeviceNetworkInfo', 'DeviceProcessEvents',
       'DeviceRegistryEvents', 'DeviceTvmSecureConfigurationAssessment',
       'DeviceTvmSoftwareInventory', 'DeviceTvmSoftwareVulnerabilities',
       'DSMAzureBlobStorageLogs', 'DSMDataClassificationLogs',
       'DSMDataLabelingLogs', 'DynamicEventCollection',
       'EmailAttachmentInfo', 'EmailEvents', 'EmailPostDeliveryEvents',
       'EmailUrlInfo', 'GCPAuditLogs', 'HDInsightSecurityLogs',
       'HuntingBookmark', 'IdentityDirectoryEvents',
       'IdentityLogonEvents', 'IdentityQueryEvents', 'LinuxAuditLog',
       'McasShadowItReporting', 'NetworkAccessTraffic', 'NetworkSessions',
       'NSPAccessLogs', 'OfficeActivity', 'PowerBIActivity',
       'ProjectActivity', 'ProtectionStatus',
       'PurviewDataSensitivityLogs', 'SecurityAlert', 'SecurityBaseline',
       'SecurityBaselineSummary', 'SecurityDetection', 'SecurityEvent',
       'SecurityIoTRawEvent', 'SecurityRecommendation', 'SentinelAudit',
       'SentinelHealth', 'SigninLogs', 'Syslog',
       'ThreatIntelligenceIndicator', 'Update', 'UrlClickEvents',
       'UserAccessAnalytics', 'UserPeerAnalytics', 'Watchlist',
       'WindowsEvent', 'WindowsFirewall', 'WireData']

field_names = ['SourceType', 'DomainBehaviorVersion', 'OperationName',
       'BookmarkName', 'SentinelResourceId', 'OSName', 'ActualResult',
       'CreatedBy', 'CreatedDateTime', 'LatencySamplingTimeStamp',
       'Environment', 'CorrelationId', 'MachineGroup',
       'SumResponseBodySize', 'RecordId', 'DstUserUpn', 'ResourceId',
       'InitiatingProcessSHA1', 'ObjectId', 'AssetType', 'Title',
       'InitiatingProcessAccountDomain', 'AuthorizationInfo',
       'TargetContextId', 'LogonId', 'CveTags', 'SourceComputerId',
       'ResourceIdentity', 'ClusterName', 'TdoAttributes',
       'EntityMapping', 'DnssecOkBit', 'DeviceCustomString5',
       'TransmittedServices', 'DeviceCustomDate2Label']


import random
import uuid

def get_random_items(data=table_names, count=3):
    return [
        random.choice(table_names)
        for _ in range(count)
    ]

def get_random_query(index=0):
    tactic_idx = index % 7
    return {
        "query_id": str(uuid.uuid4()),
        "source_path": f"/github.com/foo/{index}",
        "source_type": "text",
        "source_index": random.randint(0, 7),
        "name": f"query_{index}",
        "query": "SecurityAlert\\n| Where foo == bar",
        "context": "text from markdown",
        "attributes": {
            "description": "Query one description",
            "tactics": ["Exploitation", "Compromise"],
            "techniques": [f"T10{tactic_idx:0>2d}", f"T1{tactic_idx:0>2d}5"]
        }
    }

queries = [get_random_query(i) for i in range(5)]
json.dumps(queries)

'[{"query_id": "8017527f-bdd6-44cb-8512-b8984cdc61b7", "source_path": "/github.com/foo/0", "source_type": "text", "source_index": 0, "name": "query_0", "query": "SecurityAlert\\\\n| Where foo == bar", "context": "text from markdown", "attributes": {"description": "Query one description", "tactics": ["Exploitation", "Compromise"], "techniques": ["T1000", "T1005"]}}, {"query_id": "707a7a6c-7f3f-4927-9975-00e0c4b77ef5", "source_path": "/github.com/foo/1", "source_type": "text", "source_index": 2, "name": "query_1", "query": "SecurityAlert\\\\n| Where foo == bar", "context": "text from markdown", "attributes": {"description": "Query one description", "tactics": ["Exploitation", "Compromise"], "techniques": ["T1001", "T1015"]}}, {"query_id": "6860c851-0758-4b73-a460-a542b4017b98", "source_path": "/github.com/foo/2", "source_type": "text", "source_index": 1, "name": "query_2", "query": "SecurityAlert\\\\n| Where foo == bar", "context": "text from markdown", "attributes": {"description": "Que

In [12]:
import uuid
str(uuid.uuid4())

'829bd849-ee3c-4fcb-8c74-e808ada886f6'

In [68]:
import hashlib
import json
from dataclasses import dataclass, field, asdict
from typing import Any, Dict,Literal


_SOURCE_TYPES = ["text", "markdown", "sentinel_yaml", "api", "other"]

SourceType = Literal["text", "markdown", "sentinel_yaml", "api", "other"]


def _uuid_str():
    return str(uuid.uuid4())


@dataclass
class KqlQuery:
    """
    Data format for KqlQuery record.
    
    Attributes
    ----------
    source_path : str
        The path to the original file or API identifier.
    query : str
        The raw query string
    source_type : SourceType, optional
        String - the source file/data type. Valid types are:
        text, markdown, sentinel_yaml, api, other
    source_index : int, optional
        The index (0-based) if the query is one of several in the
        file pointed to by source_path. The default is 0.
    query_name : Optional[str]
        The name of the query. If None this will be derived from
        the last element of source_path
    attributes: Dict[str, Any], optional
        Dictionary of any metadata attributes read from the source
        file.
    kql_properties: Dict[str, Any], optional
        Dictionary of properties derived from the KQL query
    query_id: Optional[str], optional
        UUID used to identify the query
    query_hash: int, optional
        Hash of the query text
    query_version: int, optional
        Query version, not currently used. Default is 0

    Examples
    --------
    Create a KqlQuery instance
    >>>> kql = KqlQuery(
    ...     source_path="https://github.com/a/b/file.kql",
    ...     query="SecurityAlert | take 1"
    ... )

    Create a KqlQuery instance from a dict
    >>>> attribs = {
    ...     "source_path": "https://github.com/a/b/file.kql",
    ...     "query": "SecurityAlert | take 1",
    ... }
    ... kql = KqlQuery(**attribs)

    Different default representation
    >>>> kql
    KqlQuery(source_path='https://github.com/a/b/file.kql', query='SecurityAlert... query_version=0)

    As a dict
    >>>> print(kql.asdict())
    {'source_path': 'https://github.com/a/b/file.kql', 'query': 'SecurityAlert... 'query_version': 0}

    As JSON
    print(kql.to_json())
    {"source_path": "https://github.com/a/b/file.kql", "query": "SecurityAlert... "query_version": 0}

    Class method to convert a list of KqlQuery instances to a list of dicts
    >>>> KqlQuery.kql_list_to_pylist([kql, kql])

    Class method to convert a list of KqlQuery instances to JSON
    >>>> KqlQuery.kql_list_to_json([kql, kql])
    '[{"source_path": "https://github.com/a/b/file.kql", "query": "SecurityAlert... "query_version": 0}]'

    Class method to convert list of KqlQuery instances to a DataFrame
    """

    source_path: str
    query: str
    source_type: SourceType = "text"
    source_index: int = 0
    query_name: Optional[str] = None
    attributes: Dict[str, Any] = field(default_factory=dict)
    kql_properties: Dict[str, Any] = field(default_factory=dict)
    query_id: str = field(default_factory=_uuid_str)
    query_hash: int = 0
    query_version: int = 0

    def __post_init__(self):
        if self.source_path is not None:
            self.query_name = self.source_path.split("/")[-1]
        if self.query:
            self.query_hash = hashlib.sha256(bytes(self.query, encoding="utf-8"), usedforsecurity=False).hexdigest()

    def asdict(self):
        return asdict(self)

    def to_json(self):
        return json.dumps(self.asdict())

    # helper methods and properties
    @property
    def source_types(self):
        del self
        return _SOURCE_TYPES

    @staticmethod
    def kql_list_to_pylist(kql_queries: List[KqlQuery]):
        """Return a list of Python dicts from a list of KqlQuery instances."""
        return [
            kql.asdict() for kql in kql_queries
        ]

    @classmethod
    def kql_list_to_json(cls, kql_queries: List[KqlQuery]):
        """Return JSON from a list of KqlQuery instances."""
        return json.dumps(cls.kql_list_to_pylist(kql_queries))

    @classmethod
    def kql_list_to_df(cls, kql_queries: List[KqlQuery]):
        """Return JSON from a list of KqlQuery instances."""
        return pd.DataFrame(cls.kql_list_to_pylist(kql_queries))


In [72]:
from ..src.kql_query import KqlQuery

kql = KqlQuery(
    source_path="https://github.com/a/b/file.kql",
    query="SecurityAlert | take 1"
)
print(kql)
print(kql.asdict())
print(kql.to_json())

KqlQuery.kql_list_to_pylist([kql, kql])

KqlQuery.kql_list_to_json([kql, kql])

KqlQuery.kql_list_to_df([kql, kql])

ImportError: attempted relative import with no known parent package

In [67]:
kql

KqlQuery(source_path='https://github.com/a/b/file.kql', query='SecurityAlert | take 1', source_type='text', source_index=0, query_name='file.kql', attributes={}, kql_properties={}, query_id='eb422dee-ba53-4404-bb64-722d3658daf5', query_hash='c86ed067ad3516435ca23a7a963909069f7702c1038a5833dcc4363977f0f6b2', query_version=0)

In [64]:
help(KqlQuery)

Help on class KqlQuery in module __main__:

class KqlQuery(builtins.object)
 |  KqlQuery(source_path: str, query: str, source_type: Literal['text', 'markdown', 'sentinel_yaml', 'api', 'other'] = 'text', source_index: int = 0, query_name: Optional[str] = None, attributes: Dict[str, Any] = <factory>, kql_properties: Dict[str, Any] = <factory>, query_id: Optional[str] = None, query_hash: int = 0, query_version: int = 0) -> None
 |  
 |  Data format for KqlQuery record.
 |  
 |  Attributes
 |  ----------
 |  source_path : str
 |      The path to the original file or API identifier.
 |  query : str
 |      The raw query string
 |  source_type : SourceType, optional
 |      String - the source file/data type. Valid types are:
 |      text, markdown, sentinel_yaml, api, other
 |  source_index : int, optional
 |      The index (0-based) if the query is one of several in the
 |      file pointed to by source_path. The default is 0.
 |  query_name : Optional[str]
 |      The name of the query. I