Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vdk-oracle: create oracle plugin #2927

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions projects/vdk-plugins/vdk-oracle/.plugin-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-oracle:
variables:
PLUGIN_NAME: vdk-oracle
extends: .build-plugin-dind

build-py37-vdk-oracle:
extends: .build-vdk-oracle
image: "python:3.7"

build-py311-vdk-oracle:
extends: .build-vdk-oracle
image: "python:3.11"

release-vdk-oracle:
variables:
PLUGIN_NAME: vdk-oracle
extends: .release-plugin
111 changes: 111 additions & 0 deletions projects/vdk-plugins/vdk-oracle/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# oracle

Support for VDK Managed Oracle connection

TODO: what the project is about, what is its purpose


## Usage

```
pip install vdk-oracle
```

### Configuration

(`vdk config-help` is useful command to browse all config options of your installation of vdk)

| Name | Description | (example) Value |
|--------------------------|--------------------------------------------------|----------------------|
| oracle_user | Username used when connecting to Oracle database | "my_user" |
| oracle_password | Password used when connecting to Oracle database | "super_secret_shhhh" |
| oracle_connection_string | The Oracle connection string | "localhost/free" |

### Example

#### Ingestion

```python
import datetime
from decimal import Decimal

def run(job_input):

# Ingest object
payload_with_types = {
"id": 5,
"str_data": "string",
"int_data": 12,
"float_data": 1.2,
"bool_data": True,
"timestamp_data": datetime.datetime.fromtimestamp(1700554373),
"decimal_data": Decimal(0.1),
}

job_input.send_object_for_ingestion(
payload=payload_with_types, destination_table="test_table"
)

# Ingest tabular data
col_names = [
"id",
"str_data",
"int_data",
"float_data",
"bool_data",
"timestamp_data",
"decimal_data",
]
row_data = [
[
0,
"string",
12,
1.2,
True,
datetime.datetime.fromtimestamp(1700554373),
Decimal(1.1),
],
[
1,
"string",
12,
1.2,
True,
datetime.datetime.fromtimestamp(1700554373),
Decimal(1.1),
],
[
2,
"string",
12,
1.2,
True,
datetime.datetime.fromtimestamp(1700554373),
Decimal(1.1),
],
]
job_input.send_tabular_data_for_ingestion(
rows=row_data, column_names=col_names, destination_table="test_table"
)
```
### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In VDK repo [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can be used also.


#### Note about the CICD:

