In [1]:
# Packages
import json
import math
import random
import sqlite3
import time
from collections import Counter
from contextlib import closing

import pandas as pd
import requests

New database for stratified sample

In [2]:
def create_table_with_same_schema(old_db_path, old_table_name, new_db_path, new_table_name):
    """
    Create a table in new_db_path whose columns mirror old_table_name in old_db_path.

    Only the column names and declared types reported by
    ``PRAGMA table_info`` are copied. NOT NULL flags, default values and
    primary-key information are deliberately not replicated. The table is
    created with ``CREATE TABLE IF NOT EXISTS``, so an already existing
    new_table_name is left untouched.

    Parameters
    ----------
    old_db_path : str
        Path of the SQLite database holding the source table.
    old_table_name : str
        Name of the table whose columns are read.
    new_db_path : str
        Path of the SQLite database in which the table is created.
    new_table_name : str
        Name of the table to create.

    Raises
    ------
    sqlite3.OperationalError
        If old_table_name has no columns / does not exist in old_db_path,
        or if SQLite rejects one of the statements.
    """
    def quote_ident(identifier):
        # Double any embedded quote so the name is always parsed as ONE identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    # Read the source schema; closing() guarantees the handle is released even
    # when the PRAGMA raises (a sqlite3 connection used as a context manager
    # only commits/rolls back, it does not close).
    with closing(sqlite3.connect(old_db_path)) as old_conn:
        # Each row is (cid, name, type, notnull, dflt_value, pk).
        schema_info = old_conn.execute(
            f"PRAGMA table_info({quote_ident(old_table_name)})"
        ).fetchall()

    if not schema_info:
        # PRAGMA table_info returns no rows for an unknown table; without this
        # check the CREATE below would fail with an opaque 'near ")"' error.
        raise sqlite3.OperationalError(
            f"no such table (or table has no columns): {old_table_name!r} in {old_db_path!r}"
        )

    column_defs_str = ",\n    ".join(
        f"{quote_ident(col[1])} {col[2]}" for col in schema_info
    )

    create_stmt = f"""
    CREATE TABLE IF NOT EXISTS {quote_ident(new_table_name)} (
    {column_defs_str}
    )
    """

    with closing(sqlite3.connect(new_db_path)) as new_conn:
        new_conn.execute(create_stmt)
        new_conn.commit()

Sampling based on marketplace strata

