### Type 1 SCD (Overwriting):
- Behavior: The old data is overwritten with the new data. No history is kept.
- Use Case: When the data changes are minor or non-critical, and you don't need to track historical changes.

Example: If a customer's address changes, the new address simply replaces the old one in the table.

#

#### Connecting to SQL Server

We will use SQLAlchemy to connect to SQL Server.

In [2]:
import pandas as pd
import sqlalchemy as sal

In [3]:
# SQL Server details are removed for security purposes; replace the
# <...> placeholders with the real server and database names.
# Raw string literals are used because the backslash before SQLEXPRESS is a
# literal character (server\instance separator); in a normal string "\S" is an
# invalid escape sequence and triggers a warning on recent Python versions.
connection_string = (
    r'mssql://<sql server _name>\SQLEXPRESS/<database_name>'
    r'?driver=ODBC+DRIVER+17+FOR+SQL+SERVER'
)
engine = sal.create_engine(connection_string)
# Module-level connection shared by the extract/load functions below.
conn = engine.connect()

#

In [110]:
# Create a target table named products in SQL Server
    # create table products(
    #     product_id int,
    #     product_name varchar(100),
    #     price float
    # )

# On the first run, the source file (products) contains 2 records
    # product_id,product_name,price
    # 1,'HP Pro Notebook',80000
    # 2,'Iphpone 15',70000

# On the second run, the source file arrives with new/updated records
    # product_id,product_name,price
    # 1,'HP Pro Notebook',95000
    # 3,'Lenovo Thinkpad',50000
  # New records will be inserted, and updated records will overwrite the existing values in the target table

#

#### Functions to Implement Type 1 SCD

In [4]:
# Extract step: read the incoming source file and the current target table.
def extract():
    """Load the source CSV and the current contents of the target table.

    Returns:
        A ``(source, target)`` tuple of DataFrames: the rows read from
        ``products.csv`` and the rows returned by ``select * from products``
        executed over the module-level ``conn`` connection.
    """
    source_rows = pd.read_csv('products.csv')
    target_rows = pd.read_sql_query('select * from products', conn)
    return source_rows, target_rows

#

In [10]:
# Transform step: split the source rows into inserts and updates.
def transform(products_df, products_db_df):
    """Split source rows into new records and records that already exist.

    A source row is "new" when its ``product_id`` is absent from the target
    table, and an "update" when its ``product_id`` is present. Membership is
    decided on the key alone (via the merge indicator), so a target row whose
    other columns are NULL is still recognised as existing rather than being
    inserted a second time.

    Args:
        products_df: Rows from the source file. Must contain ``product_id``;
            its leading columns are expected to line up positionally with
            the target table's columns.
        products_db_df: Current rows of the target table. Must contain
            ``product_id``; its column labels are applied to both outputs.

    Returns:
        A tuple ``(insert_final_df, updates_final_df)``. Both frames carry
        the source values and the target table's column names. Every matched
        source row is returned as an update, whether or not its values
        actually differ from the target.
    """
    target_columns = products_db_df.columns

    # Left-join against the target keys only: no suffixed duplicate columns
    # are produced, and the ``_merge`` indicator states explicitly whether
    # the key was found in the target.
    joined_df = pd.merge(
        products_df,
        products_db_df[['product_id']],
        how='left',
        on='product_id',
        indicator=True,
    )
    is_new = joined_df['_merge'] == 'left_only'

    # The source columns come first in the join result; keep as many as the
    # target table has so they can be relabelled with the target's names.
    source_part = joined_df.iloc[:, :len(target_columns)]

    # Copies keep the relabelling below from touching the joined frame.
    insert_final_df = source_part[is_new].copy()
    insert_final_df.columns = target_columns

    updates_final_df = source_part[~is_new].copy()
    updates_final_df.columns = target_columns

    return insert_final_df, updates_final_df
    

#

In [6]:
# Stage step: park the update candidates in a staging table before they are
# applied to the target table.
def load_staging(updates_final_df):
    """Write the update candidates to the ``products_stg`` staging table.

    The staging table is replaced on every call (``if_exists='replace'``),
    the DataFrame index is not written, and the write is committed on the
    module-level ``conn`` connection.

    Args:
        updates_final_df: Rows to stage.
    """
    updates_final_df.to_sql(
        name='products_stg',
        con=conn,
        if_exists='replace',
        index=False,
    )
    conn.commit()

#

In [7]:
# Load step: append brand-new records to the SQL Server target table.
def insert(df_insert_final):
    """Append the given rows to the ``products`` target table.

    Rows are appended to the existing table (``if_exists='append'``), the
    DataFrame index is not written, and the write is committed on the
    module-level ``conn`` connection.

    Args:
        df_insert_final: Rows to append to ``products``.
    """
    df_insert_final.to_sql(
        name='products',
        con=conn,
        if_exists='append',
        index=False,
    )
    conn.commit()

#

In [13]:
# Update step: overwrite changed values in the target table (Type 1 SCD).
def updates():
    """Overwrite target rows with the values held in the staging table.

    Runs a single UPDATE that copies ``price`` and ``product_name`` from
    ``products_stg`` into every ``products`` row with a matching
    ``product_id``, then commits on the module-level ``conn`` connection.
    No history of the previous values is kept.
    """
    query = sal.text('update products set price = products_stg.price, product_name=products_stg.product_name from products_stg where products.product_id=products_stg.product_id')
    # The result object is not needed, so it is not bound to a name.
    conn.execute(query)
    conn.commit()

#

#### calling the functions

In [None]:
# Run the Type 1 SCD pipeline end to end.
# 1. Read the source CSV and the current contents of the target table.
products_df, products_db_df = extract()

# 2. Split the source rows into new records and records that already exist.
insert_final_df, updates_final_df = transform(products_df, products_db_df)

# 3. Stage the existing-record rows in products_stg (replaced on every run).
load_staging(updates_final_df)

# 4. Append the new records to the products table.
insert(insert_final_df)

# 5. Overwrite matching products rows with the staged values.
updates()