In [116]:
from neo4j import GraphDatabase
from LegalGraphDB import LegalGraphDB

from neo4j import GraphDatabase
import re
import json
import os 

from llama_index.core import Document
# JSON 파일을 로드하는 함수

def load_json_as_documents(input_file):
    with open(input_file, 'r', encoding='utf-8') as f:
        json_data = json.load(f)

    documents = []
    for entry in json_data:
        doc = Document(
            text=entry["content"],
            metadata={
                "index": entry["index"],
                "name": entry["subtitle"],
                "document_title": entry["metadata"]["document_title"],
                "created_date": entry["metadata"]["date"],
                "revise_info": entry["metadata"]["revise_info"],
                "source": entry["metadata"]["source"],
                "title_doc": entry["metadata"]["title"]["doc"],
                "title_chapter": entry["metadata"]["title"]["chapter"],
                "title_section": entry["metadata"]["title"]["section"],
                "title_subsection": entry["metadata"]["title"]["subsection"],
            }
        )
        documents.append(doc)
    
    return documents

# 문서 정보 추출
def extract_document_information(input_path):
    file_name = input_path.split("/")[-1]
    document_number = file_name.split("_")[0]
    law_type = file_name.split("_")[1]
    document_type = file_name.split("_")[2].split(".")[0]
    return document_number, law_type, document_type



In [117]:
import re

def find_clause_pattern(input_path, documents):
    connect_list = []
    document_number, law_type, document_type = extract_document_information(input_path)  # 메타데이터 추출
    current_document = f"{document_number}_{law_type}_{document_type}"
    #print("현재 문서:", current_document)
    referenced_indices = set()

    # 결과를 저장할 딕셔너리
    result_dict = {
        'meta': {
            'document_number': document_number,
            'law_type': law_type,
            'document_type': document_type,
            'current_document': current_document
        },
        'patterns': {}
    }

    for doc in documents: 
        #print("\n", "="*30, doc.metadata['index'], "="*30)
        text = doc.text
        index = doc.metadata['index']
        #print("contents:", text)

        # 기본 패턴 | 연속 패턴 | 다중 패턴을 처리하는 정규식
        pattern = (
            r"([가-힣\w\(\)\"「」]*)?\s*(제\d+조(?:의\d+)?(?:제\d+항)?)"  # 제~조제~항 패턴
            r"|([가-힣\w\(\)\"「」]*)?\s*(제\d+항)"  # 단독 제~항 패턴
            r"((?:,\s*제\d+조(?:의\d+)?(?:제\d+항)?| 및\s*제\d+조(?:의\d+)?(?:제\d+항)?|,\s*제\d+항| 및\s*제\d+항)*)"
        )
        
        # 연속 패턴 추가 (제~조부터, 제~항부터, 제~조까지 등)
        pattern += r"|(제\d+조(?:의\d+)?(?:제\d+항)?(?:부터|까지)|제\d+항(?:부터|까지))"

        # 텍스트에서 모든 매치된 패턴과 그 앞 단어 찾기
        matches = re.findall(pattern, text)

        if not matches:
            #print("No matches found.")
            continue

        # 문서의 index를 키로 하고 텍스트와 패턴 리스트를 함께 저장
        result_dict['patterns'][index] = {
            'text': text,  # 텍스트를 저장
            'matches': []  # 매칭된 패턴 리스트 저장
        }

        for match in matches:
            # '제~조제~항' 패턴과 '제~항' 단독 패턴을 구분하여 처리
            if match[1]:  # '제~조제~항' 패턴
                preceeding_word = match[0].strip() if match[0] else "\\n"
                matched_pattern = match[1].strip()
            elif match[3]:  # 단독 '제~항' 패턴
                preceeding_word = match[2].strip() if match[2] else "\\n"
                matched_pattern = match[3].strip()
            else:
                continue  # 패턴에 매칭되지 않으면 넘어감

            # 앞 단어에서 쉼표를 보존하고, 공백만 제거하도록 수정
            preceeding_word = preceeding_word if preceeding_word != "\\n" else ","

            # 패턴 리스트에 추가
            final = {
                'preceeding_word': preceeding_word,
                'matched_pattern': matched_pattern
            }
            #print(final)
            result_dict['patterns'][index]['matches'].append(final)

    return result_dict


