# ETL Pipeline from Qualtrics to Google BigQuery
This is a simple extract, transform, load (ETL) pipeline that extracts data from a Qualtrics survey, transforms the data to meet database requirements, and loads it into a Google BigQuery dataset.


## TOC
* [1. Extract Data from Qualtrics](#1)
* [2. Transform Data](#2)
    * [2a. Create Data Dictionary](#2a)
    * [2b. Transform Response Data](#2b)
* [3. Load Data to BigQuery](#3)

In [None]:
#Import libraries
import pandas as pd
import numpy as np
import pickle
import re
import json
import io
from html.parser import HTMLParser
from io import StringIO

#For Qualtrics API
from QualtricsAPI.Survey import Responses #python package that loads responses as pandas dataframe
import os
import requests

#For BigQuery Import
from datetime import datetime
from google.cloud import bigquery
from oauth2client.service_account import ServiceAccountCredentials
from google.oauth2 import service_account

## 1. Extract Data From Qualtrics <a class="anchor" id="1"></a>

### Enter API Credentials <a class="anchor" id="1a"></a>

In [None]:
#Qualtrics API settings -- need Qualtrics API access.
#Either export the QUALTRICS_* environment variables below (keeps secrets out of the
#notebook) or replace the xxxx placeholders with your credentials and survey ID.
token = os.environ.get("QUALTRICS_API_TOKEN", "xxxx")
data_center = os.environ.get("QUALTRICS_DATA_CENTER", "xxxx")
directory_id = os.environ.get("QUALTRICS_DIRECTORY_ID", "xxxx")
survey = os.environ.get("QUALTRICS_SURVEY_ID", "xxxx")

In [None]:
#Initial QualtricsAPI setup
from QualtricsAPI.Setup import Credentials

#Register the token, data center and directory ID entered above with the package
c = Credentials()
c.qualtrics_api_credentials(token=token, data_center=data_center, directory_id=directory_id)

### Extract Survey Responses and Survey Design using API

#### Survey Responses

In [None]:
#Responses client from the QualtricsAPI package
r = Responses()

#Download every response for the survey as a pandas dataframe
df = r.get_survey_responses(survey=survey)

#### Survey Details
* Will be used to develop data dictionary

In [None]:
# Request parameters. Prefer the 'token' / 'data_center' environment variables that the
# original request read, and fall back to the values entered above when they are not set.
# (The previous os.environ[token] used the secret itself as the variable *name*.)
apiToken = os.environ.get('token', token)
dataCenter = os.environ.get('data_center', data_center)

baseUrl = "https://{0}.qualtrics.com/API/v3/surveys/{1}".format(dataCenter, survey)
headers = {
    "x-api-token": apiToken,
    }

# Fail loudly on HTTP errors (bad token, wrong data center, unknown survey) instead of
# hitting a confusing KeyError on 'result' below; the timeout stops a hung request
# from blocking the notebook forever.
response = requests.get(baseUrl, headers=headers, timeout=60)
response.raise_for_status()

#Set data to result level
data = response.json()['result']

## 2. Transform Data <a class="anchor" id="2"></a>

### 2a. Create Data Dictionary <a class="anchor" id="2a"></a>

#### Save data in dependencies folder to use in data_dictionary_dev.py run

In [None]:
#Persist both Qualtrics extracts so data_dictionary_dev.py can use them

#survey details -> Dependencies/data.pkl
with open('Dependencies/data.pkl', 'wb') as details_file:
    pickle.dump(data, details_file, pickle.HIGHEST_PROTOCOL)

#survey responses -> Dependencies/responses_df.pkl
df.to_pickle('Dependencies/responses_df.pkl')

#### Run data_dictionary_dev.py
* This will create a data dictionary to accompany the response data
* Additional details on the data dictionary development process can be found here: https://github.com/rrmahr/Qualtrics-Data-Dictionary

In [None]:
#IPython magic: executes the data dictionary script from the Dependencies folder.
#Output is data_dict_df, which the transformation cells below filter and extend.
%run Dependencies/data_dictionary_dev.py

#Uncomment to inspect
#data_dict_df

### 2b. Transform Response Data<a class="anchor" id="2b"></a>

#### Drop Unwanted Data


In [None]:
#Drop first two rows with labels and QIDs, then renumber the index from 0.
#reset_index(drop=True) discards the old index directly instead of materialising it as an
#'index' column and dropping it again (which breaks if the index is named or an 'index'
#column already exists).
df = df.drop([0, 1]).reset_index(drop=True)

In [None]:
#Delete unnecessary columns / personally identifiable information (PII) to protect respondent privacy
cols_to_delete = ["RecipientLastName", "RecipientFirstName", "RecipientEmail",
                  "LocationLatitude", "LocationLongitude", "IPAddress"]

#Delete from df (raises KeyError if a listed column is absent, so a misspelt PII
#column name is noticed rather than silently kept)
df = df.drop(columns=cols_to_delete)

#Delete from data dictionary and renumber its index from 0
keep_rows = ~data_dict_df['Var_Name'].isin(cols_to_delete)
data_dict_df = data_dict_df[keep_rows].reset_index(drop=True)

#### Assign Data Types based on Data Dictionary
Data types must be assigned before any further processing or transformation to avoid errors.

In [None]:
#Each *_List holds the variable names the data dictionary declares with that type.
#Names can repeat in the dictionary, so .unique() keeps the first occurrence of each,
#in order of appearance.

#Integers
Int_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'INT', 'Var_Name'].unique().tolist()

#Convert just to numeric for now not to INT to avoid errors when creating new vars based on existing INT vars
df[Int_List] = df[Int_List].apply(pd.to_numeric)


#Floats
Float_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'FLOAT', 'Var_Name'].unique().tolist()

df[Float_List] = df[Float_List].astype(float)


#String
String_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'STRING', 'Var_Name'].unique().tolist()

#astype(str) turns missing values into the literal text 'nan'; map those back to real
#missing values. np.nan is used because the np.NaN alias was removed in NumPy 2.0.
df[String_List] = df[String_List].astype(str).replace('nan', np.nan, regex=False)


#Timestamp
Timestamp_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'TIMESTAMP', 'Var_Name'].unique().tolist()

for ts_col in Timestamp_List:
    df[ts_col] = pd.to_datetime(df[ts_col])

#### Rename Variables for Consistency with database requirements / naming conventions

In [None]:
#Example: the Qualtrics 'Duration (in seconds)' variable has spaces in its name, which could present issues
cols_to_rename = {'Duration (in seconds)': 'duration_seconds'}
df = df.rename(cols_to_rename, axis='columns')

#apply the same mapping to the Var_Name column of the data dictionary
renamed_vars = data_dict_df['Var_Name'].replace(cols_to_rename)
data_dict_df = data_dict_df.assign(Var_Name=renamed_vars)

#### Create Additional Variables for Analysis

In [None]:
#Example: Age Groups
#One boolean mask per group; Series.between is inclusive on both ends, matching >= / <=.
age = df['age']
conditions = [age < 18,
              age.between(18, 26),
              age.between(27, 42),
              age.between(43, 58),
              age.between(59, 68),
              age > 68]

values = [1, 2, 3, 4, 5, 6]
value_labels = ['Under 18 years', 'Gen Z 18-26', 'Millennials 27-42', 'Gen X 43-58',
                'Boomers 59-68', 'Other - Senior 69+']

#Rows matching no mask (e.g. missing age) get NaN, so the result is already a float
#column and needs no further numeric conversion here.
df['age_groups'] = np.select(conditions, values, default=np.nan)

#Add to data dictionary: one row per group code
n_groups = len(values)
data_dict_add = {
    'Var_Name': ['age_groups'] * n_groups,
    'Var_Label': ['Age groups based on generation'] * n_groups,
    'Value': list(values),
    'Value_Label': value_labels,
    'Data_Type': ['INT'] * n_groups}

data_dict_add_df = pd.DataFrame(data_dict_add)

#ignore_index renumbers the combined rows; otherwise the six new rows would reuse
#index labels 0-5 that already exist in the data dictionary.
data_dict_df = pd.concat([data_dict_df, data_dict_add_df], ignore_index=True)

#### Conduct Additional Transformations
There are many data transformations that can occur here, some examples include:
* Merging separate datasets with additional respondent information on ResponseId, or with additional project/client level information
* Recoding values to match database coding conventions
* Identifying and cleaning fraudulent responses
* Transposing/melting/pivoting data to be at appropriate level for database integration

#### Assign Data Types Again
* AFTER all transformations are complete, data types need to be assigned again, both to df and to the data dictionary.
* This step ensures that all new variables have an appropriate data type for database integration, and it is where integer variables are finally converted to the integer data type.

In [None]:
#df

#Each *_List holds the variable names the data dictionary declares with that type.
#Names can repeat in the dictionary, so .unique() keeps the first occurrence of each,
#in order of appearance.

#Integers
Int_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'INT', 'Var_Name'].unique().tolist()

