In [1]:
%pip install psycopg2-binary

Note: you may need to restart the kernel to use updated packages.


# How to use Python to connect to PostgreSQL

In [2]:
# os gives access to environment variables (used for optional connection settings).
import os

# psycopg2 is a PostgreSQL database adapter for Python. It enables your Python programs to communicate with the PostgreSQL database engine.
import psycopg2

In [3]:
# Establish a connection to the PostgreSQL server.
# Every setting can be overridden through the standard libpq environment
# variables (PGHOST, PGDATABASE, PGUSER, PGPASSWORD, PGPORT), so real
# credentials never have to be written into the notebook. When a variable is
# unset, the tutorial's original value is used as the fallback.
conn = psycopg2.connect(
    host=os.environ.get("PGHOST", "db"),             # The hostname of your PostgreSQL server
    dbname=os.environ.get("PGDATABASE", "test_db"),  # The name of the database you want to connect to
    user=os.environ.get("PGUSER", "root"),           # The username that you have set up for the database
    password=os.environ.get("PGPASSWORD", "root"),   # The password for your user
    port=int(os.environ.get("PGPORT", "5432")),      # Port number which PostgreSQL server is listening on
)
print("Successfully connected to the database!")

Successfully connected to the database!


In [4]:
# Instantiate a new cursor object from the connection.
# The cursor is the object the rest of this notebook uses to run PostgreSQL
# commands from Python (cur.execute) and to read their results
# (cur.fetchone / cur.fetchall).
cur = conn.cursor()

In [5]:
"""
This will drop all the tables for teaching purposes only. 
It allows you to run this notebook multiple times without worrying about the previous stage of the database. 
You can assume that every time you run this notebook, you will have a clean, fresh database.
"""
def DropAllTable():
    """Drop every base table in the ``public`` schema and commit.

    Works through the module-level ``cur`` and ``conn``. Table names are
    quoted as SQL identifiers, so names that need quoting (mixed case,
    reserved words, spaces) are dropped correctly instead of breaking the
    statement.
    """
    # Imported here so this cell needs nothing beyond the psycopg2 install.
    from psycopg2 import sql

    # Restrict to base tables: information_schema.tables also lists views,
    # and DROP TABLE on a view is an error.
    cur.execute("""
        SELECT table_name FROM information_schema.tables
        WHERE table_schema = 'public' AND table_type = 'BASE TABLE'
    """)
    for (table_name,) in cur.fetchall():
        # IF EXISTS keeps the loop going if an earlier CASCADE already
        # removed this table (e.g. a partition of a dropped parent).
        cur.execute(
            sql.SQL("DROP TABLE IF EXISTS {} CASCADE").format(
                sql.Identifier(table_name)
            )
        )
    conn.commit()


DropAllTable()

## Execute SQL in python

In [6]:
# SQL that creates a table named 'Users' with two text columns:
# 'email' (the primary key) and 'name' (required).
create_users_sql = """
    CREATE TABLE Users (
        email TEXT PRIMARY KEY,
        name TEXT NOT NULL
    );
"""

# Run the statement through the cursor created earlier. psycopg2 starts a
# transaction for it, so at this point the table is not yet permanent.
cur.execute(create_users_sql)

# commit() confirms the transaction and writes the change to the database.
# Without it the new table would be lost when the connection is closed.
conn.commit()

In [7]:
# One INSERT statement that adds three rows to the 'Users' table.
insert_users_sql = """
    INSERT INTO Users (name, email) VALUES 
    ('John Doe', 'john@example.com'),
    ('Jane Doe', 'jane@example.com'),
    ('Bill Smith', 'bill@example.com')
"""

# Executing it opens a transaction; the rows are only pending until commit.
cur.execute(insert_users_sql)

# Always commit after changing data so the rows are persisted to the DB.
conn.commit()

## Retrieve data from an SQL statement

In [8]:
# Run a SELECT through the cursor. A SELECT only reads data, so there is
# nothing to commit afterwards.
cur.execute("SELECT * FROM Users")

# fetchone() returns the next row of the result set as a single sequence,
# or None once no more rows are available - so keep fetching until None.
while True:
    row = cur.fetchone()
    if row is None:
        break
    print(row)  # show the row that was just fetched


('john@example.com', 'John Doe')
('jane@example.com', 'Jane Doe')
('bill@example.com', 'Bill Smith')