In [118]:
import re

def extract_article_clause(text):
    # '제301조'와 같은 패턴에서 article, subarticle, clause 추출
    article_match = re.search(r"제(\d+)조", text)
    clause_match = re.search(r"제(\d+)항", text)
    subarticle_match = re.search(r"의(\d+)", text)
    
    article = int(article_match.group(1)) if article_match else None
    clause = int(clause_match.group(1)) if clause_match else None
    subarticle = int(subarticle_match.group(1)) if subarticle_match else None

    
    return article, subarticle, clause


def make_target_index(target_article,target_clause=None, target_subarticle=None, ):
    if target_subarticle:
        if target_clause:
            return f"제{target_article}조의{target_subarticle}제{target_clause}항"
        return f"제{target_article}조의{target_subarticle}"
    else : 
        if target_clause:
            return f"제{target_article}조제{target_clause}항"
        return f"제{target_article}조"

In [119]:
# def process_range_pattern(start_index, end_index, edges,current_index):
#     print("range pattern", start_index, end_index)
#     start_article, start_subarticle, start_clause = extract_article_clause(start_index)
#     end_article, end_subarticle, end_clause = extract_article_clause(end_index)
#     previous_article, previous_subarticle, previous_clause =extract_article_clause(edges[-1]['target_index'])
#     current_article, current_subarticle, current_clause = extract_article_clause(current_index)
#     target_list = []

    
#     # 제~조부터 제~조까지
#     if (start_article is not None) and (start_clause is None) and (end_article is not None) and (end_clause is None):
#         for article in range(start_article, end_article + 1):
#             if start_subarticle:
#                 target_list.append(f"제{article}조의{start_subarticle}")
#             else:
#                 target_list.append(f"제{article}조")

#     # 제~항부터 제~항까지 ->article없음 -> 현재 article 가져와야되나? 
#     elif (start_article is None) and (start_clause is not None) and (end_article is None) and (end_clause is not None):
#         if previous_article:
#             target_article = previous_article
#             target_subarticle = previous_subarticle
#         elif current_article : 
#             target_article = current_article
#             target_subarticle = current_subarticle
#         for clause in range(start_clause, end_clause + 1):
#            target_list.append(make_target_index(target_article, clause, target_subarticle))
            
#     # 제~조제~항부터 제~항까지
#     elif (start_article is not None) and (start_clause is not None) and( end_article is None) and (end_clause is not None):
#         for clause in range(start_clause, end_clause + 1):
#             target_list.append(f"제{start_article}조의{start_subarticle}제{clause}항" if start_subarticle else f"제{start_article}조제{clause}항")
    
#     #제180조의2 부터 제180조의5까지'
#     elif (start_article is not None) and (start_subarticle is not None) and (end_article is not None) and (end_subarticle is not None):
#         for subarticle in range(start_subarticle, end_subarticle + 1):
#             target_list.append(f"제{start_article}조의{subarticle}")
            
#     print("target_list:", target_list)
#     return target_list

def process_multiple_pattern(first, second): #다중 패턴 
    #EX. 제30조제1항 및 제2항 
    #print("multiple pattern")
    first_article, first_subarticle, first_clause = extract_article_clause(first)
    second_article, second_subarticle, second_clause = extract_article_clause(second)

    second_index = make_target_index(first_article, first_clause, first_subarticle)

    return (second_index)

