# cleanup (optional reset: drop the derived `vr_demo.fraud2` tables)

In [0]:
# %sql
# -- Optional reset, disabled by default: uncomment this whole cell (including the %sql line)
# -- to drop the derived vr_demo.fraud2 tables before rebuilding them from vr_demo.fraud.*.
# drop table vr_demo.fraud2.visits;
# drop table vr_demo.fraud2.customers;
# drop table vr_demo.fraud2.locations;

# visits — sample ~10,000 visits, preserving the fraud/non-fraud ratio

In [0]:
from pyspark.sql import functions as F

# Draw a ~SAMPLE_SIZE-row sample of visits whose fraud ('Y') / non-fraud ('N') mix
# matches the source table, and save it as vr_demo.fraud2.visits.
# Rows whose fraud_report is neither 'Y' nor 'N' are not sampled.
SAMPLE_SIZE = 10000
SEED = 42  # fixed seed so re-running the notebook reproduces the same sample

df_fraud_abt = spark.table("vr_demo.fraud.visits")
fraud_rows = df_fraud_abt.filter(F.col('fraud_report') == 'Y')
non_fraud_rows = df_fraud_abt.filter(F.col('fraud_report') == 'N')

fraud_count = fraud_rows.count()
non_fraud_count = non_fraud_rows.count()
total_count = fraud_count + non_fraud_count
if total_count == 0:
    raise ValueError("vr_demo.fraud.visits has no rows with fraud_report in ('Y', 'N')")

# Split the target size between the two classes in proportion to their source counts.
fraud_sample_size = int(SAMPLE_SIZE * (fraud_count / total_count))
non_fraud_sample_size = SAMPLE_SIZE - fraud_sample_size


def _fraction(target: int, available: int) -> float:
    """Sampling fraction for `target` rows out of `available`, clamped to [0, 1].

    DataFrame.sample without replacement rejects fractions above 1 (which happens when
    the source holds fewer rows than requested); an empty class yields 0.0 instead of
    dividing by zero.
    """
    return min(1.0, target / available) if available else 0.0


# sample() keeps each row independently with the given probability, so the result size
# is approximately, not exactly, SAMPLE_SIZE.
fraud_sample = fraud_rows.sample(False, _fraction(fraud_sample_size, fraud_count), seed=SEED)
non_fraud_sample = non_fraud_rows.sample(False, _fraction(non_fraud_sample_size, non_fraud_count), seed=SEED)

# Proportional (stratified) sample: the source fraud ratio is kept, classes are NOT balanced.
stratified_sample = fraud_sample.union(non_fraud_sample)
stratified_sample.write.mode('overwrite').saveAsTable("vr_demo.fraud2.visits")

In [0]:
%sql
-- Sanity check: row count of the sampled visits table
SELECT COUNT(*)
FROM vr_demo.fraud2.visits

In [0]:
%sql
-- Sanity check: distinct visit_id values; compare with COUNT(*) to spot duplicate ids (NULLs are not counted here)
SELECT COUNT(DISTINCT visit_id)
FROM vr_demo.fraud2.visits

In [0]:
%sql
-- Recode fraud_report from 'Y'/'N' to 1/0, replacing the table it reads from.
-- Because the cell overwrites its own input, it must be safe to run twice: casting to string
-- and accepting '1' keeps already-recoded rows at 1 instead of turning them into 0 (or failing
-- the 'Y' comparison against an int column under ANSI mode).
create or replace table vr_demo.fraud2.visits as
select
  * except (fraud_report),
  case when cast(fraud_report as string) in ('Y', '1') then 1 else 0 end as fraud_report
from vr_demo.fraud2.visits

# customers — keep only customers that appear in the sampled visits

In [0]:
(
    spark.table('vr_demo.fraud.customers')
    .distinct()  # full-row de-duplication, equivalent to dropDuplicates() with no column subset
    .createOrReplaceTempView('customers_dedup')  # exposed to the %sql cells below
)

In [0]:
%sql
-- Row count of the de-duplicated customers view
SELECT COUNT(*)
FROM customers_dedup

In [0]:
%sql
-- Distinct customer_id count; lower than COUNT(*) if a customer_id still has several distinct rows (or some ids are NULL)
SELECT COUNT(DISTINCT customer_id)
FROM customers_dedup

In [0]:
%sql
-- Customers restricted to those who appear in the sampled visits.
-- `create or replace` keeps the cell re-runnable (plain `create table` fails once the table exists).
-- A left semi join returns each matching customers_dedup row at most once, exactly like an
-- inner join against the distinct customer_ids, without building that subquery.
create or replace table vr_demo.fraud2.customers as
select c.*
from customers_dedup c
left semi join vr_demo.fraud2.visits v
  on c.customer_id = v.customer_id

In [0]:
%sql
-- Row count of the filtered customers table
SELECT COUNT(*)
FROM vr_demo.fraud2.customers

In [0]:
%sql
-- Distinct customer_id count of the filtered customers table; compare with COUNT(*) to spot duplicate ids
SELECT COUNT(DISTINCT customer_id)
FROM vr_demo.fraud2.customers

# locations — keep only ATM locations that appear in the sampled visits

In [0]:
(
    spark.table('vr_demo.fraud.locations')
    .distinct()  # full-row de-duplication, equivalent to dropDuplicates() with no column subset
    .createOrReplaceTempView('locations_dedup')  # exposed to the %sql cells below
)

