# Data Warehouse & ETL

This week we'll take a look at more ETL functions, building up a mini warehouse using Bikeshare and weather data.

We are going to use [PostgreSQL](https://www.postgresql.org) 9.5 or later version this time. PostgreSQL is already installed on your AWS EC2 instances based on our AMI.

## Setup - bikeshare data, again

We'll download the same Bikeshare data you've worked with before, and we'll create some database tables and indexes more deliberately using PostgreSQL.

In [None]:
%load_ext sql

In [None]:
!dropdb -U student week10

In [None]:
!createdb -U student week10

In [None]:
%sql postgresql://student@/week10

In [None]:
!wget -O 2017-Q1-trips.zip https://s3.amazonaws.com/capitalbikeshare-data/2017-Q1-cabi-trips-history-data.zip

In [None]:
!unzip -o 2017-Q1-trips.zip

In [None]:
!mv 2017-Q1-Trips-History-Data.csv 2017q1.csv

In [None]:
!wc -l 2017q1.csv

In [None]:
!csvcut -n 2017q1.csv

### Create table and import

Given the volume of data here, let's go straight to pgsql to load the data.

*Note* use `gshuf` if you're on a Mac, otherwise try `shuf`.  Same options should work for both.

In [None]:
!head -n 10000 2017q1.csv | csvstat

Based on these values, I expect we can work with the following:

In [None]:
%%sql
DROP TABLE IF EXISTS rides;

CREATE TABLE rides (
    duration_ms INTEGER,
    start_date TIMESTAMP,
    end_date TIMESTAMP,
    start_station_id INTEGER,
    start_station VARCHAR(64),
    end_station_id INTEGER,
    end_station VARCHAR(64),
    bike_number CHAR(21),
    member_type CHAR(10)
)

Now we'll load the data directly using `COPY` command.  Note that this **requires** the use of an absolute path, so adjust it to your location:

In [None]:
!cp 2017q1.csv /tmp/2017q1.csv

In [None]:
%%sql
COPY rides FROM '/tmp/2017q1.csv'
CSV
HEADER;

In [None]:
%%sql
SELECT COUNT(*) FROM rides;

In [None]:
%%sql
SELECT * FROM rides
LIMIT 10

## More ETL with SQL

Today we'll extend previous week's examples of how to extract consistent sets of values out of your database. We will start with this schema: 

In [None]:
from IPython.display import Image
Image(url="https://github.com/tongwang/data2017/raw/master/lectures/star-week-01/rides.png")

We will eventually build a star schema like this:

In [None]:
Image(url="https://github.com/tongwang/data2017/raw/master/lectures/star-week-01/rides_star.png")

First let's extract simple details like station ids and names.

In [None]:
%%sql
SELECT DISTINCT start_station_id, start_station
FROM rides
ORDER BY start_station
LIMIT 10;

In [None]:
%%sql
SELECT DISTINCT end_station_id, end_station
FROM rides
ORDER BY end_station
LIMIT 10;

To be sure we get them all, we need to combine them into a union set.

In [None]:
%%sql
SELECT DISTINCT start_station_id AS station_id, 
       start_station AS station 
FROM rides
UNION
SELECT DISTINCT end_station_id AS station_id, 
       end_station AS station 
FROM rides

Now we can create a new table to house the unique station names.

In [None]:
%%sql
DROP TABLE IF EXISTS station;

CREATE TABLE station (
    key SERIAL PRIMARY KEY,
    station_id INTEGER,
    name VARCHAR(64)
);

In [None]:
%%sql
INSERT INTO station (station_id, name)
SELECT DISTINCT start_station_id AS station_id, 
       start_station AS station 
FROM rides
UNION
SELECT DISTINCT end_station_id AS station_id, 
       end_station AS station 
FROM rides;

In [None]:
%%sql
SELECT * FROM station LIMIT 10;

We can even add these new identifiers back to the original table now.

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN start_station_key INTEGER,
ADD CONSTRAINT fk_start_station
    FOREIGN KEY (start_station_key)
    REFERENCES station (key);

In [None]:
%%sql 
SELECT * FROM rides LIMIT 10;

In [None]:
%%sql
UPDATE rides
SET start_station_key = station.key
FROM station
WHERE rides.start_station_id = station.station_id;

In [None]:
%%sql 
SELECT * FROM rides LIMIT 10;

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN end_station_key INTEGER,
ADD CONSTRAINT fk_end_station
    FOREIGN KEY (end_station_key)
    REFERENCES station (key);

In [None]:
%%sql
UPDATE rides
SET end_station_key = station.key
FROM station
WHERE rides.end_station = station.name;

In [None]:
%%sql
SELECT * FROM rides
LIMIT 10;

Now we can drop the columns `start_station_id`, `start_station`, `end_station_id` and `end_station` from the rides relation.

In [None]:
%%sql
ALTER TABLE rides 
DROP COLUMN start_station_id,
DROP COLUMN start_station,
DROP COLUMN end_station_id,
DROP COLUMN end_station;

In [None]:
%%sql
SELECT * FROM rides
LIMIT 10;

### Simple address geocoding

It feels like we should do a little more with the stations, doesn't it?  Let's see if we can geocode them using the [geocoder library](https://geocoder.readthedocs.io/). First let's install `geocoder` package: 

In [None]:
!pip install --user geocoder

In [None]:
%%sql
ALTER TABLE station
ADD COLUMN lat NUMERIC,
ADD COLUMN lng NUMERIC;

#### Connecting to the db from python

Here we'll use a little python to update run geocoding queries and flesh out the data a bit more.

Note that the following code takes very long to finish. Please complete the following steps at home.

In [None]:
import psycopg2
import geocoder

conn = psycopg2.connect("dbname='week10' user='student'")
c = conn.cursor()
c.execute("SELECT key, name FROM station ORDER BY key ASC LIMIT 10")
rows = c.fetchall()
for r in rows:
    station_key, station_name = r
    print('%s: %s' % (station_key, station_name))
    g = geocoder.google('%s Washington DC' % station_name)
    if g:
        c.execute("UPDATE station SET lat = (%s), lng = (%s) WHERE key = (%s)", 
                  (g.lat, g.lng, station_key))
    else:
        print("no geocode")
conn.commit()

In [None]:
%%sql
SELECT * FROM station ORDER BY key ASC LIMIT 10;

### Add more derived facts and dimensions

Another useful step might be recording the minutes as a new column so we don't have to calculate from milliseconds every time.

In [None]:
%%sql
ALTER TABLE rides
ADD COLUMN duration_min NUMERIC;

In [None]:
%%sql
UPDATE rides
SET duration_min = ROUND(CAST(duration_ms AS NUMERIC) / (1000 * 60), 1);

In [None]:
%%sql
SELECT * FROM rides
LIMIT 5;

In data warehouse models and in statistical model feature engineering, it can be particularly useful to extract all kinds of parts of dates out into their own attributes.  You never know where you'll find significance.

This kind of extraction is quite common.

In [None]:
%%sql
SELECT DISTINCT TO_CHAR(start_date, 'YYYY-MM-DD HH24:00:00') AS hour,
    TO_CHAR(start_date, 'YYYY-MM-DD') AS day, 
    TO_CHAR(start_date, 'YYYY') AS year,
    TO_CHAR(start_date, 'Month') AS month_of_year_str,
    TO_CHAR(start_date, 'MM') AS month_of_year,
    TO_CHAR(start_date, 'DD') AS day_of_month,
    TO_CHAR(start_date, 'Day') AS day_of_week_str,
    TO_CHAR(start_date, 'D') AS day_of_week,
    CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) >= 6 
        THEN 'true' 
        ELSE 'false'
    END AS is_weekend,
    CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) < 6 
        THEN 'true' 
        ELSE 'false'
    END AS is_weekday,
    TO_CHAR(start_date, 'HH24') AS hour_of_day,
    TO_CHAR(start_date, 'Q') AS quarter_of_year