In [120]:
def process_range_pattern(start_index, end_index, edges,current_index):
    #print("range pattern", start_index, end_index)
    
    # Extract article, subarticle, clause for both start and end indices
    start_article, start_subarticle, start_clause = extract_article_clause(start_index)
    end_article, end_subarticle, end_clause = extract_article_clause(end_index)
    previous_article, previous_subarticle, previous_clause =extract_article_clause(edges[-1]['target_index'])
    current_article, current_subarticle, current_clause = extract_article_clause(current_index)

    target_list = []

    # 제~조부터 제~조까지
    if (start_article is not None) and (start_clause is None) and (end_article is not None) and (end_clause is None):
        #print(f"Processing range: 제{start_article}조부터 제{end_article}조까지")
        for article in range(start_article, end_article + 1):
            target_list.append(f"제{article}조")

    # 제~항부터 제~항까지 -> article 없음
    elif (start_article is None) and (start_clause is not None) and (end_article is None) and (end_clause is not None):
        #print(f"Processing range: 제{start_clause}항부터 제{end_clause}항까지")
        if previous_article:
            target_article = previous_article
            target_subarticle = previous_subarticle
        elif current_article : 
            target_article = current_article
            target_subarticle = current_subarticle
        for clause in range(start_clause, end_clause + 1):
           target_list.append(make_target_index(target_article, clause, target_subarticle))
    
    # 제~조제~항부터 제~항까지
    elif (start_article is not None) and (start_clause is not None) and (end_article is None) and (end_clause is not None):
        #print(f"Processing range: 제{start_article}조의{start_subarticle}제{start_clause}항부터 제{end_clause}항까지")
        for clause in range(start_clause, end_clause + 1):
            target_list.append(f"제{start_article}조제{clause}항" if not start_subarticle else f"제{start_article}조의{start_subarticle}제{clause}항")
    
    # 제180조의2부터 제180조의5까지 (조의 범위)
    elif (start_article is not None) and (start_subarticle is not None) and (end_article == start_article) and (end_subarticle is not None):
        #print(f"Processing range: 제{start_article}조의{start_subarticle}부터 제{end_subarticle}까지")
        for subarticle in range(start_subarticle, end_subarticle + 1):
            target_list.append(f"제{start_article}조의{subarticle}")
    
    else:
        pass 
        
    #print("target_list:", target_list)
    return target_list


