-
Notifications
You must be signed in to change notification settings - Fork 5
/
tm1.py
71 lines (61 loc) · 2.1 KB
/
tm1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from typing import Optional
from airflow.hooks.base_hook import BaseHook
from TM1py.Services import TM1Service
class TM1Hook(BaseHook):
    """
    Airflow hook for IBM Cognos TM1, backed by the TM1py library.

    The hook resolves an Airflow connection by id and turns it into a
    ``TM1Service`` instance via :meth:`get_conn`.
    """

    def __init__(self, tm1_conn_id: str = "tm1_default") -> None:
        """
        Remember which Airflow connection to use; no connection is opened here.

        :param tm1_conn_id: The name of the TM1 connection to use.
        :type tm1_conn_id: str
        """
        self.tm1_conn_id = tm1_conn_id
        # The service handle and everything below stay None until get_conn() runs.
        self.tm1: Optional[TM1Service] = None
        # Copied from the Airflow connection record by get_conn().
        self.address = self.port = self.user = self.password = None
        # Read back from the TM1 server by get_conn().
        self.db = self.server_version = None

    def get_conn(self) -> TM1Service:
        """
        Build a ``TM1Service`` from the configured Airflow connection and return it.

        Host, port, login and password come from the connection record. Entries
        in the connection's ``extra`` JSON are forwarded as keyword arguments
        only when their name is in the supported list below. When no
        ``session_context`` is supplied it is set to ``"Airflow"``.

        Every call constructs a new ``TM1Service``. The hook's ``tm1``, ``db``
        and ``server_version`` attributes are overwritten with the new service,
        its server name and its product version.

        :return: TM1Service
        """
        airflow_conn = self.get_connection(self.tm1_conn_id)
        self.address, self.port = airflow_conn.host, airflow_conn.port
        self.user, self.password = airflow_conn.login, airflow_conn.password

        # Extras that are passed through to TM1Service. session_id is left out
        # on purpose: it is not clear that it makes sense in an Airflow context.
        supported_extras = (
            "base_url",
            "decode_b64",
            "namespace",
            "ssl",
            "session_context",
            "logging",
            "timeout",
            "connection_pool_size",
        )
        service_kwargs = {}
        for key, value in airflow_conn.extra_dejson.items():
            if key in supported_extras:
                service_kwargs[key] = value

        # A default session context makes the session easier to identify
        # in TM1top etc.
        service_kwargs.setdefault("session_context", "Airflow")

        service = TM1Service(
            address=self.address,
            port=self.port,
            user=self.user,
            password=self.password,
            **service_kwargs
        )
        self.tm1 = service

        # Ask the server for its name and product version and keep them on the hook.
        self.db = service.server.get_server_name()
        self.server_version = service.server.get_product_version()
        return service