In [None]:
def sample_20_collections_per_marketplace(old_db_path="data/all_collections.db",
                                          old_table_name="all_collections",
                                          new_db_path="data/stratified_sample.db",
                                          new_table_name="sampled_collections"):
    """
    Sample up to 20 collections per marketplace and append them to the sample table.

    For each of the four marketplaces ("OpenSea", "Rarible", "MagicEden",
    "Atomic") the rows of old_table_name whose ``marketplace`` column equals
    that name exactly are collected, up to 20 of them are drawn with
    ``random.sample`` (global seed fixed to 42), and the full rows are inserted
    into new_db_path:new_table_name.

    The destination table gets four extra TEXT columns:
      marketplace_stratum, blockchain_stratum, category_stratum, token_standard_stratum
    Only marketplace_stratum is filled here (with the marketplace name); the
    other three are stored as NULL.

    Rows are always appended: calling this function again inserts the same
    sample a second time.

    Parameters
    ----------
    old_db_path : str
        SQLite database containing the full collection table.
    old_table_name : str
        Name of the source table (a table name, not a file path).
    new_db_path : str
        SQLite database receiving the sample.
    new_table_name : str
        Destination table; created from the source schema if missing.

    Raises
    ------
    sqlite3.OperationalError
        If the source table is missing or SQLite rejects a statement.
    """
    marketplaces = ["OpenSea", "Rarible", "MagicEden", "Atomic"]
    strata_cols = ["marketplace_stratum", "blockchain_stratum", "category_stratum", "token_standard_stratum"]
    desired_count = 20

    def quote_ident(identifier):
        # Double any embedded quote so the name is always parsed as ONE identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    old_table_sql = quote_ident(old_table_name)
    new_table_sql = quote_ident(new_table_name)

    # 1) Create the new .db file and a table that mirrors the old table's schema
    create_table_with_same_schema(old_db_path, old_table_name, new_db_path, new_table_name)

    # 2) Connect to both DBs; closing() releases them even if a statement fails
    with closing(sqlite3.connect(old_db_path)) as old_conn, \
            closing(sqlite3.connect(new_db_path)) as new_conn:
        old_cursor = old_conn.cursor()
        new_cursor = new_conn.cursor()

        # 3) Add whichever strata columns the destination does not have yet.
        #    Checking first (instead of catching the ALTER error) means one
        #    pre-existing column can no longer cause the others to be skipped.
        new_cursor.execute(f"PRAGMA table_info({new_table_sql})")
        existing_cols = {col[1].lower() for col in new_cursor.fetchall()}
        for col in strata_cols:
            if col not in existing_cols:
                new_cursor.execute(f"ALTER TABLE {new_table_sql} ADD COLUMN {quote_ident(col)} TEXT")
        new_conn.commit()

        # Use a fixed seed for reproducibility
        random.seed(42)

        # Column order of PRAGMA table_info matches the order of SELECT *
        old_cursor.execute(f"PRAGMA table_info({old_table_sql})")
        original_col_names = [col[1] for col in old_cursor.fetchall()]
        extended_col_names = original_col_names + strata_cols

        placeholders = ", ".join("?" for _ in extended_col_names)
        col_names_str = ", ".join(quote_ident(c) for c in extended_col_names)
        insert_sql = f"""
    INSERT INTO {new_table_sql} ({col_names_str})
    VALUES ({placeholders})
    """

        for mp in marketplaces:
            # Gather the rowids of every collection listed on this marketplace
            old_cursor.execute(
                f"SELECT rowid FROM {old_table_sql} WHERE marketplace = ?",
                (mp,)
            )
            rowids = [r[0] for r in old_cursor.fetchall()]
            print(f"{mp} has {len(rowids)} total collections in the DB.")

            # Take everything when the stratum is smaller than the target size
            if len(rowids) < desired_count:
                chosen_ids = rowids
            else:
                chosen_ids = random.sample(rowids, desired_count)

            selected_data = []
            if chosen_ids:
                rowid_q = ",".join("?" * len(chosen_ids))
                old_cursor.execute(
                    f"SELECT * FROM {old_table_sql} WHERE rowid IN ({rowid_q})",
                    chosen_ids
                )
                selected_data = old_cursor.fetchall()

            # (marketplace, blockchain, category, token_standard) strata values
            strata_values = (mp, None, None, None)
            new_cursor.executemany(
                insert_sql,
                (row_data + strata_values for row_data in selected_data)
            )

            new_conn.commit()
            print(f"{mp}: Inserted {len(selected_data)} rows into {new_table_name} in {new_db_path}.")

    print("Sampling complete. New data stored in:", new_db_path, new_table_name)

In [None]:
if __name__ == "__main__":
    # Kick off the marketplace-stratified sampling run; the arguments are
    # supplied positionally in the function's declared parameter order.
    sample_20_collections_per_marketplace(
        "data/all_collections.db",    # old_db_path
        "all_collections",            # old_table_name
        "data/stratified_sample.db",  # new_db_path
        "sampled_collections",        # new_table_name
    )

OpenSea has 1207071 total collections in the DB.
OpenSea: Inserted 20 rows into sampled_collections in stratified_sample.db.
Rarible has 4201341 total collections in the DB.
Rarible: Inserted 20 rows into sampled_collections in stratified_sample.db.
MagicEden has 28469 total collections in the DB.
MagicEden: Inserted 20 rows into sampled_collections in stratified_sample.db.
Atomic has 110142 total collections in the DB.
Atomic: Inserted 20 rows into sampled_collections in stratified_sample.db.
Sampling complete. New data stored in: stratified_sample.db sampled_collections


Sampling based on blockchain strata

