In [None]:
import pymysql
import psycopg2

from faker import Faker
# Module-level Faker instance; the data-generation cell further down calls its providers.
faker = Faker()


In [None]:
# Errors are written to a log file instead of the notebook output, so the output stays
# readable while full tracebacks remain available for inspection.
import logging
import os

log_file_path = './logs/using_jupyter.log'

# FileHandler does not create missing parent directories, so make sure ./logs exists first.
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# getLogger() returns the same logger object on every call. Without this cleanup,
# re-running the cell would attach one more handler each time and every record
# would be written to the file several times.
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)
    stale_handler.close()

logger_handler = logging.FileHandler(log_file_path)
logger_formatter = logging.Formatter('%(asctime)s -%(name)s %(levelname)s - %(message)s \n')
logger_handler.setFormatter(logger_formatter)
logger.addHandler(logger_handler)
logger.info('Logs is instatiated')

In [None]:
# Connection settings are kept out of the notebook: they are read from
# `connection_profile.conf` with `configparser`.
import configparser

config_path = './login_credentials/connection_profile.conf'
parser = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list of files
# it actually parsed. Checking that list gives a clear error here instead of a
# NoSectionError on the first lookup below.
if not parser.read(config_path):
    raise FileNotFoundError(f'Could not read configuration file: {config_path}')

# Destination (Postgres) settings. Every value is returned by configparser as a string.
postgres_host = parser.get('postgres_connection', 'host')
postgres_port = parser.get('postgres_connection', 'port')
postgres_database = parser.get('postgres_connection', 'database')
postgres_user = parser.get('postgres_connection', 'user')
postgres_password = parser.get('postgres_connection', 'password')
postgres_table1 = parser.get('postgres_connection', 'table1')
postgres_table2 = parser.get('postgres_connection', 'table2')

# Source (MySQL) settings.
mysql_host = parser.get('mysql_connection', 'host')
mysql_port = parser.get('mysql_connection', 'port')
mysql_database = parser.get('mysql_connection', 'database')
mysql_user = parser.get('mysql_connection', 'user')
mysql_password = parser.get('mysql_connection', 'password')
mysql_table = parser.get('mysql_connection', 'table')


In [None]:
# Connect to the local MySQL server with `pymysql`.

# Start from a known state: if the connection attempt fails (or this cell is re-run after
# an earlier success), `conn` is None rather than unbound or pointing at an old connection.
conn = None
try:
    conn = pymysql.connect(host=mysql_host,
                           port=int(mysql_port),  # the config value is a string; it is converted to int here
                           user=mysql_user,
                           password=mysql_password)
except Exception:
    # The traceback goes to the log file; only a short notice is printed.
    logger.error('Error connecting to database', exc_info=True)
    print('Connection failed -- see the log file for details')
else:
    print('Connection successful')

In [None]:
# Create the MySQL database named in the config file, unless it already exists.

create_database_sql = f'CREATE DATABASE IF NOT EXISTS {mysql_database}'

with conn.cursor() as cur:
    try:
        cur.execute(create_database_sql)
    except Exception:
        # Failure details (with traceback) go to the log file only.
        logger.error(f'Error creating database-- {mysql_database}', exc_info=True)
    else:
        conn.commit()
        print(f'Database-- {mysql_database} created successfully')

In [None]:
# Select the database for the statements issued on this connection from here on.

use_database_sql = f'USE {mysql_database}'

with conn.cursor() as cur:
    try:
        cur.execute(use_database_sql)
    except Exception:
        # Failure details (with traceback) go to the log file only.
        logger.error(f'Error using database-- {mysql_database}', exc_info=True)
    else:
        conn.commit()
        print(f'Connected to Databse-- {mysql_database} succesfully')

In [None]:
# Drop the table first so that the CREATE TABLE below does not fail when this cell is
# re-run; `IF EXISTS` keeps the DROP itself from failing when the table is absent.


