In [1]:
from question_types.sparql import SparqlQueries
from tabulate import tabulate
import json
import os
import unicodedata
import re

In [2]:
# Shared SparqlQueries instance used by execute_query.
# Parsing the graph is slow (about 50 s between the first two log lines of the recorded run).
sparql = SparqlQueries("../dataset/14_graph.nt")

2024-10-31 13:59:00,278 | INFO | __init__ | Initializing SparqlQueries class...
2024-10-31 13:59:51,781 | INFO | __init__ | Graph parsed
2024-10-31 13:59:51,787 | INFO | __init__ | SparqlQueries class initialized successfully.


In [3]:
def execute_query(query):
    """Run a SPARQL query on the shared ``sparql`` instance and return string rows.

    Args:
        query: SPARQL query text handed to ``sparql.execute_query``.

    Returns:
        A list of rows, each row being a list of ``str`` values. A result item
        that is not a tuple is wrapped into a single-column row. When the query
        yields nothing, a notice is printed and an empty list is returned, so
        callers can always slice or iterate the result.
    """
    rows = [
        [str(item) for item in (row if isinstance(row, tuple) else [row])]
        for row in sparql.execute_query(query)
    ]
    if not rows:
        # Keep the notice, but hand back a list rather than print()'s None.
        print("Results were empty")
    return rows

In [4]:
def get_all_of_type_film():
    """Query all entities that are an instance of film (or of a subclass of film).

    Only English labels are selected. The return value is whatever
    ``execute_query`` gives back for the query, with the columns
    ``?movie`` and ``?movieLabel``.
    """
    film_query = """
        PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
        PREFIX wd: <http://www.wikidata.org/entity/>
        PREFIX wdt: <http://www.wikidata.org/prop/direct/>

        SELECT ?movie ?movieLabel WHERE {
            ?movie wdt:P31/wdt:P279* wd:Q11424 ;   # Match film or its subclasses
                   rdfs:label ?movieLabel .        # Get the label of the movie
            FILTER(LANG(?movieLabel) = "en")       # Only English labels
        }
    """
    film_rows = execute_query(film_query)
    return film_rows


In [5]:
def normalize_string(s):
    """Normalize an entity name to a uniform lowercase ASCII form.

    Steps, in order: NFKD-decompose the text, drop every character that is not
    ASCII, lowercase, remove characters that are neither word characters nor
    whitespace, and collapse whitespace runs into single spaces.

    Args:
        s: The string to normalize.

    Returns:
        The normalized string (possibly empty).
    """
    decomposed = unicodedata.normalize('NFKD', s)
    ascii_only = decomposed.encode('ascii', 'ignore').decode('ascii')
    # Lowercase after decomposing: NFKD can itself emit uppercase ASCII
    # (e.g. the trade mark sign becomes "TM"), which an earlier lower() misses.
    lowered = ascii_only.lower()
    # \w keeps letters, digits and underscore; everything else except whitespace goes.
    without_punct = re.sub(r'[^\w\s]', '', lowered)
    return ' '.join(without_punct.split())

def ensure_directory_exists(directory):
    """Create ``directory`` (including missing parents) unless it already exists.

    Args:
        directory: Path of the directory to create.

    Raises:
        FileExistsError: If the path exists but is not a directory.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(directory, exist_ok=True)
        
def extend_if_exists(db, lst):
    """Merge the rows of ``lst`` into ``db`` in place.

    Each row is read by index as (key, name, type). Rows with a falsy key are
    skipped. A key not yet in ``db`` is stored as ``(normalized name, [type])``;
    for a key already present only the type is appended to its existing list.
    """
    for entry in lst:
        entity_id = entry[0]
        if entity_id:
            normalized_name = normalize_string(entry[1])
            entity_type = entry[2]
            if entity_id not in db:
                db[entity_id] = (normalized_name, [entity_type])
            else:
                db[entity_id][1].append(entity_type)
               
def export_movies_json():
    """Export all films from the graph to ``exports/movie_db.json``.

    Prints a grid preview of the first five rows, then writes a JSON object
    mapping each entity ID to its normalized name. Rows whose label is empty
    or three characters or shorter are left out.

    When the query returns nothing (``None`` or an empty list), a notice is
    printed and no file is written, so an earlier export is not replaced by
    an empty one.
    """
    ensure_directory_exists('exports')

    film_lst = get_all_of_type_film()
    if not film_lst:
        # The query helper may hand back None for an empty result; slicing it would raise.
        print("No films returned; skipping export")
        return

    print(tabulate(list(film_lst[:5]), headers=["Entity ID", "Name"], tablefmt="grid"))

    movie_json = {
        row[0]: normalize_string(row[1])
        for row in film_lst
        if row[1] and len(row[1]) > 3
    }

    with open('exports/movie_db.json', 'w', encoding="utf-8") as file:
        json.dump(movie_json, file, ensure_ascii=False, indent=2)
    

In [6]:
# Run the export: prints a preview table of the first rows and writes exports/movie_db.json.
export_movies_json()

+-------------------------------------------+----------------------------------------------------------------+
| Entity ID                                 | Name                                                           |
| http://www.wikidata.org/entity/Q1000825   | Jan Dara                                                       |
+-------------------------------------------+----------------------------------------------------------------+
| http://www.wikidata.org/entity/Q1001777   | Moondram Pirai                                                 |
+-------------------------------------------+----------------------------------------------------------------+
| http://www.wikidata.org/entity/Q1001943   | Buffalo Bill and the Indians, or Sitting Bull's History Lesson |
+-------------------------------------------+----------------------------------------------------------------+
| http://www.wikidata.org/entity/Q100232971 | What We Wanted                                                 |
+