#### Demo prep

In [3]:
# Session parameters the demo relies on. Each statement is sent unchanged,
# in the order listed.
_demo_prep_statements = [
    'alter session set qa_mode=true',
    'alter session set enable_python_udf=true',
    'alter session set USE_ANACONDA_PYTHON_RUNTIME_IN_REMOTE_DEPLOYMENT = true',
    'alter session set ENABLE_PYTHON_UDF_INFO_SCHEMA_SHOW_AVAILABLE_PACKAGES = true',
    'alter session set PYTHON_UDF_PRPR_EXPOSE_PACKAGES = true',
    'alter session set ENABLE_DOP_DOWNGRADE = false',
    'alter session set ENABLE_PTRACE_SUPERVISOR_REPORT = false',
]
for _statement in _demo_prep_statements:
    _last_status = session.sql(_statement).collect()

# Keep the final statement's result as the last expression of the cell.
_last_status

[Row(status='Statement executed successfully.')]

In [4]:
def to_join_key_func(df, col):
    """Return the ``join_key`` attribute of *df*.

    ``col`` is part of the call signature used by callers but is ignored.
    """
    join_key_column = df.join_key
    return join_key_column

def builtin(function_name):
    """Look up a callable by name.

    ``'to_join_key'`` resolves to the local ``to_join_key_func``; every other
    name is forwarded to ``snowflake.snowpark.functions.builtin``.
    """
    import snowflake.snowpark.functions as snowpark_functions

    if function_name != 'to_join_key':
        return snowpark_functions.builtin(function_name)
    return to_join_key_func

In [12]:
def enrich_with_geocoordinates(df):
    """Add IP-based and shipping-based geo-coordinates to each order.

    Relies on the module-level names ``session``, ``to_join_key`` and
    ``parse_ip``, which must be defined before this function is called.

    Args:
        df: Orders DataFrame. The code reads its ``trnx_id``, ``ip_address``
            and ``shipping_zipcode`` columns and passes it to ``to_join_key``.

    Returns:
        A DataFrame with the columns ``trnx_id``, ``ip_address``,
        ``shipping_zipcode``, ``ip_order_loc_lat``, ``ip_order_loc_lng``,
        ``shipping_lat`` and ``shipping_lng``. No join type is passed to any
        join here, so the library default applies (presumably inner, in which
        case orders without a match in either lookup are dropped -- confirm).
    """
    location_df = session.table('ipinfo.public.location')

    # Both lookups pair orders with candidate location rows on the same key.
    join_condition = to_join_key(df, 'ip_address') == location_df.join_key

    # IP lookup: keep the location row whose IP range contains the address.
    ip_in_range = parse_ip(df.ip_address, 'inet')['ipv4'].between(
        location_df.start_ip_int, location_df.end_ip_int
    )
    orders_ip_location_df = (
        df.join(location_df, join_condition)
        .where(ip_in_range)
        .select(
            'trnx_id',
            'ip_address',
            location_df.lat.alias('ip_order_loc_lat'),
            location_df.lng.alias('ip_order_loc_lng'),
        )
    )

    # Shipping lookup: keep location rows whose postal code equals the
    # order's shipping zipcode.
    orders_shipping_location_all_locations_df = (
        df.join(location_df, join_condition)
        .filter(df.shipping_zipcode == location_df.postal)
        .select(
            'trnx_id',
            'ip_address',
            location_df.lat.alias('shipping_lat'),
            location_df.lng.alias('shipping_lng'),
        )
    )

    # Several location rows can match one order, so collapse them to one
    # averaged coordinate pair per order. Both averages come from a single
    # aggregation instead of two identical group-bys joined back together.
    orders_shipping_location_df = (
        orders_shipping_location_all_locations_df
        .groupBy(['trnx_id', 'ip_address'])
        .agg([
            avg(col('shipping_lat')).alias('shipping_lat'),
            avg(col('shipping_lng')).alias('shipping_lng'),
        ])
    )

    orders_location_df = (
        df.select('trnx_id', 'ip_address', 'shipping_zipcode')
        .join(orders_ip_location_df, ['trnx_id', 'ip_address'])
        .join(orders_shipping_location_df, ['trnx_id', 'ip_address'])
    )

    return orders_location_df

