
In [None]:
#from google.colab import drive
#drive.mount('/content/drive')

In [None]:
#%cd /content/drive/My Drive/lb_digital/


# Introduction:
This Colab notebook outlines the approach and logic used to develop a simple product recommendation algorithm based on co-purchase frequency.

# Tools and Technology:
PySpark: PySpark, the Python API for Apache Spark, offers fast processing of very large datasets while keeping the advantages of Python programming. Spark's in-memory data engine speeds up data processing, and PySpark's machine learning library (MLlib) could help extend this basic version into a more sophisticated one that serves ML models implementing this algorithm.

In [None]:
# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 281.4/281.4 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 199.7/199.7 KB 19.9 MB/s eta 0:00:00
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=211fa90dd6c8afc1129f8e93e9aa0469b883c1c19d6079dc92175f9fa48c3071
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

Create a new Spark session with the master URL `local[*]`, which runs Spark locally with as many worker threads as there are logical cores on the machine.



In [None]:
# Import SparkSession
from pyspark.sql import SparkSession
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Check Spark Session Information
spark


In [None]:
# Import the Spark SQL functions used in this notebook
from pyspark.sql.functions import array_contains, col, concat_ws, count, explode, size

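Spark SQL can read a JSON file into a Spark DataFrame. By default, `spark.read.json` expects JSON Lines (one JSON object per line) and infers the schema from the data. Once the data is loaded into a DataFrame, filtering and aggregating it is quick and easy.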

In [None]:
df = spark.read.json("transactions.txt")

In [None]:
df.printSchema()  # Print the schema Spark inferred from the JSON data

root
 |-- customer: long (nullable = true)
 |-- date: string (nullable = true)
 |-- itemList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- item: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- quantity: double (nullable = true)
 |-- store: string (nullable = true)
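Because the file is read with Spark's default JSON options, each line of `transactions.txt` should be one JSON object matching this schema. The pure-Python sketch below shows what `df.itemList.item` extracts from a single such record: the `item` field of every struct in the `itemList` array. The record is a hypothetical example shaped like the schema, not a row from the dataset.

```python
import json

# Hypothetical transaction record shaped like the inferred schema:
# customer (long), date (string), itemList (array of {item, price, quantity}), store (string)
line = (
    '{"customer": 1001, "date": "2020-01-01", "store": "S1", '
    '"itemList": [{"item": "20592676_EA", "price": 3.0, "quantity": 1.0}, '
    '{"item": "20189092_EA", "price": 1.0, "quantity": 2.0}]}'
)

record = json.loads(line)

# Equivalent of df.itemList.item for one row: project the "item" field
# out of every struct in the itemList array
item_codes = [entry["item"] for entry in record["itemList"]]
print(item_codes)  # ['20592676_EA', '20189092_EA']

# Equivalent of array_contains(df.itemList.item, '20592676_EA') for one row
print('20592676_EA' in item_codes)  # True
```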



In [None]:
print((df.count(), len(df.columns)))


(1377443, 4)


The cell below reads the tab-separated product file (`products.txt`) into a DataFrame. It is used later to map each item code (the `product` column) to its product name.

In [None]:
# Tab-separated, no header; quoted names may span lines and use "" as an escaped quote
prd_df = spark.read.options(header="false", sep="\t", multiLine="true", quote="\"", escape="\"", ignoreTrailingWhiteSpace=True).csv("products.txt")
# Columns: item code (matches itemList.item), product code, product name
prd_df = prd_df.toDF('product', 'product_code', 'product_name')


Note that some product names contain special characters such as commas and hyphens (e.g. "Shrimp, Fresh Peeled"). These may need additional handling to avoid formatting errors downstream.
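Since the file is tab-separated, a comma inside a product name is not a field delimiter. The sketch below uses Python's `csv` module to illustrate this; the first row is taken from the `prd_df.show()` output, while the second row is hypothetical and shows a quoted name with an embedded comma and a doubled quote (`""`), which is roughly what the `quote`/`escape` options above are meant to handle (an assumption about how the raw file is quoted).

```python
import csv
import io

# Rows in the same layout as products.txt: product, product_code, product_name.
# Row 1 comes from the prd_df.show() output; row 2 is hypothetical.
raw = (
    '20000165_KG\tM11320103\tShrimp, Fresh Peeled\n'
    '99999999_EA\tM00000000\t"Cookies, ""Family"" Size"\n'
)

# delimiter='\t' matches sep="\t"; doublequote=True treats "" inside quotes as one "
reader = csv.reader(io.StringIO(raw), delimiter='\t', quotechar='"', doublequote=True)
rows = list(reader)

for product, product_code, product_name in rows:
    print(product, '->', product_name)
# 20000165_KG -> Shrimp, Fresh Peeled
# 99999999_EA -> Cookies, "Family" Size
```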

In [None]:
prd_df.show()

+--------------+------------+--------------------+
|       product|product_code|        product_name|
+--------------+------------+--------------------+
|   20000002_EA|   M10210701|Tuna Chunks in Broth|
|   20000005_EA|   M02270201|Fresh-Pressed Swe...|
|   20000053_EA|   M10210901|French Dijon Mustard|
|20000056001_KG|   M02270304|     Anaheim Peppers|
|   20000068_KG|   M05350101|        Swiss Cheese|
|   20000086_KG|   M10240102|       Juice Berries|
|   20000093_EA|   M10220203|Probiotic Yogurt,...|
|   20000100_EA|   M10210602|Ground Mexican Ch...|
|   20000104_EA|   M10230202|       Broccoli Cuts|
|   20000128_EA|   M10211103|Fibro-Contact Refill|
|   20000151_EA|   M10250119|Genius Kids & Tee...|
|   20000160_EA|   M05350101|               Basil|
|   20000165_KG|   M11320103|Shrimp, Fresh Peeled|
|20000172001_EA|   M10210303|   Fruit Punch Drink|
|20000172002_EA|   M10210303|         Peach Drink|
|   20000177_EA|   M10210901|Creamy Horseradis...|
|20000207001_EA|   M10211001|Ex

In [None]:
print((prd_df.count(), len(prd_df.columns)))


(70771, 3)


In [None]:
df_subset = df.filter(array_contains(df.itemList.item,'20592676_EA'))

In [None]:
print((df_subset.count(), len(df_subset.columns)))


(596, 4)


# Baseline recommendation Algorithm using co-purchase frequency:
The approach is to:

1.   Filter the transactions to those whose `itemList` contains the item of interest.
2.   This leaves only the transactions in which the item of interest was purchased, together with the other items bought in the same transaction.
3.   Explode the item array, group by item, and count each item's occurrences across the filtered transactions. This count is the item's co-purchase frequency (an item listed more than once in the same transaction is counted each time).
4.   Exclude the item of interest itself, sort by count in descending order, and keep the top n rows (5 by default).
5.   Return only the top n item codes; they are mapped to product names later.
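The steps above can be checked by hand on a few in-memory baskets without Spark. This is a minimal pure-Python sketch using `collections.Counter`; the baskets are hypothetical and the function name `top_n_co_purchased` is illustrative only. Unlike `find_top_n_co_purchase_product`, it returns `(item, count)` pairs so the counts are visible.

```python
from collections import Counter

def top_n_co_purchased(baskets, item_code, n=5):
    """Return up to n (item, count) pairs most often bought with item_code."""
    counts = Counter()
    for basket in baskets:
        # Steps 1-2: keep only baskets that contain the item of interest
        if item_code in basket:
            # Step 3: "explode" the basket and count every occurrence
            counts.update(basket)
    # Step 4: exclude the item of interest itself
    counts.pop(item_code, None)
    # Steps 4-5: sort by count (descending), break ties by item code, keep top n
    ranked = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
    return ranked[:n]

# Hypothetical baskets (lists of item codes)
baskets = [
    ["A", "B", "C"],
    ["A", "B"],
    ["A", "C", "C"],   # C listed twice: counted twice, as with explode()
    ["B", "D"],        # no "A": filtered out
]
print(top_n_co_purchased(baskets, "A", n=2))  # [('C', 3), ('B', 2)]
```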

In [None]:
def find_top_n_co_purchase_product(df, item_code, n=5):
    # Filter the transaction DataFrame to rows whose itemList contains the given item code
    df_subset = df.filter(array_contains(df.itemList.item, item_code))
    # Select only the array of item codes from each transaction (column name: "item")
    items = df_subset.select("itemList.item")
    # Optional: drop single-item transactions, which contribute no co-purchases
    # items = items.filter(size(col("item")) > 1)
    # Explode the array: one row per item occurrence
    items_explode = items.select(explode(col("item")).alias("value"))
    # Co-purchase frequency: number of occurrences of each item
    final_items = items_explode.groupBy("value").agg(count("*").alias("count"))
    # Exclude the item itself, sort by frequency (ties broken by item code), take top n
    top_n_result = (final_items
                    .filter(col("value") != item_code)
                    .orderBy(col("count").desc(), col("value").asc())
                    .head(n))
    print(top_n_result)
    return [row["value"] for row in top_n_result]


The function below looks up the product name for each item code in the product DataFrame.
As mentioned earlier, product names may contain special characters; this function removes commas from the names it returns.

In [None]:
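def lookup_product(prd_df, item_arr):
    top_n_products = []
    for item in item_arr:
        # First row matching this item code, or None if the code is not in the product file
        row = prd_df.filter(col("product") == item).select("product_name").head()
        name = row["product_name"] if row is not None else None
        # Strip commas from product names to avoid formatting issues downstream
        top_n_products.append(name.replace(",", "") if name is not None else None)
    return top_n_products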
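`lookup_product` runs one Spark filter per item code, which is fine for five items but launches a separate job each time. An alternative, sketched here under the assumption that the product table (about 70K rows) fits in driver memory, is to build a dictionary once and reuse it; in the notebook the pairs could come from `prd_df.select("product", "product_name").collect()`. The helper names `build_name_lookup` and `lookup_names` are hypothetical, and the sample pairs are copied from the `prd_df.show()` output above.

```python
def build_name_lookup(pairs):
    """Map item code -> product name, stripping commas as lookup_product does."""
    lookup = {}
    for code, name in pairs:
        # Keep the first name seen for each code
        if code not in lookup:
            lookup[code] = name.replace(",", "") if name is not None else None
    return lookup

def lookup_names(lookup, item_codes):
    # Codes missing from the product file map to None instead of raising
    return [lookup.get(code) for code in item_codes]

# (product, product_name) pairs taken from the prd_df.show() output
pairs = [
    ("20000002_EA", "Tuna Chunks in Broth"),
    ("20000165_KG", "Shrimp, Fresh Peeled"),
    ("20000160_EA", "Basil"),
]
lookup = build_name_lookup(pairs)
print(lookup_names(lookup, ["20000165_KG", "20000160_EA", "unknown_EA"]))
# ['Shrimp Fresh Peeled', 'Basil', None]
```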

The cells below call the functions created above to display the top 5 co-purchased products for item codes "20592676_EA" and "20801754003_C15".

In [None]:
top_n_items_20592676_EA = find_top_n_co_purchase_product(df,'20592676_EA')
print(top_n_items_20592676_EA)
top_n_products_20592676_EA = lookup_product(prd_df,top_n_items_20592676_EA)
print(top_n_products_20592676_EA)


[Row(value='20189092_EA', count=193), Row(value='20379763_EA', count=96), Row(value='20175355001_KG', count=92), Row(value='20668578_EA', count=37), Row(value='20812144001_EA', count=31)]
['20189092_EA', '20379763_EA', '20175355001_KG', '20668578_EA', '20812144001_EA']
['Plastic Bags', 'Celebration Cupcakes White', 'Bananas Bunch', 'PENNY ROUNDING - DO NOT TOUCH', 'Grade A White Eggs Large']


In [None]:
top_n_items_20801754003_C15 = find_top_n_co_purchase_product(df,'20801754003_C15')
print(top_n_items_20801754003_C15)
top_n_products_20801754003_C15 = lookup_product(prd_df,top_n_items_20801754003_C15)
print(top_n_products_20801754003_C15)

[Row(value='20175355001_KG', count=188), Row(value='20801754001_C15', count=171), Row(value='20189092_EA', count=159), Row(value='20962518_EA', count=85), Row(value='20668578_EA', count=62)]
['20175355001_KG', '20801754001_C15', '20189092_EA', '20962518_EA', '20668578_EA']
['Bananas Bunch', 'Pepsi', 'Plastic Bags', 'Milk 2%', 'PENNY ROUNDING - DO NOT TOUCH']


### build_submission_data
This function builds the request payload (name, email, and the top 5 product names for each item code) that is posted to the IAP-protected submission service.

In [None]:
import json
def build_submission_data():
  data = {"name": "Bhuvana Adur Kannan","email": "bhuvana.adurkannan@gmail.com","20592676_EA": top_n_products_20592676_EA,"20801754003_C15": top_n_products_20801754003_C15}
  return data
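`make_iap_request` hands the payload to `requests.post` through `data=`, which form-encodes a dictionary (list values become repeated keys) rather than sending a JSON body; `json=` would send JSON instead. The standard-library sketch below shows both encodings for a trimmed, hypothetical payload with the same shape as `build_submission_data()`. Which encoding the service expects is not documented here; the submission shown later was accepted with the form-encoded body.

```python
import json
from urllib.parse import urlencode, parse_qs

# Trimmed, hypothetical payload with the same shape as build_submission_data()
payload = {"name": "Jane Doe", "20592676_EA": ["Plastic Bags", "Milk 2%"]}

# Form encoding (as requests does for data=<dict>): lists become repeated keys
form_body = urlencode(payload, doseq=True)
print(form_body)
# name=Jane+Doe&20592676_EA=Plastic+Bags&20592676_EA=Milk+2%25

# JSON encoding (as requests does for json=<dict>)
json_body = json.dumps(payload)
print(json_body)
# {"name": "Jane Doe", "20592676_EA": ["Plastic Bags", "Milk 2%"]}

# Parsing the form body back returns every value as a list
print(parse_qs(form_body))
```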


# Response submission using the provided credentials
It was a little challenging to understand how Identity-Aware Proxy (IAP) works and the steps involved: the given service account key file is used to create signed credentials, which are in turn used to obtain the OpenID Connect ID token required to make a POST request to an IAP-protected URL.

The functions `make_iap_request` and `get_google_open_id_connect_token` were both adapted from https://gist.github.com/JensRantil/a3cc32c80c3594844009c55ff9233591

Minor modifications were made to that code: the credential `refresh()` call is skipped, and the correct value (the IAP client ID) is passed as the `target_audience` claim so that the resulting `service_account_credentials` are valid.
### Other References:

https://blog.realkinetic.com/api-authentication-with-gcp-identity-aware-proxy-3a4147b30770
https://engineering.wingify.com/posts/programmatic-authentication-under-iap/

In [None]:
! pip install requests_toolbelt

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting requests_toolbelt
  Downloading requests_toolbelt-0.10.1-py2.py3-none-any.whl (54 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 54.5/54.5 KB 6.2 MB/s eta 0:00:00
Installing collected packages: requests_toolbelt
Successfully installed requests_toolbelt-0.10.1


In [None]:
# Copyright 2016 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example use of a service account to authenticate to Identity-Aware Proxy."""

# [START iap_make_request]
import google.auth
import google.auth.app_engine
import google.auth.compute_engine  # needed for the compute_engine isinstance check below
import google.auth.iam
from google.auth.transport.requests import Request
import google.oauth2.credentials
import google.oauth2.service_account
import requests
import requests_toolbelt.adapters.appengine
import json

IAM_SCOPE = 'https://www.googleapis.com/auth/iam'
OAUTH_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'
#'https://oauth2.googleapis.com/token'


def make_iap_request(url, client_id, result_data):
    """Makes a request to an application protected by Identity-Aware Proxy.
    Args:
      url: The Identity-Aware Proxy-protected URL to fetch.
      client_id: The client ID used by Identity-Aware Proxy.
      result_data: The submission payload (dict) to POST to the URL.
    Returns:
      The page body, or raises an exception if the page couldn't be retrieved.
    """
    # Figure out what environment we're running in and get some preliminary
    # information about the service account.
    bootstrap_credentials = google.oauth2.service_account.Credentials.from_service_account_file('ld-ds-take-home-service-account.json')
    print(bootstrap_credentials.signer)
    if isinstance(bootstrap_credentials,
                  google.oauth2.credentials.Credentials):
        raise Exception('make_iap_request is only supported for service '
                        'accounts.')
    elif isinstance(bootstrap_credentials,
                    google.auth.app_engine.Credentials):
        requests_toolbelt.adapters.appengine.monkeypatch()

    # For service accounts using the Compute Engine metadata service,
    # service_account_email isn't available until refresh is called.
    # bootstrap_credentials.refresh(Request())

    signer_email = bootstrap_credentials.service_account_email
    print(f"signer_email -> {signer_email}")
    if isinstance(bootstrap_credentials,
                  google.auth.compute_engine.credentials.Credentials):
        # Since the Compute Engine metadata service doesn't expose the service
        # account key, we use the IAM signBlob API to sign instead.
        # In order for this to work:
        #
        # 1. Your VM needs the https://www.googleapis.com/auth/iam scope.
        #    You can specify this specific scope when creating a VM
        #    through the API or gcloud. When using Cloud Console,
        #    you'll need to specify the "full access to all Cloud APIs"
        #    scope. A VM's scopes can only be specified at creation time.
        #
        # 2. The VM's default service account needs the "Service Account Actor"
        #    role. This can be found under the "Project" category in Cloud
        #    Console, or roles/iam.serviceAccountActor in gcloud.
        signer = google.auth.iam.Signer(
            Request(), bootstrap_credentials, signer_email)
    else:
        # A Signer object can sign a JWT using the service account's key.
        signer = bootstrap_credentials.signer

    # Construct OAuth 2.0 service account credentials using the signer
    # and email acquired from the bootstrap credentials.
    print(f"signer -> {signer}")
    service_account_credentials = google.oauth2.service_account.Credentials(
        signer, signer_email, token_uri=OAUTH_TOKEN_URI, additional_claims={
            'target_audience': client_id
        })


    # service_account_credentials gives us a JWT signed by the service
    # account. Next, we use that to obtain an OpenID Connect token,
    # which is a JWT signed by Google.
    
    google_open_id_connect_token = get_google_open_id_connect_token(
        service_account_credentials)
    
    # Fetch the Identity-Aware Proxy-protected URL, including an
    # Authorization header containing "Bearer " followed by a
    # Google-issued OpenID Connect token for the service account.
    
    headers = {'Authorization': 'Bearer {}'.format(google_open_id_connect_token)}
    #print(f"header: {headers}")
    
    # POST the payload passed in by the caller (form-encoded, since it is sent via data=)
    resp = requests.post(
        url,
        data=result_data,
        headers=headers
    )
    
    if resp.status_code == 403:
        raise Exception('Service account {} does not have permission to '
                        'access the IAP-protected application.'.format(
                            signer_email))
    elif resp.status_code != 200:
        raise Exception(
            'Bad response from application: {!r} / {!r} / {!r}'.format(
                resp.status_code, resp.headers, resp.text))
    else:
        return resp.text


In [None]:
def get_google_open_id_connect_token(service_account_credentials):
    """Get an OpenID Connect token issued by Google for the service account.
    This function:
      1. Generates a JWT signed with the service account's private key
         containing a special "target_audience" claim.
      2. Sends it to the OAUTH_TOKEN_URI endpoint. Because the JWT in #1
         has a target_audience claim, that endpoint will respond with
         an OpenID Connect token for the service account -- in other words,
         a JWT signed by *Google*. The aud claim in this JWT will be
         set to the value from the target_audience claim in #1.
    For more information, see
    https://developers.google.com/identity/protocols/OAuth2ServiceAccount .
    The HTTP/REST example on that page describes the JWT structure and
    demonstrates how to call the token endpoint. (The example on that page
    shows how to get an OAuth2 access token; this code is using a
    modified version of it to get an OpenID Connect token.)
    """

    service_account_jwt = (
        service_account_credentials._make_authorization_grant_assertion())
    request = google.auth.transport.requests.Request()
    body = {
        'assertion': service_account_jwt,
        'grant_type': google.oauth2._client._JWT_GRANT_TYPE,
    }
    token_response = google.oauth2._client._token_endpoint_request(
        request, OAUTH_TOKEN_URI, body)
    #print(f"token_response -> {token_response}")
    return token_response['id_token']

# [END iap_make_request]
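As the docstring explains, the token endpoint returns an OpenID Connect ID token whose `aud` claim should equal the `target_audience` from the signed assertion (here, the IAP `client_id`). When a request to IAP is rejected, inspecting that claim can help. A JWT is three base64url segments (header.payload.signature), so its payload can be decoded with the standard library. This sketch does not verify the signature and is for inspection only; the token and audience below are hypothetical values built in the snippet itself.

```python
import base64
import json

def b64url_encode(obj):
    # JWT segments are unpadded base64url-encoded JSON
    raw = json.dumps(obj).encode("utf-8")
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")

def decode_jwt_payload(token):
    """Decode (WITHOUT verifying the signature) the claims segment of a JWT."""
    payload_b64 = token.split(".")[1]
    # Restore the stripped base64 padding before decoding
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))

# Hypothetical, unsigned token for illustration only
example_audience = "example-client-id.apps.googleusercontent.com"
fake_token = ".".join([
    b64url_encode({"alg": "RS256", "typ": "JWT"}),
    b64url_encode({"aud": example_audience, "iss": "https://accounts.google.com"}),
    "signature-not-checked",
])

claims = decode_jwt_payload(fake_token)
print(claims["aud"] == example_audience)  # True
```

In the notebook, the same helper could be applied to the value returned by `get_google_open_id_connect_token` to confirm that `aud` matches `client_id`.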

The cell below submits the answers to the test submission service.

In [None]:
client_id = '459957645727-m8ksvmqcp3sk4ok9cgh61u2q85knaqjg.apps.googleusercontent.com'
#'107505745258773426561'
url = 'https://ld-ds-take-home-test.appspot.com/submissions'
data = build_submission_data()
print(data)
print(make_iap_request(url, client_id,data))

{'name': 'Bhuvana Adur Kannan', 'email': 'bhuvana.adurkannan@gmail.com', '20592676_EA': ['Plastic Bags', 'Celebration Cupcakes White', 'Bananas Bunch', 'PENNY ROUNDING - DO NOT TOUCH', 'Grade A White Eggs Large'], '20801754003_C15': ['Bananas Bunch', 'Pepsi', 'Plastic Bags', 'Milk 2%', 'PENNY ROUNDING - DO NOT TOUCH']}
<google.auth.crypt._python_rsa.RSASigner object at 0x7fbc37302b50>
signer_email -> submission-account@ld-ds-take-home-test.iam.gserviceaccount.com
signer -> <google.auth.crypt._python_rsa.RSASigner object at 0x7fbc37302b50>
Response received for Bhuvana Adur Kannan (bhuvana.adurkannan@gmail.com) - thank you for taking the test!


# Deploy and serve the algorithm
To make this algorithm available to other users and services, it could be deployed as a web service served with Flask, with the REST endpoint exposed only to authenticated users.
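A minimal sketch of such an endpoint, under several assumptions: the top-n results are precomputed offline (running a Spark job per request would be slow), the route `/recommendations/<item_code>` is hypothetical, and authentication is left out. To stay dependency-free it uses the standard library's WSGI interface instead of Flask; a Flask application is also a WSGI application, so the same lookup logic could move into a Flask route. The precomputed names are copied from the outputs above.

```python
import json
from wsgiref.util import setup_testing_defaults

# Precomputed offline: item code -> top co-purchased product names (from the runs above)
RECOMMENDATIONS = {
    "20592676_EA": ["Plastic Bags", "Celebration Cupcakes White", "Bananas Bunch",
                    "PENNY ROUNDING - DO NOT TOUCH", "Grade A White Eggs Large"],
    "20801754003_C15": ["Bananas Bunch", "Pepsi", "Plastic Bags", "Milk 2%",
                        "PENNY ROUNDING - DO NOT TOUCH"],
}
PREFIX = "/recommendations/"  # hypothetical route

def app(environ, start_response):
    """WSGI app: GET /recommendations/<item_code> -> JSON list of product names."""
    path = environ.get("PATH_INFO", "")
    item_code = path[len(PREFIX):] if path.startswith(PREFIX) else None
    if environ.get("REQUEST_METHOD") != "GET":
        status, body = "405 Method Not Allowed", {"error": "use GET"}
    elif item_code not in RECOMMENDATIONS:
        status, body = "404 Not Found", {"error": "unknown item code"}
    else:
        status, body = "200 OK", {"item": item_code, "products": RECOMMENDATIONS[item_code]}
    payload = json.dumps(body).encode("utf-8")
    start_response(status, [("Content-Type", "application/json"),
                            ("Content-Length", str(len(payload)))])
    return [payload]

def call(path, method="GET"):
    # Build a minimal fake WSGI environ and invoke the app directly (no server needed)
    environ = {"PATH_INFO": path, "REQUEST_METHOD": method}
    setup_testing_defaults(environ)
    captured = {}
    def start_response(status, headers):
        captured["status"] = status
    body = b"".join(app(environ, start_response))
    return captured["status"], json.loads(body)

print(call("/recommendations/20592676_EA")[1]["products"][:2])
# ['Plastic Bags', 'Celebration Cupcakes White']
```

To try it locally, `wsgiref.simple_server.make_server("", 8000, app).serve_forever()` would serve the app on port 8000.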