def _execute_and_commit(statement, success_message, failure_message):
    """Run one SQL statement on `conn` and commit it.

    On failure the traceback is logged under `failure_message` and the exception
    is printed; nothing is re-raised. On success the statement is committed and
    `success_message` is printed.
    """
    with conn.cursor() as cur:
        try:
            cur.execute(statement)
        except Exception as err:
            logger.error(failure_message, exc_info=True)
            print(err)
        else:
            conn.commit()
            print(success_message)


query1 =f'DROP TABLE IF EXISTS {mysql_table};'
_execute_and_commit(query1,
                    f'Dropping table-- {mysql_table} if exist',
                    f'Drop table-- {mysql_table}')

# Table schema: one column per Faker field generated in the next cell, in the same order.

query2 = f'CREATE TABLE {mysql_table} (first_name text(50), last_name text(50), ssn varchar(30),\
                                        home_address varchar(250), crypto_owned text(50), wage text(20),\
                                        phone_number text(40), company_worked text(150), bank_account text(70),\
                                        occupation text(50), company_mail varchar(70), personal_email varchar(70),\
                                        birth_date varchar(20));'
_execute_and_commit(query2,
                    f'Creating table-- {mysql_table} successful',
                    f'Error creating table-- {mysql_table}')

In [None]:
# Generate 500 rows of fake data with Faker: one tuple per row, with the fields in the
# same order as the columns of the table created above.

data = [
    (faker.first_name(), faker.last_name(), faker.ssn(),
     faker.address(), faker.cryptocurrency_name(), faker.pricetag(),
     faker.phone_number(), faker.company(), faker.bban(), faker.job(),
     faker.company_email(), faker.ascii_free_email(), faker.date())
    for _row in range(500)
]

data_for_db = tuple(data)

# Insert all rows with a single `executemany()` call; one %s placeholder per column.

query2 = f"INSERT INTO {mysql_table} VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
with conn.cursor() as cur:
    try:
        cur.executemany(query2, data_for_db)
    except Exception:
        # f-string, so the log shows the actual table name instead of the literal "{mysql_table}".
        logger.error(f'Error inserting into table-- {mysql_table}', exc_info=True)
    else:
        conn.commit()
        print(f'Inserting into table-- {mysql_table} successful')


In [None]:
# Close the MySQL connection explicitly (in the `.py` version a context manager handles this).

if conn is not None:
    try:
        conn.close()
    except Exception:
        # `except Exception` rather than a bare `except:`, which would also swallow
        # KeyboardInterrupt and SystemExit.
        logger.error('Connection previously closed', exc_info=True)
        print('Connection previously closed')
    else:
        print('Connection closed')

In [None]:
# THE NEXT STEP WOULD BE TO CREATE AND CONNECT TO OUR DESTINATION POSTGRES DATABASE.
# THE CODE BELOW, USING `psycopg2` AS OUR DATABASE ADAPTER, IS SUPPOSED TO DO THE JOB BUT UNFORTUNATELY DOES NOT WORK. HERE ARE THE REASONS:
# 1 -- POSTGRES/PSYCOPG2 DOES NOT ALLOW THE CREATION OF A 'DATABASE' WITHIN A TRANSACTION BLOCK, WHICH MEANS AN EXISTING DATABASE IS REQUIRED BY DEFAULT WITH PSYCOPG2
#      trying would yield an error ::: "can't create database with a transaction block"
# 2 -- POSTGRES DOES NOT ALLOW DDL COMMANDS LIKE CREATING A 'DATABASE' EASILY; DOING IT WOULD REQUIRE US TO EDIT
#      THE POSTGRES APPLICATION'S SECURITY CONFIGURATION (`pg_hba.conf` file), AND I REALLY DON'T WANT US MESSING WITH THAT
#      trying would yield an error ::: "requires a peer authentication"


# PLEASE LOOK UP ON THE INTERNET HOW TO CREATE A DATABASE IN POSTGRES WITH EITHER `psql` OR `pgAdmin`; IT IS A VERY SIMPLE PROCESS


# with psycopg2.connect(f'host={postgres_host} port={postgres_port} user={postgres_user} password={postgres_password}') as conn:
#     try:
#         with conn.cursor() as cur:
#             cur.execute(f'CREATE DATABASE {postgres_database}')
#             conn.commit()
#             cur.execute(f'USE {postgres_database}')
#             conn.commit()
#     except:
#         logger.error(f'Error creating and using Postgres database--{postgres_database}', exc_info=True)
#     else:
#         print(f'Creating database-- {postgres_database} succesfully')