In [None]:
def sample_2_collections_per_chain(old_db_path="data/all_collections.db",
                                   old_table_name="all_collections",
                                   new_db_path="data/stratified_sample.db",
                                   new_table_name="sampled_collections"):
    """
    Sample up to 2 collections per distinct chain and append them to the sample table.

    The chain of a row is derived from its ``collection_id``: the value is
    split once on ':' and the part AFTER the colon, stripped and lower-cased,
    is used as the chain key. Rows whose collection_id is missing, is not a
    string, has no ':' or has an empty chain part are ignored.

    Sampled rows are inserted into new_db_path:new_table_name with the source
    columns plus the four strata columns; only blockchain_stratum is filled.
    Rows are always appended, so re-running inserts the sample again.

    Parameters
    ----------
    old_db_path : str
        SQLite database containing the full collection table.
    old_table_name : str
        Name of the source table.
    new_db_path : str
        SQLite database receiving the sample.
    new_table_name : str
        Destination table; created from the source schema if missing.

    Raises
    ------
    sqlite3.OperationalError
        If the source table is missing or SQLite rejects a statement.
    """
    strata_cols = ["marketplace_stratum", "blockchain_stratum", "category_stratum", "token_standard_stratum"]
    desired_count = 2

    def quote_ident(identifier):
        # Double any embedded quote so the name is always parsed as ONE identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    old_table_sql = quote_ident(old_table_name)
    new_table_sql = quote_ident(new_table_name)

    # 1) Create new DB & table
    create_table_with_same_schema(old_db_path, old_table_name, new_db_path, new_table_name)

    # closing() releases both connections even if a statement fails
    with closing(sqlite3.connect(old_db_path)) as old_conn, \
            closing(sqlite3.connect(new_db_path)) as new_conn:
        old_cursor = old_conn.cursor()
        new_cursor = new_conn.cursor()

        # 2) Add only the strata columns that are missing. Checking the schema
        #    first avoids swallowing unrelated OperationalErrors from ALTER.
        new_cursor.execute(f"PRAGMA table_info({new_table_sql})")
        existing_cols = {col[1].lower() for col in new_cursor.fetchall()}
        for col in strata_cols:
            if col not in existing_cols:
                new_cursor.execute(f"ALTER TABLE {new_table_sql} ADD COLUMN {quote_ident(col)} TEXT")
        new_conn.commit()

        random.seed(42)

        # Build insert statement (PRAGMA column order matches SELECT * order)
        old_cursor.execute(f"PRAGMA table_info({old_table_sql})")
        original_col_names = [col[1] for col in old_cursor.fetchall()]
        extended_col_names = original_col_names + strata_cols

        placeholders = ", ".join("?" for _ in extended_col_names)
        col_names_str = ", ".join(quote_ident(c) for c in extended_col_names)
        insert_sql = f"""
    INSERT INTO {new_table_sql} ({col_names_str})
    VALUES ({placeholders})
    """

        # 3) chain -> list of rowids, streamed from the cursor so the whole
        #    (rowid, collection_id) table is never held as one extra list
        chain_dict = {}
        old_cursor.execute(f"SELECT rowid, collection_id FROM {old_table_sql}")
        for rowid, coll_id in old_cursor:
            if not isinstance(coll_id, str) or ":" not in coll_id:
                continue
            # NOTE(review): this keeps the text AFTER the first ':' as the chain.
            # If collection_id is formatted "CHAIN:address" the chain is the part
            # BEFORE the colon and this groups by address instead - confirm
            # against the actual data before relying on blockchain_stratum.
            _id, chain = coll_id.split(":", 1)
            chain = chain.strip().lower()
            if not chain:
                continue
            chain_dict.setdefault(chain, []).append(rowid)

        for chain, rowids in chain_dict.items():
            # Take the whole stratum when it has no more rows than requested
            if len(rowids) > desired_count:
                chosen_ids = random.sample(rowids, desired_count)
            else:
                chosen_ids = rowids

            if chosen_ids:
                rowid_q = ",".join("?" * len(chosen_ids))
                old_cursor.execute(
                    f"SELECT * FROM {old_table_sql} WHERE rowid IN ({rowid_q})",
                    chosen_ids
                )
                selected_data = old_cursor.fetchall()

                # (marketplace, blockchain, category, token_standard) strata values
                strata_values = (None, chain, None, None)
                new_cursor.executemany(
                    insert_sql,
                    (row_data + strata_values for row_data in selected_data)
                )
                new_conn.commit()

    print("Sampling by chain complete. Data stored in:", new_db_path, new_table_name)

