-sandbox
# ライブイベント向け商品レコメンド - スタジアムアナリティクス
<img style='float: right' width='600px' src='https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-notif.png'>

**ユースケースの概要**
* スポーツのライブ中継では大量の顧客データが生成されます。より良いファン体験を実現するのに活用できます。ファンとの関わりは、売上や顧客維持率の向上に直結します。このノートブックでは、ファンの購買履歴とスタジアムの座席の位置情報をもとにパーソナライズされた割引オファーを作成し、イベント中の追加販売を促進し、より個人的な体験を実現する方法を紹介します。
*目標：ファンへキャンペーンのオファーのプッシュ通知を送信します。
 
**ソリューションのビジネスインパクト**
* **ファン・エンゲージメントと顧客維持：** イベントに参加している間、より良い経験をしているファンは、将来的に別のイベントのために戻ってくる可能性が高くなります。
* **収益の増加：**ファンに パーソナライズされた割引キャンペーンを送信すると、イベント中にスタジアムの中の店からの購入が増えるのでお店(ベンダー)の収益の増加にも繋がります。

<!-- Collect usage data (view). Remove it to disable collection. View README for more details.  -->
<img width="1px" src="https://www.google-analytics.com/collect?v=1&gtm=GTM-NKQ8TT7&tid=UA-163989034-1&cid=555&aip=1&t=event&ec=field_demos&ea=display&dp=%2F42_field_demos%2Fmedia%2Fproduct_recommender_stadium%2Fnotebook&dt=MEDIA_USE_CASE">
<!-- [metadata={"description":"Product recommendation for live events. This demo shows how to create a personalized discount for a fan based on their purchasing history and location of where they sit in a stadium to drive additional sales during the game and create a more individualized experience.",
 "authors":["dan.morris@databricks.com"],
  "db_resources":{},
  "search_tags":{"vertical": "media", "step": "Data Engineering", "components": ["sparkml", "mlflow", "als", "recommender"]},
                 "canonicalUrl": {"AWS": "", "Azure": "", "GCP": ""}}] -->

# 設定と準備

## ダミデータ作成のdbldatagenのインストール

In [0]:
%pip install git+https://github.com/databrickslabs/dbldatagen faker

## データの準備

以下リンクからデータのZipファイルをダウンロードして/FileStore/tmpの直下に手動でアップロードしてください。

https://github.com/yulan-yan/Product_recommendation_stadium/blob/main/stadium_recommender.zip

In [0]:
# 一回のみ実行
# !unzip /dbfs/FileStore/tmp/stadium_recommender.zip -d /dbfs/FileStore/tmp/

## パスなど変数の設定

In [0]:
pandas_data_path = "/dbfs/FileStore/tmp/stadium_recommender"
data_path = "/FileStore/tmp/stadium_recommender"
dbName = "yulanDB"
current_user = "Yulan"

## Databaseの準備

In [0]:
spark.sql("drop database if exists yulanDB CASCADE")
spark.sql("create database if not exists yulanDB")
spark.sql("use yulanDB")

# レコメンデーションシステムの構築のワークフロー

## Step 1： データを読込・加工したらraw tablesに書き込む

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-1.png" width="1000px">

### VendorとItemのリスト（stadium_vendors）

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-vendors.png" width="400px" style="float: right" />
 
PandasとSparkを使えば、これらのデータを簡単に取り込み、データ型を調整することができます。

In [0]:
print("Vendors Item type data generation...")
import pandas as pd
from pyspark.sql.functions import col

pandasDF = pd.read_csv(f"{pandas_data_path}/data/vendors.csv")
df_items = spark.createDataFrame(pandasDF, ['vendor_id', 'vendor_location_number', 'vendor_name', 'vendor_scope', 'section', 'item_id', 'item_type', 'item', 'price', 'error'])
for c in "vendor_id", "vendor_location_number", "section", "item_id", "price":
  df_items = df_items.withColumn(c, col(c).cast('int'))

spark.sql("drop table if exists stadium_vendors")
df_items.write.saveAsTable("stadium_vendors")

### チケットの売上データ（Tickets sales）

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-pricing.png" width="400px" style="float: right" />

チケットの売上データもraw tableに書き込みます。

In [0]:
print("Ticket sales data generation...")
import dbldatagen as dg
from pyspark.sql.types import *
from pyspark.sql import functions as F
from faker import Faker
fake = Faker()

fake_name = F.udf(fake.name)
fake_phone = F.udf(fake.phone_number)

