In [None]:
# Mount Google Drive at /gdrive so files under /gdrive/MyDrive are reachable
# from this Colab runtime.
from google.colab import drive

drive.mount("/gdrive")

In [None]:
!pip install stanza
!pip install --upgrade spacy[cuda12x]
!pip install --no-deps spacy-stanza
!pip install --upgrade networkx

!sudo apt-get install graphviz graphviz-dev
!pip install pygraphviz

!pip install streamlit

In [None]:
# Install the localtunnel npm package (invoked later through `npx`).
!npm install localtunnel

In [None]:
%%writefile app.py
from collections import defaultdict, namedtuple
from itertools import product
from operator import attrgetter

from networkx import MultiDiGraph
from networkx.drawing.nx_agraph import to_agraph
from spacy import prefer_gpu
from spacy.displacy import render
from spacy.tokens import Doc, Span
from spacy_stanza import load_pipeline
from stanza import download
import streamlit as st


def is_prepositional_nmod(nmod):
    """Return True when any left child of ``nmod`` has the ``case`` relation.

    Args:
        nmod: A token-like object exposing ``lefts`` (its left-hand
            syntactic children), each of which has a ``dep_`` label.

    Returns:
        bool: True if at least one left child is labelled ``"case"``,
        otherwise False (including when there are no left children).
    """
    for left_child in nmod.lefts:
        if left_child.dep_ == "case":
            return True
    return False


def expand_noun(noun):
    """Collect the tokens that make up the noun phrase headed by ``noun``.

    Starting from ``noun``, the function recursively gathers children whose
    part of speech is NOUN/PROPN/ADJ and whose relation is ``flat:name``,
    ``compound``, ``nmod`` or ``amod``. An ``nmod`` child that carries a
    ``case`` marker (see ``is_prepositional_nmod``) is skipped. For every
    accepted child, its NOUN/PROPN/ADJ conjuncts are expanded as well,
    together with the ``cc``/``punct`` tokens to the left of each conjunct.

    Args:
        noun: Head token of the phrase.

    Returns:
        list: The collected tokens ordered by their index ``i``.
    """
    nominal_pos = {"NOUN", "PROPN", "ADJ"}
    modifier_deps = {"flat:name", "compound", "nmod", "amod"}
    connector_deps = {"cc", "punct"}

    def _collect(node):
        collected = [node]
        for child in node.children:
            if child.pos_ not in nominal_pos or child.dep_ not in modifier_deps:
                continue
            if child.dep_ == "nmod" and is_prepositional_nmod(child):
                continue
            for coordinated in child.conjuncts:
                if coordinated.pos_ not in nominal_pos:
                    continue
                collected += _collect(coordinated)
                # Keep the coordinator / separator that links the conjunct.
                collected += [
                    left for left in coordinated.lefts
                    if left.dep_ in connector_deps
                ]
            collected += _collect(child)
        return collected

    # Collection order is irrelevant: the phrase is returned in token order.
    return sorted(_collect(noun), key=attrgetter("i"))


def extract_args(head_noun):
    """Expand a head noun and its nominal conjuncts into noun phrases.

    Args:
        head_noun: Token heading the argument.

    Returns:
        list: One expanded noun phrase (as produced by ``expand_noun``) for
        ``head_noun`` itself, followed by one per conjunct of ``head_noun``
        whose part of speech is NOUN or PROPN.
    """
    nominal_conjuncts = [
        conjunct for conjunct in head_noun.conjuncts
        if conjunct.pos_ in {"NOUN", "PROPN"}
    ]
    return [expand_noun(nominal) for nominal in [head_noun, *nominal_conjuncts]]


def dist_subjs_mods_to_verb_conjs(verbs):
    """Let conjoined verbs inherit subjects and modifiers from their head.

    For every verb attached with the ``conj`` relation whose head is also a
    key of ``verbs``, an empty or missing ``"subjects"`` / ``"modifiers"``
    entry is replaced by the head's non-empty entry. The head's list object
    itself is shared, not copied.

    Args:
        verbs: Mapping of verb token -> details mapping; mutated in place.

    Returns:
        None
    """
    for verb, details in verbs.items():
        if verb.dep_ != "conj":
            continue
        governor = verbs.get(verb.head)
        if governor is None:
            continue
        for slot in ("subjects", "modifiers"):
            inherited = governor.get(slot)
            if inherited and not details.get(slot):
                details[slot] = inherited


def dist_objs_to_verb_conjs(verbs):
    """Share a verb's objects with earlier conjoined verbs that have none.

    For every verb that has a non-empty ``"objects"`` entry, each of its
    conjuncts that is a key of ``verbs``, occurs earlier in the text
    (smaller ``i``) and has no objects of its own receives the same
    objects list (shared, not copied).

    Args:
        verbs: Mapping of verb token -> details mapping; mutated in place.

    Returns:
        None
    """
    for verb, details in verbs.items():
        shared_objects = details.get("objects")
        if not shared_objects:
            continue
        for sibling in verb.conjuncts:
            if sibling.i >= verb.i:
                continue
            sibling_details = verbs.get(sibling)
            if sibling_details is None or sibling_details.get("objects"):
                continue
            sibling_details["objects"] = shared_objects