In [None]:
if __name__ == "__main__":
    # Kick off the per-chain sampling run; the arguments are supplied
    # positionally in the function's declared parameter order.
    sample_2_collections_per_chain(
        "data/all_collections.db",    # old_db_path
        "all_collections",            # old_table_name
        "data/stratified_sample.db",  # new_db_path
        "sampled_collections",        # new_table_name
    )

Sampling by chain complete. Data stored in: stratified_sample.db sampled_collections


Sampling based on category strata

In [None]:
def sample_collections_per_category(old_db_path="data/all_collections.db",
                                    old_table_name="all_collections",
                                    new_db_path="data/stratified_sample.db",
                                    new_table_name="sampled_collections",
                                    samples_per_category=5):
    """
    Sample up to N collections per distinct category and append them to the sample table.

    The ``category`` column may hold either a single category string or a
    JSON array of strings (a value starting with '[' and ending with ']').
    Category names are stripped and lower-cased before grouping. A row that
    lists several categories is a candidate in each of them, but is counted
    only once per category even if the same name is repeated in its list.
    Empty values, non-string values and non-string array items are ignored;
    a bracketed value that is not valid JSON is treated as one plain category.

    Sampled rows are inserted into new_db_path:new_table_name with the source
    columns plus the four strata columns; only category_stratum is filled.
    Rows are always appended, so re-running inserts the sample again.

    Parameters
    ----------
    old_db_path : str
        SQLite database containing the full collection table.
    old_table_name : str
        Name of the source table.
    new_db_path : str
        SQLite database receiving the sample.
    new_table_name : str
        Destination table; created from the source schema if missing.
    samples_per_category : int
        Maximum number of rows drawn per category.

    Raises
    ------
    sqlite3.OperationalError
        If the source table is missing or SQLite rejects a statement.
    """
    strata_cols = ["marketplace_stratum", "blockchain_stratum", "category_stratum", "token_standard_stratum"]

    def quote_ident(identifier):
        # Double any embedded quote so the name is always parsed as ONE identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    old_table_sql = quote_ident(old_table_name)
    new_table_sql = quote_ident(new_table_name)

    # 1) Create new DB & table
    create_table_with_same_schema(old_db_path, old_table_name, new_db_path, new_table_name)

    # closing() releases both connections even if a statement fails
    with closing(sqlite3.connect(old_db_path)) as old_conn, \
            closing(sqlite3.connect(new_db_path)) as new_conn:
        old_cursor = old_conn.cursor()
        new_cursor = new_conn.cursor()

        random.seed(42)

        # 2) Add only the strata columns that are missing. Checking the schema
        #    first avoids swallowing unrelated OperationalErrors from ALTER.
        new_cursor.execute(f"PRAGMA table_info({new_table_sql})")
        existing_cols = {col[1].lower() for col in new_cursor.fetchall()}
        for col in strata_cols:
            if col not in existing_cols:
                new_cursor.execute(f"ALTER TABLE {new_table_sql} ADD COLUMN {quote_ident(col)} TEXT")
        new_conn.commit()

        # Build insert statement (PRAGMA column order matches SELECT * order)
        old_cursor.execute(f"PRAGMA table_info({old_table_sql})")
        original_col_names = [col[1] for col in old_cursor.fetchall()]
        extended_col_names = original_col_names + strata_cols

        placeholders = ", ".join("?" for _ in extended_col_names)
        col_names_str = ", ".join(quote_ident(c) for c in extended_col_names)
        insert_sql = f"""
    INSERT INTO {new_table_sql} ({col_names_str})
    VALUES ({placeholders})
    """

        # 3) category -> rowids, streamed from the cursor so the whole
        #    (rowid, category) table is never held as one extra list
        category_dict = {}
        old_cursor.execute(f"SELECT rowid, category FROM {old_table_sql}")
        for rowid, cat_val in old_cursor:
            if not isinstance(cat_val, str):
                continue
            cat_val = cat_val.strip()
            if not cat_val:
                continue

            # Default: the raw value is a single category name
            categories = [cat_val]
            if cat_val.startswith("[") and cat_val.endswith("]"):
                try:
                    parsed = json.loads(cat_val)
                except ValueError:
                    # json.JSONDecodeError subclasses ValueError; fall back to
                    # treating the raw text as one category
                    parsed = None
                if isinstance(parsed, list):
                    # Non-string items (numbers, null, nested lists) have no
                    # .strip() and are not category names
                    categories = [c for c in parsed if isinstance(c, str)]

            seen_in_row = set()
            for cat in categories:
                cat = cat.strip().lower()
                # A repeated name must not put the same rowid into the
                # sampling pool twice
                if cat and cat not in seen_in_row:
                    seen_in_row.add(cat)
                    category_dict.setdefault(cat, []).append(rowid)

        unique_categories = list(category_dict.keys())
        print(f"Total distinct categories found: {len(unique_categories)}")

        for cat, rowids in category_dict.items():
            # Take the whole stratum when it has no more rows than requested
            if len(rowids) <= samples_per_category:
                chosen_ids = rowids
            else:
                chosen_ids = random.sample(rowids, samples_per_category)

            if chosen_ids:
                rowid_q = ",".join("?" * len(chosen_ids))
                old_cursor.execute(
                    f"SELECT * FROM {old_table_sql} WHERE rowid IN ({rowid_q})",
                    chosen_ids
                )
                selected_data = old_cursor.fetchall()

                # (marketplace, blockchain, category, token_standard) strata values
                strata_values = (None, None, cat, None)
                new_cursor.executemany(
                    insert_sql,
                    (row_data + strata_values for row_data in selected_data)
                )
                new_conn.commit()

    print("Sampling by category complete. Data stored in:", new_db_path, new_table_name)

