diff --git a/poetry.lock b/poetry.lock index d778bb51..191abc20 100644 --- a/poetry.lock +++ b/poetry.lock @@ -77,6 +77,21 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" [package.dependencies] pytz = ">=2015.7" +[[package]] +name = "backports.entry-points-selectable" +version = "1.1.0" +description = "Compatibility shim providing selectable entry points for older implementations" +category = "dev" +optional = false +python-versions = ">=2.7" + +[package.dependencies] +importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} + +[package.extras] +docs = ["sphinx", "jaraco.packaging (>=8.2)", "rst.linker (>=1.9)"] +testing = ["pytest (>=4.6)", "pytest-flake8", "pytest-cov", "pytest-black (>=0.3.7)", "pytest-mypy", "pytest-checkdocs (>=2.4)", "pytest-enabler (>=1.0.1)"] + [[package]] name = "black" version = "20.8b1" @@ -143,6 +158,17 @@ category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +[[package]] +name = "charset-normalizer" +version = "2.0.3" +description = "The Real First Universal Charset Detector. Open, modern and actively maintained alternative to Chardet." +category = "main" +optional = false +python-versions = ">=3.5.0" + +[package.extras] +unicode_backport = ["unicodedata2"] + [[package]] name = "click" version = "7.1.2" @@ -313,11 +339,11 @@ license = ["editdistance-s"] [[package]] name = "idna" -version = "2.10" +version = "3.2" description = "Internationalized Domain Names in Applications (IDNA)" category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" +python-versions = ">=3.5" [[package]] name = "imagesize" @@ -558,11 +584,11 @@ six = ">=1.9.0" [[package]] name = "pathspec" -version = "0.8.1" +version = "0.9.0" description = "Utility library for gitignore style pattern matching of file paths." category = "dev" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" [[package]] name = "pefile" @@ -586,6 +612,14 @@ python-versions = "*" [package.extras] dev = ["cython", "metapensiero.tool.bump-version", "pycparser", "readme-renderer"] +[[package]] +name = "platformdirs" +version = "2.0.2" +description = "A small Python module for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." +category = "dev" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" + [[package]] name = "pluggy" version = "0.13.1" @@ -676,7 +710,7 @@ python-versions = ">=3.5" [[package]] name = "pyinstaller" -version = "4.3" +version = "4.4" description = "PyInstaller bundles a Python application and all its dependencies into a single package." category = "dev" optional = false @@ -692,7 +726,7 @@ pywin32-ctypes = {version = ">=0.2.0", markers = "sys_platform == \"win32\""} [package.extras] encryption = ["tinyaes (>=1.0.0)"] -hook_testing = ["pytest (>=2.7.3)", "execnet (>=1.5.0)", "psutil"] +hook_testing = ["execnet (>=1.5.0)", "psutil", "pytest (>=2.7.3)"] [[package]] name = "pyinstaller-hooks-contrib" @@ -784,7 +818,7 @@ pytest = ">=2.6.0" [[package]] name = "python-dateutil" -version = "2.8.1" +version = "2.8.2" description = "Extensions to the standard Python datetime module" category = "main" optional = true @@ -835,21 +869,21 @@ python-versions = "*" [[package]] name = "requests" -version = "2.25.1" +version = "2.26.0" description = "Python HTTP for Humans." category = "main" optional = false -python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" [package.dependencies] certifi = ">=2017.4.17" -chardet = ">=3.0.2,<5" -idna = ">=2.5,<3" +charset-normalizer = {version = ">=2.0.0,<2.1.0", markers = "python_version >= \"3\""} +idna = {version = ">=2.5,<4", markers = "python_version >= \"3\""} urllib3 = ">=1.21.1,<1.27" [package.extras] -security = ["pyOpenSSL (>=0.14)", "cryptography (>=1.3.4)"] socks = ["PySocks (>=1.5.6,!=1.5.7)", "win-inet-pton"] +use_chardet_on_py3 = ["chardet (>=3.0.2,<5)"] [[package]] name = "six" @@ -880,7 +914,7 @@ requests = ">=2.20.0" [[package]] name = "sphinx" -version = "4.1.0" +version = "4.1.1" description = "Python documentation generator" category = "dev" optional = false @@ -899,10 +933,10 @@ requests = ">=2.5.0" snowballstemmer = ">=1.1" sphinxcontrib-applehelp = "*" sphinxcontrib-devhelp = "*" -sphinxcontrib-htmlhelp = "*" +sphinxcontrib-htmlhelp = ">=2.0.0" sphinxcontrib-jsmath = "*" sphinxcontrib-qthelp = "*" -sphinxcontrib-serializinghtml = "*" +sphinxcontrib-serializinghtml = ">=1.1.5" [package.extras] docs = ["sphinxcontrib-websupport"] @@ -1013,7 +1047,7 @@ test = ["nose (==1.3.7)", "mock (==3.0.5)", "pylint (==2.4.4)", "nose-cov (==1.6 [[package]] name = "sqlalchemy" -version = "1.4.20" +version = "1.4.21" description = "Database Abstraction Library" category = "main" optional = true @@ -1141,18 +1175,19 @@ socks = ["PySocks (>=1.5.6,!=1.5.7,<2.0)"] [[package]] name = "virtualenv" -version = "20.4.7" +version = "20.6.0" description = "Virtual Python Environment builder" category = "dev" optional = false -python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.7" +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" [package.dependencies] -appdirs = ">=1.4.3,<2" +"backports.entry-points-selectable" = ">=1.0.4" distlib = ">=0.3.1,<1" filelock = ">=3.0.0,<4" importlib-metadata = {version = ">=0.12", markers = "python_version < \"3.8\""} importlib-resources = {version = ">=1.0", markers = "python_version < \"3.7\""} +platformdirs = ">=2,<3" six = ">=1.9.0,<2" [package.extras] @@ -1193,7 +1228,7 @@ pandas = ["pandas", "sqlalchemy"] [metadata] lock-version = "1.1" python-versions = ">=3.6.1,<4.0" -content-hash = "c3d3572bfcf9cb51212e793f8a3062ae71d183f9a55fa8f9b2b0b1c6e14e541d" +content-hash = "7f21902e06d6dfa0aaf00e2a5d28d04db21133c5b0dad2045a447acb4f80fd3f" [metadata.files] alabaster = [ @@ -1227,6 +1262,10 @@ babel = [ {file = "Babel-2.9.1-py2.py3-none-any.whl", hash = "sha256:ab49e12b91d937cd11f0b67cb259a57ab4ad2b59ac7a3b41d6c06c0ac5b0def9"}, {file = "Babel-2.9.1.tar.gz", hash = "sha256:bc0c176f9f6a994582230df350aa6e05ba2ebe4b3ac317eab29d9be5d2768da0"}, ] +"backports.entry-points-selectable" = [ + {file = "backports.entry_points_selectable-1.1.0-py2.py3-none-any.whl", hash = "sha256:a6d9a871cde5e15b4c4a53e3d43ba890cc6861ec1332c9c2428c92f977192acc"}, + {file = "backports.entry_points_selectable-1.1.0.tar.gz", hash = "sha256:988468260ec1c196dab6ae1149260e2f5472c9110334e5d51adcb77867361f6a"}, +] black = [ {file = "black-20.8b1.tar.gz", hash = "sha256:1c02557aa099101b9d21496f8a914e9ed2222ef70336404eeeac8edba836fbea"}, ] @@ -1288,6 +1327,10 @@ chardet = [ {file = "chardet-4.0.0-py2.py3-none-any.whl", hash = "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5"}, {file = "chardet-4.0.0.tar.gz", hash = "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa"}, ] +charset-normalizer = [ + {file = "charset-normalizer-2.0.3.tar.gz", hash = "sha256:c46c3ace2d744cfbdebceaa3c19ae691f53ae621b39fd7570f59d14fb7f2fd12"}, + {file = "charset_normalizer-2.0.3-py3-none-any.whl", hash = "sha256:88fce3fa5b1a84fdcb3f603d889f723d1dd89b26059d0123ca435570e848d5e1"}, +] click = [ {file = "click-7.1.2-py2.py3-none-any.whl", hash = "sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc"}, {file = "click-7.1.2.tar.gz", hash = "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a"}, @@ -1457,8 +1500,8 @@ identify = [ {file = "identify-2.2.11.tar.gz", hash = "sha256:a0e700637abcbd1caae58e0463861250095dfe330a8371733a471af706a4a29a"}, ] idna = [ - {file = "idna-2.10-py2.py3-none-any.whl", hash = "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0"}, - {file = "idna-2.10.tar.gz", hash = "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6"}, + {file = "idna-3.2-py3-none-any.whl", hash = "sha256:14475042e284991034cb48e06f6851428fb14c4dc953acd9be9a5e95c7b6dd7a"}, + {file = "idna-3.2.tar.gz", hash = "sha256:467fbad99067910785144ce333826c71fb0e63a425657295239737f7ecd125f3"}, ] imagesize = [ {file = "imagesize-1.2.0-py2.py3-none-any.whl", hash = "sha256:6965f19a6a2039c7d48bca7dba2473069ff854c36ae6f19d2cde309d998228a1"}, @@ -1671,8 +1714,8 @@ parsimonious = [ {file = "parsimonious-0.8.1.tar.gz", hash = "sha256:3add338892d580e0cb3b1a39e4a1b427ff9f687858fdd61097053742391a9f6b"}, ] pathspec = [ - {file = "pathspec-0.8.1-py2.py3-none-any.whl", hash = "sha256:aa0cb481c4041bf52ffa7b0d8fa6cd3e88a2ca4879c533c9153882ee2556790d"}, - {file = "pathspec-0.8.1.tar.gz", hash = "sha256:86379d6b86d75816baba717e64b1a3a3469deb93bb76d613c9ce79edc5cb68fd"}, + {file = "pathspec-0.9.0-py2.py3-none-any.whl", hash = "sha256:7d15c4ddb0b5c802d161efc417ec1a2558ea2653c2e8ad9c19098201dc1c993a"}, + {file = "pathspec-0.9.0.tar.gz", hash = "sha256:e564499435a2673d586f6b2130bb5b95f04a3ba06f81b8f895b651a3c76aabb1"}, ] pefile = [ {file = "pefile-2021.5.24.tar.gz", hash = "sha256:ed79b2353daa58421459abf4d685953bde0adf9f6e188944f97ba9795f100246"}, @@ -1696,6 +1739,10 @@ pglast = [ {file = "pglast-1.17-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:4999901ac3ff4fe2237506b81e45abd32ff6656771700d3c1bab3a9de9e00e27"}, {file = "pglast-1.17.tar.gz", hash = "sha256:2979b38ca5f72cfa0a5db78af2f62d04db6a7647ee7f03eac7a67f9e86e3f5f9"}, ] +platformdirs = [ + {file = "platformdirs-2.0.2-py2.py3-none-any.whl", hash = "sha256:0b9547541f599d3d242078ae60b927b3e453f0ad52f58b4d4bc3be86aed3ec41"}, + {file = "platformdirs-2.0.2.tar.gz", hash = "sha256:3b00d081227d9037bbbca521a5787796b5ef5000faea1e43fd76f1d44b06fcfa"}, +] pluggy = [ {file = "pluggy-0.13.1-py2.py3-none-any.whl", hash = "sha256:966c145cd83c96502c3c3868f50408687b38434af77734af1e9ca461a4081d2d"}, {file = "pluggy-0.13.1.tar.gz", hash = "sha256:15b2acde666561e1298d71b523007ed7364de07029219b604cf808bfa1c765b0"}, @@ -1776,7 +1823,13 @@ pygments = [ {file = "Pygments-2.9.0.tar.gz", hash = "sha256:a18f47b506a429f6f4b9df81bb02beab9ca21d0a5fee38ed15aef65f0545519f"}, ] pyinstaller = [ - {file = "pyinstaller-4.3.tar.gz", hash = "sha256:5ecf8bbc230d7298a796e52bb745b95eee12878d141f1645612c99246ecd23f2"}, + {file = "pyinstaller-4.4-py3-none-macosx_10_13_universal2.whl", hash = "sha256:0e802c082487719aab6b2a93575819944c008e6e1ba44c7836fc45660a9586c0"}, + {file = "pyinstaller-4.4-py3-none-manylinux2014_aarch64.whl", hash = "sha256:2ef691eeb360073075b6c355535a15dcdedac28d2ecc8448d34bce671ec2dff4"}, + {file = "pyinstaller-4.4-py3-none-manylinux2014_i686.whl", hash = "sha256:2cfef29a878fc54040e2287c19ea4bdc667f473b59918fcb51a1e68366ecb814"}, + {file = "pyinstaller-4.4-py3-none-manylinux2014_x86_64.whl", hash = "sha256:b0f13d0a33b9a21659967db65b475c14645b456dc70ba68fa9cebf4bc29ef58e"}, + {file = "pyinstaller-4.4-py3-none-win32.whl", hash = "sha256:d0b35f885650b9dc69072adf948b608bcb506c5256dd7f0d0967c1f177bb41b1"}, + {file = "pyinstaller-4.4-py3-none-win_amd64.whl", hash = "sha256:4485046ca929e15f6a2db746ce183c14e1fc3c1e8d2102aad8f1403b25d9ebdf"}, + {file = "pyinstaller-4.4.tar.gz", hash = "sha256:af3ef0b9f265a2d3859357a25ab16743fbb6143c89fd7c3570cb84b8d24db0ba"}, ] pyinstaller-hooks-contrib = [ {file = "pyinstaller-hooks-contrib-2021.2.tar.gz", hash = "sha256:7f5d0689b30da3092149fc536a835a94045ac8c9f0e6dfb23ac171890f5ea8f2"}, @@ -1825,8 +1878,8 @@ pytest-env = [ {file = "pytest-env-0.6.2.tar.gz", hash = "sha256:7e94956aef7f2764f3c147d216ce066bf6c42948bb9e293169b1b1c880a580c2"}, ] python-dateutil = [ - {file = "python-dateutil-2.8.1.tar.gz", hash = "sha256:73ebfe9dbf22e832286dafa60473e4cd239f8592f699aa5adaf10050e6e1823c"}, - {file = "python_dateutil-2.8.1-py2.py3-none-any.whl", hash = "sha256:75bb3f31ea686f1197762692a9ee6a7550b59fc6ca3a1f4b5d7e32fb98e2da2a"}, + {file = "python-dateutil-2.8.2.tar.gz", hash = "sha256:0123cacc1627ae19ddf3c27a5de5bd67ee4586fbdd6440d9748f8abb483d3e86"}, + {file = "python_dateutil-2.8.2-py2.py3-none-any.whl", hash = "sha256:961d03dc3453ebbc59dbdea9e4e11c5651520a876d0f4db161e8674aae935da9"}, ] pytz = [ {file = "pytz-2021.1-py2.py3-none-any.whl", hash = "sha256:eb10ce3e7736052ed3623d49975ce333bcd712c7bb19a58b9e2089d4057d0798"}, @@ -1925,8 +1978,8 @@ regex = [ {file = "regex-2021.7.6.tar.gz", hash = "sha256:8394e266005f2d8c6f0bc6780001f7afa3ef81a7a2111fa35058ded6fce79e4d"}, ] requests = [ - {file = "requests-2.25.1-py2.py3-none-any.whl", hash = "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e"}, - {file = "requests-2.25.1.tar.gz", hash = "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804"}, + {file = "requests-2.26.0-py2.py3-none-any.whl", hash = "sha256:6c1246513ecd5ecd4528a0906f910e8f0f9c6b8ec72030dc9fd154dc1a6efd24"}, + {file = "requests-2.26.0.tar.gz", hash = "sha256:b8aa58f8cf793ffd8782d3d8cb19e66ef36f7aba4353eec859e74678b01b07a7"}, ] six = [ {file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"}, @@ -1941,8 +1994,8 @@ sodapy = [ {file = "sodapy-2.1.0.tar.gz", hash = "sha256:44e701efc16600d2b3b24b56b6e1d3a0e55567909b9ac84af8f9d1eb4870dc0f"}, ] sphinx = [ - {file = "Sphinx-4.1.0-py3-none-any.whl", hash = "sha256:51028bb0d3340eb80bcc1a2d614e8308ac78d226e6b796943daf57920abc1aea"}, - {file = "Sphinx-4.1.0.tar.gz", hash = "sha256:4219f14258ca5612a0c85ed9b7222d54da69724d7e9dd92d1819ad1bf65e1ad2"}, + {file = "Sphinx-4.1.1-py3-none-any.whl", hash = "sha256:3d513088236eef51e5b0adb78b0492eb22cc3b8ccdb0b36dd021173b365d4454"}, + {file = "Sphinx-4.1.1.tar.gz", hash = "sha256:23c846a1841af998cb736218539bb86d16f5eb95f5760b1966abcd2d584e62b8"}, ] sphinx-rtd-theme = [ {file = "sphinx_rtd_theme-0.5.2-py2.py3-none-any.whl", hash = "sha256:4a05bdbe8b1446d77a01e20a23ebc6777c74f43237035e76be89699308987d6f"}, @@ -1977,36 +2030,36 @@ splitgraph-pipelinewise-target-postgres = [ {file = "splitgraph_pipelinewise_target_postgres-2.1.0-py3-none-any.whl", hash = "sha256:9b761b768b14c67f0f69b122c047209a0c0efb415c1eff15b9f5d7b31d61a8a5"}, ] sqlalchemy = [ - {file = "SQLAlchemy-1.4.20-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:525dd3c2205b11a2bc6d770bf1ec63bde0253fd754b4c19c399d27ddc9dad0d3"}, - {file = "SQLAlchemy-1.4.20-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:4a67371752fd86d1d03a3b82d4e75404608f6f4d579b9676124079a22a40c79f"}, - {file = "SQLAlchemy-1.4.20-cp27-cp27m-win32.whl", hash = "sha256:7150e5b543b466f45f668b352f7abda27998cc8035f051d1b7e9524ca9eb2f5f"}, - {file = "SQLAlchemy-1.4.20-cp27-cp27m-win_amd64.whl", hash = "sha256:6da83225a23eaf7b3f48f3d5f53c91b2cf00fbfa48b24a7a758160112dd3e123"}, - {file = "SQLAlchemy-1.4.20-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:9841762d114018c49483c089fa2d47f7e612e57666323f615913d7d7f46e9606"}, - {file = "SQLAlchemy-1.4.20-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:eaee5dd378f6f0d7c3ec49aeeb26564d55ac0ad73b9b4688bf29e66deabddf73"}, - {file = "SQLAlchemy-1.4.20-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:9eb25bcf9161e2fcbe9eebe8e829719b2334e849183f0e496bf4b83722bcccfa"}, - {file = "SQLAlchemy-1.4.20-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:8d860c62e3f51623ccd528d8fac44580501df557d4b467cc5581587fcf057719"}, - {file = "SQLAlchemy-1.4.20-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0f6d467b67a7e5048f1408e8ea60d6caa70be5b386d0eebbf1185ab49cb8c7e4"}, - {file = "SQLAlchemy-1.4.20-cp36-cp36m-win32.whl", hash = "sha256:ff8bebc7a9d297dff2003460e01db2c20c63818b45fb19170f388b1a72fe5a14"}, - {file = "SQLAlchemy-1.4.20-cp36-cp36m-win_amd64.whl", hash = "sha256:46361690f1e1c5385994a4caeb6e8126063ff593a5c635700bbc1245de793c1e"}, - {file = "SQLAlchemy-1.4.20-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:c0eb2cd3ad4967fcbdd9e066e8cd91fe2c23c671dbae9952f0b4d3d42832cc5f"}, - {file = "SQLAlchemy-1.4.20-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:76fbc24311a3d039d6cd147d396719f606d96d1413f3816c028a48e29367f646"}, - {file = "SQLAlchemy-1.4.20-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:f14acb0fd16d404fda9370f93aace682f284340c89c3442ac747c5466ac7e2b5"}, - {file = "SQLAlchemy-1.4.20-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:fcd84e4d46a86291495d131a7824ba38d2e8278bda9425c50661a04633174319"}, - {file = "SQLAlchemy-1.4.20-cp37-cp37m-win32.whl", hash = "sha256:2f60a2e599cf5cf5e5327ce60f2918b897e42ad9f405d10dd01e37869c0ce6fc"}, - {file = "SQLAlchemy-1.4.20-cp37-cp37m-win_amd64.whl", hash = "sha256:f6fc526bd70898489d02bf52c8f0632ab377592ae954d0c0a5bb38d618dddaa9"}, - {file = "SQLAlchemy-1.4.20-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:25c0e0f3a7e8c19350086b3c0fe93c4def045cec053d749ef15da710c4d54c81"}, - {file = "SQLAlchemy-1.4.20-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e0d48456e1aa4f0537f9c9af7be71e1f0659ff68bc1cd538ebc785f6b007bd0d"}, - {file = "SQLAlchemy-1.4.20-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:9675d5bc7e4f96a7bb2b54d14e9b269a5fb6e5d36ecc7d01f0f65bb9af3185f9"}, - {file = "SQLAlchemy-1.4.20-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b502b5e2f08500cc4b8d29bfc4f51d805adcbc00f8d149e98fda8aae85ddb644"}, - {file = "SQLAlchemy-1.4.20-cp38-cp38-win32.whl", hash = "sha256:aad3234a41340e9cf6184e621694e2a7233ba3f8aef9b1e6de8cba431b45ebd2"}, - {file = "SQLAlchemy-1.4.20-cp38-cp38-win_amd64.whl", hash = "sha256:6c8406c3d8c1c7d15da454de15d77f7bb48d14ede5db994f74226c348cf1050e"}, - {file = "SQLAlchemy-1.4.20-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:238d78b3110b7f7cffdb70bf9cda686e0d876a849bc78ba4d471aa7b1461f306"}, - {file = "SQLAlchemy-1.4.20-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:854a7b15750e617e16f8d65dbc004f065a7963544b253b923f16109557648777"}, - {file = "SQLAlchemy-1.4.20-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:ff38ecf89c69a531a7326c2dae71982edfe2f805f3c016cdc5bfd1a04ebf80cb"}, - {file = "SQLAlchemy-1.4.20-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:86c079732328f1add097b0b8079cd532b5d28e207fac93e9d6ea5f487506deef"}, - {file = "SQLAlchemy-1.4.20-cp39-cp39-win32.whl", hash = "sha256:46b99eab618cdc1c871ea707b7c52edc23cfea6c750740cd242ba62b5c84de7f"}, - {file = "SQLAlchemy-1.4.20-cp39-cp39-win_amd64.whl", hash = "sha256:b86d83fefc8a8c394f3490c37e1953bc16c311a3d1d1cf91518793bfb9847fb4"}, - {file = "SQLAlchemy-1.4.20.tar.gz", hash = "sha256:38ee3a266afef2978e82824650457f70c5d74ec0cadec1b10fe5ed6f038eb5d0"}, + {file = "SQLAlchemy-1.4.21-cp27-cp27m-macosx_10_14_x86_64.whl", hash = "sha256:e10be2b717979260db0f0fa6a531e6ddccf0d85cca11983b41d04049214fa0fc"}, + {file = "SQLAlchemy-1.4.21-cp27-cp27m-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:6774f2001e6359b041b8af3b9bc7669afc6adce39438fae99bfacf4b03490d54"}, + {file = "SQLAlchemy-1.4.21-cp27-cp27m-win32.whl", hash = "sha256:ba84fb12826e4db193d5fbfdcf475f85c07fdfb76b84b3fb1504905f540db7ab"}, + {file = "SQLAlchemy-1.4.21-cp27-cp27m-win_amd64.whl", hash = "sha256:4c8dc1ca3330b716c48317b4d91911e00a54c0f2de486c9c25ec0c54ebf12b5f"}, + {file = "SQLAlchemy-1.4.21-cp27-cp27mu-manylinux_2_5_x86_64.manylinux1_x86_64.whl", hash = "sha256:20a5ecd03134c7ed2c05dfdf5bd96d84480afeebe3484e416f7d7ec8c92596ae"}, + {file = "SQLAlchemy-1.4.21-cp36-cp36m-macosx_10_14_x86_64.whl", hash = "sha256:8a98e38cb07b63459070c3a63abd5059f254d2ddec7afe77824e160f6b9e26c3"}, + {file = "SQLAlchemy-1.4.21-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:da11e254ab264f515b59d16f5d1ff24f5f02fbf0b9de2d2981e704176a75c03a"}, + {file = "SQLAlchemy-1.4.21-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:8f77ad5628e82f76ace2ff9a5b10ee87688bda0867f3e269cab5ed8be7e4ccc5"}, + {file = "SQLAlchemy-1.4.21-cp36-cp36m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba8fd99b546aacac74c97bb0676dd5270a1cd84c44fb67adc71d00ccabcb34a8"}, + {file = "SQLAlchemy-1.4.21-cp36-cp36m-win32.whl", hash = "sha256:bee8b2a399c6be1642d5cfcfb9d0d438fcacdd5188e0b16366fa15dbd49ec667"}, + {file = "SQLAlchemy-1.4.21-cp36-cp36m-win_amd64.whl", hash = "sha256:ef998f03ee92e6c98acdfac464c145e0a9949301b6e83688d7194e746314fcba"}, + {file = "SQLAlchemy-1.4.21-cp37-cp37m-macosx_10_14_x86_64.whl", hash = "sha256:decb9caf3a5695a8a4ebe7153b8ef7dcc57f85dc16896e3a33d5cf3e629ac396"}, + {file = "SQLAlchemy-1.4.21-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:89dbe4a792f28fd21d3319d26ceea32a3132f1c5ae578ec513f77e4c2adb9b91"}, + {file = "SQLAlchemy-1.4.21-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:340fb8eda79e5b116f761c953879c98c423eca82481d5cdad762beb108ee763e"}, + {file = "SQLAlchemy-1.4.21-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:538544799d537684e83e697298fd5078252ee68f23b44d8271f77647f225bca3"}, + {file = "SQLAlchemy-1.4.21-cp37-cp37m-win32.whl", hash = "sha256:53b17656bacdb3b194bc6cff1bd2e044879cf015ab5352c932173c2172a4b99d"}, + {file = "SQLAlchemy-1.4.21-cp37-cp37m-win_amd64.whl", hash = "sha256:cfa0c25e4c87517a679d97d0617ddaccb46337f558beac72e7d85c2f34365a35"}, + {file = "SQLAlchemy-1.4.21-cp38-cp38-macosx_10_14_x86_64.whl", hash = "sha256:dae7ab0c4d34d40895e92b71149bcd72a2f7c5971dc013d1c29393b6067448e3"}, + {file = "SQLAlchemy-1.4.21-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:92c9f6dbe3b3d7059beea12e5601b0b37dd7a51f9bb29fbc98ab314e2a8ffdb7"}, + {file = "SQLAlchemy-1.4.21-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:eb418ec022538b24d73260b694ddb5f3878d554614a4611decb433d8eee69acd"}, + {file = "SQLAlchemy-1.4.21-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:628120ce7ef7f31824929c244894ee22a98d706d8879fb5441e1c572e02ca2ae"}, + {file = "SQLAlchemy-1.4.21-cp38-cp38-win32.whl", hash = "sha256:70b978fb1bbb629e9ce41235511d89ef9d694e3933b5a52dd6d0a4040b6c7830"}, + {file = "SQLAlchemy-1.4.21-cp38-cp38-win_amd64.whl", hash = "sha256:5dbcb3fd1d64d0835e383ea091037ca6aa70a43bd1cabb0c71c27796f2c5173f"}, + {file = "SQLAlchemy-1.4.21-cp39-cp39-macosx_10_14_x86_64.whl", hash = "sha256:2ad74f0a7ae8c4fa374d3be26cdf8c0897669ba3fd8bad4607710bc2fb7f132d"}, + {file = "SQLAlchemy-1.4.21-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:0b7af10ecd1c3829ddf824e39129e026476af6a261388db4d26bf11525fd8d05"}, + {file = "SQLAlchemy-1.4.21-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl", hash = "sha256:87cf4054632c20160592ca2917aec93bb83b12b3a39c865feab1ba44e0ed120d"}, + {file = "SQLAlchemy-1.4.21-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6bc28702213988c96e394685ad4103a4e347305cf90569693bef8e3d12f233ae"}, + {file = "SQLAlchemy-1.4.21-cp39-cp39-win32.whl", hash = "sha256:640fc3556a1022a781f3f07fd5dc9da842ef87f873139402d5d98d64d776360f"}, + {file = "SQLAlchemy-1.4.21-cp39-cp39-win_amd64.whl", hash = "sha256:5042a7d43a8e0a8ffc8d2acacbd5fad1edf8336c376714632a5c61eff56ac06e"}, + {file = "SQLAlchemy-1.4.21.tar.gz", hash = "sha256:07e9054f4df612beadd12ca8a5342246bffcad74a1fa8df1368d1f2bb07d8fc7"}, ] tabulate = [ {file = "tabulate-0.8.9-py3-none-any.whl", hash = "sha256:d7c013fe7abbc5e491394e10fa845f8f32fe54f8dc60c6622c6cf482d25d47e4"}, @@ -2078,8 +2131,8 @@ urllib3 = [ {file = "urllib3-1.26.6.tar.gz", hash = "sha256:f57b4c16c62fa2760b7e3d97c35b255512fb6b59a259730f36ba32ce9f8e342f"}, ] virtualenv = [ - {file = "virtualenv-20.4.7-py2.py3-none-any.whl", hash = "sha256:2b0126166ea7c9c3661f5b8e06773d28f83322de7a3ff7d06f0aed18c9de6a76"}, - {file = "virtualenv-20.4.7.tar.gz", hash = "sha256:14fdf849f80dbb29a4eb6caa9875d476ee2a5cf76a5f5415fa2f1606010ab467"}, + {file = "virtualenv-20.6.0-py2.py3-none-any.whl", hash = "sha256:e4fc84337dce37ba34ef520bf2d4392b392999dbe47df992870dc23230f6b758"}, + {file = "virtualenv-20.6.0.tar.gz", hash = "sha256:51df5d8a2fad5d1b13e088ff38a433475768ff61f202356bb9812c454c20ae45"}, ] websocket-client = [ {file = "websocket-client-1.1.0.tar.gz", hash = "sha256:b68e4959d704768fa20e35c9d508c8dc2bbc041fd8d267c0d7345cffe2824568"}, diff --git a/pyproject.toml b/pyproject.toml index 8ed56496..8f586e08 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ pyyaml = ">=5.1" jsonschema = ">=3.1.0" cryptography = ">=3.4.0" pydantic = ">=1.8.1" +chardet = "^4.0.0" # Socrata dataset mounting. # This could be optional but it's very lightweight (only requires requests). diff --git a/splitgraph/commandline/engine.py b/splitgraph/commandline/engine.py index 9ada7b87..df926d16 100644 --- a/splitgraph/commandline/engine.py +++ b/splitgraph/commandline/engine.py @@ -1,10 +1,7 @@ import logging import os import platform -import time -from io import BytesIO from pathlib import Path, PureWindowsPath -from tarfile import TarFile, TarInfo from typing import Dict, TYPE_CHECKING from urllib.parse import urlparse @@ -14,53 +11,15 @@ from splitgraph.__version__ import __version__ from splitgraph.config import CONFIG, SG_CMD_ASCII from splitgraph.exceptions import DockerUnavailableError, EngineSetupError +from splitgraph.utils.docker import get_docker_client, copy_to_container if TYPE_CHECKING: - from docker.models.containers import Container + pass DEFAULT_ENGINE = "default" -def get_docker_client(): - """Wrapper around client.from_env() that also pings the daemon - to make sure it can connect and if not, raises an error.""" - import docker - - try: - client = docker.from_env() - client.ping() - return client - except Exception as e: - raise DockerUnavailableError("Could not connect to the Docker daemon") from e - - -def copy_to_container(container: "Container", source_path: str, target_path: str) -> None: - """ - Copy a file into a Docker container - - :param container: Container object - :param source_path: Source file path - :param target_path: Target file path (in the container) - :return: - """ - # https://github.com/docker/docker-py/issues/1771 - with open(source_path, "rb") as f: - data = f.read() - - tarinfo = TarInfo(name=os.path.basename(target_path)) - tarinfo.size = len(data) - tarinfo.mtime = int(time.time()) - - stream = BytesIO() - tar = TarFile(fileobj=stream, mode="w") - tar.addfile(tarinfo, BytesIO(data)) - tar.close() - - stream.seek(0) - container.put_archive(path=os.path.dirname(target_path), data=stream.read()) - - def patch_and_save_config(config, patch): from splitgraph.config.config import patch_config from splitgraph.config.system_config import HOME_SUB_DIR diff --git a/splitgraph/commandline/image_creation.py b/splitgraph/commandline/image_creation.py index adfa93c0..0dca6784 100644 --- a/splitgraph/commandline/image_creation.py +++ b/splitgraph/commandline/image_creation.py @@ -7,7 +7,7 @@ import click from splitgraph.commandline.common import ImageType, RepositoryType, JsonType, remote_switch_option -from splitgraph.config import get_singleton, CONFIG +from splitgraph.config import DEFAULT_CHUNK_SIZE from splitgraph.exceptions import TableNotFoundError @@ -72,7 +72,7 @@ def checkout_c(image_spec, force, uncheckout, layered): @click.option( "-c", "--chunk-size", - default=int(get_singleton(CONFIG, "SG_COMMIT_CHUNK_SIZE")), + default=DEFAULT_CHUNK_SIZE, type=int, help="Split new tables into chunks of this many rows (by primary key). The default " "value is governed by the SG_COMMIT_CHUNK_SIZE configuration parameter.", diff --git a/splitgraph/config/__init__.py b/splitgraph/config/__init__.py index 5a6e9c16..2ff896f1 100644 --- a/splitgraph/config/__init__.py +++ b/splitgraph/config/__init__.py @@ -29,6 +29,8 @@ SG_CMD_ASCII = get_singleton(CONFIG, "SG_CMD_ASCII") == "true" +DEFAULT_CHUNK_SIZE = int(get_singleton(CONFIG, "SG_COMMIT_CHUNK_SIZE")) + REMOTES = list(CONFIG.get("remotes", [])) # This is a global variable that gets flipped to True by the Multicorn FDW class diff --git a/splitgraph/core/image.py b/splitgraph/core/image.py index a03d24a6..833508d6 100644 --- a/splitgraph/core/image.py +++ b/splitgraph/core/image.py @@ -157,14 +157,17 @@ def checkout(self, force: bool = False, layered: bool = False) -> None: self.object_engine.delete_table(target_schema, table) if layered: - self._lq_checkout() + self.lq_checkout() else: for table in self.get_tables(): self.get_table(table).materialize(table) set_head(self.repository, self.image_hash) - def _lq_checkout( - self, target_schema: Optional[str] = None, wrapper: Optional[str] = FDW_CLASS + def lq_checkout( + self, + target_schema: Optional[str] = None, + wrapper: Optional[str] = FDW_CLASS, + only_tables: Optional[List[str]] = None, ) -> None: """ Intended to be run on the sgr side. Initializes the FDW for all tables in a given image, @@ -198,6 +201,9 @@ def _lq_checkout( # It's easier to create the foreign tables from our side than to implement IMPORT FOREIGN SCHEMA by the FDW for table_name in self.get_tables(): + if only_tables and table_name not in only_tables: + continue + logging.debug( "Mounting %s:%s/%s into %s", self.repository.to_schema(), @@ -220,7 +226,7 @@ def query_schema( tmp_schema = str.format("o{:032x}", getrandbits(128)) try: self.object_engine.create_schema(tmp_schema) - self._lq_checkout(target_schema=tmp_schema, wrapper=wrapper) + self.lq_checkout(target_schema=tmp_schema, wrapper=wrapper) if commit: self.object_engine.commit() # Make sure the new tables are seen by other connections diff --git a/splitgraph/core/types.py b/splitgraph/core/types.py index 5c94ecf7..04676df2 100644 --- a/splitgraph/core/types.py +++ b/splitgraph/core/types.py @@ -68,6 +68,12 @@ def unwrap( return good, bad +def get_table_params(table_info: TableInfo, table_name: str) -> TableParams: + if isinstance(table_info, dict) and table_name in table_info: + return table_info[table_name][1] + return TableParams({}) + + class Comparable(metaclass=ABCMeta): @abstractmethod def __lt__(self, other: Any) -> bool: @@ -91,11 +97,11 @@ def dict_to_table_schema_params( def table_schema_params_to_dict( tables: Dict[str, Tuple[TableSchema, TableParams]] -) -> Dict[str, Dict[str, Dict[str, str]]]: +) -> Dict[str, Dict[str, Dict[str, Any]]]: return { t: { "schema": {c.name: c.pg_type for c in ts}, - "options": {tpk: str(tpv) for tpk, tpv in tp.items()}, + "options": tp, } for t, (ts, tp) in tables.items() } diff --git a/splitgraph/hooks/data_source/base.py b/splitgraph/hooks/data_source/base.py index 27c91a67..7c462701 100644 --- a/splitgraph/hooks/data_source/base.py +++ b/splitgraph/hooks/data_source/base.py @@ -5,6 +5,7 @@ from psycopg2._json import Json from psycopg2.sql import SQL, Identifier +from splitgraph.config import DEFAULT_CHUNK_SIZE from splitgraph.core.engine import repository_exists from splitgraph.core.image import Image from splitgraph.core.types import ( @@ -68,11 +69,16 @@ def __init__( self.credentials = credentials self.params = params + self._validate_table_params(tables) + self.tables = tables + + @classmethod + def _validate_table_params(cls, tables: Optional[TableInfo]) -> None: + import jsonschema + if isinstance(tables, dict): for _, table_params in tables.values(): - jsonschema.validate(instance=table_params, schema=self.table_params_schema) - - self.tables = tables + jsonschema.validate(instance=table_params, schema=cls.table_params_schema) @abstractmethod def introspect(self) -> IntrospectionResult: @@ -114,6 +120,7 @@ def _load(self, schema: str, tables: Optional[TableInfo] = None): raise NotImplementedError def load(self, repository: "Repository", tables: Optional[TableInfo] = None) -> str: + self._validate_table_params(tables) if not repository_exists(repository): repository.init() @@ -132,7 +139,7 @@ def load(self, repository: "Repository", tables: Optional[TableInfo] = None) -> head=None, image_hash=image_hash, snap_only=True, - chunk_size=100000, + chunk_size=DEFAULT_CHUNK_SIZE, schema=tmp_schema, ) finally: @@ -159,6 +166,7 @@ def sync( image_hash: Optional[str], tables: Optional[TableInfo] = None, ) -> str: + self._validate_table_params(tables) if not repository_exists(repository): repository.init() @@ -213,7 +221,7 @@ def get_ingestion_state(repository: "Repository", image_hash: Optional[str]) -> def prepare_new_image( - repository: "Repository", hash_or_tag: Optional[str] + repository: "Repository", hash_or_tag: Optional[str], comment: str = "Singer tap ingestion" ) -> Tuple[Optional[Image], str]: new_image_hash = "{:064x}".format(getrandbits(256)) if repository_exists(repository): @@ -235,5 +243,5 @@ def prepare_new_image( ) else: base_image = None - repository.images.add(parent_id=None, image=new_image_hash, comment="Singer tap ingestion") + repository.images.add(parent_id=None, image=new_image_hash, comment=comment) return base_image, new_image_hash diff --git a/splitgraph/hooks/data_source/fdw.py b/splitgraph/hooks/data_source/fdw.py index 317e736f..facff723 100644 --- a/splitgraph/hooks/data_source/fdw.py +++ b/splitgraph/hooks/data_source/fdw.py @@ -115,6 +115,7 @@ def mount( tables: Optional[TableInfo] = None, overwrite: bool = True, ) -> Optional[List[MountError]]: + self._validate_table_params(tables) tables = tables or self.tables or [] fdw = self.get_fdw_name() diff --git a/splitgraph/ingestion/airbyte/__init__.py b/splitgraph/ingestion/airbyte/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/splitgraph/ingestion/airbyte/data_source.py b/splitgraph/ingestion/airbyte/data_source.py new file mode 100644 index 00000000..39ced39f --- /dev/null +++ b/splitgraph/ingestion/airbyte/data_source.py @@ -0,0 +1,467 @@ +import json +import logging +import os +import re +from abc import ABC +from contextlib import contextmanager +from random import getrandbits +from typing import Optional, Dict, List, Tuple + +import docker.errors +import pydantic +from docker import DockerClient +from docker.models.containers import Container + +from splitgraph.core.repository import Repository +from splitgraph.core.types import ( + SyncState, + TableInfo, + IntrospectionResult, + TableParams, + get_table_params, +) +from splitgraph.engine.postgres.engine import PostgresEngine +from splitgraph.hooks.data_source.base import ( + SyncableDataSource, + get_ingestion_state, + prepare_new_image, +) +from splitgraph.utils.docker import get_docker_client, copy_to_container +from .docker_utils import ( + add_files, + remove_at_end, + wait_not_failed, + build_command, + detect_network_mode, +) +from .models import ( + AirbyteCatalog, + ConfiguredAirbyteCatalog, + AirbyteMessage, +) +from .utils import ( + AirbyteConfig, + _airbyte_message_reader, + _store_raw_airbyte_tables, + _store_processed_airbyte_tables, + get_sg_schema, + select_streams, + get_pk_cursor_fields, +) +from ..singer.common import store_ingestion_state, add_timestamp_tags + + +class AirbyteDataSource(SyncableDataSource, ABC): + """Generic data source for Airbyte-compliant sources. + We run ingestion by combining an Airbyte source and the Airbyte Postgres destination. + """ + + docker_image: Optional[str] = None + airbyte_name: Optional[str] = None + receiver_image = "airbyte/destination-postgres:latest" + normalization_image = "airbyte/normalization:0.1.36" + cursor_overrides: Optional[Dict[str, List[str]]] = None + primary_key_overrides: Optional[Dict[str, List[str]]] = None + docker_environment: Optional[Dict[str, str]] = None + + table_params_schema = { + "type": "object", + "properties": { + "airbyte_cursor_fields": { + "type": "array", + "description": "Fields in this stream to be used as a cursor " + "for incremental replication (overrides Airbyte configuration's cursor_field)", + "items": {"type": "string"}, + }, + "airbyte_primary_key_fields": { + "type": "array", + "description": "Fields in this stream to be used as a primary key for deduplication " + "(overrides Airbyte configuration's primary_key)", + "items": {"type": "string"}, + }, + }, + } + + def get_airbyte_config(self) -> AirbyteConfig: + return {**self.params, **self.credentials} + + def _sync( + self, + schema: str, + state: Optional[SyncState] = None, + tables: Optional[TableInfo] = None, + ) -> SyncState: + # We override the main sync() instead + pass + + def load(self, repository: "Repository", tables: Optional[TableInfo] = None) -> str: + return self.sync(repository, image_hash=None, tables=tables, use_state=False) + + def _make_postgres_config(self, engine: PostgresEngine, schema: str) -> AirbyteConfig: + return { + "host": engine.conn_params["SG_ENGINE_HOST"], + "port": int(engine.conn_params["SG_ENGINE_PORT"] or 5432), + "username": engine.conn_params["SG_ENGINE_USER"], + "password": engine.conn_params["SG_ENGINE_PWD"], + "database": engine.conn_params["SG_ENGINE_DB_NAME"], + "schema": schema, + } + + def _run_discovery(self, config: Optional[AirbyteConfig] = None) -> AirbyteCatalog: + client = get_docker_client() + network_mode = detect_network_mode(client) + + with self._source_container( + client, + network_mode=network_mode, + config=config, + catalog=None, + state=None, + discover=True, + ) as container: + # Copy config into / + copy_to_container( + container, + source_path=None, + target_path="/config.json", + data=json.dumps(config or {}).encode(), + ) + + container.start() + wait_not_failed(container, mirror_logs=False) + + # Grab the catalog from the output (it's mixed with other logs) + for message in _airbyte_message_reader(container.logs(stream=True)): + if message.catalog: + logging.info("Catalog: %s", message.catalog) + return message.catalog + raise AssertionError("No catalog output!") + + def sync( + self, + repository: Repository, + image_hash: Optional[str] = None, + tables: Optional[TableInfo] = None, + use_state: bool = True, + ) -> str: + # https://docs.airbyte.io/understanding-airbyte/airbyte-specification + self._validate_table_params(tables) + tables = tables or self.tables + + # Select columns and streams (full_refresh/incremental, cursors) + src_config = self.get_airbyte_config() + catalog = self._run_discovery(src_config) + configured_catalog = select_streams( + catalog, + tables, + sync=use_state, + cursor_overrides=self.cursor_overrides, + primary_key_overrides=self.primary_key_overrides, + ) + logging.info("Configured catalog: %s", configured_catalog) + + # Create a destination catalog that overrides the namespace in the source to None. + # Some sources and the PG destination respect stream.namespace -- in the case of the + # MySQL source, it denotes the source database name and for PG, it's the target + # schema name. We need to let the source keep its old namespace and override the + # destination for PG (set it to None here and inject it into the config). + dst_catalog = configured_catalog.copy(deep=True) + for stream in dst_catalog.streams: + stream.stream.namespace = None + + # Load ingestion state + base_image, new_image_hash = prepare_new_image( + repository, image_hash, comment="Airbyte data load" + ) + state = get_ingestion_state(repository, image_hash) if use_state else None + logging.info("Current ingestion state: %s", state) + + # Set up a staging schema for the data + # Delete the slashes or Airbyte will do it for us. + staging_schema = "sg_tmp_" + repository.to_schema().replace("/", "_").replace("-", "_") + repository.object_engine.delete_schema(staging_schema) + repository.object_engine.create_schema(staging_schema) + repository.commit_engines() + + dst_config = self._make_postgres_config(repository.object_engine, staging_schema) + + client = get_docker_client() + network_mode = detect_network_mode(client) + + # Run the Airbyte source and receiver and pipe data between them, writing it + # out into a temporary schema. + + logging.info("Running Airbyte EL process") + dest_files, new_state, sync_modes = self._run_airbyte_el( + client, network_mode, src_config, dst_config, configured_catalog, dst_catalog, state + ) + + # At this stage, Airbyte wrote out the raw tables into the staging schema: they have + # the form _airbyte_tmp_STREAM_NAME and schema (hash, raw_json, date). These raw tables + # are append-or-truncate only, so we append/replace them in the existing Splitgraph image + # at this stage. + + logging.info("Storing raw tables as Splitgraph images") + raw_tables = _store_raw_airbyte_tables( + repository, + new_image_hash, + staging_schema, + sync_modes, + default_sync_mode="append_dedup" if use_state else "overwrite", + ) + + # Run normalization + # This converts the raw Airbyte tables (with JSON) into actual tables with fields. + # We first replace the raw table fragments that Airbyte wrote out with the actual full + # tables, checked out via LQ so that dbt (run by Airbyte's normalization container) can + # scan through them and build the actual ingested data. + + new_image = repository.images.by_hash(new_image_hash) + repository.object_engine.delete_schema(staging_schema) + repository.object_engine.create_schema(staging_schema) + new_image.lq_checkout(staging_schema, only_tables=raw_tables) + repository.commit_engines() + + # Now run the normalization container + # This actually always recreates the normalized tables from scratch. + # https://github.com/airbytehq/airbyte/issues/4286 + logging.info("Running Airbyte T step (normalization)") + with self._normalization_container(client, network_mode) as normalization_container: + add_files(normalization_container, dest_files) + normalization_container.start() + wait_not_failed(normalization_container, mirror_logs=True) + + logging.info("Storing processed Airbyte tables") + _store_processed_airbyte_tables(repository, new_image_hash, staging_schema) + + store_ingestion_state( + repository, + new_image_hash, + current_state=state, + new_state=json.dumps(new_state) if new_state else "{}", + ) + add_timestamp_tags(repository, new_image_hash) + + repository.commit_engines() + + return new_image_hash + + def _run_airbyte_el( + self, + client: docker.DockerClient, + network_mode: str, + src_config: AirbyteConfig, + dst_config: AirbyteConfig, + src_catalog: ConfiguredAirbyteCatalog, + dst_catalog: ConfiguredAirbyteCatalog, + state: Optional[SyncState], + ) -> Tuple[List[Tuple[str, str]], Optional[SyncState], Dict[str, str]]: + with self._source_container( + client, network_mode, src_config, src_catalog, state + ) as source, self._destination_container( + client, network_mode, dst_config, src_catalog + ) as destination: + + # Set up the files in src/dest containers + add_files( + source, + [ + ("config", json.dumps(src_config)), + ( + "catalog", + src_catalog.json(exclude_unset=True, exclude_defaults=True), + ), + ("state", json.dumps(state)), + ], + ) + + dest_files = [ + ("config", json.dumps(dst_config)), + ( + "catalog", + dst_catalog.json(exclude_unset=True, exclude_defaults=True), + ), + ] + + add_files(destination, dest_files) + + dest_socket = destination.attach_socket(params={"stdin": 1, "stream": 1}) + dest_socket._writing = True + src_socket = source.attach(stdout=True, stream=True, logs=True) + + source.start() + destination.start() + + # Pipe messages from the source to the destination + for message in _airbyte_message_reader(src_socket): + if message.state or message.record: + if message.record: + # Empty out the namespace so that we use the destination schema in PG + message.record.namespace = None + + out = (message.json(exclude_unset=True, exclude_defaults=True) + "\n").encode() + logging.debug("Writing message %s", out) + while out: + written = dest_socket.write(out) + out = out[written:] + + dest_socket.flush() + elif message.log: + logging.info(message.log.message) + + # NB this is the magic thing that makes the socket actually close and kick the container so that + # it sees that STDIN is closed too. + # Neither of these work + # dest_socket.close() + # dest_socket._sock.close() + # Thank you Docker. + # https://github.com/d11wtq/dockerpty/blob/f8d17d893c6758b7cc25825e99f6b02202632a97/dockerpty/io.py#L182 + # https://github.com/docker/docker-py/issues/1507 + # https://github.com/docker/docker-py/issues/983#issuecomment-492513718 + os.close(dest_socket._sock.fileno()) + + wait_not_failed(source) + wait_not_failed(destination) + dest_logs = destination.logs(stream=True) + + # Grab the state from stdout + new_state: Optional[SyncState] = None + table_sync_modes: Dict[str, str] = {} + + for line in dest_logs: + line = line.decode() + logging.info("%s: %s", destination.name, line) + + # Another thing we want to find out from the destination is how it normalized + # raw stream names (which can be any UTF-8 string) into the output table names + # (_airbyte_raw_xxx) and the sync mode (overwrite/append). This is because + # we get Airbyte to always write into empty tables (merging them into the full + # Splitgraph tables after the fact) but we need to know if it meant to truncate + # or append to those tables. + # The PG destination outputs a log message in this format: + # + # Write config: WriteConfig{streamName=sites, namespace=null, outputSchemaName=sg_tmp_airbyte_google_test, tmpTableName=_airbyte_tmp_cav_sites, outputTableName=_airbyte_raw_sites, syncMode=overwrite} + # + # So we can grab the outputTableName and syncMode to find these out. + # + # Other ways of doing this: detect TRUNCATE on our tables (this is probably the best + # long-term solution, since we want to turn this into just writing to the DDN); + # back out the table names from the stream names by reimplementing/copying + # https://github.com/airbytehq/airbyte/blob/441435a373f03262ce87a53505b1863d5554cc6c/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/destination_name_transformer.py#L53. + match = re.match(r".*outputTableName=([^,]+), syncMode=(\w+)", line) + if match: + raw_table, sync_mode = match.groups() + table_sync_modes[raw_table] = sync_mode + + # Also find the STATE message in the log denoting the new connector bookmark. + if not line.startswith("{"): + continue + try: + message = AirbyteMessage.parse_raw(line) + except pydantic.ValidationError: + logging.warning("Couldn't parse message, continuing") + continue + if message.state: + new_state = SyncState(message.state.data) + logging.info("New state: %s", new_state) + return dest_files, new_state, table_sync_modes + + @contextmanager + def _source_container( + self, + client: DockerClient, + network_mode: str, + config: Optional[AirbyteConfig], + catalog: Optional[ConfiguredAirbyteCatalog], + state: Optional[SyncState], + discover: bool = False, + ) -> Container: + client.images.pull(self.docker_image) + container_name = "sg-ab-src-{:08x}".format(getrandbits(64)) + if discover: + command = ["discover"] + build_command([("config", config)]) + else: + command = ["read"] + build_command( + [("config", config), ("state", state), ("catalog", catalog)] + ) + container = client.containers.create( + image=self.docker_image, + name=container_name, + command=command, + network_mode=network_mode, + environment=self.docker_environment, + ) + with remove_at_end(container): + yield container + + @contextmanager + def _destination_container( + self, + client: DockerClient, + network_mode: str, + config: AirbyteConfig, + catalog: ConfiguredAirbyteCatalog, + ) -> Container: + # Create the Postgres receiver container + client.images.pull(self.receiver_image) + destination_container_name = "sg-ab-dst-{:08x}".format(getrandbits(64)) + command = ["write"] + build_command([("config", config), ("catalog", catalog)]) + container = client.containers.create( + image=self.receiver_image, + name=destination_container_name, + command=command, + network_mode=network_mode, + stdin_open=True, + environment=self.docker_environment, + ) + with remove_at_end(container): + yield container + + @contextmanager + def _normalization_container(self, client: DockerClient, network_mode: str) -> Container: + client.images.pull(self.normalization_image) + # https://github.com/airbytehq/airbyte/blob/830fac6b648263e1add3589294fcabf4bee6fd39/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java#L111 + command = [ + "run", + "--integration-type", + "postgres", + "--config", + "/config.json", + "--catalog", + "/catalog.json", + ] + container = client.containers.create( + image=self.normalization_image, + name="sg-ab-norm-{:08x}".format(getrandbits(64)), + command=command, + network_mode=network_mode, + environment=self.docker_environment, + ) + + with remove_at_end(container): + yield container + + def introspect(self) -> IntrospectionResult: + config = self.get_airbyte_config() + catalog = self._run_discovery(config) + + result = IntrospectionResult({}) + for stream in catalog.streams: + stream_name = stream.name + stream_schema = get_sg_schema(stream) + + cursor_field, primary_key = get_pk_cursor_fields( + stream, + get_table_params(self.tables, stream.name) if self.tables else TableParams({}), + self.cursor_overrides, + self.primary_key_overrides, + ) + + suggested_params = {} + if cursor_field is not None: + suggested_params["airbyte_cursor_field"] = cursor_field + if primary_key is not None: + suggested_params["airbyte_primary_key"] = [k[0] for k in primary_key] + + result[stream_name] = (stream_schema, TableParams(suggested_params)) + return result diff --git a/splitgraph/ingestion/airbyte/docker_utils.py b/splitgraph/ingestion/airbyte/docker_utils.py new file mode 100644 index 00000000..4c57ee8a --- /dev/null +++ b/splitgraph/ingestion/airbyte/docker_utils.py @@ -0,0 +1,86 @@ +import logging +import os +import socket +from contextlib import contextmanager +from typing import List, Tuple, Any + +import docker.errors +from docker import DockerClient +from docker.models.containers import Container + +from splitgraph.exceptions import SplitGraphError +from splitgraph.utils.docker import copy_to_container + + +class SubprocessError(SplitGraphError): + pass + + +def add_files(container: Container, files: List[Tuple[str, str]]) -> None: + for var_name, var_data in files: + if not var_data: + continue + copy_to_container( + container, + source_path=None, + target_path=f"/{var_name}.json", + data=var_data.encode(), + ) + + +@contextmanager +def remove_at_end(container: Container) -> Container: + try: + yield container + finally: + try: + container.remove(force=True) + except docker.errors.APIError as e: + logging.warning("Error removing container at the end, continuing", exc_info=e) + + +def wait_not_failed(container: Container, mirror_logs: bool = False) -> None: + """ + Block until a Docker container exits. + + :raises SubprocessError if the container exited with a non-zero code. + """ + + if mirror_logs: + for line in container.logs(stream=True, follow=True): + logging.info("%s: %s", container.name, line.decode().strip()) + + result = container.wait() + if result["StatusCode"] != 0: + logging.error("Container %s exited with %d", container.name, result["StatusCode"]) + logs = container.logs(tail=1000) or b"" + for line in logs.decode().splitlines(): + logging.info("%s: %s", container.name, line) + raise SubprocessError( + "Container %s exited with %d" % (container.name, result["StatusCode"]) + ) + + +def build_command(files: List[Tuple[str, Any]]) -> List[str]: + command: List[str] = [] + + for var_name, var_data in files: + if not var_data: + continue + command.extend([f"--{var_name}", f"/{var_name}.json"]) + return command + + +def detect_network_mode(client: DockerClient) -> str: + # We want the receiver to connect to the same engine that we're connected to. If we're + # running on the host, that means using our own connection parameters and running the + # receiver with net:host. Inside Docker we have to use the host's Docker socket and + # attach the container to our own network so that it can also use our own params. + + # This also applies in case we're running a source against a database that's also running + # in Docker -- we want to mimic sgr too. + if os.path.exists("/.dockerenv"): + our_container_id = client.containers.get(socket.gethostname()).id + return f"container:{our_container_id}" + else: + return "host" diff --git a/splitgraph/ingestion/airbyte/models.py b/splitgraph/ingestion/airbyte/models.py new file mode 100644 index 00000000..33080d5a --- /dev/null +++ b/splitgraph/ingestion/airbyte/models.py @@ -0,0 +1,186 @@ +# Copied from Airbyte code at https://github.com/airbytehq/airbyte/blob/master/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +# The reason we don't link to the library directly is because it requires an older version +# of pydantic and jsonschema and we only care about the Pydantic models to simplify serialization +# and deserialization of Airbyte messages/catalog/streams. + +from __future__ import annotations + +from enum import Enum +from typing import Any, Dict, List, Optional + +from pydantic import AnyUrl, BaseModel, Extra, Field + + +class Type(Enum): + RECORD = "RECORD" + STATE = "STATE" + LOG = "LOG" + SPEC = "SPEC" + CONNECTION_STATUS = "CONNECTION_STATUS" + CATALOG = "CATALOG" + + +class AirbyteRecordMessage(BaseModel): + class Config: + extra = Extra.allow + + stream: str = Field(..., description="the name of this record's stream") + data: Dict[str, Any] = Field(..., description="the record data") + emitted_at: int = Field( + ..., + description="when the data was emitted from the source. epoch in millisecond.", + ) + namespace: Optional[str] = Field(None, description="the namespace of this record's stream") + + +class AirbyteStateMessage(BaseModel): + class Config: + extra = Extra.allow + + data: Dict[str, Any] = Field(..., description="the state data") + + +class Level(Enum): + FATAL = "FATAL" + ERROR = "ERROR" + WARN = "WARN" + INFO = "INFO" + DEBUG = "DEBUG" + TRACE = "TRACE" + + +class AirbyteLogMessage(BaseModel): + class Config: + extra = Extra.allow + + level: Level = Field(..., description="the type of logging") + message: str = Field(..., description="the log message") + + +class Status(Enum): + SUCCEEDED = "SUCCEEDED" + FAILED = "FAILED" + + +class AirbyteConnectionStatus(BaseModel): + class Config: + extra = Extra.allow + + status: Status + message: Optional[str] = None + + +class SyncMode(Enum): + full_refresh = "full_refresh" + incremental = "incremental" + + +class DestinationSyncMode(Enum): + append = "append" + overwrite = "overwrite" + append_dedup = "append_dedup" + + +class ConnectorSpecification(BaseModel): + class Config: + extra = Extra.allow + + documentationUrl: Optional[AnyUrl] = None + changelogUrl: Optional[AnyUrl] = None + connectionSpecification: Dict[str, Any] = Field( + ..., + description="ConnectorDefinition specific blob. Must be a valid JSON string.", + ) + supportsIncremental: Optional[bool] = Field( + None, description="If the connector supports incremental mode or not." + ) + supportsNormalization: Optional[bool] = Field( + False, description="If the connector supports normalization or not." + ) + supportsDBT: Optional[bool] = Field(False, description="If the connector supports DBT or not.") + supported_destination_sync_modes: Optional[List[DestinationSyncMode]] = Field( + None, description="List of destination sync modes supported by the connector" + ) + + +class AirbyteStream(BaseModel): + class Config: + extra = Extra.allow + + name: str = Field(..., description="Stream's name.") + json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.") + supported_sync_modes: Optional[List[SyncMode]] = None + source_defined_cursor: Optional[bool] = Field( + None, + description="If the source defines the cursor field, then any other cursor field inputs will be ignored. If it does not, either the user_provided one is used, or the default one is used as a backup.", + ) + default_cursor_field: Optional[List[str]] = Field( + None, + description="Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves.", + ) + source_defined_primary_key: Optional[List[List[str]]] = Field( + None, + description="If the source defines the primary key, paths to the fields that will be used as a primary key. If not provided by the source, the end user will have to specify the primary key themselves.", + ) + namespace: Optional[str] = Field( + None, + description="Optional Source-defined namespace. Currently only used by JDBC destinations to determine what schema to write to. Airbyte streams from the same sources should have the same namespace.", + ) + + +class ConfiguredAirbyteStream(BaseModel): + class Config: + extra = Extra.allow + + stream: AirbyteStream + sync_mode: SyncMode + cursor_field: Optional[List[str]] = Field( + None, + description="Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored.", + ) + destination_sync_mode: DestinationSyncMode + primary_key: Optional[List[List[str]]] = Field( + None, + description="Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. Otherwise it is ignored.", + ) + + +class AirbyteCatalog(BaseModel): + class Config: + extra = Extra.allow + + streams: List[AirbyteStream] + + +class ConfiguredAirbyteCatalog(BaseModel): + class Config: + extra = Extra.allow + + streams: List[ConfiguredAirbyteStream] + + +class AirbyteMessage(BaseModel): + class Config: + extra = Extra.allow + + type: Type = Field(..., description="Message type") + log: Optional[AirbyteLogMessage] = Field( + None, + description="log message: any kind of logging you want the platform to know about.", + ) + spec: Optional[ConnectorSpecification] = None + connectionStatus: Optional[AirbyteConnectionStatus] = None + catalog: Optional[AirbyteCatalog] = Field( + None, + description="log message: any kind of logging you want the platform to know about.", + ) + record: Optional[AirbyteRecordMessage] = Field(None, description="record message: the record") + state: Optional[AirbyteStateMessage] = Field( + None, + description="schema message: the state. Must be the last message produced. The platform uses this information", + ) + + +class AirbyteProtocol(BaseModel): + airbyte_message: Optional[AirbyteMessage] = None + configured_airbyte_catalog: Optional[ConfiguredAirbyteCatalog] = None diff --git a/splitgraph/ingestion/airbyte/utils.py b/splitgraph/ingestion/airbyte/utils.py new file mode 100644 index 00000000..efbff6bd --- /dev/null +++ b/splitgraph/ingestion/airbyte/utils.py @@ -0,0 +1,247 @@ +import logging +from typing import Dict, Any, Iterable, Generator, Optional, List, Tuple + +from target_postgres.db_sync import column_type + +from splitgraph.config import DEFAULT_CHUNK_SIZE +from splitgraph.core.repository import Repository +from splitgraph.core.types import TableSchema, TableColumn, TableInfo, TableParams, get_table_params +from splitgraph.exceptions import TableNotFoundError +from .models import ( + AirbyteMessage, + AirbyteStream, + AirbyteCatalog, + ConfiguredAirbyteCatalog, + ConfiguredAirbyteStream, + SyncMode, + DestinationSyncMode, +) + +AirbyteConfig = Dict[str, Any] +AIRBYTE_RAW = "_airbyte_raw" + + +def _airbyte_message_reader( + stream: Iterable[bytes], +) -> Generator[AirbyteMessage, None, None]: + buffer = b"" + for data in stream: + # Accumulate data in a buffer until we get a newline, at which point we can + # decode the message and filter out records/state. + buffer = buffer + data + if b"\n" not in data: + continue + + delimiter = buffer.rindex(b"\n") + 1 + full_message = buffer[:delimiter] + buffer = buffer[delimiter:] + lines = full_message.decode().splitlines() + + for line in lines: + line = line.strip() + if not line or not line.startswith("{"): + continue + message = AirbyteMessage.parse_raw(line) + yield message + + +def _store_raw_airbyte_tables( + repository: Repository, + image_hash: str, + staging_schema: str, + sync_modes: Dict[str, str], + default_sync_mode: str = "overwrite", +) -> List[str]: + engine = repository.object_engine + raw_tables = [t for t in engine.get_all_tables(staging_schema) if t.startswith(AIRBYTE_RAW)] + current_image = repository.images[image_hash] + for raw_table in raw_tables: + sync_mode = sync_modes.get(raw_table) + if not sync_mode: + logging.warning( + "Couldn't detect the sync mode for %s, falling back to %s", + default_sync_mode, + ) + sync_mode = default_sync_mode + logging.info("Storing %s. Sync mode: %s", raw_table, sync_mode) + + # Make sure the raw table's schema didn't change (very rare, since it's + # just hash, JSON, timestamp) + new_schema = engine.get_full_table_schema(staging_schema, raw_table) + if sync_mode != "overwrite": + try: + current_schema = current_image.get_table(raw_table).table_schema + if current_schema != new_schema: + raise AssertionError( + "Schema for %s changed! Old: %s, new: %s", + raw_table, + current_schema, + new_schema, + ) + except TableNotFoundError: + pass + + # If Airbyte meant to overwrite raw tables instead of append to them, we clear out the + # current raw table so that record_table_as_base doesn't append objects to the existing + # table. + if sync_mode == "overwrite": + repository.objects.overwrite_table(repository, image_hash, raw_table, new_schema, []) + + repository.objects.record_table_as_base( + repository, + raw_table, + image_hash, + chunk_size=DEFAULT_CHUNK_SIZE, + source_schema=staging_schema, + source_table=raw_table, + ) + + return raw_tables + + +def _store_processed_airbyte_tables( + repository: Repository, image_hash: str, staging_schema: str +) -> None: + engine = repository.object_engine + # Save the processed tables in the image + processed_tables = [ + t for t in engine.get_all_tables(staging_schema) if not t.startswith(AIRBYTE_RAW) + ] + for table in processed_tables: + logging.info("Storing %s", table) + schema_spec = engine.get_full_table_schema(staging_schema, table) + repository.objects.overwrite_table(repository, image_hash, table, schema_spec, []) + + repository.objects.record_table_as_base( + repository, + table, + image_hash, + chunk_size=DEFAULT_CHUNK_SIZE, + source_schema=staging_schema, + source_table=table, + ) + + +def _column_type(schema_property) -> str: + if "type" not in schema_property: + # workaround for anyOf + return "jsonb" + return str(column_type(schema_property)) + + +def get_sg_schema(stream: AirbyteStream) -> TableSchema: + # NB Airbyte runs a normalization step after the ingestion that we can't easily predict, + # since it involves unnesting some fields into separate tables and mapping column names. + # This is given to the user for informational purposes. + primary_key = [k for ks in stream.source_defined_primary_key or [] for k in ks] + return [ + TableColumn(i, name, _column_type(schema_property), name in primary_key, None) + for i, (name, schema_property) in enumerate(stream.json_schema["properties"].items()) + ] + + +def get_pk_cursor_fields( + stream: AirbyteStream, + table_params: TableParams, + cursor_overrides: Optional[Dict[str, List[str]]] = None, + primary_key_overrides: Optional[Dict[str, List[str]]] = None, +) -> Tuple[Optional[List[str]], Optional[List[List[str]]]]: + cursor_overrides = cursor_overrides or {} + primary_key_overrides = primary_key_overrides or {} + + # Precedence: + # * Override in the table-specific parameters + # * Override in the global plugin settings + # * Default field in the stream itself (reported by the source) + + cursor_field = stream.default_cursor_field + custom_cursor_field = table_params.get( + "airbyte_cursor_field", cursor_overrides.get(stream.name) + ) + + if custom_cursor_field: + cursor_field = custom_cursor_field + + primary_key = stream.source_defined_primary_key + custom_primary_key = table_params.get( + "airbyte_primary_key", primary_key_overrides.get(stream.name) + ) + + if custom_primary_key: + primary_key = [[k] for k in custom_primary_key] + + return cursor_field, primary_key + + +def select_streams( + catalog: AirbyteCatalog, + tables: Optional[TableInfo], + sync: bool = False, + cursor_overrides: Optional[Dict[str, List[str]]] = None, + primary_key_overrides: Optional[Dict[str, List[str]]] = None, +) -> ConfiguredAirbyteCatalog: + streams: List[ConfiguredAirbyteStream] = [] + + for stream in catalog.streams: + if tables and stream.name not in tables: + continue + + cursor_field, primary_key = get_pk_cursor_fields( + stream, + get_table_params(tables, stream.name) if tables else TableParams({}), + cursor_overrides, + primary_key_overrides, + ) + + sync_configured = False + if sync: + if ( + not stream.supported_sync_modes + or SyncMode.incremental not in stream.supported_sync_modes + ): + logging.warning( + "Stream %s doesn't support incremental sync mode and sync=True. " + "Disabling append_dedup and falling back to refresh.", + stream.name, + ) + else: + # Some sources (like google search) issue duplicate fields which breaks mode=append, + # so we have to use mode=append_dedup. However, that requires an explicit PK which + # Airbyte currently doesn't extract from Singer-backed sources. + # PR to fix: https://github.com/airbytehq/airbyte/pull/4789 + # In the meantime, we allow the plugin to override the cursor and the PK field. + # This is also useful for plugins like Postgres where the user might want to + # specify their own cursor field. + + if not primary_key or not (cursor_field or stream.source_defined_cursor): + logging.warning( + "Stream %s doesn't have a primary key or a cursor field/source defined " + "cursor (PK: %s, cursor: %s). Disabling append_dedup and falling back " + "to refresh.", + stream.name, + primary_key, + cursor_field, + ) + else: + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode=SyncMode.incremental, + destination_sync_mode=DestinationSyncMode.append_dedup, + cursor_field=cursor_field, + primary_key=primary_key, + ) + sync_configured = True + + # Fall back to configuring the stream for full refresh. + if not sync_configured: + configured_stream = ConfiguredAirbyteStream( + stream=stream, + sync_mode=SyncMode.full_refresh, + destination_sync_mode=DestinationSyncMode.overwrite, + cursor_field=cursor_field, + primary_key=primary_key, + ) + + streams.append(configured_stream) + + return ConfiguredAirbyteCatalog(streams=streams) diff --git a/splitgraph/ingestion/singer/__init__.py b/splitgraph/ingestion/singer/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/splitgraph/ingestion/singer/_utils.py b/splitgraph/ingestion/singer/common.py similarity index 67% rename from splitgraph/ingestion/singer/_utils.py rename to splitgraph/ingestion/singer/common.py index 6f3f5c0c..4fbe5418 100644 --- a/splitgraph/ingestion/singer/_utils.py +++ b/splitgraph/ingestion/singer/common.py @@ -1,13 +1,21 @@ import logging import traceback from collections import Callable +from datetime import datetime as dt from functools import wraps +from typing import Optional, Dict, Any from psycopg2.sql import SQL, Identifier +from splitgraph.core.repository import Repository from splitgraph.core.types import TableSchema, Changeset from splitgraph.engine import validate_type from splitgraph.engine.postgres.engine import get_change_key, PostgresEngine +from splitgraph.hooks.data_source.base import INGESTION_STATE_TABLE, INGESTION_STATE_SCHEMA + +SingerConfig = Dict[str, Any] +SingerCatalog = Dict[str, Any] +SingerState = Dict[str, Any] def log_exception(f): @@ -113,3 +121,55 @@ def _make_changeset( ).as_string(engine.connection) result = engine.run_sql(query) return {tuple(row[:-1]): (row[-1], {}, {}) for row in result} + + +def store_ingestion_state( + repository: Repository, + image_hash: str, + current_state: Optional[SingerState], + new_state: str, +): + # Add a table to the new image with the new state + repository.object_engine.create_table( + schema=None, + table=INGESTION_STATE_TABLE, + schema_spec=INGESTION_STATE_SCHEMA, + temporary=True, + ) + # NB: new_state here is a JSON-serialized string, so we don't wrap it into psycopg2.Json() + logging.info("Writing state: %s", new_state) + repository.object_engine.run_sql( + SQL("INSERT INTO pg_temp.{} (timestamp, state) VALUES(now(), %s)").format( + Identifier(INGESTION_STATE_TABLE) + ), + (new_state,), + ) + object_id = repository.objects.create_base_fragment( + "pg_temp", + INGESTION_STATE_TABLE, + repository.namespace, + table_schema=INGESTION_STATE_SCHEMA, + ) + # If the state exists already, overwrite it; otherwise, add new state table. + if current_state: + repository.objects.overwrite_table( + repository, + image_hash, + INGESTION_STATE_TABLE, + INGESTION_STATE_SCHEMA, + [object_id], + ) + else: + repository.objects.register_tables( + repository, + [(image_hash, INGESTION_STATE_TABLE, INGESTION_STATE_SCHEMA, [object_id])], + ) + + +def add_timestamp_tags(repository: Repository, image_hash: str): + ingestion_time = dt.utcnow() + short_tag = ingestion_time.strftime("%Y%m%d") + long_tag = short_tag + "-" + ingestion_time.strftime("%H%M%S") + new_image = repository.images.by_hash(image_hash) + new_image.tag(short_tag) + new_image.tag(long_tag) diff --git a/splitgraph/ingestion/singer/data_source.py b/splitgraph/ingestion/singer/data_source.py index 5a0e304a..f19403e0 100644 --- a/splitgraph/ingestion/singer/data_source.py +++ b/splitgraph/ingestion/singer/data_source.py @@ -5,23 +5,25 @@ import tempfile from abc import ABC, abstractmethod from contextlib import contextmanager -from datetime import datetime as dt from io import StringIO from threading import Thread -from typing import Dict, Any, Optional, cast - -from psycopg2.sql import Identifier, SQL +from typing import Optional, cast from splitgraph.core.repository import Repository from splitgraph.core.types import TableParams, TableInfo, SyncState, IntrospectionResult from splitgraph.exceptions import DataSourceError from splitgraph.hooks.data_source.base import ( get_ingestion_state, - INGESTION_STATE_TABLE, - INGESTION_STATE_SCHEMA, prepare_new_image, SyncableDataSource, ) +from splitgraph.ingestion.singer.common import ( + SingerConfig, + SingerCatalog, + SingerState, + store_ingestion_state, + add_timestamp_tags, +) from splitgraph.ingestion.singer.db_sync import ( get_table_name, get_sg_schema, @@ -30,10 +32,6 @@ select_breadcrumb, ) -SingerConfig = Dict[str, Any] -SingerCatalog = Dict[str, Any] -SingerState = Dict[str, Any] - class SingerDataSource(SyncableDataSource, ABC): # Some taps (e.g. tap-github) use legacy --properties instead of --catalog @@ -124,6 +122,8 @@ def sync( tables: Optional[TableInfo] = None, use_state: bool = True, ) -> str: + self._validate_table_params(tables) + tables = tables or self.tables config = self.get_singer_config() catalog = self._run_singer_discovery(config) catalog = self.build_singer_catalog(catalog, tables) @@ -155,53 +155,9 @@ def sync( latest_state = states.splitlines()[-1] logging.info("State stream: %s", states) - # Add a table to the new image with the new state - repository.object_engine.create_table( - schema=None, - table=INGESTION_STATE_TABLE, - schema_spec=INGESTION_STATE_SCHEMA, - temporary=True, - ) - # NB: new_state here is a JSON-serialized string, so we don't wrap it into psycopg2.Json() - logging.info("Writing state: %s", latest_state) - repository.object_engine.run_sql( - SQL("INSERT INTO pg_temp.{} (timestamp, state) VALUES(now(), %s)").format( - Identifier(INGESTION_STATE_TABLE) - ), - (latest_state,), - ) - - object_id = repository.objects.create_base_fragment( - "pg_temp", - INGESTION_STATE_TABLE, - repository.namespace, - table_schema=INGESTION_STATE_SCHEMA, - ) - - # If the state exists already, overwrite it; otherwise, add new state table. - if state: - repository.objects.overwrite_table( - repository, - new_image_hash, - INGESTION_STATE_TABLE, - INGESTION_STATE_SCHEMA, - [object_id], - ) - else: - repository.objects.register_tables( - repository, - [(new_image_hash, INGESTION_STATE_TABLE, INGESTION_STATE_SCHEMA, [object_id])], - ) - - ingestion_time = dt.utcnow() - - short_tag = ingestion_time.strftime("%Y%m%d") - long_tag = short_tag + "-" + ingestion_time.strftime("%H%M%S") - - new_image = repository.images.by_hash(new_image_hash) + store_ingestion_state(repository, new_image_hash, state, latest_state) - new_image.tag(short_tag) - new_image.tag(long_tag) + add_timestamp_tags(repository, new_image_hash) repository.commit_engines() diff --git a/splitgraph/ingestion/singer/db_sync.py b/splitgraph/ingestion/singer/db_sync.py index 0f95b813..bc2f33b6 100644 --- a/splitgraph/ingestion/singer/db_sync.py +++ b/splitgraph/ingestion/singer/db_sync.py @@ -16,7 +16,7 @@ from splitgraph.engine.postgres.engine import get_change_key from splitgraph.exceptions import TableNotFoundError from splitgraph.ingestion.common import merge_tables -from ._utils import _migrate_schema, log_exception, _make_changeset, rollback_at_end +from .common import _migrate_schema, log_exception, _make_changeset, rollback_at_end def select_breadcrumb(stream_message, breadcrumb): diff --git a/splitgraph/splitfile/execution.py b/splitgraph/splitfile/execution.py index d253147f..c87ac82d 100644 --- a/splitgraph/splitfile/execution.py +++ b/splitgraph/splitfile/execution.py @@ -122,7 +122,7 @@ def setup_lq_mounts(self) -> None: for temporary_schema, _, source_image in self.image_map.values(): self.object_engine.delete_schema(temporary_schema) self.object_engine.create_schema(temporary_schema) - source_image._lq_checkout(target_schema=temporary_schema) + source_image.lq_checkout(target_schema=temporary_schema) def teardown_lq_mounts(self) -> None: for temporary_schema, _, _ in self.image_map.values(): diff --git a/splitgraph/utils/__init__.py b/splitgraph/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/splitgraph/utils/docker.py b/splitgraph/utils/docker.py new file mode 100644 index 00000000..3e6626e4 --- /dev/null +++ b/splitgraph/utils/docker.py @@ -0,0 +1,58 @@ +import os +import time +from io import BytesIO +from tarfile import TarInfo, TarFile +from typing import Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from docker.models.containers import Container + +from splitgraph.exceptions import DockerUnavailableError + + +def get_docker_client(): + """Wrapper around client.from_env() that also pings the daemon + to make sure it can connect and if not, raises an error.""" + import docker + + try: + client = docker.from_env() + client.ping() + return client + except Exception as e: + raise DockerUnavailableError("Could not connect to the Docker daemon") from e + + +def copy_to_container( + container: "Container", + source_path: Optional[str], + target_path: str, + data: Optional[bytes] = None, +) -> None: + """ + Copy a file into a Docker container + + :param container: Container object + :param source_path: Source file path + :param target_path: Target file path (in the container) + :return: + """ + + if data is None: + if not source_path: + raise ValueError("One of source_path or data must be specified!") + # https://github.com/docker/docker-py/issues/1771 + with open(source_path, "rb") as f: + data = f.read() + + tarinfo = TarInfo(name=os.path.basename(target_path)) + tarinfo.size = len(data) + tarinfo.mtime = int(time.time()) + + stream = BytesIO() + tar = TarFile(fileobj=stream, mode="w") + tar.addfile(tarinfo, BytesIO(data)) + tar.close() + + stream.seek(0) + container.put_archive(path=os.path.dirname(target_path), data=stream.read()) diff --git a/test/splitgraph/commands/test_layered_querying.py b/test/splitgraph/commands/test_layered_querying.py index d8cf3f43..0721ff13 100644 --- a/test/splitgraph/commands/test_layered_querying.py +++ b/test/splitgraph/commands/test_layered_querying.py @@ -391,7 +391,7 @@ def test_multiengine_flow( # (since it does manage_audit_triggers()) -- so we bypass all bookkeeping and call the # actual LQ routine directly. local_engine_empty.create_schema(pg_repo_local.to_schema()) - pg_repo_local.images["latest"]._lq_checkout() + pg_repo_local.images["latest"].lq_checkout() # Take one of the test cases we ran in test_lq_qual_filtering that exercises index lookups, # LQs, object downloads and make sure that the correct engines are used diff --git a/test/splitgraph/conftest.py b/test/splitgraph/conftest.py index 156efbca..925cbbb8 100644 --- a/test/splitgraph/conftest.py +++ b/test/splitgraph/conftest.py @@ -7,7 +7,7 @@ from minio.deleteobjects import DeleteObject from psycopg2.sql import Identifier, SQL -from splitgraph.commandline.engine import copy_to_container +from splitgraph.utils.docker import copy_to_container from splitgraph.config import SPLITGRAPH_META_SCHEMA, CONFIG from splitgraph.core.common import META_TABLES from splitgraph.core.engine import get_current_repositories diff --git a/test/splitgraph/ingestion/test_airbyte.py b/test/splitgraph/ingestion/test_airbyte.py new file mode 100644 index 00000000..47311657 --- /dev/null +++ b/test/splitgraph/ingestion/test_airbyte.py @@ -0,0 +1,516 @@ +import re +from unittest import mock + +import pytest +from splitgraph.ingestion.airbyte.models import ( + AirbyteCatalog, + AirbyteStream, + SyncMode, + DestinationSyncMode, +) +from psycopg2.sql import Identifier, SQL + +from splitgraph.core.repository import Repository +from splitgraph.core.types import TableColumn, TableParams +from splitgraph.engine import ResultShape +from splitgraph.ingestion.airbyte.docker_utils import SubprocessError +from splitgraph.ingestion.airbyte.utils import select_streams +from splitgraph.ingestion.airbyte.data_source import AirbyteDataSource + + +class MySQLAirbyteDataSource(AirbyteDataSource): + docker_image = "airbyte/source-mysql:latest" + airbyte_name = "airbyte-mysql" + + credentials_schema = {"type": "object", "properties": {"password": {"type": "string"}}} + params_schema = { + "type": "object", + "properties": { + "host": {"type": "string"}, + "port": {"type": "integer"}, + "database": {"type": "string"}, + "username": {"type": "string"}, + "replication_method": {"type": "string"}, + }, + "required": ["host", "port", "database", "username", "replication_method"], + } + + @classmethod + def get_name(cls) -> str: + return "MySQL (Airbyte)" + + @classmethod + def get_description(cls) -> str: + return "MySQL (Airbyte)" + + +def _source(local_engine_empty, table_params=None): + return MySQLAirbyteDataSource( + engine=local_engine_empty, + params={ + "replication_method": "STANDARD", + "host": "localhost", + "port": 3306, + "database": "mysqlschema", + "username": "originuser", + }, + credentials={ + "password": "originpass", + }, + tables=table_params, + ) + + +_EXPECTED_AIRBYTE_CATALOG = AirbyteCatalog( + streams=[ + AirbyteStream( + name="mushrooms", + json_schema={ + "type": "object", + "properties": { + "discovery": {"type": "string"}, + "friendly": {"type": "boolean"}, + "binary_data": {"type": "string"}, + "name": {"type": "string"}, + "mushroom_id": {"type": "number"}, + "varbinary_data": {"type": "string"}, + }, + }, + supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental], + source_defined_cursor=None, + default_cursor_field=[], + source_defined_primary_key=[["mushroom_id"]], + namespace="mysqlschema", + ) + ] +) +TEST_REPO = "test/airbyte" + + +@pytest.mark.mounting +def test_airbyte_mysql_source_introspection_harness(local_engine_empty): + source = _source(local_engine_empty) + + airbyte_config = source.get_airbyte_config() + assert airbyte_config == { + "database": "mysqlschema", + "host": "localhost", + "password": "originpass", + "port": 3306, + "replication_method": "STANDARD", + "username": "originuser", + } + + airbyte_catalog = source._run_discovery(airbyte_config) + assert airbyte_catalog == _EXPECTED_AIRBYTE_CATALOG + + +@pytest.mark.mounting +def test_airbyte_mysql_source_introspection_end_to_end(local_engine_empty): + source = _source(local_engine_empty) + + introspection_result = source.introspect() + + assert introspection_result == { + "mushrooms": ( + [ + TableColumn( + ordinal=0, + name="discovery", + pg_type="character varying", + is_pk=False, + comment=None, + ), + TableColumn( + ordinal=1, name="friendly", pg_type="boolean", is_pk=False, comment=None + ), + TableColumn( + ordinal=2, + name="binary_data", + pg_type="character varying", + is_pk=False, + comment=None, + ), + TableColumn( + ordinal=3, name="name", pg_type="character varying", is_pk=False, comment=None + ), + TableColumn( + ordinal=4, + name="mushroom_id", + pg_type="double precision", + is_pk=True, + comment=None, + ), + TableColumn( + ordinal=5, + name="varbinary_data", + pg_type="character varying", + is_pk=False, + comment=None, + ), + ], + {"airbyte_cursor_field": [], "airbyte_primary_key": ["mushroom_id"]}, + ) + } + + # Introspect again but this time override the cursor field + source = _source( + local_engine_empty, + table_params={"mushrooms": ([], {"airbyte_cursor_field": ["discovery"]})}, + ) + + introspection_result = source.introspect() + + assert introspection_result["mushrooms"][1] == { + "airbyte_cursor_field": ["discovery"], + "airbyte_primary_key": ["mushroom_id"], + } + + +def test_airbyte_mysql_source_catalog_selection_refresh(): + catalog = select_streams(_EXPECTED_AIRBYTE_CATALOG, tables=None, sync=False) + assert len(catalog.streams) == 1 + assert catalog.streams[0].sync_mode == SyncMode.full_refresh + assert catalog.streams[0].destination_sync_mode == DestinationSyncMode.overwrite + + +def test_airbyte_mysql_source_catalog_selection_incremental_no_cursor_fallback(): + catalog = select_streams(_EXPECTED_AIRBYTE_CATALOG, tables=None, sync=True) + assert len(catalog.streams) == 1 + assert catalog.streams[0].sync_mode == SyncMode.full_refresh + assert catalog.streams[0].destination_sync_mode == DestinationSyncMode.overwrite + assert catalog.streams[0].cursor_field == [] # Default cursor field + + +def test_airbyte_mysql_source_catalog_selection_incremental_cursor_override(): + # Pretend mushroom_id can be used as an incremental cursor. + catalog = select_streams( + _EXPECTED_AIRBYTE_CATALOG, + tables=None, + sync=True, + cursor_overrides={"mushrooms": ["mushroom_id"]}, + ) + assert len(catalog.streams) == 1 + assert catalog.streams[0].sync_mode == SyncMode.incremental + assert catalog.streams[0].destination_sync_mode == DestinationSyncMode.append_dedup + assert catalog.streams[0].primary_key == [["mushroom_id"]] + assert catalog.streams[0].cursor_field == ["mushroom_id"] + + +def test_airbyte_mysql_source_catalog_selection_incremental_cursor_override_tables(): + catalog = select_streams( + _EXPECTED_AIRBYTE_CATALOG, + tables={"mushrooms": ([], TableParams({"airbyte_cursor_field": ["mushroom_id"]}))}, + sync=True, + ) + assert len(catalog.streams) == 1 + assert catalog.streams[0].sync_mode == SyncMode.incremental + assert catalog.streams[0].destination_sync_mode == DestinationSyncMode.append_dedup + assert catalog.streams[0].primary_key == [["mushroom_id"]] + assert catalog.streams[0].cursor_field == ["mushroom_id"] + + +def test_airbyte_mysql_source_catalog_selection_incremental_pk_override(): + catalog = select_streams( + _EXPECTED_AIRBYTE_CATALOG, + tables=None, + sync=True, + cursor_overrides={"mushrooms": ["discovery"]}, + primary_key_overrides={"mushrooms": ["discovery"]}, + ) + assert len(catalog.streams) == 1 + assert catalog.streams[0].sync_mode == SyncMode.incremental + assert catalog.streams[0].destination_sync_mode == DestinationSyncMode.append_dedup + assert catalog.streams[0].primary_key == [["discovery"]] + assert catalog.streams[0].cursor_field == ["discovery"] + + +def test_airbyte_mysql_source_catalog_selection_incremental_pk_override_tables(): + catalog = select_streams( + _EXPECTED_AIRBYTE_CATALOG, + tables={ + "mushrooms": ( + [], + TableParams( + {"airbyte_cursor_field": ["discovery"], "airbyte_primary_key": ["discovery"]} + ), + ) + }, + sync=True, + ) + assert len(catalog.streams) == 1 + assert catalog.streams[0].sync_mode == SyncMode.incremental + assert catalog.streams[0].destination_sync_mode == DestinationSyncMode.append_dedup + assert catalog.streams[0].primary_key == [["discovery"]] + assert catalog.streams[0].cursor_field == ["discovery"] + + +# Test in three modes: +# * Sync: two syncs one after another, make sure state is preserved and reinjected +# * Load: just a load into a fresh repo (not much difference since we still store emitted state) +# * Load after sync: make sure we delete data from raw tables between syncs. +@pytest.mark.mounting +@pytest.mark.parametrize("mode", ["sync", "load", "load_after_sync"]) +def test_airbyte_mysql_source_end_to_end(local_engine_empty, mode): + # Use the mushroom_id as the cursor for incremental replication. + # Note we ignore the schema here (Airbyte does its own normalization so we can't predict it). + repo = Repository.from_schema(TEST_REPO) + + if mode == "sync": + source = _source( + local_engine_empty, + table_params={ + "mushrooms": ([], TableParams({"airbyte_cursor_field": ["mushroom_id"]})) + }, + ) + source.sync(repo, "latest") + expected_tables = [ + "_airbyte_raw_mushrooms", + "_sg_ingestion_state", + "mushrooms", + # slowly changing dimension, used for incremental replication + "mushrooms_scd", + ] + else: + source = _source(local_engine_empty) + source.load(repo) + expected_tables = [ + "_airbyte_raw_mushrooms", + "_sg_ingestion_state", + "mushrooms", + ] + + assert len(repo.images()) == 1 + image = repo.images["latest"] + + assert sorted(image.get_tables()) == expected_tables + image.checkout() + + _assert_raw_data(repo) + _assert_normalized_data(repo) + + if mode == "sync": + _assert_state(repo) + _assert_scd_data(repo) + + # Run another sync + source.sync(repo, "latest") + assert len(repo.images()) == 2 + image = repo.images["latest"] + assert sorted(image.get_tables()) == [ + "_airbyte_raw_mushrooms", + "_sg_ingestion_state", + "mushrooms", + "mushrooms_scd", + ] + image.checkout() + + # Check the empty object wasn't written + assert len(image.get_table("_airbyte_raw_mushrooms").objects) == 1 + + # Check the table lengths are all the same (including the raw tables, since we used the + # ingestion state to make sure the source didn't output more raw data) + for table in image.get_tables(): + expected_rows = 1 if table == "_sg_ingestion_state" else 2 + assert ( + repo.run_sql( + SQL("SELECT COUNT(1) FROM {}").format(Identifier(table)), + return_shape=ResultShape.ONE_ONE, + ) + == expected_rows + ) + elif mode == "load": + _assert_state_empty(repo) + elif mode == "load_after_sync": + # Run a load after a sync to make sure the image gets cleared out properly. + + source.load(repo) + + assert len(repo.images()) == 2 + image = repo.images["latest"] + + # Check the SDC table went away + assert sorted(image.get_tables()) == [ + "_airbyte_raw_mushrooms", + "_sg_ingestion_state", + "mushrooms", + ] + image.checkout() + + _assert_raw_data(repo) + _assert_normalized_data(repo) + _assert_state_empty(repo) + + +@pytest.mark.mounting +def test_airbyte_mysql_source_pk_override(local_engine_empty): + source = _source(local_engine_empty) + repo = Repository.from_schema(TEST_REPO) + source.cursor_overrides = {"mushrooms": ["discovery"]} + source.primary_key_overrides = {"mushrooms": ["discovery"]} + # Use sync since otherwise we don't get any effect in the destination (destination_sync_mode + # has to be append_dedup) + source.sync(repo, "latest") + + # Note we don't actually emit PKs here so we can't check they have changed (only influences + # dedup). This is mostly to make sure it doesn't break. + assert len(repo.images()) == 1 + repo.images["latest"].checkout() + _assert_normalized_data(repo) + + +def _assert_state(repo): + assert repo.run_sql("SELECT state FROM _sg_ingestion_state")[0][0] == { + "cdc": False, + "streams": [ + { + "stream_name": "mushrooms", + "stream_namespace": "mysqlschema", + "cursor_field": ["mushroom_id"], + "cursor": "2", + } + ], + } + + +def _assert_state_empty(repo): + assert repo.run_sql("SELECT state FROM _sg_ingestion_state")[0][0] == {} + + +def _assert_scd_data(repo): + assert repo.run_sql( + "SELECT row_to_json(m) FROM mushrooms_scd m ORDER BY _airbyte_start_at ASC", + return_shape=ResultShape.MANY_ONE, + ) == [ + { + "discovery": "2012-11-11T08:06:26Z", + "friendly": True, + "binary_data": "YmludHN0AA==", + "name": "portobello", + "mushroom_id": 1, + "varbinary_data": "fwAAAQ==", + "_airbyte_start_at": 1, + "_airbyte_end_at": None, + "_airbyte_active_row": True, + "_airbyte_emitted_at": mock.ANY, + "_airbyte_mushrooms_hashid": "e48f260f784baa48a5c4643ef36024af", + }, + { + "discovery": "2018-03-17T08:06:26Z", + "friendly": False, + "binary_data": "AAAxMjMAAA==", + "name": "deathcap", + "mushroom_id": 2, + "varbinary_data": "fwAAAQ==", + "_airbyte_start_at": 2, + "_airbyte_end_at": None, + "_airbyte_active_row": True, + "_airbyte_emitted_at": mock.ANY, + "_airbyte_mushrooms_hashid": "5257322455a690592e14baeb4d24069c", + }, + ] + + +def _assert_normalized_data(repo): + # Check the normalized data + assert repo.run_sql( + "SELECT row_to_json(m) FROM mushrooms m ORDER BY discovery ASC", + return_shape=ResultShape.MANY_ONE, + ) == [ + { + "discovery": "2012-11-11T08:06:26Z", + "friendly": True, + "binary_data": "YmludHN0AA==", + "name": "portobello", + "mushroom_id": 1, + "varbinary_data": "fwAAAQ==", + "_airbyte_emitted_at": mock.ANY, + "_airbyte_mushrooms_hashid": "e48f260f784baa48a5c4643ef36024af", + }, + { + "discovery": "2018-03-17T08:06:26Z", + "friendly": False, + "binary_data": "AAAxMjMAAA==", + "name": "deathcap", + "mushroom_id": 2, + "varbinary_data": "fwAAAQ==", + "_airbyte_emitted_at": mock.ANY, + "_airbyte_mushrooms_hashid": "5257322455a690592e14baeb4d24069c", + }, + ] + + # Airbyte's normalization doesn't seem to emit PKs, so all is_pk will be False in any case. + assert repo.images["latest"].get_table("mushrooms").table_schema == [ + TableColumn( + ordinal=1, name="discovery", pg_type="character varying", is_pk=False, comment=None + ), + TableColumn(ordinal=2, name="friendly", pg_type="boolean", is_pk=False, comment=None), + TableColumn( + ordinal=3, name="binary_data", pg_type="character varying", is_pk=False, comment=None + ), + TableColumn(ordinal=4, name="name", pg_type="character varying", is_pk=False, comment=None), + TableColumn( + ordinal=5, name="mushroom_id", pg_type="double precision", is_pk=False, comment=None + ), + TableColumn( + ordinal=6, name="varbinary_data", pg_type="character varying", is_pk=False, comment=None + ), + TableColumn( + ordinal=7, + name="_airbyte_emitted_at", + pg_type="timestamp with time zone", + is_pk=False, + comment=None, + ), + TableColumn( + ordinal=8, name="_airbyte_mushrooms_hashid", pg_type="text", is_pk=False, comment=None + ), + ] + + +def _assert_raw_data(repo): + # Check the raw data + assert sorted( + repo.run_sql( + "SELECT row_to_json(m) FROM _airbyte_raw_mushrooms m", return_shape=ResultShape.MANY_ONE + ), + key=lambda r: r["_airbyte_data"]["mushroom_id"], + ) == [ + { + "_airbyte_ab_id": mock.ANY, + "_airbyte_data": { + "name": "portobello", + "friendly": True, + "discovery": "2012-11-11T08:06:26Z", + "binary_data": "YmludHN0AA==", + "mushroom_id": 1, + "varbinary_data": "fwAAAQ==", + }, + "_airbyte_emitted_at": mock.ANY, + }, + { + "_airbyte_ab_id": mock.ANY, + "_airbyte_data": { + "name": "deathcap", + "friendly": False, + "discovery": "2018-03-17T08:06:26Z", + "binary_data": "AAAxMjMAAA==", + "mushroom_id": 2, + "varbinary_data": "fwAAAQ==", + }, + "_airbyte_emitted_at": mock.ANY, + }, + ] + + +@pytest.mark.mounting +def test_airbyte_mysql_source_failure(local_engine_empty): + source = _source(local_engine_empty) + source.credentials["password"] = "wrongpass" + repo = Repository.from_schema(TEST_REPO) + + with pytest.raises(SubprocessError) as e: + source.sync(repo, "latest") + assert re.match(r"Container sg-ab-src-\S+ exited with 1", str(e.value)) + # Check we didn't create an empty image + assert len(repo.images()) == 0 diff --git a/test/splitgraph/test_misc.py b/test/splitgraph/test_misc.py index 04aef53c..0badaf2b 100644 --- a/test/splitgraph/test_misc.py +++ b/test/splitgraph/test_misc.py @@ -351,7 +351,7 @@ def test_table_schema_params_to_dict(): comment=None, ), ], - {"key": "value"}, + {"key": "value", "key_2": ["this", "is", "an", "array"]}, ), } ) == { @@ -361,7 +361,7 @@ def test_table_schema_params_to_dict(): }, "vegetables": { "schema": {"name": "character varying", "vegetable_id": "integer"}, - "options": {"key": "value"}, + "options": {"key": "value", "key_2": ["this", "is", "an", "array"]}, }, }