Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 25 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[![Python Version](https://img.shields.io/badge/python-3.9+-blue.svg)](https://www.python.org/downloads/)
[![MongoDB](https://img.shields.io/badge/MongoDB-7.0+-green.svg)](https://www.mongodb.com/)
[![SQLAlchemy](https://img.shields.io/badge/SQLAlchemy-1.4+_2.0+-darkgreen.svg)](https://www.sqlalchemy.org/)
[![Superset](https://img.shields.io/badge/Apache_Superset-1.0+-blue.svg)](https://superset.apache.org/docs/6.0.0/configuration/databases)

PyMongoSQL is a Python [DB API 2.0 (PEP 249)](https://www.python.org/dev/peps/pep-0249/) client for [MongoDB](https://www.mongodb.com/). It provides a familiar SQL interface to MongoDB, allowing developers to use SQL to interact with MongoDB collections.

Expand Down Expand Up @@ -40,6 +41,9 @@ PyMongoSQL implements the DB API 2.0 interfaces to provide SQL-like access to Mo
- **ANTLR4** (SQL Parser Runtime)
- antlr4-python3-runtime >= 4.13.0

- **JMESPath** (JSON/Dict Path Query)
- jmespath >= 1.0.0

### Optional Dependencies

- **SQLAlchemy** (for ORM/Core support)
Expand Down Expand Up @@ -136,37 +140,41 @@ while users:
### SELECT Statements
- Field selection: `SELECT name, age FROM users`
- Wildcards: `SELECT * FROM products`
- **Nested fields**: `SELECT profile.name, profile.age FROM users`
- **Array access**: `SELECT items[0], items[1].name FROM orders`

### WHERE Clauses
- Equality: `WHERE name = 'John'`
- Comparisons: `WHERE age > 25`, `WHERE price <= 100.0`
- Logical operators: `WHERE age > 18 AND status = 'active'`
- **Nested field filtering**: `WHERE profile.status = 'active'`
- **Array filtering**: `WHERE items[0].price > 100`

### Nested Field Support
- **Single-level**: `profile.name`, `settings.theme`
- **Multi-level**: `account.profile.name`, `config.database.host`
- **Array access**: `items[0].name`, `orders[1].total`
- **Complex queries**: `WHERE customer.profile.age > 18 AND orders[0].status = 'paid'`

> **Note**: Avoid SQL reserved words (`user`, `data`, `value`, `count`, etc.) as unquoted field names. Use alternatives or bracket notation for arrays.

### Sorting and Limiting
- ORDER BY: `ORDER BY name ASC, age DESC`
- LIMIT: `LIMIT 10`
- Combined: `ORDER BY created_at DESC LIMIT 5`

## Connection Options
## Limitations & Roadmap

```python
from pymongosql.connection import Connection
**Note**: Currently PyMongoSQL focuses on Data Query Language (DQL) operations. The following SQL features are **not yet supported** but are planned for future releases:

# Basic connection
conn = Connection(host="localhost", port=27017, database="mydb")
- **DML Operations** (Data Manipulation Language)
- `INSERT`, `UPDATE`, `DELETE`
- **DDL Operations** (Data Definition Language)
- `CREATE TABLE/COLLECTION`, `DROP TABLE/COLLECTION`
- `CREATE INDEX`, `DROP INDEX`
- `LIST TABLES/COLLECTIONS`

# With authentication
conn = Connection(
host="mongodb://user:pass@host:port/db?authSource=admin",
database="mydb"
)

# Connection properties
print(conn.host) # MongoDB connection URL
print(conn.port) # Port number
print(conn.database_name) # Database name
print(conn.is_connected) # Connection status
```
These features are on our development roadmap and contributions are welcome!

## Contributing

Expand Down
22 changes: 21 additions & 1 deletion pymongosql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from .connection import Connection

__version__: str = "0.2.1"
__version__: str = "0.2.2"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel: str = "2.0"
Expand Down Expand Up @@ -42,6 +42,26 @@ def connect(*args, **kwargs) -> "Connection":
return Connection(*args, **kwargs)


# Register superset execution strategy for mongodb+superset:// connections
def _register_superset_executor() -> None:
"""Register SupersetExecution strategy for superset mode.

This allows the executor and cursor to be unaware of superset -
the execution strategy is automatically selected based on the connection mode.
"""
try:
from .executor import ExecutionPlanFactory
from .superset_mongodb.executor import SupersetExecution

ExecutionPlanFactory.register_strategy(SupersetExecution())
except ImportError:
# Superset module not available - skip registration
pass


# Auto-register superset executor on module import
_register_superset_executor()

# SQLAlchemy integration (optional)
# For SQLAlchemy functionality, import from pymongosql.sqlalchemy_mongodb:
# from pymongosql.sqlalchemy_mongodb import create_engine_url, create_engine_from_mongodb_uri
Expand Down
2 changes: 2 additions & 0 deletions pymongosql/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ class BaseCursor(metaclass=ABCMeta):
def __init__(
self,
connection: "Connection",
mode: str = "standard",
**kwargs,
) -> None:
super().__init__()
self._connection = connection
self.mode = mode

@property
def connection(self) -> "Connection":
Expand Down
15 changes: 15 additions & 0 deletions pymongosql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from .common import BaseCursor
from .cursor import Cursor
from .error import DatabaseError, NotSupportedError, OperationalError
from .helper import ConnectionHelper

_logger = logging.getLogger(__name__)

Expand All @@ -35,9 +36,17 @@ def __init__(
to ensure full compatibility. All parameters are passed through directly
to the underlying MongoClient.

Supports connection string patterns:
- mongodb://host:port/database - Core driver (no subquery support)
- mongodb+superset://host:port/database - Superset driver with subquery support

See PyMongo MongoClient documentation for full parameter details.
https://www.mongodb.com/docs/languages/python/pymongo-driver/current/connect/mongoclient/
"""
# Check if connection string specifies mode
connection_string = host if isinstance(host, str) else None
self._mode, host = ConnectionHelper.parse_connection_string(connection_string)

# Extract commonly used parameters for backward compatibility
self._host = host or "localhost"
self._port = port or 27017
Expand Down Expand Up @@ -154,6 +163,11 @@ def database(self) -> Database:
raise OperationalError("No database selected")
return self._database

@property
def mode(self) -> str:
"""Get the specified mode"""
return self._mode

def use_database(self, database_name: str) -> None:
"""Switch to a different database"""
if self._client is None:
Expand Down Expand Up @@ -267,6 +281,7 @@ def cursor(self, cursor: Optional[Type[BaseCursor]] = None, **kwargs) -> BaseCur

new_cursor = cursor(
connection=self,
mode=self._mode,
**kwargs,
)
self.cursor_pool.append(new_cursor)
Expand Down
127 changes: 35 additions & 92 deletions pymongosql/cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@
import logging
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence, Tuple, TypeVar

from pymongo.cursor import Cursor as MongoCursor
from pymongo.errors import PyMongoError

from .common import BaseCursor, CursorIterator
from .error import DatabaseError, OperationalError, ProgrammingError, SqlSyntaxError
from .result_set import ResultSet
from .executor import ExecutionContext, ExecutionPlanFactory
from .result_set import DictResultSet, ResultSet
from .sql.builder import ExecutionPlan
from .sql.parser import SQLParser

if TYPE_CHECKING:
from .connection import Connection
Expand All @@ -23,25 +20,25 @@ class Cursor(BaseCursor, CursorIterator):

NO_RESULT_SET = "No result set."

def __init__(self, connection: "Connection", **kwargs) -> None:
def __init__(self, connection: "Connection", mode: str = "standard", **kwargs) -> None:
super().__init__(
connection=connection,
mode=mode,
**kwargs,
)
self._kwargs = kwargs
self._result_set: Optional[ResultSet] = None
self._result_set_class = ResultSet
self._current_execution_plan: Optional[ExecutionPlan] = None
self._mongo_cursor: Optional[MongoCursor] = None
self._is_closed = False

@property
def result_set(self) -> Optional[ResultSet]:
return self._result_set

@result_set.setter
def result_set(self, val: ResultSet) -> None:
self._result_set = val
def result_set(self, rs: ResultSet) -> None:
self._result_set = rs

@property
def has_result_set(self) -> bool:
Expand All @@ -52,8 +49,8 @@ def result_set_class(self) -> Optional[type]:
return self._result_set_class

@result_set_class.setter
def result_set_class(self, val: type) -> None:
self._result_set_class = val
def result_set_class(self, rs_cls: type) -> None:
self._result_set_class = rs_cls

@property
def rowcount(self) -> int:
Expand All @@ -78,74 +75,6 @@ def _check_closed(self) -> None:
if self._is_closed:
raise ProgrammingError("Cursor is closed")

def _parse_sql(self, sql: str) -> ExecutionPlan:
"""Parse SQL statement and return ExecutionPlan"""
try:
parser = SQLParser(sql)
execution_plan = parser.get_execution_plan()

if not execution_plan.validate():
raise SqlSyntaxError("Generated query plan is invalid")

return execution_plan

except SqlSyntaxError:
raise
except Exception as e:
_logger.error(f"SQL parsing failed: {e}")
raise SqlSyntaxError(f"Failed to parse SQL: {e}")

def _execute_execution_plan(self, execution_plan: ExecutionPlan) -> None:
"""Execute an ExecutionPlan against MongoDB using db.command"""
try:
# Get database
if not execution_plan.collection:
raise ProgrammingError("No collection specified in query")

db = self.connection.database

# Build MongoDB find command
find_command = {"find": execution_plan.collection, "filter": execution_plan.filter_stage or {}}

# Apply projection if specified (already in MongoDB format)
if execution_plan.projection_stage:
find_command["projection"] = execution_plan.projection_stage

# Apply sort if specified
if execution_plan.sort_stage:
sort_spec = {}
for sort_dict in execution_plan.sort_stage:
for field, direction in sort_dict.items():
sort_spec[field] = direction
find_command["sort"] = sort_spec

# Apply skip if specified
if execution_plan.skip_stage:
find_command["skip"] = execution_plan.skip_stage

# Apply limit if specified
if execution_plan.limit_stage:
find_command["limit"] = execution_plan.limit_stage

_logger.debug(f"Executing MongoDB command: {find_command}")

# Execute find command directly
result = db.command(find_command)

# Create result set from command result
self._result_set = self._result_set_class(
command_result=result, execution_plan=execution_plan, **self._kwargs
)

_logger.info(f"Query executed successfully on collection '{execution_plan.collection}'")

except PyMongoError as e:
_logger.error(f"MongoDB command execution failed: {e}")
raise DatabaseError(f"Command execution failed: {e}")
except Exception as e:
_logger.error(f"Unexpected error during command execution: {e}")
raise OperationalError(f"Command execution error: {e}")

def execute(self: _T, operation: str, parameters: Optional[Dict[str, Any]] = None) -> _T:
"""Execute a SQL statement

Expand All @@ -162,11 +91,25 @@ def execute(self: _T, operation: str, parameters: Optional[Dict[str, Any]] = Non
_logger.warning("Parameter substitution not yet implemented, ignoring parameters")

try:
# Parse SQL to ExecutionPlan
self._current_execution_plan = self._parse_sql(operation)
# Create execution context
context = ExecutionContext(operation, self.mode)

# Get appropriate execution strategy
strategy = ExecutionPlanFactory.get_strategy(context)

# Execute using selected strategy (Standard or Subquery)
result = strategy.execute(context, self.connection)

# Execute the execution plan
self._execute_execution_plan(self._current_execution_plan)
# Store execution plan for reference
self._current_execution_plan = strategy.execution_plan

# Create result set from command result
self._result_set = self._result_set_class(
command_result=result,
execution_plan=self._current_execution_plan,
database=self.connection.database,
**self._kwargs,
)

return self

Expand Down Expand Up @@ -236,15 +179,6 @@ def fetchall(self) -> List[Sequence[Any]]:
def close(self) -> None:
"""Close the cursor and free resources"""
try:
if self._mongo_cursor:
# Close MongoDB cursor
try:
self._mongo_cursor.close()
except Exception as e:
_logger.warning(f"Error closing MongoDB cursor: {e}")
finally:
self._mongo_cursor = None

if self._result_set:
# Close result set
try:
Expand Down Expand Up @@ -274,3 +208,12 @@ def __del__(self):
self.close()
except Exception:
pass # Ignore errors during cleanup


class DictCursor(Cursor):
"""Cursor that returns results as dictionaries instead of tuples/sequences"""

def __init__(self, connection: "Connection", **kwargs) -> None:
super().__init__(connection=connection, **kwargs)
# Override result set class to use DictResultSet
self._result_set_class = DictResultSet
Loading