In [9]:
# Run the category sampling with 5 collections per distinct category;
# database paths and table names fall back to the function defaults.
sample_collections_per_category(samples_per_category=5)

Total distinct categories found: 15
Sampling by category complete. Data stored in: stratified_sample.db sampled_collections


Sampling based on token standard strata

In [None]:
def count_unique_token_standards(db_path="data/all_collections.db", table_name="all_collections"):
    """
    Count how many rows carry each distinct ``token_standard`` value.

    Values are stripped of surrounding whitespace before counting; case is
    preserved. NULL, non-string and blank values are skipped. The per-standard
    counts are printed in alphabetical order and also returned.

    Parameters
    ----------
    db_path : str
        SQLite database to read.
    table_name : str
        Table containing a ``token_standard`` column.

    Returns
    -------
    collections.Counter
        Mapping of token standard -> number of rows.

    Raises
    ------
    sqlite3.OperationalError
        If the table or column does not exist.
    """
    # Double any embedded quote so the name is always parsed as ONE identifier.
    table_sql = '"' + str(table_name).replace('"', '""') + '"'

    counter = Counter()

    # closing() releases the connection even if the query or iteration fails
    with closing(sqlite3.connect(db_path)) as conn:
        # Iterate the cursor directly instead of fetchall() so the full column
        # is never materialised as one list.
        for (raw_standard,) in conn.execute(f"SELECT token_standard FROM {table_sql}"):
            if not isinstance(raw_standard, str):
                continue
            standard = raw_standard.strip()
            if standard:
                counter[standard] += 1

    print(f"Total unique token_standards found: {len(counter)}")
    print("Token Standards (with counts):")
    for standard, count in sorted(counter.items()):
        print(f"{standard}: {count}")

    return counter

In [19]:
# Tally token standards using the default database and table. Because the call
# is the last expression of the cell, the returned Counter is echoed after the
# printed summary.
count_unique_token_standards()

Total unique token_standards found: 6
Token Standards (with counts):
APTOS: 168782
CRYPTO_PUNKS: 1
ERC1155: 927807
ERC721: 3104663
FLOW: 18
TOKEN_GROUP_2022: 70


Counter({'ERC721': 3104663,
         'ERC1155': 927807,
         'TOKEN_GROUP_2022': 70,
         'APTOS': 168782,
         'FLOW': 18,
         'CRYPTO_PUNKS': 1})

