In [27]:
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy import text

# Connection details for source and target databases
server_source = 'MENNAS-LAPTOP\\MSSQLSERVER22'
database_source = 'AdventureWorks2019'
username_source = 'sa'
password_source = 'sa123456'
driver = 'ODBC Driver 17 for SQL Server'

server_target = 'MENNAS-LAPTOP\\MSSQLSERVER22'
database_target = 'HR'
username_target = 'sa'
password_target = 'sa123456'


# Create engine for source and target databases
connection_string_source = (
    f'mssql+pyodbc://{username_source}:{password_source}@{server_source}/{database_source}'
    f'?driver={driver.replace(" ", "+")}'
)
engine_source = create_engine(connection_string_source, echo=False)

connection_string_target = (
    f'mssql+pyodbc://{username_target}:{password_target}@{server_target}/{database_target}'
    f'?driver={driver.replace(" ", "+")}'
)
engine_target = create_engine(connection_string_target, echo=False)


## Working

In [50]:
import pandas as pd
from sqlalchemy import create_engine

# Step 1: Get the maximum Timestamp from the HR dimension table in both databases

# Query to get the maximum Timestamp from the HR EmployeeDepartmentHistory table in AdventureWorks2019
query_max_timestamp_aw_EmployeeDepartmentHistory = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory]
"""

# Query to get the maximum Timestamp from the HR Employee table in AdventureWorks2019
query_max_timestamp_aw_Employee = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[Employee]
"""

# Query to get the maximum Timestamp from the HR dimension table in the HR database
query_max_timestamp_hr = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [dimension].[EmployeeDepartmentHistory]
"""

try:
    # Execute the query for AdventureWorks2019 EmployeeDepartmentHistory table
    max_timestamp_aw_edh = pd.read_sql(query_max_timestamp_aw_EmployeeDepartmentHistory, engine_source)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from AdventureWorks2019 EmployeeDepartmentHistory: {max_timestamp_aw_edh}")
    
    # Execute the query for AdventureWorks2019 Employee table
    max_timestamp_aw_emp = pd.read_sql(query_max_timestamp_aw_Employee, engine_source)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from AdventureWorks2019 Employee: {max_timestamp_aw_emp}")
    
    # Execute the query for HR database EmployeeDepartmentHistory table
    max_timestamp_hr = pd.read_sql(query_max_timestamp_hr, engine_target)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from HR database EmployeeDepartmentHistory: {max_timestamp_hr}")
    
except Exception as e:
    print(f"Error retrieving maximum Timestamps: {e}")



Maximum Timestamp from AdventureWorks2019 EmployeeDepartmentHistory: 248014
Maximum Timestamp from AdventureWorks2019 Employee: 188930
Maximum Timestamp from HR database EmployeeDepartmentHistory: 151136


In [51]:
from sqlalchemy.orm import sessionmaker

session = sessionmaker(engine_target)
with session.begin() as sess:
    select_stmt = f"""
select * from  [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory]
where [BusinessEntityID] = 200
    """
#     print(select_stmt)
    df = pd.read_sql(select_stmt, engine_target)
df

Unnamed: 0,BusinessEntityID,DepartmentID,ShiftID,StartDate,EndDate,ModifiedDate,Timestamp
0,200,1,3,2009-01-17,,2010-01-01,b'\x00\x00\x00\x00\x00\x03\xc8\xce'


In [52]:
session = sessionmaker(engine_target)
with session.begin() as sess:
    update_statement = f"""
            UPDATE [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory] 
            SET [ShiftID] = 1, 
                [EndDate] = null, 
                [ModifiedDate] = '2010-01-01'
            WHERE [BusinessEntityID] = 200;
    """
    print(update_statement)
    sess.execute(text(update_statement))


            UPDATE [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory] 
            SET [ShiftID] = 1, 
                [EndDate] = null, 
                [ModifiedDate] = '2010-01-01'
            WHERE [BusinessEntityID] = 200;
    


In [53]:
from sqlalchemy.orm import sessionmaker

session = sessionmaker(engine_target)
with session.begin() as sess:
    select_stmt = f"""
