 Install Required Libraries

In [None]:
!pip install sqlalchemy



In [None]:
import sqlalchemy
print(sqlalchemy.__version__)


2.0.40


create_engine() is the universal way to configure connections.

.connect() gives us a live connection object to run queries.

In [None]:
import pandas as pd
from google.colab import files

uploaded = files.upload()

Saving european_database.sqlite to european_database (3).sqlite


In [None]:
from sqlalchemy import create_engine

# Create an engine connected to the database
engine = create_engine("sqlite:///european_database.sqlite")

# Connect to the database
conn = engine.connect()


Create a MetaData() object to store schema.

Use metadata.reflect(bind=engine) to read existing tables.

In [None]:
from sqlalchemy import MetaData

# Store table and schema structure
metadata = MetaData()

# Automatically read table definitions
metadata.reflect(bind=engine)

# Print table names
print(metadata.tables.keys())


dict_keys([])


In [None]:
from sqlalchemy import Table, Column, Integer, Text

# # Define a new table
teams = Table(
    'teams', metadata,
    Column('team_id', Integer, primary_key=True),
    Column('team_name', Text, nullable=False),
    Column('city', Text, nullable=False)
)

# Create the table in the database (if it doesn't already exist)
metadata.create_all(engine)

# Insert multiple rows
insert_query = teams.insert().values([
    {'team_id': 1, 'team_name': 'Team A', 'city': 'City A'},
    {'team_id': 2, 'team_name': 'Team B', 'city': 'City B'},
    {'team_id': 3, 'team_name': 'Team C', 'city': 'City C'}
])

# Run the insert operation
conn.execute(insert_query)


<sqlalchemy.engine.cursor.CursorResult at 0x791afbd78ec0>

In [None]:
if 'teams' in metadata.tables:
    print(repr(metadata.tables['teams']))
else:
    print("Table 'teams' does not exist.")


Table('teams', MetaData(), Column('team_id', Integer(), table=<teams>, primary_key=True, nullable=False), Column('team_name', Text(), table=<teams>, nullable=False), Column('city', Text(), table=<teams>, nullable=False), schema=None)


In [None]:
from sqlalchemy import select

# Reference the 'teams' table
teams = metadata.tables['teams']

# Create a SELECT query
# query = select(teams)

# Execute query
result_proxy = conn.execute(select(teams))

# Fetch all results
rows = result_proxy.fetchall()

# Print each row
for row in rows:
    print(row)



(1, 'Team A', 'City A')
(2, 'Team B', 'City B')
(3, 'Team C', 'City C')


In [None]:
import pandas as pd


query = select(teams.c)  # select all columns
 # Read SQL results into DataFrame
df = pd.read_sql(query, conn)

# Print schema: column names and data types
print("Columns in 'teams' table:")
for col in teams.columns:
  print(f"{col.name}: {col.type}")

print("\nRows in 'teams' table:")
print(df.head())

Columns in 'teams' table:
team_id: INTEGER
team_name: TEXT
city: TEXT

Rows in 'teams' table:
   team_id team_name    city
0        1    Team A  City A
1        2    Team B  City B
2        3    Team C  City C


In [None]:
query = teams.select()  # Same as select(teams)
print(query)


SELECT teams.team_id, teams.team_name, teams.city 
FROM teams


In [None]:
exe = conn.execute(query)         # Execute query
result = exe.fetchmany(2)         # Get top 2 rows
print(result)


[(1, 'Team A', 'City A'), (2, 'Team B', 'City B')]


In [None]:
query = select(teams.c.team_name) #Show only a specific column
result = conn.execute(query).fetchall()
for row in result:
    print(row)


('Team A',)
('Team B',)
('Team C',)


In [None]:
from sqlalchemy import distinct

query = select(distinct(teams.c.city)) #Show only distinct values of a column
result = conn.execute(query).fetchall()
for row in result:
    print(row)


('City A',)
('City B',)
('City C',)


In [None]:
from sqlalchemy import distinct

query = select(distinct(teams.c.city)) #Show only distinct values of a column
# result = conn.execute(query).fetchall()
# print(result[0])

result = conn.execute(query).fetchone()
print(result)

('City A',)


In [None]:
import sqlalchemy as db

# Step 1: Create a new SQLite database engine
engine = db.create_engine('sqlite:///datacamp.sqlite')

# Step 2: Establish connection
conn = engine.connect()

# Step 3: Initialize metadata
metadata = db.MetaData()

# Step 4: Define 'Student' table schema
Student = db.Table('Student', metadata,
    db.Column('Id', db.Integer(), primary_key=True, autoincrement=True),
    db.Column('Name', db.String(255), nullable=False),
    db.Column('Major', db.String(255), default="Math"),
    db.Column('Pass', db.Boolean(), default=True)
)

# Step 5: Create table in the database
metadata.create_all(engine)


In [None]:
# Insert one row
query = db.insert(Student).values(Name='Matthew', Major='English', Pass=True)
result = conn.execute(query)
print(result)

# Query to check inserted data
query = db.select(Student)
result_proxy = conn.execute(query)
rows = result_proxy.fetchall()

print("\nRows in 'Student' table:")
for row in rows:
    print(row)


