In [1]:
import pandas as pd
import numpy as np
import os
import sys
import shutil
import zipfile
from zipfile import ZipFile, ZIP_DEFLATED
import json


In [2]:
#path to the metadata zip file; leave exactly one assignment uncommented to
#select the collection to be processed
#(this name is assigned again, to the same Bored Ape path, in later cells)

#metadata_zip_filename = "/data/cryptopunks.zip"
metadata_zip_filename = "/data/boredapeyachtclub_metadata_archive.zip"
#metadata_zip_filename = "/data/hapeprime_metadata_archive.zip"
#metadata_zip_filename = "/data/meebits_metadata_archive.zip"


In [3]:
def filterCollectionByRareTraits(metadata_zip_fileName, percent_collection = 1):
    """Return token ids ordered from rarest to most common.

    Rarity follows the rarity.tools approach: each (trait_type, value) pair
    scores ``collection_count / number_of_trait_rows_with_that_pair`` and a
    token's total is the sum of the scores of its distinct traits.

    Parameters
    ----------
    metadata_zip_fileName : str or path-like
        Path to the zip archive holding one ``*.json`` metadata file per
        token. Each file must carry a ``token_id`` and may carry a ``traits``
        list of dicts with ``trait_type`` and ``value`` keys.
    percent_collection : float, default 1
        Fraction (0 to 1.0) of the collection to return; the number of ids
        returned is ``int(percent_collection * number_of_json_files)``.

    Returns
    -------
    list of int
        Token ids sorted by descending rarity total (ties broken by ascending
        token id). Empty when the archive contains no traits.
    """
    trait_records = []
    collection_count = 0

    # Read the archive the caller asked for, not a notebook-level global.
    with zipfile.ZipFile(metadata_zip_fileName) as archive:
        for member in archive.namelist():
            # only the json metadata files count towards the collection
            if not member.endswith(".json"):
                continue
            collection_count += 1
            with archive.open(member) as handle:
                metadata = json.load(handle)

            token_id = metadata["token_id"]
            # a token with a missing/null traits list simply contributes no rows
            for trait in metadata.get("traits") or []:
                trait_records.append({
                    "token_id": token_id,
                    "trait_type": trait["trait_type"],
                    "value": trait["value"],
                })

    # nothing to rank (empty archive or no traits at all)
    if not trait_records:
        return []

    traits_df = pd.DataFrame(trait_records).astype({"token_id": int})

    # per-trait rarity score: collection size divided by occurrences of the trait
    trait_scores = (
        traits_df.groupby(["trait_type", "value"])
        .size()
        .rename("total_count")
        .reset_index()
    )
    trait_scores["rarity_score"] = collection_count / trait_scores["total_count"]

    # each distinct trait of a token is counted once in that token's total;
    # joining on both columns avoids collisions of a concatenated string key
    token_traits = traits_df.drop_duplicates(subset=["token_id", "trait_type", "value"])
    rarity_totals = (
        token_traits.merge(trait_scores, on=["trait_type", "value"])
        .groupby("token_id")["rarity_score"]
        .sum()
        .rename("rarity_total")
        .reset_index()
        # highest total first; token_id as secondary key keeps ties deterministic
        .sort_values(by=["rarity_total", "token_id"], ascending=[False, True])
    )

    # pull the percentage of the collection requested
    col_size = int(percent_collection * collection_count)
    return rarity_totals.head(col_size)["token_id"].to_list()
        
            

In [9]:
#grab the token list needed by the create_filtered_collection_zip function:
#here the top 1% (percent_collection = .01) of the collection by rarity score
#note: tokens_list is also assigned by the price-based filter cell, so the
#value used by later cells is whichever of the two cells ran last
tokens_list = filterCollectionByRareTraits(metadata_zip_filename, .01)


In [5]:
def filterCollectionByHighestTotalPrice(metadata_zip_fileName, percent_collection = 1):
    """Return token ids ordered by highest last-sale price in USD.

    For every ``*.json`` metadata file with a non-null ``last_sale`` the sale
    price is converted from the payment token's smallest unit to USD:
    ``total_price / 10**decimals * usd_price / quantity``.

    Parameters
    ----------
    metadata_zip_fileName : str or path-like
        Path to the zip archive holding one ``*.json`` metadata file per token.
    percent_collection : float, default 1
        Fraction (0 to 1.0) of the collection to return; the number of ids
        returned is at most ``int(percent_collection * number_of_json_files)``
        (tokens without a sale are never returned).

    Returns
    -------
    list of int
        Token ids sorted by descending USD sale price (ties broken by
        ascending token id). Empty when no token has a recorded sale.
    """
    sales_records = []
    collection_count = 0

    # Read the archive the caller asked for, not a notebook-level global.
    with zipfile.ZipFile(metadata_zip_fileName) as archive:
        for member in archive.namelist():
            # only the json metadata files count towards the collection
            if not member.endswith(".json"):
                continue
            collection_count += 1
            with archive.open(member) as handle:
                metadata = json.load(handle)

            last_sale = metadata.get("last_sale")
            if last_sale is None:
                # never sold: counted in the collection size but not ranked
                continue

            payment_token = last_sale["payment_token"]
            raw_quantity = last_sale["quantity"]
            # a null quantity is treated as a single item
            quantity = 1 if raw_quantity is None else int(raw_quantity)

            # smallest-unit amount (e.g. WEI) -> token amount -> USD per item
            token_amount = int(last_sale["total_price"]) / (10 ** int(payment_token["decimals"]))
            sales_records.append({
                "token_id": last_sale["asset"]["token_id"],
                "total_price_usd": token_amount * float(payment_token["usd_price"]) / quantity,
            })

    # nothing to rank (empty archive or no sales at all)
    if not sales_records:
        return []

    sales_df = pd.DataFrame(sales_records).astype({"token_id": int, "total_price_usd": float})

    # Rank on the USD price itself: ranking on a log that is forced to 0 for
    # non-positive prices would place a $0 sale above any sale below $1.
    sales_df = sales_df.sort_values(by=["total_price_usd", "token_id"], ascending=[False, True])

    # pull the percentage of the collection requested
    col_size = int(percent_collection * collection_count)
    return sales_df.head(col_size)["token_id"].to_list()


