# Neural network hybrid recommendation system on Google Analytics data: preprocessing

This notebook demonstrates how to implement a hybrid recommendation system that uses a neural network to combine content-based and collaborative filtering recommendation models on Google Analytics data. We use the user and item embeddings learned in [wals.ipynb](../wals.ipynb) and combine them with the content-based features from [content_based_using_neural_networks.ipynb](../content_based_using_neural_networks.ipynb).

First, we preprocess our data with BigQuery and Cloud Dataflow so that it can be used by the neural network hybrid recommendation model later.

At the time of writing, Apache Beam only supports Python 2, so we're going to switch to the Python 2 kernel. In the menu above, click the dropdown arrow and select `python2`.

In [13]:
%%bash
source activate py2env
pip uninstall -y google-cloud-dataflow
conda install -y pytz==2018.4
pip install apache-beam[gcp]

Uninstalling google-cloud-dataflow-2.5.0:
  Successfully uninstalled google-cloud-dataflow-2.5.0
Collecting package metadata: ...working... done
Solving environment: ...working... done

# All requested packages already installed.



In [9]:
%%bash
conda update -n base -c defaults conda

Solving environment: ...working... done

## Package Plan ##

  environment location: /usr/local

  added / updated specs: 
    - conda


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    openssl-1.1.1b             |       h7b6447c_1         4.0 MB  defaults
    conda-4.6.8                |           py27_0         1.6 MB  defaults
    ------------------------------------------------------------
                                           Total:         5.6 MB

The following packages will be UPDATED:

    ca-certificates: 2018.03.07-0      defaults --> 2019.1.23-0       defaults
    certifi:         2018.11.29-py27_0 defaults --> 2019.3.9-py27_0   defaults
    conda:           4.5.12-py27_0     defaults --> 4.6.8-py27_0      defaults
    openssl:         1.1.1a-h7b6447c_0 defaults --> 1.1.1b-h7b6447c_1 defaults

Proceed ([y]/n)? 

Downloading and Extracting Packages
Preparing transaction: ..

openssl-1.1.1b       | 4.0 MB    | ########## | 100% 
conda-4.6.8          | 1.6 MB    | ########## | 100% 


In [12]:
%%bash
pip install google-cloud-dataflow

Collecting google-cloud-dataflow
  Downloading https://files.pythonhosted.org/packages/72/29/3aaa67a276bcb07c3e85a6048b4d9610542082d262256cd6d232d6a8c00a/google-cloud-dataflow-2.5.0.tar.gz
Collecting apache-beam[gcp]==2.5.0 (from google-cloud-dataflow)
  Downloading https://files.pythonhosted.org/packages/ff/10/a59ba412f71fb65412ec7a322de6331e19ec8e75ca45eba7a0708daae31a/apache_beam-2.5.0-cp27-cp27mu-manylinux1_x86_64.whl (2.2MB)
Collecting httplib2<0.10,>=0.8 (from apache-beam[gcp]==2.5.0->google-cloud-dataflow)
  Downloading https://files.pythonhosted.org/packages/ff/a9/5751cdf17a70ea89f6dde23ceb1705bfb638fd8cee00f845308bf8d26397/httplib2-0.9.2.tar.gz (205kB)
Collecting typing<3.7.0,>=3.6.0 (from apache-beam[gcp]==2.5.0->google-cloud-dataflow)
  Using cached https://files.pythonhosted.org/packages/cc/3e/29f92b7aeda5b078c86d14f550bf85cff809042e3429ace7af6193c3bc9f/typing-3.6.6-py2-none-any.whl
Collecting hdfs<3.0.0,>=2.1.0 (from apache-beam[gcp]==2.5.0->google-cloud-dataflow)
Collecti

pandas-gbq 0.3.0 has requirement google-cloud-bigquery>=0.28.0, but you'll have google-cloud-bigquery 0.25.0 which is incompatible.
google-cloud-monitoring 0.28.0 has requirement google-cloud-core<0.29dev,>=0.28.0, but you'll have google-cloud-core 0.25.0 which is incompatible.
datalab 1.1.3 has requirement httplib2>=0.10.3, but you'll have httplib2 0.9.2 which is incompatible.


Now restart the notebook's kernel so that the newly installed packages are loaded.

In [1]:
# Import helpful libraries and set up our project, bucket, and region
import os

# Read the active project ID from the gcloud configuration
output = os.popen("gcloud config get-value project").readlines()
project_name = output[0].strip()  # drop the trailing newline

# change these to try this notebook out
PROJECT = project_name
BUCKET = project_name
REGION = 'europe-west1'  # note: Cloud ML Engine is not available in europe-west3!

print(PROJECT)
print(BUCKET)
print(REGION)

# do not change these
os.environ['PROJECT'] = PROJECT
os.environ['BUCKET'] = BUCKET
os.environ['REGION'] = REGION
os.environ['TFVERSION'] = '1.8'

qwiklabs-gcp-1e68e259ee8f6287
qwiklabs-gcp-1e68e259ee8f6287
europe-west1


In [2]:
%%bash
gcloud config set project $PROJECT
gcloud config set compute/region $REGION

Updated property [core/project].
Updated property [compute/region].


<h2> Create ML dataset using Dataflow </h2>
Let's use Cloud Dataflow to read in the BigQuery data, do some preprocessing, and write it out as CSV files.

First, let's create the hybrid dataset query that we will use in our Cloud Dataflow pipeline. It combines some content-based features with the user and item embeddings learned in our WALS matrix factorization collaborative filtering lab, which we extracted from the trained WALSMatrixFactorization Estimator and uploaded to BigQuery. Each row is labeled with `next_content_id`, the next article the same visitor viewed.
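The `next_content_id` label comes from `LEAD(...) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC)`: each page hit is paired with the next article the same visitor read, and hits without a successor are dropped by the `WHERE ... IS NOT NULL` filter. The plain-Python sketch below mimics that step on an in-memory list. The `(visitor_id, hit_time, content_id)` tuple layout and the function name are hypothetical, chosen only for illustration.

```python
from collections import defaultdict


def next_content_pairs(hits):
    """Return (visitor_id, content_id, next_content_id) triples.

    `hits` is a list of (visitor_id, hit_time, content_id) tuples, a hypothetical
    in-memory stand-in for the rows of CTE_site_history.
    """
    by_visitor = defaultdict(list)
    for visitor_id, hit_time, content_id in hits:
        # PARTITION BY fullVisitorId
        by_visitor[visitor_id].append((hit_time, content_id))

    pairs = []
    for visitor_id in sorted(by_visitor):
        # ORDER BY hits.time ASC (ties broken by content_id in this sketch)
        ordered = sorted(by_visitor[visitor_id])
        # LEAD(..., 1): pair each hit with the following one. The last hit has no
        # successor (NULL in SQL, filtered out by the WHERE clause), so zip() skips it.
        for (_, current), (_, following) in zip(ordered, ordered[1:]):
            pairs.append((visitor_id, current, following))
    return pairs
```

Sorting by `(hit_time, content_id)` breaks ties deterministically here; the SQL window function gives no such guarantee for hits with identical timestamps.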

In [3]:
query_hybrid_dataset = """
WITH CTE_site_history AS (
  SELECT
      fullVisitorId as visitor_id,
      (SELECT MAX(IF(index = 10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS content_id,
      (SELECT MAX(IF(index = 7, value, NULL)) FROM UNNEST(hits.customDimensions)) AS category, 
      (SELECT MAX(IF(index = 6, value, NULL)) FROM UNNEST(hits.customDimensions)) AS title,
      (SELECT MAX(IF(index = 2, value, NULL)) FROM UNNEST(hits.customDimensions)) AS author_list,
      SPLIT(RPAD((SELECT MAX(IF(index = 4, value, NULL)) FROM UNNEST(hits.customDimensions)), 7), '.') AS year_month_array,
      LEAD(hits.customDimensions, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) AS nextCustomDimensions
  FROM 
    `cloud-training-demos.GA360_test.ga_sessions_sample`,   
     UNNEST(hits) AS hits
   WHERE 
     # only include hits on pages
      hits.type = "PAGE"
      AND
      fullVisitorId IS NOT NULL
      AND
      hits.time != 0
      AND
      hits.time IS NOT NULL
      AND
      (SELECT MAX(IF(index = 10, value, NULL)) FROM UNNEST(hits.customDimensions)) IS NOT NULL
),
CTE_training_dataset AS (
SELECT
  (SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(nextCustomDimensions)) AS next_content_id,
  
  visitor_id,
  content_id,
  category,
  REGEXP_REPLACE(title, r",", "") AS title,
  REGEXP_EXTRACT(author_list, r"^[^,]+") AS author,
  DATE_DIFF(DATE(CAST(year_month_array[OFFSET(0)] AS INT64), CAST(year_month_array[OFFSET(1)] AS INT64), 1), DATE(1970, 1, 1), MONTH) AS months_since_epoch
FROM
  CTE_site_history
WHERE (SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(nextCustomDimensions)) IS NOT NULL)

SELECT
  CAST(next_content_id AS STRING) AS next_content_id,
  
  CAST(training_dataset.visitor_id AS STRING) AS visitor_id,
  CAST(training_dataset.content_id AS STRING) AS content_id,
  CAST(IFNULL(category, 'None') AS STRING) AS category,
  CONCAT("\\"", REPLACE(TRIM(CAST(IFNULL(title, 'None') AS STRING)), "\\"",""), "\\"") AS title,
  CAST(IFNULL(author, 'None') AS STRING) AS author,
  CAST(months_since_epoch AS STRING) AS months_since_epoch,
  
  IFNULL(user_factors._0, 0.0) AS user_factor_0,
  IFNULL(user_factors._1, 0.0) AS user_factor_1,
  IFNULL(user_factors._2, 0.0) AS user_factor_2,
  IFNULL(user_factors._3, 0.0) AS user_factor_3,
  IFNULL(user_factors._4, 0.0) AS user_factor_4,
  IFNULL(user_factors._5, 0.0) AS user_factor_5,
  IFNULL(user_factors._6, 0.0) AS user_factor_6,
  IFNULL(user_factors._7, 0.0) AS user_factor_7,
  IFNULL(user_factors._8, 0.0) AS user_factor_8,
  IFNULL(user_factors._9, 0.0) AS user_factor_9,
  
  IFNULL(item_factors._0, 0.0) AS item_factor_0,
  IFNULL(item_factors._1, 0.0) AS item_factor_1,
  IFNULL(item_factors._2, 0.0) AS item_factor_2,
  IFNULL(item_factors._3, 0.0) AS item_factor_3,
  IFNULL(item_factors._4, 0.0) AS item_factor_4,
  IFNULL(item_factors._5, 0.0) AS item_factor_5,
  IFNULL(item_factors._6, 0.0) AS item_factor_6,
  IFNULL(item_factors._7, 0.0) AS item_factor_7,
  IFNULL(item_factors._8, 0.0) AS item_factor_8,
  IFNULL(item_factors._9, 0.0) AS item_factor_9,
  
  FARM_FINGERPRINT(CONCAT(CAST(visitor_id AS STRING), CAST(content_id AS STRING))) AS hash_id
FROM CTE_training_dataset AS training_dataset
LEFT JOIN `cloud-training-demos.GA360_test.user_factors` AS user_factors
  ON CAST(training_dataset.visitor_id AS FLOAT64) = CAST(user_factors.user_id AS FLOAT64)
LEFT JOIN `cloud-training-demos.GA360_test.item_factors` AS item_factors
  ON CAST(training_dataset.content_id AS STRING) = CAST(item_factors.item_id AS STRING)
"""

Let's pull a sample of our data into a dataframe to see what it looks like.

In [4]:
import google.datalab.bigquery as bq
df_hybrid_dataset = bq.Query(query_hybrid_dataset + "LIMIT 100").execute().result().to_dataframe()
df_hybrid_dataset.head()

,next_content_id,visitor_id,content_id,category,title,author,months_since_epoch,user_factor_0,user_factor_1,user_factor_2,...,item_factor_1,item_factor_2,item_factor_3,item_factor_4,item_factor_5,item_factor_6,item_factor_7,item_factor_8,item_factor_9,hash_id
0,299931241,1000148716229112932,299913368,News,"""U4-Störung legt Wiener Frühverkehr lahm""",Yvonne Widler,574,0.000832,-8.2e-05,-0.000356,...,0.04349357,0.1793136,-0.2215933,-0.1450238,-0.2655937,0.0841945,-0.08709898,-0.05611087,-0.01429722,-2184921408517706006
1,299913879,1000148716229112932,299931241,Stars & Kultur,"""Regisseur Michael Haneke kritisiert Flüchtlin...",,574,0.000832,-8.2e-05,-0.000356,...,-4.683084e-24,1.735423e-23,7.477076e-24,5.1655270000000003e-23,-9.793503e-24,-1.8990410000000003e-23,-2.913853e-23,2.222428e-24,7.229135000000001e-23,5621992188636621190
2,299912101,1000148716229112932,299913879,News,"""Python erwürgte thailändischen Besitzer""",,574,0.000832,-8.2e-05,-0.000356,...,3.136074e-16,4.155319e-16,4.458257e-16,-7.340608e-16,1.147638e-15,-1.369216e-15,-1.61224e-16,2.21777e-16,3.1044640000000004e-17,7485673666361980819
3,299922662,1000360453832106474,299925700,Lifestyle,"""Nach Tod von Vater: Tochter bekommt jedes Jah...",Marlene Patsalidis,574,-0.000236,-8e-06,0.001155,...,0.1375236,-0.04747139,0.08547803,-0.00114593,0.08357742,0.006587416,-0.07324135,-0.05101252,0.03820005,450595002488886092
4,299826775,1000360453832106474,299922662,Lifestyle,"""Australischer Fernsehstar rechtfertigt Sexism...",Marlene Patsalidis,574,-0.000236,-8e-06,0.001155,...,-0.0003428652,0.0008917232,-1.524432e-05,-0.0009830965,0.001352254,0.0003238236,-0.0005569104,-6.175535e-05,0.0005140019,-2937038193621424597


In [5]:
df_hybrid_dataset.describe()

,user_factor_0,user_factor_1,user_factor_2,user_factor_3,user_factor_4,user_factor_5,user_factor_6,user_factor_7,user_factor_8,user_factor_9,...,item_factor_1,item_factor_2,item_factor_3,item_factor_4,item_factor_5,item_factor_6,item_factor_7,item_factor_8,item_factor_9,hash_id
count,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,...,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0,100.0
mean,-0.000147,0.000685,-0.000627,-0.00115,-0.000492,-0.000155,0.000444,-0.000552,0.000201,0.000758,...,0.7296223,-0.3078193,-0.071049,-0.3969388,0.49663,-0.5494907,-0.2779562,0.4003812,-0.1501421,4.254542e+17
std,0.001493,0.001397,0.001859,0.002484,0.000871,0.003032,0.001697,0.00198,0.001692,0.001777,...,4.559783,3.366933,0.574152,1.860872,2.77531,4.605988,1.442863,3.901654,1.664435,5.206981e+18
min,-0.005072,-0.001792,-0.003466,-0.007429,-0.004008,-0.00702,-0.002884,-0.006178,-0.003074,-0.0056,...,-0.03037239,-22.70137,-5.523473,-11.59028,-0.265594,-32.37039,-12.50565,-9.549941,-11.15637,-9.12606e+18
25%,-0.000772,5e-06,-0.001001,-0.001109,-0.001173,-0.000269,-0.000124,-0.002153,-0.000349,-5e-06,...,-4.683084e-24,-0.005500714,-0.00012,-0.002404522,-0.000457,-4.063126e-07,-0.003291267,-0.0001949149,-2.214674e-23,-3.70462e+18
50%,8e-06,0.000151,-0.000233,3.1e-05,-0.000637,-0.000249,0.000125,-0.00032,-5.1e-05,0.000139,...,2.199448e-13,2.015904e-24,0.0,-1.7771170000000002e-17,0.0,0.0,-1.460821e-16,-1.102839e-35,1.785944e-15,2.377111e+17
75%,2.5e-05,0.001103,0.000263,0.000126,9.6e-05,0.002153,0.001355,-5e-05,1.1e-05,0.001451,...,0.04349357,0.0005184471,0.000114,8.476977000000001e-17,0.001352,0.006587416,9.396294e-21,6.632489e-06,0.001329006,5.453835e+18
max,0.005226,0.006763,0.006207,0.000501,0.002388,0.003629,0.007907,0.003185,0.004892,0.005829,...,32.29185,9.632271,0.768243,0.1868268,18.775856,5.649576,0.06752157,26.71273,5.161455,8.882012e+18
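Because the embeddings are attached with `LEFT JOIN`s and every factor column is wrapped in `IFNULL(..., 0.0)`, a visitor or article with no learned WALS factors still produces a row; its embedding columns are simply zeros. The sketch below reproduces that lookup in Python, with dictionaries standing in for the `user_factors` and `item_factors` tables. The dictionary layout and function names are hypothetical.

```python
NUM_FACTORS = 10  # the query selects user_factor_0..9 and item_factor_0..9


def lookup_factors(key, factor_table, num_factors=NUM_FACTORS):
    """Return the factor vector for `key`, mirroring LEFT JOIN + IFNULL(..., 0.0)."""
    row = factor_table.get(key)  # LEFT JOIN: no match -> every column is NULL
    if row is None:
        return [0.0] * num_factors
    # IFNULL(factors._i, 0.0) is applied column by column
    return [0.0 if value is None else float(value) for value in row]


def embedding_columns(visitor_id, content_id, user_factors, item_factors):
    """user_factor_0..9 followed by item_factor_0..9, in the query's column order."""
    return (lookup_factors(visitor_id, user_factors)
            + lookup_factors(content_id, item_factors))


users = {'visitor_a': [0.1] * NUM_FACTORS}
items = {'article_x': [None] + [0.5] * (NUM_FACTORS - 1)}
print(embedding_columns('visitor_b', 'article_y', users, items) == [0.0] * 20)  # True
```

One consequence of this design is that a missing embedding and a genuinely all-zero embedding look identical to the downstream model.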


In [6]:
import apache_beam as beam
import datetime, os

def to_csv(rowdict):
  # Turn one BigQuery row (a dict) into one CSV line
  CSV_COLUMNS = 'next_content_id,visitor_id,content_id,category,title,author,months_since_epoch'.split(',')
  FACTOR_COLUMNS = ["user_factor_{}".format(i) for i in range(10)] + ["item_factor_{}".format(i) for i in range(10)]

  # String columns: UTF-8 encode, and write the literal 'None' for missing or NULL values
  data = ','.join(['None' if k not in rowdict else (rowdict[k].encode('utf-8') if rowdict[k] is not None else 'None') for k in CSV_COLUMNS])
  data += ','
  # Embedding columns: floats (IFNULL(..., 0.0) in the query), so just stringify them
  data += ','.join([str(rowdict[k]) if k in rowdict else 'None' for k in FACTOR_COLUMNS])
  yield data
  
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-hybrid-recommendation-features' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc/features'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/hybrid_recommendation/preproc/features/'.format(BUCKET)
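      try:
        # Remove any output left over from a previous run
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except subprocess.CalledProcessError:
        pass  # nothing to delete on the first run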

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
    RUNNER = 'DirectRunner'
  else:
    RUNNER = 'DataflowRunner'
  p = beam.Pipeline(RUNNER, options = opts)
  
  query = query_hybrid_dataset

  if in_test_mode:
    query = query + ' LIMIT 100' 

  for step in ['train', 'eval']:
    if step == 'train':
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hash_id), 10) < 9'.format(query)
    else:
      selquery = 'SELECT * FROM ({}) WHERE MOD(ABS(hash_id), 10) = 9'.format(query)

    (p 
     | '{}_read'.format(step) >> beam.io.Read(beam.io.BigQuerySource(query = selquery, use_standard_sql = True))
     | '{}_csv'.format(step) >> beam.FlatMap(to_csv)
     | '{}_out'.format(step) >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{}.csv'.format(step)))
    )

  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
    
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-hybrid-recommendation-features-190321-080809 ... hang on


Using this argument will have no effect on the actual scopes for tokens
requested. These scopes are set at VM instance creation time and
can't be overridden in the request.
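`preprocess` splits the data into training and evaluation sets without any randomness: `hash_id` is a `FARM_FINGERPRINT` of the visitor and content IDs, rows with `MOD(ABS(hash_id), 10) = 9` (about 10%) go to `eval.csv`, and the rest go to `train.csv`. Because the hash is deterministic, a given (visitor, content) pair lands in the same split every time the pipeline is rerun. The sketch below illustrates the idea in Python. It substitutes an MD5-based 64-bit hash for `FARM_FINGERPRINT`, so its bucket assignments will not match BigQuery's, and the function names are hypothetical.

```python
import hashlib
import struct


def signed_hash64(text):
    """Signed 64-bit hash of a string (MD5-based stand-in for FARM_FINGERPRINT)."""
    digest = hashlib.md5(text.encode('utf-8')).digest()
    # Interpret the first 8 bytes as a little-endian signed 64-bit integer
    return struct.unpack('<q', digest[:8])[0]


def assign_split(visitor_id, content_id):
    """'train' when MOD(ABS(hash_id), 10) < 9, otherwise 'eval'."""
    # CONCAT(CAST(visitor_id AS STRING), CAST(content_id AS STRING))
    hash_id = signed_hash64(str(visitor_id) + str(content_id))
    return 'train' if abs(hash_id) % 10 < 9 else 'eval'


splits = [assign_split(visitor, 'content_42') for visitor in range(10000)]
print(splits.count('eval') / float(len(splits)))  # roughly 0.1
```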



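The Dataflow job runs asynchronously (`preprocess` only waits for completion in test mode), so wait for it to finish, for example by watching it in the Dataflow section of the Cloud Console. Then let's check our files to make sure everything went as expected.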

In [15]:
%%bash
rm -rf features
mkdir features

In [16]:
!gsutil -m cp -r gs://{BUCKET}/hybrid_recommendation/preproc/features/*.csv* features/

Copying gs://qwiklabs-gcp-1e68e259ee8f6287/hybrid_recommendation/preproc/features/eval.csv-00000-of-00001...
Copying gs://qwiklabs-gcp-1e68e259ee8f6287/hybrid_recommendation/preproc/features/train.csv-00000-of-00005...
Copying gs://qwiklabs-gcp-1e68e259ee8f6287/hybrid_recommendation/preproc/features/train.csv-00001-of-00005...
Copying gs://qwiklabs-gcp-1e68e259ee8f6287/hybrid_recommendation/preproc/features/train.csv-00002-of-00005...
Copying gs://qwiklabs-gcp-1e68e259ee8f6287/hybrid_recommendation/preproc/features/train.csv-00003-of-00005...
Copying gs://qwiklabs-gcp-1e68e259ee8f6287/hybrid_recommendation/preproc/features/train.csv-00004-of-00005...
| [6/6 files][114.6 MiB/114.6 MiB] 100% Done                                    
Operation completed over 6 objects/114.6 MiB.                                    


In [12]:
!head -3 features/*

==> features/eval.csv-00000-of-00001 <==
710535,951784927766849126,710535,News,"Haus aus Marmor und Grabsteinen",None,503,-0.00170100899413,0.00496714003384,0.0040482301265,0.000690933316946,3.52509268851e-05,-0.00172890012618,0.00153049221262,0.00100265210494,0.00228979066014,-0.00201142113656,-5.59943889043e-19,7.42678684608e-19,-1.3985523895e-19,3.42277049416e-19,1.11620765154e-18,2.17990091471e-18,-2.42801472173e-19,1.5953545546e-19,-1.10792405809e-18,-4.38625547901e-19
971465,4674958417287013114,711895,None,"Impressum KURIER.at",None,553,-0.000477781286463,0.00125698221382,-0.000599682505708,0.00186442385893,-0.00068538391497,0.00188264774624,0.000171602805494,-0.000178980146302,0.000812514859717,0.00132207910065,-6.17342042923,14.5652112961,17.5528583527,3.3229033947,-44.9284629822,29.9998893738,18.7066059113,-14.6920909882,-20.3173618317,-3.7755317688
711895,8640555275627058154,711895,None,"Impressum KURIER.at",None,553,1.48057097249e-05,-1.99350433832e-05,-1.66832815012e-05,
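`head` only shows the first few lines of each shard. A more thorough check is to parse every line and confirm it has the 27 fields that `to_csv` writes (7 content columns, then 10 user factors and 10 item factors). The helpers below are a hypothetical sketch using Python's `csv` module, which honours the double quotes placed around `title`; they assume Python 3 and the `features/` directory populated by the `gsutil cp` cell above.

```python
import csv
import glob
import io

# to_csv writes 7 content columns followed by 10 user and 10 item factor columns
EXPECTED_FIELDS = 7 + 10 + 10


def bad_rows_in(lines, expected=EXPECTED_FIELDS):
    """Return (line_number, field_count) for every CSV line without `expected` fields."""
    return [(number, len(fields))
            for number, fields in enumerate(csv.reader(lines), start=1)
            if len(fields) != expected]


def check_feature_shards(pattern):
    """Map each shard matching `pattern` (e.g. 'features/train.csv*') to its bad rows."""
    report = {}
    for path in sorted(glob.glob(pattern)):
        with io.open(path, encoding='utf-8', newline='') as f:
            report[path] = bad_rows_in(f)
    return report
```

For example, `check_feature_shards('features/*.csv*')` should map every shard to an empty list if the export worked as intended.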

<h2> Create vocabularies using Dataflow </h2>

Now we'll create vocabulary files for our categorical features (`content_id`, `category`, and `author`). We use Cloud Dataflow to read the distinct values of each feature from BigQuery and write them out as text files with one value per line.
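Conceptually, each vocabulary is the set of distinct, non-NULL values of one custom dimension on page hits, which is what the `GROUP BY grouped_by` query below computes. A minimal in-memory equivalent, with hypothetical function names, looks like this. Neither BigQuery's `GROUP BY` nor the Dataflow write guarantees any particular order, whereas this sketch sorts the values so that its output is deterministic.

```python
def build_vocabulary(values):
    """Distinct non-NULL values, like SELECT ... GROUP BY with an IS NOT NULL filter."""
    return sorted(set(value for value in values if value is not None))


def vocabulary_text(vocab):
    """File contents as written by WriteToText: one value per line."""
    return ''.join(u'{}\n'.format(value) for value in vocab)


categories = ['News', None, 'Lifestyle', 'News', 'Stars & Kultur']
vocab = build_vocabulary(categories)
print(vocab)  # ['Lifestyle', 'News', 'Stars & Kultur']
```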

In [13]:
query_vocabularies = """
SELECT
  CAST((SELECT MAX(IF(index = index_value, value, NULL)) FROM UNNEST(hits.customDimensions)) AS STRING) AS grouped_by
FROM `cloud-training-demos.GA360_test.ga_sessions_sample`,
  UNNEST(hits) AS hits
WHERE
  # only include hits on pages
  hits.type = "PAGE"
  AND (SELECT MAX(IF(index = index_value, value, NULL)) FROM UNNEST(hits.customDimensions)) IS NOT NULL
GROUP BY
  grouped_by
"""

In [14]:
import apache_beam as beam
import datetime, os

def to_txt(rowdict):
  # Each BigQuery row holds one distinct value of the feature;
  # emit it as a single UTF-8 encoded line of the vocabulary file
  return '{}'.format(rowdict['grouped_by'].encode('utf-8'))
  
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-hybrid-recommendation-vocab-lists' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc/vocabs'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/hybrid_recommendation/preproc/vocabs/'.format(BUCKET)
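      try:
        # Remove any output left over from a previous run
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except subprocess.CalledProcessError:
        pass  # nothing to delete on the first run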

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'
      
  p = beam.Pipeline(RUNNER, options = opts)
  
  def vocab_list(index, name):
    query = query_vocabularies.replace("index_value", "{}".format(index))

    (p 
     | '{}_read'.format(name) >> beam.io.Read(beam.io.BigQuerySource(query = query, use_standard_sql = True))
     | '{}_txt'.format(name) >> beam.Map(to_txt)
     | '{}_out'.format(name) >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{0}_vocab.txt'.format(name)))
    )

  # Call vocab_list function for each
  vocab_list(10, 'content_id') # content_id
  vocab_list(7, 'category') # category
  vocab_list(2, 'author') # author
  
  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
    
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-hybrid-recommendation-vocab-lists-190321-081839 ... hang on


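We also write out the vocabulary size of each categorical feature (its number of distinct values, i.e., the length of its vocabulary) and the global mean of `months_since_epoch`.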
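Both statistics are simple aggregates. The vocabulary count is `COUNT(*)` over the distinct-value query, so it should equal the number of lines in the corresponding vocabulary file, and the mean is `AVG(CAST(months_since_epoch AS FLOAT64))` over the hybrid dataset, where SQL `AVG` skips NULLs. This plain-Python sketch, with hypothetical function names, mirrors both.

```python
def vocabulary_count(values):
    """COUNT(*) over the GROUP BY query = number of distinct non-NULL values."""
    return len(set(value for value in values if value is not None))


def global_column_mean(values):
    """AVG(CAST(x AS FLOAT64)): NULLs are ignored; an all-NULL column gives NULL (None)."""
    numbers = [float(value) for value in values if value is not None]
    if not numbers:
        return None
    return sum(numbers) / len(numbers)


print(vocabulary_count(['News', 'News', None, 'Sport']))  # 2
print(global_column_mean(['574', '503', None]))            # 538.5
```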

In [17]:
import apache_beam as beam
import datetime, os

def count_to_txt(rowdict):
  # The count query returns a single row; write its count out as one line
  return '{}'.format(rowdict['count_number'])
  
def mean_to_txt(rowdict):
  # The mean query returns a single row; write its mean out as one line
  return '{}'.format(rowdict['mean_value'])
  
def preprocess(in_test_mode):
  import shutil, os, subprocess
  job_name = 'preprocess-hybrid-recommendation-vocab-counts' + '-' + datetime.datetime.now().strftime('%y%m%d-%H%M%S')

  if in_test_mode:
      print('Launching local job ... hang on')
      OUTPUT_DIR = './preproc/vocab_counts'
      shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
      os.makedirs(OUTPUT_DIR)
  else:
      print('Launching Dataflow job {} ... hang on'.format(job_name))
      OUTPUT_DIR = 'gs://{0}/hybrid_recommendation/preproc/vocab_counts/'.format(BUCKET)
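      try:
        # Remove any output left over from a previous run
        subprocess.check_call('gsutil -m rm -r {}'.format(OUTPUT_DIR).split())
      except subprocess.CalledProcessError:
        pass  # nothing to delete on the first run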

  options = {
      'staging_location': os.path.join(OUTPUT_DIR, 'tmp', 'staging'),
      'temp_location': os.path.join(OUTPUT_DIR, 'tmp'),
      'job_name': job_name,
      'project': PROJECT,
      'teardown_policy': 'TEARDOWN_ALWAYS',
      'no_save_main_session': True
  }
  opts = beam.pipeline.PipelineOptions(flags = [], **options)
  if in_test_mode:
      RUNNER = 'DirectRunner'
  else:
      RUNNER = 'DataflowRunner'
      
  p = beam.Pipeline(RUNNER, options = opts)
  
  def vocab_count(index, column_name):
    query = """
SELECT
  COUNT(*) AS count_number
FROM ({})
""".format(query_vocabularies.replace("index_value", "{}".format(index)))

    (p 
     | '{}_read'.format(column_name) >> beam.io.Read(beam.io.BigQuerySource(query = query, use_standard_sql = True))
     | '{}_txt'.format(column_name) >> beam.Map(count_to_txt)
     | '{}_out'.format(column_name) >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{0}_vocab_count.txt'.format(column_name)))
    )
    
  def global_column_mean(column_name):
    query = """
SELECT
  AVG(CAST({1} AS FLOAT64)) AS mean_value
FROM ({0})
""".format(query_hybrid_dataset, column_name)
    
    (p 
     | '{}_read'.format(column_name) >> beam.io.Read(beam.io.BigQuerySource(query = query, use_standard_sql = True))
     | '{}_txt'.format(column_name) >> beam.Map(mean_to_txt)
     | '{}_out'.format(column_name) >> beam.io.WriteToText(os.path.join(OUTPUT_DIR, '{0}_mean.txt'.format(column_name)))
    )
    
  # Call vocab_count function for each column we want the vocabulary count for
  vocab_count(10, 'content_id') # content_id
  vocab_count(7, 'category') # category
  vocab_count(2, 'author') # author
  
  # Call global_column_mean function for each column we want the mean for
  global_column_mean('months_since_epoch') # months_since_epoch
  
  job = p.run()
  if in_test_mode:
    job.wait_until_finish()
    print("Done!")
    
preprocess(in_test_mode = False)

Launching Dataflow job preprocess-hybrid-recommendation-vocab-counts-190321-082637 ... hang on


Once both Dataflow jobs have finished, let's check our files to make sure everything went as expected.

In [None]:
%%bash
rm -rf vocabs
mkdir vocabs

In [None]:
!gsutil -m cp -r gs://{BUCKET}/hybrid_recommendation/preproc/vocabs/*.txt* vocabs/

In [None]:
!head -3 vocabs/*

In [None]:
%%bash
rm -rf vocab_counts
mkdir vocab_counts

In [None]:
!gsutil -m cp -r gs://{BUCKET}/hybrid_recommendation/preproc/vocab_counts/*.txt* vocab_counts/

In [None]:
!head -3 vocab_counts/*
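Beyond eyeballing the first lines, the downloaded files can be cross-checked: the number of lines across all shards of each `*_vocab.txt` file should match the single number in the corresponding `*_vocab_count.txt` file. The helpers below are a hypothetical sketch that assumes the `vocabs/` and `vocab_counts/` directories created by the cells above and the shard suffix `WriteToText` appends (`-00000-of-0000N`).

```python
import glob
import io


def count_lines(pattern):
    """Total number of lines across all shards matching `pattern`."""
    total = 0
    for path in glob.glob(pattern):
        with io.open(path, encoding='utf-8') as f:
            total += sum(1 for _ in f)
    return total


def read_single_value(pattern):
    """Return the one non-empty line written by a count or mean pipeline."""
    values = []
    for path in glob.glob(pattern):
        with io.open(path, encoding='utf-8') as f:
            values.extend(line.strip() for line in f if line.strip())
    if len(values) != 1:
        raise ValueError('expected 1 value for {}, found {}'.format(pattern, len(values)))
    return values[0]


def vocab_matches_count(name, vocab_dir='vocabs', count_dir='vocab_counts'):
    """Compare the vocabulary length with the count written by the vocab-count job."""
    vocab_size = count_lines('{}/{}_vocab.txt*'.format(vocab_dir, name))
    expected = int(read_single_value('{}/{}_vocab_count.txt*'.format(count_dir, name)))
    return vocab_size == expected


# Usage, once the files have been copied locally:
# for name in ['content_id', 'category', 'author']:
#     print(name, vocab_matches_count(name))
```

The same `read_single_value` helper can read the mean, e.g. `read_single_value('vocab_counts/months_since_epoch_mean.txt*')`.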