In [21]:
import requests  # Download the file
import psycopg2  # Connect to PostgreSQL
import numpy as np  # Data Manipulation
import pandas as pd  # Data Manipulation
import os
from dotenv import load_dotenv  # load .env file with DB Credentials
from tqdm import tqdm  # Progress Bar while downloading
from io import StringIO


def download_file(url, timeout=30):
    """
    Download the requested file into the ``../Data`` directory.

    The response body is streamed to disk in 1 KiB chunks while a tqdm
    progress bar reports how many bytes have been written.

    Parameters
    ----------
    url : str
        Address of the file. The text after the last "/" is used as the
        local file name.
    timeout : float or tuple, optional
        Passed to ``requests.get``; seconds to wait for the server before
        giving up instead of hanging indefinitely.

    Returns
    -------
    str
        Relative path of the downloaded file (``../Data/<file name>``).

    Raises
    ------
    requests.HTTPError
        If the server answers with a 4xx/5xx status. Nothing is written to
        disk in that case.
    """

    file_name = url.split("/")[-1]
    file_path = f"../Data/{file_name}"

    # The target folder may be missing on a fresh checkout.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # The context manager releases the connection even if writing fails.
    with requests.get(
        url, stream=True, allow_redirects=True, timeout=timeout
    ) as req:
        # Fail before opening the file so an error page is never saved as data.
        req.raise_for_status()

        # Servers using chunked transfer send no content-length; tqdm then
        # shows a running byte count without a total.
        content_length = req.headers.get("content-length")
        total_size = int(content_length) if content_length is not None else None

        # Progress bar to monitor the download
        with open(file_path, "wb") as obj, tqdm(
            total=total_size,
            unit="B",
            unit_scale=True,
            desc=file_name,
            initial=0,
            ascii=True,
        ) as pbar:
            for chunk in req.iter_content(chunk_size=1024):
                if chunk:  # skip keep-alive chunks
                    obj.write(chunk)
                    pbar.update(len(chunk))

    return file_path


def make_connection():
    """
    Open a connection to the PostgreSQL database server.

    Credentials are read from the environment after ``load_dotenv()`` has
    run: ``DB_NAME``, ``DB_USER``, ``DB_PASSWORD``, ``HOST`` and ``PORT``.

    Returns the psycopg2 connection on success, or ``-1`` if the connection
    attempt raised (the error is printed, not re-raised).
    """

    load_dotenv()

    # psycopg2.connect keyword -> value taken from the environment
    settings = {
        "host": os.getenv("HOST"),
        "database": os.getenv("DB_NAME"),
        "user": os.getenv("DB_USER"),
        "password": os.getenv("DB_PASSWORD"),
        "port": os.getenv("PORT"),
    }

    try:
        print("Connecting to the PostgreSQL database...")
        connection = psycopg2.connect(**settings)
    except Exception as error:
        print(f"Error while connecting to PostgreSQL:\n{error}")
        return -1
    else:
        print("Connection successful")
        return connection


def create_tables(conn):
    """
    Create the three Google mobility tables if they do not exist yet.

    Runs one ``CREATE TABLE IF NOT EXISTS`` statement each for
    ``countries_google``, ``location_util_google`` and
    ``mobility_stats_google`` on a single cursor, then commits.

    Returns 1 after a successful commit. On any error the message is
    printed, the transaction is rolled back and -1 is returned.
    """

    ddl_statements = (
        """ 
        CREATE TABLE IF NOT EXISTS countries_google (
            country_region_code varchar(2) PRIMARY KEY,
            country_region varchar(30)    
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS location_util_google (
            country_region_code varchar(2),
            iso_3166_2_code varchar(6),
            census_fips_code numeric(5,0),
            place_id varchar(27),
            
            FOREIGN KEY (country_region_code) 
                REFERENCES countries_google (country_region_code)
        )
        """,
        """ 
        CREATE TABLE IF NOT EXISTS mobility_stats_google (
            country_region_code varchar(2),
            sub_region_1 varchar(100),
            sub_region_2 varchar(100),
            metro_area varchar(50),
            date date,
            retail_and_recreation_percent_change_from_baseline numeric(4,0),
            grocery_and_pharmacy_percent_change_from_baseline numeric(4,0),
            parks_percent_change_from_baseline numeric(4,0),
            transit_stations_percent_change_from_baseline numeric(4,0),
            workplaces_percent_change_from_baseline numeric(4,0),
            residential_percent_change_from_baseline numeric(4,0),
            
            FOREIGN KEY (country_region_code) 
                REFERENCES countries_google (country_region_code)
        )
        """,
    )

    try:
        with conn.cursor() as cur:
            # countries_google comes first: the other two tables reference it.
            for statement in ddl_statements:
                cur.execute(statement)
            conn.commit()
            print("Tables created successfully!\n----------------------------")
    except Exception as error:
        print("Error while creating tables!\nRolling back changes...\n", error)
        conn.rollback()
        return -1

    return 1


