### Streaming with the `yield` statement

This tutorial was extracted from the book Elegant Scipy.

We're so used to the input-output model of computation that **streaming** is even hard to fathom the first time it's introduced. The idea of streaming is to **process some of the input data, return the processed chunk, then, while downstream functions are dealing with that chunk, the function receives a bit more, and so on...**. Hence the name of the stream, instead of loading a full batch of data, you take a *stream* of the data flow and process sequentially, thereby bypassing loading all of the dataset into memory in its multiple transformations. 

It turns out that python comes with a native function called `yield` that allows us to do the aforementioned streaming operations. `yield` enables a function to process just one "sip" of the data, pass the result to the next process and *let the chain of processing complete* for that one opiece of data before moving on to the next one. In this sense the function *yields* control to the next function, waiting to resume processing the data until all of the downstram steps have processed that data point. 

Here's one way to go about streaming : for every processing function that would normally take a list (or for that matter, any chunk of data) and transform that list, you can rewrite that function as taking a *stream* and *yielding* the result of every element of that stream. 

In [1]:
%load_ext blackcellmagic
import toolz as tz
from toolz import curried as c 
from glob import glob 
import itertools as it 
import numpy as np 
import holoviews as hv
from collections import Counter
import itertools

hv.extension('bokeh')

In [2]:
def log_all_standard(input): 
    output = []
    
    for elem in input: 
        output.append(np.log(elem))
    return output

def log_all_streaming(input_stream): 
    for elem in input_stream: 
        yield np.log(elem)

Let's check that we get the same result. 

In [3]:
np.random.seed(seed = 7)

arr = np.random.rand(1000) + 0.5

result_batch = sum(log_all_standard(arr))
print('Batch result: ', result_batch )

# This version will perform a running sum 
result_stream = sum(log_all_streaming(arr))
print('Streaming result: ', result_stream)

Batch result:  -48.2409194560661
Streaming result:  -48.2409194560661


In [4]:
data_path = '../../elegant_scipy/notebooks/data/'

The advantage of the streaming approach is that elements of a stream aren't processed until needed, whether it's for computing a running sum, or for writing out to disk, or something else. This can conserve a lot of memory when you have many input items, or when each item is very big, or both ! This quote drives the point home.

> In my brief experience people rarely take the streaming route. They use single-threaded in-memory Python until it breaks, and then seek out Big Data infrastructure like Spark at relatively high productivity overhead. 