In [6]:
#test: get the token list by price, top 1% (percent_collection = .01) of the collection
#note: this rebinds tokens_list, replacing any list produced by the rarity filter cell
tokens_list = filterCollectionByHighestTotalPrice(metadata_zip_filename, .01)
#print(tokens_list)

In [10]:
def create_filtered_collection_zip(src_zip, dst_zip, tokens_list):
    """Create a new image zip holding only the PNG files of the given tokens.

    Parameters
    ----------
    src_zip : str or path-like
        Zip archive that contains the image files, named ``<token_id>.png``
        (in any sub-directory).
    dst_zip : str or path-like
        Zip archive to create; it receives the matching PNG entries under
        their original archive names.
    tokens_list : iterable
        Token ids to keep, e.g. the list returned by one of the
        ``filterCollectionBy...`` functions. Items are compared as strings.
    """
    # string ids in a set: constant-time membership test per archive entry
    wanted_tokens = {str(token) for token in tokens_list}

    with ZipFile(src_zip, "r") as src_zip_archive:
        with ZipFile(dst_zip, "w", compression=ZIP_DEFLATED) as dst_zip_archive:
            for zitem in src_zip_archive.namelist():
                # directory entries have an empty basename and therefore no extension
                stem, extension = os.path.splitext(os.path.basename(zitem))
                # keep only png images whose file name (without extension) is a wanted token
                if extension.lower() != ".png" or stem not in wanted_tokens:
                    continue
                # stream the entry across instead of loading it fully into memory
                with src_zip_archive.open(zitem) as from_item:
                    with dst_zip_archive.open(zitem, "w") as to_item:
                        shutil.copyfileobj(from_item, to_item)

                               

In [9]:
def create_filtered_collection_metadata_zip(src_zip, dst_zip, tokens_list):
    """Create a new metadata zip holding only the JSON files of the given tokens.

    Parameters
    ----------
    src_zip : str or path-like
        Zip archive that contains the metadata files, named
        ``<token_id>.json`` (in any sub-directory).
    dst_zip : str or path-like
        Zip archive to create; it receives the matching JSON entries under
        their original archive names.
    tokens_list : iterable
        Token ids to keep, e.g. the list returned by one of the
        ``filterCollectionBy...`` functions. Items are compared as strings.
    """
    # string ids in a set: constant-time membership test per archive entry
    wanted_tokens = {str(token) for token in tokens_list}

    with ZipFile(src_zip, "r") as src_zip_archive:
        with ZipFile(dst_zip, "w", compression=ZIP_DEFLATED) as dst_zip_archive:
            for zitem in src_zip_archive.namelist():
                # directory entries have an empty basename and therefore no extension
                stem, extension = os.path.splitext(os.path.basename(zitem))
                # Keep only json files whose name (without extension) is a wanted
                # token. Checking the real extension, rather than chopping the last
                # five characters off any non-png name, stops e.g. "12.txt" from
                # being mistaken for token "1".
                if extension.lower() != ".json" or stem not in wanted_tokens:
                    continue
                # stream the entry across instead of loading it fully into memory
                with src_zip_archive.open(zitem) as from_item:
                    with dst_zip_archive.open(zitem, "w") as to_item:
                        shutil.copyfileobj(from_item, to_item)

In [11]:
 
#update these paths for the image zip and the metadata archive to be processed

#images_in_zip_filename = "/data/cryptopunks.zip"
images_in_zip_filename = "/data/boredapeyachtclub_archive.zip"
metadata_zip_filename = "/data/boredapeyachtclub_metadata_archive.zip"

#names of the zips to create (one for the images, one for the metadata)
#zip_out_filename = "/data/cryptopunks_filtered.zip"
zip_out_filename = "/data/boredapeyachtclub_filtered.zip"
metadata_zip_out_filename = "/data/boredapeyachtclub_metadata_filtered.zip"


#write the filtered image zip and the filtered metadata zip; tokens_list must
#already exist, i.e. one of the filterCollectionBy... cells has to be run first
create_filtered_collection_zip(images_in_zip_filename, zip_out_filename, tokens_list)
create_filtered_collection_metadata_zip(metadata_zip_filename, metadata_zip_out_filename, tokens_list)



In [12]:
#retrieve the list of valuable apes used to train the model: the pkl file contains a dataframe with the tokens of
#interest; the resulting collection is used to train the filtered most valuable Bored Ape model, which is served by the web app
#caution: unpickling can execute arbitrary code, so only load a pkl file from a trusted source
df = pd.read_pickle('subset_metadata.pkl')

valuable_apes_tokens_list = df['token_id'].to_list()
#print(valuable_apes_tokens_list)

#note: zip_out_filename is rebound here; an earlier cell uses the same name for the filtered image zip
metadata_zip_filename = "/data/boredapeyachtclub_metadata_archive.zip"
zip_out_filename = "/data/boredapeyachtclub_valuable_metadata_archive.zip"

#write a metadata zip restricted to the valuable tokens
create_filtered_collection_metadata_zip(metadata_zip_filename, zip_out_filename, valuable_apes_tokens_list)