In [0]:
%sql
-- Row count of the de-duplicated locations view
SELECT COUNT(*)
FROM locations_dedup

In [0]:
%sql
-- Distinct atm_id count; lower than COUNT(*) if an atm_id still has several distinct rows (or some ids are NULL)
SELECT COUNT(DISTINCT atm_id)
FROM locations_dedup

In [0]:
%sql
-- ATM locations restricted to those that appear in the sampled visits.
-- `create or replace` keeps the cell re-runnable (plain `create table` fails once the table exists).
-- A left semi join returns each matching locations_dedup row at most once, exactly like an
-- inner join against the distinct atm_ids, without building that subquery.
create or replace table vr_demo.fraud2.locations as
select l.*
from locations_dedup l
left semi join vr_demo.fraud2.visits v
  on l.atm_id = v.atm_id

In [0]:
%sql
-- Row count of the filtered locations table
SELECT COUNT(*)
FROM vr_demo.fraud2.locations

In [0]:
%sql
-- Distinct atm_id count of the filtered locations table; compare with COUNT(*) to spot duplicate ids
SELECT COUNT(DISTINCT atm_id)
FROM vr_demo.fraud2.locations

# inputs — up to 20 fraud and 980 non-fraud rows of `fraud_abt`, without the label column

In [0]:
# Build a small scoring input set: up to 20 fraud rows plus up to 980 non-fraud rows
# of fraud_abt, with the fraud_report label removed.
# NOTE(review): limit() without an orderBy picks an arbitrary subset, so the chosen rows
# may differ between runs -- add an ordering if reproducibility matters.
abt = spark.table('vr_demo.fraud2.fraud_abt')
fraud_inputs = abt.where('fraud_report=1').drop('fraud_report').limit(20)
non_fraud_inputs = abt.where('fraud_report=0').drop('fraud_report').limit(980)

(
    fraud_inputs
    .union(non_fraud_inputs)
    .write.mode('overwrite')
    .saveAsTable("vr_demo.fraud2.fraud_abt_inputs")
)

# predictions — score `fraud_abt` with the logged MLflow model

In [0]:
import mlflow
from pyspark.sql.functions import struct, col, explode, current_timestamp, lit

# Score every row of fraud_abt with a logged MLflow model and save the scored rows.
# NOTE(review): 'fraud_abt' and 'fraud_predictions' are unqualified, so they resolve against the
# session's current catalog/schema, unlike the fully qualified vr_demo.* names elsewhere -- confirm.
logged_model = 'runs:/6d343fd67795470bac54b96831d2d506/model'
scoring_df = spark.table('fraud_abt')

# Wrap the model as a Spark UDF; no result_type is passed, so spark_udf's default applies.
# NOTE(review): explode() only accepts array/map columns -- confirm the UDF's result type
# produces one, or pass result_type explicitly.
model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model)

# All columns of the input frame are packed into one struct and fed to the model.
feature_struct = struct(*[col(name) for name in scoring_df.columns])
predictions_df = (
    scoring_df
    .withColumn('predictions', explode(model_udf(feature_struct)))
    .withColumn('ts', current_timestamp())
    .withColumn('model_name', lit('Fraud Prevention Model'))
)

(
    predictions_df.write
    .mode('overwrite')
    .option('mergeSchema', 'true')
    .saveAsTable('fraud_predictions')
)

# sales_monthly — keep only selected product/store pairs

In [0]:
# Copy only the selected (product_id, store_id) pairs of the monthly sales table into the academy schema.
SALES_PAIRS = [
    (820558267321802768, 1186089951684251282),
    (181643065667445703, 2282245423997110910),
    (5655945986815814052, 4328351085320015674),
    (181643065667445703, 5029475122466982522),
    (181643065667445703, 749605886145008442),
    (820558267321802768, 1726271638195919732),
    (820558267321802768, 1922502944801829527),
    (181643065667445703, 6732994364523568257),
    (820558267321802768, 806622226360471425),
    (181643065667445703, 2391965958404173729),
]


def _pairs_filter(pairs):
    """Return a SQL predicate matching rows whose (product_id, store_id) is any of `pairs`.

    Raises ValueError on an empty list, since an empty predicate is not valid SQL.
    """
    if not pairs:
        raise ValueError("at least one (product_id, store_id) pair is required")
    return ' OR '.join(
        f'(product_id={product_id} AND store_id={store_id})' for product_id, store_id in pairs
    )


# mode('overwrite') keeps the cell re-runnable, matching the other writes in this notebook;
# the default save mode raises once the target table exists.
(
    spark.table("vr_demo.crisp.sales_monthly")
    .where(_pairs_filter(SALES_PAIRS))
    .write.mode('overwrite')
    .saveAsTable("vr_demo.academy.sales_monthly")
)

# export — write the academy tables as Parquet to the export volume

In [0]:
# Export each vr_demo.academy table as Parquet to its own folder under the export volume.
# repartition(1) writes each table from a single partition (one data file per table), which is
# convenient for download but funnels the whole table through one task -- keep it to small tables.
EXPORT_TABLES = ['visits', 'customers', 'locations', 'fraud_abt', 'fraud_abt_inputs', 'sales_monthly']
EXPORT_ROOT = '/Volumes/vr_demo/academy/export'

for table_name in EXPORT_TABLES:
    (
        spark.table(f'vr_demo.academy.{table_name}')
        .repartition(1)
        .write.format('parquet')
        .mode('overwrite')
        .save(f'{EXPORT_ROOT}/{table_name}')
    )