# Upload file from GCS to Survey

## Initial Setup

#### Connect to GCP

In [None]:
# Interactive Colab sign-in for the current user.
# NOTE(review): the BigQuery / GCS clients created later presumably pick up
# these credentials - confirm if this notebook is ever run outside Colab.
from google.colab import auth
auth.authenticate_user()

print('Authenticated')

In [None]:
from google.cloud import bigquery
from google.oauth2 import service_account
import pandas_gbq as gbq

# Project id used for the BigQuery client below and passed to every
# gbq.to_gbq(...) call in this notebook.
project='dummydummy'
# Shared module-level BigQuery client; exec_query() submits its queries through it.
client = bigquery.Client(project = project)


In [None]:
# Set the GOOGLE_CLOUD_PROJECT environment variable for this notebook session.
# NOTE(review): confirm this value is meant to differ from `project` above.
%env GOOGLE_CLOUD_PROJECT=dummydummy-dummydummy

#### Import Libraries

In [None]:
import io
import pandas as pd
import re
from google.colab import files
from time import strptime
import datetime
import numpy as np
from google.cloud import storage
#data  = pd.read_excel (io.BytesIO(uploaded[filename]))

#print(data.sheet_names)



#### Functions

In [None]:
# Submit a query through the shared BigQuery client and call result() on the
# job before returning, so queries run one after another rather than in parallel.

def exec_query(q):
  """Run the SQL text `q` on the module-level `client`.

  Returns the job object produced by `client.query(q)` (not the value of
  `result()`); callers chain e.g. `.to_dataframe()` on it.
  Raises `Exception(job.error_result)` when the freshly submitted job
  already reports errors.
  """
  job = client.query(q)
  if job.errors is not None:
    raise Exception(job.error_result)
  job.result()
  return job

In [None]:
# reading an xlsx file - raw data
def reading_file_from_bucket (filename, bucket_name = "dummydummy"):
  src = f"'gs://{bucket_name}/{filename}'"
  !gsutil -m cp {src} .
  xl = pd.ExcelFile(filename)
  return xl

In [None]:
def archive_processed_file(filename, bucket_name = "dummydummy"):
  src = f"""'gs://{bucket_name}/{filename}'"""
  to_scr = f"""'gs://{bucket_name}/Archive/'"""
  !gsutil mv {src} {to_scr}

In [None]:
# Extract the survey year from the file name.
def get_survey_year(filename):
  """Return the four-digit year found in `filename`, as a string.

  The year is the digit group that follows a 3-4 letter word and a single
  separator character (space, '|' or '_'), e.g. 'ORD_Jan_2021.xlsx' -> '2021'.
  Only the first occurrence is used; IndexError is raised when there is none.
  """
  year_matches = re.findall('[a-zA-Z]{3,4}[ |_]([0-9]{4})', filename)
  return year_matches[0]

In [None]:
# Extract the survey month from the file name.
def get_survey_month(filename):
  """Return the month number (1-12) named in `filename`.

  The month is the 3-4 letter word that precedes a separator character
  (space, '|' or '_') and a four-digit year. Its first three letters are
  parsed as an abbreviated month name ('%b'). Only the first occurrence is
  used; IndexError is raised when there is none.
  """
  month_words = re.findall('([a-zA-Z]{3,4})[ |_][0-9]{4}', filename)
  month_abbrev = month_words[0][:3]
  parsed = strptime(month_abbrev, '%b')
  return parsed.tm_mon

In [None]:
def get_parallel_tracker_txt(filename):
  """Return 'parallel_' if `filename` contains "parallel" (case-insensitive), else ''."""
  mentions_parallel = re.search('(?i)parallel', filename) is not None
  return 'parallel_' if mentions_parallel else ''