This is indeed very true. However, in the medium size data regime (too big for your laptop's RAM memory, but still smaller than say than what your disk could handle.) In some cases, it can get you there even faster than the supercomputing approach, by eliminating the overhead of multi-core communication and random access to dbs.[See this post where Frank McSherry](http://www.frankmcsherry.org/graph/scalability/cost/2015/02/04/COST2.html) processes a 128 billion graph on his laptop *faster* than using a graph db on a supercomputer(!).

To clarify the flow of control when using streaming-style functions, it's useful to make verbose versions of the functions, which print out a message with each operation. 

In [215]:
def csv_line_to_array(line): 
    lst = [float(elem) for elem in line.rstrip().split(',')]
    
    return np.array(lst)

def tsv_line_to_array(line): 
    lst = [float(elem) for elem in line.rstrip().split('\t')]
    
    return np.array(lst)

def read_csv(filename): 
    print('starting read tsv')
    with open(filename) as fn: 
        for ix, line in enumerate(fn): 
            print(f'reading line {ix}')
            
            yield csv_line_to_array(line)
            print('finished readtsv')
            
            
def read_tsv(filename): 
    print('starting read tsv')
    with open(filename) as fn: 
        for ix, line in enumerate(fn): 
            print(f'reading line {ix}')
            
            yield tsv_line_to_array(line)
            print('finished readtsv')
            

def add1(arrays_iter): 
    print('starting adding 1')
    
    for ix, arr in enumerate(arrays_iter): 
        print(f'adding 1 to line {ix}')
        
        yield arr + 1

    print('finished adding 1')
    
def log(arrays_iter): 
    print('starting log')
    for ix, arr in enumerate(arrays_iter): 
        print(f'taking log of array {ix}')

        yield np.log(arr)

    print('finished log')

def running_mean(arrays_iter): 
    print('starting running mean ')

    for ix, arr in enumerate(arrays_iter): 
        if ix == 0: 
            mean = arr

        mean += (arr - mean) / (ix + 1)

        print(f'adding line {ix} to the running mean')

    print('returning mean')

    return mean 

In [216]:
fn  = data_path + 'expr.tsv'

print('Creating lines iterator')
lines = read_tsv(fn)

print('Creating loglines iterator')
loglines = log(add1(lines))

print('computing mean')
mean = running_mean(loglines)
print(f' the mean log-row is : {mean}')

Creating lines iterator
Creating loglines iterator
computing mean
starting running mean 
starting log
starting adding 1
starting read tsv
reading line 0
adding 1 to line 0
taking log of array 0
adding line 0 to the running mean
finished readtsv
reading line 1
adding 1 to line 1
taking log of array 1
adding line 1 to the running mean
finished readtsv
reading line 2
adding 1 to line 2
taking log of array 2
adding line 2 to the running mean
finished readtsv
reading line 3
adding 1 to line 3
taking log of array 3
adding line 3 to the running mean
finished readtsv
reading line 4
adding 1 to line 4
taking log of array 4
adding line 4 to the running mean
finished readtsv
finished adding 1
finished log
returning mean
 the mean log-row is : [3.11797294 2.48682887 2.19580049 2.36001866 2.70124539 2.64721531
 2.43704834 3.28539133 2.05363724 2.37151577 3.85450782 3.9488385
 2.46680157 2.36334423 3.18381635 2.64438124 2.62966516 2.84790568
 2.61691451 4.12513405]


* Notice that the file is read line by line. 

* Note that the computation is run when creating the lines and loglines iterators. This is because the iterators are *lazy*, meaning they are not evaluated (or consumed) until a result is needed. 

* When the computation is finally triggered, by the call to the `running_mean`it jumps back and forth between all of the functions, as various computations are performed on each line, before moving on to the next line. 

### Introducing the toolz streaming library. 

In this chapter's code example, contributed by Matt Rocklin, we create a Markov model from an entire fly genome in under 5 minutes on a laptop with a few lines of code. Matt's example uses a human genome, but apparently our laptos weren't quite so fast, so we're going to use a fly genome instead. Over the course of the chapter we'll actually augment it a little bit to start from compressed data. 

In [9]:
LDICT = dict(zip("ACGTacgt", range(8)))

#  Initialize pairs dictionary using itertools'
# cartesian product
PDICT = {(a, b): (LDICT[a], LDICT[b]) for a, b in it.product(LDICT, LDICT)}

In [10]:
def is_sequence(line): 
    return not line.startswith('>')

def is_nucleotide(letter): 
    return letter in LDICT # Ignore 'N's

@tz.curry
def increment_model(model, index): 
    model[index] += 1
    
def genome(file_pattern): 
    """
    Stream a genome, letter by letter, from a list of FASTA filenames.

    Note: 
    If there is only one dataset, then the first four operations could be changed.
    """ 

    return tz.pipe(
        file_pattern, 
        glob, 
        sorted, 
        c.map(open), 
        tz.concat,
        c.filter(is_sequence), 
        tz.concat, 
        c.filter(is_nucleotide)
    )


def markov(seq): 
    "Get a 1st order markov model from a sequence of nucleotides."

    model = np.zeros((8,8))

    tz.last(
        tz.pipe(
            seq, 
            c.sliding_window(2), # Get succesive tuple
            c.map(PDICT.__getitem__), # Location in matrix of tuple
            c.map(increment_model(model)) # Increment matrix )
        )
    )
    
    # Convert counts to transition matrix 
    model /= np.sum(model, axis =1)[: , np.newaxis]

    return model 

With our functions in place, we use `c.take()` on the first 10 million bases, 
the take step can just be removed if you have 5-10 minutes. 


In [11]:
%%time

dm = data_path + 'dm6.fa'
model = tz.pipe(dm, genome, c.take(10**7), markov)

2.24 ms ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)




Notice there are three levels of abstraction for the streaming procedure.  

1. Streaming reading the genome. 

2. Streaming calculating the markov model. 

3. Bringing it all together in a final pipe : 
`filename -> genome -> markov = transition matrix`

In [34]:
hv.Image(model).opts(xaxis= None, yaxis = None, title = 'Fly genome transition matrix')