select * from  [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory]
where [BusinessEntityID] = 200
    """
#     print(select_stmt)
    df = pd.read_sql(select_stmt, engine_target)
df

Unnamed: 0,BusinessEntityID,DepartmentID,ShiftID,StartDate,EndDate,ModifiedDate,Timestamp
0,200,1,1,2009-01-17,,2010-01-01,b'\x00\x00\x00\x00\x00\x03\xc8\xcf'


In [54]:
import pandas as pd
from sqlalchemy import create_engine

# Step 3: Extract new records from HumanResources.Employee in AdventureWorks2019
query_new_records_emp = f"""
SELECT BusinessEntityID, JobTitle, CAST([Timestamp] AS BIGINT) AS Timestamp
FROM HumanResources.Employee
WHERE CAST([Timestamp] AS BIGINT) > {max_timestamp_aw_emp}
"""

# Step 2: Extract new records from HumanResources.EmployeeDepartmentHistory in AdventureWorks2019
query_new_records_edh = f"""
SELECT BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate, CAST([Timestamp] AS BIGINT) AS Timestamp
FROM HumanResources.EmployeeDepartmentHistory
WHERE CAST([Timestamp] AS BIGINT) > {max_timestamp_aw_edh}
"""
# Execute the query to get new records from EmployeeDepartmentHistory
df_new_records_edh = pd.read_sql(query_new_records_edh, engine_source)
print("New records from EmployeeDepartmentHistory loaded into DataFrame:")
print(df_new_records_edh.head().to_markdown(tablefmt='simple'))
    
# Execute the query to get new records from Employee
df_new_records_emp = pd.read_sql(query_new_records_emp, engine_source)
print("New records from Employee loaded into DataFrame:")
print(df_new_records_emp.head().to_markdown(tablefmt='simple'))



New records from EmployeeDepartmentHistory loaded into DataFrame:
      BusinessEntityID    ShiftID  StartDate    EndDate    ModifiedDate           Timestamp
--  ------------------  ---------  -----------  ---------  -------------------  -----------
 0                 200          1  2009-01-17              2010-01-01 00:00:00       248015
New records from Employee loaded into DataFrame:
BusinessEntityID    JobTitle    Timestamp
------------------  ----------  -----------


In [55]:
import pandas as pd
from sqlalchemy import create_engine

# Retrieve the existing records from the target table
query_existing_records = """
SELECT BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate, Timestamp, IsCurrent
FROM dimension.EmployeeDepartmentHistory
"""
try:
    df_existing_records = pd.read_sql(query_existing_records, engine_target)
    print("Existing records loaded into DataFrame:")
    print(df_existing_records.head().to_markdown(tablefmt='simple'))
except Exception as e:
    print(f"Error loading existing records: {e}")



Existing records loaded into DataFrame:
      BusinessEntityID    ShiftID  StartDate    EndDate     ModifiedDate                Timestamp    IsCurrent
--  ------------------  ---------  -----------  ----------  --------------------------  -----------  -----------
 0                   1          2  2009-01-14   2024-08-17  2024-08-17 10:36:30.557000       M-              True
 1                   1          2  2024-08-12   2024-08-17  2024-08-17 10:36:30.557000       M.              True
 2                   1          2  2024-08-12   2024-08-17  2024-08-17 10:36:30.557000       M/              True
 3                   2          1  2008-01-31               2008-01-30 00:00:00              M0              True
 4                   2          2  2020-08-12               2024-08-12 13:38:03.163000       M1              True


In [56]:
# Retrieve new records from the source table
query_new_records_edh = f"""
SELECT BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate, CAST([Timestamp] AS BIGINT) AS Timestamp
FROM HumanResources.EmployeeDepartmentHistory
WHERE CAST([Timestamp] AS BIGINT) > {max_timestamp_aw_edh}
"""
try:
    df_new_records_edh = pd.read_sql(query_new_records_edh, engine_source)
    print("New records loaded into DataFrame:")
    print(df_new_records_edh.head().to_markdown())
except Exception as e:
    print(f"Error loading new records: {e}")

New records loaded into DataFrame:
|    |   BusinessEntityID |   ShiftID | StartDate   | EndDate   | ModifiedDate        |   Timestamp |
|---:|-------------------:|----------:|:------------|:----------|:--------------------|------------:|
|  0 |                200 |         1 | 2009-01-17  |           | 2010-01-01 00:00:00 |      248015 |


In [57]:
import pandas as pd
from sqlalchemy import create_engine

# Extract JobTitle and BusinessEntityID from Employee table
query_employee = """
SELECT BusinessEntityID, JobTitle
FROM HumanResources.Employee
"""
try:
    df_employee = pd.read_sql(query_employee, engine_source)
    print("Employee data loaded into DataFrame:")
    print(df_employee.head().to_markdown(tablefmt='simple'))
except Exception as e:
    print(f"Error loading employee data: {e}")


Employee data loaded into DataFrame:
      BusinessEntityID  JobTitle
--  ------------------  -----------------------------
 0                   1  Chief Executive Officer
 1                   2  Vice President of Engineering
 2                   3  Engineering Manager
 3                   4  Senior Tool Designer
 4                   5  Design Engineer


In [58]:
# Extract EmployeeDepartmentHistory data
query_history = """
SELECT BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate
FROM HumanResources.EmployeeDepartmentHistory
"""
try:
    df_history = pd.read_sql(query_history, engine_source)
    print("EmployeeDepartmentHistory data loaded into DataFrame:")
    print(df_history.head().to_markdown(tablefmt='simple'))
except Exception as e:
    print(f"Error loading employee department history data: {e}")


EmployeeDepartmentHistory data loaded into DataFrame:
      BusinessEntityID    ShiftID  StartDate    EndDate     ModifiedDate
--  ------------------  ---------  -----------  ----------  --------------------------
 0                   1          2  2009-01-14   2024-08-17  2024-08-17 10:36:30.557000
 1                   1          2  2024-08-12   2024-08-17  2024-08-17 10:36:30.557000
 2                   1          2  2024-08-12   2024-08-17  2024-08-17 10:36:30.557000
 3                   2          1  2008-01-31               2008-01-30 00:00:00
 4                   2          2  2020-08-12               2024-08-12 13:38:03.163000


In [59]:
# Merge DataFrames on BusinessEntityID
df_merged = pd.merge(df_new_records_edh, df_employee, on='BusinessEntityID', how='left')

print("Merged DataFrame:")
print(df_merged.head().to_markdown(tablefmt='simple'))

Merged DataFrame:
      BusinessEntityID    ShiftID  StartDate    EndDate    ModifiedDate           Timestamp  JobTitle
--  ------------------  ---------  -----------  ---------  -------------------  -----------  ----------------------------
 0                 200          1  2009-01-17              2010-01-01 00:00:00       248015  Production Technician - WC40


## SCD2

In [60]:

import datetime;
 
# ct stores current time
ct = datetime.datetime.now()
now = ct.strftime('%Y-%m-%d %H:%M:%S')
business_entity_id =200
# Convert new records to a list of dictionaries
update_records = df_new_records_edh.to_dict(orient='records')
print(update_records)
# Update existing records
session = sessionmaker(engine_target) # 
with session.begin() as sess: #     for record in update_records:
    update_query = f"""UPDATE [dimension].[EmployeeDepartmentHistory]
            SET [IsCurrent] = 0,
                [scd_enddate] = '{now}'
            WHERE [BusinessEntityID] = {business_entity_id}
              AND [IsCurrent] = 1
              AND ([scd_enddate] IS NULL);
            """
    sess.execute(text(update_query))


print(update_query)

[{'BusinessEntityID': 200, 'ShiftID': 1, 'StartDate': datetime.date(2009, 1, 17), 'EndDate': None, 'ModifiedDate': Timestamp('2010-01-01 00:00:00'), 'Timestamp': 248015}]
UPDATE [dimension].[EmployeeDepartmentHistory]
            SET [IsCurrent] = 0,
                [scd_enddate] = '2024-08-31 12:17:32'
            WHERE [BusinessEntityID] = 200
              AND [IsCurrent] = 1
              AND ([scd_enddate] IS NULL);
            


In [61]:
import datetime;
 
# ct stores current time
ct = datetime.datetime.now()
now = ct.strftime('%Y-%m-%d %H:%M:%S')
business_entity_id =200
# Convert new records to a list of dictionaries
update_records = df_new_records_edh.to_dict(orient='records')
print(update_records)
# Update existing records
with engine_target.connect() as connection:
    for record in update_records:
        update_query = f"""UPDATE [dimension].[EmployeeDepartmentHistory]
                SET [IsCurrent] = 0,
                    [scd_enddate] = '{now}'
                WHERE [BusinessEntityID] = {business_entity_id}
                  AND [IsCurrent] = 1
                  AND ([scd_enddate] IS NULL);
                """
        connection.execute(text(update_query))
        #                    , {
        #     'start_date': now,
        #     'business_entity_id': record['BusinessEntityID']
        # }
        

print("Existing records updated successfully.")

# Ensure the `Timestamp` column is not included in the DataFrame
df_merged_no_timestamp = df_merged.drop(columns=['Timestamp'], errors='ignore')

# Set `IsCurrent` to 1 for all new records
df_merged_no_timestamp['IsCurrent'] = 1

# Insert new records
target_schema = 'dimension'
target_table = 'EmployeeDepartmentHistory'

df_merged_no_timestamp.to_sql(target_table, con=engine_target, schema=target_schema, if_exists='append', index=False, chunksize=1000)
print("New records successfully inserted into dimension.EmployeeDepartmentHistory with IsCurrent = 1")


[{'BusinessEntityID': 200, 'ShiftID': 1, 'StartDate': datetime.date(2009, 1, 17), 'EndDate': None, 'ModifiedDate': Timestamp('2010-01-01 00:00:00'), 'Timestamp': 248015}]
Existing records updated successfully.
New records successfully inserted into dimension.EmployeeDepartmentHistory with IsCurrent = 1


In [62]:

# Ensure the `Timestamp` column is not included in the DataFrame
df_merged_no_timestamp = df_merged.drop(columns=['Timestamp'], errors='ignore')

# Set `IsCurrent` to 1 for all new records
df_merged_no_timestamp['IsCurrent'] = 1

# Insert new records
target_schema = 'dimension'
target_table = 'EmployeeDepartmentHistory'

df_merged_no_timestamp.to_sql(target_table, con=engine_target, schema=target_schema, if_exists='append', index=False, chunksize=1000)
print("New records successfully inserted into dimension.EmployeeDepartmentHistory with IsCurrent = 1")


New records successfully inserted into dimension.EmployeeDepartmentHistory with IsCurrent = 1


## SCD3

In [63]:
delta_update = [i['BusinessEntityID'] for i in update_records_source]
existing = df_existing_records[df_existing_records['BusinessEntityID'].isin(delta_update)]
update_records_dest = existing.to_dict(orient='records')
update_records_dest[0]['ShiftID']

3

In [64]:
update_records_dest

[{'BusinessEntityID': 200,
  'ShiftID': 3,
  'StartDate': datetime.date(2009, 1, 17),
  'EndDate': None,
  'ModifiedDate': Timestamp('2010-01-01 00:00:00'),
  'Timestamp': b'\x00\x00\x00\x00\x00\x02N]',
  'IsCurrent': False},
 {'BusinessEntityID': 200,
  'ShiftID': 3,
  'StartDate': datetime.date(2009, 1, 17),
  'EndDate': None,
  'ModifiedDate': Timestamp('2010-01-01 00:00:00'),
  'Timestamp': b'\x00\x00\x00\x00\x00\x02N`',
  'IsCurrent': True}]

In [65]:
business_entity_id =200
# Convert new records to a list of dictionaries
update_records_source = df_new_records_edh.to_dict(orient='records')
print(update_records_source)
update_query = f"""UPDATE [dimension].[EmployeeDepartmentHistory] SET ALT_shiftid = {update_records_dest[0]['ShiftID']},[shiftid] = {update_records_source[0]['ShiftID']}
        WHERE [BusinessEntityID] = {business_entity_id}"""
update_query

[{'BusinessEntityID': 200, 'ShiftID': 1, 'StartDate': datetime.date(2009, 1, 17), 'EndDate': None, 'ModifiedDate': Timestamp('2010-01-01 00:00:00'), 'Timestamp': 248015}]


'UPDATE [dimension].[EmployeeDepartmentHistory] SET ALT_shiftid = 3,[shiftid] = 1\n        WHERE [BusinessEntityID] = 200'

In [66]:
import datetime;
 
# ct stores current time
# ct = datetime.datetime.now()
# now = ct.strftime('%Y-%m-%d %H:%M:%S')
business_entity_id =200
# Convert new records to a list of dictionaries
update_records_source = df_new_records_edh.to_dict(orient='records')
print(update_records_source)
# Update existing records
with engine_target.connect() as connection:
    for record in update_records_source:
        # update_query = text("""
        # UPDATE [dimension].[EmployeeDepartmentHistory]
        # SET [IsCurrent] = 0,
        #     [EndDate] = :start_date
        # WHERE [BusinessEntityID] = :business_entity_id
        #   AND [IsCurrent] = 1
        #   AND ([EndDate] IS NULL OR [EndDate] > :start_date);
        # """)

        with session.begin() as sess: #     for record in update_records:
            update_query = f"""UPDATE [dimension].[EmployeeDepartmentHistory] SET ALT_shiftid = {update_records_dest[0]['ShiftID']},[shiftid] = {update_records_source[0]['ShiftID']}
                    WHERE [BusinessEntityID] = {business_entity_id}"""
            sess.execute(text(update_query))


        #                    , {
        #     'start_date': now,
        #     'business_entity_id': record['BusinessEntityID']
        # }
        