In [None]:
def push_raw_data_to_bq(xl, stg_dateset = 'dev_stg'):
  """Reshape the 'Data' sheet to long format and stage it in BigQuery.

  Reads the module-level `filename` (not a parameter) to derive the survey
  year, survey month and the optional 'parallel_' prefix, so `filename`
  must be set before this is called.

  Args:
    xl: workbook object exposing `.parse(sheet_name)`; the 'Data' sheet is read.
    stg_dateset: dataset that receives the `response_data` table.

  Returns:
    The melted DataFrame (one row per response and variable) that was
    written to `{stg_dateset}.response_data`, replacing any existing table.
  """
  ds1 = xl.parse('Data')
  # Placeholder column at position 0; filled row by row in the loop below.
  ds1.insert(0, "response_id",'', allow_duplicates =False)
  survey_year = get_survey_year(filename)
  survey_month = get_survey_month(filename)
  parallel_id = get_parallel_tracker_txt(filename)

  for index, row in ds1.iterrows():
    # response_id = literal prefix + parallel prefix + PID + first 10 chars of INTERVIEW_START + STATUS
    ds1.loc[index,'response_id'] = 'dummydummy' + str(parallel_id) + str(row['PID']) + '_' + str(row['INTERVIEW_START'])[0:10] + '_'+ str(row['STATUS'])
    ds1.loc[index,'audience_id'] = str(parallel_id) + str(row['PID'])
    # Survey date is the first day of the survey month.
    # NOTE(review): the bare name passed to pd.to_numeric is not defined in this
    # function, and `survey_year` above is otherwise unused - presumably the
    # year is meant here; confirm against the original notebook.
    ds1.loc[index,'survey_date'] = datetime.date(pd.to_numeric(dummydummy),survey_month,1)

  # Wide -> long: every remaining column becomes a (variable, value) pair.
  result = pd.melt(ds1, id_vars=['response_id', 'audience_id', 'STATUS','survey_date'] ).sort_values(['response_id', 'audience_id'])
  result['variable'] = str(parallel_id) + result['variable']
  gbq.to_gbq(result,f"{stg_dateset}.response_data", project, if_exists='replace')
  return result
  

In [None]:
def merge_into_response(dataset, stg_dateset = 'dev_stg'):
  """Upsert the staged rows of `{stg_dateset}.response_data` into `{dataset}.response`.

  Rows are matched on response_id plus the prefixed question id built from
  the staged `variable`. Unmatched rows are inserted; matched rows have
  their audience, supplier, text, validity, survey_date and updated_at
  fields overwritten. `is_valid` is true only when STATUS casts to '1'.

  Args:
    dataset: dataset holding the target `response` table.
    stg_dateset: dataset holding the staged `response_data` table.
  """
  q = f"""
  MERGE INTO {dataset}.response AS r
  USING {stg_dateset}.response_data t
  ON r.response_id = t.response_id and r.question_id = concat('dummydummy' , safe_cast(t.variable as string))
  WHEN NOT MATCHED THEN
    INSERT(
    response_id, 
    question_id,
    audience_id,
    supplier_name,
    supplier_respondent_id,
    supplier_question_id,
    text,
    created_at,
    updated_at,
    is_valid,
    survey_date)
    VALUES( concat(safe_cast(response_id as string)) ,
            concat('dummydummy' , safe_cast(variable as string)),
            concat('dummydummy' , safe_cast(audience_id as string)),
            'dummydummy',
            audience_id,
            variable,
            value,
            current_datetime("Pacific/Auckland"),
            current_datetime("Pacific/Auckland"),
            if(safe_cast(STATUS as string) = '1',true,false),
            safe_cast(survey_date as date)
          )
  WHEN MATCHED THEN
    UPDATE SET
              r.audience_id = concat('dummydummy' , safe_cast(t.audience_id as string)),
              r.supplier_name = 'dummydummy',
              r.supplier_respondent_id = t.audience_id,
              r.supplier_question_id = t.variable,
              r.text = t.value,
              r.updated_at = current_datetime("Pacific/Auckland"),
              r.is_valid = if(safe_cast(STATUS as string) = '1',true,false),
              r.survey_date = safe_cast(t.survey_date as date)"""
  # The returned job is not used; exec_query raises if the job reports errors.
  r = exec_query(q)


