In [1]:
import ibis
import os
# Connection settings, overridable through environment variables.
# The same host is used for Impala and WebHDFS, as in the original setup.
impala_host = os.environ.get('IBIS_IMPALA_HOST', 'quickstart.cloudera')
# os.environ values are always strings. Coerce explicitly so the port is an
# int whether it comes from the environment or from the default.
hdfs_port = int(os.environ.get('IBIS_WEBHDFS_PORT', 50070))

ic = ibis.impala_connect(host=impala_host, database='ibis_testing')
hdfs = ibis.hdfs_connect(host=impala_host, port=hdfs_port)
con = ibis.make_client(ic, hdfs_client=hdfs)

# Interactive mode: an expression left as the last line of a cell is
# executed and its result displayed as a table.
ibis.options.interactive = True

Creating new Impala tables from Ibis expressions
---

Suppose you have an Ibis expression that produces a table:

In [2]:
# Start from the test table and derive a new column `foo`.
table = con.table('functional_alltypes')
foo_col = (table.bigint_col - table.int_col).name('foo')
t2 = table[table, foo_col]

# Summary statistics of `foo`, computed per string_col group.
foo_metrics = [
    t2.foo.min().name('min_foo'),
    t2.foo.max().name('max_foo'),
    t2.foo.sum().name('sum_foo'),
]
large_rows = t2[t2.bigint_col > 30]
expr = large_rows.group_by('string_col').aggregate(foo_metrics)
expr

  string_col  min_foo  max_foo  sum_foo
0          6       54       54    39420
1          4       36       36    26280
2          7       63       63    45990
3          8       72       72    52560
4          5       45       45    32850
5          9       81       81    59130

To create a table in the database from the results of this expression, use the connection's `create_table` method:

In [3]:
# Materialize the result of `expr` as a new table in the ibis_testing database.
con.create_table('testing_table', expr, database='ibis_testing')

By default, this creates a table stored in **Parquet format** in HDFS. Support for views, external tables, configurable file formats, and so forth will come in the future. If you have feedback on what kind of interface would be useful for these, please let us know.

In [4]:
# Read the new table back; no database is given, so the connection's default
# database (ibis_testing, set when connecting) is used.
con.table('testing_table')

  string_col  min_foo  max_foo  sum_foo
0          6       54       54    39420
1          4       36       36    26280
2          7       63       63    45990
3          9       81       81    59130
4          8       72       72    52560
5          5       45       45    32850

Tables can be dropped in a similar way with `drop_table`:

In [5]:
# Remove the table created above.
con.drop_table('testing_table', database='ibis_testing')

Inserting data into existing Impala tables
---

The client's `insert` method can append new data to an existing table or overwrite the data that is in there.

In [6]:
# Recreate the table (it was dropped above) so there is something to insert into.
con.create_table('testing_table', expr)
con.table('testing_table')

  string_col  min_foo  max_foo  sum_foo
0          9       81       81    59130
1          6       54       54    39420
2          4       36       36    26280
3          7       63       63    45990
4          8       72       72    52560
5          5       45       45    32850

In [7]:
# Append the rows of `expr` again; afterwards each group appears twice.
con.insert('testing_table', expr)
con.table('testing_table')

   string_col  min_foo  max_foo  sum_foo
0           8       72       72    52560
1           5       45       45    32850
2           6       54       54    39420
3           4       36       36    26280
4           7       63       63    45990
5           9       81       81    59130
6           8       72       72    52560
7           5       45       45    32850
8           9       81       81    59130
9           6       54       54    39420
10          4       36       36    26280
11          7       63       63    45990

In [8]:
# Clean up the table used for the insert example.
con.drop_table('testing_table')

Uploading / downloading data from HDFS
---

If you've set up an HDFS connection, you can use the Ibis HDFS interface to look through your data and read and write files to and from HDFS:

In [9]:
# Use the HDFS client attached to the Ibis client (presumably the one passed
# as hdfs_client when connecting).
hdfs = con.hdfs
hdfs.ls('/__ibis/ibis-testing-data')

[u'/__ibis/ibis-testing-data/avro',
 u'/__ibis/ibis-testing-data/csv',
 u'/__ibis/ibis-testing-data/parquet',
 u'/__ibis/ibis-testing-data/udf']

In [10]:
# List the Parquet datasets under the test data directory.
hdfs.ls('/__ibis/ibis-testing-data/parquet')