#Now convert to 'Int64' (pandas' nullable integer type, so missing values are preserved)
df[Int_List] = df[Int_List].apply(pd.to_numeric).astype('Int64')


#Floats
Float_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'FLOAT', 'Var_Name'].unique().tolist()

df[Float_List] = df[Float_List].astype(float)


#String
String_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'STRING', 'Var_Name'].unique().tolist()

#astype(str) turns missing values into the literal text 'nan'; map those back to real
#missing values. np.nan is used because the np.NaN alias was removed in NumPy 2.0.
df[String_List] = df[String_List].astype(str).replace('nan', np.nan, regex=False)


#Timestamp
Timestamp_List = data_dict_df.loc[data_dict_df['Data_Type'] == 'TIMESTAMP', 'Var_Name'].unique().tolist()

for ts_col in Timestamp_List:
    df[ts_col] = pd.to_datetime(df[ts_col])

In [None]:
#data dictionary

#Float: non-numeric entries in 'Value' are coerced to NaN
data_dict_df['Value'] = pd.to_numeric(data_dict_df['Value'], errors='coerce').astype(float)


#String
String_List = ['Var_Name', 'Var_Label', 'Value_Label', 'Data_Type', 'Question_Type']

#astype(str) turns missing values into the literal text 'nan'; map those back to real
#missing values. np.nan is used because the np.NaN alias was removed in NumPy 2.0.
data_dict_df[String_List] = data_dict_df[String_List].astype(str).replace('nan', np.nan, regex=False)