FROM rides
LIMIT 10;

In [None]:
%%sql
DROP TABLE IF EXISTS hour;

CREATE TABLE hour (
    key SERIAL PRIMARY KEY,
    hour CHAR(19),
    day CHAR(10),
    year INTEGER,
    month_of_year_str VARCHAR(12),
    month_of_year INTEGER,
    day_of_month INTEGER,
    day_of_week_str CHAR(9),
    day_of_week INTEGER,
    is_weekend BOOLEAN,
    is_weekday BOOLEAN,
    hour_of_day INTEGER,
    quarter_of_year INTEGER
);

In [None]:
%%sql
INSERT INTO hour (hour, day, year, month_of_year_str, month_of_year, day_of_month, 
                  day_of_week_str, day_of_week, is_weekend, is_weekday, 
                  hour_of_day, quarter_of_year)
SELECT DISTINCT TO_CHAR(start_date, 'YYYY-MM-DD HH24:00:00') AS hour,
    TO_CHAR(start_date, 'YYYY-MM-DD') AS day, 
    CAST(TO_CHAR(start_date, 'YYYY') AS INTEGER) AS year,
    TO_CHAR(start_date, 'Month') AS month_of_year_str,
    CAST(TO_CHAR(start_date, 'MM') AS INTEGER) AS month_of_year,
    CAST(TO_CHAR(start_date, 'DD') AS INTEGER) AS day_of_month,
    TO_CHAR(start_date, 'Day') AS day_of_week_str,
    CAST(TO_CHAR(start_date, 'D') AS INTEGER) AS day_of_week,
    CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) IN (1, 7) 
        THEN TRUE
        ELSE FALSE
    END AS is_weekend,
    CASE WHEN CAST(TO_CHAR(start_date, 'D') AS INTEGER) NOT IN (1, 7) 
        THEN TRUE
        ELSE FALSE
    END AS is_weekday,
    CAST(TO_CHAR(start_date, 'HH24') AS INTEGER) AS hour_of_day,
    CAST(TO_CHAR(start_date, 'Q') AS INTEGER) AS quarter_of_year
