vdk-oracle: create oracle plugin
Why?

To support more use cases, vdk should support
connecting to and ingesting into an Oracle database.

What?

Add an Oracle plugin. The plugin supports simple queries, CLI queries,
and ingestion.

How was this tested?

Local functional tests
CI tests

What kind of change is this?

Feature/non-breaking

Signed-off-by: Dilyan Marinov <mdilyan@vmware.com>
antoniivanov authored and Dilyan Marinov committed Nov 24, 2023
1 parent 4ca6338 commit 05040eb
Showing 26 changed files with 1,061 additions and 0 deletions.
22 changes: 22 additions & 0 deletions projects/vdk-plugins/vdk-oracle/.plugin-ci.yml
@@ -0,0 +1,22 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0

image: "python:3.7"

.build-vdk-oracle:
  variables:
    PLUGIN_NAME: vdk-oracle
  extends: .build-plugin-dind

build-py37-vdk-oracle:
  extends: .build-vdk-oracle
  image: "python:3.7"

build-py311-vdk-oracle:
  extends: .build-vdk-oracle
  image: "python:3.11"

release-vdk-oracle:
  variables:
    PLUGIN_NAME: vdk-oracle
  extends: .release-plugin
111 changes: 111 additions & 0 deletions projects/vdk-plugins/vdk-oracle/README.md
@@ -0,0 +1,111 @@
# oracle

Support for VDK Managed Oracle connection

TODO: what the project is about, what is its purpose


## Usage

```
pip install vdk-oracle
```

### Configuration

(`vdk config-help` is a useful command to browse all config options of your installation of vdk)

| Name | Description | (example) Value |
|--------------------------|--------------------------------------------------|----------------------|
| oracle_user | Username used when connecting to Oracle database | "my_user" |
| oracle_password | Password used when connecting to Oracle database | "super_secret_shhhh" |
| oracle_connection_string | The Oracle connection string | "localhost/free" |
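
As an illustration of how these three settings fit together, the sketch below validates them and assembles keyword arguments in the shape that `oracledb.connect` accepts (`user`, `password`, `dsn`). The helper name `build_connect_kwargs` and the plain-dict input are assumptions made for this example; they are not part of the plugin.

```python
from typing import Dict, Mapping

# Hypothetical helper, not part of vdk-oracle: checks that the three
# settings from the table above are present and maps them to the keyword
# names used by oracledb.connect (user, password, dsn).
REQUIRED_KEYS = ("oracle_user", "oracle_password", "oracle_connection_string")


def build_connect_kwargs(settings: Mapping[str, str]) -> Dict[str, str]:
    # Collect every setting that is absent or empty so the error names them all.
    missing = [key for key in REQUIRED_KEYS if not settings.get(key)]
    if missing:
        raise ValueError(f"Missing Oracle settings: {', '.join(missing)}")
    return {
        "user": settings["oracle_user"],
        "password": settings["oracle_password"],
        "dsn": settings["oracle_connection_string"],
    }


example = build_connect_kwargs(
    {
        "oracle_user": "my_user",
        "oracle_password": "super_secret_shhhh",
        "oracle_connection_string": "localhost/free",
    }
)
```

The resulting dictionary could be unpacked into a connection call; inside a data job the plugin handles the connection itself, so this is only meant to show which setting plays which role.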

### Example

#### Ingestion

```python
import datetime
from decimal import Decimal

def run(job_input):

    # Ingest object
    payload_with_types = {
        "id": 5,
        "str_data": "string",
        "int_data": 12,
        "float_data": 1.2,
        "bool_data": True,
        "timestamp_data": datetime.datetime.fromtimestamp(1700554373),
        "decimal_data": Decimal(0.1),
    }

    job_input.send_object_for_ingestion(
        payload=payload_with_types, destination_table="test_table"
    )

    # Ingest tabular data
    col_names = [
        "id",
        "str_data",
        "int_data",
        "float_data",
        "bool_data",
        "timestamp_data",
        "decimal_data",
    ]
    row_data = [
        [
            0,
            "string",
            12,
            1.2,
            True,
            datetime.datetime.fromtimestamp(1700554373),
            Decimal(1.1),
        ],
        [
            1,
            "string",
            12,
            1.2,
            True,
            datetime.datetime.fromtimestamp(1700554373),
            Decimal(1.1),
        ],
        [
            2,
            "string",
            12,
            1.2,
            True,
            datetime.datetime.fromtimestamp(1700554373),
            Decimal(1.1),
        ],
    ]
    job_input.send_tabular_data_for_ingestion(
        rows=row_data, column_names=col_names, destination_table="test_table"
    )
```
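
When the destination table or a column does not exist yet, the ingester added in this commit picks an Oracle column type from the Python type of each value. The sketch below reproduces that lookup as a standalone function so the effect on a payload like the one above is easy to see. The mapping is copied from the ingester in this commit; the function name `oracle_type_for` and the sample payload are made up for this example.

```python
import datetime
from decimal import Decimal
from typing import Any

# Mapping copied from the ingester added in this commit; the function name
# oracle_type_for is hypothetical.
TYPE_MAPPINGS = {
    int: "NUMBER",
    float: "FLOAT",
    Decimal: "DECIMAL(14, 8)",
    str: "VARCHAR2(255)",
    datetime.datetime: "TIMESTAMP",
    bool: "NUMBER(1)",
    bytes: "BLOB",
}


def oracle_type_for(value: Any) -> str:
    # Exact-type lookup: bool is a subclass of int, but type(True) is bool,
    # so booleans map to NUMBER(1) rather than NUMBER.
    return TYPE_MAPPINGS.get(type(value), "VARCHAR2(255)")


payload = {
    "id": 5,
    "bool_data": True,
    "timestamp_data": datetime.datetime.fromtimestamp(1700554373),
    "decimal_data": Decimal("0.1"),
    "missing": None,
}
column_types = {name: oracle_type_for(value) for name, value in payload.items()}
```

Values whose type is not in the mapping, including `None`, fall back to `VARCHAR2(255)`.
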
### Build and testing

```
pip install -r requirements.txt
pip install -e .
pytest
```

In the VDK repo, the [../build-plugin.sh](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/build-plugin.sh) script can also be used.


#### Note about the CICD:

`.plugin-ci.yml` is needed only for plugins that are part of the [Versatile Data Kit Plugin repo](https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins).

The CI/CD is separated into two stages, a build stage and a release stage.
The build stage is made up of a few jobs, all of which inherit from the same
job configuration and differ only in the Python version they use (3.7 and 3.11 for this plugin).
They run according to rules, which are ordered so that changes to a
plugin's directory trigger that plugin's CI, but changes to a different plugin do not.
8 changes: 8 additions & 0 deletions projects/vdk-plugins/vdk-oracle/requirements.txt
@@ -0,0 +1,8 @@
# this file is used to provide testing requirements
# for requirements (dependencies) needed during and after installation of the plugin see (and update) setup.py install_requires section


pytest
testcontainers
vdk-core
vdk-test-utils
40 changes: 40 additions & 0 deletions projects/vdk-plugins/vdk-oracle/setup.py
@@ -0,0 +1,40 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import pathlib

import setuptools

"""

Builds a package with the help of setuptools in order for this package to be imported in other projects
"""

__version__ = "0.1.0"

setuptools.setup(
    name="vdk-oracle",
    version=__version__,
    url="https://github.com/vmware/versatile-data-kit",
    description="Support for VDK Managed Oracle connection",
    long_description=pathlib.Path("README.md").read_text(),
    long_description_content_type="text/markdown",
    install_requires=["vdk-core", "oracledb", "tabulate"],
    package_dir={"": "src"},
    packages=setuptools.find_namespace_packages(where="src"),
    # This is the only vdk plugin specific part
    # Define entry point called "vdk.plugin.run" with name of plugin and module to act as entry point.
    entry_points={"vdk.plugin.run": ["vdk-oracle = vdk.plugin.oracle.oracle_plugin"]},
    classifiers=[
        "Development Status :: 2 - Pre-Alpha",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.7",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Programming Language :: Python :: 3.11",
    ],
    project_urls={
        "Documentation": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-oracle",
        "Source Code": "https://github.com/vmware/versatile-data-kit/tree/main/projects/vdk-plugins/vdk-oracle",
        "Bug Tracker": "https://github.com/vmware/versatile-data-kit/issues/new/choose",
    },
)
@@ -0,0 +1,161 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import datetime
import logging
from decimal import Decimal
from typing import Any
from typing import Dict
from typing import List
from typing import Optional

from vdk.internal.builtin_plugins.connection.impl.router import ManagedConnectionRouter
from vdk.internal.builtin_plugins.ingestion.ingester_base import IIngesterPlugin

log = logging.getLogger(__name__)


class IngestToOracle(IIngesterPlugin):
    def __init__(self, connections: ManagedConnectionRouter):
        self.conn = connections.open_connection("ORACLE").connect()
        self.cursor = self.conn.cursor()
        self.table_cache = set()  # Cache to store existing tables
        self.column_cache = {}  # New cache for columns

    @staticmethod
    def _get_oracle_type(value: Any):
        type_mappings = {
            int: "NUMBER",
            float: "FLOAT",
            Decimal: "DECIMAL(14, 8)",
            str: "VARCHAR2(255)",
            datetime.datetime: "TIMESTAMP",
            bool: "NUMBER(1)",
            bytes: "BLOB",
        }
        return type_mappings.get(type(value), "VARCHAR2(255)")

    def _table_exists(self, table_name: str):
        if table_name.upper() in self.table_cache:
            return True

        self.cursor.execute(
            "SELECT COUNT(*) FROM user_tables WHERE table_name = :1",
            [table_name.upper()],
        )
        exists = bool(self.cursor.fetchone()[0])

        if exists:
            self.table_cache.add(table_name.upper())

        return exists

    def _create_table(self, table_name: str, row: Dict[str, Any]):
        column_defs = [f"{col} {self._get_oracle_type(row[col])}" for col in row.keys()]
        create_table_sql = (
            f"CREATE TABLE {table_name.upper()} ({', '.join(column_defs)})"
        )
        self.cursor.execute(create_table_sql)

    def _cache_columns(self, table_name: str):
        try:
            # Pass the table name as a bind variable instead of interpolating it
            # into the SQL text, as _table_exists already does.
            self.cursor.execute(
                "SELECT column_name FROM user_tab_columns WHERE table_name = :1",
                [table_name.upper()],
            )
            result = self.cursor.fetchall()
            self.column_cache[table_name.upper()] = {column[0] for column in result}
        except Exception as e:
            # TODO: https://github.com/vmware/versatile-data-kit/issues/2932
            log.error(
                "An exception occurred while trying to cache columns. Ignoring for now."
            )
            log.exception(e)

    def _add_columns(self, table_name: str, payload: List[Dict[str, Any]]):
        if table_name.upper() not in self.column_cache:
            self._cache_columns(table_name)

        existing_columns = self.column_cache[table_name.upper()]

        # Find unique new columns from all rows in the payload
        all_columns = {col.upper() for row in payload for col in row.keys()}
        new_columns = all_columns - existing_columns

        if new_columns:
            column_defs = []
            for col in new_columns:
                # Column names are compared in upper case, so match payload keys
                # case-insensitively when looking for a sample value.
                sample_value = next(
                    (
                        value
                        for row in payload
                        for key, value in row.items()
                        if key.upper() == col and value is not None
                    ),
                    None,
                )
                column_type = (
                    self._get_oracle_type(sample_value)
                    if sample_value is not None
                    else "VARCHAR2(255)"
                )
                column_defs.append(f"{col} {column_type}")

            alter_sql = (
                f"ALTER TABLE {table_name.upper()} ADD ({', '.join(column_defs)})"
            )
            self.cursor.execute(alter_sql)
            self.column_cache[table_name.upper()].update(new_columns)

    # TODO: https://github.com/vmware/versatile-data-kit/issues/2929
    # TODO: https://github.com/vmware/versatile-data-kit/issues/2930
    def _cast_to_correct_type(self, value: Any):
        if type(value) is Decimal:
            return float(value)
        return value

    # TODO: Look into potential optimizations
    # TODO: https://github.com/vmware/versatile-data-kit/issues/2931
    def _insert_data(self, table_name: str, payload: List[Dict[str, Any]]):
        if not payload:
            return

        # group dicts by key set
        batches = {}
        for p in payload:
            batch = frozenset(p.keys())
            if batch not in batches:
                batches[batch] = []
            batches[batch].append(p)

        # create queries for groups of dicts with the same key set
        queries = []
        batch_data = []
        for column_names, batch in batches.items():
            columns = list(column_names)
            insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({', '.join([':' + str(i + 1) for i in range(len(columns))])})"
            queries.append(insert_sql)
            temp_data = []
            for row in batch:
                temp = [self._cast_to_correct_type(row[col]) for col in columns]
                temp_data.append(temp)
            batch_data.append(temp_data)

        # batch execute queries for dicts with the same key set
        for query, data in zip(queries, batch_data):
            self.cursor.executemany(query, data)

    def ingest_payload(
        self,
        payload: List[Dict[str, Any]],
        destination_table: Optional[str] = None,
        target: Optional[str] = None,
        collection_id: Optional[str] = None,
        metadata: Optional[IIngesterPlugin.IngestionMetadata] = None,
    ) -> Optional[IIngesterPlugin.IngestionMetadata]:
        if not payload:
            return None
        if not destination_table:
            raise ValueError("Destination table must be specified if not in payload.")

        if not self._table_exists(destination_table):
            self._create_table(destination_table, payload[0])
            self._cache_columns(destination_table)

        self._add_columns(destination_table, payload)
        self._insert_data(destination_table, payload)

        self.conn.commit()
        return metadata
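
Review note: `_insert_data` groups rows by their set of keys so that each group can go through a single `executemany` call. The grouping and statement-building steps can be sketched without a database as follows. `build_insert_batches` is a hypothetical name, and the column order is sorted here only to make the example deterministic (the plugin itself uses the iteration order of the frozenset).

```python
from typing import Any, Dict, List, Tuple


def build_insert_batches(
    table_name: str, payload: List[Dict[str, Any]]
) -> List[Tuple[str, List[List[Any]]]]:
    """Group rows by key set and return one (sql, rows) pair per group."""
    groups: Dict[frozenset, List[Dict[str, Any]]] = {}
    for row in payload:
        groups.setdefault(frozenset(row.keys()), []).append(row)

    batches = []
    for key_set, rows in groups.items():
        columns = sorted(key_set)  # sorted only for a deterministic example
        # Positional bind placeholders :1, :2, ... one per column
        placeholders = ", ".join(f":{i + 1}" for i in range(len(columns)))
        sql = (
            f"INSERT INTO {table_name} ({', '.join(columns)}) "
            f"VALUES ({placeholders})"
        )
        batches.append((sql, [[row[col] for col in columns] for row in rows]))
    return batches


batches = build_insert_batches(
    "test_table",
    [
        {"id": 1, "str_data": "a"},
        {"id": 2},
        {"str_data": "b", "id": 3},
    ],
)
```

Each `(sql, rows)` pair could then be passed to `cursor.executemany(sql, rows)`. Rows one and three share a key set and land in the same batch, while row two gets its own statement.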
49 changes: 49 additions & 0 deletions projects/vdk-plugins/vdk-oracle/src/vdk/plugin/oracle/oracle_configuration.py
@@ -0,0 +1,49 @@
# Copyright 2021-2023 VMware, Inc.
# SPDX-License-Identifier: Apache-2.0
import json
import pathlib

import tempfile

from typing import Dict
from typing import Optional

from vdk.internal.core.config import Configuration
from vdk.internal.core.config import ConfigurationBuilder

ORACLE_USER = "ORACLE_USER"
ORACLE_PASSWORD = "ORACLE_PASSWORD"

ORACLE_CONNECTION_STRING = "ORACLE_CONNECTION_STRING"


class OracleConfiguration:
    def __init__(self, configuration: Configuration):
        self.__config = configuration

    def get_oracle_user(self) -> str:
        return self.__config.get_value(ORACLE_USER)

    def get_oracle_password(self) -> str:
        return self.__config.get_value(ORACLE_PASSWORD)

    # The connection string is a plain string such as "localhost/free", not a dict.
    def get_oracle_connection_string(self) -> Optional[str]:
        return self.__config.get_value(ORACLE_CONNECTION_STRING)

    @staticmethod
    def add_definitions(config_builder: ConfigurationBuilder):
        config_builder.add(
            key=ORACLE_USER,
            default_value=None,
            is_sensitive=True,
            description="The Oracle user for the database connection",
        )
        config_builder.add(
            key=ORACLE_PASSWORD,
            default_value=None,
            is_sensitive=True,
            description="The Oracle password for the database connection",
        )
        config_builder.add(
            key=ORACLE_CONNECTION_STRING,
            default_value=None,
            is_sensitive=True,
            description="The Oracle database connection string",
        )