Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,6 @@ orchestrator/prisma/.db*
.infisical.json
.aptos
migration_lock.toml

# Keys and sensitive data
keys/
Empty file added aptos/aptos
Empty file.
Binary file added docs/diagrams/architecture.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/diagrams/sequence_diagram.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
214 changes: 136 additions & 78 deletions orchestrator/src/indexer/aptos.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import env from "@/env";
import { log } from "@/logger";
import { fetchTwitterData } from "@/services/twitter";
import type { AptosBlockMetadataTransaction, AptosTransactionData, ProcessedRequestAdded } from "@/types";
import { decodeNotifyValue } from "@/util";
import { Account, Aptos, AptosConfig, Ed25519PrivateKey, Network } from "@aptos-labs/ts-sdk";
Expand All @@ -26,15 +27,21 @@ export default class AptosIndexer extends Indexer {
this.account = account;
this.lastTxVersion = 0;
log.info(`Aptos Indexer initialized`);
log.info(`Chain ID: ${this.getChainId()} \n\t\tOrchestrator Oracle Node Address: ${this.orchestrator}`);
log.info(
`Chain ID: ${this.getChainId()} \n\t\tOracle Address: ${this.oracleAddress}\n\t\tAccount Address: ${account.accountAddress.toString()}`,
);
}

getChainId(): string {
return `APTOS-${this.chainId}`;
}

/**
* Returns the RPC URL for the Aptos network.
* Using official Aptos Labs endpoint for better reliability.
*/
getRpcUrl(): string {
return `https://aptos-${this.chainId === Network.TESTNET ? "testnet" : "mainnet"}.nodit.io`;
return `https://fullnode.${this.chainId === Network.TESTNET ? "testnet" : "mainnet"}.aptoslabs.com/v1`;
}