In [None]:
# Step 1: apply the question metadata maintained in the master sheet
# to the question table.
def question_override(dataset, stg_dateset = 'dev_stg', master = 'master_sheet'):
  """Upsert question metadata from `{master}.dummydummy` into `{dataset}.question`.

  Rows are matched on the prefixed supplier question id. Unmatched rows are
  inserted; matched rows have their heading, type, tag and index fields
  overwritten and `updated_at` refreshed.

  Apostrophes in `page_sub_heading` (straight and curly) are replaced by a
  space in BOTH branches, so a newly inserted row is stored the same way as
  an updated one.

  Args:
    dataset: dataset holding the target `question` table.
    stg_dateset: unused; kept so existing callers keep working.
    master: dataset holding the master-sheet table.
  """
  q = f"""
      MERGE INTO {dataset}.question AS q
      USING {master}.dummydummy m
      ON q.question_id = concat('dummydummy', safe_cast(m.supplier_question_id as string))
      WHEN NOT MATCHED THEN
        INSERT(question_id, 
              survey_id,
              supplier_question_id, 
              page_heading, 
              page_sub_heading, 
              question_type, 
              question_tag, 
              question_tag_display_in_interface,
              created_at,
              created_by,
              updated_at,
              updated_by,
              supplier_name,
              index_type,
              index_weighting,
              index_rank)
        VALUES(concat('dummydummy', safe_cast(supplier_question_id as string)),
              'dummydummy',
              supplier_question_id, 
              page_heading, 
              REGEXP_REPLACE(page_sub_heading, "'|’", " "), 
              question_type, 
              question_tag, 
              question_tag_display_in_interface,
              current_datetime("Pacific/Auckland"),
              'dummydummy',
              current_datetime("Pacific/Auckland"),
              'dummydummy',
              'dummydummy',
              index_type,
              index_weighting,
              index_rank
              )
      WHEN MATCHED THEN
        UPDATE SET q.page_heading = m.page_heading,
                  q.page_sub_heading = REGEXP_REPLACE(m.page_sub_heading, "'|’", " "),
                  q.question_type = m.question_type,
                  q.question_tag = m.question_tag,
                  q.question_tag_display_in_interface = m.question_tag_display_in_interface,
                  q.index_type = m.index_type,
                  q.index_weighting = m.index_weighting,
                  q.index_rank = m.index_rank,
                  q.updated_at = current_datetime("Pacific/Auckland")
      """
  exec_query(q)

In [None]:
# Parse the question labels from the workbook and stage them in BigQuery.
def load_questions_from_xl(data, stg_dateset = 'dev_stg'):
  """Build the staged question table from the 'Labels' sheet.

  Reads the module-level `filename` (not a parameter) to decide whether the
  'parallel_' prefix is added to every variable name, so `filename` must be
  set before this is called.

  Derived columns:
    question_text: `Label` without its leading upper-case code and space.
    page_heading:  that leading code.
    page:          the last sentence of question_text (split on '. ' or
                   '? '), with any ' - ...' suffix removed.
    sub_page:      the text after '- ' in that last sentence.

  Args:
    data: workbook object exposing `.parse(sheet_name, skiprows=...)`.
    stg_dateset: dataset that receives the staged table (replaced on each call).
  """
  parallel_id = get_parallel_tracker_txt(filename)
  question_df = data.parse('Labels', skiprows=1)
  # Rows with any missing cell are dropped before the string operations below.
  question_df = question_df.dropna()
  question_df['Variable'] = str(parallel_id) + question_df['Variable']
  question_df['question_text'] = question_df['Label'].replace(['^[A-Z0-9_]+ '], [''], regex=True)
  question_df['page_heading'] = question_df['Label'].str.findall(r'^([A-Z0-9_]+) ').apply(', '.join)
  # Raw string: "\." and "\?" are invalid escape sequences in a normal string
  # literal (SyntaxWarning on recent Python). The runtime value is unchanged.
  separator = r"\. |\? "
  sentences = question_df['question_text'].str.split(separator).tolist()
  question_df['page'] = [parts[-1] for parts in sentences]
  question_df['sub_page'] = question_df['page'].str.findall(r"- (.*)").apply(''.join)
  question_df['page'] = question_df['page'].replace([' - .*$'],[''], regex = True)
  gbq.to_gbq(question_df,f'{stg_dateset}.dummydummy', project, if_exists='replace')

In [None]:
# Insert questions that are staged but not yet present in the question table.
def insert_new_questions(dataset, stg_dateset = 'dev_stg'):
  """Insert staged questions missing from `{dataset}.question`.

  Staged rows from `{stg_dateset}.dummydummy` are matched on the prefixed
  `Variable`. Only the NOT MATCHED branch exists, so existing questions are
  left untouched. New rows get `question_tag_display_in_interface = True`.

  Args:
    dataset: dataset holding the target `question` table.
    stg_dateset: dataset holding the staged question table.
  """
  q = f"""
      MERGE INTO {dataset}.question AS q
      USING {stg_dateset}.dummydummy n
      ON q.question_id = concat('dummydummy', safe_cast(n.Variable as string))
      WHEN NOT MATCHED THEN
        INSERT(question_id, page_heading, page_sub_heading, survey_id, question_text, created_at, created_by, updated_at, updated_by, supplier_page_id, supplier_question_id, supplier_name, question_tag_display_in_interface)
        VALUES(concat('dummydummy', safe_cast(Variable as string)), 
                page, 
                sub_page,
                'dummydummy',
                question_text, 
                current_datetime("Pacific/Auckland"),
                'dummydummy',
                current_datetime("Pacific/Auckland"),
                'dummydummy',
                page_heading, 
                safe_cast(Variable as string),
                'dummydummy',
                True
              )
              """
  exec_query(q)

