# Onboarding Notebook

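This notebook imports web pages into the WordLift knowledge graph from a list of URLs. The list can come from any source; typically it is a sitemap. Pages that are already in the graph are skipped. Entities without `schema:keywords` are then enriched with keywords taken from their pages' `mz:section` and `mz:subsection` meta tags.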



In [None]:
import logging

import advertools as adv
import aiohttp
import pandas as pd
import wordlift_client
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from pandas import Series
from rdflib import URIRef, Literal
from tenacity import retry, stop_after_attempt, wait_fixed
from tqdm.asyncio import tqdm
from wordlift_client import SitemapImportsApi, SitemapImportRequest, EmbeddingRequest, ApiException
from wordlift_sdk.graphql import GraphQLClientFactory
from wordlift_sdk.utils import create_entity_patch_request
from wordlift_sdk.client import ClientConfigurationFactory

# Configuration is in the `config/default.py` file.
from config import default as config

# WARNING level hides the logger.info progress messages emitted by enrich();
# lower it to logging.INFO to follow each entity. `force=True` replaces any
# handlers a previous notebook run already installed.
logging.basicConfig(level=logging.WARNING, force=True)
logger = logging.getLogger(__name__)

# Base URL of the WordLift API. It is not used by the code in this cell: the
# client configuration below is built by ClientConfigurationFactory from the
# key alone (NOTE(review): presumably it targets this host by default).
api_url = 'https://api.wordlift.io'

# schema.org type assigned to imported pages and used by kg() to select
# entities. Falls back to WebPage when the config leaves OUTPUT_TYPE empty or
# does not define it at all.
output_type = getattr(config, 'OUTPUT_TYPE', None) or 'http://schema.org/WebPage'

# Shared WordLift API client configuration, authenticated with the account key.
configuration = ClientConfigurationFactory(key=config.WORDLIFT_KEY).create()


async def kg():
    """Load every knowledge-graph entity of the configured ``output_type``.

    Runs a GraphQL query against the WordLift graph, filtered by the
    module-level ``output_type``, and reads each entity's IRI together with
    its ``schema:keywords`` and ``schema:url`` string values.

    Returns:
        A ``pandas.DataFrame`` with columns ``id``, ``keywords`` and ``url``,
        one row per entity returned by the query.
    """
    from gql import gql

    graphql_client = GraphQLClientFactory(key=config.WORDLIFT_KEY).create()

    # `$type_constraint` is bound to `output_type` at execution time.
    entities_query = gql("""
        query getEntities($type_constraint: String!) {
          entities(
            query: { typeConstraint: { in: [$type_constraint] } }
          ) {
            id: iri
            keywords: string(name: "schema:keywords")
            url: string(name: "schema:url")
          }
        }
    """)

    async with graphql_client as gql_session:
        result = await gql_session.execute(
            entities_query,
            variable_values={"type_constraint": output_type},
        )
    return pd.DataFrame(result['entities'], columns=['id', 'keywords', 'url'])


@retry(
    stop=stop_after_attempt(5),  # Retry up to 5 times
    wait=wait_fixed(2)  # Wait 2 seconds between retries
)
async def enrich(row: Series):
    """Add ``schema:keywords`` to one entity from its page's section meta tags.

    Downloads the page at ``row['url']`` and collects the comma-separated
    values of its ``mz:section`` and ``mz:subsection`` meta tags. It then sends
    a single PATCH to the WordLift Entities API, adding each distinct value as
    a ``schema:keywords`` literal on ``row['id']``.

    Args:
        row: A row of the DataFrame built by ``kg()``. Only ``id`` (entity
            IRI) and ``url`` (page URL) are read.

    Errors raised while downloading the page propagate, so ``@retry`` re-runs
    the whole call (up to 5 attempts, 2 seconds apart). Rows without a URL and
    pages that answer with an HTTP error status are skipped with a warning.
    Errors from the PATCH call are logged and not re-raised, so they are not
    retried.
    """

    def extract_keywords(html: str) -> list[str]:
        """Return the distinct, non-empty comma-separated values of the
        ``mz:section`` meta tag followed by those of ``mz:subsection``."""
        soup = BeautifulSoup(html, 'html.parser')
        found: dict[str, None] = {}  # used as an insertion-ordered set
        for meta_property in ('mz:section', 'mz:subsection'):
            meta = soup.find('meta', attrs={'property': meta_property})
            if meta is None or 'content' not in meta.attrs:
                continue
            for item in meta['content'].split(','):
                item = item.strip()
                # Skip blanks from "a,,b" or trailing commas; dedupe values
                # that appear in both tags.
                if item:
                    found[item] = None
        return list(found)

    entity_url = row['url']
    entity_id = row['id']

    # kg() yields None/NaN for entities lacking schema:url. Fetching such a
    # value fails on every attempt, and the final RetryError would abort the
    # caller's gather, so skip the row instead.
    if not isinstance(entity_url, str) or not entity_url.strip():
        logger.warning("Skipping entity %s: it has no URL", entity_id)
        return

    logger.info("Processing entity: %s for URL: %s", entity_id, entity_url)

    # The HTTP session is only needed for the download; close it before
    # talking to the WordLift API.
    async with aiohttp.ClientSession() as session:
        async with session.get(entity_url) as response:
            # An error page is not the entity's content, so its meta tags
            # must not become keywords.
            if response.status >= 400:
                logger.warning(
                    "Skipping entity %s: GET %s returned HTTP %s",
                    entity_id, entity_url, response.status,
                )
                return
            html = await response.text()

    keywords = extract_keywords(html)

    # Nothing to add: do not send an empty patch.
    if not keywords:
        return

    resource = URIRef(entity_id)
    keywords_property = URIRef('http://schema.org/keywords')
    payloads = [
        create_entity_patch_request(resource, keywords_property, Literal(value))
        for value in keywords
    ]

    async with wordlift_client.ApiClient(configuration) as api_client:
        api_instance = wordlift_client.EntitiesApi(api_client)

        logger.info("Created %d patch requests for entity %s", len(payloads), entity_id)

        try:
            logger.info("Sending patch request for entity %s", entity_id)
            api_response = await api_instance.patch_entities(entity_id, payloads)
            logger.info("Successfully added attributes to entity %s", entity_id)
            logger.debug("API response: %s", api_response)
        except ApiException as e:
            logger.error("ApiException when calling EntitiesApi->patch_entities: %s", e)
            logger.error("Response body: %s", e.body)
            logger.error("Response headers: %s", e.headers)
        except Exception as e:
            # Best effort: log with traceback and move on to the next entity.
            logger.exception("Unexpected exception when calling EntitiesApi->patch_entities: %s", e)

    logger.info("Finished processing entity: %s", entity_id)


async def main():
    """Import sitemap URLs missing from the graph, then enrich keyword-less entities.

    1. Read ``config.SITEMAP_URL`` with advertools and take its ``loc`` URLs.
    2. Load the graph entities with ``kg()``. Import every sitemap URL that is
       not already an entity ``url``, one URL per sitemap-import request.
    3. Run ``enrich`` on every entity from step 2 whose ``keywords`` is
       missing.

    Each phase runs at most ``cpu_count() + 1`` tasks at a time. A task that
    still fails after its retries is logged and does not abort the other
    tasks.

    Note: the entity list is loaded before the import phase, so entities
    created by this run's import are not enriched until the next run.
    """
    import asyncio
    from os import cpu_count

    # os.cpu_count() may return None when the count cannot be determined.
    default_concurrency = (cpu_count() or 1) + 1

    def delayed(callback, concurrency=default_concurrency):
        """Wrap *callback* so at most *concurrency* calls run at once.

        Build the wrapper ONCE per phase and reuse it: every call to
        ``delayed`` creates a new semaphore, so wrapping each task separately
        would not limit anything. Exceptions escaping *callback* are logged
        and turned into ``None``, so one failing task does not abort the
        whole ``tqdm.gather``.
        """
        sem = asyncio.Semaphore(concurrency)
        callback_name = getattr(callback, '__name__', repr(callback))

        async def callback_with_semaphore(arg):
            async with sem:
                try:
                    return await callback(arg)
                except Exception:
                    logger.exception("%s failed", callback_name)
                    return None

        return callback_with_semaphore

    @retry(
        stop=stop_after_attempt(5),
        wait=wait_fixed(2),
        reraise=True  # Surface the last real error, not tenacity's RetryError.
    )
    async def submit_import(url_list: list[str]) -> None:
        """Send one sitemap-import request for *url_list*. Errors propagate so
        that @retry can re-attempt the request."""
        async with wordlift_client.ApiClient(configuration) as api_client:
            imports_api = SitemapImportsApi(api_client)
            request = SitemapImportRequest(
                embedding=EmbeddingRequest(
                    properties=["http://schema.org/headline", "http://schema.org/abstract", "http://schema.org/text"]
                ),
                output_types=[output_type],
                urls=url_list,
                overwrite=True,
                id_generator="headline-with-url-hash"
            )
            await imports_api.create_sitemap_import(sitemap_import_request=request)

    async def import_url(url_list: list[str]) -> None:
        """Import *url_list*, retrying transient failures; log the error if
        every attempt fails (best effort, never raises)."""
        try:
            await submit_import(url_list)
        except Exception as e:
            logger.error("Error importing URLs: %s", e)

    sitemap_df = adv.sitemap_to_df(config.SITEMAP_URL)
    kg_df = await kg()

    # Sitemap URLs with no matching entity `url` in the graph still need to be
    # imported. Drop missing values so NaN is never sent as a URL.
    known_urls = set(kg_df['url'].dropna())
    missing_url_list = list(set(sitemap_df['loc'].dropna()) - known_urls)

    import_limited = delayed(import_url)
    await tqdm.gather(
        *[import_limited([url]) for url in missing_url_list],
        total=len(missing_url_list)
    )

    no_keywords_df = kg_df[kg_df['keywords'].isna()]
    enrich_limited = delayed(enrich)  # one shared semaphore for all rows
    await tqdm.gather(
        *[enrich_limited(row) for _, row in no_keywords_df.iterrows()],
        total=len(no_keywords_df)
    )


# Top-level `await` is valid in Jupyter/IPython cells only; when running this
# code as a plain Python script, call `asyncio.run(main())` instead.
await main()



100%|██████████| 1/1 [00:02<00:00,  2.73s/it]
 95%|█████████▌| 39/41 [00:17<00:00, 17.77it/s]