-
Notifications
You must be signed in to change notification settings - Fork 71
/
create_hyper_file_from_parquet.py
83 lines (71 loc) · 3.72 KB
/
create_hyper_file_from_parquet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# -----------------------------------------------------------------------------
#
# This file is the copyrighted property of Tableau Software and is protected
# by registered patents and other applicable U.S. and international laws and
# regulations.
#
# You may adapt this file and modify it to fit into your context and use it
# as a template to start your own projects.
#
# -----------------------------------------------------------------------------
from datetime import date
from pathlib import Path
from tableauhyperapi import HyperProcess, Telemetry, \
Connection, CreateMode, \
NOT_NULLABLE, NULLABLE, SqlType, TableDefinition, \
Inserter, \
escape_name, escape_string_literal, \
HyperException
def run_create_hyper_file_from_parquet(
        parquet_file_path: Path,
        table_definition: TableDefinition,
        hyper_database_path: Path):
    """
    An example demonstrating how to load rows from an Apache Parquet file (`parquet_file_path`)
    into a new Hyper file (`hyper_database_path`) using the COPY command. Currently the
    table definition of the data to copy needs to be known and explicitly specified.

    Reading Parquet data is analogous to reading CSV data. For more details, see:
    https://tableau.github.io/hyper-db/docs/guides/hyper_file/insert_csv

    :param parquet_file_path: Location of the Parquet file to read. A `Path` or a plain
        path string are both accepted; the value is converted to `str` before it is
        embedded in the COPY statement.
    :param table_definition: Definition of the table that is created in the new Hyper file
        and that receives the copied rows.
    :param hyper_database_path: Location of the Hyper file to create. An existing file at
        this location is replaced.
    :raises HyperException: Not caught here; errors reported by the Hyper API calls below
        propagate to the caller.
    """
    # Start the Hyper process.
    #
    # * Sending telemetry data to Tableau is encouraged when trying out an experimental feature.
    #   To opt out, simply set `telemetry=Telemetry.DO_NOT_SEND_USAGE_DATA_TO_TABLEAU` below.
    with HyperProcess(telemetry=Telemetry.SEND_USAGE_DATA_TO_TABLEAU) as hyper:

        # Open a connection to the Hyper process. This will also create the new Hyper file.
        # The `CREATE_AND_REPLACE` mode causes the file to be replaced if it
        # already exists.
        with Connection(endpoint=hyper.endpoint,
                        database=hyper_database_path,
                        create_mode=CreateMode.CREATE_AND_REPLACE) as connection:

            # Create the target table.
            connection.catalog.create_table(table_definition=table_definition)

            # `escape_string_literal` works on strings, so normalize the path first.
            # Without this, passing an actual `Path` (as the annotation suggests)
            # would fail instead of producing a quoted SQL literal.
            parquet_literal = escape_string_literal(str(parquet_file_path))

            # Execute a COPY command to insert the data from the Parquet file.
            copy_command = f"COPY {table_definition.table_name} FROM {parquet_literal} WITH (FORMAT PARQUET)"
            print(copy_command)
            count_inserted = connection.execute_command(copy_command)
            print(f"-- {count_inserted} rows have been copied from '{parquet_file_path}' to the table {table_definition.table_name} in '{hyper_database_path}'.")
if __name__ == '__main__':
    # Script entry point: build the `orders` table definition, load the sample
    # Parquet file into a new Hyper file, and report Hyper errors with a
    # non-zero exit status.
    try:
        # The `orders` table to read from the Parquet file.
        table_definition = TableDefinition(
            table_name="orders",
            columns=[
                TableDefinition.Column("o_orderkey", SqlType.int(), NOT_NULLABLE),
                TableDefinition.Column("o_custkey", SqlType.int(), NOT_NULLABLE),
                TableDefinition.Column("o_orderstatus", SqlType.text(), NOT_NULLABLE),
                TableDefinition.Column("o_totalprice", SqlType.numeric(8, 2), NOT_NULLABLE),
                TableDefinition.Column("o_orderdate", SqlType.date(), NOT_NULLABLE),
                TableDefinition.Column("o_orderpriority", SqlType.text(), NOT_NULLABLE),
                TableDefinition.Column("o_clerk", SqlType.text(), NOT_NULLABLE),
                TableDefinition.Column("o_shippriority", SqlType.int(), NOT_NULLABLE),
                TableDefinition.Column("o_comment", SqlType.text(), NOT_NULLABLE),
            ]
        )

        run_create_hyper_file_from_parquet(
            "orders_10rows.parquet",
            table_definition,
            "orders.hyper")

    except HyperException as ex:
        print(ex)
        # Raise SystemExit directly rather than calling the `exit()` helper:
        # that helper is installed by the `site` module for interactive use and
        # is not guaranteed to exist (e.g. under `python -S`).
        raise SystemExit(1)