In [9]:
# Run the query, then pull the complete result set into a list with a
# single fetchall() call (instead of asking for one row at a time).
cur.execute("SELECT * FROM Users")
rows = cur.fetchall()

# Print the rows, one per line.
for row in rows:
    print(row)

('john@example.com', 'John Doe')
('jane@example.com', 'Jane Doe')
('bill@example.com', 'Bill Smith')


In [10]:
# The same query, wrapped in a function so it can be reused later.
def getAllUser():
    """Return every row of the Users table as a list of tuples.

    Uses the module-level cursor ``cur``.
    """
    query = "SELECT * FROM Users"
    cur.execute(query)
    all_rows = cur.fetchall()
    return all_rows

In [11]:
getAllUser()

[('john@example.com', 'John Doe'),
 ('jane@example.com', 'Jane Doe'),
 ('bill@example.com', 'Bill Smith')]

## Pass variables to an SQL statement

In [12]:
# A more specific query: the WHERE clause keeps only the row whose email
# matches. Here the value is written directly into the SQL text as a literal.
cur.execute("SELECT * FROM Users WHERE email='jane@example.com' ")
# Fetch all rows from the result set (shown as the cell output)
cur.fetchall()

[('jane@example.com', 'Jane Doe')]

In [13]:
def getUserByemail(email):
    """Return the Users rows whose email equals ``email`` (a list of tuples).

    Uses the module-level cursor ``cur``.
    """
    # %s is a placeholder: the value is handed to execute() separately in
    # the parameter tuple rather than being pasted into the SQL text.
    query = "SELECT * FROM Users WHERE email = %s"
    params = (email,)
    cur.execute(query, params)
    return cur.fetchall()

In [14]:
getUserByemail("jane@example.com")

[('jane@example.com', 'Jane Doe')]

In [15]:
def createUser(name, email):
    """Insert one row into Users and commit it.

    Uses the module-level ``cur`` and ``conn``.

    Raises:
        Whatever the database driver raises (for example when the email is
        already taken). The transaction is rolled back before the exception
        is re-raised, so the shared connection stays usable; without the
        rollback PostgreSQL rejects every later statement on this
        connection until one is issued.
    """
    try:
        cur.execute("""
             INSERT INTO Users (name, email) 
             VALUES (%s, %s)
             """,
             (name, email)
        )
        conn.commit()
    except Exception:
        conn.rollback()
        raise

In [16]:
createUser("testFunction", "test@test.com")

## Exercise
- Create a function `deleteUser(email)`
- Create a function `updateUserByEmail(email, name)`

# Refactor the code

In [17]:
class UserDB():
    """Small data-access wrapper around the Users table.

    Args:
        conn: an open database connection (psycopg2 in this notebook). One
            cursor is created from it and reused by every method.

    Every method re-raises database errors unchanged, but rolls the
    transaction back first. A failed statement otherwise leaves the
    connection in an aborted transaction and every later call would fail.
    """

    def __init__(self, conn):
        self.conn = conn
        self.cur = conn.cursor()

    def _execute(self, query, params=None):
        """Run one statement; on failure roll back and re-raise."""
        try:
            self.cur.execute(query, params)
        except Exception:
            self.conn.rollback()
            raise

    def getAllUser(self):
        """Return every row of Users as a list of tuples."""
        self._execute("SELECT * FROM Users")
        return self.cur.fetchall()

    def getUserByemail(self, email):
        """Return the rows of Users whose email equals ``email``."""
        self._execute("SELECT * FROM Users WHERE email = %s", (email,))
        return self.cur.fetchall()

    def createUser(self, name, email):
        """Insert one user and commit the transaction."""
        self._execute("""
             INSERT INTO Users (name, email) 
             VALUES (%s, %s)
             """,
             (name, email)
        )
        try:
            self.conn.commit()
        except Exception:
            # A failed commit must not leave the connection half-way either.
            self.conn.rollback()
            raise

In [18]:
userDB = UserDB(conn)

In [19]:
userDB.getAllUser()

[('john@example.com', 'John Doe'),
 ('jane@example.com', 'Jane Doe'),
 ('bill@example.com', 'Bill Smith'),
 ('test@test.com', 'testFunction')]

In [20]:
userDB.createUser("nameFromClass", "email@class.com")

In [21]:
userDB.getAllUser()