In [None]:
# THIS MARKS THE END OF 'SOURCING_AND_SETTING_UP_DB.py'. THE NEXT SECTION IS CONCERNED WITH THE 'EtLT'/`ETL` PART OF OUR DATA REPLICATION PROCESS. I HOPE IT HAS BEEN CLEAR SO FAR.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

In [None]:
# THE JAR FILES ENABLE THE CONNECTION TO THE LOCAL RDBMS INSTANCES; CHECK THE TEXT FILE IN THE JARS FOLDER FOR THEIR RESPECTIVE SOURCES
# NOTE!!! THIS CONFIGURATION WORKS BECAUSE THIS PARTICULAR SCRIPT IS EXPECTED TO BE RUN ON A SPARK STANDALONE CLIENT (i.e. YOUR LOCAL MACHINE)
# IN THE CASE OF MULTIPLE NODES, THE CONFIGURATION WOULD INCLUDE 'spark.driver.extraClassPath' & 'spark.executor.extraClassPath'; PLEASE CHECK THE SPARK DOCUMENTATION FOR A BETTER UNDERSTANDING OF THAT.
# AND FIGURING THIS OUT HAD ITS OWN SUBTLETY, I MUST TELL YOU

# JDBC driver jars handed to Spark through `spark.jars` as one comma-separated string.
jdbc_driver_jars = ','.join([
    './jars/mysql-connector-java-8.0.29.jar',
    './jars/postgresql-42.3.6.jar',
])

