Closed - Design docs for "Construction of Enriched Research Knowledge Graphs and an Interactive User Interface for Project Grants and Research Findings"#65
Conversation
…o support scalability
Summary of Changes

Hello @tekrajchhetri, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.

This pull request introduces a comprehensive design document for a new research knowledge graph system, outlining its architecture, use cases, and requirements. Concurrently, it delivers substantial enhancements to the underlying infrastructure, focusing on improving the robustness, scalability, and user experience of the platform. Key areas of improvement include database connection management, asynchronous processing for CPU-bound tasks, a sophisticated job tracking and recovery system for data ingestion, and better handling of large files. These changes collectively aim to build a more resilient and performant foundation for the knowledge graph project.
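The job tracking and recovery system itself is not shown in the excerpts below, but the general pattern it describes — detecting and requeueing jobs whose progress has stalled — tends to look roughly like the sketch that follows. This is a minimal illustration only; the `ingestion_jobs` table, its column names, and the timeout are hypothetical, not the schema used in this PR.

```python
import datetime

STUCK_JOB_TIMEOUT = datetime.timedelta(minutes=30)  # hypothetical cut-off

async def recover_stuck_jobs(conn):
    """Requeue ingestion jobs whose heartbeat has gone stale (illustrative only)."""
    cutoff = datetime.datetime.utcnow() - STUCK_JOB_TIMEOUT
    # Hypothetical table/columns; the PR's actual job store may differ.
    return await conn.execute(
        """
        UPDATE ingestion_jobs
        SET status = 'pending', recovery_count = recovery_count + 1
        WHERE status = 'processing' AND last_heartbeat < $1
        """,
        cutoff,
    )
```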
Code Review
This pull request introduces a significant set of improvements focused on performance, scalability, and robustness, particularly within the data ingestion and job management systems. Key enhancements include optimizing database connection pooling, making password hashing asynchronous to prevent event loop blocking, and implementing a sophisticated job recovery mechanism for stuck jobs. The addition of a load testing script is a commendable step towards ensuring system stability. The new design document is also a valuable contribution. My feedback primarily focuses on minor code refinements to improve maintainability and memory efficiency.
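On the password-hashing point in particular, the usual way to keep a CPU-bound hash off the event loop is to run it in a worker thread. A minimal sketch, assuming a passlib/bcrypt-style synchronous API (illustrative names, not necessarily the functions used in this PR):

```python
import asyncio
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

async def hash_password(plain_password: str) -> str:
    # bcrypt hashing is CPU-bound; offload it so the event loop stays responsive.
    return await asyncio.to_thread(pwd_context.hash, plain_password)

async def verify_password(plain_password: str, hashed_password: str) -> bool:
    return await asyncio.to_thread(pwd_context.verify, plain_password, hashed_password)
```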
> By bringing together concepts discussed across channels, PoC artifacts, and related efforts, this document aims to align define scope, tasks and connect parallel work streams.
There appears to be a minor grammatical issue in this sentence. It reads ...aims to align define scope..., which seems to be missing a conjunction or comma. To improve clarity, I suggest rephrasing it.
Suggested change:

> By bringing together concepts discussed across channels, PoC artifacts, and related efforts, this document aims to align, define scope, and connect parallel work streams.
> 7. **Administrative Data Ingestion and Management**
> The system **must provide an administrative interface** that allows authorized users to configure and trigger data ingestion from external sources. This interface should support updating, refreshing, and managing source data so that the knowledge graph can be **automatically constructed and updated** based on incoming data.
This 'Important' note about model-driven UI feels disconnected from the surrounding text, placed between two different requirement sections. To improve the document's structure and flow, consider integrating this point as a formal requirement within the 'User Interface Requirements' section or as a key consideration under the 'User Interface Design and Development' task list.
```python
if conn is not None:
    # Use provided connection
    results = await conn.fetch(query, user_id)
    assigned_scopes_to_user = [result["name"] for result in results]
    logger.debug(f"Scopes for user {user_id}: {assigned_scopes_to_user}")
    return assigned_scopes_to_user
else:
    # Manage our own connection
    async with get_db_connection() as connection:
        results = await connection.fetch(query, user_id)
        assigned_scopes_to_user = [result["name"] for result in results]
        logger.debug(f"Scopes for user {user_id}: {assigned_scopes_to_user}")
        return assigned_scopes_to_user
```
The logic to fetch scopes and create the list of scope names is duplicated in both the if conn is not None: and the else: block. To adhere to the DRY (Don't Repeat Yourself) principle and improve maintainability, this logic can be extracted into a local helper function.
Suggested change:

```python
async def _fetch_scopes(db_conn):
    results = await db_conn.fetch(query, user_id)
    assigned_scopes_to_user = [result["name"] for result in results]
    logger.debug(f"Scopes for user {user_id}: {assigned_scopes_to_user}")
    return assigned_scopes_to_user

if conn is not None:
    # Use provided connection
    return await _fetch_scopes(conn)
else:
    # Manage our own connection
    async with get_db_connection() as connection:
        return await _fetch_scopes(connection)
```
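One thing worth noting about this refactor: the inner `_fetch_scopes` helper closes over `query` and `user_id` from the enclosing function, so it only works as a local function; if it were ever hoisted to module level, it would need to take those values as parameters.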
```python
DB_SETTINGS = {
    "user": load_environment()["JWT_POSTGRES_DATABASE_USER"],
    "password": load_environment()["JWT_POSTGRES_DATABASE_PASSWORD"],
    "database": load_environment()["JWT_POSTGRES_DATABASE_NAME"],
    "host": load_environment()["JWT_POSTGRES_DATABASE_HOST_URL"],
    "port": load_environment()["JWT_POSTGRES_DATABASE_PORT"],
}
```
The previous get_db_settings function provided default values for database connection settings (e.g., host = 'postgres'), which made local setup more robust if the .env file was incomplete. The new direct assignment to DB_SETTINGS will cause the application to fail if any of these environment variables are missing. While explicit configuration is good, this could be a regression in developer experience for local setups. Consider reintroducing default values using .get('VAR', 'default').
Suggested change:

```python
env = load_environment()
DB_SETTINGS = {
    "user": env.get("JWT_POSTGRES_DATABASE_USER", "postgres"),
    "password": env.get("JWT_POSTGRES_DATABASE_PASSWORD", ""),
    "database": env.get("JWT_POSTGRES_DATABASE_NAME", "brainkb"),
    "host": env.get("JWT_POSTGRES_DATABASE_HOST_URL", "postgres"),
    "port": int(env.get("JWT_POSTGRES_DATABASE_PORT", 5432)),
}
```
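If silent defaults are not wanted in production (where a misspelled variable name could quietly point the service at the wrong host), a fail-fast check at startup is another option. A rough sketch, assuming `load_environment()` returns a dict-like object; the variable list is taken from the snippet above:

```python
REQUIRED_DB_VARS = [
    "JWT_POSTGRES_DATABASE_USER",
    "JWT_POSTGRES_DATABASE_PASSWORD",
    "JWT_POSTGRES_DATABASE_NAME",
    "JWT_POSTGRES_DATABASE_HOST_URL",
    "JWT_POSTGRES_DATABASE_PORT",
]

env = load_environment()
missing = [name for name in REQUIRED_DB_VARS if not env.get(name)]
if missing:
    # Fail at startup with a readable message instead of a bare KeyError later on.
    raise RuntimeError(f"Missing required database settings: {', '.join(missing)}")
```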
```python
async def _process_large_file_with_lightweight_provenance(
    filepath: str,
    user_id: str,
    ext: str,
) -> Tuple[str, bool]:
    """
    OPTIMIZATION: For large files, append lightweight provenance without full graph parsing.
    This avoids loading entire 40MB+ files into memory for parsing.
    """
    from rdflib import Graph, URIRef, Literal, RDF, XSD, DCTERMS, PROV
    from rdflib import Namespace
    import datetime
    import uuid

    filename = os.path.basename(filepath)
    processed_filepath = filepath + ".processed.ttl"

    try:
        # Generate lightweight provenance (minimal RDF, no full graph parsing)
        start_time = datetime.datetime.utcnow().isoformat() + "Z"
        provenance_uuid = str(uuid.uuid4())
        BASE = Namespace("https://identifiers.org/brain-bican/vocab/")

        prov_entity = URIRef(BASE[f"provenance/{provenance_uuid}"])
        ingestion_activity = URIRef(BASE[f"ingestionActivity/{provenance_uuid}"])
        user_uri = URIRef(BASE[f"agent/{user_id}"])

        # Create minimal provenance graph
        prov_graph = Graph()
        prov_graph.add((prov_entity, RDF.type, PROV.Entity))
        prov_graph.add((prov_entity, PROV.generatedAtTime, Literal(start_time, datatype=XSD.dateTime)))
        prov_graph.add((prov_entity, PROV.wasAttributedTo, user_uri))
        prov_graph.add((prov_entity, PROV.wasGeneratedBy, ingestion_activity))
        prov_graph.add((ingestion_activity, RDF.type, PROV.Activity))
        prov_graph.add((ingestion_activity, RDF.type, BASE["IngestionActivity"]))
        prov_graph.add((ingestion_activity, PROV.generatedAtTime, Literal(start_time, datatype=XSD.dateTime)))
        prov_graph.add((ingestion_activity, PROV.wasAssociatedWith, user_uri))
        prov_graph.add((prov_entity, DCTERMS.provenance, Literal(f"Data ingested by {user_id} on {start_time}")))

        # Serialize provenance to Turtle
        provenance_ttl = prov_graph.serialize(format="turtle")

        # Stream original file and append provenance (memory efficient)
        with open(processed_filepath, "w", encoding="utf-8") as out:
            # Copy original file content
            with open(filepath, "r", encoding="utf-8", errors="ignore") as inf:
                # For very large files, copy in chunks
                while True:
                    chunk = inf.read(10 * 1024 * 1024)  # 10 MB chunks
                    if not chunk:
                        break
                    out.write(chunk)

            # Append provenance at the end
            out.write("\n\n# Provenance metadata\n")
            out.write(provenance_ttl)

        return processed_filepath, True

    except Exception as e:
        logger.warning(f"Failed lightweight provenance for {filename}: {e}. Using original file.", exc_info=True)
        return filepath, False
```
The function _process_large_file_with_lightweight_provenance is defined here, but it doesn't appear to be called anywhere in the codebase included in this pull request. If it's intended for future use, it might be fine, but as it stands, it's dead code. Please either integrate it or remove it to keep the codebase clean.
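If the function is intended to be kept, the call site might look roughly like the sketch below — an illustration only, not the PR's actual code; `LARGE_FILE_THRESHOLD` and `_process_file_with_full_provenance` are hypothetical names standing in for whatever the existing ingestion path uses.

```python
import os

LARGE_FILE_THRESHOLD = 40 * 1024 * 1024  # hypothetical 40 MB cut-over point

async def _process_uploaded_file(filepath: str, user_id: str, ext: str):
    """Pick the lightweight provenance path for very large files (illustrative)."""
    if os.path.getsize(filepath) >= LARGE_FILE_THRESHOLD:
        # Avoid parsing a 40MB+ graph just to attach provenance.
        return await _process_large_file_with_lightweight_provenance(filepath, user_id, ext)
    # Hypothetical existing path that parses the full graph and attaches provenance inline.
    return await _process_file_with_full_provenance(filepath, user_id, ext)
```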
```python
try:
    with open(processed_filepath, "r", encoding="utf-8") as f:
        file_data = f.read()
    payload = file_data.encode("utf-8")
    content_type = "text/turtle"
    content_type = "text/turtle" if processed_filepath.endswith(".ttl") else get_content_type_for_ext(ext)

    # Read file in chunks and combine (more memory efficient than reading all at once)
    chunks = []
    with open(processed_filepath, "rb") as f:
        while True:
            chunk = f.read(16 * 1024 * 1024)  # 16 MB chunks
            if not chunk:
                break
            chunks.append(chunk)
    payload = b"".join(chunks)

    resp = await client.post(
        url,
        content=payload,
        headers={"Content-Type": content_type},
        auth=auth,
    )
```
This optimization for large files is a good step, but it still loads the entire file content into memory with payload = b"".join(chunks) before making the HTTP request. For extremely large files, this can still cause memory issues. To achieve true streaming and minimize memory usage, you can pass an async generator to httpx.AsyncClient.post. This would stream the file from disk directly to the request body.
content_type = "text/turtle" if processed_filepath.endswith(".ttl") else get_content_type_for_ext(ext)
async def file_streamer():
with open(processed_filepath, "rb") as f:
while True:
chunk = f.read(16 * 1024 * 1024) # 16 MB chunks
if not chunk:
break
yield chunk
resp = await client.post(
url,
content=file_streamer(),
headers={"Content-Type": content_type},
auth=auth,
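One caveat with the streaming sketch above: the chunk reads inside `file_streamer` still use blocking file I/O on the event loop. If that matters for very large files, the reads can be made non-blocking as well — a variant assuming the `aiofiles` package is available (it is not a stated dependency of this PR):

```python
import aiofiles

async def file_streamer():
    # Async file I/O so 16 MB reads do not block the event loop.
    async with aiofiles.open(processed_filepath, "rb") as f:
        while True:
            chunk = await f.read(16 * 1024 * 1024)
            if not chunk:
                break
            yield chunk
```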
```python
if job.get("current_stage"):
    resp["stage_description"] = stage_descriptions.get(
        job["current_stage"],
        f"Current stage: {job['current_stage']}"
    )
```
```python
# def attach_provenance(user: str, ttl_data: str) -> str:
#     """
#     Attach the provenance information about the ingestion activity. Saying, we received this triple by X user on XXXX date.
#     It appends provenance triples externally while keeping the original triples intact.
#
#     Parameters:
#     - user (str): The username of the person posting the data.
#     - ttl_data (str): The existing Turtle (TTL) RDF data.
#
#     Returns:
#     - str: Combined RDF (Turtle format) containing original data and provenance metadata.
#     """
#     # Validate input parameters
#     if not isinstance(user, str) or not user.strip():
#         raise ValueError("User must be a non-empty string.")
#     if not isinstance(ttl_data, str) or not ttl_data.strip():
#         raise ValueError("TTL data must be a non-empty string.")
#
#     try:
#         original_graph = Graph()
#         original_graph.parse(data=ttl_data, format="turtle")
#     except Exception as e:
#         raise RuntimeError(f"Error parsing TTL data: {e}")
#
#     try:
#         BASE = extract_base_namespace(original_graph)
#     except Exception as e:
#         raise RuntimeError(f"Failed to extract base namespace: {e}")
#
#     try:
#         # Create provenance graph
#         prov_graph = Graph()
#
#         # Generate timestamps (ISO 8601 format, UTC)
#         start_time = datetime.datetime.utcnow().isoformat() + "Z"
#
#         # Generate a unique UUID for provenance entity
#         provenance_uuid = str(uuid.uuid4())
#         prov_entity = URIRef(BASE[f"provenance/{provenance_uuid}"])
#         ingestion_activity = URIRef(BASE[f"ingestionActivity/{provenance_uuid}"])
#         user_uri = URIRef(BASE[f"agent/{user}"])
#
#         # Define provenance entity
#         prov_graph.add((prov_entity, RDF.type, PROV.Entity))
#         prov_graph.add((prov_entity, PROV.generatedAtTime, Literal(start_time, datatype=XSD.dateTime)))
#         prov_graph.add((prov_entity, PROV.wasAttributedTo, user_uri))
#         prov_graph.add((prov_entity, PROV.wasGeneratedBy, ingestion_activity))
#
#         # Define ingestion activity
#         # here we say IngestionActivity is an activity of type prov:Activity
#         prov_graph.add((ingestion_activity, RDF.type, PROV.Activity))
#         prov_graph.add((ingestion_activity, RDF.type, BASE["IngestionActivity"]))
#         prov_graph.add((ingestion_activity, PROV.generatedAtTime, Literal(start_time, datatype=XSD.dateTime)))
#         prov_graph.add((ingestion_activity, PROV.wasAssociatedWith, user_uri))
#
#         # Attach provenance to original triples
#         # OPTIMIZATION: Use set to avoid duplicate checks and limit entities for performance
#         # Adaptive limit based on graph size to balance performance vs completeness
#         graph_size = len(original_graph)
#         if graph_size > 100000:  # Very large graphs (>100k triples)
#             max_entities = 500  # Limit more aggressively
#         elif graph_size > 50000:  # Large graphs (50k-100k triples)
#             max_entities = 750
#         else:  # Medium/small graphs (<50k triples)
#             max_entities = 1000  # Can process more entities
#
#         entity_count = 0
#         seen_entities = set()
#
#         for entity in original_graph.subjects():
#             if entity_count >= max_entities:
#                 break
#             if isinstance(entity, URIRef) and entity not in seen_entities:
#                 seen_entities.add(entity)
#                 prov_graph.add((ingestion_activity, PROV.wasAssociatedWith, entity))
#                 entity_count += 1
#
#         # add a Dublin Core provenance statement -- this is the new addition to say it's ingested by user
#         prov_graph.add((prov_entity, DCTERMS.provenance, Literal(f"Data ingested by {user} on {start_time}")))
#
#         # Combine both graphs (original + provenance) so that we have new provenance information attached.
#         final_graph = original_graph + prov_graph
#
#         return final_graph.serialize(format="turtle")
#     except Exception as e:
#         raise RuntimeError(f"Error generating provenance RDF: {e}")
```
You may notice additional changes in this PR. These are due to other PRs that have not yet been merged. For this review, please focus only on the design document https://github.com/sensein/BrainKB/blob/design_docs/docs/design_docs/PI-Grant-Skills-Research-Design.md.
@tekrajchhetri - could you please open a new PR with the design doc only?
@djarecka It will be the same unless the other PR is merged, as it's created from another working branch, not main. Let me create a new branch from main and do that.
Let's try to create new branches from main. Also, this PR contained more than the design doc and PR #59, so perhaps you have more changes that you want to commit.
This PR contains the high-level design docs for "Construction of Enriched Research Knowledge Graphs and an Interactive User Interface for Project Grants and Research Findings"