<h1>IMFDB Connector</h1>
<h3>Python script for populating a PostgreSQL database with article data from the Internet Movies and Firearms Database Wiki</h3>

In [135]:
# Imports
import psycopg2
import requests
import os
import re
from bs4 import BeautifulSoup

In [136]:
# Establish a connection to the database
cnx = psycopg2.connect(
    host="localhost",
    user="imfdb",
    password=os.environ.get("PG_IMFDB_PASSWORD"),
    database="imfdb"
)

# Create a cursor object
cursor = cnx.cursor()

<h3> MediaWiki API related functions</h3>
These are used to query the MW API for article data and populating the Postgres DB with the minimum necessary data for further processing.

In [137]:
def api_request(url):
    # Makes a get request to the specified API endpoint. A JSON response is expected.

    # Make a GET request to the IMFDB API endpoint
    response = requests.get(url)

    # Check if the request was successful
    if response.status_code == 200:
        # Get the JSON data from the response
        return response.json()
    else:
        # Handle the error
        print(f"ERROR: api_request(): Request failed with status code: {response.status_code}")
        return None

In [138]:
def parse_page_by_id(pageid, prop, format):
    # Example: parse_page_by_id("215875","text", "json") to parse the wiki text of Weird Al Yankovic as json

    data = api_request(f"https://www.imfdb.org/api.php?action=parse&pageid={pageid}&prop={prop}&format={format}")

    # Error Handling
    if data is None:
        print("ERROR: parse_page_by_id(): Data is None!")
        return
    
    return data

In [139]:
def get_page_text_by_id(pageid):
    response = requests.get(f"https://www.imfdb.org/index.php?curid={pageid}")
    return str(response.text)
# We don't use the API anymore for this due to issues with the HTML it responds with
#def get_page_text_by_id(pageid):
#    data = parse_page_by_id(pageid, "text", "json")
#    return str(data["parse"]["text"]["*"])

In [140]:
def query_categorymembers(cmtitle, format):
    # Example: query_categorymembers("Category:Actor", "json") to query all actor pages as json.

    # Make a GET request to the IMFDB API endpoint
    data = api_request(f"https://www.imfdb.org/api.php?action=query&list=categorymembers&cmtitle={cmtitle}&format={format}")

    # Error Handling
    if data is None:
        print("ERROR: query_categorymembers(): Data is None!")
        return None

    #Initialize list
    categorymembers = []

    # Loop through the first batch of category members
    for member in data["query"]["categorymembers"]:
        print(f"Adding {member['title']}")
        categorymembers.append(member)

    # Continue fetching while there is something to be fetched
    while "continue" in data:
        data = api_request(f"https://www.imfdb.org/api.php?action=query&list=categorymembers&cmtitle={cmtitle}&format={format}&cmcontinue={data['continue']['cmcontinue']}")
        
        # Error Handling
        if data is None:
            print(f"ERROR: query_categorymembers(): Data is None in continuation batch {data['continue']['cmcontinue']}")
            return None
            
        # Loop through continuation batch:
        for member in data["query"]["categorymembers"]:
            print(f"Adding {member['title']}")
            categorymembers.append(member)

    return categorymembers

In [141]:
def populate_actors_table():
    actors = query_categorymembers("Category:Actor", "json")

    for actor in actors:

        actorpageid = str(actor['pageid'])
        actorurl = f"https://www.imfdb.org/index.php?curid={actorpageid}"
        actorpagecontent = get_page_text_by_id(actorpageid)
        actorname = str(actor['title'])
        if "Category:" in actorname:
            continue
        print(f"INSERTing: {actorname}, {actorpageid}")
        statement = "INSERT INTO actors (actorurl, actorpageid, actorpagecontent, actorname) VALUES (%s, %s, %s, %s)"
        cursor.execute(statement, (actorurl, actorpageid, actorpagecontent, actorname))
    
    cnx.commit()

In [142]:
def populate_movies_table():
    movies = query_categorymembers("Category:Movie", "json")

    for movie in movies:

        moviepageid = str(movie['pageid'])
        movieurl = f"https://www.imfdb.org/index.php?curid={moviepageid}"
        moviepagecontent = get_page_text_by_id(moviepageid)
        movietitle = str(movie['title'])
        if "Category:" in movietitle:
            continue
        print(f"INSERTing: {movietitle}, {moviepageid}")
        statement = "INSERT INTO movies (movieurl, moviepageid, moviepagecontent, movietitle) VALUES (%s, %s, %s, %s)"
        cursor.execute(statement, (movieurl, moviepageid, moviepagecontent, movietitle))
    
    cnx.commit()

