# "Big Pandas" - Dask from the Inside
## Part 2 - Retrieving BTS OTP sample data 
### PyData Berlin tutorial, 30 June 2017
## Stephen Simmons

    

This notebook is for downloading and preparing the original source data.

You can also download pre-prepared data from here:  
* http://www.stevesimmons.com/pydata-ams2017/flights-201601-201701-csv-xz.tar (151MB)
* http://www.stevesimmons.com/pydata-ams2017/flights-201601-201701-parq.tar (158MB)


In [1]:
import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd

import graphviz
import os
import time

# Record the library versions this notebook was run with
(np.__version__, pd.__version__, dask.__version__)

('1.13.1', '0.20.3', '0.15.0')

In [2]:
# Support multiple lines of output in each cell
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# Don't wrap tables
pd.options.display.max_rows = 20
pd.options.display.width = 200

# Show matplotlib graphs inline in Jupyter notebook
%matplotlib inline

In [5]:
# The sample data is the USA Bureau of Transportation Statistics 'On-Time' monthly series.
# This has actual arrival/departure times versus schedule for every domestic flight
# by major US carriers. For details, see the BTS website:
#    https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time

# We can download the monthly results directly from this URL, filling in the
# two parameters with the year (e.g. '2016') and month ('1' for January, '12' for December).
# The resulting zip files, each around 23MB in size when compressed,
# contain a 200MB .csv file with the same name (On_Time_On_Time_Performance_2016_1.csv)
# plus a 'readme.html' explaining the fields.
OTP_URL = 'https://transtats.bts.gov/PREZIP/On_Time_On_Time_Performance_%s_%s.zip'

# Subset of the csv columns that load_one_month() keeps, in this order.
OTP_COLUMNS_TO_LOAD = [
        'FlightDate', 'Origin', 'Dest', 'Distance',
        'Carrier', 'FlightNum', 'TailNum',
        'CRSDepTime', 'CRSArrTime', 'CRSElapsedTime',
        'Flights', 'Cancelled','Diverted',
        'DepTime', 'ArrTime', 'ActualElapsedTime',
        'DepDelay', 'ArrDelay', 'AirTime',
    ]

# Directory holding the downloaded .zip files and the recompressed .xz files
if os.path.exists('/home/stephen/do-not-backup'):
    DIR_NAME = '/home/stephen/do-not-backup/data/usa-flights-otp'
else:
    # Expand '~' explicitly: os.path.exists/os.makedirs do not, so the raw
    # string would refer to (and create) a literal directory named '~'.
    DIR_NAME = os.path.expanduser('~/pydata-pandas/data')

In [15]:
# Download USA flight data as described at 
# https://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time
import logging
import multiprocessing
import ssl
import urllib.request
import io
import zipfile
import lzma
import os
import psutil

# URL template for the monthly BTS On-Time zip files; format with (year, month).
# NOTE(review): retrieve_data() formats OTP_URL, not this constant.
BASE_URL = 'https://transtats.bts.gov/PREZIP/On_Time_On_Time_Performance_%s_%s.zip'
# NOTE(review): this replaces the DIR_NAME chosen in the earlier configuration
# cell with a path relative to the notebook's working directory.
DIR_NAME = 'pydata-pandas/data/usa-flights-otp'

def memory_usage():
    """Return this process's resident set size (RSS) in megabytes, via psutil."""
    rss_bytes = psutil.Process(os.getpid()).memory_info().rss
    return rss_bytes / (1024 ** 2)
    