[('john@example.com', 'John Doe'),
 ('jane@example.com', 'Jane Doe'),
 ('bill@example.com', 'Bill Smith'),
 ('test@test.com', 'testFunction'),
 ('email@class.com', 'nameFromClass')]

## Exercise 
- Add a `deleteUser(email)` method
- Add an `updateUserByEmail(email, name)` method.

# Get to know the JSON format.
   In short, JSON looks like a Python dictionary or a list of dictionaries, but it is a text format: the data is represented as a string.

In [22]:
import json

In [23]:
# JSON text describing one user, held in a multi-line Python string.
json_data = """
{
    "name": "Tendon",
    "email": "tendon@example.com"
}
"""

# json.loads() parses JSON text; a JSON object becomes a Python dictionary.
dict_data = json.loads(json_data)

# Show the type and then the content, one per line:
#   <class 'dict'>
#   {'name': 'Tendon', 'email': 'tendon@example.com'}
print(type(dict_data), dict_data, sep="\n")

<class 'dict'>
{'name': 'Tendon', 'email': 'tendon@example.com'}


In [24]:
# JSON text holding an array of two user objects.
json_data = """
[
    {
        "name": "Tendon",
        "email": "tendon@example.com"
    },
    {
        "name": "Tonten",
        "email": "tonten@example.com"
    }
]
"""

# json.loads() turns a JSON array of objects into a Python list of dictionaries.
dict_data = json.loads(json_data)

# Show the type and then the content, one per line:
#   <class 'list'>
#   [{'name': 'Tendon', 'email': 'tendon@example.com'}, {'name': 'Tonten', 'email': 'tonten@example.com'}]
print(type(dict_data), dict_data, sep="\n")

<class 'list'>
[{'name': 'Tendon', 'email': 'tendon@example.com'}, {'name': 'Tonten', 'email': 'tonten@example.com'}]


In [25]:
# User data held in a Python dictionary.
dict_data = {"name": "Tendon", "email": "tendon@example.com"}

# json.dumps() goes the other way: it serialises the dictionary to a JSON string.
json_data = json.dumps(dict_data)

# Show the type and then the content, one per line:
#   <class 'str'>
#   {"name": "Tendon", "email": "tendon@example.com"}
print(type(json_data), json_data, sep="\n")

<class 'str'>
{"name": "Tendon", "email": "tendon@example.com"}


In [26]:
# Two users held in a Python list of dictionaries.
dict_data = [
    {"name": "Tendon", "email": "tendon@example.com"},
    {"name": "Tonten", "email": "tonten@example.com"},
]

# json.dumps() serialises the whole list to one JSON string.
json_data = json.dumps(dict_data)

# Show the type and then the content, one per line:
#   <class 'str'>
#   [{"name": "Tendon", "email": "tendon@example.com"}, {"name": "Tonten", "email": "tonten@example.com"}]
print(type(json_data), json_data, sep="\n")


<class 'str'>
[{"name": "Tendon", "email": "tendon@example.com"}, {"name": "Tonten", "email": "tonten@example.com"}]


In [27]:
getAllUser()

[('john@example.com', 'John Doe'),
 ('jane@example.com', 'Jane Doe'),
 ('bill@example.com', 'Bill Smith'),
 ('test@test.com', 'testFunction'),
 ('email@class.com', 'nameFromClass')]

In [28]:
def listOfTuple2listOfDict(users):
    """Turn user rows into dictionaries.

    Args:
        users: iterable of sequences whose first element is the email and
            whose second element is the name.

    Returns:
        A new list with one ``{"email": ..., "name": ...}`` dictionary per
        input row, in the same order.
    """
    return [{"email": record[0], "name": record[1]} for record in users]

In [29]:
listOfTuple2listOfDict(getAllUser())

[{'email': 'john@example.com', 'name': 'John Doe'},
 {'email': 'jane@example.com', 'name': 'Jane Doe'},
 {'email': 'bill@example.com', 'name': 'Bill Smith'},
 {'email': 'test@test.com', 'name': 'testFunction'},
 {'email': 'email@class.com', 'name': 'nameFromClass'}]

In [30]:
getUserByemail('jane@example.com')

[('jane@example.com', 'Jane Doe')]

In [31]:
listOfTuple2listOfDict(getUserByemail('jane@example.com'))

[{'email': 'jane@example.com', 'name': 'Jane Doe'}]

