## Data sourcing

Source data from various source systems and ingest it using Python code:

1. Parquet files
2. CSV files
3. APIs
4. RDBMS databases
5. HTML

In [1]:
# import modules
import certifi
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sqlite3
import urllib3
from urllib3 import request
from unicodedata import normalize

In [2]:
# Show the kernel's working directory: the relative ./data/... paths used below resolve against it.
!pwd

/Users/vicky/Documents/python-etl-example/Building-ETL-Pipelines-with-Python/Chapters/chapter_04


### Sourcing Parquet data

Please visit the url https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [3]:
# Load the January 2022 yellow-taxi trip records from a local Parquet file;
# read_parquet is fast and keeps the column types stored in the file.
parquet_path = "./data/yellow_tripdata_2022-01.parquet"
df_parquet = pd.read_parquet(parquet_path)
df_parquet.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0


### Sourcing CSV data 

Please visit the url https://data.cityofnewyork.us/resource/h9gi-nx95.csv?$limit=500


In [4]:
# Load the local copy of the collisions dataset (h9gi-nx95) linked above into a DataFrame.
csv_path = "./data/h9gi-nx95.csv"
df_csv = pd.read_csv(csv_path)
df_csv.head()

Unnamed: 0,crash_date,crash_time,borough,zip_code,latitude,longitude,location,on_street_name,off_street_name,cross_street_name,...,contributing_factor_vehicle_2,contributing_factor_vehicle_3,contributing_factor_vehicle_4,contributing_factor_vehicle_5,collision_id,vehicle_type_code1,vehicle_type_code2,vehicle_type_code_3,vehicle_type_code_4,vehicle_type_code_5
0,2022-07-20T00:00:00.000,1:25,,,40.835808,-73.89083,"\n, \n(40.835808, -73.89083)",BOSTON ROAD,,,...,Unspecified,,,,4547589,Sedan,Sedan,,,
1,2022-07-21T00:00:00.000,5:20,,,,,,FDR DRIVE,,,...,Unspecified,,,,4548075,Sedan,Sedan,,,
2,2021-04-14T00:00:00.000,5:32,,,,,,BRONX WHITESTONE BRIDGE,,,...,Unspecified,,,,4407480,Sedan,Sedan,,,
3,2021-04-13T00:00:00.000,21:35,BROOKLYN,11217.0,40.68358,-73.97617,"(40.68358, -73.97617)",,,620 ATLANTIC AVENUE,...,,,,,4407147,Sedan,,,,
4,2021-04-15T00:00:00.000,16:15,,,,,,HUTCHINSON RIVER PARKWAY,,,...,,,,,4407665,Station Wagon/Sport Utility Vehicle,,,,


### Sourcing data from APIs

Make sure the `certifi` library is installed, e.g. with `pipenv install certifi`.

In [5]:
!pip install requests



In [6]:
# Fetch collision records from the NYC Open Data JSON endpoint ($limit=500 requests 500 rows).
url = 'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=500'
import requests

# One request is enough: reuse its status and body instead of hitting the endpoint a second time.
# requests verifies TLS certificates by default. Never silence certificate errors (e.g. verify=False),
# because that exposes the pipeline to man-in-the-middle attacks.
# The timeout (seconds) keeps the cell from hanging forever on an unresponsive server.
response = requests.get(url, timeout=30)
print(response.status_code)
if response.status_code == 200:
    # json_normalize flattens nested objects, e.g. `location` -> `location.latitude`, `location.longitude`.
    df_api = pd.json_normalize(response.json())
else:
    # An empty frame keeps the following cells runnable when the API is unavailable.
    df_api = pd.DataFrame()
df_api.head(10)

200


