Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 171 additions & 0 deletions use-cases/fraud-detection/Fraud-Stats-Grafana.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
{
"annotations": {
"list": [
{
"builtIn": 1,
"datasource": "-- Grafana --",
"enable": true,
"hide": true,
"iconColor": "rgba(0, 211, 255, 1)",
"name": "Annotations & Alerts",
"type": "dashboard"
}
]
},
"editable": true,
"gnetId": null,
"graphTooltip": 0,
"id": 3,
"links": [],
"panels": [
{
"datasource": "Redis",
"fieldConfig": {
"defaults": {
"custom": {
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 0,
"y": 0
},
"id": 2,
"options": {
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"mean"
],
"fields": "",
"values": false
},
"showThresholdLabels": false,
"showThresholdMarkers": true
},
"pluginVersion": "7.3.7",
"targets": [
{
"aggregation": "",
"command": "ts.range",
"keyName": "clean",
"query": "",
"refId": "A",
"streaming": false,
"type": "timeSeries"
}
],
"timeFrom": null,
"timeShift": null,
"title": "Clean",
"transparent": true,
"type": "gauge"
},
{
"datasource": "Redis",
"fieldConfig": {
"defaults": {
"custom": {
"align": null,
"filterable": false
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 0
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 9,
"w": 12,
"x": 12,
"y": 0
},
"id": 3,
"options": {
"orientation": "horizontal",
"reduceOptions": {
"calcs": [
"mean"
],
"fields": "",
"values": false
},
"showThresholdLabels": true,
"showThresholdMarkers": true
},
"pluginVersion": "7.3.7",
"targets": [
{
"aggregation": "",
"command": "ts.range",
"keyName": "click_spam",
"query": "",
"refId": "B",
"streaming": false,
"type": "timeSeries"
},
{
"aggregation": "",
"command": "ts.range",
"keyName": "ip_blacklist",
"query": "",
"refId": "C",
"streaming": false,
"type": "timeSeries"
}
],
"timeFrom": null,
"timeShift": null,
"title": "Fraud Types",
"transparent": true,
"type": "gauge"
}
],
"refresh": "5s",
"schemaVersion": 26,
"style": "dark",
"tags": [],
"templating": {
"list": []
},
"time": {
"from": "now-30m",
"to": "now"
},
"timepicker": {},
"timezone": "",
"title": "Fraud Stats",
"uid": "P0uE4KEMz",
"version": 3
}
3 changes: 2 additions & 1 deletion use-cases/fraud-detection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ docker run -d -p 6379:6379 redislabs/redismod

This will start a docker which runs a flask server on port 5000.


## Sample curl:

```
curl --location --request POST 'localhost:5000' \
curl --request POST 'localhost:5000' \
--header 'Content-Type: application/json' \
--data-raw '{
"device_id": "111-000-000",
Expand Down
14 changes: 8 additions & 6 deletions use-cases/fraud-detection/app/constants.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
class Constants:

IP_CUCKOO_FILTER_NAME = "ip_cf"
AD_STACK_WINDOW = 10 * 1000 # default window in seconds
AD_STACK_THRESHOLD = 2
CLEAN_STREAM_NAME = 'clean_stream'
FRAUD_STREAM_NAME = 'fraud_stream'
CLEAN_TS = "clean_ts"
FRAUD_TS = "fraud_ts"
CLICK_SPAM_WINDOW_IN_SEC = 10
CLICK_SPAM_THRESHOLD = 2
STREAM_NAME = "data_stream"
CLEAN = "clean"
FRAUD = "fraud"
CLICK_SPAM = "click_spam"
IP_BLACKLIST = "ip_blacklist"

50 changes: 26 additions & 24 deletions use-cases/fraud-detection/app/fraud_checks.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
import json
import os

from singleton_decorator import singleton

Expand All @@ -11,38 +11,40 @@
class FraudChecks:

def __init__(self):
pass
self.click_spam_window_in_sec = int(os.getenv("CLICK_SPAM_WINDOW_IN_SEC", Constants.CLICK_SPAM_WINDOW_IN_SEC)) * 1000
self.click_spam_threshold = int(os.getenv("CLICK_SPAM_THRESHOLD", Constants.CLICK_SPAM_THRESHOLD))

def check_fraud(self, data):
if self.ip_fraud(data):
return 'Fraud IP'
if self.ad_stack(data):
return 'Ad Stacked'
return 'Clean Event'
data['status'] = Constants.CLEAN
data['fraud_type'] = Constants.CLEAN
data['ts'] = int(time.time()*1000)
if self.ip_fraud(data) or self.click_spam(data):
pass
self.publish(data)
return data['fraud_type']

def ip_fraud(self, data):
exists = RedisConn().bloom().cfExists(Constants.IP_CUCKOO_FILTER_NAME, data['ip'])
self.publish(data, "Clean" if not exists else "Fraud")
if exists:
data['fraud_type'] = Constants.IP_BLACKLIST
data['status'] = Constants.FRAUD

return exists

def ad_stack(self, data):
def click_spam(self, data):
##
# 'key' of each sorted is the device_id received.
# Ad stacked is True if the count in the range of scores (which is timestamp of event) -
# click spam is True if the count in the range of scores (which is timestamp of event) -
# - is greater than a threshold.
#
##
ts = int(time.time() * 1000)
is_ad_stacked = False
member = json.dumps({'device_id': data['device_id'], 'transaction_id': data['transaction_id'], 'ts': ts})
RedisConn().redis().zadd(data.get('device_id'), {member: ts})
count = RedisConn().redis().zcount(data.get('device_id'), ts - Constants.AD_STACK_WINDOW, ts)
if count > Constants.AD_STACK_THRESHOLD:
is_ad_stacked = True
self.publish(data, "Fraud" if is_ad_stacked else "Clean")
return is_ad_stacked

def publish(self, data, status):
data['status'] = status
stream = Constants.CLEAN_STREAM_NAME if status == 'Clean' else Constants.FRAUD_STREAM_NAME
RedisConn().redis().xadd(stream, data, id='*')
is_click_spammed = False
count = RedisConn().redis().zcount(data.get('device_id'), data['ts'] - self.click_spam_window_in_sec, data['ts'])
if count >= self.click_spam_threshold:
is_click_spammed = True
data['fraud_type'] = Constants.CLICK_SPAM
data['status'] = Constants.FRAUD
return is_click_spammed

def publish(self, data):
RedisConn().redis().xadd(Constants.STREAM_NAME, data, id='*')
28 changes: 19 additions & 9 deletions use-cases/fraud-detection/app/setup.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from gearsclient import GearsRemoteBuilder as GearsBuilder
from gearsclient import execute


from redis_conn import RedisConn
from constants import Constants

import os
import json
import redis
from redistimeseries.client import Client


class Setup:
Expand All @@ -12,15 +14,23 @@ def init(self):
self.__register_gears()

def __register_gears(self):
# Todo: need better way to check if gears is registered.
redis_conn = RedisConn().redis()
is_reg = redis_conn.get("gears_registered")
if is_reg and int(is_reg) == 1:
# Gears already registered
return

GearsBuilder(reader='StreamReader', r=redis_conn).foreach(lambda x: execute("TS.INCRBY", "clean_ts", 1))\
.register(Constants.CLEAN_STREAM_NAME)
GearsBuilder(reader='StreamReader', r=redis_conn).foreach(lambda x: execute("TS.INCRBY", "fraud_ts", 1))\
.register(Constants.FRAUD_STREAM_NAME)
redis_conn.set("gears_registered", 1)
def stream_handler(item):
data = item['value']
member = json.dumps(
{'device_id': data['device_id'],
'transaction_id': data['transaction_id'],
'ts': data['ts'],
})
redis.Redis().zadd(data.get('device_id'), {member: data['ts']})
Client().incrby(data['fraud_type'], 1)

GearsBuilder(reader='StreamReader', r=redis_conn, requirements=["redis", "redistimeseries"]).foreach(stream_handler).register('data_stream')
# To avoid multiple gears from being registered for single use case, set this when register is done,
# unset this if you want to re-register the Gear when application runs again.
redis_conn.set("gears_registered", 1)