# Mimic Search Sanitization Tables in Staging (mozdata) 

### In the shared-prod environment, search terms sanitization modifies four tables:

1. It adds lots of rows, one for each of a all search terms deemed sanitary from the 48 hour raw terms table, to the sanitized terms table, which we keep for 30 days
2. It adds a row to a metadata table that records whether a run succeeded, how long it took, why it failed (if it failed), and some aggregate metrics about the terms seen in that run
3. It adds several rows to a languages table for every language seen in the last run, and how many search terms that run saw that the job detected for each language
4. It adds a row to a data validation table that records the proportion of all search terms in the run that displayed various characteristics (capital letters, U.S. 2010 census surnames, English language, etc)

### We want to be able to run and test jobs that write to these tables without messing up the production versions. 

So this notebook, run start to finish, populates tables of the same schema as prod in `mozdata`. You can see the names of the mozdata versions of the tables in the `Args` attributes below.

In [1]:
class Args:
    pass

args = Args()

# For populating metadata table (2)
args.sanitized_term_destination = 'mozdata.search_terms_unsanitized_analysis.prototype_sanitized_data'
args.job_reporting_destination = 'mozdata.search_terms_unsanitized_analysis.prototype_sanitization_job_metadata'

# For populating languages table (3) 
args.job_metadata_origin = 'mozdata.search_terms_unsanitized_analysis.prototype_sanitization_job_metadata'

# For populating data validation table (4)
args.job_reporting_origin = 'mozdata.search_terms_unsanitized_analysis.prototype_sanitization_job_metadata'
args.languages_origin = 'mozdata.search_terms_unsanitized_analysis.prototype_sanitization_job_languages'
args.data_validation_destination = 'mozdata.search_terms_unsanitized_analysis.prototype_data_validation_metrics'

### Step 1: Populate sanitized search terms table from unsanitized search term logs and record sanitization job metadata

In [2]:
from query_sanitization import stream_search_terms, detect_pii, export_search_queries_to_bigquery, record_job_metadata
import numpy
from datetime import datetime, timedelta
import pandas as pd

import collections
import functools
import operator

In [3]:
df = pd.read_csv('../assets/Names_2010Census.csv')
census_surnames = [str(name).lower() for name in df.name]

start_time = datetime.utcnow()

try:
    total_run = 0
    total_deemed_sanitary = 0
    summary_run_data = {}
    summary_language_data = {}
    
    unsanitized_search_term_stream = stream_search_terms() # load unsanitized search terms
    for raw_page in unsanitized_search_term_stream:
        total_run += raw_page.shape[0]
        pii_in_query_mask, run_data, language_data = await detect_pii(raw_page['query'], census_surnames)
        sanitized_page = raw_page.loc[~numpy.array(pii_in_query_mask)] # ~ reverses the mask so we get the queries WITHOUT PII in them
        total_deemed_sanitary += sanitized_page.shape[0]
        
        summary_language_data = dict(functools.reduce(operator.add,
                            map(collections.Counter, [summary_language_data, language_data])))
        summary_run_data = dict(functools.reduce(operator.add,
                            map(collections.Counter, [summary_run_data, run_data])))
                
        yesterday = datetime.utcnow().date() - timedelta(days=1)
        export_search_queries_to_bigquery(dataframe=sanitized_page, destination_table_id=args.sanitized_term_destination, date=yesterday)
    end_time = datetime.utcnow()
    
    implementation_notes = "Run with a page_size of 300k" 
    record_job_metadata(status='SUCCESS', started_at=start_time, ended_at=end_time, destination_table=args.job_reporting_destination, total_run=total_run, total_rejected=total_run - total_deemed_sanitary, run_data=summary_run_data, language_data=summary_language_data, implementation_notes=implementation_notes)
except Exception as e:
    # TODO: Make this more robust in actual failure cases
    # Maybe include the reason? Or should the logs be elsewhere for that
    record_job_metadata(status='FAILURE', started_at=start_time, ended_at=datetime.utcnow(), destination_table=args.job_reporting_destination, failure_reason=str(e))
    raise e



[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]




[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]




[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]




[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]




[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]




[[], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], [], []]
New row representing job run successfully added.


### Step 2: Populate Languages Table (takes metadata table as input)

In [6]:
from google.cloud import bigquery

In [7]:
client = bigquery.Client()

POPULATE_LANGUAGES_TABLE = f"""
CREATE OR REPLACE TABLE
  `mozdata.search_terms_unsanitized_analysis.prototype_sanitization_job_languages`
AS
SELECT
  started_at AS job_start_time,
  lang.key AS language_code,
  SAFE_CAST(lang.value AS int) AS search_term_count
FROM
  `{args.job_metadata_origin}`
CROSS JOIN
  UNNEST(mozfun.json.js_extract_string_map(approximate_language_proportions_json)) AS lang
"""

query_job = client.query(POPULATE_LANGUAGES_TABLE)

### Step 3: Populate Data Validation Metrics Table (takes metadata table and languages table as input)

In [8]:
from data_validation import calculate_data_validation_metrics, export_data_validation_metrics_to_bigquery

In [9]:
df_i = calculate_data_validation_metrics(args.job_reporting_origin, args.languages_origin)

In [10]:
df_i.finished_at = df_i.finished_at.apply(lambda timestamp: timestamp.strftime("%Y-%m-%d %H:%M:%S"))

In [13]:
export_data_validation_metrics_to_bigquery(df_i, args.data_validation_destination)

[[]]