Unnamed: 0,crash_date,crash_time,on_street_name,off_street_name,number_of_persons_injured,number_of_persons_killed,number_of_pedestrians_injured,number_of_pedestrians_killed,number_of_cyclist_injured,number_of_cyclist_killed,...,cross_street_name,location.latitude,location.longitude,location.human_address,contributing_factor_vehicle_3,vehicle_type_code_3,contributing_factor_vehicle_4,vehicle_type_code_4,contributing_factor_vehicle_5,vehicle_type_code_5
0,2021-09-11T00:00:00.000,2:39,WHITESTONE EXPRESSWAY,20 AVENUE,2,0,0,0,0,0,...,,,,,,,,,,
1,2022-03-26T00:00:00.000,11:45,QUEENSBORO BRIDGE UPPER,,1,0,0,0,0,0,...,,,,,,,,,,
2,2022-06-29T00:00:00.000,6:55,THROGS NECK BRIDGE,,0,0,0,0,0,0,...,,,,,,,,,,
3,2021-09-11T00:00:00.000,9:35,,,0,0,0,0,0,0,...,1211 LORING AVENUE,40.667202,-73.8665,"{""address"": """", ""city"": """", ""state"": """", ""zip""...",,,,,,
4,2021-12-14T00:00:00.000,8:13,SARATOGA AVENUE,DECATUR STREET,0,0,0,0,0,0,...,,40.683304,-73.917274,"{""address"": """", ""city"": """", ""state"": """", ""zip""...",,,,,,
5,2021-04-14T00:00:00.000,12:47,MAJOR DEEGAN EXPRESSWAY RAMP,,0,0,0,0,0,0,...,,,,,,,,,,
6,2021-12-14T00:00:00.000,17:05,BROOKLYN QUEENS EXPRESSWAY,,0,0,0,0,0,0,...,,40.709183,-73.956825,"{""address"": """", ""city"": """", ""state"": """", ""zip""...",,,,,,
7,2021-12-14T00:00:00.000,8:17,,,2,0,0,0,0,0,...,344 BAYCHESTER AVENUE,40.86816,-73.83148,"{""address"": """", ""city"": """", ""state"": """", ""zip""...",,,,,,
8,2021-12-14T00:00:00.000,21:10,,,0,0,0,0,0,0,...,2047 PITKIN AVENUE,40.67172,-73.8971,"{""address"": """", ""city"": """", ""state"": """", ""zip""...",,,,,,
9,2021-12-14T00:00:00.000,14:58,3 AVENUE,EAST 43 STREET,0,0,0,0,0,0,...,,40.75144,-73.97397,"{""address"": """", ""city"": """", ""state"": """", ""zip""...",,,,,,


In [7]:
# Confirm the working directory before opening ./data/movies.sqlite by relative path.
!pwd

/Users/vicky/Documents/python-etl-example/Building-ETL-Pipelines-with-Python/Chapters/chapter_04


### Sourcing Data from RDBMS tables

In [8]:
# Read sqlite query results into a pandas DataFrame.
# Using a sqlite3 connection as a context manager only commits/rolls back the transaction;
# it does not close the connection. contextlib.closing makes sure the database file is released.
from contextlib import closing

with closing(sqlite3.connect("./data/movies.sqlite")) as conn:
    df = pd.read_sql("SELECT * from movies", conn)
df.head()

Unnamed: 0,id,original_title,budget,popularity,release_date,revenue,title,vote_average,vote_count,overview,tagline,uid,director_id
0,43597,Avatar,237000000,150,2009-12-10,2787965087,Avatar,7.2,11800,"In the 22nd century, a paraplegic Marine is di...",Enter the World of Pandora.,19995,4762
1,43598,Pirates of the Caribbean: At World's End,300000000,139,2007-05-19,961000000,Pirates of the Caribbean: At World's End,6.9,4500,"Captain Barbossa, long believed to be dead, ha...","At the end of the world, the adventure begins.",285,4763
2,43599,Spectre,245000000,107,2015-10-26,880674609,Spectre,6.3,4466,A cryptic message from Bond’s past sends him o...,A Plan No One Escapes,206647,4764
3,43600,The Dark Knight Rises,250000000,112,2012-07-16,1084939099,The Dark Knight Rises,7.6,9106,Following the death of District Attorney Harve...,The Legend Ends,49026,4765
4,43601,John Carter,260000000,43,2012-03-07,284139100,John Carter,6.1,2124,"John Carter is a war-weary, former military ca...","Lost in our world, found in another.",49529,4766


