In [1]:
import pandas as pd
# Load the entity IDs to expand (the recorded output shows a single `wiki_id` column).
df_e = pd.read_csv('./data/3_e.csv')
# Print the schema summary, then show the first rows as the cell's output.
df_e.info()
df_e.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1372 entries, 0 to 1371
Data columns (total 1 columns):
 #   Column   Non-Null Count  Dtype 
---  ------   --------------  ----- 
 0   wiki_id  1372 non-null   object
dtypes: object(1)
memory usage: 10.8+ KB


Unnamed: 0,wiki_id
0,Q43436
1,Q43088
2,Q393356
3,Q5283
4,Q138979


In [2]:
# Materialize the `wiki_id` column as a plain Python list of entity IDs.
entity_list = df_e['wiki_id'].tolist()
# Report how many entities will be queried, then preview the first three.
print(len(entity_list))
entity_list[:3]

1372


['Q43436', 'Q43088', 'Q393356']

In [3]:
from SPARQLWrapper import SPARQLWrapper, JSON
def query_subject(entity_id='Q43088'):
    """Query Wikidata for direct-claim statements whose subject is ``entity_id``.

    The query binds ``?subject`` to ``wd:<entity_id>`` and selects the English
    labels of subject, predicate and object together with the subject and
    object URIs. Only statements whose object carries an ``rdfs:label`` are
    matched, and the result is capped by ``LIMIT 10000``.

    Args:
        entity_id: Wikidata identifier such as ``'Q43088'`` or ``'P1088'``.

    Returns:
        The JSON response as converted by ``SPARQLWrapper``.

    Raises:
        ValueError: If ``entity_id`` is not a string made of one of the letters
            ``Q``/``P``/``L`` followed by ASCII digits. The value is
            interpolated into the query text, so anything else would produce a
            malformed (or unintended) query.
    """
    import re  # local import: keeps this cell runnable on its own

    # Validate before any network work; the ID is spliced verbatim into SPARQL.
    if not isinstance(entity_id, str) or not re.fullmatch(r'[QPL][0-9]+', entity_id):
        raise ValueError(f"Invalid Wikidata entity id: {entity_id!r}")

    # SPARQL endpoint for Wikidata
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    # NOTE(review): BIND comes after ?subject is first used. The endpoint
    # accepted this in the recorded run; confirm before restructuring the query.
    query = f"""
    SELECT ?subjectLabel ?predicateLabel ?objectLabel ?subject ?object 
    WHERE {{
      ?subject ?predicate ?object.
      ?subject rdfs:label ?subjectLabel.
      ?x wikibase:directClaim ?predicate.
      ?x rdfs:label ?predicateLabel.
      ?object rdfs:label ?objectLabel.
      BIND(wd:{entity_id} AS ?subject) 
      FILTER(LANG(?subjectLabel) = "en").
      FILTER(LANG(?predicateLabel) = "en").
      FILTER(LANG(?objectLabel) = "en").
    }}
    LIMIT 10000
    """
    # Set the query and ask for a JSON response
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    # Execute the query and convert the response
    return sparql.query().convert()

# query_subject('Q43088')

In [4]:
def query_object(entity_id='Q43088'):
    """Query Wikidata for direct-claim statements whose object is ``entity_id``.

    The query binds ``?object`` to ``wd:<entity_id>`` and selects the English
    labels of subject, predicate and object together with the subject and
    object URIs. The result is capped by ``LIMIT 10000``, so entities that are
    referenced very often are returned only partially.

    Args:
        entity_id: Wikidata identifier such as ``'Q43088'`` or ``'P1088'``.

    Returns:
        The JSON response as converted by ``SPARQLWrapper``.

    Raises:
        ValueError: If ``entity_id`` is not a string made of one of the letters
            ``Q``/``P``/``L`` followed by ASCII digits. The value is
            interpolated into the query text, so anything else would produce a
            malformed (or unintended) query.
    """
    import re  # local import: keeps this cell runnable on its own

    # Validate before any network work; the ID is spliced verbatim into SPARQL.
    if not isinstance(entity_id, str) or not re.fullmatch(r'[QPL][0-9]+', entity_id):
        raise ValueError(f"Invalid Wikidata entity id: {entity_id!r}")

    # SPARQL endpoint for Wikidata
    sparql = SPARQLWrapper("https://query.wikidata.org/sparql")
    # NOTE(review): BIND comes after ?object is first used. The endpoint
    # accepted this in the recorded run; confirm before restructuring the query.
    query = f"""
    SELECT ?subjectLabel ?predicateLabel ?objectLabel ?subject ?object 
    WHERE {{
      ?subject ?predicate ?object.
      ?subject rdfs:label ?subjectLabel.
      ?x wikibase:directClaim ?predicate.
      ?x rdfs:label ?predicateLabel.
      ?object rdfs:label ?objectLabel.
      BIND(wd:{entity_id} AS ?object) 
      FILTER(LANG(?subjectLabel) = "en").
      FILTER(LANG(?predicateLabel) = "en").
      FILTER(LANG(?objectLabel) = "en").
    }}
    LIMIT 10000
    """
    # Set the query and ask for a JSON response
    sparql.setQuery(query)
    sparql.setReturnFormat(JSON)
    # Execute the query and convert the response
    return sparql.query().convert()
    
# query_object('Q43088')

In [5]:
import pandas as pd
def _bindings_to_rows(results):
    """Flatten a SPARQL JSON result into ``[s_label, p_label, o_label, s_id, o_id]`` rows."""
    return [
        [
            binding["subjectLabel"]["value"],
            binding["predicateLabel"]["value"],
            binding["objectLabel"]["value"],
            # The URI's last path segment is the bare Wikidata ID.
            binding["subject"]["value"].split('/')[-1],
            binding["object"]["value"].split('/')[-1],
        ]
        for binding in results["results"]["bindings"]
    ]


def e2spo(entity_id='Q43088'):
    """Collect the subject-predicate-object triples that involve ``entity_id``.

    Runs ``query_subject`` first and ``query_object`` second, and stacks the
    rows in that order. Rows where the entity is the subject therefore precede
    rows where it is the object.

    Args:
        entity_id: Wikidata identifier passed through to both queries.

    Returns:
        A ``pandas.DataFrame`` with the columns ``subjectLabel``,
        ``predicateLabel``, ``objectLabel``, ``subject`` and ``object``.
        The last two hold bare Wikidata IDs rather than full URIs.
    """
    rows = _bindings_to_rows(query_subject(entity_id))
    rows += _bindings_to_rows(query_object(entity_id))

    return pd.DataFrame(
        rows,
        columns=["subjectLabel", "predicateLabel", "objectLabel", "subject", "object"],
    )
    
# Smoke-test the pipeline on a single entity and inspect the resulting frame.
df = e2spo(entity_id='Q43088')
df.info()
df.head()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 174 entries, 0 to 173
Data columns (total 5 columns):
 #   Column          Non-Null Count  Dtype 
---  ------          --------------  ----- 
 0   subjectLabel    174 non-null    object
 1   predicateLabel  174 non-null    object
 2   objectLabel     174 non-null    object
 3   subject         174 non-null    object
 4   object          174 non-null    object
dtypes: object(5)
memory usage: 6.9+ KB


Unnamed: 0,subjectLabel,predicateLabel,objectLabel,subject,object
0,ruby,instance of,mineral variety,Q43088,Q429795
1,ruby,described by source,Brockhaus and Efron Encyclopedic Dictionary,Q43088,Q602358
2,ruby,described by source,Encyclopædia Britannica 11th edition,Q43088,Q867541
3,ruby,described by source,Explanatory Dictionary of the Living Great Rus...,Q43088,Q1970746
4,ruby,described by source,The Nuttall Encyclopædia,Q43088,Q3181656


In [6]:
# The last rows come from the second query, where the entity is the object.
df.tail()

Unnamed: 0,subjectLabel,predicateLabel,objectLabel,subject,object
169,Ruby,inspired by,ruby,Q26714785,Q43088
170,Flirt,made from material,ruby,Q62409724,Q43088
171,Le Matin,made from material,ruby,Q62414841,Q43088
172,1911 Encyclopædia Britannica/Ruby,main subject,ruby,Q84683979,Q43088
173,ruby maser,uses,ruby,Q96217685,Q43088


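Sequential reference version of the query loop (the cells below run the same work in parallel with Dask):

```python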
from tqdm.auto import tqdm
list_df=[]
pbar = tqdm(entity_list)
for e in pbar:
    pbar.set_description(f'Query entity: {e:<15}')
    df=e2spo(entity_id = e)
    df['source']=e
    list_df.append(df)

# Concatenate the DataFrames row-wise (axis=0)
df = pd.concat(list_df, axis=0, ignore_index=True)
df.info()
df.head()
```

In [7]:
# pip install dask[dataframe] dask distributed "bokeh>=3.1.0"


In [8]:
from dask.distributed import Client, LocalCluster

# Set up a local Dask cluster; it is handed to `Client(cluster)` in the cell that runs the queries.
cluster = LocalCluster(n_workers=8)  # You can adjust settings here if needed (e.g., number of workers)
# Display the cluster summary as the cell's output.
cluster

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 8
Total threads: 8,Total memory: 62.52 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:43929,Workers: 8
Dashboard: http://127.0.0.1:8787/status,Total threads: 8
Started: Just now,Total memory: 62.52 GiB

0,1
Comm: tcp://127.0.0.1:45661,Total threads: 1
Dashboard: http://127.0.0.1:43919/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:41729,
Local directory: /tmp/dask-scratch-space/worker-n4esxmqb,Local directory: /tmp/dask-scratch-space/worker-n4esxmqb

0,1
Comm: tcp://127.0.0.1:36359,Total threads: 1
Dashboard: http://127.0.0.1:33201/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:39691,
Local directory: /tmp/dask-scratch-space/worker-fls229ln,Local directory: /tmp/dask-scratch-space/worker-fls229ln

0,1
Comm: tcp://127.0.0.1:42257,Total threads: 1
Dashboard: http://127.0.0.1:37401/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:38049,
Local directory: /tmp/dask-scratch-space/worker-bi8obvh8,Local directory: /tmp/dask-scratch-space/worker-bi8obvh8

0,1
Comm: tcp://127.0.0.1:36235,Total threads: 1
Dashboard: http://127.0.0.1:34083/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:38967,
Local directory: /tmp/dask-scratch-space/worker-ieo2r8n2,Local directory: /tmp/dask-scratch-space/worker-ieo2r8n2

0,1
Comm: tcp://127.0.0.1:39307,Total threads: 1
Dashboard: http://127.0.0.1:38867/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:40567,
Local directory: /tmp/dask-scratch-space/worker-j7945702,Local directory: /tmp/dask-scratch-space/worker-j7945702

0,1
Comm: tcp://127.0.0.1:44427,Total threads: 1
Dashboard: http://127.0.0.1:45407/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:42657,
Local directory: /tmp/dask-scratch-space/worker-vkdzsb9q,Local directory: /tmp/dask-scratch-space/worker-vkdzsb9q

0,1
Comm: tcp://127.0.0.1:42153,Total threads: 1
Dashboard: http://127.0.0.1:41493/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:34077,
Local directory: /tmp/dask-scratch-space/worker-yzf11e08,Local directory: /tmp/dask-scratch-space/worker-yzf11e08

0,1
Comm: tcp://127.0.0.1:41361,Total threads: 1
Dashboard: http://127.0.0.1:35497/status,Memory: 7.81 GiB
Nanny: tcp://127.0.0.1:39645,
Local directory: /tmp/dask-scratch-space/worker-_jwu15ad,Local directory: /tmp/dask-scratch-space/worker-_jwu15ad


In [9]:
import dask.dataframe as dd
import pandas as pd
from dask import delayed
from tqdm.auto import tqdm
from dask.distributed import Client, LocalCluster

# Assume e2spo is a function that takes an entity ID and returns a Pandas DataFrame
# def e2spo(entity_id):
    # Your logic to fetch data goes here
    # Return a Pandas DataFrame
    # pass

# Create a delayed version of e2spo
@delayed
def delayed_e2spo(entity_id):
    """Lazily fetch the triples for ``entity_id`` and tag every row with it.

    Because of the ``delayed`` decorator, calling this only records a task.
    The wrapped ``e2spo`` call runs when the task is computed. The resulting
    frame has an extra ``source`` column holding ``entity_id``.
    """
    return e2spo(entity_id).assign(source=entity_id)

# Run every entity query on the Dask cluster and persist the combined result.
# The client is closed when the `with` block exits; the cluster itself stays up.
from dask.distributed import as_completed

with Client(cluster) as client:

    # Build one lazy task per entity (cheap: nothing is executed yet)
    delayed_tasks = [delayed_e2spo(e) for e in entity_list]

    # Submit the tasks and advance the bar as each query actually finishes,
    # so the progress reflects real work rather than task construction.
    futures = client.compute(delayed_tasks)
    for _ in tqdm(as_completed(futures), total=len(futures), desc='Query entities'):
        pass

    # Collect the results in submission order; re-raises if any task failed
    list_df = client.gather(futures)

    # Concatenate the DataFrames row-wise
    df = pd.concat(list_df, axis=0, ignore_index=True)

    # Convert Pandas DataFrame to Dask DataFrame for writing to Parquet
    ddf = dd.from_pandas(df, npartitions=1)

    # Write the Dask DataFrame to Parquet, one directory per `source` value
    parquet_dir = './data/parquet_dir/'
    ddf.to_parquet(parquet_dir, partition_on=['source'], engine='pyarrow')

    print("Partitioned Dask DataFrame written to Parquet.")


  0%|          | 0/1372 [00:00<?, ?it/s]

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Partitioned Dask DataFrame written to Parquet.


In [10]:
import pandas as pd

# `parquet_dir` is reused from the cell that wrote the dataset ('./data/parquet_dir/').
# Load every partition back into one pandas DataFrame with a fresh 0..n-1 index.
df = pd.read_parquet(parquet_dir, engine='pyarrow').reset_index(drop=True)
df.info()
df.head()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 869059 entries, 0 to 869058
Data columns (total 6 columns):
 #   Column          Non-Null Count   Dtype   
---  ------          --------------   -----   
 0   subjectLabel    869059 non-null  string  
 1   predicateLabel  869059 non-null  string  
 2   objectLabel     869059 non-null  string  
 3   subject         869059 non-null  string  
 4   object          869059 non-null  string  
 5   source          869059 non-null  category
dtypes: category(1), string(5)
memory usage: 34.9 MB


Unnamed: 0,subjectLabel,predicateLabel,objectLabel,subject,object,source
0,Mohs' hardness,instance of,Wikidata property related to mineralogy,P1088,Q24041781,P1088
1,Mohs' hardness,related property,hardness,P1088,P5483,P1088
2,Mohs' hardness,Wikidata item of this property,Mohs scale of mineral hardness,P1088,Q41472,P1088
3,Mohs' hardness,Wikidata property example,diamond,P1088,Q5283,P1088
4,Mohs' hardness,Wikidata property example,quartz,P1088,Q43010,P1088


In [11]:
# Number of triples per predicate label, as a one-column frame named `index_count`.
df_pivot = (
    df.groupby('predicateLabel')
      .size()
      .to_frame('index_count')
)
# Show the 50 most frequent predicates.
df_pivot.sort_values('index_count', ascending=False).head(50)

Unnamed: 0_level_0,index_count
predicateLabel,Unnamed: 1_level_1
country,149159
main subject,111438
occupation,101291
instance of,87574
country of citizenship,87075
made from material,45561
located in the administrative territorial entity,28507
color,19283
field of work,15837
country of origin,15443


In [12]:
# Predicate labels to exclude from the exported triples.
values_to_drop = {
    'diplomatic relation',
    'contains the administrative territorial entity',
    'emergency phone number',
    "topic's main Wikimedia portal",
    'public holiday',
    'highest judicial authority',
    'Wikidata property',
    'Wikimedia outline',
    'described by source',
    'different from',
    "topic's main category",
    'on focus list of Wikimedia project',
    "topic's main template",
    'maintained by WikiProject',
    'located in the administrative territorial entity',
    'top-level Internet domain',
    'flag',
    'currency',
    'language used',
}

# Keep only the rows whose predicate is not in the drop set.
keep_mask = ~df['predicateLabel'].isin(values_to_drop)
df2 = df[keep_mask]

# Recount triples per predicate on the filtered frame and show the top 50.
df_pivot = df2.groupby('predicateLabel').size().to_frame('index_count')
df_pivot.sort_values('index_count', ascending=False).head(50)

Unnamed: 0_level_0,index_count
predicateLabel,Unnamed: 1_level_1
country,149159
main subject,111438
occupation,101291
instance of,87574
country of citizenship,87075
made from material,45561
color,19283
field of work,15837
country of origin,15443
category combines topics,14462


In [13]:
# Persist the filtered triples. `index` is left at its default, so the retained
# (non-contiguous) row index is written as the first column.
df2.to_csv('./data/4_spo_e.csv')
# Summarize and preview what was written.
df2.info()
df2.head()

<class 'pandas.core.frame.DataFrame'>
Index: 829669 entries, 0 to 869058
Data columns (total 6 columns):
 #   Column          Non-Null Count   Dtype   
---  ------          --------------   -----   
 0   subjectLabel    829669 non-null  string  
 1   predicateLabel  829669 non-null  string  
 2   objectLabel     829669 non-null  string  
 3   subject         829669 non-null  string  
 4   object          829669 non-null  string  
 5   source          829669 non-null  category
dtypes: category(1), string(5)
memory usage: 39.6 MB


Unnamed: 0,subjectLabel,predicateLabel,objectLabel,subject,object,source
0,Mohs' hardness,instance of,Wikidata property related to mineralogy,P1088,Q24041781,P1088
1,Mohs' hardness,related property,hardness,P1088,P5483,P1088
2,Mohs' hardness,Wikidata item of this property,Mohs scale of mineral hardness,P1088,Q41472,P1088
3,Mohs' hardness,Wikidata property example,diamond,P1088,Q5283,P1088
4,Mohs' hardness,Wikidata property example,quartz,P1088,Q43010,P1088


In [14]:
import json
def spo2kg(df_spo_wiki):
    """Split a labelled triple frame into plain triples plus per-label metadata.

    Args:
        df_spo_wiki: DataFrame with the columns ``subjectLabel``,
            ``predicateLabel``, ``objectLabel``, ``subject`` and ``object``.

    Returns:
        A ``(df_spo, metadata)`` tuple. ``df_spo`` holds only the three label
        columns. ``metadata`` maps each distinct subject/object label to a
        dict with ``"url"`` and ``"wiki_id"``. The URL is the English
        Wikipedia base URL plus the label, with spaces turned into underscores
        and the result percent-encoded. When a label occurs more than once,
        the first occurrence in row order wins (subject before object within
        a row).
    """
    from urllib.parse import quote  # local import: keeps this cell runnable on its own

    # Base Wikipedia URL for metadata
    wiki_base_url = "https://en.wikipedia.org/wiki/"
    # Characters kept verbatim in the URL path. Everything else ('?', '#',
    # '%', non-ASCII, ...) is percent-encoded so the link stays well-formed.
    safe_chars = "/:@!$&'()*+,;=~"

    df_spo = df_spo_wiki[['subjectLabel', 'predicateLabel', 'objectLabel']]

    metadata = {}
    # Walk the four needed columns in lockstep; this avoids building a
    # namedtuple for every row.
    rows = zip(
        df_spo_wiki['subjectLabel'],
        df_spo_wiki['subject'],
        df_spo_wiki['objectLabel'],
        df_spo_wiki['object'],
    )
    for subject_label, subject_id, object_label, object_id in rows:
        for label, wiki_id in ((subject_label, subject_id), (object_label, object_id)):
            if label not in metadata:
                metadata[label] = {
                    "url": wiki_base_url + quote(label.replace(' ', '_'), safe=safe_chars),
                    "wiki_id": wiki_id,
                }

    return df_spo, metadata

# Build the label-only triples and the label -> {url, wiki_id} metadata from the filtered frame.
df_spo, metadata = spo2kg(df2)
df_spo.info()
df_spo.head()

<class 'pandas.core.frame.DataFrame'>
Index: 829669 entries, 0 to 869058
Data columns (total 3 columns):
 #   Column          Non-Null Count   Dtype 
---  ------          --------------   ----- 
 0   subjectLabel    829669 non-null  string
 1   predicateLabel  829669 non-null  string
 2   objectLabel     829669 non-null  string
dtypes: string(3)
memory usage: 25.3 MB


Unnamed: 0,subjectLabel,predicateLabel,objectLabel
0,Mohs' hardness,instance of,Wikidata property related to mineralogy
1,Mohs' hardness,related property,hardness
2,Mohs' hardness,Wikidata item of this property,Mohs scale of mineral hardness
3,Mohs' hardness,Wikidata property example,diamond
4,Mohs' hardness,Wikidata property example,quartz


In [15]:
# Output locations for the triples and the per-label metadata.
csv_file, metadata_file = './data/4_data_e.csv', './data/4_metadata_e.json'

# Triples are written as bare rows: no header line and no index column.
df_spo.to_csv(csv_file, index=False, header=False)

# Serialize the metadata mapping as indented JSON.
with open(metadata_file, mode='w', encoding='utf-8') as fh:
    fh.write(json.dumps(metadata, indent=2))

In [16]:
# Feed the exported triples and metadata files to the `kg` command-line tool.
!kg add -f ./data/4_data_e.csv
!kg meta -f ./data/4_metadata_e.json
# Launch the search app. Per the recorded output it serves on http://127.0.0.1:5000
# and keeps running until interrupted, so this cell does not return on its own.
!kg start

🎉 Starting the app.
 * Serving Flask app 'kgsearch.app.app' (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off
 * Running on http://127.0.0.1:5000
[33mPress CTRL+C to quit[0m
127.0.0.1 - - [19/Oct/2024 13:29:34] "GET /search/1/1/1/blue HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:29:47] "GET /search/1/1/1/blue; HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:29:52] "GET /search/2/1/1/blue; HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:29:58] "GET /search/2/1/1/blue;topaz HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:30:12] "GET /search/2/1/1/blue;topaz;iodite HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:31:01] "GET /search/2/1/2/blue;topaz;iodite HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:31:16] "GET /search/2/1/2/blue;topaz;iodite; HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:31:28] "GET /search/2/1/2/blue;topaz;iodite;; HTTP/1.1" 200 -
127.0.0.1 - - [19/Oct/2024 13:31:47] "GET /search/2/1/2/blue;topaz;iodite;;lapis HTTP