There is a lot going on in the example so, we're going to unpack it bit by bit. The first thing to note is the many functions from the [`toolz`](https://toolz.readthedocs.io/en/latest/) library. For example, from Toolz we've used `pipe`, `sliding_window`, `frequencies` and a curried version of `map` (more on this later). That is because tools is written specifically to take advantage of Python's iterators and easily manipulate streams. 

Let's start with pipe. This function is simply synctactic sugar to make nested functions calls easier to read. This is important because that pattern becomes increasinglt common when dealing with iterators. 

As a simple example, let's rewrite our running mean using `pipe`. 


In [17]:
filename = data_path + 'expr.tsv'

mean = tz.pipe(
    filename, 
    read_tsv,
    add1, 
    log, 
    running_mean
)

# This is equivalent to nesting functions like : 
# running_mean(log(add1(readtsv(filename))))

starting running mean 
starting log
starting adding 1
starting read tsv
reading line 0
adding 1 to line 0
taking log of array 0
adding line 0 to the running mean
finished readtsv
reading line 1
adding 1 to line 1
taking log of array 1
adding line 1 to the running mean
finished readtsv
reading line 2
adding 1 to line 2
taking log of array 2
adding line 2 to the running mean
finished readtsv
reading line 3
adding 1 to line 3
taking log of array 3
adding line 3 to the running mean
finished readtsv
reading line 4
adding 1 to line 4
taking log of array 4
adding line 4 to the running mean
finished readtsv
finished adding 1
finished log
returning mean


In [18]:
mean

array([3.11797294, 2.48682887, 2.19580049, 2.36001866, 2.70124539,
       2.64721531, 2.43704834, 3.28539133, 2.05363724, 2.37151577,
       3.85450782, 3.9488385 , 2.46680157, 2.36334423, 3.18381635,
       2.64438124, 2.62966516, 2.84790568, 2.61691451, 4.12513405])

What was originally mutiple lines, or an unwieldy mess of parentheses, 
is now a clean description of the sequential transforms of the input data. 
Much easier to understand ! 

This strategy also has an advantage over the original NumPy implementation: if we scale our data to millions or billions of rows, our computer might struggle to hold all the data in memory. In contrast, here we are only loading lines from disk one at a time, and mantaining only a single line's worth of data. 

### k-mer counting and error correction

Let's continue our conversation by building functions for extracting and counting kmers. Let's see how these functions would be written using pure python. 

In [24]:
Counter(['dog', 'dog', 'cat', 'petit', 'petit'])

Counter({'dog': 2, 'cat': 1, 'petit': 2})

In [35]:
def is_sequence(line): 
    """
    Helper function to work with FASTA files. 
    Returns True if the line in a FASTA file is a sequence
    and not a header or empty line. 
    """
    line = line.rstrip() # Remove '\n' at the end of line. 
    return len(line) > 0 and not line.startswith('>')

def reads_to_kmers(reads_iter, k = 7): 
    """
    Builds a generator for kmers given an iterable of sequences. 
    """
    
    for read in reads_iter: 
        for start in range(0, len(read)- k): 
            yield read[start: start + k ] 

def kmer_counter(kmer_iter): 
    counts = Counter(kmer_iter)
    return counts

def kmer_counter_native_py(kmer_iter): 
    counts = {}
    for kmer in kmer_iter: 
        if kmer not in counts: 
            counts[kmer] = 0
        else: 
            counts[kmer]+= 1
            
    return counts

In [36]:
%%time
with open(data_path + 'sample.fasta') as fn :
    reads = filter(is_sequence, fn)
    kmers = reads_to_kmers(reads)
    counts = kmer_counter(kmers)

CPU times: user 185 ms, sys: 3.49 ms, total: 188 ms
Wall time: 191 ms


In [37]:
%%time
with open(data_path + 'sample.fasta') as fn :
    reads = filter(is_sequence, fn)
    kmers = reads_to_kmers(reads)
    counts = kmer_counter_native_py(kmers)

CPU times: user 250 ms, sys: 4.33 ms, total: 254 ms
Wall time: 252 ms


We can see that the `Counter` function actually speeds up the procedure a bit so let's
stick with that. 

This shows that our for kmer counting work and is streaming. So reads are loaded from disk
one at a time and piped through the k-er converter and into the counter. 
We can now plot a histogram of the counts, and confirm that there 
are two well-separated populations of correct and erroneous k-mers: 


