# Sharing Processes with a Team

## Connection and Cursor

We will use a connection object to manage our connection to the database over the network.

We will use a cursor object to send queries to the database and read back the results. The cursor is created by calling the `cursor()` method of the connection object.

In [None]:
import psycopg2 as pg2
from psycopg2.extras import RealDictCursor

connection = pg2.connect(host='postgis',
                         user='postgres',
                         database='postgres')
cursor = connection.cursor(cursor_factory=RealDictCursor)

The cursor can be used to execute queries and then to fetch the results of the query.

In [None]:
# execute() returns None; the rows are read afterwards with fetchall()
cursor.execute("SELECT * FROM business LIMIT 2;")

In [None]:
cursor.fetchall()

In [None]:
connection.close()

Because we used the `cursor_factory=RealDictCursor` argument, `fetchall()` returns the rows as a list of dictionary objects keyed by column name.

The advantage of this is that we can display these results easily with a DataFrame.


In [None]:
import pandas as pd

In [None]:
connection = pg2.connect(host='postgis',
                         user='postgres',
                         database='postgres')
cursor = connection.cursor(cursor_factory=RealDictCursor)
cursor.execute("SELECT * FROM business LIMIT 2;")
pd.DataFrame(cursor.fetchall())

In [None]:
connection.close()

## `lib.db_helper`

Because we will keep using the connection-cursor pattern, we have written it into a sub-module.

In [None]:
cd /home/jovyan

In [None]:
import lib.db_helper as db

#### `db.connect_to_db()`

Contains our connection credentials and returns a connection and cursor. 

    def connect_to_db():
        con = pg2.connect(host='postgis',
                          dbname='postgres',
                          user='postgres')
        cur = con.cursor(cursor_factory=RealDictCursor)
        return con, cur

In [None]:
connection, cursor = db.connect_to_db()
cursor.execute("SELECT * FROM business LIMIT 2;")
pd.DataFrame(cursor.fetchall())

In [None]:
connection.close()

#### `db.query_to_dictionary()`

1. creates a connection and a cursor
1. uses the cursor to execute a query
1. if `fetch_res` is `True` it fetches the results, otherwise results are `None`
1. closes the connection and returns `results`


    def query_to_dictionary(query, fetch_res=True):
        con, cur = connect_to_db()
        cur.execute(query)
        if fetch_res:
            results = cur.fetchall()
        else:
            results = None
        con.close()
        return results
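
One caveat with the helper above: if `cur.execute(query)` raises, `con.close()` is never reached. The following is a minimal sketch of a variant that always closes the connection. It is not the code in `lib.db_helper`: the name `query_to_dictionary_safe` is hypothetical, and an in-memory SQLite database stands in for PostGIS so that the example runs anywhere.

```python
import sqlite3
from contextlib import closing


def connect_to_db():
    """Stand-in for db.connect_to_db(): returns (connection, cursor).

    Assumption: SQLite replaces PostGIS here so the sketch is self-contained.
    """
    con = sqlite3.connect(":memory:")
    con.row_factory = sqlite3.Row  # rows can be converted to dicts
    return con, con.cursor()


def query_to_dictionary_safe(query, fetch_res=True):
    """Hypothetical variant that closes the connection even on errors."""
    con, cur = connect_to_db()
    with closing(con):  # con.close() runs whether or not execute() raises
        cur.execute(query)
        if fetch_res:
            return [dict(row) for row in cur.fetchall()]
        return None


rows = query_to_dictionary_safe("SELECT 1 AS id, 'Cafe' AS name")
print(rows)
```

With psycopg2 the same idea should carry over by wrapping the body in `try: ... finally: con.close()`.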

In [None]:
db.query_to_dictionary("""SELECT * FROM business LIMIT 2;""")

In [None]:
db.query_to_dictionary("""SELECT * FROM business LIMIT 2;""", fetch_res=False)

#### `db.query_to_dataframe()`

Wraps `query_to_dictionary()` in a `pandas.DataFrame`.

    def query_to_dataframe(query):
        return DataFrame(query_to_dictionary(query))

In [None]:
db.query_to_dataframe('SELECT * FROM business LIMIT 2;')

### Create new column in `business` table

To demonstrate how transactions work, we will write a SQL query to create a new column in our database.

#### Run this if the column has already been created and you wish to run all cells in the notebook

In [None]:
# db.query_to_dictionary("""
# BEGIN;
# ALTER TABLE business DROP COLUMN gpnt_location;
# COMMIT;
# """, fetch_res=False)

In [None]:
db.query_to_dictionary("""
BEGIN;
ALTER TABLE business ADD COLUMN gpnt_location geometry(POINT,4326);
COMMIT;
""", fetch_res=False)

In [None]:
db.query_to_dataframe('SELECT * FROM business LIMIT 2;')

# Clean the Data

In [None]:
select_count_business = """
SELECT COUNT(*) FROM business
"""
db.query_to_dataframe(select_count_business)

In [None]:
select_count_by_postal_code = """
SELECT postal_code, COUNT(*) FROM business
GROUP BY postal_code
ORDER BY postal_code
"""
db.query_to_dataframe(select_count_by_postal_code)

Note that we might also have invalid data, such as rows where both latitude and longitude are 0.

In [None]:
select_count_business_invalid_data = """
SELECT COUNT(*) FROM business
WHERE latitude = 0 AND longitude = 0;"""

In [None]:
db.query_to_dataframe(select_count_business_invalid_data)

In [None]:
select_count_business_valid = """
SELECT COUNT(*) FROM business
WHERE 
    (latitude IS NOT NULL
     AND longitude IS NOT NULL)
AND 
    (latitude != 0 
     AND longitude != 0)
"""

select_count_business_invalid = """
SELECT COUNT(*) FROM business
WHERE 
    (latitude IS NULL
     AND longitude IS NULL)
OR 
    (latitude = 0 
     AND longitude = 0)
"""

In [None]:
db.query_to_dataframe(select_count_business_valid)

In [None]:
db.query_to_dataframe(select_count_business_invalid)

### Store Valid and Invalid Queries

We will also make use of subqueries. We will define two:

1. a subquery for all businesses with valid lat/long
1. a subquery for all businesses with invalid lat/long

#### Valid and Invalid Business Queries

In [None]:
select_business_valid = """
SELECT * FROM business
WHERE 
    (latitude IS NOT NULL
     AND longitude IS NOT NULL)
AND 
    (latitude != 0 
     AND longitude != 0)
"""

select_business_invalid = """
SELECT * FROM business
WHERE 
    (latitude IS NULL
     AND longitude IS NULL)
OR 
    (latitude = 0 
     AND longitude = 0)
"""

In [None]:
print(db.query_to_dataframe(select_business_valid).shape)
db.query_to_dataframe(select_business_valid).sample(4)

In [None]:
print(db.query_to_dataframe(select_business_invalid).shape)
db.query_to_dataframe(select_business_invalid).sample(4)

### Explore Postal Code and Lat/Long

First, let's get the postal codes that appear fewer than 10 times. We will do this with a subquery.

We can then use subqueries like this one to help us clean the data.

Note that we re-use `select_count_by_postal_code`.

In [None]:
print(select_count_by_postal_code)

In [None]:
select_postal_code_by_postal_code_less_than_10 = """
SELECT postal_code FROM 
    ({}) q
WHERE count < 10
""".format(select_count_by_postal_code)

print(select_postal_code_by_postal_code_less_than_10)

Note that we are using a subquery here. 

e.g. 

    SELECT * FROM (SUBQUERY) sub_query_name;
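
As a small, self-contained illustration of this pattern, the sketch below builds a count-per-postal-code query and wraps it as a subquery named `q`. It assumes an in-memory SQLite table with made-up rows instead of the real `business` table. It also aliases the count as `n`, because SQLite does not name an unaliased `COUNT(*)` column `count` the way PostgreSQL does.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE business (id INTEGER PRIMARY KEY, postal_code INTEGER)")
con.executemany(
    "INSERT INTO business (id, postal_code) VALUES (?, ?)",
    [(1, 94110), (2, 94110), (3, 94110), (4, 92672)],  # made-up rows
)

# Inner query: one row per postal code with its row count.
count_by_postal_code = """
SELECT postal_code, COUNT(*) AS n FROM business
GROUP BY postal_code
"""

# Outer query: treat the inner query as a table named q.
rare_postal_codes = """
SELECT postal_code FROM
    ({}) q
WHERE n < 2
""".format(count_by_postal_code)

rare = [row[0] for row in con.execute(rare_postal_codes)]
print(rare)
con.close()
```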
    

In [None]:
db.query_to_dataframe(select_postal_code_by_postal_code_less_than_10)

We'll think of these as our "bad postal codes". We can think of the query we defined as a list of these codes. That query was:

In [None]:
print(select_postal_code_by_postal_code_less_than_10)

Do we have (lat,long) pairs for these locations? Let's select all rows with one of these postal codes.

### `IN`

To do this we will use the SQL keyword `IN`. `IN` checks a column against a list of items.

e.g. 

    SELECT * FROM my_table WHERE my_column IN (1, 2, 3, 4, 5);

We will replace the list with our subquery.
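
The two forms of `IN` can be compared side by side in a short sketch. It assumes an in-memory SQLite table with made-up rows, and for brevity it finds the rare postal codes with `HAVING` rather than with the nested subquery used in this notebook.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE business (id INTEGER PRIMARY KEY, name TEXT, postal_code INTEGER)"
)
con.executemany(
    "INSERT INTO business VALUES (?, ?, ?)",
    [(1, "A", 94110), (2, "B", 94110), (3, "C", 92672), (4, "D", 94101)],
)

# IN with a literal list of values.
in_list = "SELECT id FROM business WHERE postal_code IN (92672, 94101) ORDER BY id"
ids_from_list = [row[0] for row in con.execute(in_list)]

# IN with a subquery that produces the same values.
rare_codes = """
SELECT postal_code FROM business
GROUP BY postal_code
HAVING COUNT(*) < 2
"""
in_subquery = """
SELECT id FROM business WHERE postal_code IN ({}) ORDER BY id
""".format(rare_codes)
ids_from_subquery = [row[0] for row in con.execute(in_subquery)]

print(ids_from_list, ids_from_subquery)
con.close()
```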

In [None]:
select_business_where_bad_postal_code = """
SELECT * FROM business WHERE postal_code IN ({})""".format(select_postal_code_by_postal_code_less_than_10)
print(select_business_where_bad_postal_code)

In [None]:
db.query_to_dataframe(select_business_where_bad_postal_code).sample(5)

Note that some of these have a lat and long. 

What we want is rows that have "bad postal codes" and no (lat, long) pair. 

### A Nested Query

To get these we will write a nested query that looks like this:

    SELECT * FROM (invalid_lat_long) WHERE postal_code IN (bad_postal_code_str)

In [None]:
select_invalid_business_where_bad_postal_code = """
SELECT * 
FROM ({}) q
WHERE postal_code IN ({})
""".format(select_business_invalid, select_postal_code_by_postal_code_less_than_10)
print(select_invalid_business_where_bad_postal_code)

In [None]:
db.query_to_dataframe(select_invalid_business_where_bad_postal_code)

#### Let's use `DELETE` to get rid of rows that have no lat/long and a bad postal code

First, let's collect a list of business `id`s. 

In [None]:
select_invalid_business_id_where_bad_postal_code = """
SELECT id 
FROM ({}) q
WHERE postal_code IN ({})
""".format(select_business_invalid, select_postal_code_by_postal_code_less_than_10)
print(select_invalid_business_id_where_bad_postal_code)

In [None]:
db.query_to_dataframe(select_invalid_business_id_where_bad_postal_code)

In [None]:
print(select_invalid_business_id_where_bad_postal_code)

We can use this subquery in our `DELETE` query.

Remember that a **delete** action is a write action and needs to be handled as a transaction: we need to `BEGIN` and `COMMIT`.
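
To see why the commit matters, here is a minimal sketch of a delete that is rolled back and then repeated and committed. It assumes an in-memory SQLite table with made-up rows. Python's SQLite driver opens the transaction implicitly before the `DELETE`, whereas this notebook sends `BEGIN;` and `COMMIT;` as part of the SQL text.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE business (id INTEGER PRIMARY KEY, postal_code INTEGER)")
con.executemany("INSERT INTO business VALUES (?, ?)", [(1, 94110), (2, 92672)])
con.commit()


def count_rows():
    """Return the number of rows currently visible in business."""
    return con.execute("SELECT COUNT(*) FROM business").fetchone()[0]


# The delete is visible inside the open transaction...
con.execute("DELETE FROM business WHERE postal_code = 92672")
inside_transaction = count_rows()

# ...but a rollback undoes it, because it was never committed.
con.rollback()
after_rollback = count_rows()

# Repeating the delete and committing makes it permanent.
con.execute("DELETE FROM business WHERE postal_code = 92672")
con.commit()
after_commit = count_rows()

print(inside_transaction, after_rollback, after_commit)
con.close()
```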

In [None]:
delete_invalid_business_bad_postal_code = """
BEGIN;
DELETE 
FROM business
WHERE id IN ({});
COMMIT;
""".format(select_invalid_business_id_where_bad_postal_code)
print(delete_invalid_business_bad_postal_code)

Remember, when we execute this we do not need to fetch results so we should set `fetch_res` to `False`.

In [None]:
db.query_to_dictionary(delete_invalid_business_bad_postal_code, fetch_res=False)

The `DELETE` above can fail with a foreign-key violation. To see why, let's have a look at the SQL file used to define our database.

```SQL
CREATE TABLE business (
    id INTEGER,
    name TEXT,
    address TEXT,
    city TEXT,
    state TEXT,
    postal_code INTEGER,
    latitude FLOAT,
    longitude FLOAT,
    phone_number TEXT,
    PRIMARY KEY (id));

CREATE TABLE inspection (
    business_id INTEGER,
    score INTEGER,
    date TIMESTAMP,
    type TEXT,
    CONSTRAINT fk_business_id
    FOREIGN KEY (business_id)
    REFERENCES business (id));

CREATE TABLE legend (
    minimum_score INTEGER,
    maximum_score INTEGER,
    description TEXT);

CREATE TABLE violation (
    business_id INTEGER,
    date TIMESTAMP,
    description TEXT,
    CONSTRAINT fk_business_id
    FOREIGN KEY (business_id)
    REFERENCES business (id));

```

Note that we have foreign key `CONSTRAINT`s on the `inspection` and `violation` tables.

This means that, in order to delete the businesses with "bad" data, we first need to delete any inspections and violations associated with them.
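
The ordering requirement can be reproduced in a small sketch. It assumes an in-memory SQLite database with cut-down versions of the three tables and a single made-up business. SQLite only enforces foreign keys after `PRAGMA foreign_keys = ON`, while PostgreSQL enforces them by default.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("PRAGMA foreign_keys = ON")  # SQLite only enforces FKs when asked
con.execute("CREATE TABLE business (id INTEGER PRIMARY KEY)")
con.execute("CREATE TABLE violation (business_id INTEGER REFERENCES business (id))")
con.execute("CREATE TABLE inspection (business_id INTEGER REFERENCES business (id))")
con.execute("INSERT INTO business VALUES (1)")
con.execute("INSERT INTO violation VALUES (1)")
con.execute("INSERT INTO inspection VALUES (1)")
con.commit()

# Deleting the parent row first violates the foreign key constraints.
try:
    con.execute("DELETE FROM business WHERE id = 1")
    parent_delete_failed = False
except sqlite3.IntegrityError:
    parent_delete_failed = True
    con.rollback()

# Deleting the child rows first lets the parent delete succeed.
con.execute("DELETE FROM violation WHERE business_id = 1")
con.execute("DELETE FROM inspection WHERE business_id = 1")
con.execute("DELETE FROM business WHERE id = 1")
con.commit()

remaining = con.execute("SELECT COUNT(*) FROM business").fetchone()[0]
print(parent_delete_failed, remaining)
con.close()
```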

### "Bad" Business Data

In [None]:
db.query_to_dataframe(select_invalid_business_id_where_bad_postal_code)

In [None]:
print(select_invalid_business_id_where_bad_postal_code)

In [None]:
select_violations_for_bad_biz = """
SELECT * FROM violation
WHERE business_id IN ({})
""".format(select_invalid_business_id_where_bad_postal_code)

In [None]:
db.query_to_dataframe(select_violations_for_bad_biz)

### Delete Violations

In [None]:
select_count_violation = """
SELECT COUNT(*) FROM violation"""

In [None]:
db.query_to_dataframe(select_count_violation)

In [None]:
delete_violations_for_bad_biz = """
BEGIN;
DELETE FROM violation
WHERE business_id IN ({});
COMMIT;""".format(select_invalid_business_id_where_bad_postal_code)

In [None]:
db.query_to_dictionary(delete_violations_for_bad_biz, fetch_res=False)

In [None]:
db.query_to_dataframe(select_count_violation)

### Delete Inspections

In [None]:
select_count_inspections = """
SELECT COUNT(*) FROM inspection"""

In [None]:
db.query_to_dataframe(select_count_inspections)

In [None]:
delete_inspections_for_bad_biz = """
BEGIN;
DELETE FROM inspection
WHERE business_id IN ({});
COMMIT;""".format(select_invalid_business_id_where_bad_postal_code)

In [None]:
db.query_to_dictionary(delete_inspections_for_bad_biz, fetch_res=False)

In [None]:
db.query_to_dataframe(select_count_inspections)

### Delete Businesses

In [None]:
db.query_to_dataframe(select_count_business)

In [None]:
db.query_to_dictionary(delete_invalid_business_bad_postal_code, fetch_res=False)

In [None]:
db.query_to_dataframe(select_count_business)

### Pull Bad Postal Codes Again

In [None]:
db.query_to_dataframe(select_postal_code_by_postal_code_less_than_10)

## Repair Bad Zip Codes
To repair the bad zip codes, we will attempt to match the nearest points using GIS.

To do this, we will first need to populate the `gpnt_location` column.

In [None]:
db.query_to_dataframe(select_business_valid).sample(4)

In [None]:
# ST_MakePoint takes (x, y), which for geographic data is (longitude, latitude)
update_gpnt = """
BEGIN;
UPDATE business SET 
gpnt_location = ST_SetSRID(ST_MakePoint(longitude, latitude),4326);
COMMIT;
"""

In [None]:
db.query_to_dictionary(update_gpnt, fetch_res=False)

### Match points to the points with bad zip codes using a given radius
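
The radius match in this section relies on a spherical (great-circle) distance measured in meters. As a rough illustration of what such a filter computes, here is a pure-Python sketch using the haversine formula. The function names, the Earth radius constant, and the sample coordinates are assumptions made for this example. PostGIS may use a slightly different radius, so results can differ by small amounts.

```python
import math

EARTH_RADIUS_M = 6371008.8  # assumed mean Earth radius in meters


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two (lat, lon) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def within_radius(rows, center, distance_m):
    """Keep the rows whose (latitude, longitude) lies within distance_m of center."""
    lat0, lon0 = center
    return [
        row for row in rows
        if haversine_m(lat0, lon0, row["latitude"], row["longitude"]) <= distance_m
    ]


# Made-up sample rows, shaped like the dictionaries returned by the cursor.
rows = [
    {"name": "near", "latitude": 37.7700, "longitude": -122.4400},
    {"name": "far", "latitude": 37.8000, "longitude": -122.4400},
]
nearby = within_radius(rows, (37.7705, -122.4400), 100)
print([row["name"] for row in nearby])
```

Doing the filter in the database, as this notebook does, avoids pulling every row into Python first.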

In [None]:
db.query_to_dataframe(select_postal_code_by_postal_code_less_than_10)

In [None]:
def select_postal_code(postal_code):
    return """SELECT * FROM business WHERE postal_code = {}""".format(postal_code)

In [None]:
db.query_to_dataframe(select_postal_code(92672))

In [None]:
def select_gpnt_for_postal_code(postal_code):
    return """SELECT gpnt_location FROM business WHERE postal_code = {} LIMIT 1""".format(postal_code)

In [None]:
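def select_on_gpnt_radius(postal_code, distance):
    # Note: PostGIS 2.2 renamed ST_Distance_Sphere to ST_DistanceSphere, and
    # PostGIS 3.0 removed the old name; adjust to match your PostGIS version.
    return """
            SELECT *
            FROM business
            WHERE ST_Distance_Sphere(gpnt_location, ({})) <= {}
            """.format(select_gpnt_for_postal_code(postal_code), distance)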

In [None]:
db.query_to_dataframe(select_on_gpnt_radius(92672, 100))

In [None]:
update_92672 = """
BEGIN;
UPDATE business SET postal_code = 94117 WHERE postal_code = 92672;
COMMIT;
"""

In [None]:
db.query_to_dictionary(update_92672, fetch_res=False)

### Check the Status of our Bad Postal Codes

In [None]:
db.query_to_dataframe(select_postal_code_by_postal_code_less_than_10)

In [None]:
db.query_to_dataframe(select_on_gpnt_radius(94101, 100))

## Plot locations

Next, we will use Folium to plot the businesses for which we have a latitude and a longitude. 

### `folium.Map` 

To build our `folium.Map` object we will need to center the map in a given location. We will use the `avg` aggregate function to do this. 

In [None]:
import folium

In [None]:
avg_location_sf = """
SELECT avg(latitude) as avg_lat, 
       avg(longitude) as avg_lon FROM ({}) q;
""".format(select_business_valid)

In [None]:
db.query_to_dataframe(avg_location_sf)

In [None]:
avg_loc_df = db.query_to_dataframe(avg_location_sf)

Then take the first (and only) row of the result as an `[avg_lat, avg_lon]` list.

In [None]:
avg_loc = avg_loc_df.values.tolist()[0]

In [None]:
sf_map = folium.Map(location=avg_loc, zoom_start=13)

In [None]:
within_200_meters_of_94101 = db.query_to_dataframe(select_on_gpnt_radius(94101, 200))

In [None]:
near_94101_lat_long = within_200_meters_of_94101[['latitude', 'longitude','postal_code']].values.tolist()

In [None]:
for loc in near_94101_lat_long:
    try:
        postal_code = str(int(loc[2]))
    except (TypeError, ValueError):
        # postal_code is missing (None or NaN), so show no popup text
        postal_code = None
    folium.Marker(loc[:2], popup=postal_code).add_to(sf_map)

In [None]:
sf_map