# Build (or reuse) the session for this notebook, with Hive support enabled.
spark = (
    SparkSession.builder
    .config('spark.jars', jdbc_driver_jars)
    .appName('using_jupyter')
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook renders the SparkSession object as this cell's output.
spark

In [None]:
 # EXTRACTION

 # EXTRACTING FROM OUR LOCAL MYSQL RDBMS USING THE LOAD-METHOD; THERE IS ALSO A JDBC-METHOD, WHICH IS VERY SIMILAR
# Read the MySQL table into a Spark DataFrame through the generic JDBC data source.
try:
    jdbcDF = (
        spark.read.format('jdbc')
        .option('url', f'jdbc:mysql://{mysql_host}:{mysql_port}/{mysql_database}')
        # Driver class name for Connector/J 8.x (the jar version configured for this session);
        # `com.mysql.jdbc.Driver` is the deprecated legacy name.
        .option('driver', 'com.mysql.cj.jdbc.Driver')
        .option('dbtable', mysql_table)
        .option('user', mysql_user)
        .option('password', mysql_password)
        .load()
    )
except Exception:
    # `except Exception` rather than a bare `except:`, so KeyboardInterrupt/SystemExit
    # are not swallowed. The traceback goes to the log file.
    logger.error(f'Eror extracting from {mysql_database}.{mysql_table} with pyspark', exc_info=True)
else:
    print(f'Successfully extracted data from database.table-- {mysql_database}.{mysql_table}')

In [None]:
# Quick look at the extracted data: the first 5 rows, then the column names and types.
jdbcDF.show(5)
jdbcDF.printSchema()

In [None]:
# Make the notebook page use the full browser width by injecting a small CSS override.

from IPython.display import HTML, display

full_width_style = HTML("<style>.container { width:100% !important; }</style>")
display(full_width_style)

In [None]:
# TRANSFORMATION

# Use PySpark to drop the `ssn` column: it is treated as private information
# (even though the data here is fake).

jdbcDF =  jdbcDF.drop('ssn')

# Show the schema and a few rows after the drop.
jdbcDF.printSchema()
jdbcDF.show(5)

In [None]:
# Convert the `birth_date` column from a string to a date type, parsing it with
# the 'yyyy-MM-dd' pattern.

jdbcDF = jdbcDF.withColumn('birth_date', to_date(col('birth_date'), 'yyyy-MM-dd'))

# Print the schema again to check the column type after the conversion.
jdbcDF.printSchema()

In [None]:
# From here on the data is queried with SQL.
# Register the DataFrame as a temporary view named 'temp_table' (replacing any existing
# view of that name) so it can be referenced from `spark.sql(...)`.

jdbcDF.createOrReplaceTempView('temp_table')

In [None]:
# Select every column from the temp view through Spark SQL and display the result.
# The query refers to the view as TEMP_TABLE; it was registered above as 'temp_table'.

spark.sql("SELECT T.* FROM TEMP_TABLE T").show()

In [None]:
# Split the temp view into a personal-information table and a business-information
# table, renaming some columns on the way.

# Personal details: names, date of birth, address, phone and personal e-mail.
personal_info_sql = (
    "SELECT `first_name` AS `first name`, `last_name` AS `last name`, "
    "`birth_date` AS `date_of_birth`, `home_address`, `phone_number`,  "
    "`personal_email` FROM TEMP_TABLE"
)

# Business details: concatenated full name, job, company, wage, crypto, bank account and company e-mail.
business_info_sql = (
    "SELECT CONCAT(`first_name`,'   ', `last_name`) AS `full_name`, "
    "`occupation` AS `job`, `company_worked` AS `company`, "
    "`wage` AS `hourly/weekly_wage`, `crypto_owned`, `bank_account`, "
    "`company_mail` FROM TEMP_TABLE"
)

personal_info_tbl = spark.sql(personal_info_sql)
business_info_tbl = spark.sql(business_info_sql)

In [None]:
# Preview the first 5 rows of the personal-information table.
personal_info_tbl.show(5)

In [None]:
# Preview the first 5 rows of the business-information table.
business_info_tbl.show(5)

In [None]:
# LOAD

# Final task: the two separate tables are written to the Postgres database.
# This cell writes the personal-information table with `format('jdbc') ... save()`.
# Append mode is not required for data replication; it is included only to show that
# the option exists. Note that re-running this cell appends the same rows again.
try:
    (
        personal_info_tbl.write.format('jdbc')
        .mode('append')
        .option('url', f'jdbc:postgresql://{postgres_host}:{postgres_port}/{postgres_database}')
        .option('dbtable', f'public.{postgres_table1}')
        .option('user', postgres_user)
        .option('driver', 'org.postgresql.Driver')
        .option('password', postgres_password)
        .save()
    )
except Exception:
    # `except Exception` rather than a bare `except:`, so KeyboardInterrupt/SystemExit
    # are not swallowed. The traceback goes to the log file.
    logger.error(f'Eror loading to {postgres_database}.public.{postgres_table1} with pyspark', exc_info=True)
else:
    print(f'Successfully loaded data to databse.public.table-- {postgres_database}.public.{postgres_table1}')

In [None]:
# Write the business-information table to Postgres with the `DataFrameWriter.jdbc`
# method, an alternative to the `format('jdbc') ... save()` form.
# Append mode is not required for data replication; it is included only to show that
# the option exists. Note that re-running this cell appends the same rows again.
try:
    business_info_tbl.write.jdbc(
        f'jdbc:postgresql://{postgres_host}:{postgres_port}/{postgres_database}',
        f'public.{postgres_table2}',
        mode='append',
        properties={
            'user': postgres_user,
            'password': postgres_password,
            'driver': 'org.postgresql.Driver',
        },
    )
except Exception:
    # `except Exception` rather than a bare `except:`, so KeyboardInterrupt/SystemExit
    # are not swallowed. The traceback goes to the log file.
    logger.error(f'Eror loading to {postgres_database}.public.{postgres_table2} with pyspark', exc_info=True)
else:
    print(f'Successfully loaded data to databse.public.table-- {postgres_database}.public.{postgres_table2}')

In [None]:
# THIS WOULD HAVE BEEN WORTH THE WORK IF EVERY LINE OF CODE, DECISION AND REASONING WAS AS OPEN AS A BOOK TO YOU. IF YOU HAVE ANY CORRECTION OR RECOMMENDATION, LET ME KNOW @ oluwatobitobias@gmail.com. THANKS