In [1]:
# ref: SQLAlchemy makes ETL magically easy
# https://www.freecodecamp.org/news/sqlalchemy-makes-etl-magically-easy-ab2bd0df928/

In [2]:
import sqlalchemy
sqlalchemy.__version__

'1.4.15'

In [3]:
import pandas as pd

# Defining a schema
A database schema defines the structure of a database system, in terms of tables, columns, fields, and the relationships between them. Schemas can be defined in raw SQL, or through the use of SQLAlchemy’s ORM feature.

Below is an example showing how to define a schema of two tables for an imaginary blogging platform. One is a table of users, and the other is a table of posts uploaded.

Note:
Here Users.DOB is set to DateTime type, which should be Date instead. I define the DateTime type on purpose to inspect a phenomenon later: what happen if we feed in database with a "wrong" type?

### Before you start this notebook
Make sure this is a file called "demo.db" at the same location as this notebook

In [4]:
from sqlalchemy import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import *
from sqlalchemy.orm import relationship

engine = create_engine('sqlite:///demo.db')
Base = declarative_base()

class Users(Base):
    __tablename__ = "users"
    UserId = Column(Integer, primary_key=True)
    Title = Column(String)
    FirstName = Column(String)
    LastName = Column(String)
    Email = Column(String)
    Username = Column(String)
    DOB = Column(DateTime)

class Uploads(Base):
    __tablename__ = "uploads"
    UploadId = Column(Integer, primary_key=True)
    UserId = Column(Integer, ForeignKey('users.UserId'))
    Title = Column(String)
    Body = Column(String)
    Timestamp = Column(DateTime)    
#     user = relationship("Users", back_populates = "uploads")    
# relationships: https://www.tutorialspoint.com/sqlalchemy/sqlalchemy_orm_building_relationship.htm
# Users.uploads = relationship("Uploads", order_by = Uploads.UserId, back_populates = "users")


Users.__table__.create(bind=engine, checkfirst=True)
Uploads.__table__.create(bind=engine, checkfirst=True)

First, import everything you need from SQLAlchemy. Then, use create_engine(connection_string) to connect to your database. The exact connection string will depend on the version of SQL you are working with. This example uses a relative path to the SQLite database created earlier.

Next, start defining your table classes. The first one in the example is Users. Each column in this table is defined as a class variable using SQLAlchemy’s Column(type), where type is a data type (such as Integer, String, DateTime and so on). Use primary_key=True to denote columns which will be used as primary keys.

The next table defined here is Uploads. It’s very much the same idea — each column is defined as before.

The final two lines actually create the tables. The checkfirst=True parameter ensures that new tables are only created if they do not currently exist in the database.

In [5]:
Users.__table__

Table('users', MetaData(), Column('UserId', Integer(), table=<users>, primary_key=True, nullable=False), Column('Title', String(), table=<users>), Column('FirstName', String(), table=<users>), Column('LastName', String(), table=<users>), Column('Email', String(), table=<users>), Column('Username', String(), table=<users>), Column('DOB', DateTime(), table=<users>), schema=None)

In [6]:
Uploads.__table__

Table('uploads', MetaData(), Column('UploadId', Integer(), table=<uploads>, primary_key=True, nullable=False), Column('UserId', Integer(), ForeignKey('users.UserId'), table=<uploads>), Column('Title', String(), table=<uploads>), Column('Body', String(), table=<uploads>), Column('Timestamp', DateTime(), table=<uploads>), schema=None)

# Reflect the schema
Here we would like to check if the schema is successively created in the database. In order to reflect a database, instead of using the declarative_base we’ve been using with the ORM so far, we’re going to use the automap_base.

Or, just query the database (sql methods)

In [7]:
from sqlalchemy.ext.automap import automap_base

Base_reflect = automap_base()
Base_reflect.prepare(engine, reflect=True)
Base_reflect.classes.keys()

['uploads', 'users']

In [8]:
# the uploads table (class)
uploads_reflect = Base_reflect.classes.uploads
uploads_reflect.__table__

# the users table (class)
users_reflect = Base_reflect.classes.users
users_reflect.__table__

# columns in the uploads table (class)
[col for col in uploads_reflect.__table__.columns]

# columns in the uploads table (class)
[col for col in users_reflect.__table__.columns]

# foreign keys
print(users_reflect.__table__.foreign_keys, "\n")
print(uploads_reflect.__table__.foreign_keys, "\n")

# all the constraints (it is a set)
uploads_reflect.__table__.constraints
type(uploads_reflect.__table__.constraints)

for constr in list(uploads_reflect.__table__.constraints):
    print(constr,"\n")

