In [None]:
TOPIC: ETL and Data Integration
  1. Design an ETL process using a programming language (e.g., Python) to extract data from a source system (e.g., CSV files), transform it by applying certain business rules or calculations, and load it into a data warehouse.
  2. Implement the ETL process by writing code that performs the extraction, transformation, and loading steps.


Solution:-  Python ETL Example
ETL is the process of extracting a huge amount of data from a wide array of sources and formats and then converting & consolidating it into a single format before storing it in a database or writing it to a destination file.

In this example, some of the data is stored in CSV files while others are in JSON files. All of this data has to be consolidated into a single format and then stored in a unified file location.

Step 1: Import the modules and functions
In this ETL using Python example, first, you need to import the required modules and functions.


import glob 
import pandas as pd 
import xml.etree.ElementTree as ET 
from datetime import datetime
The dealership_data file contains CSV, JSON, and XML files for used car data. The features incorporated here are car_model, year_of_manufacture, price, and fuel. So you need to extract the file from the raw data, transform it into a target file, and then load it into the output.
Download the source file:


!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
Extracting the zip file:


nzip datasource.zip -d dealership_data
Setting the path for Target files:


tmpfile    = "dealership_temp.tmp"               # store all extracted data

logfile    = "dealership_logfile.txt"            # all event logs will be stored

targetfile = "dealership_transformed_data.csv"   # transformed data is stored
Step 2: Extract
The Extract function in this ETL using Python example is used to extract a huge amount of data in batches. This data is extracted from numerous sources.

CSV Extract Function:


def extract_from_csv(file_to_process): 
    dataframe = pd.read_csv(file_to_process) 
    return dataframe
JSON Extract Function


def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe
XML Extract Function


def extract_from_xml(file_to_process):

    dataframe = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel'])

    tree = ET.parse(file_to_process) 

    root = tree.getroot() 

    for person in root: 

        car_model = person.find("car_model").text 

        year_of_manufacture = int(person.find("year_of_manufacture").text)

        price = float(person.find("price").text) 

        fuel = person.find("fuel").text 

        dataframe = dataframe.append({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, ignore_index=True) 

        return dataframe
Calling Extract Function()


def extract():
       extracted_data = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel']) 
    #for csv files
      for csvfile in glob.glob("dealership_data/*.csv"):
          extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
    #for json files
      for jsonfile in glob.glob("dealership_data/*.json"):
          extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
    #for xml files
      for xmlfile in glob.glob("dealership_data/*.xml"):
          extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True)
      return extracted_data
Step 3: Transform
Using the transform function you can convert the data in any format as per your needs.


def transform(data):
       data['price'] = round(data.price, 2)
       return data
Step 4: Loading and Logging
In this step, the data is loaded to the destination file. A logging entry needs to be established before loading.

load function()


def load(targetfile,data_to_load):
    data_to_load.to_csv(targetfile)
log function()


def log(message):
    timestamp_format = '%H:%M:%S-%h-%d-%Y'
    #Hour-Minute-Second-MonthName-Day-Year
    now = datetime.now() # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open("dealership_logfile.txt","a") as f: f.write(timestamp + ',' + message + 'n')
Step 5: Running ETL Process
The log indicates that you have started the ETL process.


log("ETL Job Started")
The log indicates that you have started and ended the Extract phase.


log("Extract phase Started")
extracted_data = extract() 
log("Extract phase Ended")
The log indicates that you have started and ended the Transform phase.


log(“Transform phase Started”)
transformed_data = transform(extracted_data)
log("Transform phase Ended")
The log indicates that you have started and ended the Load phase.


log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")
The log indicates that the ETL process has ended.


log("ETL Job Ended")




In [None]:
TOPIC: Performance Optimization and Querying
    1. Scenario: You need to improve the performance of your data loading process in the data warehouse. Write a Python script that implements the following optimizations:
Utilize batch processing techniques to load data in bulk instead of individual row insertion.
      b)  Implement multi-threading or multiprocessing to parallelize the data loading process.
      c)  Measure the time taken to load a specific amount of data before and after implementing these optimizations.

Solution:- Data acquisition is a large part of many data analytics projects and system development life cycles. 
    This article will show you how to write a simple Python program that uses the BULK INSERT utility to rapidly insert data from a CSV file into a SQL Server database table.

Why use this Approach?
There are many ways to load data from a CSV file into a SQL Server table. Here a few methods:

