-
Notifications
You must be signed in to change notification settings - Fork 14
/
fabric_repository_factory.py
39 lines (31 loc) · 1.77 KB
/
fabric_repository_factory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import os
from typing import List
from data_factory_testing_framework._deserializers._deserializer_fabric import (
parse_fabric_pipeline_from_pipeline_json_files,
)
from data_factory_testing_framework._repositories._factories.base_repository_factory import BaseRepositoryFactory
from data_factory_testing_framework.models import Pipeline
class FabricRepositoryFactory(BaseRepositoryFactory):
def _get_data_factory_pipelines_by_folder_path(self, folder_path: str) -> list[Pipeline]:
pipeline_folders = FabricRepositoryFactory._find_folders_containing_pipeline(folder_path)
pipelines = []
for pipeline_folder in pipeline_folders:
pipeline_file = os.path.join(pipeline_folder, "pipeline-content.json")
pipeline_metadata_file = os.path.join(pipeline_folder, "item.metadata.json")
with open(pipeline_file, "r") as pipeline_file, open(pipeline_metadata_file, "r") as pipeline_metadata_file:
pipelines.append(
parse_fabric_pipeline_from_pipeline_json_files(pipeline_metadata_file.read(), pipeline_file.read())
)
return pipelines
@staticmethod
def _find_folders_containing_pipeline(search_path: str) -> List[str]:
pipeline_folders = []
# Walk through the directory tree and fine pipeline folders
for root, _, files in os.walk(search_path):
if "pipeline-content.json" in files:
pipeline_folders.append(root)
# Check if each folder contains metadata file
for pipeline_folder in pipeline_folders:
if "item.metadata.json" not in os.listdir(pipeline_folder):
raise FileNotFoundError(f"Pipeline folder {pipeline_folder} does not contain metadata file")
return pipeline_folders