In [30]:
import sqlite3
import copy
from typing import (
    Any,
    Generic,
    TypeVar,
    Self,
    Generator,
    Type,
    Literal
)
import uuid

# ORM SOURCE CODE

In [31]:


DATATYPES: dict[str, str] = {
    'int':'INTEGER',
    'str':'TEXT',
    'float':'REAL',
    'bytes':'BLOB'
}

T = TypeVar('T')

class SQLModel:
    
    def __init__(self,**kwargs) -> None:
        self.__annotations__ = vars(self.__class__)['__annotations__']
        self.__model_fields__: dict[str, Any] = {key:copy.deepcopy(getattr(self,key)) for key in self.__annotations__.keys()}
        self.__name__: str = self.__class__.__name__
        self.__sql__ = f"CREATE TABLE {self.__name__.lower()} ({', '.join([self.__model_fields__[x].__sql__ for x in self.__model_fields__.keys()])});"
        for key,val in kwargs.items():
            setattr(self,key,val)
    
    def __repr__(self) -> str:
        data: list[str] = [f'{key}={getattr(self,key)}' for key in self.__annotations__.keys()]
        return f"{self.__class__.__name__}({','.join(data)})"
    
    @staticmethod    
    def execute_table(conn:sqlite3.Connection) -> sqlite3.Cursor:
        
        #create cursor
        cursor: sqlite3.Cursor = conn.cursor()
        
        #tables
        sql_create_tables:list[str] = [x().__sql__ for x in SQLModel.__subclasses__()]
        
        script:str = f"BEGIN; {' '.join(sql_create_tables)} COMMIT;"
        
        return cursor.executescript(script)
class Field(Generic[T]):
    
    def __init__(
        self,
        primary_key=False,
        unique=False,
        required=False,
        auto_increment=False,
        name='',
        _type:Any=None,
    ) -> None:
        
        self.__name__: str = name
        self.__type__: type = _type
        
        self.__constraints__:list[str] = []
        
        if primary_key:
            self.__constraints__.append("PRIMARY KEY")
            
        if unique:
            self.__constraints__.append("UNIQUE")
            
        if required:
            self.__constraints__.append("NOT NULL")
            
        if auto_increment:
            self.__constraints__.append("AUTOINCREMENT")
        
        self.__sql__:str = f"{self.__name__} {DATATYPES[self.__type__.__name__]} {' '.join(self.__constraints__)}"
    
    def __repr__(self) -> str:
        return self.__name__
    
    def __str__(self) -> str:
        return ""
    
    def __eq__(self, __value: object) -> str:
        
        if isinstance(__value,str):
            return f"{self.__name__} = '{__value}'"
        else:
            return f"{self.__name__} = {__value}"
        
    def __gt__(self, __value: object) -> str:
        return f"{self.__name__} > {__value}"
        
    def __lt__(self, __value: object) -> str:
        return f"{self.__name__} < {__value}"
    
    def __ge__(self, __value: object) -> str:
        return f"{self.__name__} >= {__value}"
    
    def __le__(self, __value: object) -> str:
        return f"{self.__name__} <= {__value}"
    
    def __ne__(self, __value: object) -> str:
        return f"{self.__name__} <> {__value}"
    
    def __in__(self,__iter: object) -> str:
        return ""
    
def _and(left,right) -> str:
    return f"{left} AND {right}"

def _or(left,right) -> str:
    return f"{left} OR {right}"

def _not(condition) -> str:
    return f"NOT {condition}"

def _null(conditions) -> str:
    return f"{conditions} IS NULL"

def _not_null(conditions) -> str:
    return f"{conditions} IS NOT NULL"

def _like(column:Field,pattern:str) -> str:
    return f"{column} LIKE %{pattern}%"

def _in(left:Field,right) -> str:
    return f"{left.__name__} IN {right}"

def _between(column:Field,start:int,stop:int) -> str:
    return f"{column.__name__} BETWEEN {start} AND {stop}"

