# What's Inside a Data Query Engine  
## *Building one from Scratch*  

## Part 2: Just A Tad More Detail 
  
![What's Inside a Data Query Engine](./images/dataengine05.png)

### <font color='green'>__Support for Google Colab__  </font>  
    
Open this notebook in Colab using the following button:  
  
<a href="https://colab.research.google.com/github/shauryashaurya/learn-data-munging/blob/main/00-Python-Collections/01.03%20Fun%20with%20Functools.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>  

  
<font color='green'>Uncomment and execute the cell below to set up and run this notebook on Google Colab.</font>

In [None]:
# # SETUP FOR COLAB: select all the lines below and uncomment (CTRL+/ on windows)
# # Let's download and unzip the Small MovieLens Dataset
# ! mkdir ./data
# ! wget -q https://files.grouplens.org/datasets/movielens/ml-latest-small.zip
# ! unzip ./ml-latest-small.zip -d ./data/

### Get the _Small_ MovieLens Dataset

We'll use the [small MovieLens dataset](https://grouplens.org/datasets/movielens/#:~:text=Small%3A%20100%2C000%20ratings%20and%203%2C600%20tag%20applications) here.

Download it and unzip to the data folder under the name `ml-latest-small`.

This dataset expands to about 3.2 MB on your local disk. 

In [None]:
datalocation = "./data/ml-latest-small/"

In [None]:
# specify file names
file_path_movies = datalocation + "movies.csv"
file_path_links = datalocation + "links.csv"
file_path_ratings = datalocation + "ratings.csv"
file_path_tags = datalocation + "tags.csv"

# Here's what our data engine should be able to do  
* Load the data into memory and capture some metadata (column names, data types, etc.)  
* Accept a query of the form `SELECT (xxx) FROM (xxx) WHERE (xxx)`  
* Parse the query to make sense of it  
* Highlight any errors  
* Build a query plan  
* Optimize the query further by looking at the plan and the metadata  
* Execute the query  
* Show the results  
* Show the cost of running the query  
  
  
_The full set of notebooks also covers JOINs and nested queries, but we are going to treat them as intermediate to advanced cases - since they may distract us from the goal of just being able to understand how data engines work._

We'll directly use the [CSV module](https://docs.python.org/3/library/csv.html) here just to keep our focus on the data engine itself and not get distracted by the intricacies of loading a CSV file.
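
For readers who haven't used it, here is a minimal sketch of what `csv.DictReader` yields: one dict per row, keyed by the header, with every value as a string. The sample rows are made up for illustration and are read from an in-memory string instead of a file.

```python
import csv
import io

# Made-up sample shaped like movies.csv (header + two rows); not real dataset content.
sample_csv = io.StringIO(
    'movieId,title,genres\n'
    '1,"Example Movie, The (2000)",Comedy\n'
    '2,Another Example (2001),Drama|Thriller\n'
)

# DictReader uses the first line as the header and yields one dict per data row.
rows = list(csv.DictReader(sample_csv))

for row in rows:
    print(row)

# Every value arrives as a string, which is why the engine needs a type-conversion step.
print(type(rows[0]["movieId"]))
```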

# Invert the Pyramid

Once we build it, what would our data engine look like?  
  
Let's look at the final code first, so we have a map of the terrain before starting the hike.

<font color='red'>Uncommenting the following code will cause miniature blackholes to condense on your microchip. <br/>**Don't do it.**</font> 

### Create a database and a table, then load data into the table 
```# db = Database()```  
```# db.create_table('table_name', [Column('col1', str), Column('col2', str), Column('col3', str)])```  
```# db.load_csv('table_name', file_path)```

### Example query    
```# sql_query = "SELECT genres FROM movies WHERE genres = 'Comedy'"```

### Parsing the query to AST    
```# tokens = tokenize(sql_query)```  
```# parser = Parser(iter(tokens))```  
```# ast = parser.parse()```  

### Creating the query plan  
```# planner = QueryPlanner(db)```  
```# plan = planner.create_plan(ast)```  

### Optimizing the query plan  
```# optimizer = QueryOptimizer(db)```  
```# optimized_plan = optimizer.optimize(plan)```  

### Creating the execution plan  
```# executable_plan = create_executable_plan(optimized_plan, db)```  

### Executing the query    
```# query_result, query_costs = execute_query(executable_plan)```  

### Output the result  
```# print(query_costs)```  

# Import the dependencies

In [None]:
import csv
import re
from typing import List, NamedTuple
from enum import Enum, auto

# Storage Layer  
  
Manages I/O.  
With a little effort, this layer could be a mini data engine all by itself.  

In [None]:
class Column:
    def __init__(self, name, col_type):
        self.name = name
        self.col_type = col_type

In [None]:
class Table:
	def __init__(self, name, columns):
		self.name = name
		self.columns = {col.name: col for col in columns}
		self.data = []

	def table_scan(self):
		return self.data

	def insert(self, row):
		if set(row.keys()) != set(self.columns.keys()):
			raise ValueError("Row does not match table schema")
		# Type checking and conversion can be added here
		self.data.append(row)

	def select(self, columns, limit=-1, where_clause=None):
		result = []
		for row in self.data:
			# stop once `limit` rows are collected (-1 means no limit)
			if limit != -1 and len(result) >= limit:
				break
			# skip rows that fail the WHERE clause instead of ending the scan
			if where_clause is not None and not where_clause.evaluate(row):
				continue
			result.append({col: row[col] for col in columns})
		return result

	# Todo: Additional methods for update, delete etc.

In [None]:
# Convert the string value from CSV to the specified data type.
def convert_type(value, col_type):
    if col_type == int:
        return int(value)
    elif col_type == float:
        return float(value)
    elif col_type == str:
        return value
    else:
        raise TypeError(f"Unsupported column type: {col_type}")

In [None]:
class Database:
	def __init__(self):
		self.tables = {}

	def create_table(self, name, columns):
		self.tables[name] = Table(name, columns)

	def get_table(self, name):
		return self.tables.get(name)
	
	def load_csv(self, table_name, file_path, delimiter=',', quotechar='"', escapechar=None, quoting=csv.QUOTE_MINIMAL):
		print("Database: load_csv: file_path = ",file_path)
		table = self.get_table(table_name)
		if not table:
			raise ValueError(f"Table {table_name} does not exist")
		# pay attention to the dialect of the CSV
		with open(file_path, 'r', encoding='utf-8') as file:
			reader = csv.DictReader(file, delimiter=delimiter, quotechar=quotechar, escapechar=escapechar, quoting=quoting)
			for row in reader:
				converted_row = {col: convert_type(row[col], table.columns[col].col_type) for col in row}
				table.insert(converted_row)

In [None]:
# Test our database
db_test = Database() # Real original naming
db_test.create_table('users', [Column('name', str), Column('age', int)])
db_test.get_table('users').insert({'name': 'Alice', 'age': 30})
print(db_test.get_table('users').select(['name', 'age']))

In [None]:
# Try it on our movies file
# movieId,title,genres
db_movies = Database()
db_movies.create_table('movies', [Column('movieId', str), Column('title', str), Column('genres', str)])
db_movies.load_csv('movies', file_path_movies)

In [None]:
# let's try the ratings file too - see if type conversions work better
# userId,movieId,rating,timestamp
db_movies.create_table('ratings', [Column('userId', str), 
								   Column('movieId', str), 
								   Column('rating', float), 
								   Column('timestamp', float)])
db_movies.load_csv('ratings', file_path_ratings)
# the above fails silently if you give it the wrong CSV file to ingest, 
# a better data engine needs to catch that.
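
One way to catch a mismatched file early is to compare the CSV header with the table's column names before inserting anything. The sketch below is an assumption about how such a check could look, not part of the engine above; `validate_header` is a hypothetical helper and the sample data is made up.

```python
import csv
import io


def validate_header(fieldnames, expected_columns):
    """Raise ValueError if the CSV header does not match the expected column names."""
    found = set(fieldnames or [])  # fieldnames is None for an empty file
    expected = set(expected_columns)
    if found != expected:
        missing = sorted(expected - found)
        extra = sorted(found - expected)
        raise ValueError(f"CSV header mismatch: missing={missing}, unexpected={extra}")


# Made-up sample: a movies-style file offered to a ratings-style schema.
wrong_file = io.StringIO("movieId,title,genres\n1,Example Movie (2000),Comedy\n")
reader = csv.DictReader(wrong_file)

try:
    validate_header(reader.fieldnames, ["userId", "movieId", "rating", "timestamp"])
    outcome = "header ok"
except ValueError as err:
    outcome = str(err)

print(outcome)
```

In `load_csv`, a check like this would sit right after the `csv.DictReader` is created, using `table.columns.keys()` as the expected names.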

In [None]:
# lists all the titles
# db.get_table('movies').select(['title'])
db_movies.get_table('movies').select(['title'], limit = 10)
# whoa, it worked!!! looks like a pandas or PySpark query though...

In [None]:
# ratings found in the file
db_movies.get_table('ratings').select(['movieId', 'rating'], limit = 15)

That ```get_table()``` will be a ```TableScanOperator``` later. 

# SQL Parser  
  
## 3 parts to it
1. **Abstract Syntax Tree (AST)** - Arranges the tokens into a specific order of 'nodes', so the system can visit each node and execute the operation it represents. The 'tree' part is the traditional structure (as far as I know); many modern engines represent their plans as a Directed Acyclic Graph (DAG) instead.
2. **Tokenizer** - Takes a SQL query string and breaks it into meaningful chunks (tokens).
3. **Parser** - The actual parser. It takes the tokens from the tokenizer and generates the AST (or DAG, keep your wits about you).
  
  
This can sound confusing at first, only because lots of nouns are involved.   
Stay with it, it's really simple.  
  

## AST Node Definitions
  
Our simple database has only a few types of nodes.  
One to represent the processing of a WHERE clause in the SELECT query.  
Another to represent the SELECT statement itself.  

...and for simplicity's sake we do not implement the WHERE clause right now. Let's attend to that later.  

In [None]:
# Base class
class ASTNode:
    pass

In [None]:
class WhereClause(ASTNode):
    def __init__(self, condition):
        self.condition = condition  # This could be a more complex structure in a full implementation
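
Since `Table.select()` calls `where_clause.evaluate(row)`, a single-condition WHERE clause could eventually look something like the sketch below. The `Condition` class and its `evaluate` method are assumptions for illustration, not this notebook's implementation; it handles only one `column operator literal` comparison.

```python
import operator

# Map the comparison operators this notebook's tokenizer recognizes to Python functions.
COMPARATORS = {
    "=": operator.eq,
    "!=": operator.ne,
    "<>": operator.ne,
    "<": operator.lt,
    "<=": operator.le,
    ">": operator.gt,
    ">=": operator.ge,
}


class Condition:
    """Hypothetical single comparison: <column> <op> <literal>."""

    def __init__(self, column, op, value):
        if op not in COMPARATORS:
            raise ValueError(f"Unsupported operator: {op}")
        self.column = column
        self.op = op
        self.value = value

    def evaluate(self, row):
        # row is a dict like the ones stored in Table.data
        return COMPARATORS[self.op](row[self.column], self.value)


# Made-up rows for illustration.
rows = [{"name": "Alice", "age": 30}, {"name": "Bob", "age": 19}]
adults = [r["name"] for r in rows if Condition("age", ">=", 21).evaluate(r)]
print(adults)
```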

In [None]:
class SelectStatement(ASTNode):
	def __init__(self, columns, table_name, where_clause=None):
		self.columns = columns  # List of column names or '*'
		self.table_name = table_name  # Name of the table
		self.where_clause = where_clause  # WhereClause node or None
		print("SelectStatement: this Select Statement has \nColumns: ",str(list(self.columns)),"\nTable: ",str(self.table_name),"\nWHERE CLAUSE: ", str(self.where_clause))
		
	# def __repr__(self):
		# return "Select Statement has \nColumns: "+str(list(self.columns)),"\nTable: "+str(self.table_name)+"and WHERE CLAUSE: "+ str(self.where_clause)

## Tokenize   
Break a string into meaningful parts. 

In [None]:
# all the different types of tokens we'll support here.
# try ANSI SQL standard for a list of what most data engines support...
class TokenType(Enum):
	SELECT = auto()
	ASTERISK = auto()
	FROM = auto()
	WHERE = auto()
	JOIN = auto()
	ON = auto()
	ORDER = auto()
	BY = auto()
	GROUP = auto()
	HAVING = auto()
	INSERT = auto()
	UPDATE = auto()
	DELETE = auto()
	IDENTIFIER = auto()
	STRING = auto()
	NUMBER = auto()
	OPERATOR = auto()
	PUNCTUATION = auto() # ignoring punctuation could be problematic, it's better to support proper COMMA, SEMICOLON etc.
	WHITESPACE = auto()  # so we can ignore it in further processing

In [None]:
# Define a dictionary for quick keyword lookup
SQL_KEYWORDS = {
	'SELECT': TokenType.SELECT,
	'FROM': TokenType.FROM,
	'WHERE': TokenType.WHERE,
	'JOIN': TokenType.JOIN,
	'ON': TokenType.ON,
	'ORDER': TokenType.ORDER,
	'BY': TokenType.BY,
	'GROUP': TokenType.GROUP,
	'HAVING': TokenType.HAVING,
	'INSERT': TokenType.INSERT,
	'UPDATE': TokenType.UPDATE,
	'DELETE': TokenType.DELETE,
	# '*' is a symbol, not a keyword, so it is left to the tokenizer rather than listed here
}
# TODO: is it too much to ask that you match this with the TokenTypes????

In [None]:
# the real heavy lifting
# TODO: like cool-re do 2 or 3 types of tokenizers
def tokenize(sql):
	token_patterns = r'''
		('[^']*'|"[^"]*")			  # String literals
	  | (<=|>=|<>|!=|<|>|=)			  # Comparison operators
	  | (\d+\.\d*|\.\d+|\d+)		  # Numeric values
	  | (\*)						  # Asterisk, as in SELECT *
	  | ([,;()])					  # Punctuation
	  | (\b[a-zA-Z_][a-zA-Z0-9_]*\b)  # Identifiers or SQL keywords
	  | (\s+)						  # Whitespace
	'''
	token_regex = re.compile(token_patterns, re.VERBOSE) #VERBOSE allows for multiline regex with comments 
	for match in token_regex.finditer(sql):
		token = match.group(0)
		if token.isspace():
			yield (token, TokenType.WHITESPACE)
		elif token in (',', ';', '(', ')'):
			yield (token, TokenType.PUNCTUATION)
		elif token == '*':  # note: ('*') is just the string '*', not a tuple
			yield (token, TokenType.ASTERISK)
		elif token.upper() in SQL_KEYWORDS:
			yield (token.upper(), SQL_KEYWORDS[token.upper()])
		elif re.match(r'^[\'"].*[\'"]$', token):
			yield (token, TokenType.STRING)
		elif re.match(r'^(\d+\.\d*|\.\d+|\d+)$', token):  # same shapes as the numeric pattern above
			yield (token, TokenType.NUMBER)
		elif re.match(r'^(<=|>=|<>|!=|<|>|=)$', token):  # anchor the whole alternation
			yield (token, TokenType.OPERATOR)
		else:
			yield (token, TokenType.IDENTIFIER)

In [None]:
# Test 
sql_query_test_01 = "SELECT name, age FROM users WHERE age >= 21 AND status = 'active' ORDER BY age DESC;"
t = tokenize(sql_query_test_01)
# 
print(list(t))
# print(next(t,None))
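
Because whitespace carries no meaning once the query is split up, one option is to drop those tokens before they reach the parser. The sketch below assumes tokens are `(text, type)` pairs like the ones `tokenize()` yields; to stay self-contained it uses plain strings for the types instead of `TokenType`, and `skip_whitespace` is a hypothetical helper.

```python
def skip_whitespace(tokens, whitespace_type="WHITESPACE"):
    """Yield every (text, type) token except whitespace, preserving order."""
    for text, token_type in tokens:
        if token_type != whitespace_type:
            yield (text, token_type)


# Hand-written token stream for "SELECT name FROM users" (illustrative only).
raw_tokens = [
    ("SELECT", "SELECT"), (" ", "WHITESPACE"),
    ("name", "IDENTIFIER"), (" ", "WHITESPACE"),
    ("FROM", "FROM"), (" ", "WHITESPACE"),
    ("users", "IDENTIFIER"),
]

clean_tokens = list(skip_whitespace(raw_tokens))
print(clean_tokens)
```

With the real tokenizer this would be called as `skip_whitespace(tokenize(sql), TokenType.WHITESPACE)`, which stays lazy because both are generators.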

## Parser  

Take a SQL query string, break it into meaningful parts, assign nodes to each part.

In [None]:
# wish this was smaller, breaking it into multiple cells for ease.
# notice the
# Parser.method = method 
# hack

In [None]:
class Parser:
	def __init__(self, tokens):
		self.tokens = tokens
		self.current_token = None
		self.next_token = None
		self._next_token()

In [None]:
def _next_token(self):
	try:
		self.current_token = self.next_token
		self.next_token = next(self.tokens, None)
	except StopIteration:
		self.current_token = None
# 
Parser._next_token = _next_token

In [None]:
# Main parsing loop - generates a SelectStatement AST node
# This could be better implemented as a plain regex match (I think, todo - try that next)
def parse(self):
	
	if (self.current_token is None) and (self.next_token is not None):
		self._next_token()
	# 
	if self.current_token is None or self.current_token[1] != TokenType.SELECT:
		raise SyntaxError("Query must start with SELECT")
	self._next_token()

	columns = self._parse_columns()

	# skip whitespace
	while self.current_token is not None and self.current_token[1] == TokenType.WHITESPACE:
		self._next_token()

	if self.current_token is None or self.current_token[1] != TokenType.FROM:
		raise SyntaxError("Expected FROM after column list")
	self._next_token()

	table_name = self._parse_table_name()

	# skip whitespace between the table name and an optional WHERE,
	# otherwise the WHERE keyword is never seen
	while self.current_token is not None and self.current_token[1] == TokenType.WHITESPACE:
		self._next_token()

	# current_token is None when the query ends right after the table name
	where_clause = None
	if self.current_token is not None and self.current_token[1] == TokenType.WHERE:
		self._next_token()
		where_clause = self._parse_where_clause()
	# 
	# This is where a SelectStatement AST node is created
	return SelectStatement(columns, table_name, where_clause)
# 
Parser.parse = parse

In [None]:
# kinda long-winded, need a better way to handle whitespace...
# this is supposed to capture one or more columns in your query
def _parse_columns(self):
	columns = []

	# skip whitespace
	while self.current_token is not None and self.current_token[1] == TokenType.WHITESPACE:
		self._next_token()

	if self.current_token is not None and self.current_token[1] == TokenType.ASTERISK:
		columns.append('*')
		self._next_token()
	else:
		while True:
			# skip whitespace or commas
			while self.current_token is not None and self.current_token[1] in (TokenType.WHITESPACE, TokenType.PUNCTUATION):
				self._next_token()

			# ran out of tokens before reaching FROM
			if self.current_token is None:
				raise SyntaxError("Expected FROM after column list")

			# do this till you reach FROM, we do not support sub-queries in this engine yet.
			if self.current_token[1] == TokenType.FROM:
				break

			if self.current_token[1] != TokenType.IDENTIFIER:
				raise SyntaxError("Expected column name")
			print("add ",self.current_token[0]," to list of columns")
			columns.append(self.current_token[0])
			self._next_token()
	return columns
# 
Parser._parse_columns = _parse_columns

In [None]:
# sooo much simpler! I have to do a better parse_columns... 
def _parse_table_name(self):
	# skip whitespace
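	while self.current_token is not None and self.current_token[1] == TokenType.WHITESPACE:
		self._next_token()
	# current_token is None when the query ends right after FROM
	if self.current_token is None or self.current_token[1] != TokenType.IDENTIFIER:
		raise SyntaxError("Expected table name")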
	table_name = self.current_token[0]
	self._next_token()
	return table_name
# 
Parser._parse_table_name = _parse_table_name

In [None]:
# TODO - I need to do a better version here...
# generates a WhereClause AST node
def _parse_where_clause(self):
	# In a full implementation, this would need to handle complex expressions.
	# For simplicity, we'll assume it's just a single condition.
	# current_token is None when the query ends right after WHERE
	if self.current_token is None or self.current_token[1] not in (TokenType.IDENTIFIER, TokenType.WHITESPACE):
		raise SyntaxError("Expected condition after WHERE")
	# condition = self.current_token[0]
	condition = None
	# self._next_token()
	return WhereClause(condition)
# 
Parser._parse_where_clause = _parse_where_clause

## Tests for the parser  
must eat the pudding to prove it is there...

In [None]:
# test this nonsense - need to write a cleaner parser...
sql_query_test_02 = "SELECT name, age FROM users WHERE age > 30"
print("SQL Query:\n\t", sql_query_test_02)
tokens_sql_query_test_02 = tokenize(sql_query_test_02) 
# REMEMBER, as you list(gen) a generator, it's already emitted all values...
# print("Tokenized:\n\t",list(tokens_check), " ", tokens_check)
parser_sql_query_test_02 = Parser(tokens_sql_query_test_02)
# print("Parser:\n\t",tokens)
ast_sql_query_test_02 = parser_sql_query_test_02.parse()

In [None]:
sql_query_test_03 = "SELECT numpty, humpty, dumpty FROM poems WHERE fall LIKE '%great%'"
print("SQL Query:\n\t", sql_query_test_03)
tokens_sql_query_test_03 = tokenize(sql_query_test_03) 
parser_sql_query_test_03 = Parser(tokens_sql_query_test_03)
ast_sql_query_test_03 = parser_sql_query_test_03.parse()

In [None]:
# TODO: For a more evolved parser, wouldn't it be great to draw the DAG? Like Spark and Dask do?
# print(ast)

# Query Planner  
  
Given an AST, create a plan for executing the query.

In [None]:
class QueryPlan:
	def __init__(self):
		self.steps = []

	def add_step(self, step):
		self.steps.append(step)

	def list_steps(self):
		return str(list(self.steps))

In [None]:
class QueryPlanner:
    def __init__(self, database):
        self.database = database

    def create_plan(self, ast):
        plan = QueryPlan()

        if isinstance(ast, SelectStatement):
            # Step 1: Full Table Scan
            plan.add_step(('FullTableScan', ast.table_name))

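            # Step 2: Filter (WHERE clause)
            # - like before, we are not getting into the WHERE clause,
            #   just to keep things simple and easy to grok
            if ast.where_clause:
                plan.add_step(('Filter', ast.where_clause))

            # Step 3: Select (projection) - keep only the requested columns;
            # '*' means every column, so no projection step is needed
            if ast.columns != ['*']:
                plan.add_step(('Select', ast.columns))

            # TODO - Step 4: Joins
            # for join in ast.joins:
            #     plan.add_step(('Join', join.table_name, join.on_condition))

            # Additional steps like aggregations can be added here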

        return plan

In [None]:
# check

db_movies



In [None]:
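# `db` and `ast` are only created in the final section, so use what exists so far
planner = QueryPlanner(db_movies)
plan = planner.create_plan(ast_sql_query_test_02)  # an AST produced by the parser tests above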

In [None]:
print(plan.list_steps())

# Query Plan Optimizer

In [None]:
class QueryOptimizer:
    def __init__(self, database):
        self.database = database

    def optimize(self, plan):
        optimized_plan = self._apply_predicate_pushdown(plan)
        optimized_plan = self._apply_join_reordering(optimized_plan)
        return optimized_plan

    def _apply_predicate_pushdown(self, plan):
        # This is a placeholder for predicate pushdown logic.
        # In practice, you would modify the plan to move filters closer to the data source.
        return plan

    def _apply_join_reordering(self, plan):
        # This is a placeholder for join reordering logic.
        # In practice, you would reorder joins based on size, indexes, or other factors.
        return plan

In [None]:
# test our dummy optimizer
optimizer = QueryOptimizer(db_movies)  # `db` is only created in the final section
optimized_plan = optimizer.optimize(plan)  # Assuming 'plan' is from the QueryPlanner

In [None]:
print(optimized_plan.list_steps())
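
To give the pushdown placeholder some shape, here is a minimal sketch over the same kind of step tuples the planner emits. It assumes a plan is just a list such as `[('FullTableScan', ...), ('Select', ...), ('Filter', ...)]` and moves every `Filter` step to sit directly after the scan. `push_down_filters` is a hypothetical function, and a real optimizer would also need to check which columns each filter references.

```python
def push_down_filters(steps):
    """Return a new step list with all Filter steps placed right after the first scan."""
    filters = [s for s in steps if s[0] == "Filter"]
    others = [s for s in steps if s[0] != "Filter"]

    reordered = []
    placed = False
    for step in others:
        reordered.append(step)
        if step[0] == "FullTableScan" and not placed:
            reordered.extend(filters)  # filters run immediately after the scan
            placed = True
    if not placed:
        reordered.extend(filters)  # no scan found: keep filters at the end
    return reordered


# Illustrative plan where the filter was planned after the projection.
steps = [
    ("FullTableScan", "movies"),
    ("Select", ["title"]),
    ("Filter", "genres = 'Comedy'"),
]
optimized_steps = push_down_filters(steps)
print(optimized_steps)
```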

# The Execution Engine

## Executable Operations

In [None]:
# basic class with a cost function - 
# because our database will natively provide cost of execution for every query...

class BaseOperator:
    def __init__(self):
        self.rows_processed = 0

    def get_cost(self):
        return {'rows_processed': self.rows_processed}

# Extend this base class in SelectOperator, JoinOperator, GroupByOperator etc.

In [None]:
class TableScanOperator(BaseOperator):
	def __init__(self, table): # Todo: provide a condition filter here to reduce scans -  condition=None)
		super().__init__()  # sets rows_processed = 0 so get_cost() works even before execute()
		# self.db = db
		# self.table = self.db.get_table(table)
		self.table = table
		# self.condition = condition

	def execute(self):
		# table_scan = (row for row in self.table)
		table_scan = self.table.table_scan()
		self.rows_processed = len(table_scan)
		return table_scan

	def get_cost(self):
		return super().get_cost()


In [None]:
class SelectOperator(BaseOperator):
	def __init__(self, input_operator, select_columns):
		super().__init__()
		self.input_operator = input_operator
		self.select_columns = select_columns
		self.result_limit = 25

	def execute(self):
		result = []
		for row in self.input_operator.execute():
			self.rows_processed += 1
			if self.rows_processed <= self.result_limit:
				selected_row = {col: row[col] for col in self.select_columns}
				result.append(selected_row)
			else:
				break
		return result

	def get_cost(self):
		return super().get_cost()
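
As an illustration of extending the base class, a filter operator could wrap another operator and keep only the rows that pass a predicate. The sketch below is an assumption, not this notebook's implementation: `FilterOperator`, `ListSource`, and the minimal `BaseOperator` stand-in are defined here only so the block runs on its own, and the predicate is a plain Python callable instead of a parsed WHERE clause.

```python
class BaseOperator:
    """Minimal stand-in for the notebook's BaseOperator."""

    def __init__(self):
        self.rows_processed = 0

    def get_cost(self):
        return {"rows_processed": self.rows_processed}


class ListSource(BaseOperator):
    """Hypothetical source operator that serves rows from an in-memory list."""

    def __init__(self, rows):
        super().__init__()
        self.rows = rows

    def execute(self):
        self.rows_processed = len(self.rows)
        return self.rows


class FilterOperator(BaseOperator):
    """Hypothetical operator: keeps rows for which predicate(row) is true."""

    def __init__(self, input_operator, predicate):
        super().__init__()
        self.input_operator = input_operator
        self.predicate = predicate

    def execute(self):
        result = []
        for row in self.input_operator.execute():
            self.rows_processed += 1  # every row examined counts toward the cost
            if self.predicate(row):
                result.append(row)
        return result


# Made-up rows for illustration.
source = ListSource([{"genres": "Comedy"}, {"genres": "Drama"}, {"genres": "Comedy"}])
comedies = FilterOperator(source, lambda row: row["genres"] == "Comedy")
filtered_rows = comedies.execute()
print(filtered_rows, comedies.get_cost())
```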

## Execution Plan

In [None]:
def create_executable_plan(plan, database):
	executable_plan = []
	for step in plan.steps:
		if step[0] == 'FullTableScan':
			table_name = step[1]
			executable_plan.append(TableScanOperator(database.get_table(table_name)))

		elif step[0] == 'Select':
			columns = step[1]
			input_operator = executable_plan[-1]	# The input is the result of the previous step
			executable_plan.append(SelectOperator(input_operator, columns))

		# just to be sure - first get the WHERE clause sorted...
		# elif step[0] == 'Join':
		#		# Assuming step format is ('Join', right_table_name, join_condition)
		#		right_table = database.get_table(step[1])
		#		join_condition = step[2]
		#		left_operator = executable_plan[-1]
		#		right_operator = TableScanOperator(right_table)
		#		executable_plan.append(JoinOperator(left_operator, right_operator, join_condition))

		# Add cases for other types of steps ('GroupBy', 'Filter', etc.) here...

	return executable_plan

## Executing Queries

In [None]:
def execute_query(executable_plan):
	result = None
	cost = {}  # created once, so it keeps the cost of every operator, not just the last
	for operator in executable_plan:
		result = operator.execute()
		cost[str(operator)] = operator.get_cost()

	# The result of the last operation is the result of the entire query
	return result, cost


# The Pyramid

Test the final engine, this is where the pudding rubber meets the pudding road, just to mix my metaphors...  

<font color='red'><br/>**Stand Back, this may blow up**</font> 

### Create a database and a table, then load data into the table 

In [None]:
db = Database()
db.create_table('movies', [Column('movieId', str), Column('title', str), Column('genres', str)])
db.load_csv('movies', file_path_movies)

### Example query    

In [None]:
# Example query
sql_query = "SELECT genres FROM movies WHERE genres = 'Comedy'"

### Parsing the query to AST    

In [None]:
# Parsing the query to AST
tokens = tokenize(sql_query)
parser = Parser(iter(tokens))
ast = parser.parse()

### Creating the query plan    

In [None]:
# Creating the query plan
planner = QueryPlanner(db)
plan = planner.create_plan(ast)

### Optimizing the query plan   

In [None]:
# Optimizing the query plan
optimizer = QueryOptimizer(db)
optimized_plan = optimizer.optimize(plan)

### Creating the execution plan   

In [None]:
# Creating the executable plan
executable_plan = create_executable_plan(optimized_plan, db)

### Executing the query      

In [None]:
# Executing the query
query_result, query_costs = execute_query(executable_plan)

### Output the result  

In [None]:
# Output the cost of running the query
print(query_costs)

In [None]:
# Output the result
print(query_result)

Whoa!

# Next

The WHERE clause, JOINs, and optimizers.