# StyleMeUp - Fraud Detection in Online Retail

### Problem Description:

A global retailer 'StyleMeUp' has been experiencing transaction fraud. To reduce costs related to fraudulent transactions, StyleMeUp wants to implement a fraud detection solution that leverages machine learning.

This demo shocases how Data Engineering and Data Science teams at StyleMeUp can use familiar programming concepts and APIs, and a rich ecosystem of open source packages provided by Snowpark for Python to collaborate and build this solution.

### Data Engineering Notebook

As a data engineer we have been tasked to not only load the orders and details of customer transactions but also help data scientist to quickly identify if the transaction could be fraud. In order to do that we will analyze the origin ip address of the transaction and build features using a third party and second party data sets right from Snowflake marketplace and data exchange

We will use the built in functions, Python Snowpark API and UDF's to create enriched data and features.

**Lets start by writing some helper functions that we will use later**

we need helper fuctions to make our life easy in the data pipeline. It will help, when we join the orders data with IpInfo data for identifying features.

In [None]:
def to_join_key_func(df, col):
    return df.join_key

def builtin(function_name):
    import snowflake.snowpark.functions as sf
    if function_name == 'to_join_key':
        return to_join_key_func
    return sf.builtin(function_name)

In [None]:
def enrich_with_geocoordinates(df):
    
    location_df = session.table('ipinfo.public.location')
    orders_ip_location_df = df.join(location_df, to_join_key(df, 'ip_address') == location_df.join_key) \
        .where(parse_ip(df.ip_address, 'inet')['ipv4'].between(location_df.start_ip_int, location_df.end_ip_int)) \
        .select('trnx_id', 'ip_address', location_df.lat.alias('ip_order_loc_lat'), location_df.lng.alias('ip_order_loc_lng')) \

    orders_shipping_location_all_locations_df = df.join(location_df, to_join_key(df, 'ip_address') == location_df.join_key) \
        .filter(df.shipping_zipcode == location_df.postal)  \
        .select('trnx_id', 'ip_address', location_df.lat.alias('shipping_lat'), location_df.lng.alias('shipping_lng')) \

    orders_shipping_location_avg_lat_df = orders_shipping_location_all_locations_df \
        .groupBy(['trnx_id', 'ip_address']).agg(avg(col('shipping_lat')).alias('shipping_lat'))
    
    orders_shipping_location_avg_lng_df = orders_shipping_location_all_locations_df \
    .groupBy(['trnx_id', 'ip_address']).agg(avg(col('shipping_lng')).alias('shipping_lng'))

    orders_shipping_location_df = orders_shipping_location_avg_lat_df \
        .join(orders_shipping_location_avg_lng_df, ['ip_address', 'trnx_id'])
    
    orders_location_df = df \
        .select('trnx_id', 'ip_address', 'shipping_zipcode' ) \
        .join(orders_ip_location_df, [ 'trnx_id', 'ip_address']) \
        .join(orders_shipping_location_df, [ 'trnx_id', 'ip_address'])
 
    return orders_location_df

## Loading Libreries

In [None]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, avg, col
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType
import pandas as pd
import sys
sys.path.append('..')
from utilities.creds import Credentials

In [None]:
from snowflake.snowpark import version
print(version.VERSION)

## Connect to Snowflake

In [None]:
session = Session.builder.configs(Credentials().__dict__).create()
print(session.sql('USE WAREHOUSE LEARNINGSNOWPARKVW').collect())
print(session.sql('USE DATABASE LEARNINGSNOWPARKDB').collect())
print(session.sql('USE SCHEMA frauddemo').collect())
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

In [None]:
orders_df = session.table('orders')
orders_df.limit(10).toPandas()

In [None]:
order_details_df = session.table('order_details')
order_details_df.limit(10).toPandas()

**Getting the average price per item**

In [None]:
avg_price_df = orders_df.join(order_details_df, 'trnx_id') \
                        .groupBy(orders_df.trnx_id) \
                        .agg(avg(order_details_df.price).alias('avg_price_per_item')) 

avg_price_df.limit(10).toPandas()

## Enrich data with IPInfo Privacy dataset to determine if IP is masked

## LET'S ADD SOME DATA FROM THE MARKET PLACE

In [None]:
privacy_df = session.table('ipinfo.public.privacy')
parse_ip = builtin("parse_ip")
to_join_key = builtin("to_join_key")

orders_masked_df = orders_df \
    .join(privacy_df, to_join_key(orders_df, 'ip_address') == privacy_df.join_key) \
    .where(parse_ip(orders_df.ip_address, 'inet')['ipv4'].between(privacy_df.start_ip_int, privacy_df.end_ip_int)) \
    .select('trnx_id', 'ip_address', (privacy_df.proxy | privacy_df.tor | privacy_df.vpn).alias('is_masked'))  
 
#orders_masked_df.schema   
#orders_masked_df.collect()
orders_masked_df.sample(n=10).toPandas()

**Enrich data with IPInfo Location dataset to get geo-coordinates**

In [None]:
loc_df = enrich_with_geocoordinates(orders_df)
loc_df.sample(n=10).toPandas()

**Calculate distance between order IP and shipping locations using Snowflake's built in Geography functions**

In [None]:
%%time
import snowflake.snowpark.functions as F

session.sql("alter session set geography_output_format='WKT'").collect()


distance_df = loc_df.select(loc_df.trnx_id, loc_df.ip_address, loc_df.shipping_zipcode, \
                        F.call_builtin("st_makepoint",loc_df.IP_ORDER_LOC_LNG,loc_df.IP_ORDER_LOC_LAT).alias('ipinfo_point') \
                       ,F.call_builtin("st_makepoint",loc_df.SHIPPING_LNG,loc_df.SHIPPING_LAT).alias('shipping_point') \
                       ,F.call_builtin("st_distance",col("ipinfo_point"),col("shipping_point")).alias("ip_to_shipping_distance") \
                       ,(col("ip_to_shipping_distance")/1609).alias("distance_in_miles") )
#distance_df.sample(n=10).toPandas()

**Write enriched data back to a new Snowflake table**

In [None]:
%%time
orders_merged_df = orders_df.join(orders_masked_df, ['trnx_id', 'ip_address'], 'left_outer') \
    .join(loc_df,['trnx_id', 'ip_address', 'shipping_zipcode'],  'left_outer') \
    .join(distance_df,['trnx_id', 'ip_address', 'shipping_zipcode'], 'left_outer') \
    .join(avg_price_df,'trnx_id', 'left_outer') \
    .write.mode('overwrite').saveAsTable('enriched_data')

In [None]:
enr_df = session.table('enriched_data').sample(n = 20000)
enr_df.sample(n=10).toPandas()

In [None]:
enr_df.write.mode('overwrite').saveAsTable('new_transaction_data')

## Closing Connection

In [None]:
session.close()