print("Existing records updated successfully.")




[{'BusinessEntityID': 200, 'ShiftID': 1, 'StartDate': datetime.date(2009, 1, 17), 'EndDate': None, 'ModifiedDate': Timestamp('2010-01-01 00:00:00'), 'Timestamp': 248015}]
Existing records updated successfully.


In [67]:
# Ensure the `Timestamp` column is not included in the DataFrame
df_merged_no_timestamp = df_merged.drop(columns=['Timestamp'], errors='ignore')

# Set `IsCurrent` to 1 for all new records
df_merged_no_timestamp['IsCurrent'] = 1

# Insert new records
target_schema = 'dimension'
target_table = 'EmployeeDepartmentHistory'

df_merged_no_timestamp.to_sql(target_table, con=engine_target, schema=target_schema, if_exists='append', index=False, chunksize=1000)
print("New records successfully inserted into dimension.EmployeeDepartmentHistory with IsCurrent = 1")

New records successfully inserted into dimension.EmployeeDepartmentHistory with IsCurrent = 1


In [68]:
import pandas as pd
from sqlalchemy import create_engine

# Step 1: Get the maximum Timestamp from the HR dimension table in both databases

# Query to get the maximum Timestamp from the HR EmployeeDepartmentHistory table in AdventureWorks2019
query_max_timestamp_aw_EmployeeDepartmentHistory = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory]
"""

# Query to get the maximum Timestamp from the HR Employee table in AdventureWorks2019
query_max_timestamp_aw_Employee = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[Employee]
"""