## StyleMeUp - Fraud Detection in Online Retail 
### Data Engineering Notebook

In [5]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, avg, col
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType
from config import snowflake_connection_params

#### Connect to Snowflake

In [6]:
# Open a Snowpark session using the connection parameters imported from config.
session = Session.builder.configs(snowflake_connection_params).create()

# Print the warehouse, database and schema the session reports as current.
current_context = session.sql('select current_warehouse(), current_database(), current_schema()').collect()
print(current_context)

[Row(CURRENT_WAREHOUSE()='PYTHON', CURRENT_DATABASE()='RETAIL', CURRENT_SCHEMA()='TRANSACTIONS')]


#### Create dataframes for Snowflake tables

In [7]:
# DataFrame over the 'orders' table, resolved through the current session.
orders_df = session.table('orders')
# Preview: first 10 rows, converted to a pandas DataFrame for display.
orders_df.limit(10).toPandas()

Unnamed: 0,ISFRAUD,TRNX_ID,IP_ADDRESS,CITY,SHIPPING_ZIPCODE,SHIPPING_STATE,PAYMENT_NETWORK,PAYMENT_TYPE,TOTAL_TRNX_AMOUNT,JOIN_KEY
0,0,7FO7L4A70T,103.55.45.248,Bellevue,98006,WA,Mastercard,Credit,388.3,1731657728
1,0,44ODC7GN7C,104.128.113.128,Los Angeles,90009,CA,Diners Club,Credit,261.06,1753219072
2,1,TVMVR5R155,104.149.135.14,Los Angeles,90009,CA,Other,Credit,788.98,1754595328
3,0,3C7KYHMGVV,104.151.240.51,Dearing,67340,KS,Amex,Credit,300.62,1754726400
4,1,MGEJT4Z1HR,104.156.237.244,Dallas,75270,TX,Amex,Credit,445.21,1755054080
5,1,XFXR76NSUK,104.168.23.0,Los Angeles,90009,CA,Mastercard,Other,189.78,1755840512
6,0,5JAWJ7FF7I,104.169.163.107,Monroe,28111,NC,Visa,Debit,164.78,1755906048
7,1,FG24GVTMP7,104.219.251.112,Phoenix,85001,AZ,Visa,Credit,65.72,1759182848
8,0,IFNDU4TGSB,104.238.156.136,Dearing,67340,KS,Visa,Debit,286.49,1760428032
9,0,6LQ7N7T6Z5,104.245.239.0,Los Angeles,90009,CA,Visa,Credit,61.37,1760886784


In [8]:
# DataFrame over the 'order_details' table, resolved through the current session.
order_details_df = session.table('order_details')
# Preview: first 10 rows, converted to a pandas DataFrame for display.
order_details_df.limit(10).toPandas()

Unnamed: 0,TRNX_ID,ITEM,PRICE,QTY
0,00053C1FN7,GREEN GLASS TASSLE BAG CHARM,12.74,1
1,00053C1FN7,BLUE TILED TRAY,51.15,1
2,00053C1FN7,WOODLAND SMALL PINK FELT HEART,5.32,3
3,00053C1FN7,BATHROOM SET LOVE HEART DESIGN,19.74,3
4,00E73ZUAPA,PINK MARSHMALLOW SCARF KNITTING KIT,78.58,4
5,00E73ZUAPA,POPART RECT PENCIL SHARPENER ASST,20.24,5
6,00E73ZUAPA,EGG FRYING PAN IVORY,70.21,3
7,00EECSWKSP,TWO DOOR CURIO CABINET,40.53,1
8,00EECSWKSP,OLD DOC RUSSEL METAL SIGN,64.54,1
9,00FGRKWYL4,BISCUITS SMALL BOWL LIGHT BLUE,57.41,4