class ForeignKey(Field):
    
    def __init__(self,name:str,table:Type[SQLModel],_type:type,column:str) -> None:
        self.__name__ = name
        self.__type__ = _type
        self.__sql__ = f"{self.__name__} {DATATYPES[self.__type__.__name__]} , FOREIGN KEY ({self.__name__}) REFERENCES {table().__name__}({column})"
        pass
    
class Query:
    
    def __init__(self,table:Type[SQLModel]) -> None:
        self.table = table
        self.__query__:list[str] = []
        
    def select(self,*columns:Field|ForeignKey,distinct:bool=False,top:int|None=None) -> Self:
        
        if distinct:
            if top:
                select_syntax:str = "SELECT TOP {top} DISTINCT {columns} from {table}".format(top=top)
            else:
                select_syntax:str = "SELECT DISTINCT {columns} from {table}"
        else:
            if top:
                select_syntax:str = "SELECT TOP {top} {columns} from {table}".replace('{top}',str(top))
            else:
                select_syntax:str = "SELECT {columns} from {table}"
        
        
        
        if len(columns) == 0:
            
            self.__query__.append(
                select_syntax.format(
                    table=self.table.__name__.lower(),
                    columns=", ".join([f'{self.table.__name__.lower()}.{x}' for x in self.table().__model_fields__.keys()])
                )
            )
            
            return self
        
        self.__query__.append(
            select_syntax.format(
                    table=self.table.__name__.lower(),
                    columns=", ".join([x.__name__ for x in columns])
                )
        )
        
        return self
    
    def where(self,*conditions) -> Self:
        
        where_syntax: str = "WHERE {conditions}"
        
        self.__query__.append(
            where_syntax.format(
                conditions="".join(conditions)
            )
        )
                
        return self
    
    def insert(self,values_obj:SQLModel) -> Self:
        insert_syntax: str = "INSERT INTO {table} VALUES ({values})"
        
        values_obj_str:list[str] = []
        
        for val in [str(x) for x in {key:val for key,val in values_obj.__dict__.items() if '__' not in key}.values()]:
            if isinstance(val,str):
                values_obj_str.append(f"'{val}'")
            else:
                values_obj_str.append(val)
        
        self.__query__.append(
            insert_syntax.format(
                values=', '.join(values_obj_str),
                table=self.table.__name__.lower()
            )
        )
        return self
    
    def order_by(self,*column:Field[Any],order:str='ASC') -> Self:
        
        order_by_syntax = "ORDER BY {columns} {order_type}"
        self.__query__.append(
            order_by_syntax.format(
                columns=''.join([x.__name__ for x in column]),
                order_type=order,
            )
        )
        return self
    
    def update(self,columns_set:SQLModel) -> Self:
        
        update_table_syntax = 'UPDATE {table} SET {columns_set}'
        
        values_obj_str:list[str] = []
        
        for key,val in {key:val for key,val in columns_set.__dict__.items() if '__' not in key}.items():
            if isinstance(val,str):
                values_obj_str.append(f"{key} = '{val}'")
            else:
                values_obj_str.append(f"{key} = {val}")
        
        self.__query__.append(
            update_table_syntax.format(
                columns_set=', '.join(values_obj_str),
                table=self.table.__name__.lower()
            )
        )
        
        return self
    
    def delete(self) -> Self:
        
        delete_syntax = "DELETE FROM {table}"
        
        self.__query__.append(
            delete_syntax.format(
                table=self.table.__name__
            )
        )
        
        return self
    
    def min(self,column:Field) -> Self:
        
        min_syntax = "SELECT MIN({column}) FROM {table}"
        
        self.__query__.append(
            min_syntax.format(
                column = column.__name__,
                table = self.table.__name__.lower()
            )
        )
        
        return self
    
    def max(self,column:Field) -> Self:
        
        max_syntax = "SELECT MAX({column}) FROM {table}"
        
        self.__query__.append(
            max_syntax.format(
                column = column.__name__,
                table = self.table.__name__.lower()
            )
        )
        
        return self
    
    def count(self,column:Field) -> Self:
        
        count_syntax = "SELECT COUNT({column}) FROM {table}"
        
        self.__query__.append(
            count_syntax.format(
                column = column.__name__,
                table = self.table.__name__.lower()
            )
        )
        
        return self
    
    def sum(self,column) -> Self:
        
        sum_syntax = "SELECT SUM({column}) FROM {table}"
        
        self.__query__.append(
            sum_syntax.format(
                column = column.__name__,
                table = self.table.__name__.lower()
            )
        )
        
        return self
    
    
    def avg(self,column) -> Self:
        
        avg_syntax = "SELECT AVG({column}) FROM {table}"
        
        self.__query__.append(
            avg_syntax.format(
                column = column.__name__,
                table = self.table.__name__.lower()
            )
        )
        
        return self
    
    def join(self,column,type:str) -> Self:
        return self
    