set() 

{ForeignKey('users.UserId')} 

PrimaryKeyConstraint(Column('UploadId', INTEGER(), table=<uploads>, primary_key=True, nullable=False)) 

ForeignKeyConstraint(<sqlalchemy.sql.base.DedupeColumnCollection object at 0x7f4061242090>, None, link_to_name=True, table=Table('uploads', MetaData(), Column('UploadId', INTEGER(), table=<uploads>, primary_key=True, nullable=False), Column('UserId', INTEGER(), ForeignKey('users.UserId'), table=<uploads>), Column('Title', VARCHAR(), table=<uploads>), Column('Body', VARCHAR(), table=<uploads>), Column('Timestamp', DATETIME(), table=<uploads>), schema=None)) 



In [9]:
# ultimate method (in sqlite)
pd.read_sql("select * from sqlite_schema", con=engine)
pd.read_sql("select * from sqlite_schema", con=engine).sql.apply(lambda x: print(x))

CREATE TABLE users (
	"UserId" INTEGER NOT NULL, 
	"Title" VARCHAR, 
	"FirstName" VARCHAR, 
	"LastName" VARCHAR, 
	"Email" VARCHAR, 
	"Username" VARCHAR, 
	"DOB" DATETIME, 
	PRIMARY KEY ("UserId")
)
CREATE TABLE uploads (
	"UploadId" INTEGER NOT NULL, 
	"UserId" INTEGER, 
	"Title" VARCHAR, 
	"Body" VARCHAR, 
	"Timestamp" DATETIME, 
	PRIMARY KEY ("UploadId"), 
	FOREIGN KEY("UserId") REFERENCES users ("UserId")
)


0    None
1    None
Name: sql, dtype: object

# Extract
Once the schema has been defined, the next task is to extract the raw data from its source. The exact details can vary wildly from case to case, depending on how the raw data is provided. Maybe your app calls an in-house or third-party API, or perhaps you need to read data logged in a CSV file.

The example below uses two APIs to simulate data for the fictional blogging platform described above. The Users table will be populated with profiles randomly generated at randomuser.me, and the Uploads table will contain lorem ipsum-inspired data courtesy of JSONPlaceholder.

Python’s Requests module can be used to call these APIs, as shown below:

In [10]:
import requests

url = 'https://randomuser.me/api/?results=10'
users_json = requests.get(url).json()
url2 = 'https://jsonplaceholder.typicode.com/posts/'
uploads_json = requests.get(url2).json()

In [11]:
## Add this to the first block in your note book to display json
# another option: 
import uuid
from IPython.core.display import display, HTML

import json

class RenderJSON(object):
    def __init__(self, json_data):
        if isinstance(json_data, dict):
            self.json_str = json.dumps(json_data)
        else:
            self.json_str = json_data
        self.uuid = str(uuid.uuid4())
        # This line is missed out in most of the versions of this script across the web, it is essential for this to work interleaved with print statements
#         self._ipython_display_()
        
    def _ipython_display_(self):
        display(HTML('<div id="{}" style="height: auto; width:100%;"></div>'.format(self.uuid)))
        display(HTML("""<script>
        require(["https://rawgit.com/caldwell/renderjson/master/renderjson.js"], function() {
          renderjson.set_show_to_level(1)
          document.getElementById('%s').appendChild(renderjson(%s))
        });</script>
        """ % (self.uuid, self.json_str)))

In [12]:
# RenderJSON(users_json)

In [13]:
# RenderJSON(uploads_json)

In [14]:
import pandas as pd
df_user = pd.json_normalize(users_json.get("results"))
df_user.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 34 columns):
 #   Column                          Non-Null Count  Dtype 
---  ------                          --------------  ----- 
 0   gender                          10 non-null     object
 1   email                           10 non-null     object
 2   phone                           10 non-null     object
 3   cell                            10 non-null     object
 4   nat                             10 non-null     object
 5   name.title                      10 non-null     object
 6   name.first                      10 non-null     object
 7   name.last                       10 non-null     object
 8   location.street.number          10 non-null     int64 
 9   location.street.name            10 non-null     object
 10  location.city                   10 non-null     object
 11  location.state                  10 non-null     object
 12  location.country                10 non-null     objec

In [15]:
# df_user[["id.value"]]
df_user.head()