## 3. Load Data to BigQuery <a class="anchor" id="3"></a>
For simplicity, the code below creates one new table in the "survey_responses" dataset and one in the "data_dictionaries" dataset of a Google BigQuery project. Each table name includes the current date and time to avoid duplication.

#### Set API Credentials
* Below uses service account private key files to obtain credentials for the service account
* For more information see Google's API documentation here: https://googleapis.dev/python/google-auth/1.7.0/user-guide.html

In [None]:
#Input service account private key file to connect to BigQuery
credentials = service_account.Credentials.from_service_account_file(
    'Dependencies/private_key_file.json')

#Restrict the credentials to the Cloud Platform scope
scoped_credentials = credentials.with_scopes(
    ['https://www.googleapis.com/auth/cloud-platform'])

#Build the client from the *scoped* credentials (they were previously created but never used)
client = bigquery.Client(credentials=scoped_credentials, project=credentials.project_id)

#credentials.project_id

#### Load Survey Responses to BigQuery

In [None]:
#enter the BigQuery dataset name
dataset_id = 'survey_responses'

#name table to create; the timestamp suffix gives every run its own table
table_id = 'survey_responses' + '_' + datetime.now().strftime("%Y-%m-%d_%H%M%S")

#Build the references explicitly (Client.dataset() is deprecated) so that the create
#and load steps below target exactly the same table
dataset_ref = bigquery.DatasetReference(credentials.project_id, dataset_id)
table_ref = dataset_ref.table(table_id)

#Create the table in BigQuery
client.create_table(table_ref)

job_config = bigquery.LoadJobConfig()
job_config.autodetect = True

# load the df into the table in BigQuery
job = client.load_table_from_dataframe(df, table_ref, job_config=job_config) # Make an API request to Load

#allow the job to complete (raises if the load job failed)
job.result()

# Prints number of rows loaded into BigQuery table if job worked
print(f"Loaded {job.output_rows} rows into {dataset_id}:{table_id}.")

#### Load Data Dictionary to BigQuery

In [None]:
#enter the BigQuery dataset name
dataset_id = 'data_dictionaries'

#name table to create
table_id = 'data_dictionary' + '_' + str(datetime.now().strftime("%Y-%m-%d_%H%M%S"))

#Create the table in BigQuery
client.create_table(f"{credentials.project_id}.{dataset_id}.{table_id}")
dataset_ref = client.dataset(dataset_id)
table_ref = dataset_ref.table(table_id)
job_config = bigquery.LoadJobConfig()
job_config.autodetect = True

# load the df into the table in BigQuery
job = client.load_table_from_dataframe(data_dict_df, table_ref, job_config=job_config) # Make an API request to Load

#allow the job to complete
job.result()

# Prints number of rows loaded into BigQuery table if job worked
print("Loaded {} rows into {}:{}.".format(job.output_rows, dataset_id, table_id))