### Sourcing data from web pages

Please visit the url https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)

In [9]:
# Scrape the GDP tables from Wikipedia: read_html returns a list holding one DataFrame
# per <table> whose text matches the pattern 'by country'.
gdp_url = 'https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)'
df_html = pd.read_html(gdp_url, match='by country')
# How many tables matched? (4 when this notebook was last run.)
n_tables = len(df_html)
print(n_tables)
# Display the first matching table.
df_html[0]

4


Unnamed: 0_level_0,Country/Territory,UN region,IMF[1][13],IMF[1][13],World Bank[14],World Bank[14],United Nations[15],United Nations[15]
Unnamed: 0_level_1,Country/Territory,UN region,Forecast,Year,Estimate,Year,Estimate,Year
0,World,—,104476432,2023,100562011,2022,96698005,2021
1,United States,Americas,26949643,2023,25462700,2022,23315081,2021
2,China,Asia,17700899,[n 1]2023,17963171,[n 3]2022,17734131,[n 1]2021
3,Germany,Europe,4429838,2023,4072192,2022,4259935,2021
4,Japan,Asia,4230862,2023,4231141,2022,4940878,2021
...,...,...,...,...,...,...,...,...
209,Palau,Oceania,267,2023,—,—,218,2021
210,Kiribati,Oceania,246,2023,223,2022,227,2021
211,Nauru,Oceania,150,2023,151,2022,155,2021
212,Montserrat,Americas,—,—,—,—,72,2021


In [16]:
# Make sure below packages have been installed 
# pip install pyarrow
# pip install certifi

import urllib3
from urllib3 import request
import certifi
import json
import sqlite3
import pandas as pd
import logging

# Module-level logger. No handler or level is configured here, so INFO messages only appear
# if the caller configures logging (e.g. logging.basicConfig(level=logging.INFO));
# logger.exception (ERROR level) output is still shown by Python's last-resort handler.
logger = logging.getLogger(__name__)

def source_data_from_parquet(parquet_file_name):
    """Read a Parquet file into a pandas DataFrame.

    Parameters
    ----------
    parquet_file_name : str or path-like
        Location of the Parquet file to read.

    Returns
    -------
    pandas.DataFrame
        The file's contents, or an empty DataFrame if reading failed. Failures are
        logged with their traceback via ``logger.exception`` instead of being raised.
    """
    try:
        df_parquet = pd.read_parquet(parquet_file_name)
        logger.info(f'{parquet_file_name} : extracted {df_parquet.shape[0]} records from the parquet file')
    except Exception as e:
        logger.exception(f'{parquet_file_name} : - exception {e} encountered while extracting the parquet file')
        df_parquet = pd.DataFrame()
    return df_parquet


def source_data_from_csv(csv_file_name):
    """Read a CSV file into a pandas DataFrame.

    Parameters
    ----------
    csv_file_name : str or path-like
        Location of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        The file's contents, or an empty DataFrame if reading failed. Failures are
        logged with their traceback via ``logger.exception`` instead of being raised.
    """
    try:
        df_csv = pd.read_csv(csv_file_name)
        logger.info(f'{csv_file_name} : extracted {df_csv.shape[0]} records from the csv file')
    except Exception as e:
        # The message previously read "the csv_file_name file" (variable name leaked into the text).
        logger.exception(f'{csv_file_name} : - exception {e} encountered while extracting the csv file')
        df_csv = pd.DataFrame()
    return df_csv