def import_data(conn, table_name, df):
    """
    Replace the contents of ``table_name`` with the rows of ``df``.

    The frame is serialised to an in-memory CSV (no header, no index) and
    bulk-loaded with ``COPY ... FROM STDIN``. ``to_csv`` writes missing
    values as empty fields, which COPY's CSV mode reads as NULL, so no
    NaN-to-None conversion is needed before the export.

    Parameters
    ----------
    conn : psycopg2 connection
        Open connection; the truncate and the copy are committed together.
    table_name : str
        Target table. It is interpolated into the SQL text unquoted, so only
        pass trusted, hard-coded names.
    df : pandas.DataFrame
        Rows to load; column order must match the table's column order.

    Returns
    -------
    int
        1 on success, -1 if anything failed (the error is printed and the
        transaction rolled back).

    Notes
    -----
    ``TRUNCATE ... CASCADE`` also empties every table with a foreign key
    pointing at ``table_name``.
    """

    buffer = StringIO()
    df.to_csv(buffer, header=False, index=False)
    buffer.seek(0)  # rewind so COPY reads from the first row

    with conn.cursor() as cursor:
        try:
            cursor.execute(f"TRUNCATE {table_name} CASCADE;")
            print(f"Truncated {table_name}")

            cursor.copy_expert(f"COPY {table_name} from STDIN CSV QUOTE '\"'", buffer)
            conn.commit()
            print("Done!\n-------------------------------")
            return 1

        except Exception as error:
            print("Error: %s" % error)
            conn.rollback()
            return -1


def driver_code():
    """
    Run the whole pipeline: download the report, create the tables and load
    the three tables from the CSV.

    Returns
    -------
    int
        Sum of the three ``import_data`` results (each is 1 on success and
        -1 on failure), so 3 means every table was loaded.

    Raises
    ------
    ConnectionError
        If ``make_connection`` reported failure (returned -1).
    """

    file_path = download_file(
        "https://www.gstatic.com/covid19/mobility/Global_Mobility_Report.csv"
    )

    conn = make_connection()
    if conn == -1:
        # make_connection signals failure with -1; stop here rather than
        # failing later with an AttributeError on an int.
        raise ConnectionError("Could not connect to the PostgreSQL database")

    try:
        create_tables(conn)

        df = pd.read_csv(
            filepath_or_buffer=file_path, parse_dates=True, low_memory=False
        )

        # read_csv treats the literal text "NA" as missing by default, which
        # wipes out Namibia's ISO code. Restore it on the full frame, before
        # splitting, so all three tables carry the same key.
        df.loc[df["country_region"] == "Namibia", "country_region_code"] = "NA"

        countries = df[["country_region_code", "country_region"]].drop_duplicates()
        stats = df.drop(
            labels=["country_region", "iso_3166_2_code", "census_fips_code", "place_id"],
            axis=1,
        )
        location_util = df[
            ["country_region_code", "iso_3166_2_code", "census_fips_code", "place_id"]
        ].drop_duplicates()

        # countries_google goes first: the other two tables reference it, and
        # its TRUNCATE ... CASCADE empties them as well.
        count = 0
        count += import_data(conn, "countries_google", countries)
        count += import_data(conn, "mobility_stats_google", stats)
        count += import_data(conn, "location_util_google", location_util)
    finally:
        # Always release the database connection, even if loading failed.
        conn.close()

    return count

In [35]:
# Run the pipeline: download the CSV, create the tables and load them.
driver_code()

Truncated countries_google
Done!
-------------------------------
Truncated mobility_stats_google
Done!
-------------------------------
Truncated location_util_google
Done!
-------------------------------


In [None]:
# Delete the downloaded CSV once it has been loaded. Re-running this cell
# after the file is already gone is harmless.
try:
    os.remove('../Data/Global_Mobility_Report.csv')
except FileNotFoundError:
    pass