In [143]:
def populate_tvseries_table():
    tvseries = query_categorymembers("Category:Television", "json")

    for series in tvseries:

        tvseriespageid = str(series['pageid'])
        tvseriesurl = f"https://www.imfdb.org/index.php?curid={tvseriespageid}"
        tvseriespagecontent = get_page_text_by_id(tvseriespageid)
        tvseriestitle = str(series['title'])
        if "Category:" in tvseriestitle:
            continue
        print(f"INSERTing: {tvseriestitle}, {tvseriespageid}")
        statement = "INSERT INTO tvseries (tvseriesurl, tvseriespageid, tvseriespagecontent, tvseriestitle) VALUES (%s, %s, %s, %s)"
        cursor.execute(statement, (tvseriesurl, tvseriespageid, tvseriespagecontent, tvseriestitle))
    
    cnx.commit()

In [144]:
def populate_firearms_table_minimally():
    firearms = query_categorymembers("Category:Gun", "json")

    for firearm in firearms:

        firearmpageid = str(firearm['pageid'])
        firearmurl = f"https://www.imfdb.org/index.php?curid={firearmpageid}"
        firearmpagecontent = get_page_text_by_id(firearmpageid)
        firearmtitle = str(firearm['title'])
        if "Category:" in firearmtitle:
            continue
        print(f"INSERTing: {firearmtitle}, {firearmpageid}")
        statement = "INSERT INTO firearms (firearmurl, firearmpageid, firearmpagecontent, firearmtitle) VALUES (%s, %s, %s, %s)"
        cursor.execute(statement, (firearmurl, firearmpageid, firearmpagecontent, firearmtitle))
    
    cnx.commit()

<h3>Database related functions</h3>
Once we have downloaded the necessary article data, we extract useful information from the HTML to fill the rest of our column.

In [145]:
# Some useful dictionaries #

firearms_dict = {
    "firearmid" : 0,
    "firearmurl" : 1,
    "parentfirearmid" : 2,
    "firearmpageid" : 3,
    "firearmpagecontent" : 4,
    "specificationid" : 5,
    "firearmtitle" : 6,
    "firearmversion" : 7,
    "isfamily" : 8,
    "isfictional" : 9
}

In [146]:
def update_firearms_isfictional():
    statement = "UPDATE firearms SET isfictional = FALSE WHERE NOT firearmtitle LIKE '(%) -%';"
    cursor.execute(statement)
    statement = "UPDATE firearms SET isfictional = TRUE WHERE firearmtitle LIKE '(%) -%';"
    cursor.execute(statement)
    cnx.commit()

In [147]:
def is_multi_gun_page(pageid):
    soup = soup = BeautifulSoup(get_page_content_from_db(pageid, "firearms"), 'html.parser')

    h1_tags = soup.find_all("h1")
    count = len(h1_tags)

    if count > 1:
        return True
    else:
        return False

In [148]:
def update_firearms_isfamily():
    keyword1 = "series"
    keyword2 = "Series"
    statement = "UPDATE firearms set isfamily = FALSE WHERE NOT (firearmtitle LIKE '%%%s%%' OR firearmtitle LIKE '%%%s%%' OR firearmtitle = 'Air Guns')" % (keyword1, keyword2)
    cursor.execute(statement)
    statement = "UPDATE firearms set isfamily = TRUE WHERE (firearmtitle LIKE '%%%s%%' OR firearmtitle LIKE '%%%s%%' OR firearmtitle = 'Air Guns')" % (keyword1, keyword2)
    cursor.execute(statement)

    statement = "SELECT * FROM firearms"
    cursor.execute(statement)
    firearms = cursor.fetchall()
    for firearm in firearms:
        if (is_multi_gun_page(pageid=firearm[3]) and firearm[8] == False):
            statement = "UPDATE firearms set isfamily = TRUE WHERE firearmid = '%s'" % (firearm[0])
            cursor.execute(statement)

    cnx.commit()

In [149]:
def get_page_content_from_db(pageid, table):
    if table in ["actors", "movies", "tvseries", "firearms"]:
        singular = table.rstrip("s")
        content = "{}pagecontent".format(singular)
        id = "{}pageid".format(singular)
    else:
        print("ERROR: get_page_content_from_db(): {} is not a valid table!".format(table))
        return None

    statement = "SELECT {} from {} where {} = '{}';".format(content, table, id, pageid)
    cursor.execute(statement)
    return cursor.fetchone()[0]

In [150]:
def get_number_of_specifications(html_content):
    soup = BeautifulSoup(html_content, 'html.parser')
    
    spec_tags = soup.find_all(id=lambda value: value and value.startswith("Specifications"))
    spec_count = len(spec_tags)

    print("The number of h2 tags with 'Specifications' is:", spec_count)
    return spec_count

In [151]:
def strip_spec_list_item(item):
    index = item.index(":")
    return item[index + 1:].strip()