def retrieve_data(year_month):
    """
    Retrieve BTS on-time flight data for given year and month,
    unpack csv data from zip file and save as 'flights-yyyy-mm.xz'.

    year_month  - Month of data to retrieve, in form of a tuple of ints
                    like (2016, 1) for January 2016.

    If 'flights-yyyy-mm.xz' already exists in DIR_NAME, nothing is done.
    Otherwise the csv is read from a previously downloaded
    'flights-yyyy-mm.zip' in DIR_NAME if present. If not, the zip is
    downloaded from OTP_URL into memory; it is not saved to disk.

    The .xz file is first written under a temporary '.part' name and renamed
    only once complete. An interrupted run therefore never leaves a truncated
    .xz file that a later run would mistake for finished data.
    """
    os.makedirs(DIR_NAME, exist_ok=True)
    filename = 'flights-%04d-%02d' % year_month
    zip_path = os.path.join(DIR_NAME, filename + '.zip')
    xz_path  = os.path.join(DIR_NAME, filename + '.xz' )
    csv_filename = 'On_Time_On_Time_Performance_%s_%s.csv' % year_month
    MB = 1024.0 * 1024.0

    if os.path.exists(xz_path):
        print("%s - Already have .xz file" % filename)
        return

    started = time.time()
    # Get zip file's data
    if os.path.exists(zip_path):
        # Extract from previously downloaded zip file
        print("%s - Reading csv from %s" % (filename, zip_path))
        zip_src = zip_path
    else:
        # Download zip file to memory
        url = OTP_URL % year_month
        print("%s - Downloading %s" % (filename, url))
        # We would like to do simply this:
        #   urllib.request.urlretrieve(url, dest_path)
        # but that gives SSL errors, so certificate checks are disabled.
        # NOTE(review): this trusts whatever answers at the URL; prefer fixing
        # the local CA bundle and re-enabling verification.
        ctx = ssl.create_default_context()
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        with urllib.request.urlopen(url, context=ctx) as response:
            zip_src = io.BytesIO(response.read())  # 25MB

    # Extract csv data (~200MB!); close the archive and member when done.
    with zipfile.ZipFile(zip_src) as zf, zf.open(csv_filename) as member:
        csv_data = member.read()
    print("%s - csv data is %0.1fMB. Now compressing..."
                    % (filename, len(csv_data) / MB))

    # Recompress to .xz atomically: write to a temp name, then rename.
    part_path = xz_path + '.part'
    try:
        with lzma.open(part_path, 'wb') as xzf:
            xzf.write(csv_data)
        os.replace(part_path, xz_path)
    except BaseException:
        if os.path.exists(part_path):
            os.remove(part_path)
        raise

    # Measure the compressed size only after the stream is closed. Before
    # close, the LZMA compressor may still hold buffered output.
    csv_MB = len(csv_data) / MB
    xz_MB = os.path.getsize(xz_path) / MB
    mins, secs = divmod(time.time() - started, 60)
    print("%s - Compressed csv from %0.1fMB to %0.1fMB [%02d:%02d, %0.1fMB mem]"
                    % (filename, csv_MB, xz_MB, mins, secs, memory_usage() ))


def download_flight_data(start='1988-01', end=None, num_threads=4):
    """
    Download BTS On-Time flight data for one month or a range of months.
    Data is available from '1987-12' to '2017-01' inclusive; requested
    ranges are clamped to that window.

    start, end  - 'yyyy-mm' strings; end defaults to start (a single month).
                  Months need not be zero-padded ('2016-9' works).
    num_threads - Number of worker processes fetching months in parallel.

    Each month is processed by retrieve_data(), newest month first, producing
    'flights-yyyy-mm.xz' files in DIR_NAME.
    """
    def parse(yyyy_mm):
        return tuple(int(part) for part in yyyy_mm.split('-'))

    # Clamp as (year, month) tuples. Comparing the raw strings misorders
    # non-zero-padded months ('1987-9' > '1987-12' lexically).
    start_ym = max(parse(start), (1987, 12))
    end_ym = min(parse(end or start), (2017, 1))

    dates = [
        (year, month)
            for year in range(end_ym[0], start_ym[0] - 1, -1)
                for month in range(12, 0, -1)
                    if start_ym <= (year, month) <= end_ym
    ]

    # The context manager shuts the worker processes down once map() returns.
    with multiprocessing.Pool(num_threads) as pool:
        pool.map(retrieve_data, dates)


In [18]:
download_flight_data('2015-01', '2017-01')

flights-2017-01 - Downloading https://transtats.bts.gov/PREZIP/On_Time_On_Time_Performance_2017_1.zip
flights-2016-09 - Already have .xz file
flights-2016-11 - Already have .xz file
flights-2016-10 - Already have .xz file
flights-2016-08 - Already have .xz file
flights-2016-07 - Already have .xz file
flights-2016-05 - Already have .xz file
flights-2016-03 - Already have .xz file
flights-2016-06 - Already have .xz file
flights-2016-04 - Already have .xz file
flights-2016-02 - Already have .xz file
flights-2016-01 - Already have .xz file
flights-2015-11 - Already have .xz file
flights-2015-09 - Already have .xz file
flights-2015-12 - Already have .xz file
flights-2015-10 - Already have .xz file
flights-2015-08 - Already have .xz file
flights-2015-07 - Already have .xz file
flights-2015-05 - Already have .xz file
flights-2015-03 - Already have .xz file
flights-2015-02 - Already have .xz file
flights-2015-04 - Already have .xz file
flights-2015-06 - Already have .xz file
flights-2015-01 - 

