# Retrosheet Baseball Data -- Persist to Postgres

**Baseball Notebooks**  
1. Download and unzip the baseball data.
2. Helper functions and the motivation for using them.
3. Wrangle and persist the Lahman data.
4. Parse the Retrosheet play-by-play data, collect it into 2 DataFrames, and persist them.
5. Wrangle the Retrosheet data in preparation for data analysis.
6. This notebook.

Load the wrangled data into Postgres.

This notebook is designed to be used with Jupyter Lab and the Table of Contents extension.  
https://github.com/jupyterlab/jupyterlab-toc

## Repeatable Research
All data processing should be documented so that others can repeat the results.  This includes every step from downloading the data through analysis.

In [1]:
import pandas as pd
import numpy as np
import os
import re
from pathlib import Path

In [2]:
# see Baseball Notebook #2
import helper_functions as bb

In [3]:
# create path objects -- these directories were created in the previous notebook
home = Path.home()
retrosheet = home.joinpath('data/retrosheet')
p_wrangled = retrosheet.joinpath('wrangled')

## Working with Postgres

1. The Postgres server should be installed and running.
2. The retrosheet schema should exist.
3. The ipython/Jupyter Lab SQL magic extension should be installed. See:  
https://github.com/catherinedevlin/ipython-sql

In [4]:
from sqlalchemy.engine import create_engine
from sqlalchemy.types import SmallInteger, Integer, BigInteger
from IPython.display import HTML, display

### Connect to DB

conn = create_engine(connect_str)

Calling conn.execute(query), where conn is a SQL Alchemy engine created as above, will:
1. allocate a DB connection object from the connection pool
2. use that connection object to execute the query
3. commit any data changes
4. release the connection object back to the connection pool

This behavior applies to SQL Alchemy 1.x; Engine.execute() was deprecated in 1.4 and removed in 2.0.

For transaction processing with SQL Alchemy, in the style of the Python DB API, work with an explicit connection object instead:  
```connection = create_engine(connect_str).connect()```
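As a rough illustration of what transaction processing means here, the sketch below uses the standard library's `sqlite3` module as a stand-in for Postgres, so it runs without a database server. The table name `demo` and the function `insert_atomically` are made up for this example. With SQL Alchemy the analogous pattern would use a connection object and `connection.begin()`, but the idea is the same: either every statement in the transaction is committed, or none is.

```python
import sqlite3

def insert_atomically(con, ids, fail=False):
    """Insert all ids in one transaction, or none of them if an error occurs."""
    try:
        with con:  # commits on success, rolls back if the block raises
            con.executemany("INSERT INTO demo (id) VALUES (?)", [(i,) for i in ids])
            if fail:
                raise RuntimeError("simulated failure mid-transaction")
    except RuntimeError:
        pass  # the rollback has already happened
    return con.execute("SELECT COUNT(*) FROM demo").fetchone()[0]

con = sqlite3.connect(":memory:")  # stand-in database, not Postgres
con.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY)")

print(insert_atomically(con, [1, 2]))             # both rows are committed
print(insert_atomically(con, [3, 4], fail=True))  # rolled back, count unchanged
```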

In [5]:
# Get the user and password from the environment (rather than hardcoding them)
import os
db_user = os.environ.get('DB_USER')
db_pass = os.environ.get('DB_PASS')

# avoid putting passwords directly in code
connect_str = f'postgresql://{db_user}:{db_pass}@localhost:5432/baseball'

# treat the SQL Alchemy engine as a connection to the database
conn = create_engine(connect_str)
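One possible refinement of the cell above, sketched under assumptions: if `DB_USER` or `DB_PASS` is unset, `os.environ.get` returns `None` and the f-string silently produces a URL containing the text `None`. Passwords containing characters such as `@` or `/` also need to be URL-escaped before they go into a connection URL. The helper name `build_connect_str` is hypothetical and not part of the original notebook.

```python
import os
from urllib.parse import quote_plus

def build_connect_str(user, password, host="localhost", port=5432, database="baseball"):
    """Build a postgresql:// URL, failing early on missing credentials."""
    if not user or not password:
        raise ValueError("DB_USER and DB_PASS must be set in the environment")
    # escape characters such as '@' and '/' that would otherwise break the URL
    return f"postgresql://{quote_plus(user)}:{quote_plus(password)}@{host}:{port}/{database}"

# usage (assumes both variables are set):
# connect_str = build_connect_str(os.environ.get("DB_USER"), os.environ.get("DB_PASS"))
```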

## psql

Use the following to run psql commands from a Jupyter Code cell.

This will connect, execute, and disconnect from the database.

For this to work without a password, a .pgpass file is necessary.  
See: https://www.postgresql.org/docs/11/libpq-pgpass.html    

The .pgpass file should look like:  
```localhost:5432:*:<user>:<passwd>```
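Each .pgpass line has five colon-separated fields: hostname, port, database, username, password. A literal `:` or `\` inside a field must be escaped with a backslash, and on Unix the file must not be readable by group or others (for example `chmod 600 ~/.pgpass`). A minimal sketch of a helper that formats one such line follows; the function name `pgpass_line` is hypothetical.

```python
def pgpass_line(host, port, database, user, password):
    """Format one .pgpass entry, escaping ':' and '\\' inside each field."""
    def esc(value):
        # escape backslashes first so the escapes added for ':' are not doubled
        return str(value).replace("\\", "\\\\").replace(":", "\\:")
    return ":".join(esc(v) for v in (host, port, database, user, password))

# '*' matches any database, as in the example line above
print(pgpass_line("localhost", 5432, "*", "postgres", "secret"))
```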

In [6]:
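def psql(cmd, user='postgres', schema='baseball'):
    # note: `schema` is passed to psql as the name of the database to connect to
    # -H makes psql emit an HTML table, which display(HTML(...)) renders inline
    psql_out = !psql -H -U {user} {schema} -c "{cmd}"
    display(HTML(''.join(psql_out)))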
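The `!psql ...` syntax above only works inside IPython/Jupyter. Outside a notebook, a similar helper could be sketched with `subprocess`, passing the arguments as a list so that no shell quoting of `cmd` is needed. The names `psql_args` and `run_psql` are hypothetical, and `run_psql` assumes that psql is on the PATH and that a .pgpass file supplies the password.

```python
import subprocess

def psql_args(cmd, user="postgres", database="baseball"):
    """Argument list for running a single psql command with HTML output."""
    return ["psql", "-H", "-U", user, "-d", database, "-c", cmd]

def run_psql(cmd, user="postgres", database="baseball"):
    """Run one psql command and return its HTML output as a string."""
    result = subprocess.run(
        psql_args(cmd, user, database),
        capture_output=True, text=True, check=True,  # raise if psql exits non-zero
    )
    return result.stdout

# usage in a notebook: display(HTML(run_psql(r"\d r_player_game")))
```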

In [7]:
!psql --version

psql (PostgreSQL) 11.2 (Ubuntu 11.2-1.pgdg18.04+1)


## 1. Player Game to Postgres

### Load into Postgres

df.to_sql() is a convenient way to load data into a database table. It also creates the table, including its column definitions, if the table does not exist.

However, for large amounts of data, most database systems offer a "bulk copy" that is typically much faster than inserting rows through df.to_sql().

Here, df.to_sql() is used only for its "create table" feature: a few sample rows are written to create the table with the proper datatypes, and those rows are then deleted.

For Postgres, the "bulk copy" is called COPY. It can be run from the psql command line, where the \copy meta-command reads the input on the client side.

In [8]:
os.chdir(p_wrangled)

# just read in 10 rows
player_game = bb.from_csv_with_types('player_game.csv.gz', nrows=10)
dtypes = bb.optimize_db_dtypes(player_game)
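`bb.optimize_db_dtypes` is defined in Baseball Notebook #2 and its implementation is not shown here. Judging only from the `SmallInteger`, `Integer`, and `BigInteger` imports above, one plausible idea behind it is to pick the smallest Postgres integer type that can hold a column's values. The sketch below illustrates that idea with plain type names; the function `smallest_pg_int_type` is an assumption for illustration, not the author's helper.

```python
def smallest_pg_int_type(min_value, max_value):
    """Name of the smallest Postgres integer type that holds [min_value, max_value]."""
    if -32768 <= min_value and max_value <= 32767:
        return "SMALLINT"   # 2 bytes
    if -2147483648 <= min_value and max_value <= 2147483647:
        return "INTEGER"    # 4 bytes
    return "BIGINT"         # 8 bytes

print(smallest_pg_int_type(0, 50))             # fits in 2 bytes
print(smallest_pg_int_type(0, 40_000))         # needs 4 bytes
print(smallest_pg_int_type(0, 3_000_000_000))  # needs 8 bytes
```

Note that a sample of only 10 rows may not contain a column's extreme values, so types chosen this way are only as reliable as the rows they are based on.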

In [9]:
conn.execute('DROP TABLE IF EXISTS r_player_game');

In [10]:
# copy over a few rows to create/replace the table with the proper datatypes
player_game.to_sql('r_player_game', conn, if_exists='replace', index=False, dtype=dtypes)

In [11]:
type(conn)

sqlalchemy.engine.base.Engine

In [13]:
# when conn is a SQL Alchemy engine, the changes will be committed automatically
conn.execute('DELETE FROM r_player_game');

In [14]:
rs = conn.execute('SELECT COUNT(*) FROM r_player_game')
rs.fetchall()

[(0,)]

In [15]:
# verify we are in the correct directory and have zcat
# Should see about 3.5 million lines for 1955 to 2018 (the count includes the header line)
!zcat player_game.csv.gz | wc -l

3549700
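On systems without `zcat`, the same check can be sketched in Python with the standard `gzip` module. The function name `count_lines_gz` is hypothetical. Like `wc -l`, it counts lines rather than CSV records, so the header is included and a quoted field containing a newline would be counted as more than one line.

```python
import gzip

def count_lines_gz(path):
    """Count the lines in a gzipped text file without loading it into memory."""
    with gzip.open(path, "rt") as f:
        return sum(1 for _ in f)

# usage: count_lines_gz('player_game.csv.gz')
```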


In [16]:
# psql \copy meta-command: stream the gzipped csv file into the postgres table
# raw string, so the backslash in \copy is not treated as a Python escape
cmd = r"\copy r_player_game from program 'zcat player_game.csv.gz' CSV HEADER"

In [17]:
# this is MUCH faster than using df.to_sql()
%time psql(cmd)

CPU times: user 3.66 ms, sys: 3.91 ms, total: 7.57 ms
Wall time: 10.5 s


In [18]:
# add primary key constraint
sql = 'ALTER TABLE r_player_game ADD PRIMARY KEY (player_id, game_id)'
conn.execute(sql);

ProgrammingError: (psycopg2.ProgrammingError) column "player_id" named in key does not exist
 [SQL: 'ALTER TABLE r_player_game ADD PRIMARY KEY (player_id, game_id)'] (Background on this error at: http://sqlalche.me/e/f405)
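The error above means that the table has no column named `player_id`, so the key columns must be spelled exactly as they appear in the CSV header. One way to check before issuing the ALTER TABLE is to read the header line and compare it with the intended key. This is a sketch with hypothetical helper names (`read_csv_header`, `missing_columns`); it makes no assumption about what the real column names are.

```python
import csv
import gzip

def read_csv_header(path):
    """Return the column names from the first line of a gzipped CSV file."""
    with gzip.open(path, "rt", newline="") as f:
        return next(csv.reader(f))

def missing_columns(header, key_columns):
    """List the key columns that do not appear in the header."""
    present = set(header)
    return [c for c in key_columns if c not in present]

# usage: an empty list means every key column exists
# missing_columns(read_csv_header('player_game.csv.gz'), ['player_id', 'game_id'])
```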

In [None]:
# describe player_game table
psql(r'\d r_player_game')

## 2. Player Game Data Dictionary to Postgres

In [None]:
os.chdir(p_wrangled)
player_game_fields = pd.read_csv('player_game_fields.csv')
player_game_fields.to_sql('r_player_game_fields', conn, if_exists='replace', index=False)

In [None]:
# verify df.to_sql worked
rs = conn.execute("SELECT * FROM r_player_game_fields")
df = pd.DataFrame(rs.fetchall())
df.columns = rs.keys()
df

## 3. Game to Postgres

In [None]:
os.chdir(p_wrangled)

# just read in 10 rows
game = bb.from_csv_with_types('game.csv.gz', nrows=10)
dtypes = bb.optimize_db_dtypes(game)

In [9]:
conn.execute('DROP TABLE IF EXISTS r_game');

In [None]:
game.to_sql('r_game', conn, if_exists='replace', dtype=dtypes, index=False)

In [None]:
# when conn is a SQL Alchemy engine, the changes will be committed automatically
conn.execute('DELETE FROM r_game');

In [None]:
rs = conn.execute('SELECT COUNT(*) FROM r_game')
rs.fetchall()

In [None]:
# psql \copy meta-command: stream the gzipped csv file into the postgres table
# raw string, so the backslash in \copy is not treated as a Python escape
cmd = r"\copy r_game from program 'zcat game.csv.gz' CSV HEADER"

In [None]:
# this is MUCH faster than using df.to_sql()
%time psql(cmd)

In [None]:
# add primary key constraint
sql = 'ALTER TABLE r_game ADD PRIMARY KEY (game_id)'
conn.execute(sql);

In [None]:
# describe the r_game table
psql(r'\d r_game')

## 4. Game Data Dictionary to Postgres

In [None]:
os.chdir(p_wrangled)
game_fields = pd.read_csv('game_fields.csv')

In [9]:
conn.execute('DROP TABLE IF EXISTS r_game_fields');

In [None]:
game_fields.to_sql('r_game_fields', conn, if_exists='replace', index=False)

In [None]:
# verify df.to_sql worked
rs = conn.execute("SELECT * FROM r_game_fields")
df = pd.DataFrame(rs.fetchall())
df.columns = rs.keys()
df

## 5. Players to Postgres

In [None]:
os.chdir(p_wrangled)
players = pd.read_csv('players.csv', 
                      parse_dates=['player_debut', 'mgr_debut', 'coach_debut', 'ump_debut'])

In [9]:
conn.execute('DROP TABLE IF EXISTS r_players');

In [None]:
players.to_sql('r_players', conn, if_exists='replace', index=False)

In [None]:
# verify df.to_sql worked
rs = conn.execute("SELECT * FROM r_players LIMIT 3")
df = pd.DataFrame(rs.fetchall())
df.columns = rs.keys()
df

In [None]:
# describe the r_players table
psql(r'\d r_players')

## 6. Stadium (Parks) to Postgres

In [None]:
os.chdir(p_wrangled)
parks = pd.read_csv('parks.csv', parse_dates=['start', 'end'])
parks.to_sql('r_parks', conn, if_exists='replace', index=False)

In [None]:
# verify df.to_sql worked
rs = conn.execute("SELECT * FROM r_parks LIMIT 3")
df = pd.DataFrame(rs.fetchall())
df.columns = rs.keys()
df

In [None]:
# describe the r_parks table
psql(r'\d r_parks')