# Extraction, Transform, and Load


---
This exercise is similar to the one in lab and practice. Here we create a database of freshwater withdrawal by each country, and we use wikipedia as a data source. 

## Tasks:

 **Consider**:
 + https://en.wikipedia.org/wiki/List_of_countries_by_freshwater_withdrawal
 
In the cells below, 

 1. Define a table for information about freshwater widrawal by each country
 1. Describe some challenges you foresee with the data
 1. Review and modify code cells that pull down the data from the tables into a data frame
 1. Load the data into your database
 1. Test loaded data with SQL queries

### 1. Define Tables

In the following cell define the table for storing the data

In [1]:
import getpass
mypasswd = getpass.getpass()
username = 'rc25g'
host = 'pgsql.dsa.lan'
database = 'dsa_student'

········


In [2]:
# Then connects to the DB
from sqlalchemy.engine.url import URL
from sqlalchemy import create_engine

# SQLAlchemy Connection Parameters
postgres_db = {'drivername': 'postgres',
               'username': username,
               'password': mypasswd,
               'host': host,
               'database' :database}
engine = create_engine(URL(**postgres_db), echo=True)
del mypasswd

Now create a query string with above create statement and execute the query with a sqlalchemy engine.

In [3]:
query = """
DROP TABLE IF EXISTS freshwater_withdrawal;
CREATE TABLE rc25g.freshwater_withdrawal (
    country                    varchar(100), -- Character String, varied length
    rank                       INT NOT NULL, -- Integer
    total_withdrawal_k         INT NOT NULL,
    per_capita_withdrawal      INT NOT NULL,
    domestic_withdrawl_p       REAL NOT NULL,
    industrial_withdrawl_p     REAL NOT NULL,
    agricultrual_withdrawl_p   REAL NOT NULL,
    date_yr                    INT NOT NULL,
    CONSTRAINT pk_freshwater_withdrawal
     PRIMARY KEY (country)
)
"""

with engine.connect() as connection:
    res = connection.execute(query)
    print(res)

