# Modularising Python Code for Data Pipelines

This notebook walks through some basic Python data pipeline code and shows how we might break it down into functions.  

## Setup

Make sure this notebook is stored in the same folder as the customer data file, customer_data.csv.  
A data pipeline normally moves data from one place to another: from a source to a destination.  Our source, initially, will be the customer data CSV.  Our destination will be a PostgreSQL database instance hosted on supabase.com.  If you have not used Supabase before, please create an account and a project called 'my_business_insights'.  

NB: when creating your project you will be asked for a password.  Make a note of it, as you will need it to connect to your database.  All other connection parameters can be found from the menu pane on the left: 'project settings' > 'Database' > 'connection parameters'.  

In [None]:
# your supabase parameters
POSTGRES_HOST = "your_host"
POSTGRES_USER = "your_user"
POSTGRES_PASSWORD = "your_password"
POSTGRES_DATABASE = "postgres"

In [None]:
###FOR INSTRUCTOR USE IN LIVE DELIVERY###
import configparser

config = configparser.ConfigParser()

# read the configuration file
config.read('my_config.ini')

# your supabase parameters
POSTGRES_HOST = config.get('DB','host')
POSTGRES_USER = config.get('DB','username')
POSTGRES_PASSWORD = config.get('DB','password')
POSTGRES_DATABASE = config.get('DB','db')
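
The instructor cell above reads a file called my_config.ini with a DB section.  The sketch below shows the layout that cell appears to expect.  Every value is a placeholder, and the file contents are parsed from a string (SAMPLE_CONFIG, a name made up for this example) so the cell runs without the real file.

```python
import configparser

# Hypothetical contents of my_config.ini; every value is a placeholder.
SAMPLE_CONFIG = """
[DB]
host = your_host
username = your_user
password = your_password
db = postgres
"""

sample_config = configparser.ConfigParser()
# sample_config.read('my_config.ini') would load the real file instead
sample_config.read_string(SAMPLE_CONFIG)

# Collect the four keys the instructor cell looks up
db_settings = {
    key: sample_config.get('DB', key)
    for key in ('host', 'username', 'password', 'db')
}
print(db_settings['host'], db_settings['db'])
```

If your password contains a % character, configparser's default interpolation will raise an error when the value is read; creating the parser with configparser.ConfigParser(interpolation=None) avoids this.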


In [2]:
# install necessary packages in your environment
!pip install sqlalchemy pandas psycopg2 requests



In [None]:
# import libraries
import pandas as pd
import requests
import json
from sqlalchemy import create_engine, text, Table, MetaData, insert, select

In [None]:
# read data from csv into python
data = pd.read_csv('customer_data.csv')

# establish connection to destination database
conn_string = f'postgresql+psycopg2://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{POSTGRES_HOST}:5432/{POSTGRES_DATABASE}'
engine = create_engine(conn_string)

# create table sql query
create_table_sql = text("""
    CREATE TABLE IF NOT EXISTS customer_data (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255),
        postcode VARCHAR(9)
    );
    """)

# the context manager manages the connection to the database
# this block ensures the table exists in the database before we try to write to it
with engine.connect() as conn:
    try:
        conn.execute(create_table_sql)
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()

# get the table from the database
metadata = MetaData()
customer_table = Table('customer_data', metadata, autoload_with=engine)
# format data for writing to database
data_dict = data.to_dict('records')


# context manager manages connection to database
# this block writes the data to the database
with engine.connect() as conn:
    try:
        conn.execute(insert(customer_table),
                    data_dict)
        conn.commit()
    except Exception as e:
        print(e)
        conn.rollback()
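
A password containing characters such as @, / or : can break a connection string built with an f-string, as above.  One possible workaround, sketched here with made-up credentials, is to percent-encode the password with urllib.parse.quote_plus from the standard library before inserting it into the URL.  The helper name build_conn_string is an assumption for illustration, not part of the notebook.

```python
from urllib.parse import quote_plus


def build_conn_string(user, passwd, host, database, port=5432):
    """Return a SQLAlchemy-style PostgreSQL URL with the password escaped."""
    safe_passwd = quote_plus(passwd)  # e.g. '@' becomes '%40'
    return f'postgresql+psycopg2://{user}:{safe_passwd}@{host}:{port}/{database}'


# Made-up credentials, for demonstration only
example = build_conn_string('demo_user', 'p@ss/word', 'db.example.com', 'postgres')
print(example)
```

The resulting string can be passed to create_engine in the same way as conn_string.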

Now try breaking down this process into separate functions.  The answers are in the cell block below; click on the cell to reveal them.  

In [None]:
# Write your code in here.  

def get_data(filename_csv):
    ...

def get_sql_engine(user, passwd, host, database):
    ...

def connection_manager(sql_engine, query, data=None):
    ...

def insert_data(table_name, engine):
    ...

In [None]:
###ANSWERS###
def get_data(filename_csv):
    data = pd.read_csv(filename_csv)
    return data.to_dict('records')

def get_sql_engine(user, passwd, host, database):
    conn_string = f'postgresql+psycopg2://{user}:{passwd}@{host}:5432/{database}'
    return create_engine(conn_string)

def connection_manager(sql_engine, query, data=None):
    result = None  # returned as-is if the query fails
    with sql_engine.connect() as conn:
        try:
            result = conn.execute(query, data)
            conn.commit()
        except Exception as e:
            print(e)
            conn.rollback()
    return result

def insert_data(table_name, engine):
    metadata = MetaData()
    table = Table(table_name, metadata, autoload_with=engine)
    return insert(table)

Now that we've broken our code up into manageable chunks, we can run our pipeline using the following neater script.  
NB: The last two functions could be rewritten to make use of df.to_sql(), which is potentially more efficient.  

In [None]:
# create the database engine
engine = get_sql_engine(POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DATABASE)

# create table sql query: running it ensures the table exists before we try to write to it
create_table_sql = text("""
    CREATE TABLE IF NOT EXISTS customer_data (
        id SERIAL PRIMARY KEY,
        name VARCHAR(255),
        postcode VARCHAR(9)
    );
    """)
connection_manager(engine, create_table_sql)

# insert the data into the table
connection_manager(engine,
                   insert_data('customer_data', engine),
                   get_data('customer_data.csv')
                  )


This code is reusable, and it makes improvements and troubleshooting easier.
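
The NB above mentions df.to_sql() as an alternative way to load the data.  Here is a minimal sketch of that approach.  It uses an in-memory SQLite database from the standard library as a stand-in for Postgres, so it runs without credentials, and the rows are invented.  With Supabase you would pass the SQLAlchemy engine as the second argument instead.

```python
import sqlite3

import pandas as pd

# Invented rows standing in for customer_data.csv
df = pd.DataFrame({
    'name': ['Ada', 'Grace'],
    'postcode': ['S1 2BP', 'W2 6LG'],
})

# In-memory SQLite stands in for the Postgres destination (demo assumption)
sqlite_conn = sqlite3.connect(':memory:')

# if_exists='append' adds rows to an existing table, or creates it if missing;
# index=False stops the DataFrame index being written as an extra column
df.to_sql('customer_data', sqlite_conn, if_exists='append', index=False)

rows = sqlite_conn.execute(
    'SELECT name, postcode FROM customer_data ORDER BY name'
).fetchall()
print(rows)
sqlite_conn.close()
```

Note that when to_sql creates the table itself it infers the column types from the DataFrame, so creating the table explicitly first, as the pipeline does, keeps control of the schema.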

## Adding in Geolocation

Next we will look at adding geolocation data by making use of the postcodes.io API.  Let's have a look at how it works.
https://postcodes.io/

Here we'll make a call to the postcodes API and have a look at the response.  

In [None]:
url = "https://api.postcodes.io/postcodes"
params = {
    "postcodes": ["S1 2BP", "W2 6LG"]
}
response = requests.post(url, json=params)

print(response.json())

In [None]:
print(json.dumps(response.json(),indent=4))

In [None]:
# pull out the first lookup once, then read the coordinates from it
first_match = response.json()['result'][0]['result']
print(first_match['longitude'], first_match['latitude'])
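
To make the nesting easier to see without calling the API, the sketch below walks a hand-written sample that mimics the shape of the bulk lookup response.  The coordinates are placeholders rather than real values, and the second entry assumes that a postcode the API cannot match comes back with a null result, which is why it is worth checking before indexing into it.

```python
# Hand-written sample mimicking the bulk lookup response; values are placeholders
sample_response = {
    "status": 200,
    "result": [
        {
            "query": "S1 2BP",
            "result": {"postcode": "S1 2BP", "longitude": -1.47, "latitude": 53.38},
        },
        # Assumption: an unmatched postcode has a null (None) result
        {"query": "NOT A CODE", "result": None},
    ],
}

for item in sample_response["result"]:
    match = item["result"]
    if match is None:
        print(item["query"], "-> no match")
    else:
        print(item["query"], "->", match["longitude"], match["latitude"])

# Keep only the queries that returned a usable result
matched = [item["query"] for item in sample_response["result"] if item["result"]]
```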

Now that we understand what the response looks like from the API we can write some functions to add this information to our database.  
Try breaking down this process into separate functions.  The answers are in the cell block below; click on the cell to reveal them.  

In [None]:
# To stop us overloading the API we're going to limit the calls to just 10 postcodes at a time.  
def get_10_postcodes(engine):
    metadata = MetaData()
    customers = Table('customer_data', metadata, autoload_with=engine)
    location = Table('location_data', metadata, autoload_with=engine)
    result = ...
    return result

# Next we'll get our data from the API.  
def get_long_lat(postcode_list):
    ...
    
# Then we need to format it so we only get the bits we're interested in.  
def extract_long_lat(data):
    ...

In [None]:
###ANSWERS###
# To stop us overloading the API we're going to limit the calls to just 10 postcodes at a time.  
def get_10_postcodes(engine):
    metadata = MetaData()
    customers = Table('customer_data', metadata, autoload_with=engine)
    location = Table('location_data', metadata, autoload_with=engine)
    result = select(customers.c.postcode).where(
        customers.c.postcode.notin_( # excluding ones we've already got geolocation for
            select(location.c.postcode)
        )).distinct().limit(10) # unique postcodes only (location_data.postcode is a primary key), 10 at a time
    return result

# Next we'll get our data from the API.  
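def get_long_lat(postcode_list):
    url = "https://api.postcodes.io/postcodes"
    params = {
        "postcodes": postcode_list
    }
    # timeout stops the call hanging indefinitely; raise_for_status surfaces HTTP errors
    response = requests.post(url, json=params, timeout=10)
    response.raise_for_status()
    return response.json()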

# Then we need to format it so we only get the bits we're interested in.  
def extract_long_lat(data):
    extracted_data = []
    for item in data['result']:
        if 'result' in item and item['result']:
            extracted_data.append({
                'postcode': item['result'].get('postcode'),
                'longitude': item['result'].get('longitude'),
                'latitude': item['result'].get('latitude')
            })
    return pd.DataFrame(extracted_data)
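
The select statement built in get_10_postcodes corresponds roughly to the SQL in the sketch below.  It runs against an in-memory SQLite database with invented rows, as a stand-in for the Supabase tables, so you can see which postcodes the query leaves pending.  The DISTINCT keyword is an assumption added here to collapse repeated postcodes.

```python
import sqlite3

demo_conn = sqlite3.connect(':memory:')
demo_conn.executescript("""
    CREATE TABLE customer_data (id INTEGER PRIMARY KEY, name TEXT, postcode TEXT);
    CREATE TABLE location_data (postcode TEXT PRIMARY KEY, longitude REAL, latitude REAL);
    INSERT INTO customer_data (name, postcode) VALUES
        ('Ada', 'S1 2BP'), ('Grace', 'W2 6LG'), ('Alan', 'S1 2BP');
    INSERT INTO location_data VALUES ('W2 6LG', 0.0, 0.0);  -- placeholder coordinates
""")

# Postcodes that customers have but location_data does not, at most 10 of them
pending_sql = """
    SELECT DISTINCT postcode
    FROM customer_data
    WHERE postcode NOT IN (SELECT postcode FROM location_data)
    LIMIT 10
"""
pending = [row[0] for row in demo_conn.execute(pending_sql)]
print(pending)
demo_conn.close()
```

Here 'W2 6LG' is skipped because it already has a row in location_data, and the two 'S1 2BP' customers yield a single pending postcode.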

Now we have our functions we can put them together into a script and update our database.  

In [None]:
# create the database engine
engine = get_sql_engine(POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_HOST, POSTGRES_DATABASE)

# making sure our target table is ready
create_table_sql = text("""
    CREATE TABLE IF NOT EXISTS location_data (
        postcode VARCHAR(9) PRIMARY KEY,
        longitude FLOAT,
        latitude FLOAT
    );
    """)
connection_manager(engine, create_table_sql)

# We grab our postcodes
result = connection_manager(engine,
                   get_10_postcodes(engine)
                  ).fetchall()
postcodes = [i[0] for i in result]

# We extract the data from the API
data = get_long_lat(postcodes)

# We filter out all the unnecessary additional information
df = extract_long_lat(data)

# We load the data into our database
connection_manager(engine,
                   insert_data('location_data',engine),
                   df.to_dict('records')
                  )

This has now added data to the second table in our database.  Here is a link to the schema we're trying to produce: https://lucid.app/lucidchart/8f0f8fd2-e3ae-4d68-891a-dca08ea9da38/edit?viewport_loc=1258%2C-150%2C1744%2C781%2C0_0&invitationId=inv_81a74f07-50c9-4088-9478-179449341de7  
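
The script above handles one batch of up to 10 postcodes each time it runs.  Repeating it until nothing is pending could look something like the sketch below.  The names run_until_done, fetch_batch and load_batch are hypothetical: the two callables stand in for the "grab our postcodes" step and the "call the API and insert" steps.  The max_rounds guard is there because a postcode the API cannot match never reaches location_data, so it would otherwise be fetched again on every round.

```python
def run_until_done(fetch_batch, load_batch, max_rounds=100):
    """Repeat fetch/load until no postcodes are pending or max_rounds is hit.

    Returns the number of batches processed.
    """
    rounds = 0
    while rounds < max_rounds:
        batch = fetch_batch()
        if not batch:  # nothing left to geolocate
            break
        load_batch(batch)
        rounds += 1
    return rounds


# Stand-ins for the database and the API, for demonstration only
pending = [f'POSTCODE{i}' for i in range(25)]
loaded = []


def fake_fetch():
    # mimics get_10_postcodes: at most 10 pending postcodes
    return pending[:10]


def fake_load(batch):
    # mimics the API call plus insert: the batch is no longer pending afterwards
    loaded.extend(batch)
    del pending[:len(batch)]


batches = run_until_done(fake_fetch, fake_load)
print(batches, len(loaded))
```

In the real pipeline it would also be polite to pause between rounds, for example with time.sleep, so the API is not called in a tight loop.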