.plugin-ci.yaml is needed only for plugins part of [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated in two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all which inherit from the same
job configuration and only differ in the Python version they use (3.7, 3.8, 3.9 and 3.10).
They run according to rules, which are ordered in a way such that changes to a
plugin's directory trigger the plugin CI, but changes to a different plugin does not.
8 changes: 8 additions & 0 deletions projects/vdk-plugins/vdk-oracle/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# this file is used to provide testing requirements
# for requirements (dependencies) needed during and after installation of the plugin see (and update) setup.py install_requires section


pytest
testcontainers
vdk-core
vdk-test-utils
40 changes: 40 additions & 0 deletions projects/vdk-plugins/vdk-oracle/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""
Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"

setuptools.setup(
name="vdk-oracle",
version=__version__,
url="https://github.com/vmware/versatile-data-kit",
description="Support for VDK Managed Oracle connection",
long_description=pathlib.Path("README.md").read_text(),
long_description_content_type="text/markdown",
install_requires=["vdk-core", "oracledb", "tabulate"],
package_dir={"": "src"},
packages=setuptools.find_namespace_packages(where="src"),
# This is the only vdk plugin specifc part
# Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point.
entry_points={"vdk.plugin.run": ["vdk-oracle = vdk.plugin.oracle.oracle_plugin"]},
classifiers=[
"Development Status :: 2 - Pre-Alpha",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
project_urls={
"Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-oracle",
"Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-oracle",
"Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose",
},
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import datetime
import logging
from decimal import Decimal
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Set

from vdk.api.plugin.plugin_input import PEP249Connection
from vdk.internal.builtin_plugins.connection.impl.router import ManagedConnectionRouter
from vdk.internal.builtin_plugins.connection.managed_cursor import ManagedCursor
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin

log = logging.getLogger(__name__)


class IngestToOracle(IIngesterPlugin):
DeltaMichael marked this conversation as resolved.
Show resolved Hide resolved
def __init__(self, connections: ManagedConnectionRouter):
self.conn: PEP249Connection = connections.open_connection("ORACLE").connect()
self.cursor: ManagedCursor = self.conn.cursor()
self.table_cache: Set[str] = set() # Cache to store existing tables
self.column_cache: Dict[str, str] = {} # New cache for columns

@staticmethod
def _get_oracle_type(value: Any) -> str:
type_mappings = {
int: "NUMBER",
float: "FLOAT",
Decimal: "DECIMAL(14, 8)",
str: "VARCHAR2(255)",
datetime.datetime: "TIMESTAMP",
bool: "NUMBER(1)",
bytes: "BLOB",
}
return type_mappings.get(type(value), "VARCHAR2(255)")

def _table_exists(self, table_name: str) -> bool:
if table_name.upper() in self.table_cache:
return True

self.cursor.execute(
f"SELECT COUNT(*) FROM user_tables WHERE table_name = :1",
[table_name.upper()],
)
exists = bool(self.cursor.fetchone()[0])

if exists:
self.table_cache.add(table_name.upper())

return exists

def _create_table(self, table_name: str, row: Dict[str, Any]) -> None:
column_defs = [f"{col} {self._get_oracle_type(row[col])}" for col in row.keys()]
create_table_sql = (
f"CREATE TABLE {table_name.upper()} ({', '.join(column_defs)})"
)
self.cursor.execute(create_table_sql)

def _cache_columns(self, table_name: str) -> None:
try:
self.cursor.execute(
f"SELECT column_name FROM user_tab_columns WHERE table_name = '{table_name.upper()}'"
)
result = self.cursor.fetchall()
self.column_cache[table_name.upper()] = {column[0] for column in result}
except Exception as e:
# TODO: https://github.com/vmware/versatile-data-kit/issues/2932
log.error(
"An exception occurred while trying to cache columns. Ignoring for now."
)
log.exception(e)

def _add_columns(self, table_name: str, payload: List[Dict[str, Any]]) -> None:
if table_name.upper() not in self.column_cache:
self._cache_columns(table_name)

existing_columns = self.column_cache[table_name.upper()]

# Find unique new columns from all rows in the payload
all_columns = {col.upper() for row in payload for col in row.keys()}
new_columns = all_columns - existing_columns

if new_columns:
column_defs = []
for col in new_columns:
sample_value = next(
(row[col] for row in payload if row.get(col) is not None), None
)
column_type = (
self._get_oracle_type(sample_value)
if sample_value is not None
else "VARCHAR2(255)"
)
column_defs.append(f"{col} {column_type}")

alter_sql = (
f"ALTER TABLE {table_name.upper()} ADD ({', '.join(column_defs)})"
)
self.cursor.execute(alter_sql)
self.column_cache[table_name.upper()].update(new_columns)

# TODO: https://github.com/vmware/versatile-data-kit/issues/2929
# TODO: https://github.com/vmware/versatile-data-kit/issues/2930
def _cast_to_correct_type(self, value: Any) -> Any:
if type(value) is Decimal:
return float(value)
return value

# TODO: Look into potential optimizations
# TODO: https://github.com/vmware/versatile-data-kit/issues/2931
def _insert_data(self, table_name: str, payload: List[Dict[str, Any]]) -> None:
if not payload:
return

# group dicts by key set
batches = {}
for p in payload:
batch = frozenset(p.keys())
if batch not in batches:
batches[batch] = []
batches[batch].append(p)

# create queries for groups of dicts with the same key set
queries = []
batch_data = []
for column_names, batch in batches.items():
columns = list(column_names)
insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join([':' + str(i + 1) for i in range(len(columns))])})"
queries.append(insert_sql)
temp_data = []
for row in batch:
temp = [self._cast_to_correct_type(row[col]) for col in columns]
temp_data.append(temp)
batch_data.append(temp_data)

# batch execute queries for dicts with the same key set
for i in range(len(queries)):
self.cursor.executemany(queries[i], batch_data[i])

def ingest_payload(
self,
payload: List[Dict[str, Any]],
destination_table: Optional[str] = None,
target: str = None,
collection_id: Optional[str] = None,
metadata: Optional[IIngesterPlugin.IngestionMetadata] = None,
) -> None:
if not payload:
return None
if not destination_table:
raise ValueError("Destination table must be specified if not in payload.")

if not self._table_exists(destination_table):
self._create_table(destination_table, payload[0])
self._cache_columns(destination_table)

self._add_columns(destination_table, payload)
self._insert_data(destination_table, payload)

# TODO: test if we need this commit statement (most probably we don't, the connection already commits after every transaction)
self.conn.commit()
DeltaMichael marked this conversation as resolved.
Show resolved Hide resolved
return metadata
Loading
Loading