data_rows = 70000
df_spec = (dg.DataGenerator(spark, name="ticket_sales", rows=data_rows, partitions=4)
                            .withIdOutput()
                            .withColumn("game_id", IntegerType(), values=['3'])
                            .withColumn("ticket_price", IntegerType(), expr="floor(rand() * 350)")
                            .withColumn("gate_entrance", StringType(), values=['a', 'b', 'c', 'd', 'e', ], random=True, weights=[9, 1, 1, 2, 2])
                            .withColumn("section_number", IntegerType(), minValue=100, maxValue=347, random=True)
                            .withColumn("row_number", IntegerType(), minValue=1, maxValue=35 , random=True)
                            .withColumn("seat_number", IntegerType(), minValue=1, maxValue=30, random=True)
                            .withColumn("ticket_type", StringType(), values=['single_game', 'packaged_game', 'season_holder'], random=True, weights=[7, 2, 1]))
df_tickets = df_spec.build().withColumnRenamed("id", "customer_id")
df_tickets = df_tickets.withColumn("customer_name", fake_name())
df_tickets = df_tickets.withColumn("phone_number", fake_phone())

spark.sql("drop table if exists ticket_sales")
df_tickets.write.saveAsTable("ticket_sales")

### イベントのスケジュール（Games）

試合のイベントデータも同じ方法で取り込みます。

In [0]:
print("Game data generation...")
from pyspark.sql.functions import col, to_date
spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")

schemaDDL = "game_id int, date_time string, date string, location string, home_team string, away_team string, error string, game_date string"
df_games = spark.read.csv(f"{data_path}/data/games.csv", sep=',', schema=schemaDDL)
df_games = df_games.withColumn("game_date", to_date(col("game_date"),'yyyy-MM-dd'))

spark.sql("drop table if exists games")
df_games.write.saveAsTable("games")

### Points of Saleデータ

POSデータも取り込みます。

In [0]:
print("Point of sales generation...")
data_rows = 1000000
df_spec = (dg.DataGenerator(spark, name="point_of_sale", rows=data_rows, partitions=4)
                            .withIdOutput()
                            .withColumn("game", IntegerType(), minValue=1, maxValue=97 , random=True)
                            .withColumn("item_purchased", IntegerType(), minValue=1, maxValue=364, random=True)
                            .withColumn("customer", IntegerType(), minValue=1, maxValue=1000, random=True))
                            
df_pos = df_spec.build().withColumnRenamed("id", "order_id")

# tableにデータを書き込みます。
spark.sql("drop table if exists point_of_sale")
df_pos.write.saveAsTable("point_of_sale")

### 購入履歴(purchase_history)を取り込みます。

In [0]:
print("Purchase History to evaluate model efficiency after real game...")
import pandas as pd
pandasDF = pd.read_csv(f"{pandas_data_path}/data/purchase_history.csv")
df_spec = spark.createDataFrame(pandasDF, ['item_id', 'game_id', 'customer_id', 'recommended_item_purchased'])

spark.sql("drop table if exists purchase_history")
df_spec.write.saveAsTable("purchase_history")

## Step 2: Silver tableを作成する

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-2.png" width="1000px">

In [0]:
silver_sales_df = spark.sql('''
  SELECT * FROM ticket_sales t 
  JOIN point_of_sale p ON t.customer_id = p.customer
  JOIN stadium_vendors s ON p.item_purchased = s.item_id AND t.game_id = p.game;  
''')

spark.sql("drop table if exists silver_sales")
silver_sales_df.createTempView("silver_sales")
display(silver_sales_df)