# Query to get the maximum Timestamp from the HR dimension table in the HR database
query_max_timestamp_hr = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [dimension].[EmployeeDepartmentHistory]
"""

try:
    # Execute the query for AdventureWorks2019 EmployeeDepartmentHistory table
    max_timestamp_aw_edh = pd.read_sql(query_max_timestamp_aw_EmployeeDepartmentHistory, engine_source)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from AdventureWorks2019 EmployeeDepartmentHistory: {max_timestamp_aw_edh}")
    
    # Execute the query for AdventureWorks2019 Employee table
    max_timestamp_aw_emp = pd.read_sql(query_max_timestamp_aw_Employee, engine_source)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from AdventureWorks2019 Employee: {max_timestamp_aw_emp}")
    
    # Execute the query for HR database EmployeeDepartmentHistory table
    max_timestamp_hr = pd.read_sql(query_max_timestamp_hr, engine_target)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from HR database EmployeeDepartmentHistory: {max_timestamp_hr}")
    
except Exception as e:
    print(f"Error retrieving maximum Timestamps: {e}")


Maximum Timestamp from AdventureWorks2019 EmployeeDepartmentHistory: 248015
Maximum Timestamp from AdventureWorks2019 Employee: 188930
Maximum Timestamp from HR database EmployeeDepartmentHistory: 151144


## TEST


In [47]:
import pandas as pd
from sqlalchemy import create_engine

# Step 1: Get the maximum Timestamp from the HR dimension table in both databases

# Query to get the maximum Timestamp from the HR EmployeeDepartmentHistory table in AdventureWorks2019
query_max_timestamp_aw_EmployeeDepartmentHistory = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory]
"""

