<a href="https://colab.research.google.com/github/rroszczyk/Python/blob/master/sql/db_alchemy.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install treelib

Collecting treelib
  Downloading treelib-1.7.0-py3-none-any.whl (18 kB)
Installing collected packages: treelib
Successfully installed treelib-1.7.0


In [2]:
from treelib import Tree

In [3]:
%%writefile books.csv
first_name,last_name,title,publisher
Isaac,Asimov,Foundation,Random House
Pearl,Buck,The Good Earth,Random House
Pearl,Buck,The Good Earth,Simon & Schuster
Tom,Clancy,The Hunt For Red October,Berkley
Tom,Clancy,Patriot Games,Simon & Schuster
Stephen,King,It,Random House
Stephen,King,It,Penguin Random House
Stephen,King,Dead Zone,Random House
Stephen,King,The Shining,Penguin Random House
John,Le Carre,"Tinker, Tailor, Solider, Spy: A George Smiley Novel",Berkley
Alex,Michaelides,The Silent Patient,Simon & Schuster
Carol,Shaben,Into The Abyss,Simon & Schuster

Writing books.csv


In [4]:
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from sqlalchemy.orm import relationship, backref, declarative_base
from sqlalchemy.sql import asc, desc, func

Base = declarative_base()

author_publisher = Table(
    "author_publisher",
    Base.metadata,
    Column("author_id", Integer, ForeignKey("author.author_id")),
    Column("publisher_id", Integer, ForeignKey("publisher.publisher_id")),
)

book_publisher = Table(
    "book_publisher",
    Base.metadata,
    Column("book_id", Integer, ForeignKey("book.book_id")),
    Column("publisher_id", Integer, ForeignKey("publisher.publisher_id")),
)

class Author(Base):
    __tablename__ = "author"
    author_id = Column(Integer, primary_key=True)
    first_name = Column(String)
    last_name = Column(String)
    books = relationship("Book", backref=backref("author"))
    publishers = relationship(
        "Publisher", secondary=author_publisher, back_populates="authors"
    )

class Book(Base):
    __tablename__ = "book"
    book_id = Column(Integer, primary_key=True)
    author_id = Column(Integer, ForeignKey("author.author_id"))
    title = Column(String)
    publishers = relationship(
        "Publisher", secondary=book_publisher, back_populates="books"
    )

class Publisher(Base):
    __tablename__ = "publisher"
    publisher_id = Column(Integer, primary_key=True)
    name = Column(String)
    authors = relationship(
        "Author", secondary=author_publisher, back_populates="publishers"
    )
    books = relationship(
        "Book", secondary=book_publisher, back_populates="publishers"
    )

In [5]:
import os
import csv
from importlib import resources
from sqlalchemy import and_, create_engine
from sqlalchemy.orm import sessionmaker

def get_author_book_publisher_data(filepath):
    """
    This function gets the data from the csv file
    """
    with open(filepath) as csvfile:
        csv_reader = csv.DictReader(csvfile)
        data = [row for row in csv_reader]
        return data


def populate_database(session, author_book_publisher_data):
    # insert the data
    for row in author_book_publisher_data:

        author = (
            session.query(Author)
            .filter(Author.last_name == row["last_name"])
            .one_or_none()
        )
        if author is None:
            author = Author(
                first_name=row["first_name"], last_name=row["last_name"]
            )
            session.add(author)

        book = (
            session.query(Book)
            .filter(Book.title == row["title"])
            .one_or_none()
        )
        if book is None:
            book = Book(title=row["title"])
            session.add(book)

        publisher = (
            session.query(Publisher)
            .filter(Publisher.name == row["publisher"])
            .one_or_none()
        )
        if publisher is None:
            publisher = Publisher(name=row["publisher"])
            session.add(publisher)

        # add the items to the relationships
        author.books.append(book)
        author.publishers.append(publisher)
        publisher.authors.append(author)
        publisher.books.append(book)
        session.commit()

    session.close()


def main():
    print("starting")

    data = get_author_book_publisher_data("books.csv")
    author_book_publisher_data = data

    # create the database
    engine = create_engine(f"sqlite:///books.db")
    Base.metadata.create_all(engine)
    Session = sessionmaker()
    Session.configure(bind=engine)
    session = Session()
    populate_database(session, author_book_publisher_data)

    print("finished")

main()

starting
finished


In [6]:
%load_ext sql

In [7]:
%%sql
sqlite:///books.db

In [8]:
%%sql
SELECT * FROM sqlite_schema

 * sqlite:///books.db
Done.


type,name,tbl_name,rootpage,sql
table,author,author,2,"CREATE TABLE author ( 	author_id INTEGER NOT NULL, first_name VARCHAR, last_name VARCHAR, PRIMARY KEY (author_id) )"
table,publisher,publisher,3,"CREATE TABLE publisher ( 	publisher_id INTEGER NOT NULL, name VARCHAR, PRIMARY KEY (publisher_id) )"
table,author_publisher,author_publisher,4,"CREATE TABLE author_publisher ( 	author_id INTEGER, publisher_id INTEGER, FOREIGN KEY(author_id) REFERENCES author (author_id), FOREIGN KEY(publisher_id) REFERENCES publisher (publisher_id) )"
table,book,book,5,"CREATE TABLE book ( 	book_id INTEGER NOT NULL, author_id INTEGER, title VARCHAR, PRIMARY KEY (book_id), FOREIGN KEY(author_id) REFERENCES author (author_id) )"
table,book_publisher,book_publisher,6,"CREATE TABLE book_publisher ( 	book_id INTEGER, publisher_id INTEGER, FOREIGN KEY(book_id) REFERENCES book (book_id), FOREIGN KEY(publisher_id) REFERENCES publisher (publisher_id) )"


