In [87]:
# Import the data from csv file and explore the data

import os
import sys
import pandas as pd
import logging
from google.cloud import bigquery
from google.oauth2 import service_account
from hashlib import md5
from typing import List

# **** SETUP ****

DATA_DIR = "./data/"
DEFAULT_RECEIPTS_FILE = os.path.join(DATA_DIR, "World_Energy_Consumption.csv")
PROJECT_NAME = "emissions-team-project"
DATASET_NAME = "emissions"

# read the csv for the raw data
df = pd.read_csv(DEFAULT_RECEIPTS_FILE)

display(df.head(n=10))



Unnamed: 0,iso_code,country,year,coal_prod_change_pct,coal_prod_change_twh,gas_prod_change_pct,gas_prod_change_twh,oil_prod_change_pct,oil_prod_change_twh,energy_cons_change_pct,...,solar_elec_per_capita,solar_energy_per_capita,gdp,wind_share_elec,wind_cons_change_pct,wind_share_energy,wind_cons_change_twh,wind_consumption,wind_elec_per_capita,wind_energy_per_capita
0,AFG,Afghanistan,1900,,,,,,,,...,,,,,,,,,,
1,AFG,Afghanistan,1901,,0.0,,,,,,...,,,,,,,,,,
2,AFG,Afghanistan,1902,,0.0,,,,,,...,,,,,,,,,,
3,AFG,Afghanistan,1903,,0.0,,,,,,...,,,,,,,,,,
4,AFG,Afghanistan,1904,,0.0,,,,,,...,,,,,,,,,,
5,AFG,Afghanistan,1905,,0.0,,,,,,...,,,,,,,,,,
6,AFG,Afghanistan,1906,,0.0,,,,,,...,,,,,,,,,,
7,AFG,Afghanistan,1907,,0.0,,,,,,...,,,,,,,,,,
8,AFG,Afghanistan,1908,,0.0,,,,,,...,,,,,,,,,,
9,AFG,Afghanistan,1909,,0.0,,,,,,...,,,,,,,,,,


In [88]:
# **** SETUP LOGGING ****
# setup logging and logger
logging.basicConfig(            # setting up the root logger
    format='[%(levelname)-5s][%(asctime)s][%(module)s:%(lineno)04d] : %(message)s',
    level=logging.INFO,
    stream=sys.stdout
)
logger: logging.Logger = logging.getLogger('root')      # alias the root logger as `logger`
logger.setLevel(logging.DEBUG)                          # programmatically reassign the logging level


# **** BIGQUERY CLIENT ****
logger.debug(f"Creating bigquery client")

key_path = "/Users/Ruben/Desktop/google_cred/.cred/emissions-team-project-10e7e81d45bb.json"
credentials = service_account.Credentials.from_service_account_file(
    key_path, scopes=["https://www.googleapis.com/auth/cloud-platform"],
)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

logger.info(f"Setup Completed")

[DEBUG][2023-01-20 10:08:03,238][4263563694:0013] : Creating bigquery client
[INFO ][2023-01-20 10:08:03,448][4263563694:0021] : Setup Completed


In [89]:
# **** BIQUERY SCHEMA TABLE SETUP ****