FROM rides
UNION
SELECT DISTINCT TO_CHAR(end_date, 'YYYY-MM-DD HH24:00:00') AS hour,
    TO_CHAR(end_date, 'YYYY-MM-DD') AS day, 
    CAST(TO_CHAR(end_date, 'YYYY') AS INTEGER) AS year,
    TO_CHAR(end_date, 'Month') AS month_of_year_str,
    CAST(TO_CHAR(end_date, 'MM') AS INTEGER) AS month_of_year,
    CAST(TO_CHAR(end_date, 'DD') AS INTEGER) AS day_of_month,
    TO_CHAR(end_date, 'Day') AS day_of_week_str,
    CAST(TO_CHAR(end_date, 'D') AS INTEGER) AS day_of_week,
    CASE WHEN CAST(TO_CHAR(end_date, 'D') AS INTEGER) IN (1, 7) 
        THEN TRUE
        ELSE FALSE
    END AS is_weekend,
    CASE WHEN CAST(TO_CHAR(end_date, 'D') AS INTEGER) NOT IN (1, 7) 
        THEN TRUE
        ELSE FALSE
    END AS is_weekday,
    CAST(TO_CHAR(end_date, 'HH24') AS INTEGER) AS hour_of_day,
    CAST(TO_CHAR(end_date, 'Q') AS INTEGER) AS quarter_of_year
FROM rides;

In [None]:
%%sql
SELECT * FROM hour
LIMIT 10;

And let's make sure we got that weekend bit right:

In [None]:
%%sql
SELECT DISTINCT day_of_week_str, day_of_week, is_weekend, is_weekday 
FROM hour
ORDER BY day_of_week;

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN start_hour_key INTEGER,
ADD CONSTRAINT fk_start_hour
    FOREIGN KEY (start_hour_key)
    REFERENCES hour (key);