[u'/__ibis/ibis-testing-data/parquet/functional_alltypes',
 u'/__ibis/ibis-testing-data/parquet/tpch_ctas_cancel',
 u'/__ibis/ibis-testing-data/parquet/tpch_customer',
 u'/__ibis/ibis-testing-data/parquet/tpch_lineitem',
 u'/__ibis/ibis-testing-data/parquet/tpch_nation',
 u'/__ibis/ibis-testing-data/parquet/tpch_orders',
 u'/__ibis/ibis-testing-data/parquet/tpch_part',
 u'/__ibis/ibis-testing-data/parquet/tpch_partsupp',
 u'/__ibis/ibis-testing-data/parquet/tpch_region',
 u'/__ibis/ibis-testing-data/parquet/tpch_supplier']

Suppose we wanted to download `/__ibis/ibis-testing-data/parquet/functional_alltypes`, which is a directory. We need only do:

In [11]:
# Remove any previous local copy, then download the HDFS directory to ./parquet_dir.
!rm -rf parquet_dir/
hdfs.get('/__ibis/ibis-testing-data/parquet/functional_alltypes', 'parquet_dir')

'parquet_dir'

Now we have that directory locally:

In [12]:
# Show the Parquet data files that were downloaded.
!ls parquet_dir/

e54d1a51b57207fb-757fe03770d6a8ab_722308252_data.0.parq
e54d1a51b57207fb-757fe03770d6a8ac_722308252_data.0.parq
e54d1a51b57207fb-757fe03770d6a8ad_1611361036_data.0.parq


Files and directories can be written to HDFS just as easily using `put`:

In [13]:
# Upload the local directory to a fresh HDFS location, first removing any
# copy left over from an earlier run.
path = '/__ibis/dir-write-example'
stale_copy_exists = hdfs.exists(path)
if stale_copy_exists:
    hdfs.rmdir(path)
hdfs.put(path, 'parquet_dir', verbose=True)