In [9]:
%%sql
SELECT * FROM book

 * sqlite:///books.db
Done.


book_id,author_id,title
1,1,Foundation
2,2,The Good Earth
3,3,The Hunt For Red October
4,3,Patriot Games
5,4,It
6,4,Dead Zone
7,4,The Shining
8,5,"Tinker, Tailor, Solider, Spy: A George Smiley Novel"
9,6,The Silent Patient
10,7,Into The Abyss


In [10]:
def output_author_hierarchy(authors, filename="authors.txt"):
    """
    Outputs the author/book/publisher information in
    a hierarchical manner

    :param authors:         the collection of root author objects
    :return:                None
    """
    authors_tree = Tree()
    authors_tree.create_node("Authors", "authors")
    for author in authors:
        author_id = f"{author.first_name} {author.last_name}"
        authors_tree.create_node(author_id, author_id, parent="authors")
        for book in author.books:
            book_id = f"{author_id}:{book.title}"
            authors_tree.create_node(book.title, book_id, parent=author_id)
            for publisher in book.publishers:
                authors_tree.create_node(publisher.name, parent=book_id)
    # Output the hierarchical authors data
    # authors_tree.show()
    authors_tree.save2file(filename)

In [11]:
def get_books_by_publishers(session, ascending=True):
    """Get a list of publishers and the number of books they've published"""
    if not isinstance(ascending, bool):
        raise ValueError(f"Sorting value invalid: {ascending}")

    direction = asc if ascending else desc

    return (
        session.query(
            Publisher.name, func.count(Book.title).label("total_books")
        )
        .join(Publisher.books)
        .group_by(Publisher.name)
        .order_by(direction("total_books"))
    )

In [12]:
def get_authors_by_publishers(session, ascending=True):
    """Get a list of publishers and the number of authors they've published"""
    if not isinstance(ascending, bool):
        raise ValueError(f"Sorting value invalid: {ascending}")

    direction = asc if ascending else desc

    return (
        session.query(
            Publisher.name,
            func.count(Author.first_name).label("total_authors"),
        )
        .join(Publisher.authors)
        .group_by(Publisher.name)
        .order_by(direction("total_authors"))
    )

In [13]:
def add_new_book(session, author_name, book_title, publisher_name):
    """Adds a new book to the system"""
    # Get the author's first and last names
    first_name, _, last_name = author_name.partition(" ")

    # Check if book exists
    book = (
        session.query(Book)
        .join(Author)
        .filter(Book.title == book_title)
        .filter(
            and_(
                Author.first_name == first_name, Author.last_name == last_name
            )
        )
        .filter(Book.publishers.any(Publisher.name == publisher_name))
        .one_or_none()
    )
    # Does the book by the author and publisher already exist?
    if book is not None:
        return

    # Get the book by the author
    book = (
        session.query(Book)
        .join(Author)
        .filter(Book.title == book_title)
        .filter(
            and_(
                Author.first_name == first_name, Author.last_name == last_name
            )
        )
        .one_or_none()
    )
    # Create the new book if needed
    if book is None:
        book = Book(title=book_title)

    # Get the author
    author = (
        session.query(Author)
        .filter(
            and_(
                Author.first_name == first_name, Author.last_name == last_name
            )
        )
        .one_or_none()
    )
    # Do we need to create the author?
    if author is None:
        author = Author(first_name=first_name, last_name=last_name)
        session.add(author)

    # Get the publisher
    publisher = (
        session.query(Publisher)
        .filter(Publisher.name == publisher_name)
        .one_or_none()
    )
    # Do we need to create the publisher?
    if publisher is None:
        publisher = Publisher(name=publisher_name)
        session.add(publisher)

    # Initialize the book relationships
    book.author = author
    book.publishers.append(publisher)
    session.add(book)

    # Commit to the database
    session.commit()

In [14]:
def get_authors(session):
    """Get a list of author objects sorted by last name"""
    return session.query(Author).order_by(Author.last_name).all()

In [15]:
engine = create_engine(f"sqlite:///books.db")

Session = sessionmaker()
Session.configure(bind=engine)
session = Session()

books_by_publisher = get_books_by_publishers(session, ascending=False)
for row in books_by_publisher:
  print(f"Publisher: {row.name}, total books: {row.total_books}")
print()

authors_by_publisher = get_authors_by_publishers(session)
for row in authors_by_publisher:
  print(f"Publisher: {row.name}, total authors: {row.total_authors}")
print()

authors = get_authors(session)
output_author_hierarchy(authors, "authors1.txt")

add_new_book(
  session,
  author_name="Stephen King",
  book_title="The Stand",
  publisher_name="Random House",
)
    # Output the updated hierarchical author data
authors = get_authors(session)
output_author_hierarchy(authors, "authors2.txt")

Publisher: Simon & Schuster, total books: 4
Publisher: Random House, total books: 4
Publisher: Penguin Random House, total books: 2
Publisher: Berkley, total books: 2

Publisher: Berkley, total authors: 2
Publisher: Penguin Random House, total authors: 2
Publisher: Random House, total authors: 4
Publisher: Simon & Schuster, total authors: 4

