# Apache AGE


## Deploy:


```
$ docker run \
    --name age \
    -p 5455:5432 \
    -e POSTGRES_USER=postgresUser \
    -e POSTGRES_PASSWORD=postgresPW \
    -e POSTGRES_DB=postgresDB \
    apache/age
    
$ docker exec -it age psql -U postgresUser -d postgres
> CREATE EXTENSION IF NOT EXISTS age;
> LOAD 'age';
> SET search_path = ag_catalog, "$user", public;

ou 

> ALTER SYSTEM SET session_preload_libraries = 'age'; (por padrão)
> SELECT pg_reload_conf();
> SHOW session_preload_libraries;
```


## Observações:
 * Não tem integração com o Spark https://github.com/apache/age/issues/1122
 * Usa Cipher;
 * Mesmo usando o protocolo JDBC, não é possível carregar vértices e arestas;

## Python API

In [1]:
! pip install psycopg2-binary antlr4-python3-runtime apache-age-python
# Último update em  22 de set. de 2023

Defaulting to user installation because normal site-packages is not writeable
Collecting antlr4-python3-runtime
  Downloading antlr4_python3_runtime-4.13.2-py3-none-any.whl.metadata (304 bytes)
Collecting apache-age-python
  Downloading apache_age_python-0.0.7-py3-none-any.whl.metadata (4.8 kB)
Collecting antlr4-python3-runtime
  Downloading antlr4_python3_runtime-4.11.1-py3-none-any.whl.metadata (291 bytes)
