ETL process to extract data from SQL server and forward it to the PotgreSQL database.

cursor.fetchall() - fetches all the rows of a query result. It returns all the rows as a list of tuples

cursor.fetchmany(size) - returns the number of rows specified by size argument (returns a list of tuples)

cursor.fetchone() - returns a single (1st) record or None if no more rows are available

In [18]:
from sqlalchemy import create_engine
import pandas as pd
import pyodbc
import psycopg2
import os

os.environ?

In [34]:
# SQL server connection:
conn_str = (
    r'DRIVER={SQL Server};'
    r'SERVER=L204LTP\SQLEXPRESS;'
    r'DATABASE=AdventureWorks2019;'
    r'Trusted_Connection=yes;'
)

conn = pyodbc.connect(conn_str)

In [45]:
# PostgreSQL connection:

user="postgres"
password="justkacz"

engine = create_engine(f'postgresql://{user}:{password}@{server}:5432/test2') #test2=database name

In [55]:
# extract data from SQL server:

source= pd.read_sql_query("""Select top 10 * from Person.Person;""", conn)
source.head()

Unnamed: 0,BusinessEntityID,PersonType,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,EmailPromotion,AdditionalContactInfo,Demographics,rowguid,ModifiedDate
0,1,EM,False,,Ken,J,Sánchez,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",92C4279F-1207-48A3-8448-4636514EB7E2,2009-01-07
1,2,EM,False,,Terri,Lee,Duffy,,1,,"<IndividualSurvey xmlns=""http://schemas.micros...",D8763459-8AA8-47CC-AFF7-C9079AF79033,2008-01-24
2,3,EM,False,,Roberto,,Tamburello,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",E1A2555E-0828-434B-A33B-6F38136A37DE,2007-11-04
3,4,EM,False,,Rob,,Walters,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",F2D7CE06-38B3-4357-805B-F4B6B71C01FF,2007-11-28
4,5,EM,False,Ms.,Gail,A,Erickson,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",F3A3F6B4-AE3B-430C-A754-9F2231BA6FEF,2007-12-30


In [52]:
# load data from SQL server to the new table created in Postgres:
source.to_sql(name="PipelineTest", con=engine, if_exists='replace', index=False)

In [56]:
# read data from a newly created table in Postgres:
df=pd.read_sql('Select * from public."PipelineTest"', engine)
df.head()

Unnamed: 0,BusinessEntityID,PersonType,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,EmailPromotion,AdditionalContactInfo,Demographics,rowguid,ModifiedDate
0,1,EM,False,,Ken,J,Sánchez,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",92C4279F-1207-48A3-8448-4636514EB7E2,2009-01-07
1,2,EM,False,,Terri,Lee,Duffy,,1,,"<IndividualSurvey xmlns=""http://schemas.micros...",D8763459-8AA8-47CC-AFF7-C9079AF79033,2008-01-24
2,3,EM,False,,Roberto,,Tamburello,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",E1A2555E-0828-434B-A33B-6F38136A37DE,2007-11-04
3,4,EM,False,,Rob,,Walters,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",F2D7CE06-38B3-4357-805B-F4B6B71C01FF,2007-11-28
4,5,EM,False,Ms.,Gail,A,Erickson,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",F3A3F6B4-AE3B-430C-A754-9F2231BA6FEF,2007-12-30


In [69]:
# modification to the source table - extracting two more rows and updating MiddleName from J to Jake, 
source2=pd.read_sql_query('Select top 12 * from Person.Person', conn)
source2.loc[source2.MiddleName=='J', ['MiddleName']]='Jake'
source2.head()

Unnamed: 0,BusinessEntityID,PersonType,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,EmailPromotion,AdditionalContactInfo,Demographics,rowguid,ModifiedDate
0,1,EM,False,,Ken,Jake,Sánchez,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",92C4279F-1207-48A3-8448-4636514EB7E2,2009-01-07
1,2,EM,False,,Terri,Lee,Duffy,,1,,"<IndividualSurvey xmlns=""http://schemas.micros...",D8763459-8AA8-47CC-AFF7-C9079AF79033,2008-01-24
2,3,EM,False,,Roberto,,Tamburello,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",E1A2555E-0828-434B-A33B-6F38136A37DE,2007-11-04
3,4,EM,False,,Rob,,Walters,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",F2D7CE06-38B3-4357-805B-F4B6B71C01FF,2007-11-28
4,5,EM,False,Ms.,Gail,A,Erickson,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",F3A3F6B4-AE3B-430C-A754-9F2231BA6FEF,2007-12-30


In [73]:
# converting tables downloaded from SQL server and Postgres to tuples:
mssql_t=source2.apply(tuple,1)
postgres_t=df.apply(tuple,1)

#detecting differences:
diff=mssql_t.isin(postgres_t)
diff

0     False
1      True
2      True
3      True
4      True
5      True
6      True
7      True
8      True
9      True
10    False
11    False
dtype: bool

In [75]:
# displaying unmatched rows:
source2[~diff]

Unnamed: 0,BusinessEntityID,PersonType,NameStyle,Title,FirstName,MiddleName,LastName,Suffix,EmailPromotion,AdditionalContactInfo,Demographics,rowguid,ModifiedDate
0,1,EM,False,,Ken,Jake,Sánchez,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",92C4279F-1207-48A3-8448-4636514EB7E2,2009-01-07
10,11,EM,False,,Ovidiu,V,Cracium,,0,,"<IndividualSurvey xmlns=""http://schemas.micros...",D2CC2577-EF6B-4408-BD8C-747337FE5645,2010-11-28
11,12,EM,False,,Thierry,B,D'Hers,,2,,"<IndividualSurvey xmlns=""http://schemas.micros...",FA263C7F-600D-4E89-8DCD-0978F3530F5F,2007-12-04
