In [2]:
from chunkyp import pipe, ppipe, p
import copy
import ray

# chunkyp

Below is a small example of what chunkyp does. Note:
* chunkyp processes **records** (lists of, or iterators across, dictionaries)
* records do not have to follow a consistent schema (see the third dictionary in the example below) 
* chunkyp can run as a single process with **pipe**; it can also be parallelized across cores, and even distributed across machines, with **ppipe**
* pipes and ppipes take chains of **p** functions, which are described in a section below

In [3]:
# Sample input: three headline records. The last one carries an extra 'date'
# key, so its schema differs from the first two.
records = [
    {
        'headline': 'Web ads for junk food could be banned in the UK',
        'source': 'Guardian',
        'nwords': 11,
    },
    {
        'headline': 'The Olympics will be delayed',
        'source': 'Guardian',
        'nwords': 5,
    },
    {
        'date': '2020-07-28',
        'headline': 'Usability of Footnotes',
        'source': 'https://news.ycombinator.com/item?id=23964200',
        'nwords': 10,
    },
]

In [4]:
res = pipe(
    copy.deepcopy(records), # we pass a copy of records because we will reuse it later
    p('headline', lambda x: x.lower()), # no output field is given, so the lowercased text overwrites 'headline'
    p('headline', lambda x: x.upper(), 'uppercased_headline'), # the new field is named for what it holds: the uppercased headline
    p(['headline', 'nwords'], lambda x,y: len(x.split()) == y, 'nwords_correct'), # compare the actual word count with the stated 'nwords'
)

# materialize the piped records into a list so the cell can display them
res = list(res)
res

[{'headline': 'web ads for junk food could be banned in the uk',
  'source': 'Guardian',
  'nwords': 11,
  'uppercased_headline': 'WEB ADS FOR JUNK FOOD COULD BE BANNED IN THE UK',
  'nwords_correct': True},
 {'headline': 'the olympics will be delayed',
  'source': 'Guardian',
  'nwords': 5,
  'uppercased_headline': 'THE OLYMPICS WILL BE DELAYED',
  'nwords_correct': True},
 {'date': '2020-07-28',
  'headline': 'usability of footnotes',
  'source': 'https://news.ycombinator.com/item?id=23964200',
  'nwords': 10,
  'uppercased_headline': 'USABILITY OF FOOTNOTES',
  'nwords_correct': False}]

## p functions
These are functions which wrap existing functions and point them to:
* the input field, or list of fields, in a record to process
* the output field, or list of fields, in a record to write, or overwrite, the results to

Here's an example:

In [5]:
def remove_words(text: str, words_to_remove: set) -> str:
    """Drop every whitespace-separated word of `text` that is in `words_to_remove`.

    The comparison is exact (case-sensitive). The surviving words are
    re-joined with single spaces, so the original spacing is not kept.
    """
    kept = (token for token in text.split() if token not in words_to_remove)
    return ' '.join(kept)

res = pipe(
    copy.deepcopy(records),
    # No output field is provided, so the result overwrites the 'headline' field.
    p('headline', lambda text: text.lower()),
    # Take the headline field, uppercase it, and store it in the new 'clean_headline' field.
    p('headline', lambda text: text.upper(), 'clean_headline'),
    # Two inputs, one output: the result goes to a new field called 'nwords_correct'.
    p(['headline', 'nwords'], lambda text, count: len(text.split()) == count, 'nwords_correct'),
    # Two inputs, two outputs: the returned tuple is written to 'nchars' and 'chars_to_words'.
    p(
        ['headline', 'nwords'],
        lambda text, count: (len(text), len(text)/int(count) if int(count) > 0 else 0),
        ['nchars', 'chars_to_words'],
    ),
    # Additional arguments for the wrapped function are passed as kwargs to the p function.
    p('clean_headline', remove_words, words_to_remove={'THE', 'WILL', 'FOR', 'OF'}),
)

# Materialize the piped records into a list so the cell can display them.
res = list(res)
res