In [None]:
%%sql
UPDATE rides
SET start_hour_key = hour.key
FROM hour
WHERE TO_CHAR(rides.start_date, 'YYYY-MM-DD HH24:00:00') = hour.hour;

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN end_hour_key INTEGER,
ADD CONSTRAINT fk_end_hour
    FOREIGN KEY (end_hour_key)
    REFERENCES hour (key);

In [None]:
%%sql
UPDATE rides
SET end_hour_key = hour.key
FROM hour
WHERE TO_CHAR(rides.end_date, 'YYYY-MM-DD HH24:00:00') = hour.hour;

In [None]:
%%sql
SELECT * FROM rides
LIMIT 5;

Let's verify that those key values are correct. Here is the query that returns the first 10 trips of the bike W20893, with start/end dates as well as the start/end hours. It joins the rides fact table with hour dimension table:

In [None]:
%%sql
SELECT rides.start_date, rides.end_date, s_hour.hour AS start_hour, e_hour.hour AS end_hour
FROM rides
JOIN hour AS s_hour
  ON s_hour.key = rides.start_hour_key
JOIN hour AS e_hour
  ON e_hour.key = rides.end_hour_key
WHERE bike_number = 'W20893'
ORDER BY rides.start_date
LIMIT 10;

Let's create member_type and bike dimensions:

In [None]:
%%sql
DROP TABLE IF EXISTS member_type;

CREATE TABLE member_type (
    key SERIAL PRIMARY KEY,
    member_type CHAR(10)
);

DROP TABLE IF EXISTS bike;

CREATE TABLE bike (
    key SERIAL PRIMARY KEY,
    bike_number CHAR(21)
);

We populate the member_type dimension table. As you can see there is only two rows in this dimension table:

In [None]:
%%sql
INSERT INTO member_type (member_type)
SELECT DISTINCT member_type 
FROM rides;

Next, we add member type FK to the rides table:

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN member_type_key INTEGER,
ADD CONSTRAINT fk_member_type
    FOREIGN KEY (member_type_key)
    REFERENCES member_type (key);

...and populate the member type FK:

In [None]:
%%sql
UPDATE rides AS r
SET member_type_key = m.key
FROM member_type AS m
WHERE r.member_type = m.member_type;

Same steps for the bike:

In [None]:
%%sql
INSERT INTO bike (bike_number)
SELECT DISTINCT bike_number 
FROM rides;

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN bike_key INTEGER,
ADD CONSTRAINT fk_bike
    FOREIGN KEY (bike_key)
    REFERENCES bike (key);

In [None]:
%%sql
UPDATE rides
SET bike_key = bike.key
FROM bike
WHERE rides.bike_number = bike.bike_number;

We can drop the `bike_number` and `member_type` from the rides table.

In [None]:
%%sql
ALTER TABLE rides 
DROP COLUMN bike_number,
DROP COLUMN member_type;

We can even drop the `start_date` and `end_date` if we are not interested at the minute level.

In [None]:
%%sql
ALTER TABLE rides 
DROP COLUMN start_date,
DROP COLUMN end_date;

Now look at the final fact table:

In [None]:
%%sql
SELECT * FROM rides
LIMIT 5;

Now we are ready to explore the data. Let's find out the number of trips by day of the week:

In [None]:
%%sql
SELECT day_of_week_str, COUNT(*) count
FROM rides
JOIN hour
  ON rides.start_hour_key = hour.key
GROUP BY day_of_week_str, day_of_week
ORDER BY day_of_week;

In [None]:
%matplotlib inline

In [None]:
_.bar()

Let's look at the trips separately from casual riders and registered riders:

In [None]:
%%sql
SELECT day_of_week_str, COUNT(*) count
FROM rides
JOIN hour
  ON rides.start_hour_key = hour.key
JOIN member_type
    ON rides.member_type_key = member_type.key
WHERE member_type.member_type = 'Casual'
GROUP BY day_of_week_str, day_of_week
ORDER BY day_of_week;

