Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI for cleaning the database. #923

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion crawlers/mooncrawl/mooncrawl/moonworm_crawler/cli.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import argparse
import logging
from typing import Optional
from typing import Optional, Literal
from uuid import UUID

from moonstreamdb.blockchain import AvailableBlockchainType
from web3 import Web3
from web3.middleware import geth_poa_middleware

from .db import deduplicate_records
from ..db import yield_db_session_ctx
from ..settings import (
MOONSTREAM_MOONWORM_TASKS_JOURNAL,
Expand Down Expand Up @@ -341,6 +342,21 @@ def handle_historical_crawl(args: argparse.Namespace) -> None:
)


def handle_deduplicate(args: argparse.Namespace) -> None:
    """
    Deduplicate database records.

    Reads ``blockchain_type``, ``table``, ``label`` and ``type`` from the parsed
    command-line arguments and forwards them to ``deduplicate_records`` inside a
    database session obtained from ``yield_db_session_ctx``.

    Raises:
        ValueError: if ``--blockchain-type`` was not provided, or its value is
            not a valid ``AvailableBlockchainType`` value.
    """
    # argparse delivers a plain string (or None when the flag is omitted), while
    # deduplicate_records is annotated to receive an AvailableBlockchainType
    # member, so convert (and validate) before opening a database session.
    if args.blockchain_type is None:
        raise ValueError("--blockchain-type is required for deduplication")
    blockchain_type = AvailableBlockchainType(args.blockchain_type)

    # The CLI offers "function" as a choice, but deduplicate_records branches on
    # "event" and "tx_call" only; translate so the choice is not a silent no-op.
    label_type = "tx_call" if args.type == "function" else args.type

    with yield_db_session_ctx() as db_session:
        deduplicate_records(
            db_session,
            blockchain_type,
            args.table,
            args.label,
            label_type,
        )


def main() -> None:
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda _: parser.print_help())
Expand Down Expand Up @@ -536,6 +552,49 @@ def main() -> None:
)
historical_crawl_parser.set_defaults(func=handle_historical_crawl)

# "database" command group: maintenance operations on the crawler database.
database_cli = subparsers.add_parser("database", help="Database operations")
# Declared on the parent "database" parser, so on the command line it has to
# appear before the subcommand name.
database_cli.add_argument(
    "--blockchain-type",
    "-b",
    type=str,
    help=f"Available blockchain types: {[member.value for member in AvailableBlockchainType]}",
)

# With no subcommand given, just show the help for the "database" group.
database_cli.set_defaults(func=lambda _: database_cli.print_help())

database_cli_subparsers = database_cli.add_subparsers()

deduplicate_parser = database_cli_subparsers.add_parser(
    "deduplicate",
    help="Deduplicate database records",
)

deduplicate_parser.add_argument(
    "--table",
    "-t",
    type=str,
    choices=["blocks", "labels", "transactions"],
    required=True,
    help="Table type to deduplicate",
)

deduplicate_parser.add_argument(
    "--label",
    "-l",
    type=str,
    required=False,
    help="Label to deduplicate",
)

# "tx_call" is accepted alongside the existing choices because it is the value
# deduplicate_records actually branches on for function-call labels.
deduplicate_parser.add_argument(
    "--type",
    "-y",
    type=str,
    choices=["event", "function", "tx_call"],
    required=True,
    help="Type to deduplicate",
)

# Without this the subcommand inherits the parent's default and only prints
# help; bind it to its handler so "database deduplicate" actually runs.
deduplicate_parser.set_defaults(func=handle_deduplicate)

args = parser.parse_args()
args.func(args)

Expand Down
142 changes: 141 additions & 1 deletion crawlers/mooncrawl/mooncrawl/moonworm_crawler/db.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import logging
import json
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Literal

from moonstreamdb.blockchain import AvailableBlockchainType, get_label_model
from moonstreamdb.models import Base
from moonworm.crawler.function_call_crawler import ContractFunctionCall # type: ignore
from sqlalchemy.orm import Session
from sqlalchemy.sql import text