def handle_xcomp_subjs(verbs):
    """Assign subjects to ``xcomp`` verbs from their governing verb.

    An ``xcomp`` verb that has objects and whose head is a key of ``verbs``
    gets its ``"subjects"`` entry overwritten with the head's objects when
    the head has any, otherwise with the head's subjects, otherwise with an
    empty list.

    The head is only read through ``.get``: a head without a ``"subjects"``
    entry neither raises ``KeyError`` (plain mapping) nor has an empty key
    inserted into it as a side effect (``defaultdict``).

    Args:
        verbs: Mapping of verb token -> details mapping; mutated in place.

    Returns:
        None
    """
    for verb, details in verbs.items():
        if verb.dep_ != "xcomp" or not details.get("objects"):
            continue
        head = verbs.get(verb.head)
        if head is None:
            continue
        details["subjects"] = (
            head.get("objects") or head.get("subjects") or []
        )
    return None


def extract_svo_triples(sentence):
    """Extract subject-verb-object triples from a parsed sentence.

    The sentence is scanned once to gather, per VERB token, its subjects
    (``nsubj`` / ``nsubj:pass`` nominals, or the NOUN/PROPN a clausal
    ``acl`` / ``acl:relcl`` verb attaches to), its objects (``obj``
    nominals) and its modifiers (``advmod`` / ``aux`` dependents). Objects
    and subjects are then propagated between related verbs by
    ``dist_objs_to_verb_conjs``, ``handle_xcomp_subjs`` and
    ``dist_subjs_mods_to_verb_conjs``, in that order.

    Args:
        sentence: Iterable of dependency-parsed tokens.

    Returns:
        list: ``SVOTriple(s, v, o)`` named tuples, one per (subject, object)
        combination of every verb that has both. ``s`` and ``o`` are token
        lists from ``extract_args``; ``v`` is the verb plus its modifiers,
        ordered by token index.
    """
    SVOTriple = namedtuple("SVOTriple", ["s", "v", "o"])
    nominal_pos = {"NOUN", "PROPN"}
    verbs = defaultdict(lambda: defaultdict(list))

    for token in sentence:
        governor = token.head

        if token.pos_ == "VERB":
            # Register the verb even if nothing ends up attached to it.
            entry = verbs[token]
            is_clausal_modifier = token.dep_ in {"acl", "acl:relcl"}
            if is_clausal_modifier and governor.pos_ in nominal_pos:
                entry["subjects"].extend(extract_args(governor))
            continue

        if governor.pos_ != "VERB":
            continue

        entry = verbs[governor]
        if token.pos_ in nominal_pos:
            if token.dep_ in {"nsubj", "nsubj:pass"}:
                entry["subjects"].extend(extract_args(token))
            elif token.dep_ == "obj":
                entry["objects"].extend(extract_args(token))
        elif token.dep_ in {"advmod", "aux"}:
            entry["modifiers"].append(token)

    dist_objs_to_verb_conjs(verbs)
    handle_xcomp_subjs(verbs)
    dist_subjs_mods_to_verb_conjs(verbs)

    svo_triples = []
    for verb, details in verbs.items():
        subjects = details.get("subjects")
        objects = details.get("objects")
        if not (subjects and objects):
            continue
        verb_phrase = sorted(
            [verb, *(details.get("modifiers") or [])], key=attrgetter("i")
        )
        svo_triples.extend(
            SVOTriple(s=subject, v=verb_phrase, o=obj)
            for subject, obj in product(subjects, objects)
        )

    return svo_triples


def merge_tokens_into_text(tokens):
    """Render a sequence of tokens as one lower-cased string.

    Each token contributes its ``text_with_ws``. When two consecutive
    entries of ``tokens`` are not adjacent in the document (their indices
    ``i`` are not consecutive) and the earlier one carries no trailing
    whitespace, a single space is inserted so the words are not glued
    together (e.g. a modifier that is followed by a skipped comma).
    Adjacent tokens keep their original spacing untouched.

    Args:
        tokens: Iterable of token-like objects exposing ``text_with_ws``
            and ``i``.

    Returns:
        str: The joined text, lower-cased, with trailing whitespace removed.
        An empty input yields ``""``.
    """
    tokens = list(tokens)
    last_position = len(tokens) - 1
    pieces = []
    for position, token in enumerate(tokens):
        piece = token.text_with_ws
        if position < last_position:
            following = tokens[position + 1]
            # A gap in the token indices means something between the two
            # tokens was left out; make sure they stay separated.
            if following.i != token.i + 1 and not piece[-1:].isspace():
                piece += " "
        pieces.append(piece)
    return "".join(pieces).lower().rstrip()