#### Aggregate avg_price_per_item feature

In [9]:
# Pair each order with its line items on trnx_id, then average the line-item
# prices within each transaction.
avg_price_df = (
    orders_df.join(order_details_df, 'trnx_id')
    .groupBy(orders_df.trnx_id)
    .agg(avg(order_details_df.price).alias('avg_price_per_item'))
)

# Preview the first 10 rows as a pandas DataFrame.
avg_price_df.limit(10).toPandas()

Unnamed: 0,TRNX_ID,AVG_PRICE_PER_ITEM
0,2YRLZS2H8M,59.42
1,3EK5L3NENK,66.17
2,GEL8S8W5E2,46.8225
3,7FO7L4A70T,33.983333
4,T3B1G13OFG,44.262
5,F79FM2UKUJ,35.8575
6,KZ6FWOZPK1,37.135
7,3VR56TEQSU,64.8825
8,FWALW2GIAS,35.503333
9,LKUDIUAX3M,57.825


#### Enrich data with the IPInfo Privacy dataset to determine whether the order IP is masked (proxy, Tor, or VPN)

In [10]:
privacy_df = session.table('ipinfo.public.privacy')

# Helpers resolved by name; they are module-level so other cells can use them.
parse_ip = builtin("parse_ip")
to_join_key = builtin("to_join_key")

# An order matches a privacy row when the join keys are equal and the parsed
# IPv4 value lies between the row's start_ip_int and end_ip_int.
same_join_key = to_join_key(orders_df, 'ip_address') == privacy_df.join_key
ip_in_privacy_range = parse_ip(orders_df.ip_address, 'inet')['ipv4'].between(
    privacy_df.start_ip_int, privacy_df.end_ip_int
)

# The masked flag is the OR of the proxy, tor and vpn columns.
is_masked = (privacy_df.proxy | privacy_df.tor | privacy_df.vpn).alias('is_masked')

orders_masked_df = (
    orders_df.join(privacy_df, same_join_key)
    .where(ip_in_privacy_range)
    .select('trnx_id', 'ip_address', is_masked)
)

# Preview the first 10 rows as a pandas DataFrame.
orders_masked_df.limit(10).toPandas()

Unnamed: 0,TRNX_ID,IP_ADDRESS,IS_MASKED
0,K0YIM7P5XD,85.221.49.0,True
1,5ZMAV5ZVY6,85.222.133.0,
2,IK7461NDQ1,100.36.226.0,True
3,N0N0MGB13V,102.135.146.41,True
4,Z6F7EFGQMA,49.232.150.80,
5,3KMUCFDH8V,89.33.166.0,True
6,5SXVDXA6TB,50.116.103.199,
7,M1RNRLFXNQ,50.116.69.134,
8,ORY1E9T98Z,50.116.82.112,
9,REDD5XCLQT,90.86.128.148,True


#### Enrich data with IPInfo Location dataset to get geo-coordinates

In [13]:
# Build the per-order coordinate columns from the orders DataFrame.
loc_df = enrich_with_geocoordinates(orders_df)
# Preview: first 10 rows, converted to a pandas DataFrame for display.
loc_df.limit(10).toPandas()