from ..settings import CRAWLER_LABEL
from .event_crawler import Event
Expand Down Expand Up @@ -218,3 +220,141 @@ def add_function_calls_to_session(

logger.info(f"Saving {len(labels_to_save)} labels to session")
db_session.add_all(labels_to_save)


def deduplicate_records(
    db_session: Session,
    blockchain_type: AvailableBlockchainType,
    table: Literal["labels", "transactions", "blocks"],
    label_name: Optional[str] = None,
    label_type: Optional[str] = None,
) -> None:
    """
    Deduplicates records in the database.

    Only the ``labels`` table is supported. For every distinct address that has
    labels matching ``label_name`` and ``label_type``, one row is kept per
    duplicate key and the remaining rows are deleted; each address is committed
    separately.

    Duplicate keys:
      - ``event``: ``(transaction_hash, log_index)``
      - ``tx_call``: ``transaction_hash``, restricted to rows whose
        ``log_index`` is NULL

    Within each key the row that sorts first by ``block_number`` and then
    ``created_at`` (both ascending) is the one that is kept.

    Args:
        db_session: session used for the queries; committed after each address.
        blockchain_type: selects the label model via ``get_label_model``.
        table: which table to deduplicate.
        label_name: value of the ``label`` column to restrict to (labels only).
        label_type: ``"event"`` or ``"tx_call"``; ``"function"`` is accepted as
            an alias for ``"tx_call"`` (labels only).

    Raises:
        NotImplementedError: for the ``blocks`` and ``transactions`` tables.
        ValueError: if ``table`` is unknown, if ``label_name`` or ``label_type``
            is missing, or if ``label_type`` is not supported.
    """
    if table in ("blocks", "transactions"):
        raise NotImplementedError(
            f"Deduplication for {table} is not implemented yet"
        )
    if table != "labels":
        raise ValueError(f"Unknown table for deduplication: {table}")

    if label_name is None or label_type is None:
        raise ValueError(
            "label_name and label_type are required for deduplication of labels"
        )

    # The CLI exposes "function", while the rows are matched on "tx_call".
    if label_type == "function":
        label_type = "tx_call"

    if label_type == "event":
        duplicate_key = "transaction_hash, log_index"
        extra_condition = ""
    elif label_type == "tx_call":
        duplicate_key = "transaction_hash"
        extra_condition = "AND log_index IS NULL"
    else:
        raise ValueError(
            f"Unsupported label_type for deduplication: {label_type}"
        )

    label_model = get_label_model(blockchain_type)
    # Use the model's real table name; the `table` argument is only the generic
    # selector ("labels") and is not the name of a concrete table.
    labels_table = label_model.__tablename__

    # Addresses are processed one at a time so each DELETE stays bounded.
    addresses = (
        db_session.query(label_model.address.label("address"))
        .filter(label_model.label == label_name)
        .filter(label_model.label_data["type"] == label_type)
        .distinct()
        .all()
    )  # can take a while

    # The CTE must expose `id`: without it, `id` inside the NOT IN subquery
    # binds to the outer row and the DELETE never matches anything.
    # Only constants and the model's table name are interpolated; all
    # caller-supplied values go through bind parameters.
    delete_duplicates = text(
        f"""
        WITH kept_labels AS (
            SELECT
                DISTINCT ON ({duplicate_key}) id
            FROM
                {labels_table}
            WHERE
                label = :label
                AND address = :address
                AND label_data->>'type' = :label_type
                {extra_condition}
            ORDER BY
                {duplicate_key},
                block_number ASC,
                created_at ASC
        )
        DELETE FROM
            {labels_table}
        WHERE
            label = :label
            AND address = :address
            AND label_data->>'type' = :label_type
            {extra_condition}
            AND id NOT IN (SELECT id FROM kept_labels);
        """
    )

    for (address,) in addresses:
        result = db_session.execute(
            delete_duplicates,
            {"address": address, "label": label_name, "label_type": label_type},
        )
        # Read the count before committing, while the result is still live.
        deleted_count = result.rowcount

        db_session.commit()

        logger.info(
            f"Deleted {deleted_count} duplicate labels for address {address}"
        )
Loading