Downloading apache_age_python-0.0.7-py3-none-any.whl (25 kB)
Downloading antlr4_python3_runtime-4.11.1-py3-none-any.whl (144 kB)
Installing collected packages: antlr4-python3-runtime, apache-age-python
Successfully installed antlr4-python3-runtime-4.11.1 apache-age-python-0.0.7

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.2[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [3]:
import age
import psycopg2

# Configuração de conexão
GRAPH_NAME = "exemplo_grafo"
CONFIG = {
    "host": "localhost",
    "port": "5455",
    "dbname": "postgresDB", 
    "user": "postgresUser",
    "password": "postgresPW"
}

def conectar_age():
    """Estabelece conexão com Apache AGE"""
    try:
        # Conectar usando age driver
        connection = age.connect(
            graph=GRAPH_NAME,
            dsn=f"host={CONFIG['host']} port={CONFIG['port']} dbname={CONFIG['dbname']} user={CONFIG['user']} password={CONFIG['password']}"
        )
        
        return connection
    
    except Exception as e:
        print(f"Erro na conexão: {e}")
        return None

# Estabelecer conexão
conn = conectar_age()


In [31]:
# Criar vértices (pessoas)
pessoas = [
    "CREATE (p:Pessoa {nome: 'Joao Silva', idade: 35, profissao: 'Engenheiro'}) RETURN p",
    "CREATE (p:Pessoa {nome: 'Maria Santos', idade: 29, profissao: 'Designer'}) RETURN p", 
    "CREATE (p:Pessoa {nome: 'Carlos Lima', idade: 42, profissao: 'Gerente'}) RETURN p",
    "CREATE (p:Pessoa {nome: 'Ana Costa', idade: 31, profissao: 'Analista'}) RETURN p"
]

# Executar queries de criação de pessoas
for query in pessoas:
    cursor = conn.execCypher(query)
    for row in cursor:
        print(f"Pessoa criada: {row[0]}")


Pessoa criada: {label:Pessoa, id:1688849860263937, properties:{nome: Joao Silva, idade: 35, profissao: Engenheiro, }}::VERTEX
Pessoa criada: {label:Pessoa, id:1688849860263938, properties:{nome: Maria Santos, idade: 29, profissao: Designer, }}::VERTEX
Pessoa criada: {label:Pessoa, id:1688849860263939, properties:{nome: Carlos Lima, idade: 42, profissao: Gerente, }}::VERTEX
Pessoa criada: {label:Pessoa, id:1688849860263940, properties:{nome: Ana Costa, idade: 31, profissao: Analista, }}::VERTEX


In [32]:

# Criar vértices (empresas)
empresas = [
    "CREATE (e:Empresa {nome: 'TechCorp', setor: 'Tecnologia', funcionarios: 500}) RETURN e",
    "CREATE (e:Empresa {nome: 'DesignStudio', setor: 'Design', funcionarios: 50}) RETURN e",
    "CREATE (e:Empresa {nome: 'ConsultingGroup', setor: 'Consultoria', funcionarios: 200}) RETURN e"
]

for query in empresas:
    cursor = conn.execCypher(query)
    for row in cursor:
        print(f"Empresa criada: {row[0]}")


Empresa criada: {label:Empresa, id:1970324836974593, properties:{nome: TechCorp, setor: Tecnologia, funcionarios: 500, }}::VERTEX
Empresa criada: {label:Empresa, id:1970324836974594, properties:{nome: DesignStudio, setor: Design, funcionarios: 50, }}::VERTEX
Empresa criada: {label:Empresa, id:1970324836974595, properties:{nome: ConsultingGroup, setor: Consultoria, funcionarios: 200, }}::VERTEX


In [33]:
# Criar relacionamentos
relacionamentos = [
    "MATCH (p:Pessoa {nome: 'Joao Silva'}), (e:Empresa {nome: 'TechCorp'}) CREATE (p)-[:TRABALHA_EM {cargo: 'Engenheiro Senior', salario: 8500}]->(e)",
    "MATCH (p:Pessoa {nome: 'Maria Santos'}), (e:Empresa {nome: 'DesignStudio'}) CREATE (p)-[:TRABALHA_EM {cargo: 'Designer Pleno', salario: 6000}]->(e)",
    "MATCH (p:Pessoa {nome: 'Carlos Lima'}), (e:Empresa {nome: 'ConsultingGroup'}) CREATE (p)-[:TRABALHA_EM {cargo: 'Gerente', salario: 12000}]->(e)",
    "MATCH (p:Pessoa {nome: 'Ana Costa'}), (e:Empresa {nome: 'TechCorp'}) CREATE (p)-[:TRABALHA_EM {cargo: 'Analista', salario: 4500}]->(e)"
]

for query in relacionamentos:
    cursor = conn.execCypher(query)
    print("Relacionamento criado")

Relacionamento criado
Relacionamento criado
Relacionamento criado
Relacionamento criado


In [49]:
# Relacionamentos pessoais
amizades = [
    "MATCH (p1:Pessoa {nome: 'Joao Silva'}), (p2:Pessoa {nome: 'Maria Santos'}) CREATE (p1)-[:CONHECE {proximidade: 'amigo_proximo'}]->(p2)",
    "MATCH (p1:Pessoa {nome: 'Maria Santos'}), (p2:Pessoa {nome: 'Carlos Lima'}) CREATE (p1)-[:CONHECE {proximidade: 'conhecido'}]->(p2)"
]

for query in amizades:
    cursor = conn.execCypher(query)
    print("Amizade criada")


Amizade criada
Amizade criada


In [34]:

# Commit das mudanças
conn.commit()
print("Grafo criado com sucesso!")
        

Grafo criado com sucesso!


In [15]:
ag = conectar_age()

with ag.connection.cursor() as cursor:

    query = """
    SELECT * FROM cypher('exemplo_grafo', $$
        MATCH (p:Pessoa) 
        RETURN p.nome, p.idade, p.profissao
    $$) AS (nome agtype, idade agtype, profissao agtype);
    """

    cursor.execute(query)
    results = cursor.fetchall()

    print("=== PESSOAS ENCONTRADAS ===")
    for row in results:
        print(f"Nome: {row[0]}, Idade: {row[1]}, Profissão: {row[2]}")



=== PESSOAS ENCONTRADAS ===
Nome: Joao Silva, Idade: 35, Profissão: Engenheiro
Nome: Maria Santos, Idade: 29, Profissão: Designer
Nome: Carlos Lima, Idade: 42, Profissão: Gerente
Nome: Ana Costa, Idade: 31, Profissão: Analista


In [50]:
ag = conectar_age()
    
# Testes progressivos
tests = [
    {
        'nome': 'Teste básico do grafo',
        'query': "SELECT * FROM ag_graph WHERE name = 'exemplo_grafo';"
    },
    {
        'nome': 'Count simples',
        'query': """
        SELECT * FROM cypher('exemplo_grafo', $$
            MATCH (n) 
            RETURN count(n)
        $$) AS (total agtype);
        """
    },
    {
        'nome': 'Retorna vértices completos',
        'query': """
        SELECT * FROM cypher('exemplo_grafo', $$
            MATCH (p:Pessoa) 
            RETURN p
        $$) AS (pessoa agtype);
        """
    },
    {
        'nome': 'Uma propriedade específica',
        'query': """
        SELECT * FROM cypher('exemplo_grafo', $$
            MATCH (p:Pessoa) 
            RETURN p.nome
        $$) AS (nome agtype);
        """
    },
    {
        'nome': 'Múltiplas propriedades',
        'query': """
        SELECT * FROM cypher('exemplo_grafo', $$
            MATCH (p:Pessoa) 
            RETURN p.nome, p.idade, p.profissao
        $$) AS (nome agtype, idade agtype, profissao agtype);
        """
    }
]

with ag.connection.cursor() as cursor:
    for test in tests:
        print(f"\n=== {test['nome'].upper()} ===")
        try:
            cursor.execute(test['query'])
            results = cursor.fetchall()

            print(f"Sucesso: {len(results)} resultado(s)")
            for i, row in enumerate(results[:3]):  # Mostrar até 3 resultados
                print(f"  {i+1}: {row}")

        except Exception as e:
            print(f"Erro: {e}")




=== TESTE BÁSICO DO GRAFO ===
Sucesso: 1 resultado(s)
  1: (16973, 'exemplo_grafo', 'exemplo_grafo')

=== COUNT SIMPLES ===
Sucesso: 1 resultado(s)
  1: (7,)

=== RETORNA VÉRTICES COMPLETOS ===
Sucesso: 4 resultado(s)
  1: ({label:Pessoa, id:1688849860263937, properties:{nome: Joao Silva, idade: 35, profissao: Engenheiro, }}::VERTEX,)
  2: ({label:Pessoa, id:1688849860263938, properties:{nome: Maria Santos, idade: 29, profissao: Designer, }}::VERTEX,)
  3: ({label:Pessoa, id:1688849860263939, properties:{nome: Carlos Lima, idade: 42, profissao: Gerente, }}::VERTEX,)

=== UMA PROPRIEDADE ESPECÍFICA ===
Sucesso: 4 resultado(s)
  1: ('Joao Silva',)
  2: ('Maria Santos',)
  3: ('Carlos Lima',)

=== MÚLTIPLAS PROPRIEDADES ===
Sucesso: 4 resultado(s)
  1: ('Joao Silva', 35, 'Engenheiro')
  2: ('Maria Santos', 29, 'Designer')
  3: ('Carlos Lima', 42, 'Gerente')


## Pyspark


Atualmente não tem como executar consultas ou ler/escrever vertices arestas.

ISSUE: https://github.com/apache/age/issues/1122
        
        
Mesmo usando o protocolo JDBC, não é possível, pois a cada sessão é necessário fazer o LOAD do age.

https://age.apache.org/age-manual/master/intro/setup.html#post-installation-setup



In [1]:
from pyspark.sql import SparkSession


    
spark = SparkSession.builder \
    .appName("AGE-Spark") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()



25/09/10 10:50:52 WARN Utils: Your hostname, lucasmsp-Inspiron-7580 resolves to a loopback address: 127.0.1.1; using 192.168.15.13 instead (on interface wlp3s0)
25/09/10 10:50:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/spark-3.3.0/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/lucasmsp/.ivy2/cache
The jars for the packages stored in: /home/lucasmsp/.ivy2/jars
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a2564b4a-dbff-4d94-801c-251f6210409b;1.0
	confs: [default]
	found org.postgresql#postgresql;42.6.0 in central
	found org.checkerframework#checker-qual;3.31.0 in central
:: resolution report :: resolve 95ms :: artifacts dl 4ms
	:: modules in use:
	org.checkerframework#checker-qual;3.31.0 from central in [default]
	org.postgresql#postgresql;42.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   2   |   0   |   0   |   0   ||   2   |   0   |
	----------------------------------------

25/09/10 10:50:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/09/10 10:50:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [19]:
jdbc_url = "jdbc:postgresql://localhost:5455/postgres"

query = """
    SELECT * FROM cypher('exemplo_grafo', $$
        MATCH (p:Pessoa) 
        RETURN p.nome, p.idade, p.profissao
    $$) AS (nome agtype, idade agtype, profissao agtype)"""

vertices_df = spark.read \
    .format("jdbc") \
    .option("url", jdbc_url) \
    .option("query", query) \
    .option("user", "postgresUser") \
    .option("password", "postgresPW") \
    .option("driver", "org.postgresql.Driver") \
    .load()

Py4JJavaError: An error occurred while calling o113.load.
: org.postgresql.util.PSQLException: ERROR: function cypher(unknown, unknown) does not exist
  Dica: No function matches the given name and argument types. You might need to add explicit type casts.
  Posição: 35
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2713)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2401)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:368)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:498)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:415)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
	at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:134)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:68)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:242)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
