# ETL Pipeline - Working Copy

## Imports

In [1]:
import os
from google.cloud import bigquery
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import style
import seaborn as sns
import warnings
from IPython.display import display

warnings.filterwarnings('ignore')
%matplotlib inline

style.use('seaborn')

pd.options.display.max_columns = None

## Configure credentials

In [2]:
# Path to your service-account key file.  If GOOGLE_APPLICATION_CREDENTIALS is
# already exported in the environment it is reused; otherwise replace the ''
# fallback with your key location (remove before commit).
key_location = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS', '')
if key_location:
    # Only write the variable when there is a real path, so an empty
    # placeholder never overwrites a working credentials setup.
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = key_location

## Set up client

In [5]:
# Create the BigQuery client; credentials are expected to come from the
# GOOGLE_APPLICATION_CREDENTIALS environment variable configured earlier.
client = bigquery.Client()
# Reference the "chicago_crime" dataset in the "bigquery-public-data" project and fetch it.
dataset_ref = client.dataset("chicago_crime", project="bigquery-public-data")
dataset = client.get_dataset(dataset_ref)
# Reference the "crime" table inside that dataset and fetch it.
table_ref = dataset_ref.table("crime")
table = client.get_table(table_ref)

## Query the Crime Table

In [57]:
# ENTER YOUR QUERY HERE:

# Size limit, in GB, that is passed as max_gb to safe_query_to_dataframe.
MAX_GB = 2 # Change this if desired

# SQL to run: every column of the crime table, restricted to rows whose year is 2016.
QUERY = """
SELECT *
FROM `bigquery-public-data.chicago_crime.crime`
WHERE year in (2016)
"""

In [75]:
def safe_query_to_dataframe(client, table, sql_query, max_gb=0):
    """
    Wrapper function for bigquery.client.query that refuses to run oversized queries.

    The query is first submitted as a dry run to learn how many bytes it would
    process.  It is only executed for real when that estimate is strictly
    below ``max_gb``.

    params
        > client: a bigquery client object
        > table: a bigquery table object (not used by this function; kept so
          existing calls keep working)
        > sql_query (string): an SQL query on the table
        > max_gb (int): GB limit of query, where 1 GB is counted as 1e9 bytes.
          The estimate must be strictly below this value, so the default of 0
          rejects every query.

    returns
        > Dataframe: Dataframe representation of the query

    raises
        > AssertionError: if the estimated query size is not below the limit
    """
    # Dry run with the cache disabled: only the size estimate is wanted here.
    dry_run_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)
    dry_run_job = client.query(sql_query, job_config=dry_run_config)
    gbs_used = dry_run_job.total_bytes_processed / 1e9

    # Raise explicitly rather than using an `assert` statement: asserts are
    # stripped when Python runs with -O, which would silently disable this
    # cost guard.  AssertionError and the message are kept so existing
    # callers see the same failure as before.
    if not (gbs_used < max_gb):
        raise AssertionError(
            f"This query will process {gbs_used} GB, which exceeds your desired limit of {max_gb} GB."
        )

    # The estimate is within the limit: run the query for real.
    return client.query(sql_query).to_dataframe()

In [73]:
# Run QUERY through the size guard (limit MAX_GB) and load the result into a DataFrame.
df = safe_query_to_dataframe(client=client, table=table, sql_query=QUERY, max_gb=MAX_GB)

## Pipeline

In [124]:
def add_dt_attributes(df):
    """
    Derive calendar columns from the ``date`` column, modifying ``df`` in place.

    Adds ``month``, ``hour`` and ``dayofweek`` (each passed through
    ``pd.to_numeric`` with an unsigned downcast) plus a boolean ``weekend``
    column that is True where ``dayofweek >= 5``.  Nothing is returned.
    """
    timestamps = df.date.dt
    weekday = timestamps.dayofweek

    # Insertion order of this dict fixes the order in which columns are added.
    numeric_parts = {
        "month": timestamps.month,
        "hour": timestamps.hour,
        "dayofweek": weekday,
    }
    for column, values in numeric_parts.items():
        df[column] = pd.to_numeric(values, downcast="unsigned")

    df["weekend"] = weekday >= 5
    
# TODO: create more cleaning functions similar to add_dt_attributes

def etl_pipeline(df):
    """
    Run every cleaning step on the given dataframe.

    Each step receives ``df`` and mutates it in place; nothing is returned.
    Register additional cleaning functions in ``steps`` to extend the pipeline.
    """
    steps = (
        add_dt_attributes,
        # TODO: add more!
    )
    for step in steps:
        step(df)

## Check out the Results

#### Original

