-
Notifications
You must be signed in to change notification settings - Fork 54
/
03_ingest_table_assets.py
57 lines (44 loc) · 1.59 KB
/
03_ingest_table_assets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
# Copyright 2023-2024 Broadcom
# SPDX-License-Identifier: Apache-2.0
import inspect
import logging
import os
import pandas as pd
import requests
from vdk.api.job_input import IJobInput
# Logger for this job step, named after the module (``__name__``).
log = logging.getLogger(name=__name__)
_REQUEST_TIMEOUT_SECONDS = 60


def _page_windows(n_items, page_size):
    """Yield ``(start, rows)`` pairs covering 1-based result positions 1..n_items.

    Each window requests at most ``page_size`` rows; the last one is trimmed so
    that ``start + rows - 1 == n_items``. Yields nothing when ``n_items < 1``.

    >>> list(_page_windows(101, 100))
    [(1, 100), (101, 1)]

    Raises:
        ValueError: if ``page_size`` is not positive (the loop would never advance).
    """
    if page_size < 1:
        raise ValueError(f"page_size must be >= 1, got {page_size}")
    start = 1
    # ``<=`` (not ``<``) because positions are 1-based: the last item sits at n_items.
    while start <= n_items:
        rows = min(page_size, n_items - start + 1)
        yield start, rows
        start += rows


def _fetch_search_page(search_url, api_key, start, rows):
    """Request one page of search results and return the decoded JSON body.

    Raises:
        requests.HTTPError: for a non-2xx response.
        requests.Timeout: if the server does not answer within the timeout.
    """
    response = requests.get(
        search_url,
        # Let requests URL-encode the key and paging values instead of
        # splicing them into the URL by hand.
        params={"wskey": api_key, "rows": rows, "start": start},
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    response.raise_for_status()
    return response.json()


def run(job_input: IJobInput):
    """
    Download datasets required by the scenario and put them in the data lake.

    Queries the Europeana record search API with ``who:"Vincent Van Gogh"``,
    pages through every result (100 per request) and sends each page's
    ``items`` to the ``assets`` table via ``send_tabular_data_for_ingestion``.

    The ``api_key`` job property is used as the Europeana ``wskey``.

    Raises:
        requests.HTTPError: if any search request returns a non-2xx status.
    """
    log.info(f"Starting job step {__name__}")
    api_key = job_input.get_property("api_key")
    rows = 100
    search_url = (
        "https://api.europeana.eu/record/v2/search.json"
        "?query=who:%22Vincent%20Van%20Gogh%22"
    )

    # The first page also carries totalResults; keep its items instead of
    # downloading the same page a second time inside the loop.
    first_page = _fetch_search_page(search_url, api_key, 1, rows)
    n_items = int(first_page["totalResults"])
    # NOTE(review): Europeana may limit how deep offset (start) paging can go;
    # very large result sets might need cursor-based paging — confirm.
    for start, page_rows in _page_windows(n_items, rows):
        if start == 1:
            payload = first_page
        else:
            payload = _fetch_search_page(search_url, api_key, start, page_rows)
        # A page without "items" would otherwise raise KeyError or produce an
        # empty, column-less DataFrame; skip it but leave a trace in the log.
        items = payload.get("items") or []
        if not items:
            log.warning("No items returned for start=%s rows=%s", start, page_rows)
            continue
        df = pd.DataFrame(items)
        # NOTE(review): items may contain list/dict values and the column set
        # may differ between pages — confirm the "assets" table accepts that.
        job_input.send_tabular_data_for_ingestion(
            df.itertuples(index=False),
            destination_table="assets",
            column_names=df.columns.tolist(),
        )
# df = pd.read_csv(url, dtype=dtypes).replace("'", "''", regex=True)
# df.columns = df.columns.str.replace(" ", "")
# job_input.send_tabular_data_for_ingestion(
# df.itertuples(index=False),
# destination_table="life_expectancy_2010_2015",
# column_names=df.columns.tolist(),
# )