Run the BULK INSERT utility from the command line.
Run the BULK INSERT utility from SQL Server Management Studio (SSMS).
Use the SQL Server Management Studio (SSMS) Import Flat File wizard.
Write a program that opens the CSV file, reads its records one-by-one, and calls a SQL INSERT statement to insert the rows into a database table.
If there are so many ways to get data from a CSV into a SQL Server database, why write a Python program that calls the BULK INSERT utility to load it into a table? Here are some reasons why this might be a helpful approach:

The Python program could perform process steps before it executes BULK INSERT.
The Python program could ensure that it does not write duplicate data to the destination table.
The program could cleanse or transform the data following the BULK INSERT.
It could perform error-handling functions.
It could send notifications, by email or other methods, about its actions.

Creating the Database and Table
Important Note about SQL Server Versions
Microsoft introduced the ability to use BULK INSERT to insert data from CSV files in SQL Server 2017. So, you will need that version or newer to use this capability.

Create the Database
Here are the steps used to create a database called HR (for Human Resources):

Connect to SQL Server

Launch SSMS.
Connect to the database server. In this case, I used Windows authentication to connect to the locally-installed instance of SQL Server Express.


Create the HR Database

Expand the [+ Databases] node in Object Explorer. Right-click on [+ Database] and click on [New Database…].
In the New Database dialog box, enter “HR” into the Database name textbox. Leave all settings as is. Click on [OK] to create the database.

Verify that the HR database appears in Object Explorer. If not, right-click on Databases and click on [Refresh]. It should come into view.

Create the Table
At this point, the HR database will not contain any tables or other objects, such as stored procedures. While BULK INSERT can create tables when it runs, I have found that creating tables ahead of time offers advantages. For example, I can specify the table’s key column(s) and each column’s type and length. Looking again at the sample data in the CSV file, let’s create columns with the same names and with these data types:

ID — INT
Job Title — NCHAR(60)
Email Address — NCHAR(120)
FirstName LastName — NCHAR(80)
Since all rows will have values for all columns, set each column to NOT NULL. Also, since ID is a unique identifier for each row, select it as the key.

Follow these steps to create the table:

In Object Explorer, click on [+ HR] to view the selection within the database.
Right-click on [+ Tables] and click on [New] and then click on [Table…].
Enter the data as shown below. After the Column Name, Data Type, and Allow Nulls values have been entered, right-click on the ID column name and click on [Set Primary Key]. Setting ID as a key will ensure that only one row in the table can contain any ID value.


Click on the Save icon in the ribbon menu, and in the Choose Name dialog box, enter the name “Person.” Click on [OK] to save the table.
In Object Explorer, click on [+ Tables] to expand the node. Then, right-click on [- Tables] and click on [Refresh]. The Person table should now be in view.
Click on [+ dbo.Person] and then on [+ Columns] to examine the table’s structure. 

The Python Program
Now that the HR database and Person table exist let’s examine a simple Python program that uses the BULK INSERT utility. It simply inserts all records from the CSV file into the Person table.

Code Modules
This Python program consists of two modules or files:

c_bulk_insert.py contains the c_bulk_insert class. It includes functions to connect to the database and build and execute a BULK INSERT statement to insert data from a CSV file into a database table.
sql_server_bulk_insert.py simply instantiates the c_bulk_insert class and calls it with the information needed to do its work.
Code Logic
When the program instantiates class c_bulk_insert, it performs these steps:

Connect to the SQL Server database.
Construct the BULK INSERT query with the destination table’s name, input CSV file, and some settings.
Open a database cursor.
Execute the query.
Clean up: Commit the BULK INSERT transactions, close the cursor, and close the database connection.
The Code
The Python class c_bulk_insert in module c_bulk_insert.py performs the logic described in the Code Logic section above.

""" 
    Name:           c_bulk_insert.py
    Author:         Randy Runtsch
    Date:           March 17, 2021
    Description:    This module contains the c_bulk_insert class that connect to a SQL Server database
                    and executes the BULK INSERT utility to insert data from a CSV file into a table.
    Prerequisites:  1. Create the database data table.
                    2. Create the database update_csv_log table.
"""
import pyodbc
class c_bulk_insert:
def __init__(self, csv_file_nm, sql_server_nm, db_nm, db_table_nm):
# Connect to the database, perform the insert, and update the log table.
conn = self.connect_db(sql_server_nm, db_nm)
        self.insert_data(conn, csv_file_nm, db_table_nm)
        conn.close
def connect_db(self, sql_server_nm, db_nm):
# Connect to the server and database with Windows authentication.
conn_string = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + sql_server_nm + ';DATABASE=' + db_nm + ';Trusted_Connection=yes;'
        conn = pyodbc.connect(conn_string)
