In [1]:
!pip install tree_sitter

You should consider upgrading via the '/opt/app-root/bin/python3.9 -m pip install --upgrade pip' command.[0m[33m
[0m

In [2]:
# Fetch the Python grammar sources that Language.build_library compiles in a later cell.
# NOTE(review): this clones the default branch unpinned, so the grammar version can drift
# between runs — consider checking out a fixed tag; confirm which one matches the pinned binding.
!git clone https://github.com/tree-sitter/tree-sitter-python

Cloning into 'tree-sitter-python'...
remote: Enumerating objects: 3041, done.[K
remote: Counting objects: 100% (1167/1167), done.[K
remote: Compressing objects: 100% (122/122), done.[K
remote: Total 3041 (delta 1069), reused 1087 (delta 1043), pack-reused 1874[K
Receiving objects: 100% (3041/3041), 21.39 MiB | 41.88 MiB/s, done.
Resolving deltas: 100% (1900/1900), done.


In [3]:
from collections import defaultdict
from tree_sitter import Language, Parser
from pprint import pprint
import json
import ast
from typing import List, Dict, Optional
import re

In [4]:
# Compile the cloned grammar into a shared library that Language(...) loads in the next cell.
# NOTE(review): Language.build_library appears to exist only in older py-tree-sitter
# releases — confirm the installed version before re-running.
Language.build_library(
    # Store the library in the `build` directory
    "build/my-languages.so",
    # Include one or more languages
    ["tree-sitter-python"],
)

True

In [5]:
# Load the "python" grammar from the shared library written by Language.build_library.
PY_LANGUAGE = Language("build/my-languages.so", "python")

In [6]:
# Single parser instance configured for Python; reused for every module parsed below.
parser = Parser()
parser.set_language(PY_LANGUAGE)

In [7]:
# load nested data
dataset_path = "../../data/raw/nested_data.json"
with open(dataset_path, 'r') as f:
		data = json.load(f)

In [8]:
# Top-level keys of the loaded JSON.
data.keys()

dict_keys(['errors', 'oidc', 'sign', 'transparency', 'verify_models', 'verify_policy', 'verify_verifier'])

In [9]:
# Fields stored under one entry before any processing.
data["errors"].keys()

dict_keys(['markdown', 'code'])

In [10]:
# For each top-level key of `data`, the file-path key under which that entry's
# source text is stored (looked up as data[k]["code"][0][files[k]] further down).
files = dict(
    errors="errors.py",
    oidc="oidc.py",
    sign="sign.py",
    transparency="transparency.py",
    verify_models="verify/models.py",
    verify_policy="verify/policy.py",
    verify_verifier="verify/verifier.py",
)

In [11]:
def _node_source(node):
    """Return the whitespace-stripped source text of ``node`` as ``str``.

    Yields an empty string when the node exposes no ``text`` (missing or None),
    so callers never have to call ``.decode`` on a ``str`` fallback.
    """
    raw = getattr(node, 'text', None)
    if raw is None:
        return ""
    if isinstance(raw, bytes):
        return raw.strip().decode("utf-8")
    return str(raw).strip()


def extract_functions_classes_imports(node):
    """Classify one syntax-tree node into a single category.

    Args:
        node: A tree-sitter node (or any object exposing ``type``, ``text``,
            ``children`` and ``child_count``).

    Returns:
        A one-entry dict ``{category: [source_text]}`` where ``category`` is one
        of ``'imports'``, ``'functions'``, ``'classes'``, ``'documentation'`` or
        ``'other'``. The value is always a list holding the node's stripped text.
    """
    node_type = node.type

    # Plain, from- and __future__ imports are all import statements.
    if node_type in ('import_statement', 'import_from_statement', 'future_import_statement'):
        return {'imports': [_node_source(node)]}

    if node_type == 'function_definition':
        return {'functions': [_node_source(node)]}

    if node_type == 'class_definition':
        return {'classes': [_node_source(node)]}

    # A decorated def/class is wrapped in a `decorated_definition`; classify it by
    # the definition it wraps but keep the full text, decorators included.
    if node_type == 'decorated_definition':
        for child in node.children:
            if child.type == 'function_definition':
                return {'functions': [_node_source(node)]}
            if child.type == 'class_definition':
                return {'classes': [_node_source(node)]}

    # A bare string expression statement is treated as documentation
    # (e.g. the docstring for the entire module).
    if node_type == 'expression_statement' and node.child_count > 0 and node.children[0].type == 'string':
        return {'documentation': [_node_source(node.children[0])]}

    # Anything else (assignments, comments, conditionals, ...) is 'other'.
    return {'other': [_node_source(node)]}

In [12]:
def categorize_code(root_node):
    """Group the direct children of ``root_node`` by category.

    Each child is classified with ``extract_functions_classes_imports`` and its
    items are appended, in source order, to the matching bucket.

    Returns:
        A dict with the keys ``'imports'``, ``'functions'``, ``'classes'``,
        ``'documentation'`` and ``'other'``, each mapping to a list.
    """
    buckets = {
        name: []
        for name in ('imports', 'functions', 'classes', 'documentation', 'other')
    }
    for child in root_node.children:
        classified = extract_functions_classes_imports(child)
        for name in classified:
            buckets[name].extend(classified[name])
    return buckets

In [13]:
def extract_docstrings(code: str) -> List[str]:
    """Return the value of every bare string expression statement in ``code``.

    The source is parsed with ``ast`` and walked in full, so docstrings of nested
    functions/classes (and any other standalone string statements) are included,
    in ``ast.walk`` order.

    Args:
        code: Python source text that must parse on its own.

    Returns:
        The string values, without their surrounding quotes.

    Raises:
        SyntaxError: If ``code`` is not valid Python.
    """
    tree = ast.parse(code)
    # ast.Str / .s are deprecated aliases that no longer exist on current Python;
    # string literals are ast.Constant nodes whose .value is a str.
    return [
        node.value.value
        for node in ast.walk(tree)
        if isinstance(node, ast.Expr)
        and isinstance(node.value, ast.Constant)
        and isinstance(node.value.value, str)
    ]

In [14]:
def remove_extra_chars(data):
    """Recursively strip triple double-quote markers from every string in ``data``.

    Each run of three double quotes, together with one directly following comma
    if present, is deleted. Dicts are updated in place and returned as the same
    object; lists are rebuilt as new lists; any other value is returned unchanged.
    """
    if isinstance(data, str):
        return re.sub(r'\"{3},?', '', data)

    if isinstance(data, list):
        return [remove_extra_chars(element) for element in data]

    if isinstance(data, dict):
        # Only existing keys are reassigned, so mutating while iterating is safe.
        for name in data:
            data[name] = remove_extra_chars(data[name])

    return data

In [15]:
def process_code(data: Dict[str, Dict[str, List[str]]]) -> Dict[str, Dict[str, List[str]]]:
    """Split the function and class chunks of every entry into code and docstrings.

    For each entry that has a ``"code_chunks"`` dict, and for each of its
    ``"functions"`` / ``"classes"`` lists, this:

    * collects the strings found by ``extract_docstrings`` into
      ``"<key>_docstrings"``;
    * replaces the first ``\"\"\"<docstring>\"\"\"`` occurrence of each one in the
      code block with the placeholder ``\"\"\",`` (the block is rewritten in the
      original list as well);
    * appends the rewritten block to ``"<key>_code"``.

    ``data`` is modified in place and also returned. Existing ``"<key>_code"`` /
    ``"<key>_docstrings"`` lists are kept and appended to, so calling this twice
    on the same data adds the blocks a second time.
    """
    for section in data.values():
        if "code_chunks" not in section:
            continue
        chunks = section["code_chunks"]

        for kind in ("functions", "classes"):
            if kind not in chunks:
                continue

            # Create the derived lists on first use; reuse them if already present.
            stripped_blocks = chunks.setdefault(f"{kind}_code", [])
            collected_docs = chunks.setdefault(f"{kind}_docstrings", [])

            sources = chunks[kind]
            for position, source in enumerate(sources):
                found = extract_docstrings(source)
                collected_docs.extend(found)

                # Swap each docstring literal for the placeholder, one at a time,
                # writing the result back into the original list.
                for text in found:
                    sources[position] = sources[position].replace(f'"""{text}"""', '""",', 1)

                stripped_blocks.append(sources[position])

    return data

In [16]:
# For every entry: take its source text (first item of "code", keyed by the path
# listed in `files`), parse it with tree-sitter, and store the categorized
# top-level chunks under a new "code_chunks" key on that entry.
for k in data.keys():
    code_chunk = data[k]["code"][0][files[k]]
    # The parser is given the source as UTF-8 encoded bytes.
    tree = parser.parse(bytes(code_chunk, "utf8"))
    root_node = tree.root_node
    k_categorized_code = categorize_code(root_node)
    data[k]['code_chunks'] = k_categorized_code

In [17]:
# Adds the *_code / *_docstrings lists to each entry's code_chunks.
# process_code mutates `data` in place, and re-running this cell appends the blocks again.
data = process_code(data)

In [18]:
# Strip the `""",` placeholders left by process_code (and any other `"""` runs) from every string.
data = remove_extra_chars(data)

In [19]:
# Top-level keys after processing.
data.keys()

dict_keys(['errors', 'oidc', 'sign', 'transparency', 'verify_models', 'verify_policy', 'verify_verifier'])

In [20]:
# Each entry now also carries the "code_chunks" key added by the parsing loop.
data["oidc"].keys()

dict_keys(['markdown', 'code', 'code_chunks'])

In [21]:
# Categories produced by categorize_code plus the lists added by process_code.
data["oidc"]["code_chunks"].keys()

dict_keys(['imports', 'functions', 'classes', 'documentation', 'other', 'functions_code', 'functions_docstrings', 'classes_code', 'classes_docstrings'])

In [22]:
# Same check for a second entry.
data["errors"]["code_chunks"].keys()

dict_keys(['imports', 'functions', 'classes', 'documentation', 'other', 'functions_code', 'functions_docstrings', 'classes_code', 'classes_docstrings'])

In [23]:
# Top-level function source with its docstring removed.
data["oidc"]["code_chunks"]["functions_code"]

['def detect_credential() -> Optional[str]:\n    \n    try:\n        return cast(Optional[str], id.detect_credential(_DEFAULT_AUDIENCE))\n    except id.IdentityError as exc:\n        IdentityError.raise_from_id(exc)']

In [24]:
# Docstrings extracted from the top-level functions.
data["oidc"]["code_chunks"]["functions_docstrings"]

['Calls `id.detect_credential`, but wraps exceptions with our own exception type.']

In [25]:
# First class source with its docstrings removed.
data["oidc"]["code_chunks"]["classes_code"][0]

'class _OpenIDConfiguration(BaseModel):\n    \n\n    authorization_endpoint: StrictStr\n    token_endpoint: StrictStr'

In [26]:
# Docstrings extracted from the classes; extract_docstrings walks the whole class, so method docstrings are included.
data["oidc"]["code_chunks"]["classes_docstrings"]

["\n    Represents a (subset) of the fields provided by an OpenID Connect provider's\n    `.well-known/openid-configuration` response, as defined by OpenID Connect Discovery.\n\n    See: <https://openid.net/specs/openid-connect-discovery-1_0.html>\n    ",
 'An error raised when an identity token is expired.',
 '\n    An OIDC "identity", corresponding to an underlying OIDC token with\n    a sensible subject, issuer, and audience for Sigstore purposes.\n    ',
 '\n        Create a new `IdentityToken` from the given OIDC token.\n        ',
 "\n        Returns whether or not this `Identity` is currently within its self-stated validity period.\n\n        NOTE: As noted in `Identity.__init__`, this is not a verifying wrapper;\n        the check here only asserts whether the *unverified* identity's claims\n        are within their validity period.\n        ",
 '\n        Returns this `IdentityToken`\'s underlying "subject".\n\n        Note that this is **not** always the `sub` claim in the co

In [27]:
# Persist the processed structure (original fields plus "code_chunks") as indented JSON.
with open('../../data/raw/chunked_data.json', 'w', encoding="utf-8") as json_file:
    json.dump(data, json_file, indent=4)