{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File does not exist: /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ac_722308252_data.0.parq"}}
{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ac_722308252_data.0.parq does not exist."}}
{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File does not exist: /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ab_722308252_data.0.parq"}}


Writing local parquet_dir/e54d1a51b57207fb-757fe03770d6a8ac_722308252_data.0.parq to HDFS /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ac_722308252_data.0.parq
Writing local parquet_dir/e54d1a51b57207fb-757fe03770d6a8ab_722308252_data.0.parq to HDFS /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ab_722308252_data.0.parq

{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ab_722308252_data.0.parq does not exist."}}
{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File does not exist: /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ad_1611361036_data.0.parq"}}



Writing local parquet_dir/e54d1a51b57207fb-757fe03770d6a8ad_1611361036_data.0.parq to HDFS /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ad_1611361036_data.0.parq

{"RemoteException":{"exception":"FileNotFoundException","javaClassName":"java.io.FileNotFoundException","message":"File /__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ad_1611361036_data.0.parq does not exist."}}





In [14]:
# Confirm the uploaded files are now in HDFS.
hdfs.ls('/__ibis/dir-write-example')

[u'/__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ab_722308252_data.0.parq',
 u'/__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ac_722308252_data.0.parq',
 u'/__ibis/dir-write-example/e54d1a51b57207fb-757fe03770d6a8ad_1611361036_data.0.parq']

Delete files with `rm` or directories with `rmdir`:

In [15]:
# Remove the example directory from HDFS.
hdfs.rmdir('/__ibis/dir-write-example')

In [16]:
# Remove the local download as well.
!rm -rf parquet_dir/

Queries on Parquet, Avro, and Delimited files in HDFS
---

Ibis can easily create temporary or persistent Impala tables that reference data in the following formats:

- Parquet (`parquet_file`)
- Avro (`avro_file`)
- Delimited text formats (CSV, TSV, etc.) (`delimited_file`)

Parquet is the easiest because the schema can be read from the data files:

In [17]:
path = '/__ibis/ibis-testing-data/parquet/tpch_lineitem'

# No schema is passed: for Parquet it is read from the data files.
lineitem = con.parquet_file(path)
lineitem.limit(2)

      l_orderkey  l_partkey  l_suppkey  l_linenumber l_quantity  \
0              1     155190       7706             1      17.00   
1              1      67310       7311             2      36.00   
2              1      63700       3701             3       8.00   
3              1       2132       4633             4      28.00   
4              1      24027       1534             5      24.00   
5              1      15635        638             6      32.00   
6              2     106170       1191             1      38.00   
7              3       4297       1798             1      45.00   
8              3      19036       6540             2      49.00   
9              3     128449       3474             3      27.00   
10             3      29380       1883             4       2.00   
11             3     183095        650             5      28.00   
12             3      62143       9662             6      26.00   
13             4      88035       5560             1      30.0

In [18]:
# Aggregate directly over the Parquet-backed table.
lineitem.l_extendedprice.sum()

Decimal('229577310901.20')

If you want to query a Parquet file and also create a table in Impala that remains after your session, you can pass more information to `parquet_file`:

In [19]:
# persist=True creates a named table that remains after this session ends.
table = con.parquet_file(
    path,
    name='my_parquet_table',
    database='ibis_testing',
    persist=True,
)
table.l_extendedprice.sum()

Decimal('229577310901.20')

In [20]:
# The persisted table can be looked up by name like any other table.
con.table('my_parquet_table').l_extendedprice.sum()

Decimal('229577310901.20')

In [21]:
# Drop from the same database the table was created in, rather than relying
# on the connection's default database.
con.drop_table('my_parquet_table', database='ibis_testing')

To query delimited files, you need to provide an Ibis schema. Eventually we'd like to build helper tools that infer the schema for you.

There are some CSV files in the test data folder, so let's use those:

In [22]:
hdfs.get('/__ibis/ibis-testing-data/csv', 'csv-files')

'csv-files'

In [23]:
# Peek at one of the downloaded CSV files.
!cat csv-files/0.csv

dNmzRWWCKJ,0.558914329982219,42
kpXp9U5Kv4,-0.8221358158299105,28
7BcTT5B4wp,-1.2180159173981855,18
W4vOjjJQrT,-1.7791277450782457,45
tiVZsQWzpN,0.32936255878849235,84
6ibbcK16f2,-0.36602855877465035,56
QBsUQwKw0Z,0.1206123669383656,29
FMp2sWNgaY,-0.015090055989338344,27
UPXjybUQSj,0.42642200098179384,77
L2f2ylNoIw,-0.5091003484680737,6


In [24]:
# Remove the local copy; the query below reads the CSV data directly from HDFS.
!rm -rf csv-files/

The schema here is pretty simple (see `ibis.schema` for more):

In [25]:
# Column names and types, in the order they appear in each CSV row.
csv_columns = [
    ('foo', 'string'),
    ('bar', 'double'),
    ('baz', 'int32'),
]
schema = ibis.schema(csv_columns)

csv_dir = '/__ibis/ibis-testing-data/csv'
table = con.delimited_file(csv_dir, schema)
table.limit(10)

           foo       bar  baz
0   dNmzRWWCKJ  0.558914   42
1   kpXp9U5Kv4 -0.822136   28
2   7BcTT5B4wp -1.218016   18
3   W4vOjjJQrT -1.779128   45
4   tiVZsQWzpN  0.329363   84
5   6ibbcK16f2 -0.366029   56
6   QBsUQwKw0Z  0.120612   29
7   FMp2sWNgaY -0.015090   27
8   UPXjybUQSj  0.426422   77
9   L2f2ylNoIw -0.509100    6
10  dNmzRWWCKJ  0.558914   42
11  kpXp9U5Kv4 -0.822136   28
12  7BcTT5B4wp -1.218016   18
13  W4vOjjJQrT -1.779128   45
14  tiVZsQWzpN  0.329363   84
15  6ibbcK16f2 -0.366029   56
16  QBsUQwKw0Z  0.120612   29
17  FMp2sWNgaY -0.015090   27
18  UPXjybUQSj  0.426422   77
19  L2f2ylNoIw -0.509100    6
20  dNmzRWWCKJ  0.558914   42
21  kpXp9U5Kv4 -0.822136   28
22  7BcTT5B4wp -1.218016   18
23  W4vOjjJQrT -1.779128   45
24  tiVZsQWzpN  0.329363   84
25  6ibbcK16f2 -0.366029   56
26  QBsUQwKw0Z  0.120612   29
27  FMp2sWNgaY -0.015090   27
28  UPXjybUQSj  0.426422   77
29  L2f2ylNoIw -0.509100    6
..         ...       ...  ...
70  dNmzRWWCKJ  0.558914   42
71  kpXp9U

In [26]:
# summary() reports count, nulls, min, max, sum, mean and an approximate
# distinct count for the column.
table.bar.summary()

   count  nulls       min       max        sum      mean  approx_nunique
0    100      0 -1.779128  0.558914 -32.741872 -0.327419              10

For functions like `parquet_file` and `delimited_file`, you must pass an HDFS directory (we'll add support for S3 and other filesystems later), and all files in that directory must have the same schema.

You can also query Avro data, provided you have its full Avro schema:

In [27]:
# Avro schema for the TPC-H region data: each column is a nullable union.
region_fields = [
    {"type": ["int", "null"], "name": "R_REGIONKEY"},
    {"type": ["string", "null"], "name": "R_NAME"},
    {"type": ["string", "null"], "name": "R_COMMENT"},
]
avro_schema = {
    "fields": region_fields,
    "type": "record",
    "name": "a",
}

table = con.avro_file('/__ibis/ibis-testing-data/avro/tpch.region',
                      avro_schema)
table

Empty DataFrame
Columns: [r_regionkey, r_name, r_comment]
Index: []

Other helper functions for interacting with the database
---

We're adding a growing list of useful utility functions for interacting with an Impala cluster on the client object. The idea is that you should be able to do any database-admin-type work with Ibis and not have to switch over to the Impala SQL shell. If you have ideas for making this more pleasant, please let us know.

Here are some of the features, with examples below:

- Listing and searching for available databases and tables
- Creating and dropping databases
- Getting table schemas

In [28]:
# List databases whose names match the wildcard pattern.
con.list_databases(like='ibis*')

['ibis_testing']

In [29]:
# List tables in ibis_testing whose names match the wildcard pattern.
con.list_tables(database='ibis_testing', like='tpch*')

['tpch_ctas_cancel',
 'tpch_customer',
 'tpch_lineitem',
 'tpch_nation',
 'tpch_orders',
 'tpch_part',
 'tpch_partsupp',
 'tpch_region',
 'tpch_region_avro',
 'tpch_supplier']

In [30]:
# Fetch the Ibis schema (column names and types) of an existing table.
schema = con.get_schema('functional_alltypes')
schema

ibis.Schema {  
  id               int32
  bool_col         boolean
  tinyint_col      int8
  smallint_col     int16
  int_col          int32
  bigint_col       int64
  float_col        float
  double_col       double
  date_string_col  string
  string_col       string
  timestamp_col    timestamp
  year             int32
  month            int32
}

Databases can be created, too, and you can choose the HDFS path where their data files are stored:

In [31]:
# Create a database whose data files live under a chosen HDFS path, then
# copy functional_alltypes into it.
db = 'ibis_testing2'
db_hdfs_path = '/__ibis/my-test-database'
con.create_database(db, path=db_hdfs_path)

alltypes = con.table('functional_alltypes')
con.create_table('example_table', alltypes, database=db)

The new table's data files should now be in the indicated location in HDFS:

In [32]:
# The new table's data should appear under the path given to create_database.
hdfs.ls('/__ibis/my-test-database')

[u'/__ibis/my-test-database/example_table']

To drop a database, including all tables in it, you can use `drop_database` with `force=True`:

In [33]:
# force=True drops the database even though it still contains example_table.
con.drop_database(db, force=True)

Dealing with partitioned tables in Impala
---

**Placeholder:** This is not yet implemented. If you have use cases, please let us know.

Faster queries on small data in Impala
---

Impala internally uses LLVM to compile parts of queries (aka "codegen") to make them faster on large datasets. This compilation adds a certain amount of overhead to many kinds of queries, even on small datasets. You can disable LLVM code generation when using Ibis, which may significantly speed up queries on smaller datasets:

In [34]:
from numpy.random import rand

In [35]:
# Turn off LLVM code generation for subsequent queries.
con.disable_codegen()

In [36]:
t = con.table('ibis_testing.functional_alltypes')

# Time a small aggregation with codegen disabled. rand() is evaluated on every
# timed run, so each execution embeds a different literal and issues a
# distinct query.
%timeit (t.double_col + rand()).sum().execute()

10 loops, best of 3: 141 ms per loop


In [37]:
# Turn codegen back on
# Turn codegen back on: passing False undoes the earlier disable_codegen() call.
con.disable_codegen(False)

In [38]:
# Same benchmark as before, now with codegen enabled.
%timeit (t.double_col + rand()).sum().execute()

1 loops, best of 3: 2.03 s per loop


It's important to remember that codegen adds a fixed per-query overhead, while it can significantly speed up queries on big data.