In [125]:
# Preview the first rows of the raw query result.
df.head()

Unnamed: 0,unique_key,case_number,date,block,iucr,primary_type,description,location_description,arrest,domestic,beat,district,ward,community_area,fbi_code,x_coordinate,y_coordinate,year,updated_on,latitude,longitude,location
0,10388497,HZ124976,2016-01-04 12:00:00+00:00,097XX S GREENWOOD AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,511,5,8,50,11,1185255.0,1840735.0,2016,2018-02-10 15:50:01+00:00,41.718123,-87.597078,"(41.718123266, -87.597077957)"
1,10405571,HZ142126,2016-02-06 03:15:00+00:00,024XX N NEVA AVE,1020,ARSON,BY FIRE,VEHICLE NON-COMMERCIAL,False,False,2512,25,36,18,9,1128193.0,1915393.0,2016,2018-02-10 15:50:01+00:00,41.924147,-87.804395,"(41.924147072, -87.804395081)"
2,10416597,HZ154516,2016-02-15 23:00:00+00:00,049XX N OAK PARK AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,1613,16,41,10,11,1130249.0,1932513.0,2016,2018-02-10 15:50:01+00:00,41.971091,-87.796446,"(41.971091234, -87.796446304)"
3,10434360,HZ171993,2016-03-02 14:18:00+00:00,059XX N BERNARD ST,630,BURGLARY,ATTEMPT FORCIBLE ENTRY,RESIDENCE,False,False,1711,17,39,13,5,1152305.0,1939231.0,2016,2018-02-10 15:50:01+00:00,41.989118,-87.715165,"(41.989117883, -87.715165019)"
4,10467777,HZ206618,2016-03-28 18:01:00+00:00,051XX S PULASKI RD,1152,DECEPTIVE PRACTICE,ILLEGAL USE CASH CARD,BANK,False,False,822,8,23,62,11,1150573.0,1870133.0,2016,2018-02-10 15:50:01+00:00,41.79954,-87.723343,"(41.799540004, -87.723342501)"


#### Pipelined

In [126]:
# Work on a copy: etl_pipeline modifies the DataFrame it is given, and df is kept unchanged for comparison.
cp = df.copy()

In [127]:
# Apply the pipeline to the copy; the new columns are added in place.
etl_pipeline(cp)

In [129]:
# Preview the first rows after the pipeline has run.
cp.head()

Unnamed: 0,unique_key,case_number,date,block,iucr,primary_type,description,location_description,arrest,domestic,beat,district,ward,community_area,fbi_code,x_coordinate,y_coordinate,year,updated_on,latitude,longitude,location,month,hour,dayofweek,weekend
0,10388497,HZ124976,2016-01-04 12:00:00+00:00,097XX S GREENWOOD AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,511,5,8,50,11,1185255.0,1840735.0,2016,2018-02-10 15:50:01+00:00,41.718123,-87.597078,"(41.718123266, -87.597077957)",1,12,0,False
1,10405571,HZ142126,2016-02-06 03:15:00+00:00,024XX N NEVA AVE,1020,ARSON,BY FIRE,VEHICLE NON-COMMERCIAL,False,False,2512,25,36,18,9,1128193.0,1915393.0,2016,2018-02-10 15:50:01+00:00,41.924147,-87.804395,"(41.924147072, -87.804395081)",2,3,5,True
2,10416597,HZ154516,2016-02-15 23:00:00+00:00,049XX N OAK PARK AVE,1153,DECEPTIVE PRACTICE,FINANCIAL IDENTITY THEFT OVER $ 300,RESIDENCE,False,False,1613,16,41,10,11,1130249.0,1932513.0,2016,2018-02-10 15:50:01+00:00,41.971091,-87.796446,"(41.971091234, -87.796446304)",2,23,0,False
3,10434360,HZ171993,2016-03-02 14:18:00+00:00,059XX N BERNARD ST,630,BURGLARY,ATTEMPT FORCIBLE ENTRY,RESIDENCE,False,False,1711,17,39,13,5,1152305.0,1939231.0,2016,2018-02-10 15:50:01+00:00,41.989118,-87.715165,"(41.989117883, -87.715165019)",3,14,2,False
4,10467777,HZ206618,2016-03-28 18:01:00+00:00,051XX S PULASKI RD,1152,DECEPTIVE PRACTICE,ILLEGAL USE CASH CARD,BANK,False,False,822,8,23,62,11,1150573.0,1870133.0,2016,2018-02-10 15:50:01+00:00,41.79954,-87.723343,"(41.799540004, -87.723342501)",3,18,0,False


In [130]:
# TODO: export the cleaned data to a csv