In [None]:
_.bar()

In [None]:
%%sql
SELECT day_of_week_str, COUNT(*) count
FROM rides
JOIN hour
  ON rides.start_hour_key = hour.key
JOIN member_type
    ON rides.member_type_key = member_type.key
WHERE member_type.member_type = 'Registered'
GROUP BY day_of_week_str, day_of_week
ORDER BY day_of_week;

In [None]:
_.bar()

In [None]:
%%sql
SELECT hour.hour_of_day, COUNT(*) count
FROM rides
JOIN hour
    ON rides.start_hour_key = hour.key
JOIN member_type
    ON rides.member_type_key = member_type.key
WHERE member_type.member_type = 'Casual'
GROUP BY hour.hour_of_day 
ORDER BY hour.hour_of_day;

In [None]:
_.bar()

In [None]:
%%sql
SELECT hour.hour_of_day, COUNT(*) count
FROM rides
JOIN hour
    ON rides.start_hour_key = hour.key
JOIN member_type
    ON rides.member_type_key = member_type.key
WHERE member_type.member_type = 'Registered'
GROUP BY hour.hour_of_day 
ORDER BY hour.hour_of_day;

In [None]:
_.bar()

## Adding weather data

An interesting dimension to the bikeshare history is weather - I know I don't mind riding in the rain, but I'm probably in the minority.

Weather Underground offers access to weather history data at links like https://www.wunderground.com/history/airport/KDCA/2017/1/18/DailyHistory.html. Until recently we were able to download the weather history data in CSV format for free. Now Weather Underground offers API access that supports JSON and XML formats only and starts to charge for it based on usage. 

Fortunately we have downloaded the data for 2017 Q1 before they disabled the CSV suppport. There is one CSV file for each day. You can download them all as a zip file.

In [None]:
!wget -O weather2017q1.csv.zip https://s3.amazonaws.com/dmfa-2017/weather2017q1.csv.zip

In [None]:
!unzip -o weather2017q1.csv.zip

In [None]:
!head weather-20170125.csv | csvlook

Something is not right! The header is missing and there is `<br />` at the end of each line. Let's look at the raw content of the CSV file.

In [None]:
!head weather-20170125.csv

There are two issues:
1. The first line is blank
2. There are extra characters at the end of each line.

Let's clean the data using sed (stream editor) command. sed command "`/^$/d`" removes blank lines. "`s/<br \/>//g`" finds and replace `<br \/>` with empty string `''`.

In [None]:
!sed '/^$/d;s/<br \/>//g' weather-20170125.csv | head | csvlook

Now it looks much better! Apply the fix to all weather CSV files.

In [None]:
!for f in weather-2017*.csv; do sed -i '/^$/d;s/<br \/>//g' ${f}; done

If you don't know what the -i option do, you can find it out in the help:

In [None]:
!sed --help

And combine date based csv files into a single file:

In [None]:
!csvstack weather-201701*.csv weather-201702*.csv weather-201703*.csv > weather-2017q1.csv

In [None]:
!csvstat weather-2017q1.csv

We've noticed special values such as `-`, `N/A` and `None`. We need to remove them so that they will be treated as NULL by the database.

In [None]:
!sed -i 's/,N\/A,/,,/g;s/,-,/,,/g;;s/,None,/,,/g' weather-2017q1.csv

Based on these values, I expect we can work with the following schema for weather. Note that the type for `time_utc` is `TIMESTAMPTZ`, which is the abbreviation for `timestamp with time zone`, a PostgreSQL specific type. We also add an attribute named `time`. We will use it to store local eastern time.

In [None]:
%%sql
DROP TABLE IF EXISTS weather;

CREATE TABLE weather (
    key SERIAL PRIMARY KEY,
    time_str VARCHAR(8),
    temp NUMERIC,
    dew_point NUMERIC,
    humidity NUMERIC,
    pressure NUMERIC,
    visibility NUMERIC,
    wind_dir VARCHAR(8),
    wind_speed VARCHAR(10),
    gust_speed NUMERIC,
    precipitation NUMERIC,
    events VARCHAR(50),
    conditions VARCHAR(50),
    wind_dir_degrees NUMERIC,
    time_utc TIMESTAMPTZ,
    time TIMESTAMP
)

