# Intake: a prototype API for "data virtualization"

https://github.com/ContinuumIO/intake

Access to data has always been a strong suit of Python: many sources of data, whether on-disk file formats or remote databases, have Python libraries (of varying degrees of support, to be honest) to load them.  What has been missing is a consistent interface to these libraries that is suitable for data analytics.

The Python DB API interface is fine for SQL-like transactional databases, where iterating over query result elements is desirable.  However, the DB API interface is far too slow for bulk loading of data, which has led to specific projects (like the old IOPro) that speed up loading data into efficient containers, like NumPy arrays and Pandas dataframes, each with its own inconsistent interface.

"Intake" is a placeholder name for a project that will address a specific set of read-only use cases that we've seen many times:
 * Users want to quickly get data from a data source into memory in a familiar container.
 * Users want to be able to use a "catalog" to discover and load pre-defined data sources with a consistent interface, regardless of the underlying data storage system.
 * Users want to be able to efficiently load data from many sources into distributed computation systems like Dask.
 
Toward this goal, this notebook demonstrates a very crude prototype of this functionality to get a sense for how it "feels" to use a system like this.

## Anti-Goals

Intake is **not**:
 * A complete API for interacting with any data store:  Intake is designed to facilitate bulk loading of data into memory and the creation of data catalogs exclusively.
 * A replacement for Blaze: Intake could be thought of as `[Blaze] - [Blaze Expressions] + [Dask I/O use cases]`
 * A tool for writing to data stores
 * "One I/O project to rule them all": The plugin system encourages Intake compatibility code for data sources to exist outside of Intake as separate installable projects, and/or eventually parts of the Python data source libraries themselves.

In [1]:
import intake

## Plugins

Intake maintains a registry of reader plugins that support the data loader interface.

In [2]:
intake.registry

{'csv': <intake.plugins.csv.Plugin at 0x10b1ee400>,
 'hdf5': <intake.plugins.hdf5.Plugin at 0x10b1ee4a8>,
 'postgres': <intake.plugins.postgres.Plugin at 0x10b1ee550>}

These plugins delegate the actual work to existing libraries, and only function as a shim to provide a consistent interface.  Here is a "short" list of plugins that could easily be made, or that we need to deliver for customers:
 * `csv`
 * `hdf5`
 * `parquet`
 * `avro`
 * `arrow`
 * `netflow` (Cisco record format)
 * `pcap` (packet capture format produced by tcpdump, etc)
 * `postgres`
 * `mongo`
 * `accumulo`
 * `odbc`
 * `elasticsearch`
 * `solr`
 * `splunk`
 * `hbase`

Plugins can be accessed through the registry:

In [3]:
intake.registry['csv'].open('data/trip_data_0.csv')

<intake.plugins.csv.CSVSource at 0x113da5b38>

but for convenience, `intake` automatically creates a shortcut to the `open()` method on each plugin called `open_[plugin_name]()`:

In [4]:
intake.open_csv('data/trip_data_0.csv')

<intake.plugins.csv.CSVSource at 0x113da5588>
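
One way such shortcuts could be generated is by looping over the registry and attaching each plugin's bound `open` method under an `open_<name>` attribute. The sketch below is only an illustration of that idea: `EchoPlugin`, `add_open_shortcuts`, and `fake_intake` are hypothetical names, not part of the prototype.

```python
import types


class EchoPlugin:
    """Hypothetical stand-in plugin whose open() just records its arguments."""

    def __init__(self, name):
        self.name = name

    def open(self, *args, **kwargs):
        return (self.name, args, kwargs)


def add_open_shortcuts(namespace, registry):
    """Attach an open_<name> attribute to `namespace` for every plugin."""
    for name, plugin in registry.items():
        setattr(namespace, "open_" + name, plugin.open)


# A plain namespace object plays the role of the `intake` module here.
registry = {"csv": EchoPlugin("csv"), "hdf5": EchoPlugin("hdf5")}
fake_intake = types.SimpleNamespace(registry=registry)
add_open_shortcuts(fake_intake, registry)

result = fake_intake.open_csv("data/trip_data_0.csv")
print(result)
```

Because the shortcut is just the plugin's own bound method, both access paths behave identically.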

## Interacting with data sources

Every plugin provides an `open` method that returns a `DataSource` object, representing a "lazy" reference to the data source.  The object will synchronously open files, initiate database connections, and inspect data types, but will not load data into memory until asked.

The `DataSource` object that is returned by `open()` provides a simple interface for getting the data into memory using a standard container.  The initially supported containers are likely to be:
 
 * `dataframe`: A pandas dataframe.  This is the preferred form for tabular data.
 * `ndarray`: A NumPy array.  This form is used for multidimensional data.
 * `list`: A list of JSON-like data structures.  This form is used for unstructured/semi-structured data.
 
A given `DataSource` will choose one of these containers to return (see below) as there is usually one preferable choice depending on the structure of the data itself.

The attributes of `DataSource` describe the data that will be available before it is loaded (if possible):

 * `datashape`: (TBD) description of data using datashape
 * `dtype`: NumPy dtype, if applicable, or `None`
 * `shape`: NumPy-style shape tuple, if applicable, or `None`.  DataFrames and lists are considered 1D for the purposes of this attribute.  Specific dimensions that are unknown until read time should have a `None` placeholder.
 * `container`: String describing the container format that will be returned: `dataframe`, `ndarray`, or `list`.
 * `get_chunks_supported`: True/False flag indicating whether the `get_chunks()` method can return more than one chunk.

The `DataSource` object also has three methods:

 * `read()`: Load all the data into memory at once and return the appropriate container.
 * `read_chunks(chunksize)`: Returns an iterator over data chunks of approximately `chunksize`.
 * `get_chunks(chunksize)`: Returns a list of data sources that retrieve individual chunks (as `read_chunks()` would).  If these chunked sources cannot be created (for example, for a database query), then a list of length 1 is returned with a `DataSource` equivalent to `self`.
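
To make the shape of this interface concrete, here is a toy in-memory source that exposes the attributes and methods listed above. `ListSource` is a hypothetical class written for this illustration; it holds its data eagerly, so it shows only the method contract, not the lazy loading a real plugin would provide.

```python
class ListSource:
    """Hypothetical in-memory source following the interface described above."""

    container = "list"
    dtype = None
    get_chunks_supported = True

    def __init__(self, records):
        self._records = list(records)
        # Lists are treated as 1D for the purposes of `shape`.
        self.shape = (len(self._records),)

    def read(self):
        """Return all records at once."""
        return list(self._records)

    def read_chunks(self, chunksize):
        """Yield consecutive slices of at most `chunksize` records."""
        for start in range(0, len(self._records), chunksize):
            yield self._records[start:start + chunksize]

    def get_chunks(self, chunksize):
        """Return one independent source per chunk."""
        return [ListSource(chunk) for chunk in self.read_chunks(chunksize)]


source = ListSource({"id": i} for i in range(25))
sizes = [len(chunk) for chunk in source.read_chunks(chunksize=10)]
sub_sources = source.get_chunks(chunksize=10)
print(sizes)
print([s.shape for s in sub_sources])
```

Reading every source returned by `get_chunks()` in order yields the same records as a single `read()`.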
 
Although `DataSource` objects may have open file handles and socket connections, they need to be pickleable for transfer using Dask.  This means that they typically need to implement their own `__getstate__` method to record only pickleable attributes and a `__setstate__` method that can reopen file and network connections.
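
A minimal sketch of this pattern using only the standard library follows. `FileSource` is a hypothetical class, not one of the prototype's plugins: it drops its open file handle when pickled and reopens the file from the stored path when unpickled.

```python
import os
import pickle
import tempfile


class FileSource:
    """Hypothetical source that holds an open file handle."""

    def __init__(self, path):
        self.path = path
        self._file = open(path, "r")

    def __getstate__(self):
        # Record only the pickleable attributes; drop the open handle.
        state = self.__dict__.copy()
        del state["_file"]
        return state

    def __setstate__(self, state):
        # Restore the attributes, then reopen the file on the receiving side.
        self.__dict__.update(state)
        self._file = open(self.path, "r")

    def read(self):
        self._file.seek(0)
        return self._file.read()

    def close(self):
        self._file.close()


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "example.txt")
    with open(path, "w") as f:
        f.write("a,b\n1,2\n")

    source = FileSource(path)
    clone = pickle.loads(pickle.dumps(source))
    original_text = source.read()
    clone_text = clone.read()
    source.close()
    clone.close()

print(clone_text == original_text)
```

Note that this only works when the path stored in the state is also reachable from the process that unpickles the object.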

### `hdf5` - wrapper around h5py

In [5]:
hdf5_reader = intake.open_hdf5('data/images.hdf5', dataset='AER')
print(hdf5_reader.shape, hdf5_reader.dtype)

(194, 311, 223, 3) uint8


In [6]:
it = hdf5_reader.read_chunks(chunksize=10)
chunk = next(it)
print(chunk.shape)
chunk[:2,:2,:3] # a small slice of the chunk

(10, 311, 223, 3)


array([[[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],

        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]],


       [[[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]],

        [[0, 0, 0],
         [0, 0, 0],
         [0, 0, 0]]]], dtype=uint8)

### `csv` - wrapper around `pandas.read_csv`

In [7]:
csv_reader = intake.open_csv('data/trip_data_*.csv')

In [8]:
df = csv_reader.read()
print(len(df))
df.head()

399998


Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


In [9]:
for chunk in csv_reader.read_chunks(chunksize=70000):
    print('Chunk, size =', len(chunk))

Chunk, size = 70000
Chunk, size = 70000
Chunk, size = 59999
Chunk, size = 70000
Chunk, size = 70000
Chunk, size = 59999
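
The repeating pattern of chunk sizes is consistent with the glob matching two files of 199,999 rows each, with every file chunked on its own. That is an inference from the totals printed above (2 × 199,999 = 399,998), not something the prototype states. The arithmetic can be sketched with a hypothetical helper, `chunk_sizes`:

```python
def chunk_sizes(rows_per_file, chunksize):
    """Sizes of the chunks produced when each file is split independently."""
    sizes = []
    for nrows in rows_per_file:
        full, remainder = divmod(nrows, chunksize)
        sizes.extend([chunksize] * full)
        if remainder:
            sizes.append(remainder)
    return sizes


# Assumed row counts: two files of 199,999 rows each.
sizes = chunk_sizes([199999, 199999], chunksize=70000)
print(sizes)
print(sum(sizes))
```

Chunks never span a file boundary in this model, which is why a short chunk appears in the middle of the sequence rather than only at the end.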


### `postgres` - wrapper around PostgresAdapter

In [10]:
## TBD

### Serialization and get_chunks()

To demonstrate the serializability of `DataSource` objects, let's save one to disk and reload it:

In [11]:
import pickle

with open('csv_reader.pkl', 'wb') as f:
    pickle.dump(csv_reader, f, protocol=pickle.HIGHEST_PROTOCOL)

In [12]:
with open('csv_reader.pkl', 'rb') as f:
    new_reader = pickle.load(f)
    
new_reader.read().head()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


One reason we want this feature is so that we can combine it with `get_chunks()`.  This method attempts to return a list of `DataSource` objects representing consecutive segments of data covering the original source.  Like `read_chunks()`, the `chunksize` is a suggestion, and actual chunk sizes will vary.

Let's chunk up our HDF5 source:

In [13]:
chunk_sources = hdf5_reader.get_chunks(chunksize=10)
print(chunk_sources[:5])

[<intake.plugins.hdf5.HDF5Source object at 0x119369320>, <intake.plugins.hdf5.HDF5Source object at 0x119369208>, <intake.plugins.hdf5.HDF5Source object at 0x119369cc0>, <intake.plugins.hdf5.HDF5Source object at 0x114ac2f60>, <intake.plugins.hdf5.HDF5Source object at 0x114ac2b00>]


In [14]:
chunk_sources[1].read().shape

(10, 311, 223, 3)

## Data Catalogs

A data catalog is a collection of named catalog entries (effectively `DataSource` objects) that can be accessed by the user.  To allow for some flexibility in the returned data, a catalog entry can take zero or more user parameters, which will modify the query.  This puts some additional work on the catalog administrator to create parameterized queries, but it avoids the need for the full Blaze expression system to filter a data source.

The canonical description of a data catalog is a YAML file.  Let's make a catalog for the above examples:

In [15]:
%%writefile catalog.yml
taxi_data:
  description: NYC Taxi Data
  driver: csv
  args: # passed to the open() method
    file_pattern: data/trip_data_*.csv

card_images:
  description: Card images
  parameters: # User defined parameters
    setname:
      description: Name of MTG card set
      type: str
      default: AER
      allowed: ["AER"]
  driver: hdf5
  args:
    filename: data/images.hdf5
    dataset: !template_str "{{ setname }}"  # Mark this string as a Jinja2 template

Overwriting catalog.yml


### Local Catalogs

A local catalog is one that exists in the client.  This can be useful when you want to declare a bunch of data sources in an external configuration file and use them directly.  One could imagine Anaconda projects shipping with a catalog describing all the data sources they use.  (The local catalog also becomes the basis of the remote catalog shown later.)

In [16]:
import intake
catalog = intake.load_catalog('catalog.yml')
print(catalog.list())
print(catalog.describe('card_images'))

['taxi_data', 'card_images']
{'description': 'Card images', 'user_parameters': [{'name': 'setname', 'description': 'Name of MTG card set', 'type': 'str', 'default': 'AER', 'allowed': ['AER']}]}


In [17]:
tsource = catalog.get('taxi_data')

In [18]:
tsource.read().head()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


User parameters to a data source are passed as keyword arguments to `catalog.get()`:

In [19]:
csource = catalog.get('card_images', setname='AER')
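
How the catalog turns these keyword arguments into `open()` arguments is not shown in this notebook. One plausible approach, sketched below under that caveat, is to apply each parameter's `default`, check its `type` and `allowed` values, and then substitute the result into the templated argument. `resolve_parameters` and its type table are hypothetical, and plain string replacement stands in for Jinja2 rendering so the example needs no third-party packages.

```python
def resolve_parameters(spec, user_values):
    """Apply defaults and check `type` and `allowed` for each declared parameter."""
    known_types = {"str": str, "int": int, "float": float}  # assumed type names
    unknown = set(user_values) - set(spec)
    if unknown:
        raise ValueError("Unknown parameters: " + ", ".join(sorted(unknown)))

    resolved = {}
    for name, info in spec.items():
        value = user_values.get(name, info.get("default"))
        if not isinstance(value, known_types[info["type"]]):
            raise TypeError(f"{name} must be of type {info['type']}")
        allowed = info.get("allowed")
        if allowed is not None and value not in allowed:
            raise ValueError(f"{name}={value!r} is not one of {allowed}")
        resolved[name] = value
    return resolved


# The `parameters` block of the card_images entry, written as a Python dict.
spec = {
    "setname": {
        "description": "Name of MTG card set",
        "type": "str",
        "default": "AER",
        "allowed": ["AER"],
    }
}

params = resolve_parameters(spec, {})
# Stand-in for rendering the "{{ setname }}" template from catalog.yml.
dataset = "{{ setname }}".replace("{{ setname }}", params["setname"])
print(dataset)
```

Validating against `allowed` before rendering keeps arbitrary user input out of the arguments passed to the driver.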

### Remote Catalogs

Remote catalogs are catalogs that communicate over a network connection.  They have two purposes:

 * Sharing a data catalog with many clients
 * Proxying a data source to the client without having to share direct access privileges (passwords, disk access, etc)
 
For this prototype, we're using ZeroMQ and pickle to send Python objects.  The real system probably requires something else.

In [20]:
import intake
catalog = intake.load_catalog('tcp://127.0.0.1:5555')
print('Available datasets: ' + ','.join(catalog.list()))

Available datasets: taxi_data,card_images


In [21]:
remote_source = catalog.get('taxi_data')
remote_source.read().head()

Unnamed: 0,medallion,hack_license,vendor_id,rate_code,store_and_fwd_flag,pickup_datetime,dropoff_datetime,passenger_count,trip_time_in_secs,trip_distance,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude
0,89D227B655E5C82AECF13C3F540D4CF4,BA96DE419E711691B9445D6A6307C170,CMT,1,N,2013-01-01 15:11:48,2013-01-01 15:18:10,4,382,1.0,-73.978165,40.757977,-73.989838,40.751171
1,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-06 00:18:35,2013-01-06 00:22:54,1,259,1.5,-74.006683,40.731781,-73.994499,40.75066
2,0BD7C8F5BA12B88E0B67BED28BEA73D8,9FD8F69F0804BDB5549F40E9DA1BE472,CMT,1,N,2013-01-05 18:49:41,2013-01-05 18:54:23,1,282,1.1,-74.004707,40.73777,-74.009834,40.726002
3,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:54:15,2013-01-07 23:58:20,2,244,0.7,-73.974602,40.759945,-73.984734,40.759388
4,DFD2202EE08F7A8DC9A57B02ACB81FE2,51EE87E3205C985EF8431D850C786310,CMT,1,N,2013-01-07 23:25:03,2013-01-07 23:34:24,1,560,2.1,-73.97625,40.748528,-74.002586,40.747868


In [22]:
for chunk in remote_source.read_chunks(chunksize=70000):
    print('Chunk, size =', len(chunk))

Chunk, size = 70000
Chunk, size = 70000
Chunk, size = 59999
Chunk, size = 70000
Chunk, size = 70000
Chunk, size = 59999


## Dask interoperability

One other goal of this system is to provide an easy on-ramp to get data sources into Dask.  The chunking functionality is intended to allow loading and scattering data sources that don't fit into the memory of a single system.

In [23]:
import intake
from dask.distributed import Client
client = Client()

catalog = intake.load_catalog('catalog.yml')
images = catalog.get('card_images')


a = intake.to_dask(images, client=client)
print(a.shape)

(194, 311, 223, 3)


## Future Extensions

This kind of catalog system has some natural product extensions, along with a few open questions:

* Local catalogs defined in a `catalog.yml` file could be part of an Anaconda Project
* Anaconda Enterprise could give users/administrators the ability to create remote data catalogs and set access permissions for the data sources.
* Handling of authentication credentials is not described here.
* Does a `read()` on a remote data source have to be proxied through the catalog server?  There are data sources that would be more efficiently accessed directly, so the remote catalog could pass along the connection information to the client (assuming the client has the appropriate plugin).