class Session:
    
    def __init__(self,engine:sqlite3.Connection) -> None:
        self.engine: sqlite3.Connection = engine
        self.cursor: sqlite3.Cursor = engine.cursor()
    
    def __enter__(self) -> Self:
        return self
    
    def __exit__(self,exception_type, exception_value, exception_traceback) -> None:        
        self.cursor.close()
        if exception_value:
            raise Exception(exception_value)
        pass
    
    def execute(self,query_obj:Query) -> Self:
        self.cursor.execute(' '.join(query_obj.__query__))
        return self
    
    def commit(self) -> Self:
        self.engine.commit()
        return self
    
    def fetchall(self) -> list:
        columns:tuple = tuple([x[0] for x in self.cursor.description])
        rows_data: list[Any] = self.cursor.fetchall()
        data:list[dict] = []
        for single_row in rows_data:
            row_value:dict = {}
            for value,col in zip(single_row,columns):
                row_value.update({col:value})
            data.append(row_value)
        return data

# ORM CODE FLOW

In [32]:
class User(SQLModel):
    id:Field[str] = Field(name='id',_type=str,primary_key=True)
    name:Field[str] = Field(name='name',_type=str,required=True)
    email:Field[str] = Field(name='email',_type=str,required=True)

class Item(SQLModel):
    id:Field[str] = Field(name='id',_type=str,primary_key=True)
    name:Field[str] = Field(name='name',_type=str,required=True)
    price:Field[float] = Field(name='price',_type=float,required=True)
    user_id:ForeignKey = ForeignKey(name='user_id',_type=int,table=User,column="id")

In [33]:
conn: sqlite3.Connection = sqlite3.connect(":memory:")
SQLModel.execute_table(conn)

<sqlite3.Cursor at 0x1fdb2507cc0>

In [34]:
user_var1 = User(id=str(uuid.uuid1()),name="karl",email="karlalferezfx@gmail.com")
karl_item_1 = Item(id=str(uuid.uuid1()),name="soap",price=3.14,user_id=user_var1.id)
karl_item_2 = Item(id=str(uuid.uuid1()),name="shampoo",price=1.50,user_id=user_var1.id)
karl_item_3 = Item(id=str(uuid.uuid1()),name="toothpaste",price=5.00,user_id=user_var1.id)

user_var2 = User(id=str(uuid.uuid1()),name="andrei",email="andrei_barlaan@gmail.com")
andrei_item_1 = Item(id=str(uuid.uuid1()),name="face mask",price=0.01,user_id=user_var2.id)
andrei_item_2 = Item(id=str(uuid.uuid1()),name="yosi",price=0.10,user_id=user_var2.id)
andrei_item_3 = Item(id=str(uuid.uuid1()),name="disposable vape",price=5.50,user_id=user_var2.id)

In [35]:
with Session(conn) as db:
  
  insert_user1: Query = Query(User).insert(user_var1)
  insert_user2: Query = Query(User).insert(user_var2)
  db.execute(insert_user1)
  db.execute(insert_user2)
  db.commit()

  for item in [karl_item_1,karl_item_2,karl_item_3,andrei_item_1,andrei_item_2,andrei_item_3]:
    insert_stmt:Query = Query(Item).insert(item)
    db.execute(insert_stmt)
  db.commit()

  select_price: Query = Query(User).select().where(User.email == 'andrei_barlaan@gmail.com')
  
  data: list = db.execute(select_price).fetchall()