Unnamed: 0,gender,email,phone,cell,nat,name.title,name.first,name.last,location.street.number,location.street.name,...,login.sha256,dob.date,dob.age,registered.date,registered.age,id.name,id.value,picture.large,picture.medium,picture.thumbnail
0,female,esma.numanoglu@example.com,(162)-358-6721,(328)-072-2360,TR,Ms,Esma,Numanoğlu,514,Tunalı Hilmi Cd,...,087cd77f180761546dac889391075b46a113b60d6e39d2...,1977-04-04T15:23:57.928Z,44,2005-12-16T21:03:13.494Z,16,,,https://randomuser.me/api/portraits/women/45.jpg,https://randomuser.me/api/portraits/med/women/...,https://randomuser.me/api/portraits/thumb/wome...
1,male,esat.toraman@example.com,(050)-947-7988,(584)-016-8162,TR,Mr,Esat,Toraman,5393,Anafartalar Cd,...,85c1610b2ab5a2493353de576432940e375fe4765d7e20...,1990-09-07T02:06:45.557Z,31,2005-04-28T00:52:31.344Z,16,,,https://randomuser.me/api/portraits/men/86.jpg,https://randomuser.me/api/portraits/med/men/86...,https://randomuser.me/api/portraits/thumb/men/...
2,female,ella.kowalski@example.com,005-248-8975,326-434-7494,CA,Mrs,Ella,Kowalski,2523,Lakeview Ave,...,a73675b4b85f11a4a828f4e51475b871997a0abbf46165...,1959-12-05T02:44:47.837Z,62,2007-07-23T07:28:20.798Z,14,,,https://randomuser.me/api/portraits/women/31.jpg,https://randomuser.me/api/portraits/med/women/...,https://randomuser.me/api/portraits/thumb/wome...
3,female,halima.aagesen@example.com,25956157,93606779,NO,Miss,Halima,Aagesen,2344,Lammers' gate,...,830ef58ec90f63a83483ccac9a21cdd0d9df54482c5aab...,1948-03-30T10:33:27.921Z,73,2014-05-23T01:25:02.867Z,7,FN,30034819277.0,https://randomuser.me/api/portraits/women/56.jpg,https://randomuser.me/api/portraits/med/women/...,https://randomuser.me/api/portraits/thumb/wome...
4,male,leo.syvertsen@example.com,38492919,43395669,NO,Mr,Leo,Syvertsen,8850,Kristian Augusts gate,...,48da855aa1750f277eec3f01c7e0d806394b44bd424cca...,1995-12-21T16:19:07.658Z,26,2007-06-25T07:20:51.517Z,14,FN,21129534149.0,https://randomuser.me/api/portraits/men/47.jpg,https://randomuser.me/api/portraits/med/men/47...,https://randomuser.me/api/portraits/thumb/men/...


In [16]:
pd.json_normalize(uploads_json)

Unnamed: 0,userId,id,title,body
0,1,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...
1,1,2,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...
2,1,3,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...
3,1,4,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...
4,1,5,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...
...,...,...,...,...
95,10,96,quaerat velit veniam amet cupiditate aut numqu...,in non odio excepturi sint eum\nlabore volupta...
96,10,97,quas fugiat ut perspiciatis vero provident,eum non blanditiis soluta porro quibusdam volu...
97,10,98,laboriosam dolor voluptates,doloremque ex facilis sit sint culpa\nsoluta a...
98,10,99,temporibus sit alias delectus eligendi possimu...,quo deleniti praesentium dicta non quod\naut e...


The data is currently held in two objects (users_json and uploads_json) in JSON format. The next step will be to transform and load this data into the tables defined earlier.

# Transform
Before the data can be loaded into the database, it is important to ensure that it is in the correct format. The JSON objects created in the code above are nested, and contain more data than is required for the tables defined.

An important intermediary step is to transform the data from its current nested JSON format to a flat format that can be safely written to the database without error.

For the example running through this article, the data are relatively simple, and won’t need much transformation. The code below creates two lists, users and uploads, which will be used in the final step:


In [17]:
# detour: date time parsing
# What is this date format? 2011-08-12T20:17:46.384Z
# ANS: ISO 8061 (A datetime format many open API adopt)

import dateutil.parser
date_str = "2011-08-12T20:17:46.384Z"
date_str = "2011-08-12T20:17:46.384+8:00"
print(type(dateutil.parser.parse(date_str)))
date_time = dateutil.parser.parse(date_str)
print(date_time.date(),"\n",
      date_time.time(),"\n",
      date_time.timetz())

<class 'datetime.datetime'>
2011-08-12 
 20:17:46.384000 
 20:17:46.384000+08:00


In [18]:
# construct users-list uplods-list
# (Note: Here is the original users-list and uplods-list construnctor. 
# Later, we will present a more concise version using df.to_list("records") method)
from datetime import datetime, timedelta
from random import randint


