In [None]:
import requests
import pandas as pd
import re
import random
import numpy as np
import time
from dateutil.parser import parse
from dateutil.parser import isoparse

In [None]:
# Widen pandas' display limits so large tables and long cell values are not truncated.
for option_name, option_value in (
    ('display.max_rows', 500),
    ('display.max_columns', 500),
    ('display.width', 1000),
    ('display.max_colwidth', 1000),
):
    pd.set_option(option_name, option_value)

In [None]:
#Get list of all collections

# First page of the loc.gov collections listing, requested as JSON.
# Presumably `at=` restricts the response to the listed keys and `c=100` asks
# for 100 collections per page -- confirm against the loc.gov API documentation.
url = 'https://www.loc.gov/collections?fo=json&at=results,pagination&c=100'
def paginate(url,collections=None):
    """Walk the paginated collections listing and gather one entry per collection.

    Parameters
    ----------
    url : str
        URL of the page to request.
    collections : list, optional
        List that the entries are appended to (in place). When omitted, a new
        list is created for this call.

    Returns
    -------
    list of dict
        One ``{'on': <items url + '?fo=json&at=results'>, 'title': <title>}``
        dict per collection found on this page and every following page.
        If the request for ``url`` is answered with HTTP 429, the response
        headers are returned instead.
    """
    # Use a fresh list per top-level call. A mutable default argument is shared
    # between calls, so re-running the cell would keep appending duplicates.
    if collections is None:
        collections = []
    print('\n'+url)
    print('Starting api request beginning at: '+ str(time.ctime(time.time())))
    response = requests.get(url)
    print('Done. Processing api request beginning at: '+ str(time.ctime(time.time())))
    if response.status_code == 429:
        # Rate limited: hand the headers back so the caller can inspect them.
        return response.headers
    # Parse the body once and reuse it for both the results and the pagination.
    payload = response.json()
    for result in payload['results']:
        collections.append({
            'on': result['items']+'?fo=json&at=results',
            'title': result['title'],
        })
    next_url = payload['pagination']['next']
    if next_url:
        print('There is another page at: '+str(next_url))
        # Pause between requests before fetching the next page.
        time.sleep(10)
        paginate(next_url,collections)
    return collections

# NOTE: paginate returns the response headers instead of a list when the first
# request is answered with HTTP 429, so check the result before continuing.
collections = paginate(url)

In [None]:
#Put the collected collection entries into a DataFrame
partof_pd = pd.DataFrame(collections)

#Collection titles, in listing order
partof_titles = partof_pd['title'].to_list()

#Collection search URLs, with a facet filter inserted that excludes the
# web page, event and collection original formats
partof_urls = [
    collection_url.replace(
        '?fo=json',
        '?fa=original-format!:web+page|original-format!:event|original-format!:collection&fo=json'
    )
    for collection_url in partof_pd['on'].to_list()
]

In [None]:
# Inspect the filtered collection search URLs.
partof_urls

In [None]:
#Get a sample record from each collection
#Scanning starts at result #3 (the 4th result) and stops at the first record
# whose original format is not in `exclude`.
sample_items=[]
exclude = ['event','web page','collection','catalog']
# Query fragment that was substituted for '?fo=json' when the filtered search URLs were built.
filtered_query = '?fa=original-format!:web+page|original-format!:event|original-format!:collection&fo=json'
for url in partof_urls:
    # Pause between collections before sending the next request.
    time.sleep(10)
    record_num=3
    finished = False
    sample_item = None
    print('Checking collection: ' + url)
    page1 = requests.get(url, params = {'c':100})
    # Unfiltered form of the URL; stored on the sample so it can later be
    # matched against the collection list.
    url_fixed = url.replace(filtered_query, '?fo=json')
    try:
        # Parse the response once instead of on every loop iteration.
        results = page1.json()['results']
        #Search until you find a record that's not an excluded format.
        #If one is not found in records #3 - 99, then a sample isn't
        # pulled from this collection.
        while record_num < 100 and not finished:
            test = results[record_num]
            print('Reviewing item: '+ str(test['title']))
            print('Original format(s): '+ str(test['original_format']))
            #If the record is an excluded format, try again
            if any(original_format in exclude for original_format in test['original_format']):
                record_num += 1
                print('Not using this record, wrong format.')
            else:
                sample_item = test
                sample_item['partof_url'] = url_fixed
                finished=True
                print('Using this record! Moving on to next collection.\n')
    except Exception as error:
        # Covers non-JSON responses, missing keys and running past the last
        # result. Exception (not a bare except) lets Ctrl-C still interrupt.
        sample_item = None
        print('Error details: ' + repr(error))
        print('Something went wrong. Skipping this collection.\n')
    # Collections without a usable record contribute a None placeholder.
    sample_items.append(sample_item)