def collect_svo_triples(doc):
    """Gather the SVO triples of every sentence in ``doc`` as plain text.

    Args:
        doc: Document whose sentences expose the ``svo_triples`` extension.

    Returns:
        list: ``(subject, verb, object)`` tuples of strings, each component
        rendered with ``merge_tokens_into_text``, in sentence order.
    """
    return [
        tuple(merge_tokens_into_text(component) for component in triple)
        for sentence in doc.sents
        for triple in sentence._.svo_triples
    ]


def construct_kg(doc):
    """Build a laid-out Graphviz graph from the document's SVO triples.

    Duplicate triples are dropped; every remaining triple becomes a
    directed edge from its subject to its object, labelled with the verb.

    Args:
        doc: Document exposing the ``svo_triples`` extension.

    Returns:
        The graph converted with ``to_agraph``, with ``overlap="scale"`` and
        ``splines="true"`` set and a layout computed by the ``sfdp`` program.
    """
    graph = MultiDiGraph()
    for subject, verb, obj in set(doc._.svo_triples):
        graph.add_edge(subject, obj, label=verb)

    agraph = to_agraph(graph)
    agraph.graph_attr.update(overlap="scale", splines="true")
    agraph.layout(prog="sfdp")
    return agraph


# Stanza settings shared by the model download and the spaCy pipeline loader.
stanza_config = {
    "lang": "id",
    "package": "default_accurate",
    "processors": "tokenize,mwt,pos,lemma,depparse",
    "model_dir": "/gdrive/MyDrive/stanza_resources",
}


@st.cache_resource(show_spinner=False)
def _load_nlp():
    """Download the Stanza models and build the spaCy pipeline once.

    Streamlit re-executes this script on every user interaction; caching the
    pipeline as a resource avoids repeating the download check and the model
    load on each rerun. ``show_spinner=False`` keeps the loader from emitting
    any UI element of its own.

    Returns:
        The pipeline created by ``load_pipeline`` for ``stanza_config``.
    """
    download(**stanza_config)
    return load_pipeline(
        name="id", use_gpu=True, download_method=None, **stanza_config
    )


# Executed on every rerun, before the pipeline is (re)used.
prefer_gpu()

nlp = _load_nlp()

# force=True: the script is re-executed in the same process, so the
# extensions may already be registered.
Span.set_extension("svo_triples", getter=extract_svo_triples, force=True)
Doc.set_extension("svo_triples", getter=collect_svo_triples, force=True)
Doc.set_extension("kg", getter=construct_kg, force=True)


# --- Streamlit page -------------------------------------------------------
st.set_page_config(page_title="IDNLSR2KG", page_icon=":brain:")

st.title(":brain: IDNLSR2KG")
st.write(
    "Effortlessly transform software requirements into structured knowledge."
)

dataset = st.file_uploader(
    "Upload software requirement dataset", type="txt"
)

if dataset:
    with st.spinner("Processing..."):
        # A dict keeps first-seen order, so duplicates are dropped while the
        # requirements stay in the order they appear in the uploaded file
        # (a set would yield an arbitrary, run-dependent order).
        unique_lines = {}

        for line in dataset:
            # "utf-8-sig" drops a leading byte-order mark, which would
            # otherwise stay attached to the first word of the file.
            if line := line.decode("utf-8-sig").strip():
                unique_lines[line.rstrip(".")] = None

        # One requirement per sentence: trailing periods were removed above
        # and ". " is used as the separator.
        requirements = ". ".join(unique_lines)

        doc = nlp(requirements)

        st.header("Knowledge Graph")
        kg_svg = doc._.kg.draw(format="svg")
        st.image(kg_svg.decode())

        st.divider()

        # Per-sentence report: text, dependency parse, extracted triples.
        for sent in doc.sents:
            st.subheader("Requirement Sentence")
            st.write(sent.text)

            st.subheader("POS & Dependency")
            dep_svg = render(
                sent,
                style="dep",
                jupyter=False,
                options={"collapse_punct": False},
            )
            st.image(dep_svg)

            st.subheader("Extracted SVO Triples")
            svo_triples = sent._.svo_triples
            if not svo_triples:
                st.write("None")
                st.divider()
                continue
            for s, v, o in svo_triples:
                s = merge_tokens_into_text(s)
                v = merge_tokens_into_text(v)
                o = merge_tokens_into_text(o)
                st.write(f"({s}, {v}, {o})")

            st.divider()

In [None]:
# Fetch ifconfig.me quietly and print the response body to stdout.
# NOTE(review): presumably this is the runtime's public IP, used as the
# localtunnel access password — confirm.
!wget -qO- ifconfig.me

In [None]:
# Start the Streamlit app in the background, then open a localtunnel to
# local port 8501.
# NOTE(review): assumes Streamlit serves on port 8501 — confirm if a custom
# port is configured.
!streamlit run app.py & npx localtunnel --port 8501