users, uploads = [], []

for i, result in enumerate(users_json['results']):
    row = {}
    row['UserId'] = i
    row['Title'] = result['name']['title']
    row['FirstName'] = result['name']['first']
    row['LastName'] = result['name']['last']
    row['Email'] = result['email']
    row['Username'] = result['login']['username']
#     dob = datetime.strptime(result['dob']['date'],'%Y-%m-%d %H:%M:%S')    
    dob = dateutil.parser.parse(result['dob']['date'])
    row['DOB'] = dob.date()
    
    users.append(row)
    
for result in uploads_json:
    row = {}
    row['UploadId'] = result['id']
    row['UserId'] = result['userId']
    row['Title'] = result['title']
    row['Body'] = result['body']
    delta = timedelta(seconds=randint(1,86400))
    row['Timestamp'] = datetime.now() - delta
    
    uploads.append(row)

In [19]:
# date time type
print("users.DOB: ", type(users[0].get("DOB")))
print("uploads.Timestamp: ", type(uploads[0].get("Timestamp")))

users.DOB:  <class 'datetime.date'>
uploads.Timestamp:  <class 'datetime.datetime'>


In [20]:
pd.DataFrame(users)

Unnamed: 0,UserId,Title,FirstName,LastName,Email,Username,DOB
0,0,Ms,Esma,Numanoğlu,esma.numanoglu@example.com,organicpeacock202,1977-04-04
1,1,Mr,Esat,Toraman,esat.toraman@example.com,organiclion104,1990-09-07
2,2,Mrs,Ella,Kowalski,ella.kowalski@example.com,redladybug769,1959-12-05
3,3,Miss,Halima,Aagesen,halima.aagesen@example.com,organicwolf185,1948-03-30
4,4,Mr,Leo,Syvertsen,leo.syvertsen@example.com,blueelephant716,1995-12-21
5,5,Mr,علی رضا,یاسمی,aalyrd.ysmy@example.com,organicbird452,1991-09-28
6,6,Miss,Elma,da Rocha,elma.darocha@example.com,yellowfrog198,1950-02-27
7,7,Mr,مهدي,جعفری,mhdy.jaafry@example.com,orangerabbit552,1997-09-18
8,8,Ms,Nanna,Andersen,nanna.andersen@example.com,whiteswan562,1994-10-12
9,9,Mrs,Ida,Kristensen,ida.kristensen@example.com,orangeswan552,1946-11-02


In [21]:
pd.DataFrame(uploads)

Unnamed: 0,UploadId,UserId,Title,Body,Timestamp
0,1,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...,2021-06-07 16:46:23.165991
1,2,1,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...,2021-06-07 17:52:31.165996
2,3,1,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...,2021-06-07 15:54:44.165998
3,4,1,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...,2021-06-08 09:37:43.166000
4,5,1,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...,2021-06-07 16:09:38.166002
...,...,...,...,...,...
95,96,10,quaerat velit veniam amet cupiditate aut numqu...,in non odio excepturi sint eum\nlabore volupta...,2021-06-08 07:29:55.166243
96,97,10,quas fugiat ut perspiciatis vero provident,eum non blanditiis soluta porro quibusdam volu...,2021-06-08 00:23:17.166245
97,98,10,laboriosam dolor voluptates,doloremque ex facilis sit sint culpa\nsoluta a...,2021-06-07 19:18:01.166246
98,99,10,temporibus sit alias delectus eligendi possimu...,quo deleniti praesentium dicta non quod\naut e...,2021-06-07 16:32:53.166248


The main step here is to iterate through the JSON objects created before. For each result, create a new Python dictionary object with keys corresponding to each column defined for the relevant table in the schema. This ensures that the data is no longer nested, and keeps only the data needed for the tables.

The other step is to use Python’s datetime module to manipulate dates, and transform them into DateTime type objects that can be written to the database. For the sake of this example, random DateTime objects are generated using the timedelta() method from Python’s DateTime module.

Each created dictionary is appended to a list, which will be used in the final step of the pipeline.

# Load
Finally, the data is in a form that can be loaded into the database. SQLAlchemy makes this step straightforward through its Session API.

The Session API acts a bit like a middleman, or “holding zone,” for Python objects you have either loaded from or associated with the database. These objects can be manipulated within the session before being committed to the database.

The code below creates a new session object, adds rows to it, then merges and commits them to the database:

In [22]:
# using sqlachemy ORM and try/except style:
# considr some modification to next code blocks.
Session = sessionmaker(bind=engine)
session = Session()

