# Data Ingestion
## Contents

The examples below follow this process:
- Step #1: Query raw data from BigQuery
- Step #2: Preprocess raw data
- Step #3: Load preprocessed data back into BigQuery  
  
Storing the preprocessed data in BigQuery ensures that the training and evaluation data is easily accessible in its clean form for model training.

```python
import numpy as np
import pandas as pd
from google.cloud import bigquery
```

# Step #1: Query raw data from BigQuery
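As explained in the `Data_Ingestion.ipynb` notebook, the following example shows how to query data from BigQuery with the BigQuery Python client library (`google-cloud-bigquery`).

The query results are loaded into a pandas DataFrame named `result`. For each send, the self-join on `riid` counts how many earlier sends the same recipient received (`hist_sends`) and how many of those earlier sends were opened (`hist_opens`).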

```python
sql = """SELECT sends1.riid,
  sends1.campaign_send_dt,
  sends1.opened,
  SUM(IF(sends2.campaign_send_dt < sends1.campaign_send_dt, sends2.opened, 0)) as hist_opens,
  SUM(IF(sends2.campaign_send_dt < sends1.campaign_send_dt, 1, 0)) as hist_sends
FROM `email-propensity-sandbox.emails.sends` sends1
LEFT JOIN `email-propensity-sandbox.emails.sends` sends2
ON sends1.riid = sends2.riid
GROUP BY 1,2,3
"""

client = bigquery.Client()
query_job = client.query(sql)  # API request
result = query_job.to_dataframe()  # waits for the query, then downloads the rows
```
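To make the self-join's logic concrete, here is a minimal pandas sketch that computes the same `hist_opens` and `hist_sends` columns on a small in-memory table. It is an illustration only, not part of the pipeline: the function name `add_history` and the toy data are hypothetical, and the toy data assumes at most one send per recipient per `campaign_send_dt`.

```python
import pandas as pd


def add_history(sends: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical pandas equivalent of the BigQuery self-join above."""
    # Pair every send with every send to the same recipient (the LEFT JOIN on riid).
    pairs = sends.merge(sends, on="riid", how="left", suffixes=("", "_prev"))
    # Only sends strictly earlier than the current one count as history.
    earlier = pairs["campaign_send_dt_prev"] < pairs["campaign_send_dt"]
    pairs["hist_opens"] = pairs["opened_prev"].where(earlier, 0)
    pairs["hist_sends"] = earlier.astype(int)
    # GROUP BY riid, campaign_send_dt, opened and sum the history columns.
    return pairs.groupby(
        ["riid", "campaign_send_dt", "opened"], as_index=False
    )[["hist_opens", "hist_sends"]].sum()


# Toy data: recipient 1 gets three sends, recipient 2 gets one.
sends = pd.DataFrame({
    "riid": [1, 1, 1, 2],
    "campaign_send_dt": pd.to_datetime(
        ["2018-01-01", "2018-01-05", "2018-01-09", "2018-01-03"]
    ),
    "opened": [1, 0, 1, 0],
})
out = add_history(sends)
print(out)
```

Each recipient's first send has no history (`hist_sends` is 0), which is why a later open-rate calculation has to account for a zero denominator.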

# Step #2: Preprocess raw data

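```python
# Historical open rate. A recipient's first send has hist_sends == 0, so the
# denominator is masked there and the rate is NaN ("no history") rather than 0/0.
result["hist_open_rate"] = (
    result["hist_opens"] / result["hist_sends"].where(result["hist_sends"] > 0)
)
```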

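Next, create a column indicating which dataset (training or evaluation) each row belongs to. The split below is made by recipient (`riid`) rather than by row, so 25% of recipients, together with all of their sends, go to the evaluation set.

When using BigQuery ML with a custom data split, this column drives the split as shown in the `ModelTraining.ipynb` notebook: rows where `eval` is `True` are used for evaluation, and rows where it is `False` are used for training.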

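```python
# Split by recipient so that all sends to a given riid end up in the same set.
unique_riid = np.unique(result["riid"])
num_eval = int(unique_riid.shape[0] * 0.25)  # hold out 25% of recipients

# A seeded generator makes the split reproducible across runs.
rng = np.random.default_rng(42)
eval_riids = rng.choice(unique_riid, size=num_eval, replace=False)

# True -> evaluation set, False -> training set
result["eval"] = result["riid"].isin(eval_riids)
```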
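Because the split is made per recipient, a quick sanity check is that no `riid` appears on both sides; otherwise the evaluation set would contain recipients the model was trained on. The sketch below wraps the same sampling idea in a hypothetical helper, `split_by_id`, and runs the check on toy data (the helper name, its parameters, and the data are illustrative assumptions).

```python
import numpy as np
import pandas as pd


def split_by_id(df: pd.DataFrame, id_col: str = "riid",
                eval_frac: float = 0.25, seed: int = 0) -> pd.Series:
    """Hypothetical helper: True for rows whose id is drawn into the eval set."""
    ids = np.unique(df[id_col])
    rng = np.random.default_rng(seed)
    eval_ids = rng.choice(ids, size=int(len(ids) * eval_frac), replace=False)
    return df[id_col].isin(eval_ids)


# Toy data: 8 distinct recipients, some with several sends.
toy = pd.DataFrame({"riid": [1, 1, 2, 3, 3, 3, 4, 5, 6, 7, 8, 8]})
toy["eval"] = split_by_id(toy)

# Leakage check: each riid must sit entirely in train or entirely in eval.
values_per_riid = toy.groupby("riid")["eval"].nunique()
assert (values_per_riid == 1).all()

# 25% of 8 recipients -> 2 recipients held out for evaluation.
print(toy.loc[toy["eval"], "riid"].nunique())  # → 2
```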

# Step #3: Load preprocessed data back into BigQuery
A pandas DataFrame can be loaded directly into a BigQuery table with `Client.load_table_from_dataframe`.  
  
Some things to keep in mind:
- The DataFrame's column names will be used as the BigQuery column names.
- Update `test_upload` and `pandas_table` with the names of your dataset and destination table, respectively.
- The `load_config` sets `write_disposition` to `WRITE_TRUNCATE`, which overwrites the destination table's contents. To append the DataFrame to an existing table with the same schema instead, remove the `load_config` from the `load_table_from_dataframe` call; load jobs append (`WRITE_APPEND`) by default.

```python
client = bigquery.Client()
# Fully qualified destination table ID, "project.dataset.table":
# set test_upload to your dataset and pandas_table to your destination table.
table_id = f"{client.project}.test_upload.pandas_table"

# Use load_config to overwrite the old table contents (creating the table if needed).
load_config = bigquery.LoadJobConfig(
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

job = client.load_table_from_dataframe(
    dataframe=result,  # set to name of DataFrame
    destination=table_id,
    job_config=load_config,
)
job.result()  # wait for the load job to finish; raises an exception if it fails
```

```python
result.head()
```

|   | riid | campaign_send_dt | opened | hist_opens | hist_sends | hist_open_rate | eval |
|---|---|---|---|---|---|---|---|
| 0 | 664266962 | 2018-01-23 | 0 | 0 | 6 | 0.0 | False |
| 1 | 919579622 | 2018-01-26 | 0 | 2 | 6 | 0.333333 | False |
| 2 | 93217902 | 2018-01-28 | 0 | 1 | 6 | 0.166667 | False |
| 3 | 572530442 | 2018-01-28 | 0 | 5 | 6 | 0.833333 | False |
| 4 | 93217902 | 2018-01-30 | 0 | 1 | 7 | 0.142857 | False |