def source_data_from_api(api_endpoint):
    """GET a JSON API endpoint and flatten the response into a pandas DataFrame.

    Parameters
    ----------
    api_endpoint : str
        Full URL of the endpoint to call.

    Returns
    -------
    pandas.DataFrame
        The JSON payload flattened with ``pd.json_normalize`` when the server answers
        HTTP 200. Otherwise (non-200 status, network/TLS failure, bad JSON) an empty
        DataFrame is returned and the problem is logged.
    """
    # Bind before the request so the except-branch log can always reference it, even when
    # the failure happens before any response arrives (DNS, TLS or connection errors).
    apt_status = None
    try:
        # Verify TLS certificates against certifi's CA bundle. Never silence certificate errors:
        # disabling verification exposes the pipeline to man-in-the-middle attacks.
        http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED', ca_certs=certifi.where())
        api_response = http.request('GET', api_endpoint)
        apt_status = api_response.status
        if apt_status == 200:
            logger.info(f'{apt_status} - ok : while invoking the api {api_endpoint}')
            data = json.loads(api_response.data.decode('utf-8'))
            df_api = pd.json_normalize(data)
            logger.info(f'{apt_status}- extracted {df_api.shape[0]} records from the api {api_endpoint}')
        else:
            logger.error(f'{apt_status}- error : while invoking the api {api_endpoint}')
            df_api = pd.DataFrame()
    except Exception as e:
        logger.exception(f'{apt_status} : - exception {e} encountered while reading data from the api {api_endpoint}')
        df_api = pd.DataFrame()
    return df_api

def source_data_from_table(db_name, table_name):
    """Read every row of one table from a SQLite database into a pandas DataFrame.

    Parameters
    ----------
    db_name : str or path-like
        Path of the SQLite database file.
    table_name : str
        Name of the table to read. It is quoted as an SQL identifier, so it is treated
        purely as a name and cannot inject additional SQL.

    Returns
    -------
    pandas.DataFrame
        The table's rows, or an empty DataFrame if the read failed (failure is logged).
    """
    import contextlib

    try:
        # Identifiers cannot be bound as query parameters, so quote the name instead:
        # wrap it in double quotes and double any embedded quote (standard SQL identifier quoting).
        quoted_table = '"' + str(table_name).replace('"', '""') + '"'
        # A sqlite3 connection used as a context manager only commits/rolls back; it is not
        # closed. contextlib.closing releases the database file when the block exits.
        with contextlib.closing(sqlite3.connect(db_name)) as conn:
            df_table = pd.read_sql(f"SELECT * from {quoted_table}", conn)
        logger.info(f'{db_name}- read {df_table.shape[0]} records from the table: {table_name}')
    except Exception as e:
        logger.exception(f'{db_name} : - exception {e} encountered while reading data from the table: {table_name}')
        df_table = pd.DataFrame()
    return df_table


def source_data_from_webpage(web_page_url, matching_keyword):
    """Scrape the first HTML table on a web page whose text matches a keyword.

    Parameters
    ----------
    web_page_url : str
        URL of the page to scrape.
    matching_keyword : str
        Text/pattern passed to ``pd.read_html(match=...)`` to select tables.

    Returns
    -------
    pandas.DataFrame
        The first matching table, or an empty DataFrame if scraping failed (failure is logged).
    """
    try:
        # read_html yields one DataFrame per matching <table>; only the first one is kept.
        matched_tables = pd.read_html(web_page_url, match=matching_keyword)
        df_html = matched_tables[0]
        logger.info(f'{web_page_url}- read {df_html.shape[0]} records from the page: {web_page_url}')
    except Exception as e:
        logger.exception(f'{web_page_url} : - exception {e} encountered while reading data from the page: {web_page_url}')
        df_html = pd.DataFrame()
    return df_html