/**
Expand All @@ -48,15 +55,10 @@ export default class AptosIndexer extends Indexer {
* @returns {Promise<AptosBlockMetadataTransaction[]>} A promise that resolves to an array of Aptos block metadata transactions.
*/
async fetchTransactionList(transactionIDs: number[]): Promise<AptosBlockMetadataTransaction[]> {
// Could be optimized by using Promise.settled preventing a single failed request to fail the entire operation
return await Promise.all(
transactionIDs.map(async (transaction) => {
const response: AptosBlockMetadataTransaction = (
await axios.get(`${this.getRpcUrl()}/v1/transactions/by_version/${transaction}`, {
headers: {
"X-API-KEY": env.aptos.noditKey,
},
})
await axios.get(`${this.getRpcUrl()}/transactions/by_version/${transaction}`)
).data;
return response;
}),
Expand All @@ -77,7 +79,11 @@ export default class AptosIndexer extends Indexer {
*/
async fetchRequestAddedEvents(cursor: null | number | string = null): Promise<ProcessedRequestAdded<any>[]> {
try {
const endpoint = `${this.getRpcUrl()}/${env.aptos.noditKey}/v1/graphql`;
const client = new GraphQLClient("https://indexer-testnet.staging.gcp.aptosdev.com/v1/graphql", {
headers: {
"Content-Type": "application/json",
},
});

const document = gql`
query Account_transactions ($version: bigint!,$address: String!) {
Expand All @@ -102,13 +108,9 @@ export default class AptosIndexer extends Indexer {
internalCursor = this.lastTxVersion;
}

const client = new GraphQLClient(endpoint);
const gqlData: AptosTransactionData = await client.request({
document,
variables: {
version: internalCursor,
address: this.oracleAddress.toLowerCase(),
},
const gqlData: AptosTransactionData = await client.request(document, {
version: internalCursor,
address: this.oracleAddress.toLowerCase(),
});

if (gqlData.account_transactions.length === 0) {
Expand Down Expand Up @@ -186,77 +188,133 @@ export default class AptosIndexer extends Indexer {
* @returns {Promise<any>} - The receipt of the transaction.
*/
async sendFulfillment(data: ProcessedRequestAdded<any>, status: number, result: string) {
// Set up the Aptos client
const aptosConfig = new AptosConfig({ network: Network.TESTNET });
const aptos = new Aptos(aptosConfig);
const view_request = await aptos.view({
payload: {
function: `${this.oracleAddress}::oracles::get_response_status`,
functionArguments: [data.request_id],
},
});
if (view_request[0] !== 0) {
log.debug({ message: `Request: ${data.request_id} as already been processed` });
return null;
}
try {
// Build the transaction payload
const payload = await aptos.transaction.build.simple({
sender: this.account.accountAddress,
data: {
function: `${this.oracleAddress}::oracles::fulfil_request`,
functionArguments: [data.request_id, status, result],
},
});
const client = new Aptos(
new AptosConfig({
network: this.chainId,
}),
);

// Sign and submit the transaction
const pendingTxn = await aptos.signAndSubmitTransaction({
signer: this.account,
transaction: payload,
});
try {
const accountInfo = await client.account.getAccountInfo({ accountAddress: this.account.accountAddress });
log.debug(`Account verified: ${this.account.accountAddress.toString()}`);
} catch (e) {
log.error(`Account not found or not funded: ${this.account.accountAddress.toString()}`);
throw new Error(
`Account not initialized on chain. Please fund account: ${this.account.accountAddress.toString()}`,
);
}

try {
const transaction = await client.transaction.build.simple({
sender: this.account.accountAddress,
data: {
function: `${this.oracleAddress}::oracles::fulfil_request` as const,
typeArguments: [],
functionArguments: [data.request_id, status, result],
},
});

// Wait for the transaction to be processed
const executedTransaction = await aptos.waitForTransaction({ transactionHash: pendingTxn.hash });
const signedTx = await client.signAndSubmitTransaction({
signer: this.account,
transaction,
});

log.debug("Transaction executed:", executedTransaction.hash);
const txResult = await client.waitForTransaction({
transactionHash: signedTx.hash,
});

log.info(`Fulfillment transaction successful: ${signedTx.hash}`);
return txResult;
} catch (e) {
log.error("Transaction failed:", e);
throw e;
}
} catch (error) {
console.error("Error calling entry function:", error);
log.error("Error in sendFulfillment:", error);
throw error;
}
}

async save(
event: ProcessedRequestAdded<{
event_id: {
event_handle_id: string;
event_seq: number;
async processRequestAddedEvent(event: ProcessedRequestAdded<any>) {
try {
if (event.params.url.includes("api.x.com")) {
console.log("Using Twitter API with token:", env.integrations.xBearerToken);
return await fetchTwitterData(event.params.url);
} else {
throw new Error(`Unsupported API endpoint: ${event.params.url}`);
}
} catch (error: any) {
log.error("Error in processRequestAddedEvent:", error);
return {
status: 500,
message: error instanceof Error ? error.message : String(error),
};
event_index: number;
event_data: {
[key: string]: any;
}
}

async processRequest(data: ProcessedRequestAdded<any>) {
try {
if (data.params.url.includes("api.x.com")) {
log.info("Processing Twitter API request");
const response = await fetchTwitterData(data.params.url);

if (!response) {
throw new Error("No response from Twitter API");
}

log.debug("Twitter API Response:", { status: response.status });
await this.sendFulfillment(data, response.status, response.message);
} else {
throw new Error(`Unsupported API endpoint: ${data.params.url}`);
}
} catch (error) {
log.error("Error processing request:", error);
await this.save(
data,
{
error: error instanceof Error ? error.message : String(error),
},
500,
);
throw error;
}
}

async save(event: ProcessedRequestAdded<any>, data: any, status: number) {
try {
const dbEventData = {
eventHandleId: event.fullData.event_id.event_handle_id,
eventSeq: +event.fullData.event_id.event_seq,
eventData: JSON.stringify(event.fullData.event_data),
eventType: event.fullData.event_type,
eventIndex: event.fullData.event_index.toString(),
decoded_event_data: JSON.stringify(event.fullData.decoded_event_data),
retries: 0,
response: JSON.stringify(data),
chain: this.getChainId(),
status,
};
event_type: string;
decoded_event_data: string;
}>,
data: any,
status: number,
) {
const dbEventData = {
eventHandleId: event.fullData.event_id.event_handle_id,
eventSeq: +event.fullData.event_id.event_seq,
eventData: JSON.stringify(event.fullData.event_data),
eventType: event.fullData.event_type,
eventIndex: event.fullData.event_index.toString(),
decoded_event_data: JSON.stringify(event.fullData.decoded_event_data),
retries: 0,
response: JSON.stringify(data),
chain: this.getChainId(),
status,
};
log.debug({ eventHandleId: event.fullData.event_id.event_handle_id, eventSeq: +event.fullData.event_id.event_seq });
await prismaClient.events.create({
data: {
...dbEventData,
},
});

log.debug("Attempting to save event to database:", dbEventData);

const savedEvent = await prismaClient.events.create({
data: dbEventData,
});

log.info("Successfully saved event to database:", {
eventId: savedEvent.id,
status: savedEvent.status,
});

return savedEvent;
} catch (error) {
log.error("Failed to save event to database:", {
error: error instanceof Error ? error.message : String(error),
eventData: event,
status,
});
throw error;
}
}
}
26 changes: 26 additions & 0 deletions orchestrator/src/services/twitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import env from "@/env";
import axios from "axios";

export async function fetchTwitterData(url: string) {
try {
const response = await axios.get(url, {
headers: {
Authorization: `Bearer ${env.integrations.xBearerToken}`,
"Content-Type": "application/json",
},
});

return {
status: response.status,
message: JSON.stringify(response.data),
};
} catch (error) {
if (axios.isAxiosError(error)) {
return {
status: error.response?.status || 500,
message: JSON.stringify(error.response?.data || { error: "Twitter API request failed" }),
};
}
throw error;
}
}
Empty file added scripts/aptos
Empty file.
13 changes: 13 additions & 0 deletions scripts/create-account.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import { Account, Ed25519PrivateKey } from "@aptos-labs/ts-sdk";

async function main() {
// Generate new private key and account
const privateKey = Ed25519PrivateKey.generate();
const account = Account.fromPrivateKey({ privateKey });

console.log("\nSave these securely:");
console.log("Private Key:", privateKey.toString());
console.log("Account Address:", account.accountAddress.toString());
}

main().catch(console.error);