In [1]:
import os
import json
import sqlalchemy
import pandas
from faker import Faker
import uuid
import random
import sys
import typing
import numpy as np

In [2]:
sys.path.insert(0, '../src')

In [3]:
from source_sink.rdbms.rdbms_connector import *

In [4]:
%load_ext sql

In [5]:
%load_ext google.cloud.bigquery

# Generate data

In [6]:
test_list = ['something', 'awsome', None]
test2_list = [1, 2, 3, 100000, None]
test3_list = [1.212, 1.4444, None]

In [7]:
def generate_fake_records(records_num: int) -> pandas.DataFrame:
    faker = Faker()
    list_records = list()
    for i in range(records_num):
        record = dict()
        record['id'] = str(uuid.uuid4())
        record['name'] = faker.name()
        record['address'] = faker.address()
        record['message'] = faker.sentence()
        record['email'] = faker.email()
        record['dob'] = faker.date_of_birth()
        record['test4'] = faker.sentence()
        record['created_at'] = faker.date_time()
        record['updated_at'] = faker.date_time()
        list_records.append(record)
    df = pandas.DataFrame(list_records)
    df['test'] = np.random.choice(test_list, size=len(df))
    df['test2'] = np.random.choice(test2_list, size=len(df))
    df['test3'] = np.random.choice(test3_list, size=len(df))
    df = df.convert_dtypes(dtype_backend='pyarrow')
    return df

In [8]:
def read_conn_info(conn_file: str) -> ConnectionInfo:
    conn_file = open(conn_file, 'r')
    conn_info = json.loads(conn_file.read())
    conn_file.close()
    return ConnectionInfo(**conn_info)

In [9]:
def insert_data(run_arg: dict) -> None:
    conn_info = run_arg['conn_info']
    table_name = run_arg['table_name']
    db_type = run_arg['db_type']
    records_num = run_arg['records_num']
    connector = db_type(conn_info=conn_info)
    conn = connector.get_database_connection()
    df = generate_fake_records(records_num)
    df.to_sql(table_name, conn, if_exists='append', index=False)
    connector.close_engine()

# Oracle

In [9]:
conn_info_oracle = read_conn_info('../secrets/oracle-connection.json')
conn_str_oracle = OracleConnector(conn_info=conn_info_oracle).conn_string

In [11]:
%%sql $conn_str_oracle
create table test.test_table (
    id varchar2(36) NOT NULL PRIMARY KEY,
    name varchar2(100),
    address varchar2(500),
    message varchar2(1000),
    email varchar2(100),
    test varchar2(100),
    dob date,
    test2 number,
    test3 float,
    test4 clob,
    created_at timestamp,
    updated_at timestamp
)

0 rows affected.


[]

In [19]:
inputs = [{
    "conn_info": conn_info_oracle,
    "table_name": "test_table",
    "db_type": OracleConnector,
    "records_num": 100000
} for l in range(10)]
for o in inputs:
    insert_data(o)
    print("100000 rows inserted")

# Postgres

In [13]:
conn_info_postgres = read_conn_info('../secrets/postgres-connection.json')
conn_str_postgres = PostgresConnector(conn_info=conn_info_postgres).conn_string

In [14]:
%%sql $conn_str_postgres
create table test_table (
    id char(36) NOT NULL PRIMARY KEY,
    name varchar(100),
    address varchar(500),
    message varchar(1000),
    email varchar(100),
    test varchar(100),
    dob date,
    test2 bigint,
    test3 float,
    test4 text,
    created_at timestamp,
    updated_at timestamp
)

Done.


[]

In [15]:
inputs = [{
    "conn_info": conn_info_postgres,
    "table_name": "test_table",
    "db_type": PostgresConnector,
    "records_num": 100000
} for l in range(10)]
for o in inputs:
    insert_data(o)
    print("100000 rows inserted")

100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted


# MySQL

In [21]:
conn_info_mysql = read_conn_info('../secrets/mysql-connection.json')
conn_str_mysql = MySQLConnector(conn_info=conn_info_mysql).conn_string

In [23]:
%%sql $conn_str_mysql
create table test_table (
    id char(36) NOT NULL PRIMARY KEY,
    name varchar(100),
    address varchar(500),
    message varchar(1000),
    email varchar(100),
    test varchar(100),
    dob date,
    test2 bigint,
    test3 float,
    test4 text,
    created_at timestamp,
    updated_at timestamp
)

0 rows affected.


[]

In [24]:
inputs = [{
    "conn_info": conn_info_mysql,
    "table_name": "test_table",
    "db_type": MySQLConnector,
    "records_num": 100000
} for l in range(10)]
for o in inputs:
    insert_data(o)
    print("100000 rows inserted")

100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted
100000 rows inserted


# BigQuery

In [None]:
%%bigquery
drop table `test.test_table`;