# Transform tabular data with composable workflows

txtai executes machine-learning workflows to transform data and build AI-powered semantic search applications. txtai has support for processing both unstructured and structured data. Structured or tabular data is grouped into rows and columns. This can be a spreadsheet, an API call that returns JSON or XML or even list of key-value pairs.

This notebook will walk through examples on how to use workflows with the tabular pipeline to transform and index structured data.


# Install dependencies

Install `txtai` and all dependencies. We will install the api, pipeline and workflow optional extras packages. 

In [61]:
%%capture
!pip install git+https://github.com/neuml/txtai#egg=txtai[api,pipeline,workflow]

# CSV Workflow

The first example will transform and index a CSV file. The [COVID-19 Open Research Dataset](https://allenai.org/data/cord-19) (CORD-19) is a repository of medical articles covering COVID-19. This workflow reads the input CSV and builds a semantic search index.

The first step is downloading the dataset locally.

In [78]:
%%capture
# Get CORD-19 metadata file
!wget https://ai2-semanticscholar-cord-19.s3-us-west-2.amazonaws.com/2021-11-01/metadata.csv
!head -1 metadata.csv > input.csv
!tail -10000 metadata.csv >> input.csv

The next section creates a simple workflow consisting of a tabular pipeline. The tabular pipeline builds a list of (id, text, tag) tuples that can be easily loaded into an Embeddings index. For this example, we'll use the `url` column as the id and the `title` column as the text column. The textcolumns parameter takes a list of columns to support indexing text content from multiple columns. 

The file input.csv is processed and the first 5 rows are shown.

In [63]:
from txtai.pipeline import Tabular
from txtai.workflow import Task, Workflow

# Create tabular instance mapping input.csv fields
tabular = Tabular("url", ["title"])

# Create workflow
workflow = Workflow([Task(tabular)])

# Print 5 rows of input.csv via workflow
list(workflow(["input.csv"]))[:5]

[('https://doi.org/10.1016/j.cmpb.2021.106469; https://www.ncbi.nlm.nih.gov/pubmed/34715516/',
  'Computer simulation of the dynamics of a spatial susceptible-infected-recovered epidemic model with time delays in transmission and treatment.',
  None),
 ('https://www.ncbi.nlm.nih.gov/pubmed/34232002/; https://doi.org/10.36849/jdd.5544',
  'Understanding the Potential Role of Abrocitinib in the Time of SARS-CoV-2',
  None),
 ('https://doi.org/10.1186/1471-2458-8-42; https://www.ncbi.nlm.nih.gov/pubmed/18234083/',
  "Can the concept of Health Promoting Schools help to improve students' health knowledge and practices to combat the challenge of communicable diseases: Case study in Hong Kong?",
  None),
 ('https://www.ncbi.nlm.nih.gov/pubmed/32983582/; https://www.sciencedirect.com/science/article/pii/S2095809920302514?v=s5; https://api.elsevier.com/content/article/pii/S2095809920302514; https://doi.org/10.1016/j.eng.2020.07.018',
  'Buying time for an effective epidemic response: The impact

Next, we take the workflow output, build an Embeddings index and run a search query.

In [64]:
from txtai.embeddings import Embeddings

# Embeddings with sentence-transformers backend
embeddings = Embeddings({"method": "transformers", "path": "sentence-transformers/paraphrase-mpnet-base-v2"})

# Index subset of CORD-19 data
data = list(workflow(["input.csv"]))
embeddings.index(data)

for uid, _ in embeddings.search("insulin"):
  title = [text for url, text, _ in data if url == uid][0]
  print(title, uid)

Importance of diabetes management during the COVID-19 pandemic. https://doi.org/10.1080/00325481.2021.1978704; https://www.ncbi.nlm.nih.gov/pubmed/34602003/
Position Statement on How to Manage Patients with Diabetes and COVID-19 https://www.ncbi.nlm.nih.gov/pubmed/33442169/; https://doi.org/10.15605/jafes.035.01.03
Successful blood glucose management of a severe COVID-19 patient with diabetes: A case report https://www.ncbi.nlm.nih.gov/pubmed/32590779/; https://doi.org/10.1097/md.0000000000020844


The example searched for the term `insulin`. The top results mention diabetes and blood glucose which are a closely associated terms for diabetes.

# JSON Service Workflow

The next example builds a workflow that runs a query against a remote URL, retrieves the results, then transforms and indexes the tabular data. This example gets the top results from the [Hacker News front page](https://news.ycombinator.com/). 

Below shows how to build the ServiceTask and prints the first JSON result. Details on how to configure the ServiceTask can be found in [txtai's documentation](https://neuml.github.io/txtai/workflows/).

In [65]:
from txtai.workflow import ServiceTask

service = ServiceTask(url="https://hn.algolia.com/api/v1/search", method="get", params={"tags": None}, batch=False, extract="hits")
workflow = Workflow([service])

list(workflow(["front_page"]))[:1]

[{'_highlightResult': {'author': {'matchLevel': 'none',
    'matchedWords': [],
    'value': 'withzombies'},
   'title': {'matchLevel': 'none',
    'matchedWords': [],
    'value': 'An opinionated guide on how to reverse engineer software'},
   'url': {'matchLevel': 'none',
    'matchedWords': [],
    'value': 'https://margin.re/media/an-opinionated-guide-on-how-to-reverse-engineer-software-part-1.aspx'}},
  '_tags': ['story', 'author_withzombies', 'story_29084716', 'front_page'],
  'author': 'withzombies',
  'comment_text': None,
  'created_at': '2021-11-02T17:43:47.000Z',
  'created_at_i': 1635875027,
  'num_comments': 25,
  'objectID': '29084716',
  'parent_id': None,
  'points': 254,
  'story_id': None,
  'story_text': None,
  'story_title': None,
  'story_url': None,
  'title': 'An opinionated guide on how to reverse engineer software',
  'url': 'https://margin.re/media/an-opinionated-guide-on-how-to-reverse-engineer-software-part-1.aspx'}]

Next we'll map the JSON data using the tabular pipeline. `url` will be used as the id column and `title` as the text to index.

In [66]:
from txtai.workflow import Task

# Recreate service applying the tabular pipeline to each result
service = ServiceTask(action=tabular, url="https://hn.algolia.com/api/v1/search", method="get", params={"tags": None}, batch=False, extract="hits")
workflow = Workflow([service])

list(workflow(["front_page"]))[:1]

[('https://margin.re/media/an-opinionated-guide-on-how-to-reverse-engineer-software-part-1.aspx',
  'An opinionated guide on how to reverse engineer software',
  None)]

As we did previously, let's build an Embeddings index and run a search query.

In [67]:
# Embeddings with sentence-transformers backend
embeddings = Embeddings({"method": "transformers", "path": "sentence-transformers/paraphrase-mpnet-base-v2"})

# Index Hacker News front page
data = list(workflow(["front_page"]))
embeddings.index(data)

for uid, _ in embeddings.search("cloud"):
  title = [text for url, text, _ in data if url == uid][0]
  print(title, uid)

How Temporal Workload Shifting Can Reduce Carbon Emissions in the Cloud https://arxiv.org/abs/2110.13234
Hetzner Cloud Goes US (Ashburn/VA) https://twitter.com/Hetzner_Online/status/1455837994259210242
Hetzner cloud servers are now available in Ashburn USA https://www.hetzner.com/news/11-21-usa-cloud


# XML Service workflow

txtai's ServiceTask can consume both JSON and XML. This example runs a query against the [arXiv API](https://arxiv.org/), transforms the results and indexes them for search.

Below shows how to build the ServiceTask and prints the first XML result.

In [68]:
service = ServiceTask(url="http://export.arxiv.org/api/query", method="get", params={"search_query": None, "max_results": 25}, batch=False, extract=["feed", "entry"])
workflow = Workflow([service])

list(workflow(["all:aliens"]))[:1]

[OrderedDict([('id', 'http://arxiv.org/abs/2102.01522v3'),
              ('updated', '2021-09-06T14:18:23Z'),
              ('published', '2021-02-01T18:27:12Z'),
              ('title',
               'If Loud Aliens Explain Human Earliness, Quiet Aliens Are Also Rare'),
              ('summary',
               "If life on Earth had to achieve n 'hard steps' to reach humanity's level,\nthen the chance of this event rose as time to the n-th power. Integrating this\nover habitable star formation and planet lifetime distributions predicts >99%\nof advanced life appears after today, unless n<3 and max planet duration\n<50Gyr. That is, we seem early. We offer this explanation: a deadline is set by\n'loud' aliens who are born according to a hard steps power law, expand at a\ncommon rate, change their volumes' appearances, and prevent advanced life like\nus from appearing in their volumes. 'Quiet' aliens, in contrast, are much\nharder to see. We fit this three-parameter model of loud aliens 

Next we'll map the XML data using the tabular pipeline. `id` will be used as the id column and `title` as the text to index.

In [69]:
from txtai.workflow import Task

# Create tablular pipeline with new mapping
tabular = Tabular("id", ["title"])

# Recreate service applying the tabular pipeline to each result
service = ServiceTask(action=tabular, url="http://export.arxiv.org/api/query", method="get", params={"search_query": None, "max_results": 25}, batch=False, extract=["feed", "entry"])
workflow = Workflow([service])

list(workflow(["all:aliens"]))[:1]

[('http://arxiv.org/abs/2102.01522v3',
  'If Loud Aliens Explain Human Earliness, Quiet Aliens Are Also Rare',
  None)]

As we did previously, let's build an Embeddings index and run a search query.

In [70]:
# Embeddings with sentence-transformers backend
embeddings = Embeddings({"method": "transformers", "path": "sentence-transformers/paraphrase-mpnet-base-v2"})

# Index Hacker News front page
data = list(workflow(["all:aliens"]))
embeddings.index(data)

for uid, _ in embeddings.search("alien radio signals"):
  title = [text for url, text, _ in data if url == uid][0]
  print(title, uid)

Calculating the probability of detecting radio signals from alien
  civilizations http://arxiv.org/abs/0707.0011v2
Field Trial of Alien Wavelengths on GARR Optical Network http://arxiv.org/abs/1805.04278v1
Aliens on Earth. Are reports of close encounters correct? http://arxiv.org/abs/1203.6805v2


# Build a workflow with no code!

The next example shows how one of the same workflows above can be constructed via API configuration. This is a no-code way to build a txtai indexing workflow!

In [71]:
%%writefile workflow.yml
# Index settings
writable: true
embeddings:
    path: sentence-transformers/nli-mpnet-base-v2

# Tabular pipeline
tabular:
  idcolumn: url
  textcolumns: 
    - title

# Workflow definitions
workflow:
  index:
    tasks:
      - task: service
        action: tabular
        url: https://hn.algolia.com/api/v1/search
        method: get
        params:
          tags: null
        batch: false
        extract: hits
      - action: upsert

Overwriting workflow.yml


This workflow connects to the Hacker News API, gets the data for the front page and indexes it. The workflow configures the same actions that were configured in Python previously. 

Let's start an API instance 

In [72]:
!killall -9 uvicorn
!CONFIG=workflow.yml nohup uvicorn "txtai.api:app" &> api.log &
!sleep 30
!cat api.log

INFO:     Started server process [911]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


Next we'll execute the workflow. txtai has API bindings for [JavaScript](https://github.com/neuml/txtai.js), [Java](https://github.com/neuml/txtai.java), [Rust](https://github.com/neuml/txtai.rs) and [Golang](https://github.com/neuml/txtai.go). But to keep things simple, we'll just run the commands via cURL. 

In [73]:
# Execute workflow via API call
!curl -X POST "http://localhost:8000/workflow" -H  "accept: application/json" -H  "Content-Type: application/json" -d "{\"name\":\"index\",\"elements\":[\"front_page\"]}"

[["https://margin.re/media/an-opinionated-guide-on-how-to-reverse-engineer-software-part-1.aspx","An opinionated guide on how to reverse engineer software",null],["https://jichu4n.com/posts/how-x-window-managers-work-and-how-to-write-one-part-i/","How X Window Managers Work, and How to Write One (2014)",null],["https://mattferraro.dev/posts/geometric-algebra","What is the inverse of a vector?",null],["https://www.hetzner.com/news/11-21-usa-cloud","Hetzner cloud servers are now available in Ashburn USA",null],["https://zigmonthly.org/letters/2021/october/","Zig monthly, October 2021: Games, gamedev, Elixir, tools and more",null],["https://twitter.com/Hetzner_Online/status/1455837994259210242","Hetzner Cloud Goes US (Ashburn/VA)",null],["https://zig.news/sobeston/using-zig-and-translate-c-to-understand-weird-c-code-4f8","Using Zig's translate-C to understand weird C code",null],["https://real-italian-coffee.herokuapp.com/home","Show HN: A generator of Fake Italian Coffee names",null],["h

The data is now indexed. Note that the index configuration has an `upsert` action. Each workflow call will insert new rows or update existing rows. This call could be scheduled with a system cron to execute periodically and build an index of top stories on Hacker News over time. 

Now that the index is ready, let's run a search.

In [74]:
# Run a search
!curl -X GET "http://localhost:8000/search?query=cloud&limit=3" -H  "accept: application/json"

[{"id":"https://twitter.com/Hetzner_Online/status/1455837994259210242","score":0.475553035736084},{"id":"https://arxiv.org/abs/2110.13234","score":0.3434651494026184},{"id":"https://olivierlacan.com/posts/high-fidelity-remote-communication/","score":0.2550020217895508}]

# Add a translation step to workflow

Next we'll recreate the workflow, adding one additional step, translating the text into French before indexing. This workflow gets the top results from Hacker News, translates the results and builds an semantic index of titles in French. 

In [75]:
%%writefile workflow.yml
# Index settings
writable: true
embeddings:
    path: sentence-transformers/nli-mpnet-base-v2

# Tabular pipeline
tabular:
  idcolumn: url
  textcolumns: 
    - title

# Translation pipeline
translation:

# Workflow definitions
workflow:
  index:
    tasks:
      - task: service
        action: tabular
        url: https://hn.algolia.com/api/v1/search
        method: get
        params:
          tags: null
        batch: false
        extract: hits
      - action: translation
        args: [fr]
      - action: upsert

Overwriting workflow.yml


In [76]:
!killall -9 uvicorn
!CONFIG=workflow.yml nohup uvicorn "txtai.api:app" &> api.log &
!sleep 30
!cat api.log

INFO:     Started server process [935]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


Same as before, we'll run the index workflow and a search

In [77]:
# Execute workflow via API call
!curl -s -X POST "http://localhost:8000/workflow" -H  "accept: application/json" -H  "Content-Type: application/json" -d "{\"name\":\"index\",\"elements\":[\"front_page\"]}" > /dev/null

# Run a search
!curl -X GET "http://localhost:8000/search?query=nuage&limit=3" -H  "accept: application/json"

[{"id":"https://arxiv.org/abs/2110.13234","score":0.46083423495292664},{"id":"https://twitter.com/Hetzner_Online/status/1455837994259210242","score":0.37807518243789673},{"id":"https://jichu4n.com/posts/how-x-window-managers-work-and-how-to-write-one-part-i/","score":0.36641401052474976}]

# Wrapping up

This notework demonstrated how to transform, index and search tabular data from a variety of sources. txtai offers maximum flexibility in building composable workflows to maximize the number of ways data can be indexed for semantic search. 