In [None]:
def sample_collections_per_token_standard(existing_db_path="data/stratified_sample.db",
                                          existing_table_name="sampled_collections",
                                          source_db_path="data/all_collections.db",
                                          source_table_name="all_collections",
                                          samples_per_standard=10,
                                          seed=42):
    """
    Sample up to N collections per token standard and append them to an existing table.

    Rows of the source table are grouped by their ``token_standard`` value
    (stripped and lower-cased; NULL, non-string and blank values are skipped).
    Up to samples_per_standard rows are drawn per group with ``random.sample``
    after seeding the global generator with `seed`, and the full rows are
    inserted into existing_db_path:existing_table_name together with the four
    strata columns; only token_standard_stratum is filled.

    Unlike the other sampling functions this one does not create the target
    table: it must already exist. Missing strata columns are added. Rows are
    always appended, so re-running inserts the sample again.

    Parameters
    ----------
    existing_db_path : str
        SQLite database holding the target table.
    existing_table_name : str
        Target table to append to.
    source_db_path : str
        SQLite database containing the full collection table.
    source_table_name : str
        Name of the source table.
    samples_per_standard : int
        Maximum number of rows drawn per token standard.
    seed : int
        Seed passed to ``random.seed``.

    Raises
    ------
    sqlite3.OperationalError
        If the target or source table is missing or SQLite rejects a statement.
    """
    strata_cols = ["marketplace_stratum", "blockchain_stratum", "category_stratum", "token_standard_stratum"]

    def quote_ident(identifier):
        # Double any embedded quote so the name is always parsed as ONE identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    source_table_sql = quote_ident(source_table_name)
    target_table_sql = quote_ident(existing_table_name)

    # closing() releases both connections even if a statement fails
    with closing(sqlite3.connect(source_db_path)) as source_conn, \
            closing(sqlite3.connect(existing_db_path)) as target_conn:
        source_cursor = source_conn.cursor()
        target_cursor = target_conn.cursor()

        random.seed(seed)

        # 1) Add only the strata columns that are missing. Checking the schema
        #    first means a missing target table surfaces here as "no such table"
        #    instead of being swallowed and failing later on the first INSERT.
        target_cursor.execute(f"PRAGMA table_info({target_table_sql})")
        existing_cols = {col[1].lower() for col in target_cursor.fetchall()}
        for col in strata_cols:
            if col not in existing_cols:
                target_cursor.execute(f"ALTER TABLE {target_table_sql} ADD COLUMN {quote_ident(col)} TEXT")
        target_conn.commit()

        # 2) Build insert statement based on source table columns
        #    (PRAGMA column order matches SELECT * order)
        source_cursor.execute(f"PRAGMA table_info({source_table_sql})")
        original_col_names = [col[1] for col in source_cursor.fetchall()]
        extended_col_names = original_col_names + strata_cols

        placeholders = ", ".join("?" for _ in extended_col_names)
        col_names_str = ", ".join(quote_ident(c) for c in extended_col_names)
        insert_sql = f"""
    INSERT INTO {target_table_sql} ({col_names_str})
    VALUES ({placeholders})
    """

        # 3) token_standard -> list of rowids, streamed from the cursor so the
        #    whole (rowid, token_standard) table is never held as one extra list
        token_dict = {}
        source_cursor.execute(f"SELECT rowid, token_standard FROM {source_table_sql}")
        for rowid, token_val in source_cursor:
            if not isinstance(token_val, str):
                continue
            token = token_val.strip().lower()
            if token:
                token_dict.setdefault(token, []).append(rowid)

        print(f"Total distinct token standards found: {len(token_dict)}")

        # 4) Sample per token standard
        for token, rowids in token_dict.items():
            # Take the whole stratum when it has no more rows than requested
            if len(rowids) <= samples_per_standard:
                chosen_ids = rowids
            else:
                chosen_ids = random.sample(rowids, samples_per_standard)

            if chosen_ids:
                rowid_q = ",".join("?" * len(chosen_ids))
                source_cursor.execute(
                    f"SELECT * FROM {source_table_sql} WHERE rowid IN ({rowid_q})",
                    chosen_ids
                )
                selected_data = source_cursor.fetchall()

                # (marketplace, blockchain, category, token_standard) strata values
                strata_values = (None, None, None, token)
                target_cursor.executemany(
                    insert_sql,
                    (row_data + strata_values for row_data in selected_data)
                )
                target_conn.commit()

    print("Sampling by token standard complete. Data appended to:", existing_db_path, existing_table_name)

In [11]:
# Append the token-standard samples using the function defaults
# (10 samples per standard, seed 42, default database paths and table names).
sample_collections_per_token_standard()

Total distinct token standards found: 6
Sampling by token standard complete. Data appended to: stratified_sample.db sampled_collections
