# api

This is the primary interface to running squ wrappers 

In [None]:
#| default_exp api

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import pandas, json, logging, time, requests
from nbdev_squ.core import *
from diskcache import memoize_stampede
from subprocess import run, CalledProcessError
from azure.monitor.query import LogsQueryClient, LogsBatchQuery, LogsQueryStatus
from azure.identity import AzureCliCredential

In [None]:
logging.basicConfig(level=logging.INFO)

In [None]:
#| export
logger = logging.getLogger(__name__)

## List Workspaces

The `list_workspaces` function retreives a list of workspaces from blob storage and returns it in various formats

In [None]:
#| exports
@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_workspaces(fmt: str = "df", # df, csv, json, list
                    agency: str = "ALL"): # Agency alias or ALL
    path = datalake_path()
    df = pandas.read_csv((path / "notebooks/lists/SentinelWorkspaces.csv").open())
    df = df.join(pandas.read_csv((path / "notebooks/lists/SecOps Groups.csv").open()).set_index("Alias"), on="SecOps Group", rsuffix="_secops")
    df = df.rename(columns={"SecOps Group": "alias", "Domains and IPs": "domains"})
    df = df.dropna(subset=["customerId"]).sort_values(by="alias").convert_dtypes().reset_index()
    if agency != "ALL":
        df = df[df["alias"] == agency]
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        return df.fillna("").to_dict("records")
    elif fmt == "list":
        return list(df["customerId"].unique())
    else:
        raise ValueError("Invalid format")

You can use it do lookup an agencies alias based on its `customerId` (also known as `TenantId`, Log Analytics `WorkspaceId`)

In [None]:
workspaces = list_workspaces()
workspaces.query(f'customerId == "{workspaces["customerId"][0]}"')["alias"].str.cat()

# Log Analytics Query
The below function makes it easy to query all workspaces with sentinel installed using log analytics.

In [None]:
#| exports
@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_subscriptions():
    return pandas.DataFrame(azcli(["account", "list"]))["id"].unique()

@memoize_stampede(cache, expire=60 * 60 * 3) # cache for 3 hours
def list_securityinsights():
    return pandas.DataFrame(azcli([
        "graph", "query", "--first", "1000", "-q", 
        """
        resources
        | where type =~ 'microsoft.operationsmanagement/solutions'
        | where name startswith 'SecurityInsights'
        | project wlid = tolower(tostring(properties.workspaceResourceId))
        | join kind=leftouter (
            resources | where type =~ 'microsoft.operationalinsights/workspaces' | extend wlid = tolower(id))
            on wlid
        | extend customerId = properties.customerId
        """
    ])["data"])

def chunks(items, size):
    # Yield successive `size` chunks from `items`
    for i in range(0, len(items), size):
        yield items[i:i + size]

@memoize_stampede(cache, expire=60 * 5) # cache for 5 mins
def loganalytics_query(queries: list[str], timespan=pandas.Timedelta("14d"), batch_size=200, batch_delay=35):
    """
    Run queries across all workspaces, in batches of `batch_size` with a minimum delay of `batch_delay` between batches.
    Returns a dictionary of queries and results
    """
    client = LogsQueryClient(AzureCliCredential())
    workspaces = list_workspaces(fmt="df")
    sentinel_workspaces = list_securityinsights()
    requests, results = [], []
    for query in queries:
        for workspace_id in sentinel_workspaces["customerId"]:
            requests.append(LogsBatchQuery(workspace_id=workspace_id, query=query, timespan=timespan))
    querytime = pandas.Timestamp("now")
    logger.info(f"Executing {len(requests)} queries at {querytime}")
    for request_batch in chunks(requests, batch_size):
        batch_start = pandas.Timestamp("now")
        results += client.query_batch(request_batch)
        duration = pandas.Timestamp("now") - batch_start
        logger.info(f"Completed {len(results)} in {duration}")
        if duration.total_seconds() < batch_delay:
            time.sleep(batch_delay - duration.total_seconds())
    dfs = {}
    for request, result in zip(requests, results):
        if result.status == LogsQueryStatus.PARTIAL:
            tables = result.partial_data
            tables = [pandas.DataFrame(table.rows, columns=table.columns) for table in tables]
        elif result.status == LogsQueryStatus.SUCCESS:
            tables = result.tables
            tables = [pandas.DataFrame(table.rows, columns=table.columns) for table in tables]
        else:
            tables = [pandas.DataFrame([result.__dict__])]
        df = pandas.concat(tables).dropna(axis=1, how='all') # prune empty columns
        df["TenantId"] = request.workspace
        alias = workspaces.query(f'customerId == "{request.workspace}"')["alias"].str.cat()
        if alias == '':
            alias = sentinel_workspaces.query(f'customerId == "{request.workspace}"')["name"].str.cat()
        df["_alias"] = alias
        query = request.body["query"]
        if query in dfs:
            dfs[query].append(df)
        else:
            dfs[query] = [df]
    return {query: pandas.concat(results, ignore_index=True).convert_dtypes() for query, results in dfs.items()}

def query_all(query, fmt="df", timespan=pandas.Timedelta("14d")):
    try:
        # Check query is not a plain string and is iterable
        assert not isinstance(query, str)
        iter(query)
    except (AssertionError, TypeError):
        # if it is a plain string or it's not iterable, convert into a list of queries
        query = [query]
    df = pandas.concat(loganalytics_query(query, timespan).values())
    if fmt == "df":
        return df
    elif fmt == "csv":
        return df.to_csv()
    elif fmt == "json":
        return df.fillna("").to_dict("records")
    else:
        raise ValueError("Invalid format")

In [None]:
query_all(["SecurityAlert"]).groupby("_alias")[["Tactics", "TenantId"]].nunique().sort_values(by="Tactics", ascending=False).head()

In [None]:
#| exports
def atlaskit_transformer(inputtext, inputfmt="md", outputfmt="wiki", runtime="node"):
    import nbdev_squ
    transformer = dirs.user_cache_path / f"atlaskit-transformer.bundle_v{nbdev_squ.__version__}.js"
    if not transformer.exists():
        transformer_url = f'{nbdev_squ._modidx.d["settings"]["git_url"]}/releases/download/v{nbdev_squ.__version__}/atlaskit-transformer.bundle.js'
        transformer.write_bytes(requests.get(transformer_url).content)
    cmd = [runtime, str(transformer), inputfmt, outputfmt]
    logger.debug(" ".join(cmd))
    try:
        return run(cmd, input=inputtext, text=True, capture_output=True, check=True).stdout
    except CalledProcessError:
        run(cmd, input=inputtext, text=True, check=True)

In [None]:
print(atlaskit_transformer("""# Heading 1

- a bullet
[a link](https://github.com]
"""))

In [None]:
#| hide
import nbdev; nbdev.nbdev_export()