In [None]:
!pip install pandas python-dotenv
!pip install cx_Oracle sqlalchemy

In [1]:
import os

from dotenv import load_dotenv

# Read the Oracle connection settings from the local .env file.
# override=True lets the .env values win over variables that already exist in the
# process environment. The keys used here are very generic (`username`,
# `hostname`, ...) and may already be set by the OS -- e.g. on Windows, where
# environment names are case-insensitive, `username` is the logged-in OS user --
# which would otherwise silently shadow the values from .env.
load_dotenv('.env', override=True)

hostname = os.getenv('hostname')
port = os.getenv('port')
sid = os.getenv('sid')
# service_name = os.getenv('service_name')
username = os.getenv('username')
password = os.getenv('password')

In [2]:
import pandas as pd 
import time

In [3]:
# data source : https://www.kaggle.com/datasets/vipin20/transaction-data?resource=download
# Read the raw export, convert TransactionTime from text to real timestamps,
# then order the rows chronologically (oldest first).
df = (
    pd.read_csv('./transaction_data.csv')
    .assign(TransactionTime=lambda frame: pd.to_datetime(frame['TransactionTime']))
    .sort_values(by='TransactionTime', ascending=True)
)

  df["TransactionTime"] = pd.to_datetime(df['TransactionTime'])


In [4]:
# Print a summary of the loaded frame: index range, columns, non-null counts,
# dtypes and memory usage.
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 1083818 entries, 538323 to 242563
Data columns (total 8 columns):
 #   Column                  Non-Null Count    Dtype         
---  ------                  --------------    -----         
 0   UserId                  1083818 non-null  int64         
 1   TransactionId           1083818 non-null  int64         
 2   TransactionTime         1083818 non-null  datetime64[ns]
 3   ItemCode                1083818 non-null  int64         
 4   ItemDescription         1080910 non-null  object        
 5   NumberOfItemsPurchased  1083818 non-null  int64         
 6   CostPerItem             1083818 non-null  float64       
 7   Country                 1083818 non-null  object        
dtypes: datetime64[ns](1), float64(1), int64(4), object(2)
memory usage: 74.4+ MB


In [5]:
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
import cx_Oracle

# Load the Oracle Instant Client libraries from the local directory.
# init_oracle_client() can only succeed once per process, so re-running this cell
# in the same kernel raises ProgrammingError; ignore exactly that case and let
# every other failure (e.g. DPI-1047, client library not found) propagate.
try:
    cx_Oracle.init_oracle_client(lib_dir=r"./instantclient_21_14")
except cx_Oracle.ProgrammingError as exc:
    if "already been initialized" not in str(exc):
        raise

# Set up the Oracle connection descriptor. Because `sid=` is passed, the result has the form
# (DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST=<hostname>)(PORT=<port>))(CONNECT_DATA=(SID=<sid>)))
dsn = cx_Oracle.makedsn(host=hostname, port=port, sid=sid)

# Open a direct cx_Oracle connection as a quick credentials/connectivity check and
# close it right away so it is not leaked; the database work in the following
# cells goes through `engine`.
connection = cx_Oracle.connect(user=username, password=password, dsn=dsn)
connection.close()

# Create an SQLAlchemy engine.
# URL.create() takes the credentials as separate fields, so special characters
# in the password (such as '@') cannot corrupt the URL the way they can
# when the URL is assembled with string formatting.
engine = create_engine(
    URL.create("oracle+cx_oracle", username=username, password=password, host=dsn)
)

DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libclntsh.so: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help

In [None]:
# Show the SQLAlchemy engine object as the cell output.
engine



In [None]:
# Connectivity smoke test: fetch a single row through the engine.
# The row limit is applied in SQL (ROWNUM) so Oracle returns one row, instead of
# transferring the whole table to pandas only to keep its first row.
# print("Test connection query")
pd.read_sql_query("SELECT * FROM C##MYUSER.BPM_EMPLOYEES WHERE ROWNUM <= 1", engine)

In [None]:
from sqlalchemy.dialects.oracle import DATE, FLOAT, NUMBER, VARCHAR2

# Oracle column type for each DataFrame column; passed as `dtype=` to
# `DataFrame.to_sql` in the insert cells.
dtype = dict(
    UserId=NUMBER,
    TransactionId=NUMBER,
    TransactionTime=DATE,
    ItemCode=NUMBER,
    ItemDescription=VARCHAR2(255),
    NumberOfItemsPurchased=NUMBER,
    CostPerItem=FLOAT,
    Country=VARCHAR2(124),
)

