# Threat hunting workflow
This notebook contains an example of how to use the ReversingLabs SDK to hunt for
samples which fall outside the YARA Retro Hunt timeframe.

### Used Spectra Intelligence classes
- **YARARetroHunting** (*TCA-0319 - Start/Cancel YARA Retro Hunt*)
- **YARAHunting** (*TCA-0303 - YARA Matches Feed*)
- **AdvancedSearch** (*TCA-0320 - Advanced Search*)
- **RHA1FunctionalSimilarity** (*TCA-0301 - Group By RHA1 Single Query*)
- **ReanalyzeFile** (*TCA-0205 - Re-Analyze File Single Query*)
- **DynamicAnalysis** (*TCA-0106 - File Dynamic Analysis Report*)
- **FileReputation** (*TCA-0101 - File reputation (Malware Presence)*)
- **FileAnalysis** (*TCA-0104 - File Analysis (RLDATA)*)

### Credentials
Credentials are loaded from a local file instead of being written here in plain text.
To learn how to create a credentials file, see the **Storing and using credentials** section in the [README file](./README.md).
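
The code cells in this notebook read a `credentials.json` file that holds a `ticloud` object with `username` and `password` keys. A minimal sketch of a loader for that layout follows. The helper name `load_ticloud_credentials` is hypothetical and not part of the SDK, and the README remains the authoritative description of the file.

```python
import json
import tempfile
from pathlib import Path


def load_ticloud_credentials(path):
    """Hypothetical helper: read the username/password pair from a credentials file.

    Assumes the layout used by the cells in this notebook:
    {"ticloud": {"username": "...", "password": "..."}}
    """
    with Path(path).open(encoding="utf-8") as handle:
        credentials = json.load(handle)
    section = credentials.get("ticloud")
    if not section or "username" not in section or "password" not in section:
        raise KeyError("credentials file has no complete 'ticloud' section")
    return section["username"], section["password"]


# Usage with a throwaway file (placeholder values, not real credentials).
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "credentials.json"
    demo_path.write_text(
        json.dumps({"ticloud": {"username": "u123", "password": "secret"}}),
        encoding="utf-8",
    )
    demo_username, demo_password = load_ticloud_credentials(demo_path)
```

Opening the file in a `with` block closes the handle as soon as the JSON has been parsed.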

### Threat Hunting Workflow
In this notebook we will use the RL SDK Python package to look for samples which match our YARA rule, reaching as far back as possible.
To do this we will build a pipeline which employs various TiCloud APIs.
The complete pipeline is shown in the following diagram.
Each of the elements in the diagram is explained in detail in this notebook.


![text](images/pipeline.png "Pipeline diagram")


The pipeline consists of three distinct stages: A, B and C.
The first stage is the setup stage in which we will create our rule on TiCloud and start a Retro Hunt for that rule.
Stage B handles processing of samples retrieved using the Advanced Search API.
This API is the keystone of our efforts to extend the YARA hunt beyond the reach of a YARA Retro Hunt (90 days).
Stage C collects matches from both YARA Retro Hunt feed and regular YARA feed.
These matches are then enriched with the static and dynamic reports (if those exist) and stored to local storage.

The pipeline has multiple parameters which you may tweak to modify its behaviour.
The first two parameters are the YARA rule and the Advanced Search query.
These have the most impact on the collected samples.
**NOTE** that you should use search parameters which closely match what the YARA rule is hunting for.
There are four limit parameters as well.
Modifying or removing these limits will impact the performance of the pipeline.
**NOTE** that to remove a limit you simply assign the value `None` to the constant, for example `SEARCH_LIMIT = None`.
The primary intention of these limits is to allow users to reduce the number of samples which this pipeline produces.
TiCloud holds a large number of samples, so it may be impractical to pull a lot of reports onto the machine on which you are running this notebook.
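
The limits are enforced with `itertools.islice()`, which treats a stop value of `None` as "no limit". A small sketch of that behaviour follows, using a stand-in generator instead of a TiCloud feed. `fake_feed` and `DEMO_LIMIT` are illustrative names only.

```python
from itertools import islice


def fake_feed():
    """Stand-in for a TiCloud generator; yields five made-up items."""
    for n in range(5):
        yield f"sample-{n}"


DEMO_LIMIT = 2      # hypothetical constant, plays the role of SEARCH_LIMIT and friends
limited = list(islice(fake_feed(), DEMO_LIMIT))

DEMO_LIMIT = None   # None removes the cap
unlimited = list(islice(fake_feed(), DEMO_LIMIT))

print(len(limited), len(unlimited))
```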

**NOTE** that this pipeline is kept as simple as possible. There is minimal error handling.
We do not employ asynchronous, multithreaded or prefetching techniques, any of which may be used to improve the performance of the pipeline.
Any sample reports we pull are stored on local storage.
The purpose of this pipeline is first and foremost to illustrate how you could enhance your usage of TiCloud APIs.

To better explain what we are doing we will use the following image.


![text](images/timeline.png "Timeline")


When a YARA ruleset is created, any sample subsequently analyzed by TiCloud is matched against it.
If the sample matches the ruleset, it will be present in the YARA matches feed.
We can at any time start a YARA Retro Hunt to try and match any sample which was uploaded to TiCloud in the last 90 days.
But if we wish to hunt for samples which might match our YARA ruleset and have not appeared on TiCloud in the last 90 days, we will have to do a little bit of work.

The Advanced Search API allows us to search for samples by any number of parameters.
The two most important ones are **firstseen** and **lastseen**.
If we create a ruleset and immediately start a Retro Hunt we can take a timestamp and calculate these values.
In this notebook we will only look a year into the past, but you may wish to go even further back.
The red part of the timeline is limited by the **firstseen** and **lastseen** parameters of the search query.

We will take the search results which fall under this timeframe and submit them for reanalysis.
Since we created a YARA ruleset before doing so these old samples will be matched against it.

The blue segment on the timeline represents the timeframe covered by the YARA Retro Hunt.
The green part of the timeline is the period in which we run the pipeline to consume the Retro Hunt and regular YARA feeds.
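
The three segments can be expressed as simple date comparisons. The sketch below is illustrative only: `timeline_segment` is a hypothetical helper, the 90 and 365 day boundaries are the ones used in this notebook, and a single timestamp per sample is assumed for the classification.

```python
import datetime

RETRO_WINDOW = datetime.timedelta(days=90)    # reach of a YARA Retro Hunt
SEARCH_WINDOW = datetime.timedelta(days=365)  # how far back this notebook searches


def timeline_segment(seen, retro_started):
    """Hypothetical helper: name the timeline segment a timestamp falls into."""
    if seen >= retro_started:
        return "green"   # arrives while the pipeline consumes the feeds
    if seen >= retro_started - RETRO_WINDOW:
        return "blue"    # covered by the Retro Hunt
    if seen >= retro_started - SEARCH_WINDOW:
        return "red"     # reachable only through Advanced Search and reanalysis
    return "out of range"


started = datetime.datetime(2024, 6, 1, tzinfo=datetime.timezone.utc)
print(timeline_segment(started - datetime.timedelta(days=200), started))
```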

### Building the pipeline
The first step is to import the modules necessary for our implementation.
Here we load the credentials and instantiate the required SDK objects.
Additionally, we define a dataclass **PipelineItem** which will simply hold any responses from TiCloud relating to a single sample.
This dataclass contains a few helper methods as well.

The **\*_LIMIT** constants are defined here as well.

In [None]:
import csv
import datetime
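import dateutil.parser  # import the submodule explicitly; older dateutil releases do not load it with "import dateutil"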
import json
from pathlib import Path
from itertools import islice, chain
from dataclasses import dataclass
from typing import Optional, Dict, Iterable, List, Union

from ReversingLabs.SDK import ticloud
from ReversingLabs.SDK.helper import NotFoundError


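# read the configuration files inside "with" blocks so the file handles are closed
with open('credentials.json') as credentials_file:
    CREDENTIALS = json.load(credentials_file)
USERNAME = CREDENTIALS.get("ticloud").get("username")
PASSWORD = CREDENTIALS.get("ticloud").get("password")
with open('../user_agent.json') as user_agent_file:
    USER_AGENT = json.load(user_agent_file)["user_agent"]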

config = {
    "host": "https://data.reversinglabs.com",
    "username": USERNAME,
    "password": PASSWORD,
    "user_agent": USER_AGENT
}

yara_hunting = ticloud.YARAHunting(**config)
yara_retro = ticloud.YARARetroHunting(**config)
advanced_search = ticloud.AdvancedSearch(**config)
rha1_similarity = ticloud.RHA1FunctionalSimilarity(**config)
reanalyze_file = ticloud.ReanalyzeFile(**config)
dynamic_analysis = ticloud.DynamicAnalysis(**config)
file_reputation = ticloud.FileReputation(**config)
static_analysis = ticloud.FileAnalysis(**config)


# pipeline generator limits
SEARCH_LIMIT = 10000
SIMILAR_LIMIT = 10000
RETRO_LIMIT = 1000
YARA_LIMIT = 1000


@dataclass
class PipelineItem:
    yara: Optional[Dict] = None
    retro: Optional[Dict] = None
    rha1: Optional[Union[str, Dict]] = None
    search: Optional[Dict] = None
    reputation: Optional[Dict] = None
    dynamic: Optional[Dict] = None
    static: Optional[Dict] = None
    
    def sample_available(self):
        yara = self.yara.get("sample_available", False) if self.yara else False
        retro = self.retro.get("sample_available", False) if self.retro else False
        return yara or retro
    
    def get_sha1(self) -> str:
        if self.yara:
            return self.yara["sha1"]
        if self.retro:
            return self.retro["sha1"]
        if self.rha1:
            return self.rha1["sha1"] if not isinstance(self.rha1, str) else self.rha1
        if self.search:
            return self.search["sha1"]
        if self.reputation:
            return self.reputation["sha1"]
        raise ValueError("Empty PipelineItem")

    def get_firstseen(self) -> Optional[datetime.datetime]:
        if isinstance(self.rha1, dict) and self.rha1.get("first_seen"):
            # "first_seen": "2022-04-20T23:59:32.825000",
            return dateutil.parser.isoparse(
                self.rha1["first_seen"]
            ).replace(tzinfo=datetime.timezone.utc)
        if self.search and self.search.get("firstseen"):
            #  "firstseen": "2024-03-13T02:38:00Z",
            return dateutil.parser.isoparse(
                self.search["firstseen"]
            ).replace(tzinfo=datetime.timezone.utc)
        if self.reputation and self.reputation.get("first_seen"):
            # "first_seen": "2024-01-12T23:00:13",
            return dateutil.parser.isoparse(
                self.reputation["first_seen"]
            ).replace(tzinfo=datetime.timezone.utc)
        return None
    
    def get_lastseen(self) -> Optional[datetime.datetime]:
        if isinstance(self.rha1, dict) and self.rha1.get("last_seen"):
            #  "last_seen": "2023-12-29T13:33:39.772000"
            return dateutil.parser.isoparse(
                self.rha1["last_seen"]
            ).replace(tzinfo=datetime.timezone.utc)
        if self.search and self.search.get("lastseen"):
            #  "lastseen": "2024-05-27T14:12:54Z",
            return dateutil.parser.isoparse(
                self.search["lastseen"]
            ).replace(tzinfo=datetime.timezone.utc)
        if self.reputation and self.reputation.get("last_seen"):
            #  "last_seen": "2024-06-11T05:09:05",
            return dateutil.parser.isoparse(
                self.reputation["last_seen"]
            ).replace(tzinfo=datetime.timezone.utc)
        return None
    
    def as_dict(self):
        return {
            "yara": self.yara,
            "retro": self.retro,
            "rha1": self.rha1,
            "search": self.search,
            "reputation": self.reputation,
            "dynamic": self.dynamic,
            "static": self.static,
        }

As mentioned before, the first step in our hunt for samples is to create a YARA ruleset on TiCloud.
When a ruleset is created, its matches feed is filled automatically with new samples.
**NOTE** that trying to recreate a ruleset with the same name will result in a bad request error.

In [None]:
RULESET_NAME = "threat_hunting_workflow_NSIS_Installer"
RULESET_CONTENT = f"""
import "pe"

rule {RULESET_NAME}
{{
	/* a */
    meta:
        offset = "0x4031d1"
        examplar = "4313d352e0dafd1f22b6517126a655cae3b444fa758d2845eddfbe72f24f7bdd"
    strings:
        $op = {{
            81[2-3]efbeadde [2-6]
            81[2-3]496e7374 [2-6]
            81[2-3]736f6674 [2-6]
            81[2-3]4e756c6c
        }}
        $nsis = "\\xef\\xbe\\xad\\xdeNullsoftInst"
    condition:
        pe.sections[pe.section_index(@op)].characteristics & (pe.SECTION_MEM_READ | pe.SECTION_MEM_EXECUTE) and
        $nsis in (pe.overlay.offset..pe.overlay.offset+pe.overlay.size)
}}
"""

response = yara_hunting.create_ruleset(RULESET_NAME, RULESET_CONTENT)
print(response.status_code)
print(json.dumps(response.json(), indent=1))

response = yara_hunting.get_ruleset_info(RULESET_NAME)
print(json.dumps(response.json(), indent=1))

**NOTE:** The rule delete API call is provided here for convenience.
It is commented out so that it does not run if you execute all the cells automatically.
Deleting a rule will stop its Retro Hunt and clear its YARA matches feed.

In [None]:
# response = yara_hunting.delete_ruleset(RULESET_NAME)
# print(response.status_code)

From this point on any sample analyzed by TiCloud will be matched against the ruleset we just created.
Let us now try to match any samples which have already been analyzed by TiCloud.
We will do this with a YARA Retro Hunt.
We take a timestamp the moment we start the Retro Hunt.
This will be our point of reference for timegating found samples.
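
The feed generators later in this notebook convert this reference point to a Unix timestamp string. A short sketch of that conversion with a timezone-aware `datetime` follows; the variable names are illustrative. Aware values matter here because `timestamp()` interprets a naive `datetime` as local time.

```python
import datetime

# A timezone-aware "now" in UTC.
reference = datetime.datetime.now(datetime.timezone.utc)

# A fixed value, used only to make the example reproducible.
fixed = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)

# Whole seconds since the Unix epoch, as a string.
page_token = str(int(fixed.timestamp()))
print(page_token)  # 1704067200
```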

In [None]:
retro_started = datetime.datetime.now(datetime.timezone.utc)  # timezone-aware; utcnow() is deprecated since Python 3.12
# response = yara_retro.enable_retro_hunt(RULESET_NAME)
# print(json.dumps(response.json(), indent=1))

response = yara_retro.start_retro_hunt(RULESET_NAME)
print(json.dumps(response.json(), indent=1))

While the Retro Hunt works in the background, we can take a moment to explore how Advanced Search finds samples on TiCloud.
We use the firstseen and lastseen parameters to limit the number of samples on the input of our pipeline.
Here we print out the first search result to see what we are working with.

**NOTE** that the search parameters should closely match the content of what the YARA ruleset is searching for.
In this example the YARA ruleset is looking for NSIS installers and our search query is looking for sample types
which match "PE/Exe/NSIS".

In [None]:
SEARCH_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
year_ago = retro_started - datetime.timedelta(days=365)
ninety_days_ago = retro_started - datetime.timedelta(days=90)
formatted_y = year_ago.strftime(SEARCH_FORMAT)
formatted_nda = ninety_days_ago.strftime(SEARCH_FORMAT)

search_query = (f"sampletype:\"PE/Exe/NSIS\" AND "
                f"classification:malicious AND "
                f"firstseen:[{formatted_y} TO {formatted_nda}] AND "
                f"lastseen:[{formatted_y} TO {formatted_nda}]")
response = advanced_search.search(search_query, page_number=1)
print(response.status_code)
print(json.dumps(response.json()["rl"]["web_search_api"]["entries"][0], indent=1))

Now let us define a generator function which will be the input for our pipeline.
Let's call it `complete_search_stream()`.
Note that most of the paginated APIs in this example are consumed with such generator functions.
In this example we will not be using the sorting capabilities of the search API.
But keep in mind that you may improve performance with smart use of these parameters.
Notice that the search results are wrapped in the `PipelineItem` dataclass.

We can use the standard `itertools.islice()` iterator to limit the number of elements we wish to 
consume from the generator. This is how our **\*_LIMIT**s are enforced.
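
The pagination pattern can be tried without network access. The sketch below replaces Advanced Search with an in-memory `FAKE_PAGES` dictionary (an assumption made for illustration, reusing the `entries`, `next_page` and `more_pages` field names from this notebook) and shows that `islice()` stops the generator before later pages are requested.

```python
from itertools import islice

# Made-up pages standing in for search responses.
FAKE_PAGES = {
    1: {"entries": ["a", "b"], "next_page": 2, "more_pages": True},
    2: {"entries": ["c", "d"], "next_page": 3, "more_pages": True},
    3: {"entries": ["e"], "next_page": None, "more_pages": False},
}
requested_pages = []


def fake_search_stream():
    """Yield entries page by page, fetching a page only when it is needed."""
    next_page, more_pages = 1, True
    while more_pages:
        requested_pages.append(next_page)   # record each "request"
        page = FAKE_PAGES[next_page]
        next_page = page["next_page"]
        more_pages = page["more_pages"]
        yield from page["entries"]


first_three = list(islice(fake_search_stream(), 3))
print(first_three, requested_pages)
```

Only pages 1 and 2 are requested, because the third item comes from page 2 and `islice()` never asks for a fourth.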

As always we print out one of the items from the generator as an example.

In [None]:
def complete_search_stream(query):
    next_page = 1
    more_pages = True
    consumed = 0
    while more_pages:
        search_page = advanced_search.search(query, page_number=next_page).json()["rl"]["web_search_api"]
        next_page = search_page.get("next_page")
        more_pages = search_page.get("more_pages", False)
        entries = search_page["entries"]
        for e in entries:
            yield PipelineItem(search=e)
        consumed += len(entries)
        total_count = search_page.get("total_count", "Unknown")
        print(f"Consumed {consumed}/{total_count} samples from Advanced Search")
            
second_result = list(
    islice(complete_search_stream(search_query), 2)
)[1].search
print(json.dumps(second_result, indent=1))

In the same vein as the search generator we define a generator for the RHA1 Functional Similarity API.
You can see more examples of how to use this API in the [Certificate Search](./certificate_search.ipynb) notebook.
This generator takes a sample hash and returns a stream of `PipelineItem`s which are functionally similar to the argument sample.
You can read more on the RHA1 hash [here](https://www.reversinglabs.com/technology/reversinglabs-hash-algorithm).

Notice that we ignore two types of exceptions.
The first one is thrown when the requested sample's type is not supported by the RHA1 API.
The second one is thrown when no similar samples are found.
In either case the result is an empty generator which may be consumed by client code without issue.
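
A bare `return` inside a generator ends the stream instead of raising, which is why callers can iterate over the result without any special handling. A minimal sketch with a stand-in lookup function follows; `fake_lookup` and its data are invented for illustration.

```python
def fake_lookup(sample_hash):
    """Stand-in for an API call; raises ValueError for unsupported input."""
    known = {"aaa": ["bbb", "ccc"]}
    if sample_hash not in known:
        raise ValueError("unsupported sample")
    return known[sample_hash]


def similar_to(sample_hash):
    try:
        results = fake_lookup(sample_hash)
    except ValueError:
        return          # ends the generator: the caller sees an empty stream
    yield from results


print(list(similar_to("aaa")))
print(list(similar_to("zzz")))
```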

**Note** that the return type of the RHA1 API differs based on the value of the `extended` parameter.
This is reflected in the type signature of `PipelineItem` as well.

In [None]:
def rha1_similar_to_sample(sample_hash, classification=None, extended=False):
    next_page = None
    has_more_pages = True
    while has_more_pages:
        try:
            similar_result = rha1_similarity.get_similar_hashes(
                sample_hash, 
                extended_results=extended,
                page_sha1=next_page, 
                classification=classification
            ).json()["rl"]["group_by_rha1"]
        except (ValueError, NotFoundError):
            # only some sample types support the rha1 similarity check
            # a ValueError is thrown in case a sample is not supported
            # see Documentation and ticloud.RHA1_TYPE_MAP
            return
        next_page = similar_result.get("next_page_sha1")
        has_more_pages = next_page is not None
        for entry in similar_result["sha1_list"]:
            yield PipelineItem(rha1=entry)
            
example_hash = "00c3ddd93924df51e10585167271e27b4cfb29c5"
rha1_simple = list(islice(rha1_similar_to_sample(example_hash), 1))[0]
rha1_extended = list(islice(rha1_similar_to_sample(example_hash, extended=True), 1))[0]
print(json.dumps(rha1_simple.rha1, indent=1))
print(json.dumps(rha1_extended.rha1, indent=1))

Note that using the `extended=True` parameter of the API significantly slows it down. 
We use it in our pipeline since it returns the firstseen and lastseen timestamps.
If we wanted to we could have added these timestamps to our `PipelineItem`s using the reputation API.

While we were busy setting up our Advanced Search and RHA1 Functional Similarity generator functions,
the YARA Retro Hunt has been scanning the samples on TiCloud.
Now it's time to set up our Retro Hunt Matches Feed generator function.
Let's call it `complete_retro_feed()`, just like the example in the [Retro Hunt with timegating](./retro_hunt_with_timegating.ipynb) notebook.
This time, however, we will forgo the limit parameter in favor of `itertools.islice()`.

In [None]:
def complete_retro_feed(start_time):
    next_page = str(int(start_time.timestamp()))
    while next_page:
        raw = yara_retro.yara_retro_matches_feed("timestamp", next_page)
        parsed = raw.json()["rl"]["feed"]
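        last_timestamp = parsed.get("last_timestamp")
        # str(None) is the truthy string "None", so convert only when a value is present
        next_page = str(last_timestamp) if last_timestamp is not None else None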
        for e in parsed.get("entries", []):
            yield PipelineItem(retro=e)

retro_example = list(islice(complete_retro_feed(retro_started), 1))[0]
print(json.dumps(retro_example.retro, indent=1))

We define the regular YARA feed generator the same way we define the Retro Hunt generator.

In [None]:
def complete_yara_feed(start_time):
    next_page = str(int(start_time.timestamp()))
    while next_page:
        raw = yara_hunting.yara_matches_feed("timestamp", next_page)
        parsed = raw.json()["rl"]["feed"]
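        last_timestamp = parsed.get("last_timestamp")
        # str(None) is the truthy string "None", so convert only when a value is present
        next_page = str(last_timestamp) if last_timestamp is not None else None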
        for e in parsed.get("entries", []):
            yield PipelineItem(yara=e)
            
yara_example = list(islice(complete_yara_feed(retro_started), 1))[0]
print(json.dumps(yara_example.yara, indent=1))

Next we define some utility functions.
The first two are filter functions which return an inner function.
We will use these in combination with Python's `filter()` to remove samples from our pipeline which do not fall within our timeframe.
The `batched()` function consumes items from an iterable and yields them as lists of at most `n` elements.
We need this function to allow us to use TiCloud bulk queries in our pipeline.
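
The closure pattern used by the filter functions can be sketched on plain `datetime` values. `older_than` is a hypothetical stand-in for the functions in this notebook. Note that a missing timestamp (`None`) makes the predicate falsy, so `filter()` drops that item.

```python
import datetime

UTC = datetime.timezone.utc


def older_than(limit):
    """Return a predicate that accepts timestamps strictly older than limit."""
    def inner(seen):
        # "seen and ..." evaluates to None when the timestamp is missing
        return seen and seen < limit
    return inner


cutoff = datetime.datetime(2024, 3, 1, tzinfo=UTC)
timestamps = [
    datetime.datetime(2024, 1, 15, tzinfo=UTC),
    None,
    datetime.datetime(2024, 4, 2, tzinfo=UTC),
]
kept = list(filter(older_than(cutoff), timestamps))
print(kept)
```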

In [None]:
def lastseen_older_than(limit):
    def inner(i: PipelineItem):
        lastseen = i.get_lastseen()
        return lastseen and lastseen < limit
    return inner


def firstseen_newer_than(limit):
    def inner(i: PipelineItem):
        firstseen = i.get_firstseen()
        return firstseen and limit <= firstseen
    return inner


def batched(iterable, n):
    it = iter(iterable)
    while True:
        b = list(islice(it, n))
        if not b:
            return
        yield b

The first of our enrichment functions is `group_with_reputation()`.
This function takes an iterable of `PipelineItem`s and uses the `batched()` function to gather them in batches of 100.
We query the File Reputation API to get the reputations for these batches.
A reputation is then added to each `PipelineItem` from the parameter iterable.
Each item is then yielded, thus creating a generator of enriched items.
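
The batch-and-map-back pattern is independent of the File Reputation API. The sketch below uses a made-up `fake_bulk_lookup` function and plain dictionaries instead of `PipelineItem`s; only the shape of the technique is taken from this notebook.

```python
from itertools import islice


def batched(iterable, n):
    """Yield lists of at most n items."""
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch


def fake_bulk_lookup(keys):
    """Stand-in for a bulk API: answers for every key except 'missing'."""
    return [{"key": k, "score": len(k)} for k in keys if k != "missing"]


def enrich(items, batch_size=2):
    for batch in batched(items, batch_size):
        by_key = {item["key"]: item for item in batch}   # index the batch
        for answer in fake_bulk_lookup(list(by_key)):
            item = by_key[answer["key"]]                  # map the answer back
            item["score"] = answer["score"]
            yield item                                    # items without an answer are dropped


enriched_demo = list(enrich([{"key": "aa"}, {"key": "missing"}, {"key": "cccc"}]))
print(enriched_demo)
```

As in `group_with_reputation()`, an item that the bulk response does not mention never leaves the generator, which is worth keeping in mind when counting results.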

In the example we create a `complete_retro_feed()` generator, wrap it in `group_with_reputation()` and consume one element from it.
We print two JSON responses, one from the Retro Hunt feed and one from the File Reputation API.

In [None]:
def group_with_reputation(items: Iterable[PipelineItem]):
    for candidates in batched(items, 100):
        sha1_to_item = {e.get_sha1(): e for e in candidates}
        batch_reputation = file_reputation.get_file_reputation(
            list(sha1_to_item.keys())
        ).json()["rl"]
        if batch_reputation.get("invalid_hashes"):
            print("Invalid hashes in stream", batch_reputation["invalid_hashes"])
        for reputation in batch_reputation["entries"]:
            reputation_hash = reputation["query_hash"]["sha1"]
            i = sha1_to_item[reputation_hash]
            i.reputation = reputation
            yield i
            

enriched = list(islice(group_with_reputation(complete_retro_feed(retro_started)), 1))[0]
print(json.dumps(enriched.retro, indent=1))
print(json.dumps(enriched.reputation, indent=1))

Now we are one step closer to combining the streams we defined into a pipeline.
But first let's go over what we are trying to do:
1. We create and activate our threat hunting YARA ruleset to hunt new incoming samples
2. We start a Retro Hunt with our YARA ruleset to hunt for samples up to 90 days old
3. To use our threat hunting YARA ruleset to hunt for samples older than 90 days we:
    - Use the Advanced Search to form a query that will loosely correspond to samples that might be a match for our YARA ruleset for a period older than 90 days
    - If hunting for executables, use the RHA1 Functional Similarity API to find samples similar to the YARA and retro YARA ruleset matched files

To improve the performance of the pipeline and to reduce the "noise" (samples we are not interested in), we filter the samples coming from RHA1, Advanced Search and the Retro Hunt by the defined first seen (FS) and last seen (LS) times.

In the case of samples found by RHA1 we have the first seen information (`extended_results=True`).
In the case of Advanced Search we can skip the filtering step if we use the `firstseen:` and `lastseen:`
search parameters, which we do in this example.
In the case of Retro Hunt we will query the File Reputation API to retrieve the first seen value for a sample.

Next we want to explore hashes similar to the ones found by Advanced Search.
We will define a function `expand_similar()` which will generate PipelineItems with rha1 similarity using the previously defined `rha1_similar_to_sample()`.
This function will iterate over the items in the argument.
Each item is yielded and then any similar items are yielded after it.
This behaviour is similar to a depth first search.
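
The resulting order can be seen with a toy similarity table. `SIMILAR` and the hash names below are invented; the generator mirrors the yield-then-expand structure described above.

```python
from itertools import islice

# Invented similarity table: seed -> similar samples.
SIMILAR = {"s1": ["s1-a", "s1-b", "s1-c"], "s2": [], "s3": ["s3-a"]}
PER_SEED_LIMIT = 2   # plays the role of SIMILAR_LIMIT


def expand(seeds):
    for seed in seeds:
        yield seed                                              # the seed itself first
        yield from islice(SIMILAR.get(seed, []), PER_SEED_LIMIT)  # then its capped neighbours


expanded = list(expand(["s1", "s2", "s3"]))
print(expanded)
```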

In [None]:
def expand_similar(items: Iterable[PipelineItem]):
    for i in items:
        yield i
        stream_of_similar_samples = islice(rha1_similar_to_sample(i.get_sha1()), SIMILAR_LIMIT)
        for similar in stream_of_similar_samples:
            yield similar

The final step before composing our pipeline is to add the Filtering capability.
We define the `PipelineFilter` class.
Note that this class contains some statistics as well.
Since we will be looking for similar samples, it is possible for samples to reappear in our pipeline (RHA1 similarity).
Therefore, we need to keep track of the samples we have already seen.
Additionally, we will keep track of samples which we queued for reanalysis so that we can single them out from the YARA matches feed.
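
A stateful predicate is enough to drop repeats from a stream. A minimal sketch follows, with a hypothetical `SeenFilter` class that keeps only the deduplication part of the idea.

```python
class SeenFilter:
    """Hypothetical reduced filter: remembers hashes and counts rejections."""

    def __init__(self):
        self.known = set()
        self.rejected_seen = 0

    def never_seen(self, sha1):
        is_new = sha1 not in self.known
        self.known.add(sha1)
        if not is_new:
            self.rejected_seen += 1
        return is_new


seen_filter = SeenFilter()
stream = ["h1", "h2", "h1", "h3", "h2"]
unique = list(filter(seen_filter.never_seen, stream))
print(unique, seen_filter.rejected_seen)
```

Because the bound method carries its own state, it can be passed directly to `filter()` while the counters stay available for a summary afterwards.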

In [None]:
class PipelineFilter:
    
    def __init__(self, fs_filter, ls_filter):
        self.sent_to_reanalysis = set()
        self.known = set()
        self.fs_filter = fs_filter
        self.ls_filter = ls_filter
        self.rejected_time = 0
        self.rejected_seen = 0
        self.call_count = 0
    
    def never_seen(self, i: PipelineItem):
        self.call_count += 1
        if self.call_count % 1000 == 0:
            print(self.summary())
        sha1 = i.get_sha1()
        retval = sha1 not in self.known
        self.known.add(sha1)
        if not retval:
            self.rejected_seen += 1
        return retval
    
    def time_gate(self, i: PipelineItem):
        self.call_count += 1
        if self.call_count % 1000 == 0:
            print(self.summary())
        retval = self.fs_filter(i) and self.ls_filter(i)
        if not retval:
            self.rejected_time += 1
        return retval
    
    def newer_than_fs(self, i: PipelineItem):
        self.call_count += 1
        if self.call_count % 1000 == 0:
            print(self.summary())
        sha1 = i.get_sha1()
        self.known.add(sha1)
        retval = self.fs_filter(i)
        if not retval:
            self.rejected_time += 1
        return retval
    
    def add_reanalyzed(self, hashes: List[str]):
        self.call_count += 1
        if self.call_count % 1000 == 0:
            print(self.summary())
        self.sent_to_reanalysis.update(hashes)
    
    def was_reanalyzed(self, i: PipelineItem):
        self.call_count += 1
        if self.call_count % 1000 == 0:
            print(self.summary())
        return i.get_sha1() in self.sent_to_reanalysis
    
    def summary(self) -> str:
        return (f"Rejected time: {self.rejected_time}\r\n"
                f"Rejected seen: {self.rejected_seen}\r\n"
                f"Reanalyzed hashes: {len(self.sent_to_reanalysis)}\r\n"
                f"Unique hashes: {len(self.known)}\r\n\r\n")
    

pipeline_filter = PipelineFilter(
    firstseen_newer_than(year_ago), 
    lastseen_older_than(ninety_days_ago)
)

We compose the generators and filters to create the `search_and_rha1_stream`.
This is the part of the pipeline illustrated by Stage B on the sketch.
We consume items from this stream in batches of 100, since this is the upper bound for the bulk reanalysis request.
Note that depending on your Advanced Search query and the limits you set this part of the pipeline may take some time.
So take a break while this runs.

Note that it will take some time for TiCloud to reanalyze all the samples this stage of the pipeline submits.
This part is represented by the dotted circle marked 1 on the sketch.

In [None]:
search_and_rha1_stream = filter(
    pipeline_filter.time_gate,
    group_with_reputation(
        filter(
            pipeline_filter.never_seen,
            expand_similar(
                islice(complete_search_stream(search_query), SEARCH_LIMIT)
            )
        )
    )
)

for batch in batched(search_and_rha1_stream, 100):
    samples = [i.get_sha1() for i in batch]
    reanalyze_file.reanalyze_samples(samples)
    pipeline_filter.add_reanalyzed(samples)
    print(f"Submitted {len(samples)} samples for reanalysis")

For the final stage of the pipeline we define two enrichment functions.
These will query TiCloud report APIs to pull static and dynamic (if one exists) analysis reports for samples.
These work on the same principle as the other functions in this example, so we will not go into detail.

After the samples have been reanalyzed they will appear in the regular YARA matches feed, stage C on the sketch.
**NOTE** that this feed will contain any sample which arrived at TiCloud since the creation of the rule.
You may wish to filter out these samples using the `PipelineFilter.was_reanalyzed()` function.
**NOTE** that this feed will contain samples whose firstseen timestamp may fall outside our timeframe.
We will filter these out.
The same is done with the samples caught by the Retro Hunt feed.

In either case we will use the `add_static()` and `add_dynamic()` functions defined in the following cell to enrich the items in the pipeline
with static and dynamic reports.
Finally, we will further decorate the items in the pipeline with their source stream's name, "RETRO" and "YARA" respectively.

In [None]:
def add_static(items: Iterable[PipelineItem]):
    for candidates in batched(items, 100):
        sha1_to_item = {e.get_sha1(): e for e in candidates}
        static = static_analysis.get_analysis_results(list(sha1_to_item.keys())).json()["rl"]
        if static.get("invalid_hashes"):
            print("Invalid hashes in stream", static["invalid_hashes"])
        for analysis in static["entries"]:
            analysis_hash = analysis["sha1"]
            i = sha1_to_item[analysis_hash]
            i.static = analysis
            yield i
            
            
def add_dynamic(items: Iterable[PipelineItem]):
    for i in items:
        try:
            i.dynamic = dynamic_analysis.get_dynamic_analysis_results(i.get_sha1()).json()["rl"]["report"]
        except NotFoundError:
            pass
        yield i


def add_source(s):
    def inner(i: PipelineItem):
        return s, i
    return inner


retro_stream = map(
    add_source("RETRO"), 
    add_dynamic(add_static(
        filter(
            pipeline_filter.newer_than_fs,
            islice(
                group_with_reputation(
                    complete_retro_feed(retro_started)
                ), 
                RETRO_LIMIT
            )
        )
    ))
)

yara_stream = map(
    add_source("YARA"), 
    add_dynamic(add_static(
        islice(
            filter(
                pipeline_filter.newer_than_fs, 
                complete_yara_feed(retro_started)
            ),
            YARA_LIMIT
        )
    ))
)


reports_dir = Path("./reports")
reports_dir.mkdir(exist_ok=True)

with open("report.csv", "w", newline="", buffering=1) as report:
    writer = csv.writer(report, delimiter=" ", quoting=csv.QUOTE_MINIMAL)
    writer.writerow(("source", "sha1", "sample_type", "sample_available", "timestamp", "file_size", "reanalyzed_via_pipeline"))
    for source, item in chain(retro_stream, yara_stream):
        yara_report = item.retro if item.retro else item.yara
        writer.writerow((
            source,
            item.get_sha1(),
            yara_report.get("file_type"),
            item.sample_available(),
            yara_report.get("timestamp"),
            yara_report.get("file_size"),
            "Reanalyzed" if pipeline_filter.was_reanalyzed(item) else "Not reanalyzed",
        ))
        with open(reports_dir / f"{item.get_sha1()}.json", "w") as sample_report:
            json.dump(item.as_dict(), sample_report, indent=4)

We combine the two streams into a single one using the `itertools.chain()` iterator.
Each item from the combined stream is serialized and stored as a json file in the `./reports` directory.
We create a CSV file which contains a manifest of collected samples.
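
The tagging and chaining step can be reproduced with in-memory data. In the sketch below the two lists stand in for the Retro Hunt and YARA streams, and the manifest is written to an `io.StringIO` buffer instead of a file. All values are made up, and the default comma delimiter is used for brevity.

```python
import csv
import io
from itertools import chain


def add_source(name):
    """Return a function that pairs an item with the name of its stream."""
    def inner(item):
        return name, item
    return inner


retro_items = [{"sha1": "r1"}, {"sha1": "r2"}]   # made-up matches
yara_items = [{"sha1": "y1"}]

buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(("source", "sha1"))
for source, item in chain(map(add_source("RETRO"), retro_items),
                          map(add_source("YARA"), yara_items)):
    writer.writerow((source, item["sha1"]))

rows = buffer.getvalue().splitlines()
print(rows)
```

`chain()` exhausts the first stream before it touches the second, so all Retro Hunt rows precede the YARA rows in the manifest.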