In [121]:
def law_refers_to_edges(matched_dict):
    #matched_dict에서 patterns의 key 값이 index가 되고, value는 리스트로 저장되어 있음 
    #리스트의 원소는 딕셔너리로 저장되어 있음
    #딕셔너리의 key는 preceeding_word와 matched_pattern
    #matched_pattern의 value를 확인해서 target_document와 target_index, target_clause 찾기 
    document_number = matched_dict['meta']['document_number']
    law_type = matched_dict['meta']['law_type']
    document_type = matched_dict['meta']['document_type']

    current_document =  matched_dict['meta']['current_document']
    edges =  [] 

    for index, information in matched_dict['patterns'].items():
        
        current_index = index 
        current_text = information['text']
        #print("\n", "="*30, current_index, "="*30)
        #print("contents:", current_text)
        
        current_article, current_subarticle, current_clause = extract_article_clause(current_index)
            
        #기본은 현재 문서, 현재 조항 기준 
        target_document = current_document
        target_article = current_article
        target_clause = current_clause
        
        preceeding_word = "" 
        #탐지한 패턴 확인하면서 target_document, target_article, target_clause 추출 
        start_index = current_index 
        for pattern in information['matches']:
            #print(f"--------{pattern}--------- ")
            
            target_article, target_subarticle, target_clause = extract_article_clause(pattern['matched_pattern'])
            if  ("," or "및" ) in pattern['preceeding_word'] : # 이전의 preceeding_word를 확인해야 함 
                #다중 패턴 -> 이전 조항 연결 
                target_document = edges[-1]['target_document']
                target_index = process_multiple_pattern(preceeding_word,pattern['matched_pattern'])
                


            ######### Range Pattern ####### 
            elif ("부터" or "까지") in pattern['preceeding_word']:
                target_article, target_subarticle, target_clause = extract_article_clause(edges[-1]['target_index']) if edges[-1] else extract_article_clause(current_index)
                #print("이전 조항 정보:", target_article, target_subarticle, target_clause)
                if ("제" in pattern["preceeding_word"]):    #EX. {"preceeding_word": "제1조부터", "matched_pattern": "제3조"}
                    start_index = pattern['preceeding_word']
                    end_index = pattern['matched_pattern']
                else : # 이전 조항 연결                         EX. {"preceeding_word": "부터", "matched_pattern": "제3조"}
                    start_index =matched_pattern 
                    end_index = pattern['matched_pattern']

                
                target_list = process_range_pattern(start_index,end_index,edges,current_index)
                
                for target_index in target_list:
                    target_article, target_subarticle, target_clause = extract_article_clause(target_index)
                    #target_article이 none이면 current_article로 대체
                    target_article = target_article if target_article else current_article
                    #target_clause가 none이면 ""로 대체 
                    target_index = make_target_index(target_article, target_clause, target_subarticle)
                    edge = {
                        'current_document': current_document,
                        'current_index': current_index,
                        'target_document': target_document,
                        'target_index': target_index
                    }
                    #print(edge)
                    edges.append(edge)
                continue
                
                
            preceeding_word = pattern['preceeding_word']
            matched_pattern =   pattern['matched_pattern']
            target_article, target_subarticle, target_clause = extract_article_clause(matched_pattern)
            #preceeding_word에 포함된 단어에 따라 구분 
            
            ###### 다른 문서, 다른 조 ########
            if "「" in pattern['preceeding_word'] or "」" in pattern['preceeding_word']:
                #print("다른 문서, 다른 조")
                match = re.search(r'(.*?)」', pattern['preceeding_word'])
                if match:
                    target_document = match.group(1)
                #print("target_document:", target_document)
                target_index = make_target_index(target_article,  target_clause, target_subarticle) 

            elif ("조" not in pattern['matched_pattern'] ) and ("항" in pattern['matched_pattern']):
                if (pattern['preceeding_word']=="조"): #같은 조 라고 가져왔을 때 이전의 조 정보 가져와야 함 
                    #print("같은 문서, 앞에랑 같은 조")
                    target_document = edges[-1]['target_document']
                    previous_article, previous_subarticle, previous_clause = extract_article_clause(edges[-1]['target_index'])
                    current_article, current_subarticle, current_clause = extract_article_clause(pattern['matched_pattern'])
                    target_index = make_target_index(previous_article,  current_clause, previous_subarticle)
                
                else : 
                    #print("같은 문서, 같은 조")
                    target_document = current_document
                    current_article, current_subarticle, current_clause = extract_article_clause(current_index)
                    target_article, target_subarticle, target_clause = extract_article_clause(pattern['matched_pattern'])
                    target_index = make_target_index(current_article,  target_clause, current_subarticle) 
            
            ###### 같은 문서, 다른 조 ########
            #EX. {'preceeding_word': '', 'matched_pattern': '제78조제1항'}
            elif ("조" in pattern['matched_pattern'] ) and ("항" in pattern['matched_pattern']):
                #print("같은 문서, 다른 조")
                target_document = current_document
                target_article = target_article
                target_clause = target_clause
                target_index = make_target_index(target_article,  target_clause, target_subarticle) 
            ###### 같은 문서, 같은 조 ########
            #EX>--------{'preceeding_word': '경우에는', 'matched_pattern': '제7항'}--------- 
            #EX. {'preceeding_word': ',', 'matched_pattern': '제1항'}
            
            
            
            else : #항 정보 없이 조만 포함하는 경우
                target_index = make_target_index(target_article, target_clause, target_subarticle)
            
            
            
            if pattern['preceeding_word'][-1]=="법":
                target_document = f"{document_number}_law_{document_type}"
            elif pattern['preceeding_word'][-1]=="영":
                target_document = f"{document_number}_order_{document_type}"
            elif pattern['preceeding_word'][-1]=="부":
                target_document = f"{document_number}_order_{document_type}"
            else : 
                pass 
                
            edge = {
                'current_document': current_document,
                'current_index': current_index, 
                'target_document': target_document,
                'target_index': target_index
            }
            #print(edge,"\n")
            edges.append(edge)
    return edges




