Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Support to read/write from/to ODBC (#849)
Browse files Browse the repository at this point in the history
Co-authored-by: Markus Klein <markus-klein@live.de>
  • Loading branch information
jorgecarleitao and pacman82 committed Mar 5, 2022
1 parent 972c6f8 commit c999595
Show file tree
Hide file tree
Showing 19 changed files with 1,036 additions and 1 deletion.
40 changes: 40 additions & 0 deletions .github/workflows/integration-odbc.yml
@@ -0,0 +1,40 @@
name: Integration ODBC

on: [push, pull_request]

env:
CARGO_TERM_COLOR: always

jobs:
linux:
name: Test
runs-on: ubuntu-latest

services:
sqlserver:
image: mcr.microsoft.com/mssql/server:2017-latest-ubuntu
ports:
- 1433:1433
env:
ACCEPT_EULA: Y
SA_PASSWORD: My@Test@Password1

steps:
- name: Checkout
uses: actions/checkout@v2
- name: Install ODBC Drivers
run: |
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install -y msodbcsql17
ln -s /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.*.so.* /opt/microsoft/msodbcsql17/lib64/libmsodbcsql-17.so
shell: sudo bash {0}
- name: Setup Rust toolchain
run: |
rustup toolchain install stable
rustup default stable
rustup component add rustfmt clippy
- uses: Swatinem/rust-cache@v1
- name: Test
run: cd arrow-odbc-integration-testing && cargo test
5 changes: 5 additions & 0 deletions Cargo.toml
Expand Up @@ -87,6 +87,9 @@ strength_reduce = { version = "0.2", optional = true }
# For instruction multiversioning
multiversion = { version = "0.6.1", optional = true }

# For support for odbc
odbc-api = { version = "0.35", optional = true }

[dev-dependencies]
criterion = "0.3"
flate2 = "1"
Expand All @@ -106,6 +109,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
full = [
"io_odbc",
"io_csv",
"io_csv_async",
"io_json",
Expand All @@ -126,6 +130,7 @@ full = [
# parses timezones used in timestamp conversions
"chrono-tz",
]
io_odbc = ["odbc-api"]
io_csv = ["io_csv_read", "io_csv_write"]
io_csv_async = ["io_csv_read_async"]
io_csv_read = ["csv", "lexical-core"]
Expand Down
5 changes: 4 additions & 1 deletion README.md
Expand Up @@ -31,8 +31,9 @@ documentation of each of its APIs.
* Apache Arrow IPC (all types)
* Apache Arrow Flight (all types)
* Apache Parquet (except deep nested types)
* Apache Avro (not all types yet)
* Apache Avro (all types)
* NJSON
* ODBC (some types)
* Extensive suite of compute operations
* aggregations
* arithmetics
Expand All @@ -58,8 +59,10 @@ documentation of each of its APIs.
This crate uses `unsafe` when strickly necessary:
* when the compiler can't prove certain invariants and
* FFI

We have extensive tests over these, all of which run and pass under MIRI.
Most uses of `unsafe` fall into 3 categories:

* The Arrow format has invariants over utf8 that can't be written in safe Rust
* `TrustedLen` and trait specialization are still nightly features
* FFI
Expand Down
11 changes: 11 additions & 0 deletions arrow-odbc-integration-testing/Cargo.toml
@@ -0,0 +1,11 @@
[package]
name = "arrow-odbc-integration-testing"
version = "0.1.0"
authors = ["Jorge C. Leitao <jorgecarleitao@gmail.com>"]
edition = "2021"

[dependencies]
arrow2 = { path = "../", default-features = false, features = ["io_odbc"] }
lazy_static = "1.4.0"
# Function name macro is used to ensure unique table names in test
stdext = "0.3.1"
10 changes: 10 additions & 0 deletions arrow-odbc-integration-testing/docker-compose.yml
@@ -0,0 +1,10 @@
services:

mssql:
image: mcr.microsoft.com/mssql/server:2019-latest
ports:
- 1433:1433

environment:
- MSSQL_SA_PASSWORD=My@Test@Password1
command: ["/opt/mssql/bin/sqlservr", "--accept-eula", "--reset-sa-password"]
41 changes: 41 additions & 0 deletions arrow-odbc-integration-testing/src/lib.rs
@@ -0,0 +1,41 @@
#![cfg(test)]

mod read;
mod write;

use arrow2::io::odbc::api::{Connection, Environment, Error as OdbcError};
use lazy_static::lazy_static;

lazy_static! {
/// This is an example for using doc comment attributes
pub static ref ENV: Environment = Environment::new().unwrap();
}

/// Connection string for our test instance of Microsoft SQL Server
const MSSQL: &str =
"Driver={ODBC Driver 17 for SQL Server};Server=localhost;UID=SA;PWD=My@Test@Password1;";

/// Creates the table and assures it is empty. Columns are named a,b,c, etc.
pub fn setup_empty_table(
conn: &Connection<'_>,
table_name: &str,
column_types: &[&str],
) -> std::result::Result<(), OdbcError> {
let drop_table = &format!("DROP TABLE IF EXISTS {}", table_name);

let column_names = &["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k"];
let cols = column_types
.iter()
.zip(column_names)
.map(|(ty, name)| format!("{} {}", name, ty))
.collect::<Vec<_>>()
.join(", ");

let create_table = format!(
"CREATE TABLE {} (id int IDENTITY(1,1),{});",
table_name, cols
);
conn.execute(drop_table, ())?;
conn.execute(&create_table, ())?;
Ok(())
}
139 changes: 139 additions & 0 deletions arrow-odbc-integration-testing/src/read.rs
@@ -0,0 +1,139 @@
use stdext::function_name;

use arrow2::array::{Array, BinaryArray, BooleanArray, Int32Array, Utf8Array};
use arrow2::chunk::Chunk;
use arrow2::datatypes::Field;
use arrow2::error::Result;
use arrow2::io::odbc::api::{Connection, Cursor};
use arrow2::io::odbc::read::{buffer_from_metadata, deserialize, infer_schema};

use super::{setup_empty_table, ENV, MSSQL};

#[test]
fn int() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![Box::new(Int32Array::from_slice([1])) as _])];