try:
    for user in users:
        row = Users(**user)
        session.add(row)

    for upload in uploads:
        row = Uploads(**upload)
        session.add(row)

    session.commit()
except Exception as e: 
    print(e)
finally:
    session.close()

The sessionmaker factory is used to generate newly-configured Session classes. Session is an everyday Python class that is instantiated on the second line as session.

Next up are two loops which iterate through the users and uploads lists created earlier. The elements of these lists are dictionary objects whose keys correspond to the columns given in the Users and Uploads classes defined previously.

Each object is used to instantiate a new instance of the relevant class (using Python’s handy some_function(**some_dict) trick). This object is added to the current session with session.add().

Finally, when the session contains the rows to be added, session.commit() is used to commit the transaction to the database.

In [23]:
pd.read_sql("select * from uploads", con=engine)

Unnamed: 0,UploadId,UserId,Title,Body,Timestamp
0,1,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...,2021-06-07 16:46:23.165991
1,2,1,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...,2021-06-07 17:52:31.165996
2,3,1,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...,2021-06-07 15:54:44.165998
3,4,1,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...,2021-06-08 09:37:43.166000
4,5,1,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...,2021-06-07 16:09:38.166002
...,...,...,...,...,...
95,96,10,quaerat velit veniam amet cupiditate aut numqu...,in non odio excepturi sint eum\nlabore volupta...,2021-06-08 07:29:55.166243
96,97,10,quas fugiat ut perspiciatis vero provident,eum non blanditiis soluta porro quibusdam volu...,2021-06-08 00:23:17.166245
97,98,10,laboriosam dolor voluptates,doloremque ex facilis sit sint culpa\nsoluta a...,2021-06-07 19:18:01.166246
98,99,10,temporibus sit alias delectus eligendi possimu...,quo deleniti praesentium dicta non quod\naut e...,2021-06-07 16:32:53.166248


In [24]:
pd.read_sql("select * from users", con=engine)

Unnamed: 0,UserId,Title,FirstName,LastName,Email,Username,DOB
0,0,Ms,Esma,Numanoğlu,esma.numanoglu@example.com,organicpeacock202,1977-04-04 00:00:00.000000
1,1,Mr,Esat,Toraman,esat.toraman@example.com,organiclion104,1990-09-07 00:00:00.000000
2,2,Mrs,Ella,Kowalski,ella.kowalski@example.com,redladybug769,1959-12-05 00:00:00.000000
3,3,Miss,Halima,Aagesen,halima.aagesen@example.com,organicwolf185,1948-03-30 00:00:00.000000
4,4,Mr,Leo,Syvertsen,leo.syvertsen@example.com,blueelephant716,1995-12-21 00:00:00.000000
5,5,Mr,علی رضا,یاسمی,aalyrd.ysmy@example.com,organicbird452,1991-09-28 00:00:00.000000
6,6,Miss,Elma,da Rocha,elma.darocha@example.com,yellowfrog198,1950-02-27 00:00:00.000000
7,7,Mr,مهدي,جعفری,mhdy.jaafry@example.com,orangerabbit552,1997-09-18 00:00:00.000000
8,8,Ms,Nanna,Andersen,nanna.andersen@example.com,whiteswan562,1994-10-12 00:00:00.000000
9,9,Mrs,Ida,Kristensen,ida.kristensen@example.com,orangeswan552,1946-11-02 00:00:00.000000


Notice that in the users.DOB column, the DOBs have redundant information (zeros) attached. This is expected by design. Since the DOB is in Datetime type (date + time), when inject with date type, the database will parse date type to datetime type and add time = 00:00:00.00000 by default.

Lesson learnt: choose the proper data type when designing database. In this case, use date type for DOB.

In [25]:
# declare a new schema (class)
class UsersUploadsJoin(Base):
    __tablename__ = "users_uploads_join"
    __table_args__ = {'extend_existing': True} # add this line to make the schema modifiable
    UserId = Column(Integer, primary_key=True)
    UploadId = Column(Integer, primary_key=True)
    FirstName = Column(String)
    LastName = Column(String)
    Username = Column(String)
    DOB = Column(Date)
    UploadsTitle = Column(String)
    Body = Column(String)
    Timestamp = Column(DateTime)    
    
UsersUploadsJoin.__table__.create(bind=engine, checkfirst=True)

In [26]:
# verify
pd.read_sql("select * from sqlite_schema", con=engine).sql.apply(lambda x: print(x))