In [None]:
# Pivot the audience-related responses to one row per response and stage them in BigQuery.
def load_raw_audience(dataset, response, stg_dateset='dev_stg'):
  """Stage a wide table of audience answers.

  Keeps the rows of `response` whose `variable` is the supplier_question_id
  of a question with question_type 'DUMMY' and whose STATUS equals 1, strips
  the 'parallel_' prefix from `variable`, and pivots to one row per
  response_id with one column per variable.

  The index name is cleared before `reset_index`, so the response id ends up
  in a column called `index`. Every name in `col_keep` that is not already a
  column is added as an all-None column.

  Args:
    dataset: dataset holding the `question` table.
    response: long-format DataFrame with at least the columns
      response_id, variable, value and STATUS.
    stg_dateset: dataset that receives the staged table (replaced on each call).
  """
  col_keep = [
       'response_id','RESPID','STATUS','PID','survey_date',
       'S2', # gender
       'S3', # age
       'S4', # region/area
       'A3_1','A3_2','A3_3', # shopper type. e.g. non shopper, only shopper
       'C2_1','C2_2','C2_3','C2_4','C2_5','C2_6','C2_7','C2_8','C2_9', # main shopper
       'CX_1','CX_2','CX_3', # own shopper
       'X2AA_4','X12A_1','X12A_2','X12A_3','X12A_4','X12A_5', # life-stage
       'E11', # Online Flag
       'D10_1', # reward I value
       'D10C',
       ]
  question_df = exec_query(f"""select * from {dataset}.question""").to_dataframe()
  audience_question_ids = question_df[question_df['question_type'] == 'DUMMY'].supplier_question_id
  a_df = response[response.variable.isin(audience_question_ids)]
  # Explicit copy: the next line assigns a column, which on a filtered slice
  # triggers pandas' SettingWithCopyWarning.
  a_df = a_df[a_df.STATUS == 1].copy()
  a_df['variable'] = a_df['variable'].str.replace('parallel_', '')
  audience_p = a_df.pivot(index='response_id', columns='variable', values='value').rename_axis(None)
  audience_p.reset_index(inplace = True)
  for c in col_keep:
    if c not in audience_p.columns:
      audience_p[c] = None
  gbq.to_gbq(audience_p,f"""{stg_dateset}.dummydummy""", project, if_exists='replace')

