![](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Ftse4.mm.bing.net%2Fth%3Fid%3DOIP.AGpR3V7Vrdn_i_PagIALbgHaEK%26pid%3DApi&f=1&ipt=ac559938bbcf188aa8ad6abfc1590344e468c12b00aa682d7084485d0b6a32c8&ipo=images)

### Environment settings

In [1]:
# Import libraries
import numpy as np
import pandas as pd
import polars as pl
import gspread
import duckdb
import sqlalchemy as db
import pyodbc
from oauth2client.service_account import ServiceAccountCredentials
from google.oauth2 import service_account
from google.cloud import bigquery
import connectorx as cx
import warnings
warnings.filterwarnings('ignore')

In [2]:
scopes = ['https://www.googleapis.com/auth/spreadsheets',
        'https://www.googleapis.com/auth/drive',
        'https://www.googleapis.com/auth/analytics.readonly']

# Path to the Google service account key file
api = 'creds.json'

# Connect to Google Sheets
gs_credentials = ServiceAccountCredentials.from_json_keyfile_name(api, scopes)
gc = gspread.authorize(gs_credentials)

# Connect to BigQuery
bq_credentials = service_account.Credentials.from_service_account_file(api)
project_id = 'repository'
client = bigquery.Client(credentials=bq_credentials, project=project_id)

::: {#fig-etl}

![](https://external-content.duckduckgo.com/iu/?u=https%3A%2F%2Fblog.nextpathway.com%2Fhubfs%2FETL%2520Diagram.png&f=1&nofb=1&ipt=e80a931b60398e151ffc64ab62c4e279c26e7fccaa2e122858bbadd1f6377a61&ipo=images)

ETL Diagram
:::

## Extract Phase

The extraction phase entails retrieving data from your source. This may involve interacting with:

- Flat files
- Databases
- XML files
- APIs
- Other

### Flat files

In [3]:
users = (
    pl.read_csv('users.csv', dtypes={'phone': pl.Utf8, 'id_atg':pl.Utf8})
    .with_columns(
        pl.col('entry_data').str.strptime(pl.Datetime, strict=False)
    )
)

In [4]:
users_profiling = (
    # read csv file, keeping phone numbers and postal codes as text
    pl.read_csv('profiles.csv', dtypes={'contact_phone': pl.Utf8, 'post_code': pl.Utf8})
    # parse the date columns; values that fail to parse become null (strict=False)
    .with_columns(
        pl.col('entry_data', 'entry_data_gep', 'update_date').str.strptime(pl.Datetime, strict=False),
    )
)

In [5]:
orders = (
    pl.read_csv('orders.csv').with_columns(
        pl.col('entry_date','delivery_date').str.strptime(pl.Datetime, strict=False)
    )
)

In [6]:
order_details = (
    pl.read_csv('order-details.csv')
)

In [7]:
promotions = (
    pl.read_csv('promotions.csv', dtypes={'short_description': pl.Utf8})
    .with_columns(pl.col('key').cast(pl.Utf8))
    # keep one row per promotion key
    .unique(subset='key')
)

In [8]:
order_status = pl.read_csv('order-status.csv')

In [9]:
social_networks = (
    pl.read_csv('social_networks.csv').select(
        pl.col('id_social_network','social_network','description')
    )
)

### Parquet files

In [10]:
types = pl.read_parquet('types.parquet')

In [11]:
genre = pl.read_parquet('genre.parquet')

### JSON files

In [12]:
warehouses = pl.read_json('warehuse_catalog.json')

### Databases

In [None]:
# Connection to MS Access
conn = pyodbc.connect(
    r'DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};'
    r'DBQ=C:\Users\user\folder\file.accdb;'
)
# Create cursor
cursor = conn.cursor()

In [None]:
# Write query
query = 'select * from sales where year=2022'
# Convert to pandas dataframe
df = pd.read_sql(query, con=conn)
df.head()
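
The year in the query above is hard-coded. As a minimal sketch, the same query can take the year as a parameter so that it is reusable. The example assumes an in-memory SQLite database as a stand-in for the Access file, and the contents of the `sales` table are made up for illustration. Both `sqlite3` and `pyodbc` use the `?` placeholder style.

```python
import sqlite3
import pandas as pd

# In-memory SQLite database standing in for the Access file (assumption)
conn = sqlite3.connect(':memory:')
conn.execute('create table sales (id integer, year integer, amount real)')
conn.executemany(
    'insert into sales values (?, ?, ?)',
    [(1, 2021, 100.0), (2, 2022, 250.0), (3, 2022, 75.5)],  # made-up rows
)
conn.commit()

# Same query as above, with the year supplied as a parameter
query = 'select * from sales where year = ?'
sales_2022 = pd.read_sql(query, con=conn, params=(2022,))
print(sales_2022)
```

Passing values through `params` instead of formatting them into the SQL string also avoids quoting problems when the value comes from user input.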

## Transform Phase

Cleanse, validate, and manipulate the extracted data based on your analysis requirements. This might include:

- **Data Cleaning**

Handle missing values, inconsistent formatting, or errors.

- **Data Type Conversion**

Ensure consistent data types for columns based on their intended use in BigQuery.

- **Filtering/Aggregation**

Select or aggregate specific data subsets for targeted analysis.

- **Enrichment**

Merge extracted data with additional sources to enhance its value.
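
The four steps above can be sketched on a toy dataset. This example uses pandas, and the `raw` and `regions` frames, their columns, and the rule of treating a missing amount as 0 are all hypothetical, invented only for illustration. They are not the files loaded earlier.

```python
import pandas as pd

# Hypothetical raw extract: inconsistent text, a missing amount, numbers stored as strings
raw = pd.DataFrame({
    'order_id': ['1', '2', '3', '4'],
    'city': [' Puebla', 'puebla', 'Toluca ', 'TOLUCA'],
    'amount': ['10.5', None, '7.0', '2.5'],
})
# Hypothetical lookup table used for enrichment
regions = pd.DataFrame({'city': ['Puebla', 'Toluca'], 'region': ['East', 'Center']})

clean = raw.assign(
    # Data cleaning: normalize whitespace and capitalization
    city=raw['city'].str.strip().str.title(),
    # Data type conversion: strings to numbers; missing amounts become 0 (assumption)
    order_id=raw['order_id'].astype('int64'),
    amount=pd.to_numeric(raw['amount']).fillna(0.0),
)

# Filtering / aggregation: keep orders with a positive amount, then total per city
totals = (
    clean[clean['amount'] > 0]
    .groupby('city', as_index=False)['amount']
    .sum()
)

# Enrichment: attach the region from the lookup table
enriched = totals.merge(regions, on='city', how='left')
print(enriched)
```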

In [13]:
# join types, orders, order_details, promotions and warehouses
sheet = (
    types.join(orders, on='id_type', how='left')
    .join(order_details, on='id_order', how='left')
    .join(promotions, on='key', how='left')
    .join(warehouses, on='id_warehouse', how='left')
).rename({'id_warehouse':'id_warehouse_promo', 'active':'promo_active'})
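
A left join multiplies rows when the right-hand table contains duplicate keys, which is a likely reason `promotions` is deduplicated on `key` before joining. The sketch below shows that effect. It uses pandas rather than polars because `merge` has a built-in `validate` check, and the `details_demo` and `promos_demo` frames are hypothetical.

```python
import pandas as pd

# Hypothetical order lines and a promotions table with a duplicated key
details_demo = pd.DataFrame({'id_order': [1, 2, 3], 'key': ['A', 'B', 'A']})
promos_demo = pd.DataFrame({'key': ['A', 'A', 'B'], 'discount': [10, 10, 5]})

# Duplicate keys on the right fan out the left rows: 3 order lines become 5
fanned_out = details_demo.merge(promos_demo, on='key', how='left')

# validate='many_to_one' raises instead of silently duplicating rows
try:
    details_demo.merge(promos_demo, on='key', how='left', validate='many_to_one')
    duplicates_detected = False
except pd.errors.MergeError:
    duplicates_detected = True

# After deduplicating on the key, the join keeps one row per order line
deduped = promos_demo.drop_duplicates(subset='key')
joined = details_demo.merge(deduped, on='key', how='left', validate='many_to_one')
print(len(fanned_out), duplicates_detected, len(joined))
```

Comparing the row count before and after each join is a cheap way to catch the same problem in the polars pipeline.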

## Load Phase

There are two primary options for loading data:

- **Staging Table**

Load the transformed data into a temporary staging table, validate and adjust it there, and then move it into the target table.

- **Direct Load**

Load the data directly into the target table, skipping the staging step. This is simpler, but it leaves no intermediate place to validate the data or apply complex transformations before it reaches the target. The cells below take this approach:

In [209]:
# create dataset
client.create_dataset('database')

Dataset(DatasetReference('gepp-538', 'database'))

In [224]:
# convert to pandas
sheet = sheet.to_pandas()
# upload to BigQuery
sheet.to_gbq(
    'dw.transformation.catalog',
    project_id='repository-538',
    if_exists='replace',
    credentials=bq_credentials,
)

100%|███████████████████████████████████████████| 1/1 [00:00<00:00, 7626.01it/s]
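
The cells above load the table directly. For comparison, here is a minimal sketch of the staging-table option: load into a staging table, validate it, and only then replace the target. It uses an in-memory SQLite database as a stand-in for BigQuery. The table names `catalog_staging` and `catalog`, the sample rows, and the validation rule are assumptions for illustration.

```python
import sqlite3

# Made-up transformed rows: (id, type, price)
rows = [(1, 'books', 12.0), (2, 'games', 30.0), (3, 'music', 8.5)]

# In-memory SQLite database standing in for the warehouse (assumption)
conn = sqlite3.connect(':memory:')
conn.execute('create table catalog (id integer primary key, type text, price real)')
conn.execute('create table catalog_staging (id integer, type text, price real)')

# 1. Load the transformed rows into the staging table
conn.executemany('insert into catalog_staging values (?, ?, ?)', rows)
conn.commit()

# 2. Validate the staged data: no missing ids, no negative prices
bad_rows = conn.execute(
    'select count(*) from catalog_staging where id is null or price < 0'
).fetchone()[0]

# 3. Promote to the target only if validation passed; the `with` block
#    commits on success and rolls back if any statement raises
if bad_rows == 0:
    with conn:
        conn.execute('delete from catalog')
        conn.execute('insert into catalog select * from catalog_staging')
        conn.execute('delete from catalog_staging')

loaded = conn.execute('select count(*) from catalog').fetchone()[0]
print(loaded)
```

If validation fails, the target table is left untouched and the staged rows remain available for inspection.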


## Execute queries from BigQuery

In [None]:
# create sql query
query = '''
    SELECT *
    FROM `dw.transformation.catalog`
'''
# run the query and load the result into a pandas DataFrame
catalog = pd.read_gbq(query, credentials=bq_credentials)

## Contact

**Jesus L. Monroy**
<br>
*Economist & Data Scientist*

[Medium](https://medium.com/@jesuslm) | [LinkedIn](https://www.linkedin.com/in/j3sus-lm) | [Twitter](https://x.com/j3suslm)