2020-12-07 22:26:37,741 INFO sqlalchemy.engine.base.Engine select version()
2020-12-07 22:26:37,742 INFO sqlalchemy.engine.base.Engine {}
2020-12-07 22:26:37,744 INFO sqlalchemy.engine.base.Engine select current_schema()
2020-12-07 22:26:37,745 INFO sqlalchemy.engine.base.Engine {}
2020-12-07 22:26:37,746 INFO sqlalchemy.engine.base.Engine SELECT CAST('test plain returns' AS VARCHAR(60)) AS anon_1
2020-12-07 22:26:37,747 INFO sqlalchemy.engine.base.Engine {}
2020-12-07 22:26:37,748 INFO sqlalchemy.engine.base.Engine SELECT CAST('test unicode returns' AS VARCHAR(60)) AS anon_1
2020-12-07 22:26:37,749 INFO sqlalchemy.engine.base.Engine {}
2020-12-07 22:26:37,749 INFO sqlalchemy.engine.base.Engine show standard_conforming_strings
2020-12-07 22:26:37,750 INFO sqlalchemy.engine.base.Engine {}
2020-12-07 22:26:37,751 INFO sqlalchemy.engine.base.Engine 
DROP TABLE IF EXISTS freshwater_withdrawal;
CREATE TABLE rc25g.freshwater_withdrawal (
    country                    varchar(100), -- Charac

### 2. Data Scrapping 

In [4]:
#import the library to query a website
import requests
# import Beautiful soup library to access functions to parse the data returned from the website
from bs4 import BeautifulSoup


In [5]:
# specify the url
url = "https://en.wikipedia.org/wiki/List_of_countries_by_freshwater_withdrawal"
# Open website URL and return the html to the variable 'response'
response = requests.get(url)
print(response.encoding)
print(response.status_code)

UTF-8
200


The response we get from web is typically html content. 
We can read the content of the server's response. 
Below, when a `BeautifulSoup` object is created from an html response, we explicitly reference the text format(`response.text`).

The default encoding format is 'UTF-8' as shown below. 

[Click here for additional documentations about the response object.](http://docs.python-requests.org/en/master/user/quickstart/#response-content)



In [6]:
# Parse the html in the 'response' variable, and store it in Beautiful Soup format
soup = BeautifulSoup(response.text, "html")

### Inspect the page source to determine how you need to extract the tables into its own soup object.


In [7]:
print(soup.prettify)

<bound method Tag.prettify of <!DOCTYPE html>

<html class="client-nojs" dir="ltr" lang="en">
<head>
<meta charset="utf-8"/>
<title>List of countries by freshwater withdrawal - Wikipedia</title>
<script>document.documentElement.className="client-js";RLCONF={"wgBreakFrames":!1,"wgSeparatorTransformTable":["",""],"wgDigitTransformTable":["",""],"wgDefaultDateFormat":"dmy","wgMonthNames":["","January","February","March","April","May","June","July","August","September","October","November","December"],"wgRequestId":"X871dgpAAL0AAA8rQ@UAAADY","wgCSPNonce":!1,"wgCanonicalNamespace":"","wgCanonicalSpecialPageName":!1,"wgNamespaceNumber":0,"wgPageName":"List_of_countries_by_freshwater_withdrawal","wgTitle":"List of countries by freshwater withdrawal","wgCurRevisionId":945992976,"wgRevisionId":945992976,"wgArticleId":18012399,"wgIsArticle":!0,"wgIsRedirect":!1,"wgAction":"view","wgUserName":null,"wgUserGroups":["*"],"wgCategories":["CS1 maint: archived copy as title","Articles with short descri

In [8]:
all_tables=soup.find_all('table')
# print(type(all_tables))
print(f"Num of avaiable tables = {len(all_tables)}")

# We can find all of the tables with wikitable class 
wikitables=soup.find_all('table', {'class':"wikitable"})
print(f"Num of avaiable wikitables = {len(wikitables)}")
right_table = wikitables[0]
# print(type(right_table))

Num of avaiable tables = 3
Num of avaiable wikitables = 1


#### Look at the first couple rows

In [9]:
first_two_rows = soup.find_all("tr")[0:2]

print("Header")
print("-"*30)
print(first_two_rows[0])

print("="*30)

print("First Data row")
print("-"*30)
print(first_two_rows[1]) 

Header
------------------------------
<tr><td class="mbox-image"><div style="width:52px"><img alt="" data-file-height="40" data-file-width="40" decoding="async" height="40" src="//upload.wikimedia.org/wikipedia/en/thumb/b/b4/Ambox_important.svg/40px-Ambox_important.svg.png" srcset="//upload.wikimedia.org/wikipedia/en/thumb/b/b4/Ambox_important.svg/60px-Ambox_important.svg.png 1.5x, //upload.wikimedia.org/wikipedia/en/thumb/b/b4/Ambox_important.svg/80px-Ambox_important.svg.png 2x" width="40"/></div></td><td class="mbox-text"><div class="mbox-text-span">This table may be more easily updated if the rank-order column (1,2,3) is <a href="/wiki/Help:Sorting#Removing_a_rank_column_(1,2,3)_from_a_table" title="Help:Sorting">removed</a> and a <b><a href="/wiki/Help:Sorting#Auto-ranking_or_adding_a_row_numbering_column_(1,2,3)_next_to_a_table" title="Help:Sorting">row number column</a></b> is added instead. <a href="/wiki/Help:Sorting#Initial_alphabetical_sort_versus_initial_sort_by_rank_order" 

#### TODOs: 

**Examining the HTML Table Header, and identify the columns. Feel free automate the extraction of header.**




In [11]:
def custom_is_number(txt):
    txt = txt.replace(",","").replace(".", "")  #replace , or . also check for other chars that should be removed
     if txt.isdigit():
          return True
      else:
          return False

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 5)

In [55]:
# We will use the locale library so we can use 
# atof and atoi to convert alphanumeric to float and integers, respectively.
import locale
locale.setlocale( locale.LC_ALL, 'en_US.UTF-8' ) 


rank=[]
country=[]
total_withdrawal_k=[]
per_capita_withdrawal=[]
domestic_withdrawl_p=[]
industrial_withdrawl_p=[]
agricultrual_withdrawl_p=[]
date_yr=[]

# skip first iteration as we dont need headers
for row in right_table.findAll("tr")[1:]: 
    # for each row, pull out the td elements.
    cells = row.findAll('td') # To store all other details

#     print(f"cells = {cells}")    
    
    this_rank = cells[0].find(text=True)
    print("Processing rank {}".format(this_rank))

    # If the rank is a number, we can convert it
    if (not this_rank.isnumeric()):
        print("Non-Ranked, skipping")
        continue
        
    rank.append(locale.atoi(this_rank))

    # for the country name, we need to find the name (text) in the Country Hyperlink (a)
    countr_cell = cells[1].find('a').find(text=True)
    if countr_cell and countr_cell != None:
        print(countr_cell)

    country.append(countr_cell)

    def custom_is_number(txt):
        txt = txt.replace(",","").replace(".", "").strip()  #replace , or . also check for other chars that should be removed
        if txt.isdigit():
            return True
        else:
            return False  

    # Adjust the the data from Text to numeric data types
    tw_cell = cells[2].find(text=True)
    if tw_cell and custom_is_number(tw_cell):
        total_withdrawal_k.append(locale.atof(tw_cell))
    else:
        total_withdrawal_k.append(None) 

    pw_cell = cells[2].find(text=True)
    if pw_cell and custom_is_number(pw_cell):
        per_capita_withdrawal.append(locale.atof(pw_cell))
    else:
        per_capita_withdrawal.append(None)

    dw_cell = cells[4].find(text=True)
    if dw_cell and custom_is_number(dw_cell):
        domestic_withdrawl_p.append(locale.atoi(dw_cell))
    else:
        domestic_withdrawl_p.append(None)     
    
    iw_cell = cells[5].find(text=True)
    if iw_cell and custom_is_number(iw_cell):
        industrial_withdrawl_p.append(locale.atof(iw_cell))
    else:
        industrial_withdrawl_p.append(None)

    aw_cell = cells[6].find(text=True)
    if aw_cell and custom_is_number(aw_cell):
        agricultrual_withdrawl_p.append(locale.atoi(aw_cell))
    else:
        agricultrual_withdrawl_p.append(None)  
    
    # Note, that this is to float because the vatican row has a non-int value
    date_text = cells[7].find(text=True)
    if date_text and custom_is_number(date_text):
            date_yr.append(locale.atoi(date_text))
    else:
            date_yr.append(None)


Processing rank 1
India
Processing rank 2
China
Processing rank 3
United States
Processing rank 4
Vietnam
Processing rank 5
Japan
Processing rank 6
Indonesia
Processing rank 7
Thailand
Processing rank 8
Uzbekistan
Processing rank 9
Mexico
Processing rank 10
Russia
Processing rank 11
Iran
Processing rank 12
Pakistan
Processing rank 13
Egypt
Processing rank 14
Brazil
Processing rank 15
Bangladesh
Processing rank 16
Canada
Processing rank 17
Italy
Processing rank 18
Iraq
Processing rank 19
Turkey
Processing rank 20
Germany
Processing rank 21
Sudan
Processing rank 22
Spain
Processing rank 23
Ukraine
Processing rank 24
Burma
Processing rank 25
Turkmenistan
Processing rank 26
Colombia
Processing rank 27
France
Processing rank 28
South Korea
Processing rank 29
Philippines
Processing rank 30
Australia
Processing rank 31
Kazakhstan
Processing rank 32
Hungary
Processing rank 33
Afghanistan
Processing rank 34
Syria
Processing rank 35
Peru
Processing rank 36
Saudi Arabia
Processing rank 37
Azerbai

##### Now that we have built all our columns, stack into a data frame!

In [56]:
import pandas as pd

df=pd.DataFrame({'country': country,
                'rank': rank,
                'total_withdrawal_k': total_withdrawal_k,
                'per_capita_withdrawal': per_capita_withdrawal,
                'domestic_withdrawl_p': domestic_withdrawl_p,
                'industrial_withdrawl_p': industrial_withdrawl_p,
                'agricultrual_withdrawl_p': agricultrual_withdrawl_p,
                'date_yr': date_yr
                })

### Check  column data types!
Does this match the data types we sketched out in the `CREATE TABLE` statement above?
If you need to adjust the definition, this would be the time.
Alternatively, we can adjust the columns using Pandas techniques.

In [57]:
df.dtypes

country                      object
rank                          int64
total_withdrawal_k          float64
per_capita_withdrawal       float64
domestic_withdrawl_p        float64
industrial_withdrawl_p      float64
agricultrual_withdrawl_p    float64
date_yr                     float64
dtype: object

Once we have our Panda data frame and the SQL table inline, we can load it into the database.

---

### 4. Load the data into your database using SQLAlchemy


In [58]:
df.to_sql('freshwater_withdrawal', # The table to load
          engine,             # The engine created above
          schema= username,   # The schema where the table lives, our pawprint
          if_exists='append', # If the table is found, it would keep loading the end of table.
          index=False,        # Ignore creating an index for the index col in the dataframe
          chunksize=20)       # Do 20 records from the data frame at a time

2020-12-07 23:51:12,903 INFO sqlalchemy.engine.base.Engine select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and relname=%(name)s
2020-12-07 23:51:12,904 INFO sqlalchemy.engine.base.Engine {'schema': 'rc25g', 'name': 'freshwater_withdrawal'}
2020-12-07 23:51:12,911 INFO sqlalchemy.engine.base.Engine BEGIN (implicit)
2020-12-07 23:51:12,912 INFO sqlalchemy.engine.base.Engine INSERT INTO rc25g.freshwater_withdrawal (country, rank, total_withdrawal_k, per_capita_withdrawal, domestic_withdrawl_p, industrial_withdrawl_p, agricultrual_withdrawl_p, date_yr) VALUES (%(country)s, %(rank)s, %(total_withdrawal_k)s, %(per_capita_withdrawal)s, %(domestic_withdrawl_p)s, %(industrial_withdrawl_p)s, %(agricultrual_withdrawl_p)s, %(date_yr)s)
2020-12-07 23:51:12,913 INFO sqlalchemy.engine.base.Engine ({'country': 'India', 'rank': 1, 'total_withdrawal_k': 645.84, 'per_capita_withdrawal': 645.84, 'domestic_withdrawl_p': 8.0, 'industrial_withdrawl_p': 5.

2020-12-07 23:51:12,936 INFO sqlalchemy.engine.base.Engine INSERT INTO rc25g.freshwater_withdrawal (country, rank, total_withdrawal_k, per_capita_withdrawal, domestic_withdrawl_p, industrial_withdrawl_p, agricultrual_withdrawl_p, date_yr) VALUES (%(country)s, %(rank)s, %(total_withdrawal_k)s, %(per_capita_withdrawal)s, %(domestic_withdrawl_p)s, %(industrial_withdrawl_p)s, %(agricultrual_withdrawl_p)s, %(date_yr)s)
2020-12-07 23:51:12,936 INFO sqlalchemy.engine.base.Engine ({'country': 'Mali', 'rank': 61, 'total_withdrawal_k': 6.55, 'per_capita_withdrawal': 6.55, 'domestic_withdrawl_p': 9.0, 'industrial_withdrawl_p': 1.0, 'agricultrual_withdrawl_p': 90.0, 'date_yr': 2000.0}, {'country': 'Romania', 'rank': 62, 'total_withdrawal_k': 6.5, 'per_capita_withdrawal': 6.5, 'domestic_withdrawl_p': 9.0, 'industrial_withdrawl_p': 34.0, 'agricultrual_withdrawl_p': 57.0, 'date_yr': 2003.0}, {'country': 'Algeria', 'rank': 63, 'total_withdrawal_k': 6.07, 'per_capita_withdrawal': 6.07, 'domestic_withdr

IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "domestic_withdrawl_p" violates not-null constraint
DETAIL:  Failing row contains (Macedonia, 86, 2, 2, null, null, null, null).

[SQL: INSERT INTO rc25g.freshwater_withdrawal (country, rank, total_withdrawal_k, per_capita_withdrawal, domestic_withdrawl_p, industrial_withdrawl_p, agricultrual_withdrawl_p, date_yr) VALUES (%(country)s, %(rank)s, %(total_withdrawal_k)s, %(per_capita_withdrawal)s, %(domestic_withdrawl_p)s, %(industrial_withdrawl_p)s, %(agricultrual_withdrawl_p)s, %(date_yr)s)]
[parameters: ({'country': 'Switzerland', 'rank': 81, 'total_withdrawal_k': 2.52, 'per_capita_withdrawal': 2.52, 'domestic_withdrawl_p': 24.0, 'industrial_withdrawl_p': 74.0, 'agricultrual_withdrawl_p': 2.0, 'date_yr': 2002.0}, {'country': 'Sweden', 'rank': 82, 'total_withdrawal_k': 2.4, 'per_capita_withdrawal': 2.4, 'domestic_withdrawl_p': 23.0, 'industrial_withdrawl_p': 67.0, 'agricultrual_withdrawl_p': 10.0, 'date_yr': 1996.0}, {'country': 'Finland', 'rank': 83, 'total_withdrawal_k': 2.33, 'per_capita_withdrawal': 2.33, 'domestic_withdrawl_p': 14.0, 'industrial_withdrawl_p': 84.0, 'agricultrual_withdrawl_p': 3.0, 'date_yr': 1999.0}, {'country': 'Mongolia', 'rank': 84, 'total_withdrawal_k': 2.31, 'per_capita_withdrawal': 2.31, 'domestic_withdrawl_p': 10.0, 'industrial_withdrawl_p': 58.0, 'agricultrual_withdrawl_p': 33.0, 'date_yr': 2000.0}, {'country': 'United Arab Emirates', 'rank': 85, 'total_withdrawal_k': 2.3, 'per_capita_withdrawal': 2.3, 'domestic_withdrawl_p': 23.0, 'industrial_withdrawl_p': 9.0, 'agricultrual_withdrawl_p': 68.0, 'date_yr': 2000.0}, {'country': 'Macedonia', 'rank': 86, 'total_withdrawal_k': 2.27, 'per_capita_withdrawal': 2.27, 'domestic_withdrawl_p': None, 'industrial_withdrawl_p': None, 'agricultrual_withdrawl_p': None, 'date_yr': None}, {'country': 'Senegal', 'rank': 87, 'total_withdrawal_k': 2.22, 'per_capita_withdrawal': 2.22, 'domestic_withdrawl_p': 4.0, 'industrial_withdrawl_p': 3.0, 'agricultrual_withdrawl_p': 93.0, 'date_yr': 2002.0}, {'country': 'Niger', 'rank': 88, 'total_withdrawal_k': 2.18, 'per_capita_withdrawal': 2.18, 'domestic_withdrawl_p': 4.0, 'industrial_withdrawl_p': 0.0, 'agricultrual_withdrawl_p': 95.0, 'date_yr': 2000.0}  ... displaying 10 of 20 total bound parameter sets ...  {'country': 'Bolivia', 'rank': 99, 'total_withdrawal_k': 1.44, 'per_capita_withdrawal': 1.44, 'domestic_withdrawl_p': 13.0, 'industrial_withdrawl_p': 7.0, 'agricultrual_withdrawl_p': 81.0, 'date_yr': 2000.0}, {'country': 'Estonia', 'rank': 100, 'total_withdrawal_k': 1.41, 'per_capita_withdrawal': 1.41, 'domestic_withdrawl_p': 56.0, 'industrial_withdrawl_p': 39.0, 'agricultrual_withdrawl_p': 5.0, 'date_yr': 2002.0})]
(Background on this error at: http://sqlalche.me/e/gkpj)

### 5. Test loaded data with SQL queries


#### TODO: Run the SQL in your database to verify the data was loaded.



In [59]:
with engine.connect() as connection:
    res = connection.execute("select * from freshwater_withdrawal limit 2")
    for row in res:
        print(row)

2020-12-07 23:51:16,821 INFO sqlalchemy.engine.base.Engine select * from freshwater_withdrawal limit 2
2020-12-07 23:51:16,822 INFO sqlalchemy.engine.base.Engine {}



### 6. Now that the data is loaded, let's pull it back out and store it to a dataframe!





In [60]:
df_backout = pd.read_sql_table(
    'freshwater_withdrawal',
    con = engine,             # The engine created above
    schema= username   # The schema where the table lives, our pawprint
)

2020-12-07 23:51:18,903 INFO sqlalchemy.engine.base.Engine SELECT c.relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = %(schema)s AND c.relkind in ('r', 'p')
2020-12-07 23:51:18,904 INFO sqlalchemy.engine.base.Engine {'schema': 'rc25g'}
2020-12-07 23:51:18,915 INFO sqlalchemy.engine.base.Engine SELECT c.relname FROM pg_class c JOIN pg_namespace n ON n.oid = c.relnamespace WHERE n.nspname = %(schema)s AND c.relkind IN ('v', 'm')
2020-12-07 23:51:18,916 INFO sqlalchemy.engine.base.Engine {'schema': 'rc25g'}
2020-12-07 23:51:18,922 INFO sqlalchemy.engine.base.Engine 
            SELECT c.oid
            FROM pg_catalog.pg_class c
            LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
            WHERE (n.nspname = %(schema)s)
            AND c.relname = %(table_name)s AND c.relkind in
            ('r', 'v', 'm', 'f', 'p')
        
2020-12-07 23:51:18,922 INFO sqlalchemy.engine.base.Engine {'schema': 'rc25g', 'table_name': 'freshwater

# Save your notebook, then `File > Close and Halt`

---