In [None]:
## create dummydummy dummydummy audience file for reporting
def create_audience_table(dataset, stg_dateset='dev_stg'):
  """Derive the audience attributes and merge them into the audience table.

  Runs two statements through exec_query:
    1. CREATE OR REPLACE a staging table in `stg_dateset`, selecting decoded
       attributes (gender, age, region, area, lifestage segment, shopper
       flags, reward fields) from the staged wide table and left-joining a
       quota table from `dataset` to compute `weight` as
       target / (rows in the same area/age/gender group / all rows).
    2. MERGE that staging table into a table in `dataset` on response_id,
       inserting new responses and overwriting the attributes of existing ones.

  NOTE(review): several column aliases and table names in the SQL are
  redacted placeholders in this copy. The join condition refers to a
  qualifier other than the `Audience` / `Quota` CTE names - confirm against
  the real notebook before editing the SQL.

  Args:
    dataset: dataset holding the quota table and the target audience table.
    stg_dateset: dataset holding the staged tables.
  """
  q = f"""
  create or replace table {stg_dateset}.dummydummy as
  With Audience as
  (
  SELECT  
    index as response_id,
    case SAFE_CAST(S2 AS NUMERIC) when 1 then 'MALE' when 2 then 'FEMALE' else null end as gender,

        CASE 
dummydummy
dummydummy
dummydummy
          ELSE NULL
        END as age,
       
       dummydummy
       dummydummy
       dummydummy
              else null end as region,
              
        CASE WHEN SAFE_CAST(S4 AS NUMERIC) = 2 then 'AUCKLAND'
              dummydummy
              dummydummy
              dummydummy
              else null end as area,          

                  CASE 
                      dummydummy
                      dummydummy
                      dummydummy
                      dummydummy
                      END as lifestage_segment,
    case SAFE_CAST(E11 AS NUMERIC) when 1 then 1 else 0 end as dummydummy,
    
    case SAFE_CAST(CX_1 AS NUMERIC) when 1 then 1 else 0 end as dummydummy,
    case SAFE_CAST(CX_3 AS NUMERIC) when 1 then 1 else 0 end as dummydummy,
    case SAFE_CAST(CX_2 AS NUMERIC) when 1 then 1 else 0 end as dummydummy,
    
    case SAFE_CAST(C2_1 AS NUMERIC) when 5 then 1 else 0 end as dummydummy,
    case SAFE_CAST(C2_3 AS NUMERIC) when 5 then 1 else 0 end as dummydummy,
    case SAFE_CAST(C2_2 AS NUMERIC) when 5 then 1 else 0 end as dummydummy,
    
    case when SAFE_CAST(CX_1 AS NUMERIC) = 1 and SAFE_CAST(CX_3 AS NUMERIC) = 1  then 1 else 0 end as dummydummy,
    case when SAFE_CAST(CX_1 AS NUMERIC) = 1 and SAFE_CAST(CX_2 AS NUMERIC) = 1  then 1 else 0 end as dummydummy,
    SAFE_CAST(D10_1 AS INT64) as dummydummy,
    case when SAFE_CAST(D10C AS INT64) = 1 then 'Food'
         when SAFE_CAST(D10C AS INT64) = 2 then 'Fuel'
         else 'None' end as dummydummy,
  FROM {stg_dateset}.dummydummy
  )
  ,
  Quota as
  (
  Select 
    gender,
    area,
    age,
    target
  from {dataset}.dummydummy
  )
  Select 
  response_id, Audience.gender, Audience.age, region, Audience.area, lifestage_segment, online_shopper, CD_own_shopper, NW_own_shopper, PNS_own_shopper, CD_main_shopper, NW_main_shopper, PNS_main_shopper, cross_shopper_CD_NW, cross_shopper_CD_PNS, Onecard_Reward_I_Value, onecard_reward_type,
  target / (sum(1) over (partition by Audience.area,Audience.age,Audience.gender)/sum(1) over ()) as weight,
  --sum(1) over (partition by Audience.area,Audience.age,Audience.gender) as group_count_temp,
  --sum(1) as monthly_count
  from Audience 
  left join Quota on concat(Quota.gender, Quota.age,Quota.area) = concat(dummydummy.gender, dummydummy.age,dummydummy.area)
  """
  # Statement 1: rebuild the staged, decoded audience table.
  r = exec_query(q)

  insert_q = f"""
  MERGE INTO {dataset}.dummydummy AS a
  USING {stg_dateset}.dummydummy t
  ON a.response_id = t.response_id
  WHEN NOT MATCHED THEN
  INSERT(response_id, gender, age, region, area, lifestage_segment, 
        online_shopper, dummydummy, dummydummy, PNS_own_shopper, dummydummy, 
        dummydummy, dummydummy, dummydummy, dummydummy, 
        Onecard_Reward_I_Value, weight, onecard_reward_type)
  VALUES(response_id, gender, age, region, area, lifestage_segment, 
        online_shopper, dummydummy, dummydummy, dummydummy, dummydummy, 
        dummydummy, dummydummy, dummydummy, dummydummy, 
        dummydummy, weight, dummydummy
         ) 
  WHEN MATCHED THEN
  UPDATE SET a.gender = t.gender,
             a.age = t.age,
             a.region = t.region,
             a.area = t.area,
             a.dummydummy = t.dummydummy,
             a.online_shopper = t.online_shopper,
             a.CD_own_shopper = t.CD_own_shopper,
             a.NW_own_shopper = t.NW_own_shopper,
             a.PNS_own_shopper = t.PNS_own_shopper,
             a.CD_main_shopper = t.CD_main_shopper,
             a.NW_main_shopper = t.NW_main_shopper,
             a.PNS_main_shopper = t.PNS_main_shopper,
             a.cross_shopper_CD_NW = t.cross_shopper_CD_NW,
             a.cross_shopper_CD_PNS = t.cross_shopper_CD_PNS,
             a.dummydummy = t.dummydummy,
             a.weight = t.weight,
             a.onecard_reward_type = t.onecard_reward_type
  """
  # Statement 2: upsert the staged rows into the reporting audience table.
  r = exec_query(insert_q)