def extracted_data(parquet_file_name="./data/yellow_tripdata_2022-01.parquet",
                   csv_file_name="./data/h9gi-nx95.csv",
                   api_endpoint="https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=500",
                   db_name="./data/movies.sqlite",
                   table_name="movies",
                   web_page_url="https://en.wikipedia.org/wiki/List_of_countries_by_GDP_(nominal)",
                   matching_keyword="by country"):
    """Extract data from all configured source systems.

    Every source location is a keyword argument whose default is the original hard-coded
    value, so ``extracted_data()`` behaves as before. Relative paths resolve against the
    current working directory, not this file's location; pass absolute paths when calling
    from another directory.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(df_parquet, df_csv, df_api, df_table, df_html)``, in that order.
    """
    # The frames can then be loaded into either a VSA table or a PSA table, or be consumed
    # by a transformation pipeline.
    return (
        source_data_from_parquet(parquet_file_name),
        source_data_from_csv(csv_file_name),
        source_data_from_api(api_endpoint),
        source_data_from_table(db_name, table_name),
        source_data_from_webpage(web_page_url, matching_keyword),
    )


In [11]:
# Check the working directory: the `extraction` package import and its relative data paths depend on it.
!pwd

/Users/vicky/Documents/python-etl-example/Building-ETL-Pipelines-with-Python/Chapters/chapter_04


In [17]:
from extraction import extraction_functional_enhanced

In [18]:
# extracted_data() returns five frames (parquet, csv, api, table, html); keep only the parquet one.
# Indexing with [0] instead of unpacking into `_` avoids shadowing IPython's last-output variable `_`.
df_parquet_module = extraction_functional_enhanced.extracted_data()[0]

yellow_tripdata_2022-01.parquet : - exception [Errno 2] No such file or directory: 'yellow_tripdata_2022-01.parquet' encountered while extracting the parguet file
Traceback (most recent call last):
  File "/Users/vicky/Documents/python-etl-example/Building-ETL-Pipelines-with-Python/Chapters/chapter_04/extraction/extraction_functional_enhanced.py", line 20, in source_data_from_parquet
    df_parquet = pd.read_parquet(parquet_file_name)
  File "/Users/vicky/anaconda3/envs/py39/lib/python3.9/site-packages/pandas/io/parquet.py", line 670, in read_parquet
    return impl.read(
  File "/Users/vicky/anaconda3/envs/py39/lib/python3.9/site-packages/pandas/io/parquet.py", line 265, in read
    path_or_handle, handles, filesystem = _get_path_or_handle(
  File "/Users/vicky/anaconda3/envs/py39/lib/python3.9/site-packages/pandas/io/parquet.py", line 139, in _get_path_or_handle
    handles = get_handle(
  File "/Users/vicky/anaconda3/envs/py39/lib/python3.9/site-packages/pandas/io/common.py", line 8

In [14]:
# Re-run the parquet extraction directly in the notebook with the source_data_from_parquet helper
# from the extraction cell above. Redefining it here would silently shadow that copy.
df = source_data_from_parquet("./data/yellow_tripdata_2022-01.parquet")

In [15]:
# Preview the first five extracted trip records.
df.head(n=5)

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,1,2022-01-01 00:35:40,2022-01-01 00:53:29,2.0,3.8,1.0,N,142,236,1,14.5,3.0,0.5,3.65,0.0,0.3,21.95,2.5,0.0
1,1,2022-01-01 00:33:43,2022-01-01 00:42:07,1.0,2.1,1.0,N,236,42,1,8.0,0.5,0.5,4.0,0.0,0.3,13.3,0.0,0.0
2,2,2022-01-01 00:53:21,2022-01-01 01:02:19,1.0,0.97,1.0,N,166,166,1,7.5,0.5,0.5,1.76,0.0,0.3,10.56,0.0,0.0
3,2,2022-01-01 00:25:21,2022-01-01 00:35:23,1.0,1.09,1.0,N,114,68,2,8.0,0.5,0.5,0.0,0.0,0.3,11.8,2.5,0.0
4,2,2022-01-01 00:36:48,2022-01-01 01:14:20,1.0,4.3,1.0,N,68,163,1,23.5,0.5,0.5,3.0,0.0,0.3,30.3,2.5,0.0
