# Examples

## Fetch KODI data

In this example, we fetch data from the Kenya Open Data Initiative API.

In [2]:
from riko.modules.fetchdata import pipe
stream = pipe(conf={'url': 'opendata.go.ke/resource/i5bp-z9aq.json'})
next(stream)

{'district_name': 'Turkana',
 'number_of_poor_2005_06_': '481442',
 'poverty_rate_2005_06_': '94.3'}

## Summing

Now we will sum the total number of poor in Kenya

In [38]:
from riko.collections.sync import SyncPipe

sum_conf = {'sum_key': 'number_of_poor_2005_06_'}
stream = SyncPipe('fetchdata', conf={'url': url}).sum(conf=sum_conf).output
"{:,}".format(next(stream))

'15,139,820'

## Sorting

In [10]:
sort_conf = {'rule': {'sort_key': 'poverty_rate_2005_06_'}}
stream = SyncPipe('fetchdata', conf={'url': url}).sort(conf=sort_conf).output
next(stream)

{'district_name': 'Kajiado',
 'number_of_poor_2005_06_': '46578',
 'poverty_rate_2005_06_': '11.6'}

## Filtering and truncating

Now, we will filter the data for districts with more than 70% poverty and list the top 5.

In [9]:
sort_conf = {'rule': {'sort_key': 'poverty_rate_2005_06_', 'sort_dir': 'desc'}}
filter_conf = {'rule': {'field': 'poverty_rate_2005_06_', 'op': 'greater', 'value': 70}}
stream = (
    SyncPipe('fetchdata', conf={'url': url})
        .filter(conf=filter_conf)
        .sort(conf=sort_conf)
        .truncate(conf={'count': '5'})
        .output)

list(stream)

[{'district_name': 'Turkana',
  'number_of_poor_2005_06_': '481442',
  'poverty_rate_2005_06_': '94.3'},
 {'district_name': 'Marsabit',
  'number_of_poor_2005_06_': '118786',
  'poverty_rate_2005_06_': '91.7'},
 {'district_name': 'Mandera',
  'number_of_poor_2005_06_': '225812',
  'poverty_rate_2005_06_': '87.8'},
 {'district_name': 'Wajir',
  'number_of_poor_2005_06_': '301422',
  'poverty_rate_2005_06_': '84.0'},
 {'district_name': 'Tana River',
  'number_of_poor_2005_06_': '191856',
  'poverty_rate_2005_06_': '76.9'}]

## Joining

Now, we will join Kenya Primary School data to the poverty data.

In [2]:
from riko.modules import csv, fetchdata, join

poverty_url = 'opendata.go.ke/resource/i5bp-z9aq.json'
schools_url = 'opendata.go.ke/resource/ud2q-hvhq.json'
schools_url = 'file://schools.csv'
# next(csv.pipe(conf={'url': schools_url}))
next(fetchdata.pipe(conf={'url': poverty_url}))
# poverty_stream = fetchdata.pipe(conf={'url': poverty_url})
# schools_stream = fetchdata.pipe(conf={'url': schools_url})

# join_conf = {'join_key': 'x', 'other_join_key': 'y'}
# joined = pipe(items, conf=conf, other=other)
# joined_stream = join.pipe

{'district_name': 'Turkana',
 'number_of_poor_2005_06_': '481442',
 'poverty_rate_2005_06_': '94.3'}

## Parallel processing

An example using `riko`'s parallel API to spawn a `ThreadPool`. You can instead enable a `ProcessPool` by additionally passing `threads=False` to `SyncPipe`, i.e., `SyncPipe('fetch', conf={'url': url}, parallel=True, threads=False)`.

In [12]:
from riko.collections.sync import SyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create a parallel SyncPipe flow ###
#
# The following flow will:
#   1. fetch the hackernews RSS feed 
#   2. filter for items with '.com' in the article link
#   3. fetch the first comment from all items in parallel (using 4 workers)
#   4. flatten the result into one raw stream
#   5. extract the first item's content
#
# Note: no point in sorting after the filter since parallel fetching doesn't guarantee 
# order
flow = (
    SyncPipe('fetch', conf=fetch_conf, parallel=True, workers=4)  # 1
        .filter(conf={'rule': filter_rule})                       # 2
        .xpathfetchpage(conf=xpath_conf))                         # 3

stream = flow.output                                              # 4
next(stream)['content']                                           # 5

'He uses the following example for when to throw your own errors:'

## Asynchronous processing

To enable asynchronous processing, you must install the `async` module.

`pip install riko[async]`

An example using `riko`'s asynchronous API.

In [2]:
from riko.bado import coroutine, react
from riko.collections.async import AsyncPipe

### Set the pipe configurations ###
fetch_conf = {'url': 'https://news.ycombinator.com/rss'}
filter_rule = {'field': 'link', 'op': 'contains', 'value': '.com'}
xpath = '/html/body/center/table/tr[3]/td/table[2]/tr[1]/td/table/tr/td[3]/span/span'
xpath_conf = {'url': {'subkey': 'comments'}, 'xpath': xpath}

### Create an AsyncPipe flow ###
#
# The following flow will:
#   1. fetch the hackernews RSS feed
#   2. filter for items with '.com' in the article link
#   3. asynchronously fetch the first comment from each item (using 4 connections)
#   4. flatten the result into one raw stream
#   5. extract the first item's content
#
# Note: no point in sorting after the filter since async fetching doesn't guarantee 
# order
@coroutine
def run(reactor):
    stream = yield (
        AsyncPipe('fetch', conf=fetch_conf, connections=4)  # 1
            .filter(conf={'rule': filter_rule})             # 2
            .xpathfetchpage(conf=xpath_conf)                # 3
            .output)                                        # 4
    
    print(next(stream)['content'])                          # 5
    
try:
    react(run)
except SystemExit:
    pass

Here's how iteration works ():