In [77]:
def integer_histogram(counts, normed = True, 
                     xlim = [], ylim = [], *args, **kwargs): 
    
    hist = np.bincount(counts)
    
    if normed : 
        hist = hist / np.sum(hist)
        
    histogram = hv.Curve((np.arange(hist.size), hist)).opts(
        padding = 0.1, 
        xlabel = 'counts', 
        ylabel = 'frequency', 
        #logx = True, 
        #logy = True
    )
    
    return histogram

In [42]:
counts_arr = np.fromiter(counts.values(), dtype=int, count=len(counts))

In [43]:
integer_histogram(counts_arr)



Notice the nice distribution of k-mer frequencies, along with a big bump of kmers at the left of the plot that appear only once. Such low frequency k-mers are likely to be errors. 

Let's quickly filter the kmers that have less than two counts. 

In [None]:
filtered_counts = dict()

for val in counts.keys(): 
    if counts[val] > 2: 
        filtered_counts[val] = int(counts[val])
    else: 
        pass

Let's see how many counts we're left with. 

In [79]:
len(filtered_counts)

16385

In [80]:
# And the original ones. 
len(counts)

16466

We can see that quite astoundingly (or not), after filtering, we get close to $4^7$ = 16,348 possible sequences. This confirms that in fact the kmers that had 2 counts or less were probably coming from unknown nucleotides, most likely sequencing errors. Let's plot the filtered distribution. 

In [82]:
counts_arr_afilt = np.fromiter(filtered_counts.values(), dtype=int)

integer_histogram(counts_arr_afilt)

But with that code above, we're actually doing a bit too much work. A lot of the functionality we wrote in for loops and yields is actually stream manipulation: transforming a stream of data into a different kind of data, and accumulating it at the end. Toolz has a lot of stream manipulation primitives. 

For example, the sliding window function is exactly what we need to make the k-mers. 

In [83]:
print(tz.sliding_window.__doc__)

 A sequence of overlapping subsequences

    >>> list(sliding_window(2, [1, 2, 3, 4]))
    [(1, 2), (2, 3), (3, 4)]

    This function creates a sliding window suitable for transformations like
    sliding means / smoothing

    >>> mean = lambda seq: float(sum(seq)) / len(seq)
    >>> list(map(mean, sliding_window(2, [1, 2, 3, 4])))
    [1.5, 2.5, 3.5]
    


Additionally, the frequencies function counts the appearance of individual items 
in a data stream. Together with pipe, we can count k-mers in a single function call:

In [89]:
k = 4

counts = tz.pipe(
    data_path + 'sample.fasta',
    open, 
    c.filter(is_sequence), 
    c.map(str.rstrip), 
    c.map(c.sliding_window(k)), 
    tz.concat, 
    c.map(''.join), 
    tz.frequencies
)

In [92]:
len(counts)

303

But, what are all those `c.function` call from `toolz.curried`? 

### Currying: the spice of streaming 

Earlier, we briefly used a *curried* version of the `map` function, which applies a given 
function element-wise. Now that we've mixed a few more curried calls in there, it's time to share what it means ! Currying is not named after the spice blend (though it does spice up code !). It is named for Haskell Curry, the mathemetaician who invented the concept. Haskell Curry is also the namesake of the Haskell programming language -- in which *all* functions are curried ! 

"Currying" means *partially* evaluating a function and returning another, "smaller" function.Normally in python you give a function all of its required arguments and expect an output. In contrast, a curried function can just take *some* of the arguments. **If the curried function doesn't get enough args it returns a new function that takes the leftover arguments. Once that second function is called with the remaining args, it can perform the original task.** Another word for currying is partial evaluation. In functional programming, currying is a way to produce a function that can wait for the rest of the arguments to show up lateraterateraterater.

So while the function call `map(np.log, numbers_list)` applies the `np.log` function element-wise and returns a sequence of logged numbers, the call `toolz.curried.map(np.log)` *returns a function* that takes in a sequence of numbers that then returns a sequence of logged numbers. 

It turns out that having a function that already knows about some of the arguments is perfect for streaming ! We've seen a hint of how powerful currying and pipes can be together in the above snippet. 

But currying can be a bit of a mind-bend when you first start, so we'll try it with some simple examples to demonstrate hwo it works. Let's start by writing a simple, non-curried function : 

In [93]:
def add(a,b): 
    return a + b 