The downloaded zip files are around 23MB each, and contain two files:
- The `.csv` data, around 220MB in size, with a name like `On_Time_On_Time_Performance_2014_8.csv`
- `readme.html` explaining the fields.

To simplify subsequent mass processing, let's recompress just the .csv file. Using xz compression results in files around 12MB, half the size of zip. 

```
  $ for file in flights-*.zip; do
      # Use -e, not -a: inside [ ], "! -a FILE" parses as a binary AND of two
      # non-empty strings, which is always true.
      if [ ! -e "${file%.zip}.xz" ]; then
        unzip -p "$file" '*.csv' | xz > "${file%.zip}.xz"
      fi
    done
```


In [13]:
# Peek at the first 10 raw rows to see every column the csv provides.
path = os.path.join(DIR_NAME, 'flights-2017-01.xz')
df = pd.read_csv(
    path,
    nrows=10,
    dialect="excel",
)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Columns: 110 entries, Year to Unnamed: 109
dtypes: float64(69), int64(27), object(14)
memory usage: 8.7+ KB


In [16]:
def memory_usage(log=False):
    """
    Return this process's resident memory in MB, or None if unavailable.

    Uses the optional `psutil` package. If psutil is missing or fails,
    None is returned instead of raising.

    log - if True, also emit a one-line summary via logging.info().

    NOTE(review): this redefines the earlier memory_usage() in this notebook.
    """
    pid = os.getpid()
    try:
        import psutil
        mem_MB = psutil.Process(pid).memory_info().rss / 1024.0 / 1024.0
        msg = "Memory for process %s is %0.1fMB" % (pid, mem_MB)
    except Exception:
        # Best-effort only, but unlike a bare `except:` this does not
        # swallow KeyboardInterrupt / SystemExit.
        mem_MB = None
        msg = "Process is pid %s. Memory usage unavailable" % pid
    if log:
        logging.info(msg)
    return mem_MB

In [19]:
def load_one_month(yyyy_mm, nrows=None):
    """
    Load one month's data as a pandas DataFrame.
    Optionally limit max number of rows read.

    yyyy_mm - month as a 'yyyy-mm' string; reads DIR_NAME/flights-yyyy-mm.xz.
    nrows   - if given, read at most this many rows.

    Returns a DataFrame with exactly the OTP_COLUMNS_TO_LOAD columns, in that
    order. 'FlightDate' is parsed as a datetime, and 'FlightNum' is prefixed
    with the carrier code (e.g. 'AA494').
    """
    started = time.time()

    # Load the csv from xz-compressed file
    path = os.path.join(DIR_NAME, 'flights-%s.xz' % yyyy_mm)
    df = pd.read_csv(path,
                     dialect="excel",
                     usecols=OTP_COLUMNS_TO_LOAD,
                     nrows=nrows,
                     parse_dates=['FlightDate'],
                     dtype={'FlightNum': str},  # Keep as string, to later combine with carrier
                     )

    # Put columns in our standard order. .copy() makes this an independent
    # frame, so the column assignment below cannot hit SettingWithCopy.
    df = df[OTP_COLUMNS_TO_LOAD].copy()
    df['FlightNum'] = df['Carrier'] + df['FlightNum']   # to give 'AA494'

    mm, ss = divmod(time.time() - started, 60)
    # memory_usage() may return None (e.g. when psutil is unavailable), and
    # '%d' cannot format None, so format the figure defensively.
    mem = memory_usage()
    mem_text = '%d' % mem if mem is not None else '?'
    logging.info("Loading pd.DataFrame for %s took %02d:%02d (%sMB mem)",
                 yyyy_mm, mm, ss, mem_text)
    return df