#return partof_urls to their unfiltered form
# The pattern must be the exact fragment that was inserted (starting at '?fa=').
# The earlier pattern began with '|original-format!:web+page', which never occurs
# in these URLs, so the filter was never removed and the URLs no longer matched
# the collection list or the 'partof_url' values stored on the samples.
partof_urls = [url.replace(filtered_query, '?fo=json') for url in partof_urls]

In [None]:
# NOTE: this binds a second name to the same list object; it is not a copy, so
# in-place changes to sample_items (or to the dicts inside it) show up here too.
backup = sample_items

In [None]:
#Disabled cell: the statement below is wrapped in a string literal, so it is
# not executed; running the cell only echoes the text. If enabled, it would
# swap the original-format filter in partof_urls back to plain '?fo=json'.
'''partof_urls = [url.replace(
    '?fa=original-format!:web+page|original-format!:event|original-format!:collection&fo=json',
    '?fo=json'
    ) for url in partof_urls]'''

In [None]:
def flatten_json(y):
    """Collapse nested dicts/lists into a single-level dict with dotted keys.

    Dict keys and list positions are joined with '.', e.g.
    ``{'a': {'b': [5]}}`` becomes ``{'a.b.0': 5}``. Only exact ``dict`` and
    ``list`` instances are descended into; anything else is stored as a leaf.
    Empty dicts and lists produce no entries, and a bare scalar is stored
    under the empty key ''.
    """

    def walk(node, prefix):
        # Yield (dotted_key, leaf_value) pairs in traversal order.
        if type(node) is dict:
            for key in node:
                yield from walk(node[key], prefix + key + '.')
        elif type(node) is list:
            for position, element in enumerate(node):
                yield from walk(element, prefix + str(position) + '.')
        else:
            # Strip the trailing '.' that was added while descending.
            yield prefix[:-1], node

    return dict(walk(y, ''))

In [None]:
# Flatten every sampled record (None placeholders included) into a one-level dict.
flattened_sample_items = [flatten_json(sample) for sample in sample_items]

In [None]:
# One row per sampled record and one column per flattened key; keys that a
# record does not have are left as missing values in its row.
flattened_sample_items_pd = pd.DataFrame(flattened_sample_items)

In [None]:
# Inspect the collection URLs as they stand at this point.
partof_urls

In [None]:
# Inspect the collections table (columns 'on' and 'title').
partof_pd

In [None]:
#Remove rows in which every column is missing; these come from collections with no sample item

flattened_sample_items_pd = flattened_sample_items_pd.dropna(axis=0, how='all')

In [None]:
# Build one metadata record per field: how often it is used, which value types
# occur, and one sample value per collection.
cols = flattened_sample_items_pd.columns.values.tolist()
pattern = re.compile(r'^(.+)\.\d+$')
bases_checked = []
all_col_metadata = []

total_samples = len(flattened_sample_items_pd)

# Seeded generator so the sample values picked for list fields are the same on
# every run (otherwise the exported file changes each time the notebook runs).
sample_rng = random.Random(0)

# Look up each collection's title and sample row once, instead of re-filtering
# both DataFrames for every column. The first match wins.
title_by_url = partof_pd.drop_duplicates(subset='on').set_index('on')['title'].to_dict()
has_url_column = 'partof_url' in flattened_sample_items_pd.columns
collection_samples = []  # (collection title, one-row DataFrame or None)
for collection in partof_urls:
    collection_title = title_by_url.get(collection)
    if collection_title is None:
        # URL not present in the collection list: nothing to label the sample with.
        continue
    sample_row = None
    if has_url_column:
        matching_rows = flattened_sample_items_pd[flattened_sample_items_pd['partof_url'] == collection]
        if not matching_rows.empty:
            sample_row = matching_rows.iloc[[0]]
    collection_samples.append((collection_title, sample_row))