add(2,5)

7

Now, let's write a similar function which we curry manually. 


In [98]:
def add_curried(a, b = None):
    if b is None: 
        # Second arg not given, so return a function 
        def add_partial(b): 
            return add(a,b)
        return add_partial
    else: 
        return add(a, b)

In [105]:
# add_5 is now our new function 
add_5 = add_curried(5)

In [107]:
add_5(2)

7

Despite that the above code works, it's a bit hard to read. Future you will probably have trouble remembering how we wrote that code. Luckily, Toolz has a decorator that automatically *curries* a function. 

In [109]:
@tz.curry
def add_curried(x,y):
    return x+y

add_partial  = add_curried(2)
add_partial(5)

7

In brief, `add_curried` is now a curried function, so it can take one of the arguments and return another function, `add_partial` which "remembers" the first argument. 

In fact, all of the `toolz` functions are also available as curried functions in the `toolz.curried` namespace. Toolz also includes curried version of some handy higher order python functions like `map`, `filter` and `reduce`. We will import the `curried` namespace as `c` so our code doesn't get too cluttered. So for our example the curried version of `map` will be `c.map`. Note, that the curried functions (e.g. `c.map`) are different from the `@curry` decorator which is used to create a curried function. 



As a reminder, `map` is a built-in python function : 

In [111]:
print(map.__doc__)

map(func, *iterables) --> map object

Make an iterator that computes the function using arguments from
each of the iterables.  Stops when the shortest iterable is exhausted.


A curried version of `map` is particularly handy whem working in a Toolz pipe. You can just pass a function to `c.map` and then stream in the iterator later using `tz.pipe`. Take another look at our function for reading in the genome to see how this works in practice. 

In [112]:
def genome(file_pattern): 
    """
    Stream a genome, letter by letter, from a list of FASTA filenames. 
    
    Params 
    ------
    file_pattern (str)
        A glob-able file pattern. 
        
    Returns 
    -------
    genome_ (tz.pipe)
        A string representing a genome sequence. This is designed to be a 
        module of a larger pipeline. 
    """
    
    genome_ = tz.pipe(
        file_pattern, 
        glob,
        sorted,
        tz.concat, # concat lines from all files
        c.filter(is_sequence), 
        tz.concat, #concat characters from all lines
        c.filter(is_nucleotide) # discard newlines and 'N's
    )

> #### Tips for working with streams 
> * Convert "list of lists" to "long list" with `tz.concat`
> * Don't get caught out: 
    > * Iterators get consumed. So if you make a generator object and some processing on it, and then a later step fails, you need to re-create the generator. The original is already gone. 
    >* Iterators are lazy. You need to force evaluation sometimes. 
>* When you have lots of funcitions in a pipe, it's sometimes hard to figure out where things go wrong. Take a small stream and add functions to your pipe one by one. You can also insert `c.map(c.do(print))` at any point in a stream to print each element while it streams through. 

In [114]:
c.do?

#### Exercise 

The scikit-learn ilbrary has an IncrementalPCA class, which allows you to run PCA on a dataset withour loading the whole data into memory. But you need to chunk your data yourself, which makes the code a bit awkward to use. Make a function that takes a stream of data samples and perform PCA. Then use the function to compute the PCA of the `iris` dataset which is at `data/iris.csv`. Optionally, you can color the points using the species number found in `data/iris-target.csv`.

*Hint:* Look at the `toolz.curried.partition` function to create a stream of batcges from a stream of data points. 

In [115]:
from sklearn.decomposition import IncrementalPCA as iPCA

All right, for our function we will need a trainer and a transformer. Let's write down the logic steps of our code 

#### Trainer 

* Initialize a stream using the iris dataset line by line from the `iris.csv` file. 
* Use `c.partition` to make a minibatch of data. Our function will take a `batch_size` argument to control how many lines of the dataset will be processed. 
* Train the iPCA model using this minibatch of data. 
* Return the trained iPCA model. 

#### Transformer. 

* Initialize a stream using the iris dataset line by line from the `iris.csv` file. 
* Use `c.partition` to make a minibatch of data.
* Transform that minibatch of data. 
* Concatenate the minibatches to get the full dataset. 

Let's see what the `c.partition` function does. 