CREATE TABLE users (
	"UserId" INTEGER NOT NULL, 
	"Title" VARCHAR, 
	"FirstName" VARCHAR, 
	"LastName" VARCHAR, 
	"Email" VARCHAR, 
	"Username" VARCHAR, 
	"DOB" DATETIME, 
	PRIMARY KEY ("UserId")
)
CREATE TABLE uploads (
	"UploadId" INTEGER NOT NULL, 
	"UserId" INTEGER, 
	"Title" VARCHAR, 
	"Body" VARCHAR, 
	"Timestamp" DATETIME, 
	PRIMARY KEY ("UploadId"), 
	FOREIGN KEY("UserId") REFERENCES users ("UserId")
)
CREATE TABLE users_uploads_join (
	"UserId" INTEGER NOT NULL, 
	"UploadId" INTEGER NOT NULL, 
	"FirstName" VARCHAR, 
	"LastName" VARCHAR, 
	"Username" VARCHAR, 
	"DOB" DATE, 
	"UploadsTitle" VARCHAR, 
	"Body" VARCHAR, 
	"Timestamp" DATETIME, 
	PRIMARY KEY ("UserId", "UploadId")
)
None


0    None
1    None
2    None
3    None
Name: sql, dtype: object

# All Together (with some modifications)
Now we describe the workflow in an real-world loading task (loading as the "L" in ETL, meaning inject data in a database here).
* step1: read the table schema from database (we are not going to create schema everytime, instead we will reflect schema and make sure the data to be injected match the schema)
* step2: prepare the data to load (aka inject). (We use Pandas in this step). A final check of data type and column name consistency. 
* step3: load data (in the context of SQL, INSERT INTO). Use a loading tool to abstract this process (we use sqlalchemy here)

All the steps use methods we have seen before, at least in a high-level. We will make some modifications in some steps to make the workflow more concise and clearer.


In [64]:
# step1: retrieve table schema (reflect)
from sqlalchemy.ext.automap import automap_base

Base_reflect = automap_base()
Base_reflect.prepare(engine, reflect=True)
print("table names: ", Base_reflect.classes.keys(), "\n")

# retrive schema stored in the sqlalchemy DeclarativeMeta class
print(type(JoinReflect), "\n")
JoinReflect = Base_reflect.classes.users_uploads_join

# review the schema
for col in JoinReflect.__table__.columns:
    print(col.description, col.type)
[col for col in JoinReflect.__table__.columns]


table names:  ['uploads', 'users', 'users_uploads_join'] 

<class 'sqlalchemy.orm.decl_api.DeclarativeMeta'> 

UserId INTEGER
UploadId INTEGER
FirstName VARCHAR
LastName VARCHAR
Username VARCHAR
DOB DATE
UploadsTitle VARCHAR
Body VARCHAR
Timestamp DATETIME


[Column('UserId', INTEGER(), table=<users_uploads_join>, primary_key=True, nullable=False),
 Column('UploadId', INTEGER(), table=<users_uploads_join>, primary_key=True, nullable=False),
 Column('FirstName', VARCHAR(), table=<users_uploads_join>),
 Column('LastName', VARCHAR(), table=<users_uploads_join>),
 Column('Username', VARCHAR(), table=<users_uploads_join>),
 Column('DOB', DATE(), table=<users_uploads_join>),
 Column('UploadsTitle', VARCHAR(), table=<users_uploads_join>),
 Column('Body', VARCHAR(), table=<users_uploads_join>),
 Column('Timestamp', DATETIME(), table=<users_uploads_join>)]

In [53]:
col.description
col.type
col.constraints

DATETIME()

In [28]:
# step2: prep the TABLE users_uploads_join
# note: rename the column Title to UploadsTitle to match the schema
df_users = pd.DataFrame(users)
df_uploads = pd.DataFrame(uploads)
df_join = pd.merge(df_users[["UserId", "FirstName", "LastName", "Username", "DOB"]],
         df_uploads[["UserId", "UploadId", "Title", "Body", "Timestamp"]], on="UserId")\
        .rename(columns = {'Title': 'UploadsTitle'}, inplace = False)
df_join

