# 仮説設定をしてKPIを定めてみよう

データとして出てくるのが以下のようなデータです。

- check：決済した
- id: ユーザID
- money:いくら使った

```
+-------+---+-----+
|actions| id|money|
+-------+---+-----+
|  check|  2|  200|
|  check|  1| 1000|
+-------+---+-----+

```

例えば、
現状は、お金を使ってくれるユーザの売上（実際だと直近1週間などで計算することが多い）が5000円以上のユーザをロイヤルユーザとして  
そこに分類する人たちがより多くのおかねを使ってくれるのではないか？といった施作が考えられる。

そこで、お金をたくさん使ってくれるユーザ(ロイヤルユーザ)に対して、Aの広告を出力する  
お金をあまり使ってくれないユーザに対して、Bの広告を出力する
クーポンを配布するなどでもOK。

今回は、簡単のために
モデルの成功としては、現状の全体の売上1200円(KPI)でそれが１０％上がれば成功といえる(ことにする)。
（経費は分かりやすさのため0円）

# 今回の環境についての説明 
環境説明のためのコマンド群

## 環境の立ち上げ

```
docker-compose up -d
```

## dockerコンテナへログイン

```
docker exec -it pyspark_mlops /bin/bash
```

## pyspark-topicの作成コマンド

```
/home/pyspark/kafka/bin/kafka-topics.sh \
    --create --topic pyspark-topic \
    --replication-factor 1 \
    --partitions 1 \
    --bootstrap-server kafka_mlops:9092 
```

## Pyspakrでストリーミングデータを読み取る

### 接続のためのコマンド

```
pyspark --packages org.apache.spark:spark-streaming_2.13:3.2.4,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.4,org.apache.spark:spark-avro_2.12:3.2.4
```

### ストリーミングの準備

```
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka_mlops:9092") \
  .option("subscribe", "pyspark-topic") \
  .load()
```

### ストリーミングデータの読み込み

```
file_stream = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("parquet") \
  .option("path", "/tmp/share_file/datalake/web_actions/") \
  .outputMode("append") \
  .partitionBy("key") \
  .trigger(processingTime="5 seconds") \
  .option("checkpointLocation", "/tmp/kafka/parquet/") \
  .start()
```

### 停止するときはこちら

```
file_stream.stop()
```

### データの確認を行う
http://localhost:3001/done/?id=1
df = spark.read.parquet("/tmp/share_file/datalake/web_actions/")

# データの取得と蓄積
末尾のIDやURLを変えながら、何回かデータを送ってみます。  
IDは最低でも3人以上用意してみましょう。

### 買い上げ完了画面
http://localhost:3001/done/?id=1

## データの読み込みと書き込みでデータウェアハウスにデータを保存してきましょう

```
df=spark.read.parquet("/tmp/share_file/datalake/web_actions")
```

### unable to infer schemaが出てしまった場合

以下のようにスキーマ設定をしてみてください。

```
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import col

struct = StructType([
    StructField("value", StringType(), False),
    StructField("key", StringType(), False)
])

df=spark.read.parquet("/tmp/share_file/datalake/web_actions", inferSchema=False, schema=struct)

```

### jsonをバラバラにして扱いやすくする

```
df.createOrReplaceTempView("web_actions")
result_df=spark.sql("select key,id,money,action,sendtime from web_actions LATERAL VIEW json_tuple(value,'id','money','action','sendtime') user as id, money, action, sendtime")
```

### ファイルをparquetで吐き出す

```
result_df.coalesce(1).write.mode('overwrite').parquet("/tmp/share_file/datamart/web_actions/")
```

# データを読み込んでみる

```
df2 = spark.read.parquet("/tmp/share_file/datamart/web_actions/")
```

# データのテストとSparkの使い方速習

データのテストは、自分自身が思い描いた形にデータがなっているかを確認すること。  
データは多くの人に使われることを想定するため、汎用的な状態で保存されていることが多い。  
自身が実現したいデータの状態に変更していくことが大事。

また、思いがけず変なデータ（例えば、売上のデータなのになぜかマイナスになっているなど）があることがあるのでそのようなデータは除外したりすることで
機械学習のモデルの担保を行うことが多い。

## データインテグレティ
データのテストをする際に確認すると良いポイントとしてデータインテグレティが存在する。  
いくつか指標があるが、重要なポイントをいくつかピックアップ。

- 完全性 -> 活動に関する全ての情報が記録されているか？
- 判読性 -> 人間が理解できる？プログラムとして定義できる？

これらを確認するチェックを行うと良い。

実はデータを活用するプロジェクトでは、80%以上の時間をこのテストをする時間に割いていると言われています。

### 完全性のチェックの例
データとして本来であればlogin -> check_cartの2種類が必ず存在しているはず。  
仮にloginがなくてcheckだけであればなにか不正なデータの可能性やシステムの不具合などの状態が考えらるため  
計算から除外するなどの対応を検討しないといけない。

ログインURL
http://localhost:3001/?id=1

```
import pyspark.sql.functions as F

df3=df2.groupby('id').agg(F.collect_list('action').alias('arrayed'))
df3.withColumn('check_completion', F.when(F.array_contains(F.col('arrayed'),'check_cart'),F.array_contains(F.col('arrayed'),'login'))).show()
```

### 判読性のチェックの例
例えば、売上のデータにマイナスがなどがあると人間が理解できない状態になる。  
「なんで、マイナス？」「売上といいつつ、払い戻しもある？」などの考えが巡ってしまう。  
データを正しく使うためにデータを正しく理解する必要がある。

sparkプログラム

```
df2.withColumn("check_money", F.col("money") > 0).groupby("check_money").count().show()
```

# SparkMLで簡単なモデルを作成してみましょう

今回はKmeansを使って、データを2つに分類していきます。

分類に利用するデータは、売上で売上が高い人と低い人で分類を行なっていきます。

lectureフォルダに配置された「KMeans.py」をみながら話を進めていきましょう。


# モデルの結果をデータベースに保存してみよう

いよいよモデルの結果をデプロイしていきます。  
今回はモデルの結果をmongodbに格納してNodeJs上のアプリケーションから利用していきます。

## mongodbへの保存を行なっていきます

```
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.12:3.0.2 \
        --conf spark.mongodb.input.uri=mongodb://action:pass123@mongo_data_mlops:27017/user_prediction \
        --conf spark.mongodb.output.uri=mongodb://action:pass123@mongo_data_mlops:27017/user_prediction
```

## parquetの読み込み

```
df = spark.read.parquet("/tmp/share_file/datamodel/part1")
```

## mongodbへの書き込みを行う

```
df.repartition(1).write \
    .format('com.mongodb.spark.sql.DefaultSource') \
    .option( "uri", "mongodb://action:pass123@mongo_data_mlops:27017/user_prediction.prediction") \
    .save()
```

## mongodbのデータを確認してみよう
書き込んだデータを確認してみましょう。

### 対象のコンテナに接続

```
docker exec -it mongo_data_mlops /bin/bash
```

### mongodbに接続を行う
mongo -u action -p pass123 user_prediction

### 簡単にいくつか検索してみます
predictionはテーブル名

```
db.prediction.find()
db.prediction.find({id:1})
db.prediction.find({id:1},{prediction:1, _id:0})
```

In [None]:
# テストの解答

df2.withColumn("key_check", F.when(F.to_date(F.col("key"),"yyyy-mm-dd").isNotNull(), True))