In [119]:
print(c.partition.__doc__)

 Partition sequence into tuples of length n

    >>> list(partition(2, [1, 2, 3, 4]))
    [(1, 2), (3, 4)]

    If the length of ``seq`` is not evenly divisible by ``n``, the final tuple
    is dropped if ``pad`` is not specified, or filled to length ``n`` by pad:

    >>> list(partition(2, [1, 2, 3, 4, 5]))
    [(1, 2), (3, 4)]

    >>> list(partition(2, [1, 2, 3, 4, 5], pad=None))
    [(1, 2), (3, 4), (5, None)]

    See Also:
        partition_all
    


Super cool ! Given an input list and a length $n$, it returns a list of tuples of size $n$. It can also pad the sequences. This is just what we need. We want to take in our stream of arrays as a list, and then partition it into tuples. This tuples will represent our minibatches. The last thing we want to do is to make a general function that allow us to make a generator object from a .txt file. We can modify the functions we had from the beginning of the tutorial. 

In [244]:
def line_to_array(line, delim = ','): 
    """
    Returns a numpy array from a line of a text file separated by a delimiter. 
    
    Params 
    ------
    line (str)
        Line from a text file generated from an iterator using 
        the built-in open() function.
        
    delim (str, default = ',')
        Delimiter of the input txt file. Examples are tabs '\t' and commas ','. 
        
        
    Returns
    -------
    arr (array-like)
        Numpy array version of the input line.
    """
    arr = np.array([float(elem) for elem in line.rstrip().split(delim)])
    
    return np.array(arr)

def stream_file(filename, delim = ','): 
    
    """
    
    Params 
    ------
    filename (str)
        Path to input file. 
    
    delim (str, default = ',')
        Delimiter of the input txt file. Examples are tabs '\t' and commas ','. 
        
    Returns 
    -------
    
    File generator. 
    """
    print('Starting reading file ... \n')
    with open(filename) as fn: 
        for ix, line in enumerate(fn): 
            
            yield line_to_array(line, delim = delim)
        
        print('Finished reading file...')

In [252]:
def streaming_ipca_train(fname, n_components = 2, batch_size= 128): 
    """
    Initializes and trains an IncrementalPCA object using streaming. 
    This is a wrapper to stream and train using the toolz library. 
    
    Params
    ------
    fname
    
    n_components(int, default = 2)
        Number of components in the PCA object. 
        
    batch_size (int, default = 128)
        Number of lines to stream per batch. 
    
    Returns 
    -------
    ipca_ (sklearn.decomposition.IncrementalPCA)
        Trained PCA object. 
    """
    
    ipca_ = iPCA(n_components, batch_size = batch_size)
    
    tz.pipe(
        fname, 
        stream_file, # Generator of 1d arrays
        c.partition(batch_size),# Iterator of tuples of 1-d arrays
        c.map(np.array), # iterator of 2d arrays
        c.map(ipca_.partial_fit),
        tz.last
    )
    
    print('Finished training !' )
    return ipca_

In [253]:
%%time
pca_obj = streaming_ipca_train(data_path + 'iris.csv')

Starting reading file ... 

Finished reading file...
Finished training !
CPU times: user 7.07 ms, sys: 2.93 ms, total: 9.99 ms
Wall time: 4.96 ms


Wow- that was fast ! The function takes only 5ms to train a PCA object on the Iris dataset..  

In [254]:
def streaming_ipca_transform(fname, ipca_object): 
    """
    """
    transformed = tz.pipe(
        fname, 
        stream_file, # Generator of 1d arrays
        c.partition(ipca_object.batch_size), # Iterator of tuples of 1d arrays 
        c.map(np.array), # Iterator of 2D arrays
        c.map(ipca_object.transform), # Iterator of (batch_size, n_components) arrays
        np.vstack 
    )
    
    return transformed

In [257]:
%%time
transformed = streaming_ipca_transform(data_path + 'iris.csv', pca_obj)

Starting reading file ... 

Finished reading file...
CPU times: user 4.24 ms, sys: 3.48 ms, total: 7.72 ms
Wall time: 4.76 ms


  data = func(data)


Just another 5 ms to transform the dataset ! 

In [258]:
# Check the output shape is 2 
transformed.shape[1] == 2

True

In [264]:
# Take a look 
hv.Points((transformed)).opts(padding = 0.1, size = 8, alpha = 0.4, xlabel = 'PC1', ylabel= 'PC2')