# Create HTTP server

In [32]:
from http.server import BaseHTTPRequestHandler, HTTPServer

In [33]:
# Importing necessary modules
import json
from http.server import BaseHTTPRequestHandler
# Assuming UserDB class and listOfTuple2listOfDict function have been defined

class Handler(BaseHTTPRequestHandler):
    """HTTP handler that exposes the Users table as JSON under ``/users``.

    * ``GET /users``  -> 200 with a JSON list of ``{"email", "name"}`` objects.
    * ``POST /users`` -> 201 with the created user. The request body must be
      a JSON object with ``name`` and ``email``; an unusable body gives 400,
      and a database failure (e.g. the email already exists) gives 500.
    * Any other path  -> 404.
    * ``PUT`` / ``DELETE`` -> 501 until they are implemented (see the exercise).

    Every request receives exactly one JSON response.
    """

    # Reuse the UserDB instance created earlier in the notebook, so all
    # requests share the same connection and cursor.
    userDB = userDB

    def _send_json(self, status, payload):
        """Send ``payload`` serialised as JSON with the given HTTP status."""
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-type', 'application/json')
        # Tell the client exactly how many bytes the body contains.
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _reset_transaction(self):
        """Roll back after a failed statement so later requests still work."""
        # Relies on the `conn` attribute that UserDB sets in __init__.
        self.userDB.conn.rollback()

    def _read_new_user(self):
        """Parse the request body and return ``(name, email)``.

        Raises ValueError, TypeError or KeyError when the Content-Length
        header or the body is unusable, or a field is missing.
        """
        content_len = int(self.headers.get('Content-Length', 0))
        if content_len < 0:
            # rfile.read() with a negative size would read until the
            # client closes the connection.
            raise ValueError("negative Content-Length")
        post_body = json.loads(self.rfile.read(content_len))
        return post_body['name'], post_body['email']

    def do_GET(self):
        """Handle ``GET /users``: return all users as a JSON list."""
        if self.path != '/users':
            self._send_json(404, {"error": "not found"})
            return

        try:
            users = self.userDB.getAllUser()
        except Exception:
            self._reset_transaction()
            self._send_json(500, {"error": "could not read users"})
            return

        # Rows come back as tuples; convert them to dicts before serialising.
        self._send_json(200, listOfTuple2listOfDict(users))

    def do_POST(self):
        """Handle ``POST /users``: create a user from the JSON request body."""
        if self.path != '/users':
            self._send_json(404, {"error": "not found"})
            return

        # Client mistakes are reported as 400 instead of crashing the handler.
        try:
            name, email = self._read_new_user()
        except (ValueError, TypeError, KeyError):
            self._send_json(
                400,
                {"error": "body must be a JSON object with 'name' and 'email'"},
            )
            return

        # Do all database work before sending anything, so a failure can
        # never follow a status line that was already written.
        try:
            self.userDB.createUser(name, email)
            # Read the new row back so the response shows what was stored.
            users = self.userDB.getUserByemail(email)
        except Exception:
            # For example the email already exists (primary-key violation).
            self._reset_transaction()
            self._send_json(500, {"error": "may email already exit"})
            return

        self._send_json(201, listOfTuple2listOfDict(users))

    def do_PUT(self):
        """Placeholder for the update exercise; answers 501 for now."""
        self._send_json(501, {"error": "PUT is not implemented"})

    def do_DELETE(self):
        """Placeholder for the delete exercise; answers 501 for now."""
        self._send_json(501, {"error": "DELETE is not implemented"})

In [None]:
# Listen on every interface, port 8000, and serve requests with Handler.
httpd = HTTPServer(('0.0.0.0', 8000), Handler)
try:
    # Blocks until the kernel is interrupted.
    httpd.serve_forever()
except KeyboardInterrupt:
    # Interrupting the kernel is the normal way to stop this cell.
    pass
finally:
    # Release the listening socket so the cell can be re-run without an
    # "Address already in use" error.
    httpd.server_close()

127.0.0.1 - - [25/Oct/2023 13:40:52] "GET /users HTTP/1.1" 200 -
127.0.0.1 - - [25/Oct/2023 13:40:52] "POST /users HTTP/1.1" 201 -


### Exercise: 
- Create a PUT API for users (update user based on email)
- Create a DELETE API for users (delete user based on email).