def _value_type_names(frame, column_name):
    """Return the type names of the non-missing values in one column."""
    observed = frame[column_name].dropna().apply(type).unique()
    return [value_type.__name__ for value_type in observed]


#For each column
for col in cols:
    col_metadata = {}
    list_match = pattern.match(col)

    #If the column ends in a number (is a list column, split up)
    if list_match:
        base = list_match[1]
        #Skip groups that were already reviewed via an earlier column
        if base in bases_checked:
            continue
        bases_checked.append(base) #mark as checked

        #Find all the columns in this group: exactly `base` followed by one index.
        # The base is escaped and the match anchored so that nested columns
        # (e.g. base.0.key) and look-alike names are not pulled into the group.
        group_pattern = re.compile(re.escape(base) + r'\.\d+')
        matches = [other_col for other_col in cols if group_pattern.fullmatch(other_col)]

        #Make a list of the value types found in this group
        group_types = []
        for other_col in matches:
            group_types.extend(_value_type_names(flattened_sample_items_pd, other_col))

        #For each collection, pick one non-missing value from the group as the sample
        for collection_title, sample_row in collection_samples:
            if sample_row is None:
                candidates = []
            else:
                candidates = [sample_row[other_col].to_list()[0] for other_col in matches]
            present = [value for value in candidates if pd.notna(value)]
            col_metadata[collection_title] = sample_rng.choice(present) if present else None

        col_metadata['types'] = sorted(set(group_types))
        col_metadata['field'] = base.replace('.',' > ')
        col_metadata['list'] = True
        col_metadata['max_values_in_sample'] = len(matches)
        col_metadata['cols'] = matches

        #Calculate how often the field is used vs. blank in collections.
        # A sample counts as using the field when any column of the group has
        # a value; this also works when the group has no '<base>.0' column.
        used = flattened_sample_items_pd[matches].notna().any(axis=1).sum()
        col_metadata['used_in'] = used
        col_metadata['used_in_percent'] = used*100/total_samples

        all_col_metadata.append(col_metadata)

    else:
        col_metadata['field'] = col.replace('.',' > ')
        col_metadata['list'] = False
        col_metadata['max_values_in_sample'] = 1
        col_metadata['cols'] = col
        col_metadata['types'] = _value_type_names(flattened_sample_items_pd, col)
        empty = flattened_sample_items_pd[col].isna().sum()
        col_metadata['used_in'] = total_samples - empty
        col_metadata['used_in_percent'] = (total_samples - empty)*100/total_samples

        #Collections without any sample item get no entry for this field
        for collection_title, sample_row in collection_samples:
            if sample_row is not None:
                col_metadata[collection_title] = sample_row[col].to_list()[0]
        all_col_metadata.append(col_metadata)

all_fields = pd.DataFrame(all_col_metadata)

In [None]:
#Reorder the columns, to put the basic metadata in front and the collection sample values after
move_to_front = ['field','used_in','used_in_percent','types','list','max_values_in_sample','cols']
# Only reorder metadata columns that are still present, so re-running this cell
# after 'cols' has been dropped does not raise a KeyError.
leading = [name for name in move_to_front if name in all_fields.columns]
move_to_back = [name for name in all_fields.columns if name not in move_to_front]
new_order = leading + move_to_back
all_fields = all_fields[new_order].copy()

In [None]:
#For repeat fields that have lists mid-way through the field hierarchy, drop all instances after the first one.
# Drop any row whose field path has a list index other than 0 (e.g., 10, 3, 27).
# Only whole path components (delimited by ' > ') count as an index, so field
# names that merely contain a digit are kept.
later_index = r'(?:^| > )\d*[1-9]\d*(?= > |$)'
has_later_index = all_fields['field'].str.contains(later_index, regex=True, na=True)
all_fields = all_fields[~has_later_index].copy()
#Skip the field "partof_url". That was just for processing purposes
all_fields = all_fields[all_fields['field']!='partof_url'].copy()

In [None]:
#Drop the list of all the matching column groupings.
# errors='ignore' keeps the cell re-runnable once the column is already gone.
all_fields = all_fields.drop(columns='cols', errors='ignore')
#Skip the field "partof_url". That was just for processing purposes
all_fields = all_fields[all_fields['field']!='partof_url'].copy()