Now we'll load the data into PostgreSQL. Note that this requires the use of an absolute path, so adjust it to your location:

In [None]:
!pwd

In [None]:
%%sql
COPY weather 
(time_str, temp, dew_point, humidity, pressure, visibility, wind_dir, wind_speed, gust_speed, precipitation, events, conditions, wind_dir_degrees, time_utc)
FROM '/home/ubuntu/lectures/week-10/weather-2017q1.csv'
CSV
HEADER
QUOTE '"'
DELIMITER ',';

In [None]:
%%sql
SELECT * from weather LIMIT 10;

Next, we need to convert UTC time to local time EST or EDT. We know Daylight Saving Time started on Sunday, March 12, 2017, 2:00:00 am. The conversion takes two steps:

First we convert UTC times to EST times and populate `time` attribute for all `time_utc` values before `2017-03-12 07:00:00+00:00`, which is Sunday, March 12, 2017, 2:00:00 am EST. 

In [None]:
%%sql
UPDATE weather SET time = time_utc AT TIME ZONE 'EST'
WHERE time_utc <= '2017-03-12 07:00:00+00:00';

Next we convert UTC times to EDT times and populate `time` attribute for all `time_utc` values after `2017-03-12 07:00:00+00:00`. 

In [None]:
%%sql
UPDATE weather SET time = time_utc AT TIME ZONE 'EDT'
WHERE time_utc > '2017-03-12 07:00:00+00:00';

Verify that time attributes look okay on March 12:

In [None]:
%%sql
SELECT time_str, time from weather 
WHERE TO_CHAR(time, 'YYYY-MM-DD') = '2017-03-12'
ORDER BY time;

Now we add two foreign key columns (`start_weather_key` and `end_weather_key`) to the `rides` table that reference `weather` dimension table.

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN start_weather_key INTEGER,
ADD CONSTRAINT fk_start_weather
    FOREIGN KEY (start_weather_key)
    REFERENCES weather (key);

In [None]:
%%sql
ALTER TABLE rides 
ADD COLUMN end_weather_key INTEGER,
ADD CONSTRAINT fk_end_weather
    FOREIGN KEY (end_weather_key)
    REFERENCES weather (key);

In [None]:
%%sql
UPDATE rides
SET start_weather_key = weather.key
FROM weather, hour 
WHERE rides.start_hour_key = hour.key AND hour.hour = TO_CHAR(weather.time, 'YYYY-MM-DD HH24:00:00');

In [None]:
%%sql
UPDATE rides
SET end_weather_key = weather.key
FROM weather, hour 
WHERE rides.end_hour_key = hour.key AND hour.hour = TO_CHAR(weather.time, 'YYYY-MM-DD HH24:00:00');

Some rides do not have weather captured because there are some missing hours in the weather data:

In [None]:
%%sql
SELECT COUNT(*) FROM rides
WHERE start_weather_key IS NULL;

In [None]:
%%sql
SELECT COUNT(*) FROM rides
WHERE end_weather_key IS NULL;

Let's find out the top 5 weather conditions that people ride bikeshare.

In [None]:
%%sql
SELECT w.conditions, COUNT(*) count
FROM rides
JOIN weather AS w
ON w.key = rides.start_weather_key
GROUP BY w.conditions
ORDER BY count DESC
LIMIT 5;

In [None]:
%matplotlib inline

In [None]:
_.bar()

And the top 10 weather conditions that people ride bikeshare the least often:

In [None]:
%%sql
SELECT w.conditions, COUNT(*) count
FROM rides
JOIN weather AS w
ON w.key = rides.start_weather_key
GROUP BY w.conditions
ORDER BY count ASC
LIMIT 10;

In [None]:
_.bar()