In [None]:
!pip install parquetdb
!pip install ipykernel

# 03 - Graph Generators in ParquetGraphDB

In this notebook, we'll learn how to:

1. Create a node generator
2. Add the node generator to the graph
3. Create an edge generator
4. Add the edge generator to the graph
5. Define dependencies between generators

We'll use the `ParquetGraphDB` class from `parquetdb` to demonstrate these features. If you haven't already installed `parquetdb`, run the previous cell.


## Example Scenario: Modeling Materials Data

Let's explore how `parquetdb` generators can build and maintain a graph using a materials science scenario. Materials, at their core, are described by their **structure** and the **chemical elements** they contain (their **composition**).

We can represent this information effectively using a **heterograph**:

* Nodes representing **Materials** (like $H_2O$ or $Fe$).
* Nodes representing **Elements** (like $H$, $O$, $Fe$).
* Edges showing which **Elements** make up which **Materials**.

The real power of generators becomes apparent when considering how this data originates and evolves. Material definitions might come from external files or databases, and the properties of elements might be sourced separately.

Generators provide a robust mechanism to:
* **Ingest and process** this source data into graph nodes and edges.
* **Establish dependencies**. For instance, the creation of material-element edges depends on both Material and Element nodes existing first.
* **Automate updates**. If the input file defining materials changes, or if an element's properties are updated in its source, generators allow `parquetdb` to potentially rebuild the affected parts of the graph automatically, ensuring consistency.

We'll now set up this example, starting with the data sources for elements and materials.

## Setup

In [2]:
import os
import shutil
import requests
import io
from pathlib import Path

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

def download_url(url, save_path):
    # Download the Parquet file and fail loudly on any non-200 response
    response = requests.get(url)
    if response.status_code != 200:
        raise RuntimeError(f"Could not download data from {url}")

    # Read the downloaded bytes into a PyArrow Table
    table = pq.read_table(io.BytesIO(response.content))
    print(f"Downloaded periodic table data with {len(table)} elements")

    # Write the table to the local data directory
    pq.write_table(table, save_path)
    
    
FILE_DIR = Path(".")
DATA_DIR = FILE_DIR / "data"

if DATA_DIR.exists():
    shutil.rmtree(DATA_DIR)
    
DATA_DIR.mkdir(parents=True, exist_ok=True)

# URL to the raw data file in the GitHub repository
elements_url = "https://github.com/lllangWV/ParquetDB/raw/GraphDB/tests/graph/data/interim_periodic_table_values.parquet"
materials_url = "https://github.com/lllangWV/ParquetDB/raw/GraphDB/tests/graph/data/materials/materials_0.parquet"

elements_file = DATA_DIR / "elements.parquet"
materials_file = DATA_DIR / "materials.parquet"

download_url(elements_url,elements_file)
download_url(materials_url,materials_file)


Downloaded periodic table data with 118 elements
Downloaded periodic table data with 1000 elements


In [3]:
elements_table = pq.read_table(elements_file)
materials_table = pq.read_table(materials_file)
print(elements_table)
print(materials_table)

pyarrow.Table
long_name: string
symbol: string
abundance_universe: double
abundance_solar: double
abundance_meteor: double
abundance_crust: double
abundance_ocean: double
abundance_human: double
adiabatic_index: string
allotropes: string
appearance: string
atomic_mass: double
atomic_number: int64
block: string
boiling_point: double
classifications_cas_number: string
classifications_cid_number: string
classifications_rtecs_number: string
classifications_dot_numbers: string
classifications_dot_hazard_class: double
conductivity_thermal: double
cpk_hex: string
critical_pressure: double
critical_temperature: double
crystal_structure: string
density_stp: double
discovered_year: int64
discovered_by: string
discovered_location: string
electron_affinity: double
electron_configuration: string
electron_configuration_semantic: string
electronegativity_pauling: double
energy_levels: string
gas_phase: string
group: int64
extended_group: int64
half_life: string
heat_specific: double
heat_vaporization

Next, we can load the materials data into `ParquetGraphDB`.

In [4]:
from parquetdb import ParquetGraphDB

# Create a temporary directory for our database
GRAPH_DB_DIR = DATA_DIR / "GraphDB"
if GRAPH_DB_DIR.exists():
    shutil.rmtree(GRAPH_DB_DIR)
GRAPH_DB_DIR.mkdir(parents=True, exist_ok=True)


# Initialize ParquetGraphDB
db = ParquetGraphDB(storage_path=GRAPH_DB_DIR)

# The data has a pre-existing id column, so we have to remove it
data = pq.read_table(materials_file)
data = data.drop_columns("id")
db.add_nodes(node_type="material", data=data)

print(db.summary(show_column_names=True))


GRAPH DATABASE SUMMARY
Name: GraphDB
Storage path: data\GraphDB
└── Repository structure:
    ├── nodes/                 (data\GraphDB\nodes)
    ├── edges/                 (data\GraphDB\edges)
    ├── edge_generators/       (data\GraphDB\edge_generators)
    ├── node_generators/       (data\GraphDB\node_generators)
    └── graph/                 (data\GraphDB\graph)

############################################################
NODE DETAILS
############################################################
Total node types: 1
------------------------------------------------------------
• Node type: material
  - Number of nodes: 1000
  - Number of features: 136
  - Columns:
       - bonding.cutoff_method.bond_connections
       - bonding.electric_consistent.bond_connections
       - bonding.electric_consistent.bond_orders
       - bonding.geometric_consistent.bond_connections
       - bonding.geometric_consistent.bond_orders
       - bonding.geometric_electric_consistent.bond_connections
    

## Generators

A **Generator** is a callable (function) that returns a [PyArrow Table](https://arrow.apache.org/docs/python/api/table.html) of either nodes or edges. The examples in this notebook return a pandas DataFrame instead, which is also accepted. By adding a generator to `ParquetGraphDB`, you can:

1. Register the generator, so it can be re-run on demand.
2. Optionally specify arguments/kwargs to pass into the generator.
3. Automatically store the output in a **NodeStore** or **EdgeStore** with the same name as the generator function (or a custom name, if you prefer).

This is especially handy for generating nodes from external data sources or from computational routines.

In the following sections we will create custom node and edge generators. These can be created by wrapping existing functions with the `node_generator` or `edge_generator` decorators.

These can be imported like:

```python
from parquetdb import node_generator, edge_generator
```

### Element Node Generator


#### 1. Define the Generator

In our first example, we will create a node generator that creates element nodes. 

As mentioned above, to create a node generator we wrap an existing function with the `node_generator` decorator. The function name becomes the name of the node type.

```python
@node_generator
def element():
    ...
```

For this example, we will load the periodic table data. This is a dataframe with 118 rows, one for each of the 118 elements of the periodic table. We also apply some transformations to the data to make it more useful for our purposes.
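
The main transformation in the generator turns list-like strings into Python lists. The sketch below shows that step in isolation on a sample value. The helper name `parse_whitespace_list` and the input format (`"[-1  1]"`, numbers separated by whitespace) are assumptions inferred from the bracket-stripping and `split()` calls in the generator. It parses with `ast.literal_eval`, which only accepts Python literals.

```python
import ast


def parse_whitespace_list(text):
    """Turn a string such as "[-1  1]" into the list [-1, 1]."""
    inner = text.replace("[", "").replace("]", "")
    # split() with no argument collapses runs of whitespace
    return ast.literal_eval("[" + ",".join(inner.split()) + "]")


print(parse_whitespace_list("[-1  1]"))
print(parse_whitespace_list("[]"))
```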

In [5]:
### Element Node Generator
import ast

from parquetdb import node_generator

# Define the generator with the @node_generator decorator
@node_generator
def element(base_file=elements_file):
    """
    Creates Element nodes from a local file (CSV or Parquet).
    Returns a Pandas DataFrame (or PyArrow Table) with one row per element.
    """

    try:
        # Read the file
        file_ext = os.path.splitext(base_file)[-1][
            1:
        ].lower()  # e.g. "parquet" or "csv"
        if file_ext == "parquet":
            df = pd.read_parquet(base_file)
        elif file_ext == "csv":
            df = pd.read_csv(base_file)
        else:
            raise ValueError("base_file must be a parquet or csv file")

        # Example transformations: parse list-like strings into Python lists.
        # ast.literal_eval only accepts literals, so it is safer than eval.
        df["oxidation_states"] = df["oxidation_states"].apply(
            lambda x: x.replace("]", "").replace("[", "")
        )
        df["oxidation_states"] = df["oxidation_states"].apply(
            lambda x: ",".join(x.split())
        )
        df["oxidation_states"] = df["oxidation_states"].apply(
            lambda x: ast.literal_eval("[" + x + "]")
        )
        df["experimental_oxidation_states"] = df["experimental_oxidation_states"].apply(
            ast.literal_eval
        )
        df["ionization_energies"] = df["ionization_energies"].apply(ast.literal_eval)

    except Exception as e:
        print(f"Error reading element file: {e}")
        return None

    return df  # Return the transformed dataframe


df = element()

print(df)

       long_name symbol  abundance_universe  abundance_solar  \
0       Hydrogen      H        7.500000e+01     7.500000e+01   
1         Helium     He        2.300000e+01     2.300000e+01   
2        Lithium     Li        6.000000e-07     6.000000e-09   
3      Beryllium     Be        1.000000e-07     1.000000e-08   
4          Boron      B        1.000000e-07     2.000000e-07   
..           ...    ...                 ...              ...   
113    Flerovium     Fl        0.000000e+00     0.000000e+00   
114    Moscovium     Mc        0.000000e+00     0.000000e+00   
115  Livermorium     Lv        0.000000e+00     0.000000e+00   
116   Tennessine     Ts        0.000000e+00     0.000000e+00   
117    Oganesson     Og        0.000000e+00     0.000000e+00   

     abundance_meteor  abundance_crust  abundance_ocean  abundance_human  \
0            2.400000     1.500000e-01     1.100000e+01     1.000000e+01   
1                 NaN     5.500000e-07     7.200000e-10              NaN   
2  

#### 2. Add the Generator to the ParquetGraphDB

Now that we have defined the generator, we can add it to the `ParquetGraphDB` instance by calling the `add_node_generator` method. We pass the generator function along with its arguments (`generator_args`) and keyword arguments (`generator_kwargs`). The `run_immediately` flag controls whether the generator runs as soon as it is added or later; the default is `True`.

The node generator will be stored in the `node_generator_store` of the `ParquetGraphDB` instance.



In [6]:
db.add_node_generator(
    generator_func=element,
    generator_args={},
    generator_kwargs={"base_file": elements_file},
    run_immediately=False,  # We have the option to run the generator immediately or later. Default is True.
)

# Check the node generators in the ParquetGraphDB

print(db.node_generator_store)

GENERATOR STORE SUMMARY
• Number of generators: 1
Storage path: data\GraphDB\node_generators


############################################################
METADATA
############################################################
• class: GeneratorStore
• class_module: parquetdb.graph.generator_store

############################################################
GENERATOR DETAILS
############################################################
• Columns:
    - generator_func
    - generator_kwargs.base_file
    - generator_name
    - id

• Generator names:
    - element



### Running a Node Generator Later

Now we can run the node generator with `db.run_node_generator(generator_name)`.

> `Note:` Here we run the node generator. Notice that we do not need to pass the arguments or kwargs, because this information is stored in the node generator store.
> However, we can override the arguments or kwargs if we want to.

In [7]:
table = db.run_node_generator("element")

Let's check the node store for the elements.


In [8]:
element_node_store = db.get_node_store("element")
print(element_node_store)

NODE STORE SUMMARY
Node type: element
• Number of nodes: 118
• Number of features: 99
Storage path: data\GraphDB\nodes\element


############################################################
METADATA
############################################################
• class: NodeStore
• class_module: parquetdb.graph.nodes
• node_type: element
• name_column: id

############################################################
NODE DETAILS
############################################################



### Material-Element Edge Generator

#### 1. Define the Generator

An **edge generator** is similar to a node generator but returns a PyArrow Table describing edges. Each generated edge must have at least these fields:

- `source_id` (int)
- `source_type` (string)
- `target_id` (int)
- `target_type` (string)

Additionally, an edge generator must take the node stores it connects as arguments. These are the corresponding node stores in the `ParquetGraphDB` instance, which ensures that the node ids are valid and come from the correct node store.

For edges we use the `edge_generator` decorator.


In [9]:
from parquetdb import edge_generator
import pyarrow as pa


@edge_generator
def material_element_has(
    material_store, element_store
):  # We have the material_store and element_store as an argument
    try:
        connection_name = "has"

        # We select only the necessary columns from the node stores
        material_table = material_store.read_nodes(
            columns=["id", "core.material_id", "core.elements"]
        )
        element_table = element_store.read_nodes(columns=["id", "symbol"])

        # We rename for utility purposes
        material_table = material_table.rename_columns(
            {"id": "source_id", "core.material_id": "material_name"}
        )
        material_table = material_table.append_column(
            "source_type", pa.array(["material"] * material_table.num_rows)
        )

        element_table = element_table.rename_columns({"id": "target_id"})
        element_table = element_table.append_column(
            "target_type", pa.array(["element"] * element_table.num_rows)
        )

        # We convert the tables to pandas for easier manipulation
        material_df = material_table.to_pandas()
        element_df = element_table.to_pandas()

        # We create a map of the element symbols to the target_id for quick lookup
        element_target_id_map = {
            row["symbol"]: row["target_id"] for _, row in element_df.iterrows()
        }

        # We create a dictionary to store the edge data
        table_dict = {
            "source_id": [],
            "source_type": [],
            "target_id": [],
            "target_type": [],
            "edge_type": [],
            "name": [],
            "weight": [],
        }

        # We iterate over the material nodes
        for _, row in material_df.iterrows():
            # We get the elements composing the material
            elements = row["core.elements"]
            source_id = row["source_id"]
            material_name = row["material_name"]
            if elements is None:
                continue

            # We iterate over the elements
            for element in elements:
                # We get the target_id for the element
                target_id = element_target_id_map[element]

                # We append the edge data to the dictionary. Here we could also define the reverse edge as well.
                table_dict["source_id"].append(source_id)
                table_dict["source_type"].append(material_store.node_type)
                table_dict["target_id"].append(target_id)
                table_dict["target_type"].append(element_store.node_type)
                table_dict["edge_type"].append(connection_name)

                name = f"{material_name}_{connection_name}_{element}"
                table_dict["name"].append(name)
                table_dict["weight"].append(1.0)

        df = pd.DataFrame(table_dict)
    except Exception as e:
        print(f"Error creating material-element-has relationships: {e}")
        # Re-raise so the failure is not masked by an undefined `df` below
        raise

    return df
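
The nested loops in the generator can also be written with pandas operations. The sketch below is one possible alternative on toy data, not the approach the notebook uses. The ids and material names are made up, and the column names mirror those in the generator. One behavioural difference: a symbol missing from the lookup produces a missing `target_id` here, whereas the loop version raises a `KeyError`.

```python
import pandas as pd

# Toy stand-ins for the columns read from the two node stores
material_df = pd.DataFrame(
    {
        "source_id": [0, 1, 2],
        "material_name": ["mp-1", "mp-2", "mp-3"],
        "core.elements": [["H", "O"], ["Fe"], None],
    }
)
element_df = pd.DataFrame({"target_id": [0, 7, 25], "symbol": ["H", "O", "Fe"]})

# symbol -> element node id
symbol_to_id = dict(zip(element_df["symbol"], element_df["target_id"]))

# One row per (material, element) pair; materials without elements are dropped
edges = (
    material_df.dropna(subset=["core.elements"])
    .explode("core.elements")
    .rename(columns={"core.elements": "symbol"})
)
edges["target_id"] = edges["symbol"].map(symbol_to_id)
edges["source_type"] = "material"
edges["target_type"] = "element"
edges["edge_type"] = "has"
edges["name"] = edges["material_name"] + "_has_" + edges["symbol"]
edges["weight"] = 1.0
edges = edges.drop(columns=["material_name", "symbol"]).reset_index(drop=True)

print(edges)
```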

#### 2. Add the Generator to the ParquetGraphDB

Now that we have defined the generator, we can add it to the `ParquetGraphDB` instance. We do this by calling the `add_edge_generator` method.

The edge generator will be stored in the `edge_generator_store` of the `ParquetGraphDB` instance.


In [10]:
element_store = db.get_node_store("element")
material_store = db.get_node_store("material")

db.add_edge_generator(
    generator_func=material_element_has,
    generator_args={
        "material_store": material_store,
        "element_store": element_store,
    },
    generator_kwargs={},
    run_immediately=True,
)

Let's check the edge generator store.

In [11]:
print(db.edge_generator_store)

GENERATOR STORE SUMMARY
• Number of generators: 1
Storage path: data\GraphDB\edge_generators


############################################################
METADATA
############################################################
• class: GeneratorStore
• class_module: parquetdb.graph.generator_store

############################################################
GENERATOR DETAILS
############################################################
• Columns:
    - generator_args.element_store
    - generator_args.material_store
    - generator_func
    - generator_name
    - id

• Generator names:
    - material_element_has



Let's check whether the edge generator created the edges in the edge store.

In [12]:
edge_store = db.get_edge_store("material_element_has")
print(edge_store)

EDGE STORE SUMMARY
Edge type: material_element_has
• Number of edges: 3348
• Number of features: 8
Storage path: data\GraphDB\edges\material_element_has


############################################################
METADATA
############################################################
• class: EdgeStore
• class_module: parquetdb.graph.edges

############################################################
EDGE DETAILS
############################################################



### Updates to Node Stores

By default, when node and edge generators are added, the stores passed as their arguments are registered as dependencies in the `ParquetGraphDB` instance. This means that when a parent store is updated, the dependent generators run again and update their corresponding stores.

These dependencies are stored in the `ParquetGraphDB/generator_dependency.json` file.
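
Conceptually, this behaves like a lookup from each parent store to the generators that read from it. The toy class below sketches that idea in plain Python. It is not how `parquetdb` implements dependency tracking, and the class and method names are hypothetical.

```python
from collections import defaultdict


class ToyDependencyGraph:
    """Maps a parent store name to the generators that read from it."""

    def __init__(self):
        self._dependents = defaultdict(list)
        self._generators = {}

    def register(self, name, func, depends_on):
        self._generators[name] = func
        for parent in depends_on:
            self._dependents[parent].append(name)

    def notify_update(self, parent):
        """Re-run every generator that depends on `parent`; return their names."""
        rerun = []
        for name in self._dependents.get(parent, []):
            self._generators[name]()
            rerun.append(name)
        return rerun


calls = []
deps = ToyDependencyGraph()
deps.register(
    "material_element_has",
    func=lambda: calls.append("material_element_has"),
    depends_on=["material", "element"],
)

print(deps.notify_update("material"))
print(deps.notify_update("unrelated"))
```

Updating the `material` store triggers the edge generator, while updating a store that nothing depends on triggers nothing.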


In [14]:
materials_df = db.read_nodes(node_type="material", columns=["id"], ids=[0]).to_pandas()
print(materials_df)

db.delete_nodes(node_type="material",ids=[0])

materials_df = db.read_nodes(node_type="material", columns=["id"], ids=[0]).to_pandas()
print(materials_df)

   id
0   0
Empty DataFrame
Columns: [id]
Index: []


As you can see the material node with `id=0` is now gone.

Let's check the `material_element_has` edges to see if the update has been propagated.

In [15]:
edge_store = db.get_edge_store("material_element_has")
print(edge_store)

EDGE STORE SUMMARY
Edge type: material_element_has
• Number of edges: 3345
• Number of features: 8
Storage path: data\GraphDB\edges\material_element_has


############################################################
METADATA
############################################################
• class: EdgeStore
• class_module: parquetdb.graph.edges

############################################################
EDGE DETAILS
############################################################



Now there are 3345 `material_element_has` edges, down from 3348 before the deletion.

Let's check the `material_element_has` dataframe.

In [16]:
df = edge_store.read_edges().to_pandas()
print(df)

     edge_type    id               name  source_id source_type  target_id  \
0          has     0   mp-1222351_has_F          1    material          8   
1          has     1  mp-1222351_has_Fe          1    material         25   
2          has     2  mp-1222351_has_Li          1    material          2   
3          has     3    mp-651087_has_F          2    material          8   
4          has     4   mp-651087_has_Gd          2    material         63   
...        ...   ...                ...        ...         ...        ...   
3340       has  3340  mp-2714707_has_Al        999    material         12   
3341       has  3341  mp-2714707_has_Na        999    material         10   
3342       has  3342   mp-2714707_has_O        999    material          7   
3343       has  3343   mp-2714707_has_S        999    material         15   
3344       has  3344  mp-2714707_has_Zn        999    material         29   

     target_type  weight  
0        element     1.0  
1        element     

As you can see, the `source_id` column no longer contains `id=0`.

We can double check this with the following:

In [20]:
df[df["source_id"] == 0]

Unnamed: 0,edge_type,id,name,source_id,source_type,target_id,target_type,weight


This is empty as we should expect.
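
A more general check is to look for any edge whose source no longer exists among the material nodes, rather than testing a single id. The sketch below shows the idea on toy data with pandas. The ids are made up, and the edge table deliberately still contains a stale edge so the check has something to find.

```python
import pandas as pd

# Toy node and edge tables; material id 0 has been deleted
material_ids = pd.Series([1, 2], name="id")
edges = pd.DataFrame(
    {
        "source_id": [0, 1, 2],
        "source_type": ["material"] * 3,
        "target_id": [8, 8, 63],
        "target_type": ["element"] * 3,
    }
)

# Edges whose source no longer exists in the material table
dangling = edges[~edges["source_id"].isin(material_ids)]
print(len(dangling))
```

After a successful propagation, the equivalent check on the real stores should find no dangling edges.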

In [21]:
print(db)

GRAPH DATABASE SUMMARY
Name: GraphDB
Storage path: data\GraphDB
└── Repository structure:
    ├── nodes/                 (data\GraphDB\nodes)
    ├── edges/                 (data\GraphDB\edges)
    ├── edge_generators/       (data\GraphDB\edge_generators)
    ├── node_generators/       (data\GraphDB\node_generators)
    └── graph/                 (data\GraphDB\graph)

############################################################
NODE DETAILS
############################################################
Total node types: 2
------------------------------------------------------------
• Node type: material
  - Number of nodes: 999
  - Number of features: 136
  - db_path: data\GraphDB\nodes\material
------------------------------------------------------------
• Node type: element
  - Number of nodes: 118
  - Number of features: 99
  - db_path: data\GraphDB\nodes\element
------------------------------------------------------------

############################################################


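## Summary

In this notebook, we showed how to define custom node and edge generators, add them to a `ParquetGraphDB` instance, run them, and let updates to parent node stores propagate to the stores that depend on them.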