# Query to get the maximum Timestamp from the HR Employee table in AdventureWorks2019
query_max_timestamp_aw_Employee = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[Employee]
"""

# Query to get the maximum Timestamp from the HR dimension table in the HR database
query_max_timestamp_hr = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [dimension].[EmployeeDepartmentHistory]
"""

try:
    # Execute the query for AdventureWorks2019 EmployeeDepartmentHistory table
    max_timestamp_aw_edh = pd.read_sql(query_max_timestamp_aw_EmployeeDepartmentHistory, engine_source)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from AdventureWorks2019 EmployeeDepartmentHistory: {max_timestamp_aw_edh}")
    
    # Execute the query for AdventureWorks2019 Employee table
    max_timestamp_aw_emp = pd.read_sql(query_max_timestamp_aw_Employee, engine_source)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from AdventureWorks2019 Employee: {max_timestamp_aw_emp}")
    
    # Execute the query for HR database EmployeeDepartmentHistory table
    max_timestamp_hr = pd.read_sql(query_max_timestamp_hr, engine_target)['MaxTimestamp'].iloc[0]
    print(f"Maximum Timestamp from HR database EmployeeDepartmentHistory: {max_timestamp_hr}")
    
except Exception as e:
    print(f"Error retrieving maximum Timestamps: {e}")