FACTS_TABLE_METADATA = {
    'fct_gdp': {
        'table_name': 'fct_gdp',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('country_code_year', 'string', mode='REQUIRED'),
            bigquery.SchemaField('country_code', 'string', mode='NULLABLE'),
            bigquery.SchemaField('year', 'int64', mode='NULLABLE'),
            bigquery.SchemaField('gdp', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('population', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
    'fct_consump': {
        'table_name': 'fct_consump',
        'schema': [
            # indexes are written if only named in the schema
            bigquery.SchemaField('country_code_year', 'string', mode='REQUIRED'),
            bigquery.SchemaField('country_code', 'string', mode='NULLABLE'),
            bigquery.SchemaField('year', 'int64', mode='NULLABLE'),
            bigquery.SchemaField('biofuel_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('coal_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('fossil_fuel_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('gas_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('hydro_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('low_carbon_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('oil_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('other_renewable_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('primary_energy_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('renewables_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('solar_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('wind_consumption', 'float64', mode='NULLABLE'),
            bigquery.SchemaField('created_at', 'timestamp', mode='NULLABLE'),
            bigquery.SchemaField('modified_at', 'timestamp', mode='NULLABLE'),
        ],
    },
    
}

In [90]:
# **** BIGQUERY DATASET CREATION ****

dataset_id = f"{PROJECT_NAME}.{DATASET_NAME}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset, exists_ok=True)

logger.info(f"Created emissions dataset: {dataset.full_dataset_id}")

[INFO ][2023-01-20 10:08:10,399][2746255190:0008] : Created emissions dataset: emissions-team-project:emissions


Dataset created in BigQuery in prep to load tables:

<img src="./imgs/successful_dataset_creation.png" alt="dataset creation" width="640"/>

In [91]:
# create the gdf dataframe for fct_gdp table creation

# extract necessary columns from raw csv file
gdp_df = df
gdp_cols = ['iso_code', 'country', 'year', 'gdp', 'population']
gdp_df = gdp_df[gdp_cols]


# rename columns to standardize across all the tables
gdp_df = gdp_df.rename(columns={
    'iso_code': 'country_code', 
    'country':'country_name', 
    })

# remove na for country_code and drop duplicates
gdp_df = gdp_df.dropna(subset=['country_code'])
gdp_df.drop_duplicates(subset=['country_code', 'year', 'population'], inplace=True)
gdp_df = gdp_df.fillna(0)

# create composite key column for the fct_gdp table
def composite_key(row):
    comp_key = f"{row.country_code}{row.year}"
    return comp_key

# generate and insert the composite key at the first position
gdp_df.insert(0, 'country_code_year', gdp_df.apply(composite_key, axis=1))

# create created_at and modified_at columns
gdp_df['created_at'] = pd.Timestamp.now()
gdp_df['modified_at'] = None

gdp_df.set_index('country_code_year', inplace=True)

display(gdp_df)
gdp_df.dtypes




Unnamed: 0_level_0,country_code,country_name,year,gdp,population,created_at,modified_at
country_code_year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
AFG1900,AFG,Afghanistan,1900,0.000000e+00,5021241.0,2023-01-20 10:08:15.260701,
AFG1901,AFG,Afghanistan,1901,0.000000e+00,5053439.0,2023-01-20 10:08:15.260701,
AFG1902,AFG,Afghanistan,1902,0.000000e+00,5085403.0,2023-01-20 10:08:15.260701,
AFG1903,AFG,Afghanistan,1903,0.000000e+00,5118005.0,2023-01-20 10:08:15.260701,
AFG1904,AFG,Afghanistan,1904,0.000000e+00,5150814.0,2023-01-20 10:08:15.260701,
...,...,...,...,...,...,...,...
ZWE2015,ZWE,Zimbabwe,2015,2.503057e+10,13815000.0,2023-01-20 10:08:15.260701,
ZWE2016,ZWE,Zimbabwe,2016,2.515176e+10,14030000.0,2023-01-20 10:08:15.260701,
ZWE2017,ZWE,Zimbabwe,2017,0.000000e+00,14237000.0,2023-01-20 10:08:15.260701,
ZWE2018,ZWE,Zimbabwe,2018,0.000000e+00,14439000.0,2023-01-20 10:08:15.260701,


country_code            object
country_name            object
year                     int64
gdp                    float64
population             float64
created_at      datetime64[ns]
modified_at             object
dtype: object

In [92]:
# create the consumption dataframe to prep for fct_consump table

consump_df = df
consump_cols = [
    'iso_code', 
    'country', 
    'year', 
    'biofuel_consumption', 
    'coal_consumption',
    'fossil_fuel_consumption',
    'gas_consumption',
    'hydro_consumption',
    'low_carbon_consumption',
    'nuclear_consumption',
    'oil_consumption',
    'other_renewable_consumption',
    'primary_energy_consumption',
    'renewables_consumption',
    'solar_consumption',
    'wind_consumption']

# isolate for necessary columns
consump_df = consump_df[consump_cols]

# rename columns to standardized names
consump_df = consump_df.rename(columns={
    'iso_code': 'country_code', 
    'country':'country_name', 
    })
# remove na for country_code and drop duplicates
consump_df = consump_df.dropna(subset=['country_code'])
consump_df.drop_duplicates(subset=['country_code', 'year'], inplace=True)
consump_df = consump_df.fillna(0)

# create composite key column for the fct_consump table
def composite_key(row):
    comp_key = f"{row.country_code}{row.year}"
    return comp_key

# generate and insert the composite key at the first position
consump_df.insert(0, 'country_code_year', consump_df.apply(composite_key, axis=1))

# create created_at and modified_at columns
consump_df['created_at'] = pd.Timestamp.now()
consump_df['modified_at'] = None

consump_df.set_index('country_code_year', inplace=True)

display(consump_df)

consump_df.dtypes

Unnamed: 0_level_0,country_code,country_name,year,biofuel_consumption,coal_consumption,fossil_fuel_consumption,gas_consumption,hydro_consumption,low_carbon_consumption,nuclear_consumption,oil_consumption,other_renewable_consumption,primary_energy_consumption,renewables_consumption,solar_consumption,wind_consumption,created_at,modified_at
country_code_year,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
AFG1900,AFG,Afghanistan,1900,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
AFG1901,AFG,Afghanistan,1901,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
AFG1902,AFG,Afghanistan,1902,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
AFG1903,AFG,Afghanistan,1903,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
AFG1904,AFG,Afghanistan,1904,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
ZWE2015,ZWE,Zimbabwe,2015,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,55.642,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
ZWE2016,ZWE,Zimbabwe,2016,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,47.500,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
ZWE2017,ZWE,Zimbabwe,2017,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,
ZWE2018,ZWE,Zimbabwe,2018,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.000,0.0,0.0,0.0,2023-01-20 10:08:21.099289,


country_code                           object
country_name                           object
year                                    int64
biofuel_consumption                   float64
coal_consumption                      float64
fossil_fuel_consumption               float64
gas_consumption                       float64
hydro_consumption                     float64
low_carbon_consumption                float64
nuclear_consumption                   float64
oil_consumption                       float64
other_renewable_consumption           float64
primary_energy_consumption            float64
renewables_consumption                float64
solar_consumption                     float64
wind_consumption                      float64
created_at                     datetime64[ns]
modified_at                            object
dtype: object

In [93]:
# define the load table function

def load_table(
    df: pd.DataFrame, 
    client: bigquery.Client, 
    table_name: str, 
    schema: List[bigquery.SchemaField], 
    create_disposition: str = 'CREATE_IF_NEEDED', 
    write_disposition: str = 'WRITE_TRUNCATE'
    ) -> None:
    """load dataframe into bigquery table

    Args:
        df (pd.DataFrame): dataframe to load
        client (bigquery.Client): bigquery client
        table_name (str): full table name including project and dataset id
        schema (List[bigquery.SchemaField]): table schema with data types
        create_disposition (str, optional): create table disposition. Defaults to 'CREATE_IF_NEEDED'.
        write_disposition (str, optional): overwrite table disposition. Defaults to 'WRITE_TRUNCATE'.
    """
    # *** run some checks ***
    # test table name to be full table name including project and dataset name. It must contain to dots
    assert len(table_name.split('.')) == 3, f"Table name must be a full bigquery table name including project and dataset id: '{table_name}'"
    # setup bigquery load job:
    #  create table if needed, replace rows, define the table schema
    job_config = bigquery.LoadJobConfig(
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        schema=schema
    )
    logger.info(f"loading table: '{table_name}'")
    job = client.load_table_from_dataframe(df, destination=table_name, job_config=job_config)
    job.result()        # wait for the job to finish
    # get the resulting table
    table = client.get_table(table_name)
    logger.info(f"loaded {table.num_rows} rows into {table.full_table_id}")

In [94]:
# Load gdp fact table

# get table name and schema from FACTS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{FACTS_TABLE_METADATA['fct_gdp']['table_name']}"
schema = FACTS_TABLE_METADATA['fct_gdp']['schema']
# load dataframe
load_table(gdp_df, client, table_name, schema)

logger.info(f"loaded gdp facts")

[INFO ][2023-01-20 10:08:35,229][2444475212:0031] : loading table: 'emissions-team-project.emissions.fct_gdp'
[INFO ][2023-01-20 10:08:39,165][2444475212:0036] : loaded 15630 rows into emissions-team-project:emissions.fct_gdp
[INFO ][2023-01-20 10:08:39,166][2491823397:0009] : loaded gdp facts


In [95]:
# Load consumption fact table

# get table name and schema from FACTS_TABLE_METADATA config param
table_name = f"{PROJECT_NAME}.{DATASET_NAME}.{FACTS_TABLE_METADATA['fct_consump']['table_name']}"
schema = FACTS_TABLE_METADATA['fct_consump']['schema']
# load dataframe
load_table(consump_df, client, table_name, schema)

logger.info(f"loaded consumption facts")

[INFO ][2023-01-20 10:08:41,082][2444475212:0031] : loading table: 'emissions-team-project.emissions.fct_consump'
[INFO ][2023-01-20 10:08:44,739][2444475212:0036] : loaded 15630 rows into emissions-team-project:emissions.fct_consump
[INFO ][2023-01-20 10:08:44,740][977876786:0009] : loaded consumption facts


Both fct tables loaded into BigQuery:

<img src="./imgs/successful_fct_table_creation.png" alt="fct table creation" width="640"/>

In [None]:
-- Code used to query to prep data for Looker studio

with gdp as (

  SELECT 
    country_code_year,
    country_code, 
    country_name,
    year,
    gdp,
    population,
  FROM `emissions-team-project.emissions.fct_gdp`
  group by 1,2,3,4,5,6
),

consump as (
  SELECT 
  country_code_year,
  country_code, 
  country_name,
  year,
  biofuel_consumption as b,
  coal_consumption as c,
  fossil_fuel_consumption as f,
  gas_consumption as g,
  hydro_consumption as h,
  low_carbon_consumption as l,
  nuclear_consumption as n,
  oil_consumption as o,
  other_renewable_consumption as other,
  primary_energy_consumption as p,
  renewables_consumption as r,
  solar_consumption as s,
  wind_consumption as w
FROM `emissions-team-project.emissions.fct_consump`
),

consump_two as (
  SELECT
    country_code_year,
    country_code, 
    country_name,
    year,
    b + c + f +g +h + l +n + o +other +p +r+s+w as total_consump
  FROM consump
  group by 1,2,3,4,5
),

combined as (
  SELECT
    gdp.country_code_year,
    gdp.country_code, 
    gdp.country_name,
    gdp.year,
    gdp.gdp as gdp,
    gdp.population as population,
    c.total_consump
  FROM gdp
  INNER JOIN consump_two as c
  ON gdp.country_code_year = c.country_code_year
  group by 1,2,3,4,5,6,7)
  
  SELECT *
  FROM combined;

Snapshot of the query used for creating first visualization on Looker:

<br>

<img src="imgs/SQL_query_Looker_1.png" alt="looker query 1" width="640"/>

<br>

Below is the visualization created using the above query. Click image to go to dashboard.

<br>

[<img src="imgs/GDP_pop_con_Looker_graph.png">](https://datastudio.google.com/embed/reporting/dbe92c8b-ccd3-41d9-b269-5964eb9717c3/page/f94CD)



In [None]:
-- Code used to query to prep data for Looker studio

with consump as (
  SELECT 
  country_code_year,
  country_code, 
  country_name,
  year,
  biofuel_consumption as b,
  coal_consumption as c,
  fossil_fuel_consumption as f,
  gas_consumption as g,
  hydro_consumption as h,
  low_carbon_consumption as l,
  nuclear_consumption as n,
  oil_consumption as o,
  other_renewable_consumption as other,
  primary_energy_consumption as p,
  renewables_consumption as r,
  solar_consumption as s,
  wind_consumption as w
FROM `emissions-team-project.emissions.fct_consump`
),

consump_two as (
  SELECT
    country_code_year,
    country_code, 
    country_name,
    year,
    b + c + f +g +h + l +n + o +other +p +r+s+w as total_consump
  FROM consump
  group by 1,2,3,4,5
),

em as (
  SELECT 
    country_code_year,
    total_em
  FROM `emissions-team-project.emissions.fct_emissions`
),


combined as (
  SELECT
    c.country_code_year,
    c.country_code, 
    c.country_name,
    c.year,
    c.total_consump,
    em.total_em
  FROM consump_two as c
  INNER JOIN em
  ON c.country_code_year = em.country_code_year)
  
  SELECT *
  FROM combined;

Snapshot of the query used for creating the second visualization on Looker:

<br>

<img src="imgs/SQL_query_Looker_2.png" alt="looker query 2" width="640"/>

<br>

Below is the visualization created using the above query. Click image to go to dashboard.

<br>

[<img src="imgs/Con_Em_Looker_graph.png">](https://datastudio.google.com/embed/reporting/dbe92c8b-ccd3-41d9-b269-5964eb9717c3/page/f94CD)