return conn
def insert_data(self, conn, csv_file_nm, db_table_nm):
# Insert the data from the CSV file into the database table.
# Assemble the BULK INSERT query. Be sure to skip the header row by specifying FIRSTROW = 2.
qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV', FIRSTROW = 2)"
# Execute the query
cursor = conn.cursor()
        success = cursor.execute(qry)
        conn.commit()
        cursor.close
The module sql_server_bulk_insert.py instantiates c_bulk_insert. It calls it with:

CSV file name
SQL Server instance engine name
Database name
Destination table name
""" 
    Name:           sql_server_bulk_insert.py
    Author:         Randy Runtsch
    Date:           March 17, 2021
    Description:    This program is the controller that uses the Microsoft Transact-SQL BULK INSERT
                    statement to quickly insert the rows from a CSV file into
                    a SQL Server table.
    Prerequisites:  1. Create the database data table.
                    2. Create the database update_csv_log table.
"""
from c_bulk_insert import c_bulk_insert
bulk_insert = c_bulk_insert(r'c:\\test_data\\person.csv', 'xxxxx-DESKTOP-\\SQLEXPRESS', 'HR', 'Person')
The Results
After the program runs, executing a SELECT query in SSMS shows that it wrote the records from the CSV file to the Person table.

Select Top(1000) [ID]
     ,[Job Title]
     ,[Email Address]
     ,[Firstname Lastname]
    From [HR].[dbo].[Person]


Where to Go from Here
There may be many reasons and ways to enhance the program. Here are a few ideas:

Add error handling to the database connection, query execution, and other parts of the program. The program could use error handling, for example, to gracefully shut down, retry a set number of times, and notify the appropriate parties by email.
Automate the program to insert data into the database when needed. For example, schedule the program to run periodically with Windows Task Scheduler to insert person records from new CSV files.
Create and write to a log table to capture the program start and finish times and other important events. Add other messages, such as error details, to identify issues to troubleshoot.
Conclusion
As you can see, using Python to call BULK INSERT is a way to automate part of a workflow to quickly insert data from a CSV file into a SQL Server database table. It might prove to be a handy technique to add to your data analytics or software development toolkit.



    

In [None]:
The Python class c_bulk_insert in module c_bulk_insert.py performs the logic described in the Code Logic section above.

""" 
    Name:           c_bulk_insert.py
    Author:         Randy Runtsch
    Date:           March 17, 2021
    Description:    This module contains the c_bulk_insert class that connect to a SQL Server database
                    and executes the BULK INSERT utility to insert data from a CSV file into a table.
    Prerequisites:  1. Create the database data table.
                    2. Create the database update_csv_log table.
"""
import pyodbc
class c_bulk_insert:
def __init__(self, csv_file_nm, sql_server_nm, db_nm, db_table_nm):
# Connect to the database, perform the insert, and update the log table.
conn = self.connect_db(sql_server_nm, db_nm)
        self.insert_data(conn, csv_file_nm, db_table_nm)
        conn.close
def connect_db(self, sql_server_nm, db_nm):
# Connect to the server and database with Windows authentication.
conn_string = 'DRIVER={ODBC Driver 17 for SQL Server};SERVER=' + sql_server_nm + ';DATABASE=' + db_nm + ';Trusted_Connection=yes;'
        conn = pyodbc.connect(conn_string)
return conn
def insert_data(self, conn, csv_file_nm, db_table_nm):
# Insert the data from the CSV file into the database table.
# Assemble the BULK INSERT query. Be sure to skip the header row by specifying FIRSTROW = 2.
qry = "BULK INSERT " + db_table_nm + " FROM '" + csv_file_nm + "' WITH (FORMAT = 'CSV', FIRSTROW = 2)"
# Execute the query
cursor = conn.cursor()
        success = cursor.execute(qry)
        conn.commit()
        cursor.close

#The module sql_server_bulk_insert.py instantiates c_bulk_insert. It calls it with:
#1)CSV file name
#2)SQL Server instance engine name
#3)Database name
#4)Destination table name
""" 
    Name:           sql_server_bulk_insert.py
    Author:         Randy Runtsch
    Date:           March 17, 2021
    Description:    This program is the controller that uses the Microsoft Transact-SQL BULK INSERT
                    statement to quickly insert the rows from a CSV file into
                    a SQL Server table.
    Prerequisites:  1. Create the database data table.
                    2. Create the database update_csv_log table.
"""