Maximum Timestamp from AdventureWorks2019 EmployeeDepartmentHistory: 248014
Maximum Timestamp from AdventureWorks2019 Employee: 188930
Maximum Timestamp from HR database EmployeeDepartmentHistory: 151136


In [48]:
import pandas as pd
from sqlalchemy import create_engine, text



# Step 2: Extract new records from AdventureWorks2019 tables
query_new_records_emp = f"""
SELECT BusinessEntityID, JobTitle, CAST([Timestamp] AS BIGINT) AS Timestamp
FROM HumanResources.Employee
WHERE CAST([Timestamp] AS BIGINT) > {max_timestamp_aw_emp}
"""

query_new_records_edh = f"""
SELECT BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate, CAST([Timestamp] AS BIGINT) AS Timestamp
FROM HumanResources.EmployeeDepartmentHistory
WHERE CAST([Timestamp] AS BIGINT) > {max_timestamp_aw_edh}
"""

try:
    df_new_records_emp = pd.read_sql(query_new_records_emp, engine_source)
    print("New records from Employee loaded into DataFrame:")
    print(df_new_records_emp.head())

    df_new_records_edh = pd.read_sql(query_new_records_edh, engine_source)
    print("New records from EmployeeDepartmentHistory loaded into DataFrame:")
    print(df_new_records_edh.head())

except Exception as e:
    print(f"Error loading new records: {e}")

# Step 3: Retrieve existing records from the target table
query_existing_records = """
SELECT BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate, Timestamp, IsCurrent
FROM dimension.EmployeeDepartmentHistory
"""

try:
    df_existing_records = pd.read_sql(query_existing_records, engine_target)
    print("Existing records loaded into DataFrame:")
    print(df_existing_records.head())

except Exception as e:
    print(f"Error loading existing records: {e}")

# Step 4: Extract JobTitle and BusinessEntityID from Employee table
query_employee = """
SELECT BusinessEntityID, JobTitle
FROM HumanResources.Employee
"""

try:
    df_employee = pd.read_sql(query_employee, engine_source)
    print("Employee data loaded into DataFrame:")
    print(df_employee.head())

except Exception as e:
    print(f"Error loading employee data: {e}")

# Step 5: Merge DataFrames on BusinessEntityID
df_merged = pd.merge(df_new_records_edh, df_employee, on='BusinessEntityID', how='left')
print("Merged DataFrame:")
print(df_merged.head())

##############################################################################################################################



New records from Employee loaded into DataFrame:
Empty DataFrame
Columns: [BusinessEntityID, JobTitle, Timestamp]
Index: []
New records from EmployeeDepartmentHistory loaded into DataFrame:
Empty DataFrame
Columns: [BusinessEntityID, ShiftID, StartDate, EndDate, ModifiedDate, Timestamp]
Index: []
Existing records loaded into DataFrame:
   BusinessEntityID  ShiftID   StartDate     EndDate            ModifiedDate  \
0                 1        2  2009-01-14  2024-08-17 2024-08-17 10:36:30.557   
1                 1        2  2024-08-12  2024-08-17 2024-08-17 10:36:30.557   
2                 1        2  2024-08-12  2024-08-17 2024-08-17 10:36:30.557   
3                 2        1  2008-01-31        None 2008-01-30 00:00:00.000   
4                 2        2  2020-08-12        None 2024-08-12 13:38:03.163   

                       Timestamp  IsCurrent  
0  b'\x00\x00\x00\x00\x00\x02M-'       True  
1  b'\x00\x00\x00\x00\x00\x02M.'       True  
2  b'\x00\x00\x00\x00\x00\x02M/'       True

In [49]:
# Step 1: Get the maximum Timestamp from the HR dimension table in both databases
query_max_timestamp_aw_edh = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[EmployeeDepartmentHistory]
"""

query_max_timestamp_aw_emp = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [AdventureWorks2019].[HumanResources].[Employee]
"""

query_max_timestamp_hr = """
SELECT CONVERT(INT, MAX([Timestamp])) AS MaxTimestamp
FROM [dimension].[EmployeeDepartmentHistory]
"""