pytd provides user-friendly interfaces to Treasure Data’s REST APIs, Presto query engine, and Plazma primary storage.
The seamless connection allows your Python code to efficiently read and write large volumes of data from and to Treasure Data. As a result, pytd makes your day-to-day data analytics work more productive.
pip install pytd
Set your API key and endpoint to the environment variables `TD_API_KEY` and `TD_API_SERVER`, respectively, and create a client instance:
import pytd
client = pytd.Client(database='sample_datasets')
# or, hard-code your API key, endpoint, and/or query engine:
# >>> pytd.Client(apikey='1/XXX', endpoint='https://api.treasuredata.com/', database='sample_datasets', default_engine='presto')
Issue a Presto query and retrieve the result:
client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')
# {'columns': ['symbol', 'cnt'], 'data': [['AAIT', 590], ['AAL', 82], ['AAME', 9252], ..., ['ZUMZ', 2364]]}
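The returned value is a plain dictionary with `columns` and `data` entries, so it can easily be converted into a pandas DataFrame if needed; for example:

import pandas as pd

res = client.query('select symbol, count(1) as cnt from nasdaq group by 1 order by 1')
# Build a DataFrame from the 'data' rows and 'columns' names in the result dictionary
df = pd.DataFrame(res['data'], columns=res['columns'])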
In the case of Hive:
client.query('select hivemall_version()', engine='hive')
# {'columns': ['_c0'], 'data': [['0.6.0-SNAPSHOT-201901-r01']]} (as of Feb, 2019)
It is also possible to explicitly initialize `pytd.Client` for Hive:
client_hive = pytd.Client(database='sample_datasets', default_engine='hive')
client_hive.query('select hivemall_version()')
Here is an example of generator-based iterative retrieval using the DB-API. For details, please refer to the documentation.
from pytd.dbapi import connect
conn = connect(pytd.Client(database='sample_datasets'))
# or, connect with Hive:
# >>> conn = connect(pytd.Client(database='sample_datasets', default_engine='hive'))
def iterrows(sql, connection):
    # Fetch one row at a time instead of loading the whole result into memory,
    # yielding (index, row_dict) pairs.
    cur = connection.cursor()
    cur.execute(sql)
    index = 0
    columns = None
    while True:
        row = cur.fetchone()
        if row is None:
            break
        if columns is None:
            columns = [desc[0] for desc in cur.description]
        yield index, dict(zip(columns, row))
        index += 1

for index, row in iterrows('select symbol, count(1) as cnt from nasdaq group by 1 order by 1', conn):
    print(index, row)
If you face an unexpected timeout error with Presto, this kind of iterative retrieval is worth trying.
Data represented as `pandas.DataFrame` can be written to Treasure Data as follows:
import pandas as pd
df = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})
client.load_table_from_dataframe(df, 'takuti.foo', writer='bulk_import', if_exists='overwrite')
For the `writer` option, pytd supports three different ways to ingest data to Treasure Data (a usage sketch follows the comparison table below):

- Bulk Import API: `bulk_import` (default)
  - Convert data into a CSV file and upload it in the batch fashion.
- Presto INSERT INTO query: `insert_into`
  - Insert every single row in `DataFrame` by issuing an INSERT INTO query through the Presto query engine.
  - Recommended only for a small volume of data.
- td-spark: `spark` (No longer available)
  - Local customized Spark instance directly writes `DataFrame` to Treasure Data's primary storage system.
Characteristics of each of these methods can be summarized as follows:
|                                    | `bulk_import` | `insert_into` | `spark` (No longer available) |
|------------------------------------|:-------------:|:-------------:|:-----------------------------:|
| Scalable against data volume       | ✓             |               | ✓                             |
| Write performance for larger data  |               |               | ✓                             |
| Memory efficient                   | ✓             | ✓             |                               |
| Disk efficient                     |               | ✓             |                               |
| Minimal package dependency         | ✓             | ✓             |                               |
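As a minimal sketch, selecting a writer explicitly is just a matter of passing its name to `load_table_from_dataframe` (the destination table `takuti.foo_small` below is only a placeholder):

import pandas as pd
import pytd

client = pytd.Client(database='sample_datasets')
df_small = pd.DataFrame(data={'col1': [1, 2], 'col2': [3, 10]})

# insert_into writes the rows via Presto INSERT INTO queries,
# so it is recommended only for a small volume of data like this one
client.load_table_from_dataframe(df_small, 'takuti.foo_small', writer='insert_into', if_exists='overwrite')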
Since td-spark gives special access to the main storage system via PySpark, follow the instructions below:
- Contact support@treasuredata.com to activate the permission to your Treasure Data account. Note that the underlying component, Plazma Public API, limits its free tier at 100GB Read and 100TB Write.
- Install pytd with the `[spark]` option if you use the third option: `pip install pytd[spark]`
If you want to use an existing td-spark JAR file, creating a `SparkWriter` with the `td_spark_path` option would be helpful.
from pytd.writer import SparkWriter
writer = SparkWriter(td_spark_path='/path/to/td-spark-assembly.jar')
client.load_table_from_dataframe(df, 'mydb.bar', writer=writer, if_exists='overwrite')
Treasure Data offers three different Python clients on GitHub, and the following list summarizes their characteristics.
- td-client-python
  - Basic REST API wrapper.
  - Similar functionalities to td-client-{ruby, java, node, go}.
  - The capability is limited by what the Treasure Data REST API can do.
- pytd
  - Access to Plazma via td-spark as introduced above.
  - Efficient connection to Presto based on presto-python-client.
  - Multiple data ingestion methods and a variety of utility functions.
- pandas-td (deprecated)
  - Old tool optimized for pandas and Jupyter Notebook.
  - pytd offers a compatible function set (see below for details).
An optimal choice of package depends on your specific use case, but common guidelines can be listed as follows:
- Use td-client-python if you want to execute basic CRUD operations from Python applications.
- Use pytd for (1) analytical purposes relying on pandas and Jupyter Notebook, and (2) achieving more efficient data access with ease.
- Do not use pandas-td. If you are using pandas-td, replace the code with pytd based on the following guidance as soon as possible.
pytd offers pandas-td-compatible functions that provide the same functionalities more efficiently. If you are still using pandas-td, we recommend switching to pytd as follows.
First, install the package from PyPI:
pip install pytd
# or, `pip install pytd[spark]` if you wish to use `to_td`
Next, make the following modifications to the import statements.
Before:
import pandas_td as td
In [1]: %load_ext pandas_td.ipython
After:
import pytd.pandas_td as td
In [1]: %load_ext pytd.pandas_td.ipython
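For instance, assuming the compatible helpers keep the same names and signatures as in pandas-td (e.g., `create_engine` and `read_td_query`), querying into a DataFrame looks like this:

import pytd.pandas_td as td

# Create a handle bound to the Presto engine and the sample_datasets database
# (mirrors pandas-td's create_engine interface)
engine = td.create_engine('presto:sample_datasets')

# Run a query and receive the result as a pandas.DataFrame
df = td.read_td_query(
    'select symbol, count(1) as cnt from nasdaq group by 1 order by 1',
    engine
)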
Consequently, all `pandas_td` code should keep running correctly with `pytd`. If you notice any incompatible behaviors, please report an issue on the pytd GitHub repository.