-
Notifications
You must be signed in to change notification settings - Fork 145
/
meltano_file.py
138 lines (113 loc) · 4.91 KB
/
meltano_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""Module for working with meltano.yml files."""
from __future__ import annotations
import copy
from typing import Iterable
from meltano.core.behavior.canonical import Canonical
from meltano.core.environment import Environment
from meltano.core.plugin import PluginType
from meltano.core.plugin.project_plugin import ProjectPlugin
from meltano.core.schedule import Schedule
from meltano.core.task_sets import TaskSets
# meltano.yml version number; used as the default `version` argument of
# `MeltanoFile.__init__`.
VERSION = 1
class MeltanoFile(Canonical):
    """Data and loading methods for meltano.yml files."""

    def __init__(
        self,
        version: int = VERSION,
        plugins: dict[str, list[dict]] | None = None,
        schedules: list[dict] | None = None,
        environments: list[dict] | None = None,
        jobs: list[dict] | None = None,
        env: dict[str, str] | None = None,
        **extras,
    ):
        """Construct a new MeltanoFile object from meltano.yml file.

        Every section is optional; a missing (or `None`) section is treated
        as empty.

        Args:
            version: The meltano.yml version, currently always 1.
            plugins: Plugin configuration for this project, keyed by plugin
                type, each value being a list of raw plugin definitions.
            schedules: Schedule configuration for this project.
            environments: Environment configuration for this project.
            jobs: Job configuration for this project.
            env: Environment variables for this project.
            extras: Additional configuration for this project.
        """
        super().__init__(
            # Attributes will be listed in meltano.yml in this order:
            version=version,
            extras=extras,
            plugins=self.load_plugins(plugins or {}),
            schedules=self.load_schedules(schedules or []),
            environments=self.load_environments(environments or []),
            jobs=self.load_job_tasks(jobs or []),
            env=env or {},
        )

    def load_plugins(self, plugins: dict[str, list[dict]]) -> Canonical:
        """Parse the `plugins` section into `ProjectPlugin` instances.

        Args:
            plugins: Dictionary mapping a plugin type name to the list of raw
                plugin definitions of that type.

        Returns:
            A `Canonical` object with one entry per `PluginType`, each holding
            the list of `ProjectPlugin` instances of that type (empty when the
            type has no plugins configured).
        """
        plugin_type_plugins = Canonical()

        # Seed every known plugin type so that each key is always present,
        # even when `plugins` does not mention it.
        for ptype in PluginType:
            plugin_type_plugins[ptype] = []

        for plugin_type, raw_plugins in plugins.items():
            # A section that is present but has no entries may arrive as
            # `None` (e.g. an empty YAML key); treat it as an empty list
            # instead of failing with a `TypeError`.
            raw_plugins = raw_plugins or []

            if plugin_type == PluginType.MAPPERS:
                # Mappers are registered twice over: once as the mapper
                # plugin itself, and once per mapping it declares.
                for mapper in raw_plugins:
                    plugin_type_plugins[PluginType.MAPPERS].append(
                        ProjectPlugin(PluginType.MAPPERS, **mapper)
                    )
                    plugin_type_plugins[PluginType.MAPPERS].extend(
                        self.get_plugins_for_mappings(mapper)
                    )
            else:
                for raw_plugin in raw_plugins:
                    plugin = ProjectPlugin(PluginType(plugin_type), **raw_plugin)
                    plugin_type_plugins[plugin.type].append(plugin)

        return plugin_type_plugins

    def load_schedules(self, schedules: list[dict]) -> list[Schedule]:
        """Parse raw schedule definitions into `Schedule` instances.

        Args:
            schedules: List of schedule configurations.

        Returns:
            List of new Schedule instances.
        """
        return [Schedule.parse(obj) for obj in schedules]

    @staticmethod
    def load_environments(environments: Iterable[dict]) -> list[Environment]:
        """Parse `Environment` objects from python objects.

        Args:
            environments: Sequence of environment dictionaries.

        Returns:
            A list of `Environment` objects.
        """
        return [Environment.parse(obj) for obj in environments]

    @staticmethod
    def load_job_tasks(jobs: Iterable[dict]) -> list[TaskSets]:
        """Parse `TaskSets` objects from python objects.

        Args:
            jobs: Sequence of job dictionaries.

        Returns:
            A list of `TaskSets` objects.
        """
        return [TaskSets.parse(obj) for obj in jobs]

    @staticmethod
    def get_plugins_for_mappings(mapper_config: dict) -> list[ProjectPlugin]:
        """Build one mapper plugin per mapping declared in a mapper config.

        Mapper plugins are a special case. They are not a single plugin, but
        actually a list of plugins generated from the mapping config defined
        within the mapper config.

        Args:
            mapper_config: The dict representation of a mapper config found
                in meltano.yml.

        Returns:
            A list of `ProjectPlugin` instances, one per entry under the
            mapper's `mappings` key (empty if there are none).
        """
        mapping_plugins: list[ProjectPlugin] = []

        # `or []` also covers a `mappings` key that is present but `None`.
        for mapping in mapper_config.get("mappings") or []:
            # Deep-copy so the per-mapping overrides below never leak back
            # into the caller's mapper config or into sibling mappings.
            raw_mapping_plugin = copy.deepcopy(mapper_config)
            raw_mapping_plugin["mapping"] = True
            raw_mapping_plugin["mapping_name"] = mapping.get("name")
            raw_mapping_plugin["config"] = mapping.get("config")
            mapping_plugins.append(
                ProjectPlugin(PluginType.MAPPERS, **raw_mapping_plugin)
            )

        return mapping_plugins