In [18]:
%timeit
df = load_one_month('2017-01')
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 450017 entries, 0 to 450016
Data columns (total 19 columns):
FlightDate           450017 non-null datetime64[ns]
Origin               450017 non-null object
Dest                 450017 non-null object
Distance             450017 non-null float64
Carrier              450017 non-null object
FlightNum            450017 non-null object
TailNum              449378 non-null object
CRSDepTime           450017 non-null int64
CRSArrTime           450017 non-null int64
CRSElapsedTime       450013 non-null float64
Flights              450017 non-null float64
Cancelled            450017 non-null float64
Diverted             450017 non-null float64
DepTime              441476 non-null float64
ArrTime              440746 non-null float64
ActualElapsedTime    439645 non-null float64
DepDelay             441476 non-null float64
ArrDelay             439645 non-null float64
AirTime              439645 non-null float64
dtypes: datetime64[ns](1), float64(1

In [20]:
# Convert the repeated-value columns to ordered categoricals.
# pandas >= 0.21 expresses this with CategoricalDtype. The
# astype('category', ordered=True) keyword form only works in older pandas:
# it was deprecated in 0.21 and later removed.
_CategoricalDtype = getattr(pd.api.types, 'CategoricalDtype', None)
for col in ['FlightDate','Origin','Dest','Carrier','FlightNum','TailNum']:
    if _CategoricalDtype is not None:
        df[col] = df[col].astype(_CategoricalDtype(ordered=True))
    else:  # pandas < 0.21
        df[col] = df[col].astype('category', ordered=True)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 450017 entries, 0 to 450016
Data columns (total 19 columns):
FlightDate           450017 non-null category
Origin               450017 non-null category
Dest                 450017 non-null category
Distance             450017 non-null float64
Carrier              450017 non-null category
FlightNum            450017 non-null category
TailNum              449378 non-null category
CRSDepTime           450017 non-null int64
CRSArrTime           450017 non-null int64
CRSElapsedTime       450013 non-null float64
Flights              450017 non-null float64
Cancelled            450017 non-null float64
Diverted             450017 non-null float64
DepTime              441476 non-null float64
ArrTime              440746 non-null float64
ActualElapsedTime    439645 non-null float64
DepDelay             441476 non-null float64
ArrDelay             439645 non-null float64
AirTime              439645 non-null float64
dtypes: category(6), float64(11)

In [21]:
# Distinct carrier codes, in the categorical's (sorted) category order
df.Carrier.cat.categories

Index(['AA', 'AS', 'B6', 'DL', 'EV', 'F9', 'HA', 'NK', 'OO', 'UA', 'VX', 'WN'], dtype='object')

In [109]:
# Show only the first rows; the full month is ~450k rows.
df.head(10)

Unnamed: 0,FlightDate,Origin,Dest,Distance,Carrier,FlightNum,TailNum,CRSDepTime,CRSArrTime,CRSElapsedTime,Flights,Cancelled,Diverted,DepTime,ArrTime,ActualElapsedTime,DepDelay,ArrDelay,AirTime
0,2017-01-17,CLT,PHX,1773.0,AA,AA494,N583AA,1619,1856,277.0,1.0,0.0,0.0,1616.0,1842.0,266.0,-3.0,-14.0,244.0
1,2017-01-18,CLT,PHX,1773.0,AA,AA494,N544AA,1619,1856,277.0,1.0,0.0,0.0,1614.0,1821.0,247.0,-5.0,-35.0,228.0
2,2017-01-19,CLT,PHX,1773.0,AA,AA494,N553AA,1619,1856,277.0,1.0,0.0,0.0,1611.0,1826.0,255.0,-8.0,-30.0,236.0
3,2017-01-20,CLT,PHX,1773.0,AA,AA494,N191AA,1619,1856,277.0,1.0,0.0,0.0,1656.0,1929.0,273.0,37.0,33.0,252.0
4,2017-01-21,CLT,PHX,1773.0,AA,AA494,N170AA,1619,1856,277.0,1.0,0.0,0.0,1632.0,1858.0,266.0,13.0,2.0,245.0
5,2017-01-22,CLT,PHX,1773.0,AA,AA494,N179AA,1619,1856,277.0,1.0,0.0,0.0,1636.0,1921.0,285.0,17.0,25.0,254.0
6,2017-01-23,CLT,PHX,1773.0,AA,AA494,N579AA,1619,1856,277.0,1.0,0.0,0.0,1616.0,1907.0,291.0,-3.0,11.0,264.0
7,2017-01-24,CLT,PHX,1773.0,AA,AA494,N583AA,1619,1856,277.0,1.0,0.0,0.0,1619.0,1904.0,285.0,0.0,8.0,261.0
8,2017-01-25,CLT,PHX,1773.0,AA,AA494,N167AA,1619,1856,277.0,1.0,0.0,0.0,1616.0,1906.0,290.0,-3.0,10.0,262.0
9,2017-01-26,CLT,PHX,1773.0,AA,AA494,N551AA,1619,1856,277.0,1.0,0.0,0.0,1618.0,1902.0,284.0,-1.0,6.0,268.0


In [108]:
# Total bytes used by the frame, counting the index and (deep=True) the
# actual contents of object columns.
df.memory_usage(index=True, deep=True).sum()

52720496