<sqlalchemy.engine.cursor.CursorResult object at 0x791af7b35be0>

Rows in 'Student' table:
(1, 'Matthew', 'English', True)


In [None]:
# Insert multiple rows
query = db.insert(Student)
values_list = [
    {'Name': 'Nisha', 'Major': "Science", 'Pass': False},
    {'Name': 'Natasha', 'Major': "Math", 'Pass': True},
    {'Name': 'Ben', 'Major': "English", 'Pass': False}
]

result = conn.execute(query, values_list)

# Check results
output = conn.execute(db.select(Student)).fetchall()
print(output)


[(1, 'Matthew', 'English', True), (2, 'Nisha', 'Science', False), (3, 'Natasha', 'Math', True), (4, 'Ben', 'English', False)]


In [None]:
from sqlalchemy.sql import text

query = text("SELECT Name, Major FROM Student WHERE Pass = True")
output = conn.execute(query)
print(output.fetchall())


[('Matthew', 'English'), ('Natasha', 'Math')]


In [None]:
from sqlalchemy.sql import text

query = text("SELECT Major, COUNT(*) FROM Student GROUP BY Major")
output = conn.execute(query)
print(output.fetchall())


[('English', 2), ('Math', 1), ('Science', 1)]


In [None]:
query = Student.select().where(Student.columns.Major == 'English')
output = conn.execute(query)
print(output.fetchall())


[(1, 'Matthew', 'English', True), (4, 'Ben', 'English', False)]


In [None]:
query = Student.select().where(Student.columns.Pass == True)
output = conn.execute(query)
print(output.fetchall())

[(1, 'Matthew', 'English', True), (3, 'Natasha', 'Math', True)]


In [None]:
query = Student.select().where(
    db.and_(
        Student.columns.Major == 'English',
        Student.columns.Pass == False
    )
)
output = conn.execute(query)
print(output.fetchall())


[(4, 'Ben', 'English', False)]


In [None]:
# IN clause
Student.select().where(Student.columns.Major.in_(['English', 'Math']))

# AND/OR condition
Student.select().where(
    db.or_(
        Student.columns.Major == 'English',
        Student.columns.Pass == True
    )
)

# ORDER BY
Student.select().order_by(db.desc(Student.columns.Name))

# LIMIT
Student.select().limit(3)


<sqlalchemy.sql.selectable.Select object at 0x791af7c0d850>

In [None]:
from sqlalchemy import func, select

# Sum of Id values
sum_query = select(func.sum(Student.columns.Id))
print(conn.execute(sum_query).fetchall())

# Average of Id values
avg_query = select(func.avg(Student.columns.Id))
print(conn.execute(avg_query).fetchall())

# Count of students
count_query = select(func.count(Student.columns.Id))
print(conn.execute(count_query).fetchall())

# Minimum Id
min_query = select(func.min(Student.columns.Id))
print(conn.execute(min_query).fetchall())

# Maximum Id
max_query = select(func.max(Student.columns.Id))
print(conn.execute(max_query).fetchall())


[(10,)]
[(2.5,)]
[(4,)]
[(1,)]
[(4,)]


In [None]:
# Group by Pass status and sum the IDs
group_query = select(
    func.sum(Student.columns.Id),
    Student.columns.Pass
).group_by(Student.columns.Pass)

print(conn.execute(group_query).fetchall())


[(6, False), (4, True)]


In [None]:
# Correct way to select distinct values
distinct_query = select(func.distinct(Student.columns.Major))
print(conn.execute(distinct_query).fetchall())


[('English',), ('Science',), ('Math',)]


In [None]:
from sqlalchemy import select, func

# Count number of students per (Major, Pass) combination
query = select(
    Student.columns.Major,
    Student.columns.Pass,
    func.count(Student.columns.Id)
).group_by(
    Student.columns.Major,
    Student.columns.Pass
)

# Execute and print result
result = conn.execute(query).fetchall()
print(result)


In [None]:
from sqlalchemy import select, func

# Build the full query
query = (
    select(
        Student.columns.Major,
        func.count(Student.columns.Id).label("student_count")
    )
    .where(Student.columns.Pass == True)  # WHERE Pass = True
    .group_by(Student.columns.Major)      # GROUP BY Major
    .having(func.count(Student.columns.Id) > 1)  # HAVING COUNT(Id) > 1
)

# Execute the query
result = conn.execute(query).fetchall()

# Print the results
print(result)


In [None]:
from sqlalchemy.sql import text

# Define the full SQL query as a string
query = text("""
    SELECT Major, COUNT(Id) AS student_count
    FROM Student
    WHERE Pass = 1
    GROUP BY Major
    HAVING COUNT(Id) > 1
""")

# Execute the query
result = conn.execute(query).fetchall()

# Print the result
print(result)


In [None]:
import pandas as pd

# Query students with English or Math major
query = Student.select().where(Student.columns.Major.in_(['English', 'Math']))
output = conn.execute(query)

# Fetch results and convert to DataFrame
results = output.fetchall()
df = pd.DataFrame(results)

# Assign column names from the query output
df.columns = output.keys()

# Display the DataFrame
print(df)


In [None]:
df = pd.read_sql(query, conn)
print(df)