customer_id,game_id,ticket_price,gate_entrance,section_number,row_number,seat_number,ticket_type,customer_name,phone_number,order_id,game,item_purchased,customer,vendor_id,vendor_location_number,vendor_name,vendor_scope,section,item_id,item_type,item,price,error
409,3,239,a,131,3,24,packaged_game,Patrick Stone,565-419-7838,23,3,190,409,17,8,Streats of Detroit,Food,223,190,Pretzels,Pretzel,8,
130,3,314,a,319,14,10,single_game,Joseph Miller,+1-403-618-2192x853,41,3,124,130,17,2,Streats of Detroit,Food,120,124,Pretzels,Pretzel,8,
255,3,263,a,156,33,5,season_holder,Michelle Powell,+1-616-413-6531x2595,58,3,175,255,17,7,Streats of Detroit,Food,217,175,Hot Dog,Value Meal,13,
962,3,146,a,283,25,7,single_game,Sarah Diaz,+1-046-711-5786x77573,63,3,24,962,2,3,Big Boy,Food,230,24,Fries,Fries,12,
177,3,96,a,326,31,21,single_game,Barbara Henderson,+1-906-673-0706x273,434,3,80,177,9,4,Hungry Howie's,Food,226,80,Alcohol,Domestic Beer,11,
833,3,178,a,225,3,20,single_game,Jessica Butler,(915)906-2476,458,3,313,833,23,1,Kickoff Classics,Food,208,313,Fries,Fries,12,
127,3,124,a,115,27,2,season_holder,Lisa Brown,(456)086-9595x356,538,3,189,127,17,8,Streats of Detroit,Food,223,189,Snack,Popcorn,7,
69,3,258,d,125,12,19,single_game,Christina Campos,490-904-7112,573,3,168,69,17,6,Streats of Detroit,Food,137,168,Pretzels,Pretzel,8,
474,3,73,d,216,29,12,season_holder,Jose Tapia,9544192028,696,3,249,474,19,2,Tailgate Grill,Food,135,249,Sandwich,Chicken Tenders with Fries,17,
189,3,201,d,237,18,2,single_game,Harry Petersen,527.704.8727x92145,707,3,133,189,17,3,Streats of Detroit,Food,124,133,Hot Dog,Hot Dog,13,


最も購入されたアイテムのトップ 10

In [0]:
spark.sql('''
 SELECT item_id, count(item_id) AS item_purchases FROM silver_sales GROUP BY item_id order by item_purchases desc limit 10
''').show()

## Step 3: リコメンデーション・モデルの構築

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-3.png" width="1000px">

<img src= https://databricks.com/wp-content/uploads/2020/04/databricks-adds-access-control-to-mlflow-model-registry_01.jpg width='500px' style='float: right'>

### MLFlowを用いたALSレコメンダーの構築
 

Mlflowは、モデル自身を含むすべての実験メトリクスの追跡に使用されています。
さらに、MLFlowを使って、レジストリにモデルをproductionとしてデプロイします。
デプロイができたら、このモデルを再利用して、最終的なレコメンデーションを取得し、パーソナルなオファーをプッシュ通知で送ることができるようになります。

In [0]:
import mlflow
from pyspark.ml.recommendation import ALS

with mlflow.start_run() as run:
  #MLFlowで実行のパラメータなどを自動的にログに記録します。
  mlflow.pyspark.ml.autolog()
  
  df = spark.sql("select customer_id, item_id, count(item_id) as item_purchases from silver_sales group by customer_id, item_id")
  # ALSを使用してトレーニングデータからレコメンデーションモデルを構築する。
  # コールドスタート戦略を'drop'に設定して、NaN評価メトリクスにならないようにすることに注意してください。
  # 評価マトリックスは別の情報源から導出されます (つまり、他の情報から推測されます)。より良い結果を得るために、implicitPrefs を true に設定します。
  als = ALS(rank=3, userCol="customer_id", itemCol="item_id", ratingCol="item_purchases", implicitPrefs=True, seed=0, coldStartStrategy="nan")
  
  num_cores = sc.defaultParallelism
  als.setNumBlocks(num_cores)
  
  model = als.fit(df)
  
  # モデルもログに記録します。
  mlflow.spark.log_model(model, "spark-model")
    
  #　別のセルからこの実行の他の数値を追加する必要があるため、実行IDを取得しましょう。
  run_id = run.info.run_id

モデルをMLflowのモデルレジストリに登録します。

In [0]:
model_registered = mlflow.register_model("runs:/"+run_id+"/spark-model", "Stadium_Recommendation")

登録したモデルをProductionステージにプロモートします。

In [0]:
client = mlflow.tracking.MlflowClient()
print("registering model version "+model_registered.version+" as production model")
client.transition_model_version_stage(name = "Stadium_Recommendation", version = model_registered.version, stage = "Production", archive_existing_versions=True)

## Step 4: モデルでレコメンデーションリストを作成する

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-4.png" width="1000px">

これでMLFlowのレジストリにモデルが登録されたので、本番のパイプラインで使い始めることができます。

あとはMLFlowからロードして推論を適用するだけです。

In [0]:
#                                                               　Stage/version
#                                        Model name                   |
#                                             |                       |
model = mlflow.spark.load_model(f'models:/Stadium_Recommendation/Production')

In [0]:
# 顧客毎の上位10件のレコメンデーションアイテムを取得する
recommendations = model.stages[0].recommendForAllUsers(10)

spark.sql("drop table if exists silver_recommendations")
recommendations.createOrReplaceTempView("silver_recommendations")
display(recommendations)