Unnamed: 0,TRNX_ID,IP_ADDRESS,SHIPPING_ZIPCODE,IP_ORDER_LOC_LAT,IP_ORDER_LOC_LNG,SHIPPING_LAT,SHIPPING_LNG
0,1OVCK1KK7U,75.144.208.56,33102,25.77427,-80.19366,25.77427,-80.19366
1,IX8ZKDWQQR,74.2.16.0,95103,37.33939,-121.89496,37.33939,-121.89496
2,3AH7NGSYLY,209.160.114.244,60666,41.85003,-87.65005,41.85003,-87.65005
3,DZ5W029283,75.144.86.64,49455,42.67087,-83.03298,42.67087,-83.03298
4,EO87GDCRLX,24.216.242.112,97365,44.63678,-124.05345,44.63678,-124.05345
5,ZUPHOLKUN3,24.216.29.180,23434,34.49544,-77.55497,36.72836,-76.58496
6,ZYAEO0ENEH,74.208.25.200,27697,37.05869,-95.71331,35.7721,-78.63861
7,MOMYW6PAMP,207.186.167.0,60666,41.85003,-87.65005,41.85003,-87.65005
8,S3Q0AY5XY4,108.169.140.132,27613,35.8949,-78.7051,35.8949,-78.7051
9,WZUEIILDXE,216.254.16.188,98111,47.60621,-122.33207,47.60621,-122.33207


#### Calculate the distance between the order's IP location and its shipping location

##### Register function as UDF 

In [None]:
import geopandas as gpd
from shapely.geometry import Point

# Add geopandas to the session's packages (presumably so the UDF defined
# below can import it where it runs -- confirm against the Snowpark docs).
session.addPackages('geopandas')

@udf
def calculate_distance(lat1: float, long1: float, lat2: float, long2: float) -> float:
    """Return the distance between two latitude/longitude points.

    Each point is built as ``Point(longitude, latitude)``, tagged as
    EPSG:4326 and reprojected to EPSG:3310 before the distance is measured,
    so the result is in the units of EPSG:3310.

    Returns ``None`` when any of the four coordinates is ``None``.

    NOTE(review): EPSG:3310 looks like a region-specific projection; confirm
    it is the intended CRS for orders located far from that region.
    """
    coordinates = (lat1, long1, lat2, long2)
    if any(value is None for value in coordinates):
        return None

    endpoints = [Point(long1, lat1), Point(long2, lat2)]
    projected = gpd.GeoDataFrame({'geometry': endpoints}, crs='EPSG:4326')
    projected = projected.to_crs('EPSG:3310')

    # shift() moves every row down by one, so row 1 of the shifted frame holds
    # the first point; row 1 of the element-wise distance is therefore the
    # distance between the two points.
    shifted = projected.shift()
    return projected.distance(shifted).iloc[1]


##### Call UDF to calculate ip_to_shipping_distance feature

In [None]:
# Apply the UDF to each order's IP-derived and shipping coordinates.
ip_to_shipping_distance = calculate_distance(
    loc_df.ip_order_loc_lat,
    loc_df.ip_order_loc_lng,
    loc_df.shipping_lat,
    loc_df.shipping_lng,
).alias('ip_to_shipping_distance')

distance_df = loc_df.select(
    loc_df.trnx_id,
    loc_df.ip_address,
    loc_df.shipping_zipcode,
    ip_to_shipping_distance,
)

# Preview the first 10 rows as a pandas DataFrame.
distance_df.limit(10).toPandas()

#### Write enriched data back to a new Snowflake table

In [None]:
# Combine the original orders with every derived feature. Left outer joins
# are requested explicitly, so orders missing a feature keep their row.
orders_merged_df = (
    orders_df.join(orders_masked_df, ['trnx_id', 'ip_address'], 'left_outer')
    .join(loc_df, ['trnx_id', 'ip_address', 'shipping_zipcode'], 'left_outer')
    .join(distance_df, ['trnx_id', 'ip_address', 'shipping_zipcode'], 'left_outer')
    .join(avg_price_df, 'trnx_id', 'left_outer')
)

# Write in a separate statement: chaining .write...saveAsTable() onto the
# assignment would bind orders_merged_df to the writer's return value
# instead of the merged DataFrame.
orders_merged_df.write.mode('overwrite').saveAsTable('online_retail.transactions.enriched_data')