In [None]:
# # Define the table name and schema
# table_name = 'test_table'
# schema_name = 'C##KEVIN'

Update:
- The bulk import of the transaction data was moved further down.
- Added a loop that inserts the data into Oracle row by row, with a 10-second delay, for testing CDC.

In [None]:
# Target location in Oracle: the owning schema first, then the table name.
schema_name, table_name = 'C##MYUSER', "TRANSACTIONS"

In [None]:
# Create the target table in Oracle.
from sqlalchemy import create_engine, Table, MetaData, Column, Integer, String, Float, Date, DateTime

# Initialize metadata object
metadata = MetaData()

# Define the table. Column names and order mirror the DataFrame columns.
table = Table(
    table_name, metadata,
    Column('UserId', Integer),
    Column('TransactionId', Integer),
    # DateTime rather than Date: TransactionTime holds full timestamps
    # (datetime64[ns] in the DataFrame). On Oracle both generic types are created
    # as a DATE column, but SQLAlchemy's generic Date is a calendar-date type and
    # hands values back without their time-of-day part.
    Column('TransactionTime', DateTime),
    Column('ItemCode', Integer),
    Column('ItemDescription', String(255)),
    Column('NumberOfItemsPurchased', Integer),
    Column('CostPerItem', Float),
    Column('Country', String(124)),
    schema=schema_name
)

# Create the table (create_all skips tables that already exist).
metadata.create_all(engine)

Grant access to the table that was just created so that it can be captured by CDC into Kafka.

In [None]:
from sqlalchemy import text

# Define your SQL queries:
#   1. let the c##dbzuser account read the table,
#   2. turn on supplemental logging for all columns of the table.
sql_query1 = text(f"GRANT SELECT ON {schema_name}.{table_name} TO c##dbzuser")
sql_query2 = text(f"ALTER TABLE {schema_name}.{table_name} ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS")

# Connect to the database. The context manager returns the connection to the
# pool when the block ends, even if one of the statements fails, so it is not leaked.
with engine.connect() as connection:
    # Execute the queries
    connection.execute(sql_query1)
    print(f"Permissions granted successfully on {schema_name}.{table_name} to c##dbzuser")

    connection.execute(sql_query2)
    print(f"Table {schema_name}.{table_name} altered successfully")


In [None]:
# Export the DataFrame to the Oracle database

# engine.begin() opens a connection inside a transaction, commits when the block
# finishes, rolls back if the import raises, and always releases the connection
# (the plain engine.connect() call left it open).
with engine.begin() as connection:
    print("Import bulk data to Oracle database")
    df.to_sql(
        table_name,
        connection,
        schema=schema_name,
        if_exists='append',
        index=False,
        dtype=dtype,
        chunksize=10_000,  # write in batches instead of sending ~1M rows in one call
    )


In [None]:

# Insert the data one row at a time, each row in its own committed transaction,
# pausing after every successful insert (used for testing CDC).
INSERT_DELAY_SECONDS = 2  # pause after each successful insert

# Loop-invariant, so computed once. The lower-case name is kept from the
# original code -- presumably to avoid case-sensitivity problems with
# upper-case table names in pandas/SQLAlchemy; TODO confirm.
lower_case_table_name = table_name.lower()

with engine.connect() as connection:
    # Iterate by position: after the sort the index labels are no longer
    # sequential, so they cannot be used as a running row number.
    for row_number in range(1, len(df) + 1):
        # Slicing (instead of iterrows) yields a one-row frame that keeps the
        # column dtypes of `df`.
        row_df = df.iloc[row_number - 1:row_number]
        try:
            # One transaction per row: committed on success, rolled back on error.
            with connection.begin():
                row_df.to_sql(lower_case_table_name, connection, schema=schema_name, if_exists='append', index=False, dtype=dtype)
        except Exception as e:
            # Best effort: report the failure and carry on with the next row.
            print("Error occurred:", e)
            continue
        # Reported only after the commit has actually succeeded.
        print(f"Inserted row {row_number} into {lower_case_table_name}")
        time.sleep(INSERT_DELAY_SECONDS)


In [None]:
# Close the connection.
# `connection` is whichever object the most recently executed cell bound to that name.
connection.close()