Unnamed: 0,UserId,FirstName,LastName,Username,DOB,UploadId,UploadsTitle,Body,Timestamp
0,1,Esat,Toraman,organiclion104,1990-09-07,1,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...,2021-06-07 16:46:23.165991
1,1,Esat,Toraman,organiclion104,1990-09-07,2,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...,2021-06-07 17:52:31.165996
2,1,Esat,Toraman,organiclion104,1990-09-07,3,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...,2021-06-07 15:54:44.165998
3,1,Esat,Toraman,organiclion104,1990-09-07,4,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...,2021-06-08 09:37:43.166000
4,1,Esat,Toraman,organiclion104,1990-09-07,5,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...,2021-06-07 16:09:38.166002
...,...,...,...,...,...,...,...,...,...
85,9,Ida,Kristensen,orangeswan552,1946-11-02,86,placeat quia et porro iste,quasi excepturi consequatur iste autem tempori...,2021-06-08 08:24:33.166222
86,9,Ida,Kristensen,orangeswan552,1946-11-02,87,nostrum quis quasi placeat,eos et molestiae\nnesciunt ut a\ndolores persp...,2021-06-07 14:09:00.166224
87,9,Ida,Kristensen,orangeswan552,1946-11-02,88,sapiente omnis fugit eos,consequatur omnis est praesentium\nducimus non...,2021-06-08 05:13:29.166227
88,9,Ida,Kristensen,orangeswan552,1946-11-02,89,sint soluta et vel magnam aut ut sed qui,repellat aut aperiam totam temporibus autem et...,2021-06-08 06:33:24.166228


In [29]:
# modification 1: construct the join-lists
# A more concise join-list constructor using df.to_list("records")
list_join_dict = df_join.to_dict("records")
list_join_dict