customer_id,recommendations
1,"List(List(176, 0.14427546), List(277, 0.13161671), List(202, 0.1279852), List(89, 0.10887258), List(134, 0.10677869), List(144, 0.103235595), List(225, 0.09793375), List(13, 0.09731455), List(298, 0.09656529), List(112, 0.09282864))"
2,"List(List(86, 0.15349907), List(9, 0.1406347), List(242, 0.1399593), List(170, 0.13806199), List(137, 0.13628921), List(94, 0.12949055), List(229, 0.12005211), List(348, 0.11618918), List(83, 0.114641495), List(168, 0.11294341))"
3,"List(List(277, 0.15080534), List(202, 0.14858834), List(176, 0.1423753), List(89, 0.13583069), List(134, 0.12896664), List(112, 0.12317881), List(13, 0.12196676), List(124, 0.12008062), List(347, 0.119291686), List(298, 0.11450268))"
4,"List(List(9, 0.08039679), List(94, 0.07752084), List(16, 0.07671808), List(105, 0.0750363), List(358, 0.07308273), List(86, 0.07221078), List(175, 0.07065378), List(344, 0.06994742), List(171, 0.068965204), List(210, 0.066962376))"
5,"List(List(86, 0.16690658), List(242, 0.16258657), List(170, 0.16127582), List(137, 0.14963375), List(225, 0.14956415), List(25, 0.14404398), List(254, 0.13983487), List(93, 0.13295387), List(9, 0.13239591), List(229, 0.12969957))"
6,"List(List(86, 0.09751831), List(9, 0.0972947), List(94, 0.091394424), List(137, 0.08732138), List(170, 0.08711954), List(242, 0.0861789), List(348, 0.078760624), List(358, 0.07820542), List(33, 0.07675724), List(83, 0.07512931))"
7,"List(List(124, 0.24629103), List(202, 0.17663442), List(89, 0.17509808), List(277, 0.16421854), List(321, 0.16207178), List(199, 0.16136813), List(112, 0.16108565), List(179, 0.15421654), List(239, 0.14695744), List(305, 0.14580926))"
8,"List(List(124, 0.40366957), List(202, 0.33595598), List(89, 0.32072398), List(277, 0.31988606), List(112, 0.28936127), List(321, 0.28319845), List(347, 0.26163477), List(199, 0.25940818), List(13, 0.2579494), List(176, 0.25615487))"
9,"List(List(86, 0.23330273), List(9, 0.209794), List(242, 0.20734237), List(137, 0.20002082), List(229, 0.19354671), List(170, 0.19066472), List(348, 0.17991853), List(83, 0.17839867), List(94, 0.17730801), List(25, 0.15967326))"
10,"List(List(124, 0.15485469), List(179, 0.12603055), List(275, 0.12016465), List(244, 0.11577828), List(199, 0.115241766), List(354, 0.11479628), List(343, 0.11232121), List(31, 0.111399874), List(333, 0.11083978), List(45, 0.11025844))"


## Step 5: レコメンデーション結果のフィルタリングと再ランク付け


顧客毎のレコメンドを展開し、アイテムや顧客の詳細を取得します。

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-5.png" width="1000px">

In [0]:
gold_recommendations_df = spark.sql('''
SELECT ci.*, vendor_id, vendor_location_number, vendor_name, vendor_scope, section, item_type, item, price, error
FROM stadium_vendors v
JOIN (SELECT customer_id, items.*
FROM   (SELECT customer_id, explode(recommendations) AS items
FROM   silver_recommendations) a) ci
ON v.item_id = ci.item_id
''') 

spark.sql("drop table if exists gold_recommendations")
gold_recommendations_df.createTempView("gold_recommendations")
spark.sql('select * from gold_recommendations').display()

customer_id,item_id,rating,vendor_id,vendor_location_number,vendor_name,vendor_scope,section,item_type,item,price,error
1,176,0.14427546,17,7,Streats of Detroit,Food,217,Nachos,Nacho Grande,14,
1,277,0.13161671,19,5,Tailgate Grill,Food,242,Non-Alcoholic,Regular or Large Soda,10,
1,202,0.1279852,17,9,Streats of Detroit,Food,227,Snack,Peanuts,8,
1,89,0.10887258,11,2,Lefty's Cheesesteaks,Food,135,Sandwich,Lefty's Cheesteak,16,
1,134,0.10677869,17,3,Streats of Detroit,Food,124,Snack,Popcorn,7,
1,144,0.103235595,17,4,Streats of Detroit,Food,126,Hot Dog,Hot Dog,13,
1,225,0.09793375,17,11,Streats of Detroit,Food,229,Snack,Cheese Cups,10,
1,13,0.09731455,2,2,Big Boy,Food,127,Sandwich,Slim Jim,14,
1,298,0.09656529,22,1,Detroit Bloody Mary Bar,Bar,210,Non-Alcoholic,Regular or Large Soda,10,
1,112,0.09282864,17,1,Streats of Detroit,Food,109,Snack,Popcorn,8,