def get_single_specification(html_content, pageid=None):
    # Finds the first specification within any given HTML code

    spec_dict = {
        "production_year":None,
        "production_end_year":None,
        "type":None,
        "caliber":None,
        "capacity":None,
        "fire_modes":None
    }

    soup = BeautifulSoup(html_content, 'html.parser')

    # Find all h1 headers in the content
    headers = soup.find_all("h1")

    if (headers is None or len(headers) == 0):
        if pageid is not None:
            print(f"ERROR: get_first_specification(): {pageid} does not contain any headers!")
            return

    # Page Title
    title = headers[0].text

    # Find the first Element with the Specifications id
    span = soup.find(id = "Specifications")
    if span is None:
        print("ERROR: get_first_specification(): No <span> tag with id = 'Specifications' was found!")
        return

    spec_lists = span.parent.find_next_siblings("ul")

    # Each spec row is its own unordered list with only a single list item
    if spec_lists is None:
        print("ERROR: get_first_specification(): No <ul> tags were found!")
        return

    specifications = [li.text for ul in spec_lists for li in ul.find_all('li')]

    if specifications is None:
        print("ERROR: get_first_specification(): No <li> tags were found!")
        return

    # Iterate through the list items and set a dict for each
    for item in specifications:
        if "Type:" in item:
            spec_dict["type"] = strip_spec_list_item(item)
        elif "Caliber:" in item:
            spec_dict["caliber"] = strip_spec_list_item(item)
        elif "Capacity:" in item:
            spec_dict["capacity"] = strip_spec_list_item(item)
        elif "Fire Modes:" in item:
            spec_dict["fire_modes"] = strip_spec_list_item(item)
    
    # Initialize with None so we can check later if a value was extracted from the HTML
    production_year = None
    production_end_year = None

    # Sometimes the production years are contained in an <i> Element within a <p> Element
    possible_i_tag = span.parent.find_next_sibling("p").find()
    if possible_i_tag is not None:
        if span.parent.find_next_sibling("p").find().name == "i":
            production_years = span.parent.find_next_sibling("p").find("i", string=re.compile(r"\(\d{4}s?.(-|–).(\d{4}s?|[^\)|\s]*)\s*.*\)"))
    else:
        # Production_years is <p> Element which contains production year numbers
        production_years = span.parent.find_next_sibling("p", string=re.compile(r"\(\d{4}s?.(-|–).(\d{4}s?|[^\)|\s]*)\s*.*\)"))

    match = re.search(r"\((\d{4})s?\s*(-\s|–\s)(\d{4}|Present)s?\s*\)", str(production_years))

    if match is not None:
        production_year = match.group(1).strip()
        production_end_year = match.group(3).strip()
    if (production_end_year != "Present" and production_end_year is not None):
        production_end_year = int(production_end_year)
    if production_end_year == "Present":
        production_end_year = 9999

    # Add both to the spec dict
    spec_dict["production_year"] = production_year
    spec_dict["production_end_year"] = production_end_year

    if (spec_dict["production_year"] is None or spec_dict["production_end_year"] is None):
        print(f"WARNING: get_first_specification(): {title} is missing production years!")

    return spec_dict

In [152]:
def check_for_family_candidates():
# Debugging function
    statement = "SELECT * FROM firearms"
    cursor.execute(statement)
    firearms = cursor.fetchall()
    
    with open('candidates.txt', 'w') as writer:
        for firearm in firearms:
            if (is_multi_gun_page(pageid=firearm[3]) and firearm[8] == False):
                writer.write(f"{firearm[3]}\n")

In [153]:
def populate_specs_for_singles():
    statement = "SELECT * FROM firearms WHERE isfamily = 'False';"
    cursor.execute(statement)
    firearms = cursor.fetchall()

    for firearm in firearms:
        print(f"Currently: {firearm[3]}")
        spec = get_single_specification(firearm[4])
        print(f"{firearm[3]}'s Spec: \n {spec}")

In [154]:
def populate_specifications_table():
    return

<h3>Corner and edge cases:<h3>

In [155]:
# Corner-case handling:

# # Starting from the second header, remove all the following HTML including the second h1 itself
    #for elem in headers[1].find_next_siblings():
    #    elem.extract()
    #headers[1].extract()

<h3>Main<h3>

In [156]:
### We do stuff here: ###

#Populate the database skeleton:
#populate_actors_table()
#populate_movies_table()
#populate_tvseries_table()
#populate_firearms_table_minimally()

#update_firearms_isfictional()
#update_firearms_isfamily()

#populate_specifications_table()
#populate_specs_for_singles()

# Debugging get_single_specification:
data = get_page_content_from_db("353349", "firearms")
spec = get_single_specification(data, pageid="353349")
print(spec)


AttributeError: 'NoneType' object has no attribute 'find'

In [None]:
# Close database connection
cursor.close()
cnx.close()