[{'UserId': 1,
  'FirstName': 'Esat',
  'LastName': 'Toraman',
  'Username': 'organiclion104',
  'DOB': datetime.date(1990, 9, 7),
  'UploadId': 1,
  'UploadsTitle': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit',
  'Body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto',
  'Timestamp': Timestamp('2021-06-07 16:46:23.165991')},
 {'UserId': 1,
  'FirstName': 'Esat',
  'LastName': 'Toraman',
  'Username': 'organiclion104',
  'DOB': datetime.date(1990, 9, 7),
  'UploadId': 2,
  'UploadsTitle': 'qui est esse',
  'Body': 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla',
  'Timestamp': Timestamp('2021-06-07 17:52:31.165996')},
 {'UserId': 1,
  'FirstName': 'Esat',
  'LastName': 'Toraman',

In [30]:
# Note: DOB is actually in date() type even shown as object in df.info()
df_join.loc[[0]].DOB.apply(lambda x: print("DOB: ", type(x), "\n"))
df_join.info()

DOB:  <class 'datetime.date'> 

<class 'pandas.core.frame.DataFrame'>
Int64Index: 90 entries, 0 to 89
Data columns (total 9 columns):
 #   Column        Non-Null Count  Dtype         
---  ------        --------------  -----         
 0   UserId        90 non-null     int64         
 1   FirstName     90 non-null     object        
 2   LastName      90 non-null     object        
 3   Username      90 non-null     object        
 4   DOB           90 non-null     object        
 5   UploadId      90 non-null     int64         
 6   UploadsTitle  90 non-null     object        
 7   Body          90 non-null     object        
 8   Timestamp     90 non-null     datetime64[ns]
dtypes: datetime64[ns](1), int64(2), object(6)
memory usage: 9.1+ KB


In [31]:
# modification2: using sqlalchemy bulk_insert_mappings
# ref: bulk_insert_mappings: https://docs.sqlalchemy.org/en/14/orm/session_api.html#sqlalchemy.orm.Session.bulk_insert_mappings
# ref: Optimize Inserts Using SQLAlchemy: https://www.devx.com/dbzone/optimize-inserts-using-sqlalchemy.html
# ref: sqlalchemy session API: https://docs.sqlalchemy.org/en/14/orm/session_api.html#session-and-sessionmaker

with Session() as session:
    session.bulk_insert_mappings(JoinReflect, list_join_dict)
    session.commit()

In [32]:
# verify
pd.read_sql("select * from users_uploads_join", engine)

Unnamed: 0,UserId,UploadId,FirstName,LastName,Username,DOB,UploadsTitle,Body,Timestamp
0,1,1,Esat,Toraman,organiclion104,1990-09-07,sunt aut facere repellat provident occaecati e...,quia et suscipit\nsuscipit recusandae consequu...,2021-06-07 16:46:23.165991
1,1,2,Esat,Toraman,organiclion104,1990-09-07,qui est esse,est rerum tempore vitae\nsequi sint nihil repr...,2021-06-07 17:52:31.165996
2,1,3,Esat,Toraman,organiclion104,1990-09-07,ea molestias quasi exercitationem repellat qui...,et iusto sed quo iure\nvoluptatem occaecati om...,2021-06-07 15:54:44.165998
3,1,4,Esat,Toraman,organiclion104,1990-09-07,eum et est occaecati,ullam et saepe reiciendis voluptatem adipisci\...,2021-06-08 09:37:43.166000
4,1,5,Esat,Toraman,organiclion104,1990-09-07,nesciunt quas odio,repudiandae veniam quaerat sunt sed\nalias aut...,2021-06-07 16:09:38.166002
...,...,...,...,...,...,...,...,...,...
85,9,86,Ida,Kristensen,orangeswan552,1946-11-02,placeat quia et porro iste,quasi excepturi consequatur iste autem tempori...,2021-06-08 08:24:33.166222
86,9,87,Ida,Kristensen,orangeswan552,1946-11-02,nostrum quis quasi placeat,eos et molestiae\nnesciunt ut a\ndolores persp...,2021-06-07 14:09:00.166224
87,9,88,Ida,Kristensen,orangeswan552,1946-11-02,sapiente omnis fugit eos,consequatur omnis est praesentium\nducimus non...,2021-06-08 05:13:29.166227
88,9,89,Ida,Kristensen,orangeswan552,1946-11-02,sint soluta et vel magnam aut ut sed qui,repellat aut aperiam totam temporibus autem et...,2021-06-08 06:33:24.166228


In [34]:
# error handling: try to execute the following
with Session() as session:
    session.bulk_insert_mappings(JoinReflect, list_join_dict)
    session.commit()

IntegrityError: (sqlite3.IntegrityError) UNIQUE constraint failed: users_uploads_join.UserId, users_uploads_join.UploadId
[SQL: INSERT INTO users_uploads_join ("UserId", "UploadId", "FirstName", "LastName", "Username", "DOB", "UploadsTitle", "Body", "Timestamp") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)]
[parameters: ((1, 1, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto', '2021-06-07 16:46:23.165991'), (1, 2, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'qui est esse', 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla', '2021-06-07 17:52:31.165996'), (1, 3, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut', '2021-06-07 15:54:44.165998'), (1, 4, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'eum et est occaecati', 'ullam et saepe reiciendis voluptatem adipisci\nsit amet autem assumenda provident rerum culpa\nquis hic commodi nesciunt rem tenetur doloremque ipsam iure\nquis sunt voluptatem rerum illo velit', '2021-06-08 09:37:43.166000'), (1, 5, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'nesciunt quas odio', 'repudiandae veniam quaerat sunt sed\nalias aut fugiat sit autem sed est\nvoluptatem omnis possimus esse voluptatibus quis\nest aut tenetur dolor neque', '2021-06-07 16:09:38.166002'), (1, 6, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'dolorem eum magni eos aperiam quia', 'ut aspernatur corporis harum nihil quis provident sequi\nmollitia nobis aliquid molestiae\nperspiciatis et ea nemo ab reprehenderit accusantium quas\nvoluptate dolores velit et doloremque molestiae', '2021-06-07 20:54:38.166004'), (1, 7, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'magnam facilis autem', 'dolore placeat quibusdam ea quo vitae\nmagni quis enim qui quis quo nemo aut saepe\nquidem repellat excepturi ut quia\nsunt ut sequi eos ea sed quas', '2021-06-08 05:15:29.166006'), (1, 8, 'Esat', 'Toraman', 'organiclion104', '1990-09-07', 'dolorem dolore est ipsam', 'dignissimos aperiam dolorem qui eum\nfacilis quibusdam animi sint suscipit qui sint possimus cum\nquaerat magni maiores excepturi\nipsam ut commodi dolor voluptatum modi aut vitae', '2021-06-08 09:07:03.166008')  ... displaying 10 of 90 total bound parameter sets ...  (9, 89, 'Ida', 'Kristensen', 'orangeswan552', '1946-11-02', 'sint soluta et vel magnam aut ut sed qui', 'repellat aut aperiam totam temporibus autem et\narchitecto magnam ut\nconsequatur qui cupiditate rerum quia soluta dignissimos nihil iure\ntempore quas est', '2021-06-08 06:33:24.166228'), (9, 90, 'Ida', 'Kristensen', 'orangeswan552', '1946-11-02', 'ad iusto omnis odit dolor voluptatibus', 'minus omnis soluta quia\nqui sed adipisci voluptates illum ipsam voluptatem\neligendi officia ut in\neos soluta similique molestias praesentium blanditiis', '2021-06-07 12:09:35.166231'))]
(Background on this error at: http://sqlalche.me/e/14/gkpj)

## Warning
Pandas has its native method df.to_sql. In general this is not a good way to interact with RDBMS. Afterall, Pandas is at best used for processing and analysis. When it comes to database interaction, people have been claiming Pandas is very slow.
Another issue of Pandas is it might not be a good fit for big data. We will talk about how to handle big data in some other time.

# Next
## Error handling, error handling, error handling
## Pipeline, orchestration
## Big data