[{'headline': 'web ads for junk food could be banned in the uk',
  'source': 'Guardian',
  'nwords': 11,
  'clean_headline': 'WEB ADS JUNK FOOD COULD BE BANNED IN UK',
  'nwords_correct': True,
  'nchars': 47,
  'chars_to_words': 4.2727272727272725},
 {'headline': 'the olympics will be delayed',
  'source': 'Guardian',
  'nwords': 5,
  'clean_headline': 'OLYMPICS BE DELAYED',
  'nwords_correct': True,
  'nchars': 28,
  'chars_to_words': 5.6},
 {'date': '2020-07-28',
  'headline': 'usability of footnotes',
  'source': 'https://news.ycombinator.com/item?id=23964200',
  'nwords': 10,
  'clean_headline': 'USABILITY FOOTNOTES',
  'nwords_correct': False,
  'nchars': 22,
  'chars_to_words': 2.2}]

## pipes and ppipes

* **pipes** pass records through p functions on a single core
* **ppipes** pass records through p functions on multiple cores using [**ray**](https://github.com/ray-project/ray). In essence, ppipes just launch multiple pipes. 

We focus on **ppipes** below since we've already seen two examples of **pipe**.

Note:
* ppipe syntax is the same as pipe apart from two additional keyword arguments:
    * chunksize: the number of records to read into memory and then send off to the parallel pipes - if chunksize > len(records), it defaults to len(records)
    * n_pipes: how many parallel pipes to launch (Default: #logical_cores - 1)
* ray needs to be initialized before any ppipe is unwound
* p functions can handle ray object store references passed to them (see next example)

In [10]:
# Stop any ray instance that an earlier cell may have left running.
ray.shutdown()

# The ppipe is built here, before ray is started; ray only has to be
# initialized by the time the ppipe is unwound with list() below.
res = ppipe(
    copy.deepcopy(records),  # work on a copy so `records` can be reused later
    p('headline', lambda text: text.lower()),
    p('headline', lambda text: text.upper(), 'clean_headline'),
    p(['headline', 'nwords'], lambda text, count: len(text.split()) == count, 'nwords_correct'),
    chunksize=20,
)

ray.init()

# Unwind the ppipe into a list, then stop ray again before showing the result.
res = list(res)
ray.shutdown()
res

2020-08-07 02:28:18,934	INFO resource_spec.py:212 -- Starting Ray with 28.12 GiB memory available for workers and up to 14.07 GiB for objects. You can adjust these settings with ray.init(memory=<bytes>, object_store_memory=<bytes>).
2020-08-07 02:28:19,297	INFO services.py:1165 -- View the Ray dashboard at [1m[32mlocalhost:8265[39m[22m
INFO:root:setting batch_size to 3


[{'headline': 'web ads for junk food could be banned in the uk',
  'source': 'Guardian',
  'nwords': 11,
  'clean_headline': 'WEB ADS FOR JUNK FOOD COULD BE BANNED IN THE UK',
  'nwords_correct': True},
 {'headline': 'the olympics will be delayed',
  'source': 'Guardian',
  'nwords': 5,
  'clean_headline': 'THE OLYMPICS WILL BE DELAYED',
  'nwords_correct': True},
 {'date': '2020-07-28',
  'headline': 'usability of footnotes',
  'source': 'https://news.ycombinator.com/item?id=23964200',
  'nwords': 10,
  'clean_headline': 'USABILITY OF FOOTNOTES',
  'nwords_correct': False}]

ppipe with ray object store:

In [None]:
# Restart ray so that the object store used below belongs to a fresh instance.
ray.shutdown()
ray.init()

words_to_remove={'THE', 'WILL', 'FOR', 'OF'}

# Place the word set in the ray object store; w2r is the reference to it.
w2r = ray.put(words_to_remove)

res = ppipe(
    copy.deepcopy(records), # we pass a copy of records because we will reuse it later
    p('headline', lambda x: x.lower()),
    p('headline', lambda x: x.upper(), 'clean_headline'),
    p(['headline', 'nwords'], lambda x,y: len(x.split()) == y, 'nwords_correct'),
    p('clean_headline', remove_words, words_to_remove=w2r), # the object store reference is passed in place of the set itself
    chunksize=40 # same keyword as the documented ppipe argument and the previous ppipe example
)

res = list(res)
ray.shutdown() # stop ray once the results are materialized, as the previous ppipe example does
res