### 顧客毎のベストアイテムを取得します。

アイテムのパーソナライゼドレコメンドを行う時に、近い距離で購入できるアイテムも探したいです。そのために、お客様の席からお店の場所までの距離を計算します。

In [0]:
sections_recommendations_df = spark.sql("""
SELECT r.customer_id, item_id, rating, section, section_number, abs(section-section_number) as distance
FROM gold_recommendations r
JOIN ticket_sales s on s.customer_id = r.customer_id
""")

spark.sql("drop table if exists sections_recommendations")
sections_recommendations_df.createTempView("sections_recommendations")       
spark.sql("select * from sections_recommendations").display()

customer_id,item_id,rating,section,section_number,distance
1,176,0.14427546,217,127,90
1,277,0.13161671,242,127,115
1,202,0.1279852,227,127,100
1,89,0.10887258,135,127,8
1,134,0.10677869,124,127,3
1,144,0.103235595,126,127,1
1,225,0.09793375,229,127,102
1,13,0.09731455,127,127,0
1,298,0.09656529,210,127,83
1,112,0.09282864,109,127,18


In [0]:
final_recommendations_df = spark.sql("""
SELECT * FROM (
  SELECT *, RANK() OVER (PARTITION BY customer_id ORDER BY distance ASC) AS rnk FROM sections_recommendations
  ) WHERE rnk = 1
""")

spark.sql("drop table if exists final_recommendations")
final_recommendations_df.createTempView("final_recommendations")
spark.sql("SELECT * FROM final_recommendations").show()

## Step 6: ファンにキャンペーンのオファーの送信

これで、全顧客に送るべきアイテムが揃いました！通知をテストしてみましょう。

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-6.png" width="1000px">

In [0]:
# import IPython
# from IPython.display import display, HTML

def send_push_notification(title, subject, phone_number):
  #ノートブックを実行しているユーザーでメッセージを調整してください。リアルでは、顧客の詳細を含むテーブルから情報を取るべきです。
  first_name = "Yulan"
  import re
  subject = re.sub("\n", "<br/>", subject) 
  subject = re.sub("こんにちは！([0-9]*)", "こんにちは！"+first_name, subject) 
  subject = re.sub("大好きな([0-9]*)", "大好きなピザ", subject) 
  subject = re.sub("([0-9]*)で販売されている", "Goal Post Grill & Pizzeriaで販売されている", subject) 


  displayHTML(f"""<div style="border-radius: 10px; background-color: #adeaff; padding: 10px; width: 400px; box-shadow: 2px 2px 2px #F7f7f7; margin-bottom: 3px">
        <div style="padding-bottom: 5px"><img style="width:20px; margin-bottom: -3px" src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/bell.png"/> <strong>{title}</strong></div>
        {subject}
        </div>""")

In [0]:
recommendation = spark.sql("select * from final_recommendations where customer_id = 1").collect()[0]

title = "あなたにお得なキャンペーンオファー！"
subject= f"こんにちは！{recommendation['customer_id']}さん! 大好きな{recommendation['item_id']}を特別価格でご提供いたします。 {recommendation['section']}で販売されているので、チェックしてみてくださいね!" 

user_cell_phone = "<Put your real cell phone here to test the notification!>"
 
send_push_notification(title, subject, user_cell_phone)

## Step 7: イベントキャンペーンとメトリクスのトラッキング

イベント中は、すべての指標をリアルタイムで取得し、イベントのKPIをモニターするダッシュボードを構築することができます。
 
ゲーム終了後、プロモーションの成功率を確認し、次のゲームに必要な調整を行うこともできます。

<img src="https://github.com/QuentinAmbard/databricks-demo/raw/main/media/resources/images/product_reco_stadium-7.png" width="1000px">

In [0]:

## visualisation: Bar, Series groupings: item_purchased, Values: count, Aggregation: SUM

visual_df = spark.sql("""
select count(*) as count, item_purchased from (
  SELECT *, CASE WHEN recommended_item_purchased = 1 THEN 'Yes' ELSE 'No' END as item_purchased
  FROM final_recommendations r
  LEFT JOIN ticket_sales s USING(customer_id)
  LEFT JOIN purchase_history p USING(game_id, customer_id)) group by item_purchased
""")
visual_df.show()