In [None]:
# Inspect the field table after the cleanup steps so far.
all_fields

In [None]:
# Inspect the column labels (metadata columns plus one column per collection title).
all_fields.columns

In [None]:
#DataTables errors if a column has a period or apostrophe.
# Replace periods with spaces. Remove apostrophes
# regex=False makes both replacements literal on every pandas version; the
# default for `regex` differs between versions and "." is a regex wildcard.
all_fields.columns = all_fields.columns.str.replace(".", " ", regex=False)
all_fields.columns = all_fields.columns.str.replace("'", "", regex=False)

In [None]:
#Drop any rows that have a blank field name column
all_fields = all_fields[all_fields['field'].str.contains(r'^$')==False].copy()

#Drop any collection columns that aren't loc.gov collections
# The host name is matched literally (regex=False) rather than as a pattern.
is_locgov = partof_pd['on'].str.contains('www.loc.gov', regex=False, na=False)
not_locgov = partof_pd[~is_locgov]['title'].to_list()
# Apply the same clean-up that was applied to the column labels (periods to
# spaces, apostrophes removed) so the titles match the column names.
not_locgov = [title.replace('.', ' ').replace("'", '') for title in not_locgov]
# errors='ignore': a collection that never produced a column has nothing to drop.
all_fields = all_fields.drop(columns=not_locgov, errors='ignore')

#Sort rows (datatable also sorts for you, so could skip this)
all_fields = all_fields.sort_values(by=['used_in','field','max_values_in_sample'], ascending=False)

In [None]:
#Patterns used to recognise value formats
url_pattern = re.compile(r'^http')
integer = re.compile(r'^\d+$')
decimal = re.compile(r'^\d+\.\d+$')

#Timestamps are detected by attempting an ISO parse rather than with a regex
def is_date(string):
    """Return True when isoparse accepts `string`, False when it raises ValueError."""
    try:
        isoparse(string)
    except ValueError:
        return False
    return True

#For all rows, get a list of value types from all collection samples
# The sample values are every column except the metadata columns. Selecting
# them by name (instead of by position) keeps this correct regardless of the
# column order or of whether 'cols' has been dropped yet.
metadata_columns = ['field','used_in','used_in_percent','types','list','max_values_in_sample','cols']
types = []
for index, row in all_fields.iterrows():
    format_types = []
    #If it's already recognized as boolean, move on to next row.
    if 'bool' in row['types']:
        format_types.append('bool')
    else:
        for column in row.drop(labels=metadata_columns, errors='ignore'):
            if pd.isna(column):
                continue
            elif bool(url_pattern.match(str(column))):
                format_types.append('URL')
            # NOTE: the ISO check runs before the numeric checks, so any value
            # that isoparse accepts is reported as a timestamp. Presumably that
            # includes bare 4-digit strings such as '1865' -- confirm against
            # the dateutil documentation.
            elif is_date(str(column)):
                format_types.append('iso timestamp')
            elif bool(integer.match(str(column))):
                format_types.append('int')
            elif bool(decimal.match(str(column))):
                format_types.append('decimal number')
            else:
                format_types.append('str')

    # De-duplicate; sorted so the exported order is the same on every run.
    format_types = sorted(set(format_types))
    types.append(format_types)

print('These should be equal before replacing the types column:')
print(len(types))
print(len(all_fields))

In [None]:
#Replace the types column with more detailed types
# `types` holds one list per row, in the row order of all_fields at the time it
# was built, so all_fields must not be re-sorted or filtered in between.
all_fields['types'] = types

In [None]:
# Number of field rows that remain.
len(all_fields)

In [None]:
# Inspect the final field table.
all_fields

In [None]:
#Export to JSON file
# Writes 'loc_fields.json' (relative path, so it lands in the current working
# directory), using the "records" orientation.
all_fields.to_json('loc_fields.json', orient="records")

In [None]:
#For use in html page: one column-definition line per column of all_fields
for column_name in all_fields.columns.to_list():
    print("{ 'data': '" + column_name + "'},")

In [None]:
#For use in html page: one <th> header cell per column of all_fields
for column_name in all_fields.columns.to_list():
    print(''.join(['<th>', column_name, '</th>']))

In [None]:
# Number of columns in the exported table.
len(all_fields.columns)