In [36]:
print(data)

[{'id': '208fe5bf-69e0-11ee-b9a4-44032cfb3c03', 'name': 'andrei', 'email': 'andrei_barlaan@gmail.com'}]


# ORM QUERY 

In [37]:
Query(User).select().__query__

['SELECT user.id, user.name, user.email from user']

In [38]:
Query(User).select(distinct=True).__query__

['SELECT DISTINCT user.id, user.name, user.email from user']

In [39]:
Query(User).select(top=5).__query__

['SELECT TOP 5 user.id, user.name, user.email from user']

In [40]:
Query(User).select().where(User.name == 'example').__query__

['SELECT user.id, user.name, user.email from user', "WHERE name = 'example'"]

In [41]:
Query(User).select().order_by(User.name,order='ASC').__query__

['SELECT user.id, user.name, user.email from user', 'ORDER BY name ASC']

In [42]:
Query(User).select().where(_and(User.name == 'example',User.email == 'email@example.com')).__query__

['SELECT user.id, user.name, user.email from user',
 "WHERE name = 'example' AND email = 'email@example.com'"]

In [43]:
Query(User).select().where(_or(User.name == 'example',User.email == 'email@example.com')).__query__

['SELECT user.id, user.name, user.email from user',
 "WHERE name = 'example' OR email = 'email@example.com'"]

In [44]:
Query(User).select().where(_not(User.name == 'example')).__query__

['SELECT user.id, user.name, user.email from user',
 "WHERE NOT name = 'example'"]

In [45]:
Query(User).insert(user_var1).__query__

["INSERT INTO user VALUES ('208fbeb3-69e0-11ee-b0cf-44032cfb3c03', 'karl', 'karlalferezfx@gmail.com')"]

In [46]:
Query(User).select().where(_null(User.name == 'example')).__query__

['SELECT user.id, user.name, user.email from user',
 "WHERE name = 'example' IS NULL"]

In [47]:
Query(User).select().where(_not_null(User.name == 'example')).__query__

['SELECT user.id, user.name, user.email from user',
 "WHERE name = 'example' IS NOT NULL"]

In [48]:
Query(User).update(user_var1).where(User.id == '123456').__query__

["UPDATE user SET id = '208fbeb3-69e0-11ee-b0cf-44032cfb3c03', name = 'karl', email = 'karlalferezfx@gmail.com'",
 "WHERE id = '123456'"]

In [49]:
Query(User).delete().where(User.id == '123456').__query__

['DELETE FROM User', "WHERE id = '123456'"]

In [50]:
Query(User).min(User.name).where(User.id == '123456').__query__

['SELECT MIN(name) FROM user', "WHERE id = '123456'"]

In [51]:
Query(User).max(User.name).where(User.id == '123456').__query__

['SELECT MAX(name) FROM user', "WHERE id = '123456'"]

In [52]:
Query(User).count(User.name).__query__

['SELECT COUNT(name) FROM user']

In [53]:
Query(Item).sum(Item.price).where(Item.user_id == '1231231232').__query__


['SELECT SUM(price) FROM item', "WHERE user_id = '1231231232'"]

In [54]:
Query(Item).avg(Item.price).where(Item.user_id == '1231231232').__query__

['SELECT AVG(price) FROM item', "WHERE user_id = '1231231232'"]

In [55]:
Query(Item).select(Item.price).where(_like(Item.name,'1231231232')).__query__

['SELECT price from item', 'WHERE  LIKE %1231231232%']

In [56]:
Query(User).select().where(_in(User.name,('karl','robeck','alferez'))).__query__

['SELECT user.id, user.name, user.email from user',
 "WHERE name IN ('karl', 'robeck', 'alferez')"]

In [57]:
Query(User).select().where(_between(User.id,0,10)).__query__

['SELECT user.id, user.name, user.email from user',
 'WHERE id BETWEEN 0 AND 10']

In [58]:
Query(User).select().join(Item,type='inner').where(_in(User.name,('karl','robeck','alferez')))

<__main__.Query at 0x1fdb2575dd0>