In [None]:
def create_table_for_dashboard(dataset):
  """Rebuild the flattened reporting table used by the dashboard.

  Joins the audience, response and question tables of `dataset` (inner
  joins on response_id and question_id). It keeps only responses with
  non-null text other than a single space, whose question is flagged for
  display and is not of type 'IGNORE'. It then adds:
    reporting_month: the latest survey_date among the kept rows.
    month_rank:      whole months between reporting_month and the row's
                     survey_date.

  The output table name is hard-coded in the SQL and does not depend on
  `dataset`.

  Args:
    dataset: dataset holding the source audience, response and question tables.
  """
  q = f"""
      create or replace table dummydummy.dummydummy as
      with t1 as
      (
        SELECT 
        aud.*,
        res.survey_date,
        res.question_id,
        q.supplier_question_id,
        q.question_text,
        q.page_heading ,
        q.page_sub_heading ,
        q.index_type,
        q.index_weighting,
        q.index_rank,
        res.text,
      FROM {dataset}.dummydummy aud
      inner join {dataset}.response res on res.response_id = aud.response_id
      inner join {dataset}.question q on q.question_id = res.question_id 
      where 1 = 1
        and q.question_id is not null
        and res.text is not null
        and res.text <> ' '
        and q.question_tag_display_in_interface
        and (q.question_type <> 'IGNORE' or q.question_type is null)
      )
      ,
      t2 as
      (
      select
      max(survey_date) reporting_month
      from t1
      )
      select
      *,
      date_diff(reporting_month, survey_date, Month) month_rank
      from t1,t2
      """
  # The returned job is not used; exec_query raises if the job reports errors.
  r = exec_query(q)

## Processing Data

In [None]:
# Process every object in the bucket whose name starts with 'ORD', one file at a time.
gcs_client = storage.Client()
bucket = gcs_client.bucket('dummydummy')
for blob in bucket.list_blobs(prefix='ORD'):
  # `filename` must stay a module-level name: push_raw_data_to_bq and
  # load_questions_from_xl read it as a global rather than taking it as an argument.
  filename = blob.name
  xl = reading_file_from_bucket (filename, bucket_name = "dummydummy")
  print('file loaded')
  # Responses: stage the long-format data, then upsert it into the response table.
  response = push_raw_data_to_bq(xl)
  merge_into_response(dataset = 'dev_survey', stg_dateset = 'dev_stg')
  print('response table updated')
  # Questions: apply master-sheet metadata first, then insert questions that
  # appear in this workbook but are not yet in the question table.
  question_override(dataset = 'dev_survey', stg_dateset = 'dev_stg', master = 'master_sheet')
  load_questions_from_xl(data=xl)
  insert_new_questions(dataset = 'dev_survey', stg_dateset = 'dev_stg')
  print('question table updated')
  # Audience: runs after the question steps because load_raw_audience reads the question table.
  load_raw_audience(dataset='dev_survey', response = response)
  print('raw data loaded')
  create_audience_table(dataset='dev_survey', stg_dateset='dev_stg')
  print('audience table updated')
  # Archiving is currently disabled, so processed files stay in place and
  # are picked up again on the next run.
  # archive_processed_file(filename)
  # print('file is archived')

In [None]:
# Rebuild the reporting table once, after all files have been processed.
create_table_for_dashboard(dataset='dev_survey')
print('table for reporting updated')

In [None]:
# question_override(dataset = 'dev_survey', stg_dateset = 'dev_stg', master = 'master_sheet')

#### Run once

In [None]:
# Manual run for a single file. `filename` is set as a module-level name
# because push_raw_data_to_bq and load_questions_from_xl read it as a global.
filename = 'dummydummy'
xl = reading_file_from_bucket (filename, bucket_name = "dummydummy")

In [None]:
# Stage the responses of the manually selected workbook (replaces dev_stg.response_data).
push_raw_data_to_bq(xl, stg_dateset = 'dev_stg')

In [None]:
# Stage the question labels of the manually selected workbook (default staging dataset).
load_questions_from_xl(data=xl)

#### Test