In [128]:
def create_refers_to_triplets_list(input_path):
    document_number, law_type, document_type = extract_document_information(input_path)
    print("============input_path:", input_path,"===================")
    print("document_number:", document_number, "law_type:", law_type, "document_type:", document_type)
    
    documents = load_json_as_documents(input_path)
    print(f"Number of documents loaded: {len(documents)}")
    
    matched_dict = find_clause_pattern(input_path, documents)
    print(f"Number of matched patterns: {len(matched_dict['patterns'])}")
    
    triplets = law_refers_to_edges(matched_dict)
    print("!Finish creating triplets ")
    print("Number of triplets created: ", len(triplets))
    print("Sample triplets: ", triplets[:5])
    
    
    return triplets

In [129]:
refers_to_triplets = []

# JSON 파일 경로
input_path = "../../data/DCM/DCM_json/01_law_main.json"
refers_to_triplets.append(create_refers_to_triplets_list(input_path))
# JSON 파일 경로
input_path = "../../data/DCM/DCM_json/01_enforcement_main.json"
refers_to_triplets.append(create_refers_to_triplets_list(input_path))
# JSON 파일 경로
input_path = "../../data/DCM/DCM_json/01_order_main.json"
refers_to_triplets.append(create_refers_to_triplets_list(input_path))

print("Finish creating all triplets")


    

document_number: 01 law_type: law document_type: main
Number of documents loaded: 1968
Number of matched patterns: 1239
!Finish creating triplets 
Number of triplets created:  4008
Sample triplets:  [{'current_document': '01_law_main', 'current_index': '제3조제1항', 'target_document': '「신탁법', 'target_index': '제78조제1항'}, {'current_document': '01_law_main', 'current_index': '제3조제1항', 'target_document': '01_law_main', 'target_index': '제103조제1항'}, {'current_document': '01_law_main', 'current_index': '제3조제1항', 'target_document': '「신탁법', 'target_index': '제46조'}, {'current_document': '01_law_main', 'current_index': '제3조제1항', 'target_document': '「신탁법', 'target_index': '제46조'}, {'current_document': '01_law_main', 'current_index': '제3조제1항', 'target_document': '「신탁법', 'target_index': '제47조'}]
document_number: 01 law_type: enforcement document_type: main
Number of documents loaded: 1648
Number of matched patterns: 1455
!Finish creating triplets 
Number of triplets created:  4245
Sample triplets:  [{'c

# NEO4J Clause Graph RefersTo Edge 생성


In [None]:
from LegalGraphDB import LegalGraphDB


uri = os.getenv("NEO4J_URI")
user = os.getenv("NEO4J_USERNAME")
password = os.getenv("NEO4J_PASSWORD")


lgdb = LegalGraphDB(uri, user, password)
lgdb.create_clause_relationships(refers_to_triplets)

for triplet in refers_to_triplets:
    lgdb.create_clause_relationships(triplet, )
"""
# # Relation 생성 메서드
    def create_clause_relationship(self, triplet:list, database):
        from_id, from_type = triplet[0]
        edge_type = triplet[1]
        to_id, to_type = triplet[2]

        with self.driver.session(database=database) as session:
            session.run(
                f\"""
                MATCH (a: {from_type}), (b: {to_type})
                WHERE id(a) = $from_id AND id(b) = $to_id
                MERGE (a)-[r:{edge_type}]->(b)
                \""",
                from_id=from_id,
                from_type=from_type,
                to_id=to_id,
                to_type=to_type
            )

"""

def create_clause_relationship(self, triplet:list, database):
    start_document=triplet['current_document']
    start_id = triplet['current_index']
    end_document = triplet['target_document']
    end_id = triplet['target_index']


    with self.driver.session(database=database) as session:
        session.run(
            f"""
            MATCH (a: {start_document}), (b: {end_document})
            WHERE a.index = $start_id AND b.index = $end_id
            MERGE (a)-[r:refers_to]->(b)
            """,
            start_id=start_id,
            end_id=end_id,
            start_document=start_document,
            end_document=end_document
            
        )