test(expected, "INT", "(1)", table_name)
}

#[test]
fn int_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![
Box::new(Int32Array::from([Some(1), None])) as _,
])];

test(expected, "INT", "(1),(NULL)", table_name)
}

#[test]
fn bool() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![
Box::new(BooleanArray::from_slice([true])) as _
])];

test(expected, "BIT", "(1)", table_name)
}

#[test]
fn bool_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![
Box::new(BooleanArray::from([Some(true), None])) as _,
])];

test(expected, "BIT", "(1),(NULL)", table_name)
}

#[test]
fn binary() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected = vec![Chunk::new(vec![
Box::new(BinaryArray::<i32>::from([Some(b"ab")])) as _,
])];

test(
expected,
"VARBINARY(2)",
"(CAST('ab' AS VARBINARY(2)))",
table_name,
)
}

#[test]
fn binary_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected =
vec![Chunk::new(vec![
Box::new(BinaryArray::<i32>::from([Some(b"ab"), None, Some(b"ac")])) as _,
])];

test(
expected,
"VARBINARY(2)",
"(CAST('ab' AS VARBINARY(2))),(NULL),(CAST('ac' AS VARBINARY(2)))",
table_name,
)
}

#[test]
fn utf8_nullable() -> Result<()> {
let table_name = function_name!().rsplit_once(':').unwrap().1;
let expected =
vec![Chunk::new(vec![
Box::new(Utf8Array::<i32>::from([Some("ab"), None, Some("ac")])) as _,
])];

test(expected, "VARCHAR(2)", "('ab'),(NULL),('ac')", table_name)
}

fn test(
expected: Vec<Chunk<Box<dyn Array>>>,
type_: &str,
insert: &str,
table_name: &str,
) -> Result<()> {
let connection = ENV.connect_with_connection_string(MSSQL).unwrap();
setup_empty_table(&connection, table_name, &[type_]).unwrap();
connection
.execute(&format!("INSERT INTO {table_name} (a) VALUES {insert}"), ())
.unwrap();

// When
let query = format!("SELECT a FROM {table_name} ORDER BY id");

let chunks = read(&connection, &query)?.1;

assert_eq!(chunks, expected);
Ok(())
}

pub fn read(
connection: &Connection<'_>,
query: &str,
) -> Result<(Vec<Field>, Vec<Chunk<Box<dyn Array>>>)> {
let mut a = connection.prepare(query).unwrap();
let fields = infer_schema(&a)?;

let max_batch_size = 100;
let buffer = buffer_from_metadata(&a, max_batch_size).unwrap();

let cursor = a.execute(()).unwrap().unwrap();
let mut cursor = cursor.bind_buffer(buffer).unwrap();

let mut chunks = vec![];
while let Some(batch) = cursor.fetch().unwrap() {
let arrays = (0..batch.num_cols())
.zip(fields.iter())
.map(|(index, field)| {
let column_view = batch.column(index);
deserialize(column_view, field.data_type.clone())
})
.collect::<Vec<_>>();
chunks.push(Chunk::new(arrays));
}

Ok((fields, chunks))
}

0 comments on commit c999595

Please sign in to comment.