ETL SAMPLE
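Python code that uses SQLAlchemy as an ORM to implement an ETL process: it loads data from an OLTP database (MySQL) into a staging area for OLAP transformation, and from there into the OLAP database. The load is incremental (a delta load): only records modified since the last load are extracted and then inserted or updated in the OLAP table. The code below does not schedule itself; run it periodically (e.g. from cron or a workflow scheduler) to keep the OLAP data current.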

In [None]:
import datetime
import os

from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime
from sqlalchemy import func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Connection URLs for the three databases. Each can be overridden through an
# environment variable so real credentials do not have to be committed to
# source control; the placeholder URL is used when the variable is unset.
OLTP_DATABASE_URL = os.environ.get(
    'OLTP_DATABASE_URL', 'mysql://username:password@hostname/database_name')
STAGE_DATABASE_URL = os.environ.get(
    'STAGE_DATABASE_URL', 'mysql://username:password@hostname/stage_database_name')
OLAP_DATABASE_URL = os.environ.get(
    'OLAP_DATABASE_URL', 'mysql://username:password@hostname/olap_database_name')

# Engine for the OLTP (source) database
oltp_engine = create_engine(OLTP_DATABASE_URL)

# Engine for the staging area
stage_engine = create_engine(STAGE_DATABASE_URL)

# Engine for the OLAP (target) database
olap_engine = create_engine(OLAP_DATABASE_URL)

# Shared MetaData for explicitly declared Table objects
metadata = MetaData()

# Declarative base class for the ORM models
Base = declarative_base()

# Define model for OLTP table
class OltpTable(Base):
    """ORM mapping of the source table in the OLTP database.

    Columns: ``id`` (primary key), ``name`` and ``modified_datetime``; the
    latter is the timestamp the ETL delta is filtered on.
    """

    __table__ = Table(
        'table_name',
        metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String),
        Column('modified_datetime', DateTime),
    )

# Define model for OLAP table.
# The OLAP table lives in a different database but uses the same placeholder
# name as the OLTP table. Declaring both on the shared `metadata` raises
# InvalidRequestError ("Table 'table_name' is already defined for this
# MetaData instance") at import time, so the OLAP model gets its own MetaData.
olap_metadata = MetaData()


class OlapTable(Base):
    """ORM mapping of the target table in the OLAP database.

    Columns: ``id`` (primary key), ``name``, ``created_datetime`` and
    ``modified_datetime``.
    """

    __table__ = Table(
        'table_name',
        olap_metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String),
        Column('created_datetime', DateTime),
        Column('modified_datetime', DateTime),
    )

# Session factories, one per database.
Session_oltp = sessionmaker(bind=oltp_engine)
Session_stage = sessionmaker(bind=stage_engine)
Session_olap = sessionmaker(bind=olap_engine)

# Staging table. Core insert/select need the columns declared: a column-less
# Table() can neither be executed nor selected from. It gets its own MetaData
# because it belongs to the staging database.
stage_metadata = MetaData()
staging_table = Table('staging_table_name', stage_metadata,
                      Column('id', Integer, primary_key=True),
                      Column('name', String),
                      Column('modified_datetime', DateTime))

session_oltp = Session_oltp()
session_stage = Session_stage()
session_olap = Session_olap()
try:
    # Extract: the high-water mark is the newest *source* modification time
    # already loaded into OLAP. MAX() yields None for an empty OLAP table
    # (first run), in which case every OLTP row is extracted.
    latest_modified_datetime = session_olap.query(
        func.max(OlapTable.modified_datetime)).scalar()

    delta_query = session_oltp.query(OltpTable)
    if latest_modified_datetime is not None:
        delta_query = delta_query.filter(
            OltpTable.modified_datetime > latest_modified_datetime)
    records = delta_query.all()

    # Stage: replace the previous run's delta. Without the delete, every
    # earlier delta would be re-read below and, since the select is
    # unordered, older values could overwrite newer ones.
    session_stage.execute(staging_table.delete())
    if records:  # nothing to insert when the delta is empty
        session_stage.execute(
            staging_table.insert(),
            [{'id': record.id,
              'name': record.name,
              'modified_datetime': record.modified_datetime}
             for record in records])
    session_stage.commit()

    # Load: insert or update each staged row in the OLAP table.
    records = session_stage.query(staging_table).all()
    load_time = datetime.datetime.now()  # one timestamp for the whole load
    for record in records:
        existing_record = (session_olap.query(OlapTable)
                           .filter(OlapTable.id == record.id)
                           .first())
        if existing_record is not None:
            existing_record.name = record.name
            # Store the source timestamp, not the load time: this column is
            # the high-water mark for the next run, and a load-time value
            # would skip OLTP rows modified between the extract and the load.
            existing_record.modified_datetime = record.modified_datetime
        else:
            session_olap.add(OlapTable(id=record.id,
                                       name=record.name,
                                       created_datetime=load_time,
                                       modified_datetime=record.modified_datetime))
    session_olap.commit()
finally:
    # Release connections; uncommitted work after an error is rolled back.
    session_oltp.close()
    session_stage.close()
    session_olap.close()