Commit 106457e

feat(connector): integrate connectorx into connector

wangxiaoying authored and dovahcrow committed Oct 25, 2021
1 parent 1e0dfec commit 106457e

Showing 7 changed files with 191 additions and 71 deletions.
39 changes: 5 additions & 34 deletions dataprep/connector/sql.py
@@ -26,26 +26,8 @@ def read_sql(
    Run the SQL query, download the data from database into a dataframe.
    Please check out https://github.com/sfu-db/connector-x for more details.
-    Supported databases
-    ==========
-    - Postgres
-    - Mysql
-    - Sqlite
-    - SQL Server
-    - Oracle
-    - Redshift (through postgres protocol)
-    - Clickhouse (through mysql protocol)
-    Supported dataframes
-    ==========
-    - Pandas
-    - Arrow
-    - Dask
-    - Modin
-    - Polars
    Parameters
-    ==========
+    ----------
    conn
        the connection string.
    query
@@ -62,22 +44,11 @@ def read_sql(
    partition_num
        how many partition to generate.
-    Examples
-    ========
-    Read a DataFrame from a SQL using a single thread:
-    >>> postgres_url = "postgresql://username:password@server:port/database"
-    >>> query = "SELECT * FROM lineitem"
-    >>> read_sql(postgres_url, query)
-    Read a DataFrame parallelly using 10 threads by automatically partitioning the provided SQL
-    on the partition column:
-    >>> postgres_url = "postgresql://username:password@server:port/database"
-    >>> query = "SELECT * FROM lineitem"
-    >>> read_sql(postgres_url, query, partition_on="partition_col", partition_num=10)
-    Read a DataFrame parallelly using 2 threads by manually providing two partition SQLs:
-    >>> postgres_url = "postgresql://username:password@server:port/database"
-    >>> queries = ["SELECT * FROM lineitem WHERE partition_col <= 10",
-                   "SELECT * FROM lineitem WHERE partition_col > 10"]
-    >>> read_sql(postgres_url, queries)
+    Example
+    --------
+    >>> db_url = "postgresql://username:password@server:port/database"
+    >>> query = "SELECT * FROM lineitem"
+    >>> read_sql(db_url, query, partition_on="partition_col", partition_num=10)
"""
if _WITH_CX:
df = cx.read_sql(
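The hunk above is truncated just inside the `_WITH_CX` branch. For readers following the integration, here is a minimal sketch of the optional-dependency pattern the visible context implies; `_WITH_CX` and `cx` come from the diff itself, while the error message and keyword plumbing are assumptions, not the commit's exact code:

```python
# Sketch only: the guard around the optional connectorx dependency,
# assuming the names visible in the hunk above.
try:
    import connectorx as cx  # optional; installed via `pip install connectorx`

    _WITH_CX = True
except ImportError:
    _WITH_CX = False


def read_sql(conn, query, **kwargs):
    """Dispatch the query to connectorx when it is available."""
    if not _WITH_CX:
        # Hypothetical fallback; the commit's actual error message may differ.
        raise ImportError("read_sql requires connectorx: pip install connectorx")
    return cx.read_sql(conn, query, **kwargs)
```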
1 change: 1 addition & 0 deletions dataprep/tests/connector/test_integration.py
@@ -34,6 +34,7 @@
# assert len(df) < 1000
#

+
@pytest.mark.skipif(
    environ.get("DATAPREP_CREDENTIAL_TESTS", "0") == "0",
    reason="Skip tests that requires credential",
5 changes: 5 additions & 0 deletions docs/source/api_reference/dataprep.connector.rst
@@ -58,3 +58,8 @@ Errors
   :members:
   :undoc-members:
   :show-inheritance:
+
+read_sql
+---------
+
+.. autofunction:: dataprep.connector.read_sql
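The new entry registers `read_sql` under `dataprep.connector`, so the function can be imported from that module directly. A minimal usage sketch (the connection URI and table name are placeholders):

```python
from dataprep.connector import read_sql

# Placeholder URI and table; substitute your own database.
db_url = "postgresql://username:password@server:port/database"
df = read_sql(db_url, "SELECT * FROM lineitem")
```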
55 changes: 29 additions & 26 deletions docs/source/bokeh/theme.py
@@ -1,45 +1,44 @@
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# Copyright (c) 2012 - 2020, Anaconda, Inc., and Bokeh Contributors.
# All rights reserved.
#
# The full license is in the file LICENSE.txt, distributed with this software.
-#-----------------------------------------------------------------------------
-''' Install some functions for the bokeh theme to make use of.
-'''
+# -----------------------------------------------------------------------------
+""" Install some functions for the bokeh theme to make use of.
+"""

-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# Boilerplate
-#-----------------------------------------------------------------------------
-import logging # isort:skip
+# -----------------------------------------------------------------------------
+import logging  # isort:skip

log = logging.getLogger(__name__)

-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# Imports
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------

# External imports
import sphinx.builders.html
from docutils import nodes
from sphinx.locale import admonitionlabels
from sphinx.writers.html5 import HTML5Translator

-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# Globals and constants
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------

-__all__ = (
-    'setup',
-)
+__all__ = ("setup",)

-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# General API
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------

-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# Dev API
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------

# Mapping of admonition classes to Bootstrap contextual classes
alert_classes = {
@@ -57,6 +56,7 @@
    "example": "info",
}

+
class BootstrapHTML5Translator(HTML5Translator):
"""Custom HTML Translator for a Bootstrap-ified Sphinx layout
This is a specialization of the HTML5 Translator of sphinx.
@@ -91,6 +91,7 @@ def visit_table(self, node: nodes.Element) -> None:
        tag = self.starttag(node, "table", CLASS=" ".join(classes))
        self.body.append(tag)

+
def convert_docutils_node(list_item, only_pages=False):
    if not list_item.children:
        return None
@@ -99,7 +100,7 @@ def convert_docutils_node(list_item, only_pages=False):
    url = reference.attributes["refuri"]
    active = "current" in list_item.attributes["classes"]

-    if only_pages and '#' in url:
+    if only_pages and "#" in url:
        return None

    nav = {}
@@ -116,13 +117,12 @@ def convert_docutils_node(list_item, only_pages=False):

    return nav

+
def update_page_context(self, pagename, templatename, ctx, event_arg):
    from sphinx.environment.adapters.toctree import TocTree

    def get_nav_object(**kwds):
-        toctree = TocTree(self.env).get_toctree_for(
-            pagename, self, collapse=True, **kwds
-        )
+        toctree = TocTree(self.env).get_toctree_for(pagename, self, collapse=True, **kwds)

nav = []
for child in toctree.children[0].children:
@@ -144,15 +144,18 @@ def get_page_toc_object():
    ctx["get_page_toc_object"] = get_page_toc_object
    return None

+
sphinx.builders.html.StandaloneHTMLBuilder.update_page_context = update_page_context

+
def setup(app):
    app.set_translator("html", BootstrapHTML5Translator)

-#-----------------------------------------------------------------------------
+
+# -----------------------------------------------------------------------------
# Private API
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------

-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
# Code
-#-----------------------------------------------------------------------------
+# -----------------------------------------------------------------------------
37 changes: 30 additions & 7 deletions docs/source/user_guide/connector/introduction.ipynb
@@ -11,17 +11,19 @@
    "Connector\n",
    "=========\n",
    "\n",
-    "DataPrep.Connector aims to simplify data collection from Web APIs by providing a standard set of operations. \n",
-    "Connector wraps-up complex API calls into a set of easy-to-use Python functions. \n",
-    "By using Connector, you can skip the complex API configuration process and rapidly query different Web APIs in few steps, enabling you to execute the analysis workflow you are familiar with in a direct way.\n",
-    "\n",
-    "Watch our introduction in PyGlobal Conference `here <https://www.youtube.com/watch?v=56qu-0Ka-dA/>`_."
+    "DataPrep.Connector aims to simplify data collection from Web APIs and databases by providing a standard set of operations. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
+    "## Web APIs\n",
+    "Connector wraps up complex API calls into a set of easy-to-use Python functions. \n",
+    "By using Connector, you can skip the complex API configuration process and rapidly query different Web APIs in a few steps, enabling you to execute the analysis workflow you are familiar with in a direct way.\n",
+    "\n",
+    "Watch our introduction at the PyGlobal Conference [here](https://www.youtube.com/watch?v=56qu-0Ka-dA/).\n",
+    "\n",
    "Connector offers essential features to facilitate the process of collecting data, for example:\n",
    "\n",
    "* Concurrency: Collect data from websites, in parallel, in a fast way!\n",
@@ -30,7 +32,27 @@
"\n",
"* Authorization: Access more Web APIs quickly! Even the ones that implement authorization!\n",
"\n",
"The user guide first presents a case study for [dblp](https://dblp.org/) as an example for the process overview and provides a detailed explanation of the functionalities in the following sections."
"The user guide first presents a case study for [dblp](https://dblp.org/) as an example for the process overview and provides a detailed explanation of the functionalities in the following sections.\n",
"\n",
"## DBs\n",
"Connector wraps on [connectorx](https://github.com/sfu-db/connector-x>) to allow user fetch data from databases through SQL query. The result of the query will be stored into a Python dataframe.\n",
"\n",
"#### Supported databases:\n",
"* Postgres\n",
"* Mysql\n",
"* Sqlite\n",
"* SQL Server\n",
"* Oracle\n",
"* Redshift (through postgres protocol)\n",
"* Clickhouse (through mysql protocol)\n",
"\n",
"#### Supported dataframes:\n",
"* Pandas\n",
"* Arrow\n",
"* Dask\n",
"* Modin\n",
"* Polars\n",
"\n"
]
},
{
@@ -53,7 +75,8 @@
" * [query(): fetch data into DataFrames via APIs](query.ipynb)\n",
" * [info(): get information of a website](info.ipynb)\n",
" * [Authorization schemes supported](authorization.ipynb)\n",
" * [Auto-pagination](pagination.ipynb)"
" * [Auto-pagination](pagination.ipynb)\n",
" * [read_sql(): fetch data into DataFrames via SQL query](sql.ipynb)"
]
}
],
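The "Supported dataframes" list added to this notebook maps onto the `return_type` argument of `read_sql`. A hedged sketch of switching the output format, calling connectorx directly as the later examples in this commit do (the URI and table are placeholders):

```python
import connectorx as cx

db_url = "postgresql://username:password@server:port/database"  # placeholder
query = "SELECT * FROM lineitem"  # placeholder

# The same query materialized as different in-memory formats.
pandas_df = cx.read_sql(db_url, query, return_type="pandas")
arrow_tbl = cx.read_sql(db_url, query, return_type="arrow")
polars_df = cx.read_sql(db_url, query, return_type="polars")
```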
120 changes: 120 additions & 0 deletions docs/source/user_guide/connector/sql.ipynb
@@ -0,0 +1,120 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# ```read_sql()``` function"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The read_sql function issues a SQL query to a specific database and return the result in a Python DataFrame.\n",
"In this section, we show how to use this feature.\n",
"\n",
"## Install `connectorx`\n",
"\n",
"Connector wrap up the function on [connectorx](https://github.com/sfu-db/connector-x), in order to enable `read_sql`, you need to first install it by running:\n",
"\n",
"```\n",
"pip install connectorx\n",
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## API\n",
"\n",
"```python\n",
"read_sql(conn: str, query: Union[List[str], str], *, return_type: str = \"pandas\", protocol: str = \"binary\", partition_on: Optional[str] = None, partition_range: Optional[Tuple[int, int]] = None, partition_num: Optional[int] = None)\n",
"```\n",
"\n",
"Run the SQL query, download the data from database into a Pandas dataframe.\n",
"\n",
"## Parameters\n",
"- `conn: str`: Connection string URI. Supported URI scheme: `(postgres|postgressql|mysql|mssql|sqlite)://username:password@addr:port/dbname`.\n",
"- `query: Union[str, List[str]]`: SQL query or list of SQL queries for fetching data.\n",
"- `return_type: str = \"pandas\"`: The return type of this function. It can be `arrow`, `pandas`, `modin`, `dask` or `polars`.\n",
"- `protocol: str = \"binary\"`: The protocol used to fetch data from source, default is `binary`. Check out [here](https://github.com/sfu-db/connector-x/blob/main/Types.md) to see more details.\n",
"- `partition_on: Optional[str]`: The column to partition the result.\n",
"- `partition_range: Optional[Tuple[int, int]]`: The value range of the partition column.\n",
"- `partition_num: Optioinal[int]`: The number of partitions to generate.\n",
"\n",
"## Examples\n",
"- Read a DataFrame from a SQL using a single thread\n",
"\n",
" ```python\n",
" from dataprep.connector as cx\n",
"\n",
" postgres_url = \"postgresql://username:password@server:port/database\"\n",
" query = \"SELECT * FROM lineitem\"\n",
"\n",
" cx.read_sql(postgres_url, query)\n",
" ```\n",
"\n",
"- Read a DataFrame parallelly using 10 threads by automatically partitioning the provided SQL on the partition column (`partition_range` will be automatically queried if not given)\n",
"\n",
" ```python\n",
" import connectorx as cx\n",
"\n",
" postgres_url = \"postgresql://username:password@server:port/database\"\n",
" query = \"SELECT * FROM lineitem\"\n",
"\n",
" cx.read_sql(postgres_url, query, partition_on=\"l_orderkey\", partition_num=10)\n",
" ```\n",
"\n",
"- Read a DataFrame parallelly using 2 threads by manually providing two partition SQLs (the schemas of all the query results should be same)\n",
"\n",
" ```python\n",
" import connectorx as cx\n",
"\n",
" postgres_url = \"postgresql://username:password@server:port/database\"\n",
" queries = [\"SELECT * FROM lineitem WHERE l_orderkey <= 30000000\", \"SELECT * FROM lineitem WHERE l_orderkey > 30000000\"]\n",
"\n",
" cx.read_sql(postgres_url, queries)\n",
"\n",
" ```\n",
" \n",
"- Read a DataFrame parallelly using 4 threads from a more complex query\n",
"\n",
" ```python\n",
" import connectorx as cx\n",
"\n",
" postgres_url = \"postgresql://username:password@server:port/database\"\n",
" query = f\"\"\"\n",
" SELECT l_orderkey,\n",
" SUM(l_extendedprice * ( 1 - l_discount )) AS revenue,\n",
" o_orderdate,\n",
" o_shippriority\n",
" FROM customer,\n",
" orders,\n",
" lineitem\n",
" WHERE c_mktsegment = 'BUILDING'\n",
" AND c_custkey = o_custkey\n",
" AND l_orderkey = o_orderkey\n",
" AND o_orderdate < DATE '1995-03-15'\n",
" AND l_shipdate > DATE '1995-03-15'\n",
" GROUP BY l_orderkey,\n",
" o_orderdate,\n",
" o_shippriority \n",
" \"\"\"\n",
"\n",
" cx.read_sql(postgres_url, query, partition_on=\"l_orderkey\", partition_num=4)\n",
"\n",
" ```"
]
}
],
"metadata": {
"language_info": {
"name": "python"
},
"orig_nbformat": 4
},
"nbformat": 4,
"nbformat_minor": 2
}
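`partition_range` appears in the parameter list above but is not exercised by the examples. A hedged sketch of passing explicit bounds, which lets connectorx skip the min/max pre-query on the partition column (the range values are illustrative, not from the commit):

```python
import connectorx as cx

postgres_url = "postgresql://username:password@server:port/database"  # placeholder
query = "SELECT * FROM lineitem"

# Explicit bounds for l_orderkey; (1, 60000000) is an illustrative range.
df = cx.read_sql(
    postgres_url,
    query,
    partition_on="l_orderkey",
    partition_range=(1, 60000000),
    partition_num=4,
)
```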

1 comment on commit 106457e

@github-actions

DataPrep.EDA Benchmarks

| Benchmark suite | Current: 106457e | Previous: 1e0dfec | Ratio |
| --- | --- | --- | --- |
| dataprep/tests/benchmarks/eda.py::test_create_report | 0.18193184574336807 iter/sec (stddev: 0.06637716183494886) | 0.19024625460585337 iter/sec (stddev: 0.05150359901659495) | 1.05 |

This comment was automatically generated by workflow using github-action-benchmark.
