# Authors

- Ikram Kohil, 2019115
- Johnatan Gao, 2013298

# Taint Analysis
In this lab, we implement a taint analysis to check that data coming from outside the system (user input) cannot reach critical parts of the system without being filtered.

# 1. Implementation and tests

In this first part, we implement the taint analysis algorithm called Possibly Tainted Definitions. We then test the implementation on four examples in the folder **part_1**.

## 1.1 Possibly Tainted Definitions

According to class notes, there are a couple of variables/concepts that we need to define in order to implement the conceptual algorithm.

$$
in\_taintedDefs: V \to \mathcal{P}(DEFS)
$$
$$
out\_taintedDefs: V \to \mathcal{P}(DEFS)
$$

In this statement:

- V: the set of nodes (vertices) of the control flow graph (CFG). Both functions are indexed by node, which matches the `IN[node]` and `OUT[node]` sets used in the algorithm further down.
- P(DEFS): P denotes the power set, which is the set of all subsets of a given set, and DEFS is the set of all definitions in the program. in_taintedDefs and out_taintedDefs therefore map each node to the subset of definitions that are possibly tainted on entry to and on exit from that node.

$$
in\_taintedDefs(node) = \bigcup_{p \in preds(node)} out\_taintedDefs(p)
$$

This equation defines in_taintedDefs. The set for a node is the union (⋃) of the out_taintedDefs sets of all its predecessors, preds(node). In other words, a definition is possibly tainted on entry to a node as soon as it is possibly tainted on exit from at least one node that flows into it.
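
As a minimal sketch of this union, assume a hypothetical CFG fragment in which node 3 has two predecessors, nodes 1 and 2. The node numbers, the definition labels (`d1`, `d2`, `d3`) and the helper name `in_tainted_defs` are illustrative and do not come from the lab files.

```python
# Hypothetical data: OUT sets of the two predecessors of node 3.
out_tainted_defs = {
    1: {"d1"},        # definitions possibly tainted on exit from node 1
    2: {"d2", "d3"},  # definitions possibly tainted on exit from node 2
}
preds = {3: [1, 2]}


def in_tainted_defs(node, preds, out_sets):
    """Union of the OUT sets of every predecessor of `node`."""
    result = set()
    for p in preds.get(node, []):
        result |= out_sets.get(p, set())
    return result


in_3 = in_tainted_defs(3, preds, out_tainted_defs)
print(sorted(in_3))  # ['d1', 'd2', 'd3']
```

A node without predecessors, such as the entry node, gets an empty set.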

$$
tainted\_GEN[node] =
\left\{
  \begin{array}{ll}
    \{ d_i \in defs[node] \mid \exists (d_j, r_k) \in defRefChains : (r_k \in refs[node]) \land (d_j \in in\_taintedDefs(node)) \} & \text{if } node \in EXPR \\
    \{ d_i \in defs[node] \} & \text{if } node \in SOURCES \\
    \emptyset & \text{if } node \in FILTERS \\
    \emptyset & \text{if } node \in SAFESET
  \end{array}
\right.
$$

This equation populates the GEN set of each node in our CFG. There are four cases:
- If the node is part of the FILTERS set, tainted_gen for this node is empty.
- If the node is part of the SAFESET set, tainted_gen for this node is empty as well.
- If the node is part of the SOURCES set, tainted_gen for this node contains the definitions made by this node.
- If the node is part of the EXPR set (in other words, it is an expression), then we:
  - Take the definitions made by this node, if any.
  - Look for a definition/reference pair in defRefChains whose reference belongs to this node.
  - Keep the node's definitions only if such a pair exists and its definition is part of the in_tainted set for this node.
  
In other words, for each node:
1. Check whether the node is a definition.

2. If the node is a definition and its right side is a source, mark the definition as tainted: it holds a value that is considered untrusted.

3. If the node is a definition and its right side is a filter or a safe node, do nothing: the value is considered filtered or safe, so the definition generates no taint.

4. If the node is a definition and its right side is an expression, examine all references in that expression. If any of them is reached by a tainted definition, the definition is also considered tainted.
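
The four cases can be sketched as a small function over plain Python sets. This is only an illustration under stated assumptions: the `kind` labels, the node names (`n1`, `n2`) and the definition/reference labels (`d_a`, `r_a`, `d_b`) are hypothetical, and the function is independent of the `code_analysis` library used later in this notebook.

```python
def tainted_gen(node, kind, defs, refs, def_ref_chains, in_tainted):
    """Return the definitions generated as tainted by `node`.

    kind: "SOURCE", "FILTER", "SAFE" or "EXPR" (hypothetical labels).
    defs / refs: map a node to the definitions / references it contains.
    def_ref_chains: set of (definition, reference) pairs.
    in_tainted: definitions possibly tainted on entry to `node`.
    """
    if kind == "SOURCE":
        return set(defs[node])
    if kind in ("FILTER", "SAFE"):
        return set()
    # EXPR: tainted as soon as one reference of the node is reached
    # by a definition that is already tainted on entry.
    uses_tainted = any(
        r in refs[node] and d in in_tainted for d, r in def_ref_chains
    )
    return set(defs[node]) if uses_tainted else set()


# Hypothetical program:  n1: $a = <source>;   n2: $b = $a . "!";
defs = {"n1": {"d_a"}, "n2": {"d_b"}}
refs = {"n1": set(), "n2": {"r_a"}}
chains = {("d_a", "r_a")}

gen_n1 = tainted_gen("n1", "SOURCE", defs, refs, chains, in_tainted=set())
gen_n2 = tainted_gen("n2", "EXPR", defs, refs, chains, in_tainted=gen_n1)
gen_n2_clean = tainted_gen("n2", "EXPR", defs, refs, chains, in_tainted=set())
```

Here `gen_n2` contains `d_b` because the reference `r_a` is reached by the tainted definition `d_a`, while `gen_n2_clean` is empty because nothing is tainted on entry.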

$$
tainted\_KILL[node] = \{ d_k \mid (var(d_k) = var(d_m)) \land (d_m \in defs[node]) \}
$$

This equation populates the kill set. Every time a node defines or redefines a variable, all definitions of that same variable, including the new one, are added to the tainted_kill set of that node.
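
Read literally, the formula can be sketched as follows. The mapping `var_of` (definition to variable name) and the labels `n1`..`n3`, `d1`..`d3` are assumptions made for this example only.

```python
def tainted_kill(node, defs, var_of):
    """All definitions whose variable is (re)defined by `node`."""
    killed_vars = {var_of[d] for d in defs[node]}
    return {d for d, var in var_of.items() if var in killed_vars}


# Hypothetical program:  n1: $a = ...;   n2: $b = ...;   n3: $a = ...;
defs = {"n1": {"d1"}, "n2": {"d2"}, "n3": {"d3"}}
var_of = {"d1": "a", "d2": "b", "d3": "a"}

kill_n3 = tainted_kill("n3", defs, var_of)  # d1 and d3 both define $a
kill_n2 = tainted_kill("n2", defs, var_of)  # only d2 defines $b
```

Node `n3` redefines `$a`, so it kills both the earlier definition `d1` and its own definition `d3`; GEN then decides whether `d3` is added back as tainted.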

Here's the conceptual algorithm for Possibly-Tainted Definitions:

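```python
def poss_tainted_defs(node_set, preds, gen, kill):
    """Conceptual fixed-point iteration for Possibly-Tainted Definitions.

    node_set: iterable of node ids; preds: node -> list of predecessor ids;
    gen / kill: node -> set of definitions. GEN is taken as given here; in
    the taint setting it depends on IN[node] and is recomputed on each pass.
    """
    IN = {node: set() for node in node_set}
    OUT = {node: set() for node in node_set}

    changes = True
    while changes:
        changes = False
        for node in node_set:
            # IN[node] is the union of OUT[p] over all predecessors p
            IN[node] = set().union(*(OUT[p] for p in preds[node]))
            old_out = OUT[node]
            OUT[node] = gen[node] | (IN[node] - kill[node])

            if OUT[node] != old_out:
                changes = True

    return IN, OUT
```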
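
To see why the loop must repeat until nothing changes, the following self-contained sketch runs the same iteration on a hypothetical four-node CFG with a back edge and counts the passes. The CFG, the GEN/KILL sets and the function name `solve` are assumptions made for illustration; GEN is treated as fixed, which is a simplification of the taint case.

```python
def solve(nodes, preds, gen, kill):
    """Iterate IN/OUT until no OUT set changes; also count the passes."""
    IN = {n: set() for n in nodes}
    OUT = {n: set() for n in nodes}
    passes = 0
    changed = True
    while changed:
        changed = False
        passes += 1
        for n in nodes:
            IN[n] = set().union(*(OUT[p] for p in preds[n]))
            new_out = gen[n] | (IN[n] - kill[n])
            if new_out != OUT[n]:
                OUT[n] = new_out
                changed = True
    return IN, OUT, passes


# Hypothetical CFG with a loop: 1 -> 2 -> 3 -> 2, and 2 -> 4.
nodes = [1, 2, 3, 4]
preds = {1: [], 2: [1, 3], 3: [2], 4: [2]}
gen = {1: {"d1"}, 2: set(), 3: {"d3"}, 4: set()}
kill = {1: {"d1"}, 2: set(), 3: {"d3"}, 4: set()}

IN, OUT, passes = solve(nodes, preds, gen, kill)
print(sorted(IN[4]), passes)  # ['d1', 'd3'] 3
```

After the first pass, only `d1` has reached node 4. The definition `d3` made in the loop body reaches it on the second pass through the back edge 3 -> 2, and a third pass confirms that nothing changes anymore.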

In [None]:
import os
import glob

# Global variable - directory where cfg.json and .dot files generated by our code will be stored 
part1_output_directory = "output/part_1/"
part2_output_directory = "output/part_2/"

def get_json_files(pattern: str, directory: str):
    """
    Finds all files in the specified directory and its subdirectories that match the given pattern.

    This function uses the glob module to search for files that match the specified pattern.
    The pattern can include wildcards like '*' to match any sequence of characters and '?' to match any single character.

    Args:
        pattern (str): The pattern to match filenames against. For example, '*.json' to match all JSON files.
        directory (str): The directory to search in. This can be an absolute path or a relative path.

    Returns:
        list: A list of file paths that match the given pattern. Each path is a string.

    Example:
        >>> get_json_files('*.json', '/path/to/directory')
        ['/path/to/directory/file1.json', '/path/to/directory/subdir/file2.json']
    """
    files = glob.glob(f"{directory}/**/{pattern}", recursive=True)
    print(files)
    return files

def splitext_recurse(p):
    """
    Recursively splits the filename into base name and extensions.
    
    Args:
        p: The filename to split.
    
    Returns:
        A tuple containing the base name and all extensions.
    """
    base, ext = os.path.splitext(p)
    if ext == '':
        return (base,)
    else:
        return splitext_recurse(base) + (ext,)

In [None]:
from typing import Dict, List, Set
import os
import json
from code_analysis import CFGReader
from code_analysis import ASTReader

class TaintAnalysisAlgorithm:
    """
    A class for performing taint analysis on a given file.
    
    Attributes:
        cfg: The control flow graph (CFG) of the file.
        ast: The abstract syntax tree (AST) of the file.
        filepath: The path to the analyzed PHP file, without its extensions.
        taint_file_data: The data loaded from the taint file.
        tainted_params: A dictionary containing all necessary parameters for taint analysis.
        tainted_gen: A dictionary containing all tainted_gen nodes.
        tainted_kill: A dictionary containing all tainted_kill nodes.
        tainted_in: A dictionary containing all tainted_in nodes.
        tainted_out: A dictionary containing all tainted_out nodes.
    """
    
    def __init__(self, filepath: str):
        """
        Initializes the TaintAnalysisAlgorithm for the file at the given filepath.

        Loads the CFG, the AST and the taint data, then initializes the sets used by the taint analysis.

        Args:
            filepath: The path to the analyzed file without its extensions; '.php.cfg.json',
                '.php.ast.json' and '.php.taint.json' are appended to it.
        """
        self.cfg = CFGReader().read_cfg(f"{filepath}.php.cfg.json")
        self.ast = ASTReader().read_ast(f"{filepath}.php.ast.json")
        self.filepath = filepath

        with open(f"{filepath}.php.taint.json") as taint_file:
            self.taint_file_data = json.load(taint_file)

        self.tainted_params: Dict[str, List[int]] = {
            "defs": self.taint_file_data['defs'],
            "refs": self.taint_file_data['refs'],
            "pairs" : self.taint_file_data['pairs'],
            "sinks" : self.taint_file_data['sinks'],
            "filters" : self.taint_file_data['filters'],
            "safes" : self.taint_file_data['safes'],
            "sources" : self.taint_file_data['sources']
        }

        self.tainted_gen: Dict[int, Set] = {}
        self.tainted_kill: Dict[int, Set] = {}
        self.tainted_in: Dict[int, Set] = {}
        self.tainted_out: Dict[int, Set] = {}

        for node in self.cfg.get_node_ids():
            self.tainted_in[node] = set()
            self.tainted_out[node] = set()
            self.tainted_gen[node] = set()
            self.tainted_kill[node] = set()

    def __is_binOp_equal(self, node_id: int) -> bool:
        """
        Checks if the node is a binary operation equal to '='.
        
        Args:
            node_id: The ID of the node to check.
        
        Returns:
            True if the node is a binary operation equal to '=', False otherwise.
        """
        return self.cfg.get_type(node_id) == "BinOP" and self.cfg.get_image(node_id) == "="

    def __get_expr_nodes(self, node: int) -> List[int]:
        """
        Retrieves all expression nodes from the given node.
        
        Args:
            node: The starting node ID.
        
        Returns:
            A list of node IDs representing the expression nodes.
        """
        ref = []
        stack = [node]

        while stack:
            current_node = stack.pop()
            node_type = self.cfg.get_type(current_node)

            if node_type == 'BinOP':
                operands = self.cfg.get_op_hands(current_node)
                stack.extend(operands)
            else:
                ref.append(current_node)

        return ref
    
    def _get_tainted_gen(self, node_id: int):
        """
        Determines the tainted_gen nodes for a given node.

        This follows the definition of tainted_GEN: we handle the cases where the
        right-hand side is part of the sources, safes or filters sets. If the node is
        an assignment ("=") whose right-hand side is in none of these sets, we treat
        it as an expression.
        
        Args:
            node_id: The ID of the node to analyze.
        """
        if self.__is_binOp_equal(node_id):
            var_node_id, expr_node_id = self.cfg.get_op_hands(node_id)
            if expr_node_id in self.tainted_params["sources"]:
                self.tainted_gen[node_id].add(var_node_id)
            elif expr_node_id in self.tainted_params["safes"]:
                self.tainted_gen[node_id] = set()
            elif expr_node_id in self.tainted_params["filters"]:
                self.tainted_gen[node_id] = set()
            else:
                refExpr = self.__get_expr_nodes(expr_node_id)
                for ref in refExpr:
                    for defRef in self.tainted_params["pairs"]:
                        definition, reference = defRef
                        if reference == ref and definition in self.tainted_in[node_id]:
                            self.tainted_gen[node_id].add(var_node_id)
                            break
            
    def _get_tainted_kills(self, node_id: int):
        """
        Determines the tainted_kill nodes for a given node.
        
        Args:
            node_id: The ID of the node to analyze.
        """
        if self.__is_binOp_equal(node_id):
            var_node_id = self.cfg.get_op_hands(node_id)[0]
            if var_node_id in self.tainted_params['defs']:
                self.tainted_kill[node_id].add(var_node_id) 

    def _update_tainted_in(self, node_id: int):
        """
        Updates the tainted_in nodes for a given node.

        A 'CallEnd' node is handled separately: its predecessor is taken to be the
        matching 'CallBegin' node, because the CFG parents of a 'CallEnd' are not
        the nodes we want to propagate from.
        
        Args:
            node_id: The ID of the node to update.
        """
        predecessors = [self.cfg.get_call_begin(node_id)] if self.cfg.get_type(node_id) == 'CallEnd' else self.cfg.get_parents(node_id)
        for predNode in predecessors:
            self.tainted_in[node_id] = self.tainted_in[node_id].union(self.tainted_out[predNode])

    def _update_tainted_out(self, node_id: int):
        """
        Updates the tainted_out set for a given node ID by combining the tainted_gen set with the difference between tainted_in and tainted_kill sets.

        This method is used to calculate the tainted output for a given node, which is crucial for data flow analysis. 
        It ensures that the tainted_out set for a node reflects the current state of tainted data that can flow out of the node.

        Args:
        - node_id: The unique identifier of the node for which the tainted_out set is to be updated.
        """
        diff = self.tainted_in[node_id].difference(self.tainted_kill[node_id])
        self.tainted_out[node_id] = self.tainted_gen[node_id].union(diff)
    
    def _perform_taint_analysis(self):
        """
        Performs the taint analysis on the file.
        """
        changes = True
        old_OUT = {}

        while changes:
            changes = False
            for node_id in self.cfg.get_node_ids():
                # IN is updated first because tainted_GEN depends on it
                self._update_tainted_in(node_id)
                self._get_tainted_gen(node_id)
                self._get_tainted_kills(node_id)
                old_OUT[node_id] = self.tainted_out[node_id]
                self._update_tainted_out(node_id)

                if self.tainted_out[node_id] != old_OUT[node_id]:
                    changes = True

    def output_results(self, directory: str, file_name: str):
        """
        Outputs the results of the taint analysis to a file.
        
        Args:
            directory: The directory where the output file should be saved.
            file_name: The name of the output file.
        """
        os.makedirs(directory, exist_ok=True)
        
        output_file_path = f"{directory}/{file_name}"
        self._perform_taint_analysis()
        
        with open(output_file_path, 'a+') as output_file:
            output_file.write(f"------------------------ File: {self.filepath} ------------------------\n")
            for sink in self.tainted_params['sinks']:
                for definition, reference in self.tainted_params['pairs']:
                    if sink == reference and definition in self.tainted_in[sink]:
                        def_line = self.ast.get_position(self.cfg.get_node_ast_ptr(definition))[0]
                        ref_line = self.ast.get_position(self.cfg.get_node_ast_ptr(reference))[0]
                        output_file.write(
                            f"'{self.cfg.get_image(definition)}' that's defined at line {def_line} "
                            f"and referenced at line {ref_line} is tainted \n"
                        )

            for node in self.cfg.get_node_ids():
                output_file.write(f"{node}: IN: {self.tainted_in[node]}, OUT: {self.tainted_out[node]}\n")
            output_file.write("\n")


In [None]:
def analyze_part_1(directory):
    part1_output_directory = "output/part_1"
    part_1_output_file = "part_1_output_file.txt"

    # Retrieve filenames of all php files in the specified directory
    php_filenames = get_json_files('*.php', directory)

    # Iterate over the filenames array once to visit all cfgs
    for php_filename in php_filenames:
        filename = splitext_recurse(php_filename)[0]
        analyser = TaintAnalysisAlgorithm(filename)
        analyser.output_results(part1_output_directory, part_1_output_file)

directory_to_analyze = "../part_1/"
analyze_part_1(directory_to_analyze)

In [None]:
def analyze_part_2(directory):
    part2_output_directory = "output/part_2"
    part2_output_file = "part_2_output_file.txt"

    # Retrieve filenames of all cfg in the specified directory
    cfg_filenames = get_json_files('*.php.cfg.json', directory)

    for cfg_filename in cfg_filenames:
        filename = splitext_recurse(cfg_filename)[0]
        analyser = TaintAnalysisAlgorithm(filename)
        analyser.output_results(part2_output_directory, part2_output_file)

directory_to_analyze = "../part_2/app.cfg"
analyze_part_2(directory_to_analyze)