In [None]:
%%capture

%pip install faker
%pip install pymongo
%pip install geopy

In [None]:
#UNTESTED IN NOTEBOOKS
from faker import Faker
from singlestoredb.management import get_secret
import singlestoredb as s2
import pymongo
import os
import csv

mgr = s2.manage_workspaces()

mongoUser = get_secret('mongo_user')
mongoPassword = get_secret('mongo_password')
mongoHost = get_secret('mongo_host')
mongoAppName = get_secret('mongo_app_name')
mongoDb = get_secret('mongo_db')
mongoCollection = get_secret('mongo_collection')

fake = Faker()

mongo_uri = f"mongodb+srv://{mongoUser}:{mongoPassword}@{mongoHost}/?retryWrites=true&w=majority&appName={mongoAppName}"

client = pymongo.MongoClient(mongo_uri)
db = client[f"{mongoDb}"]
collection = db[f"{mongoCollection}"]

sim_ids = []
with mgr.stage.open('/telco-fraud/sim_ids.csv', 'r') as file:
    reader = csv.reader(file)
    for row in reader:
        sim_ids.extend(row)

for sim_id in sim_ids:
    username = fake.user_name()
    email = fake.email()

    json_object = {
        "sim_id": sim_id.strip(),
        "username": username,
        "email": email
    }

    result = collection.insert_one(json_object)
    #print(f"Inserted document with ID: {result.inserted_id}")

client.close()

print("Data generation and insertion complete.")

In [None]:
#UNTESTED IN NOTEBOOKS
import csv
import random
import uuid
from datetime import datetime, timedelta
from geopy.distance import geodesic
from singlestoredb.management import get_secret
import singlestoredb as s2

mgr = s2.manage_workspaces()

# Configuration
TOTAL_EVENTS = 10000  # Total number of events to generate
EVENTS_PER_FILE = 5000  # Number of events per CSV file
USAGE_TYPES = ['Data', 'Call', 'Text']

# Load cities and their coordinates
CITIES = {}
with mgr.stage.open('/telco-fraud/cities.csv', 'r') as file:
    csv_reader = csv.reader(file)
    for row in csv_reader:
        city = row[0]
        point = row[1].replace('POINT(', '').replace(')', '').split()
        CITIES[city] = (float(point[1]), float(point[0]))  # (latitude, longitude)

# Load SIM IDs
with mgr.stage.open('/telco-fraud/sim_ids.csv', 'r') as file:
    SIM_IDS = [line.strip() for line in file]

def generate_event(sim_id, last_location, last_timestamp):
    current_timestamp = last_timestamp + timedelta(seconds=random.randint(1, 3600))
    
    if last_location:
        if SIM_IDS.index(sim_id) <= 4550:  # Normal users
            max_distance = 400  # km
        else:  # Special users
            max_distance = 10000  # km
        
        while True:
            new_city = random.choice(list(CITIES.keys()))
            new_location = CITIES[new_city]
            distance = geodesic(last_location, new_location).kilometers
            time_diff = (current_timestamp - last_timestamp).total_seconds() / 3600  # in hours
            if distance <= max_distance * time_diff:
                break
    else:
        new_city = random.choice(list(CITIES.keys()))
        new_location = CITIES[new_city]

    usage_type = random.choice(USAGE_TYPES)
    
    event = {
        'UsageID': str(uuid.uuid4()),  # Generate a random UUID
        'SIMID': sim_id,
        'Location': new_city,
        'UsageType': usage_type,
        'DataTransferAmount': round(random.uniform(1, 65535), 2) if usage_type == 'Data' else 0,
        'CallDuration': random.randint(1, 360) if usage_type == 'Call' else 0,
        'Timestamp': current_timestamp.strftime('%Y-%m-%d %H:%M:%S.%f'),
        'Geo': f'POINT({new_location[1]} {new_location[0]})'
    }
    
    return event, new_location, current_timestamp

def write_csv(filename, events):
    with open(filename, 'w', newline='') as csvfile:
        fieldnames = ['UsageID', 'SIMID', 'Location', 'UsageType', 'DataTransferAmount', 'CallDuration', 'Timestamp', 'Geo']
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()
        for event in events:
            writer.writerow(event)

def main():
    sim_locations = {sim_id: (None, datetime.now() - timedelta(hours=8)) for sim_id in SIM_IDS}
    events = []
    file_count = 1

    for i in range(TOTAL_EVENTS):
        sim_id = random.choice(SIM_IDS)
        last_location, last_timestamp = sim_locations[sim_id]
        event, new_location, new_timestamp = generate_event(sim_id, last_location, last_timestamp)
        sim_locations[sim_id] = (new_location, new_timestamp)
        events.append(event)

        if len(events) == EVENTS_PER_FILE:
            write_csv(f'usage_data_{file_count}.csv', events)
            mgr.stage.upload_file(f'usage_data_{file_count}.csv', '/telco-fraud/data/')
            events = []
            file_count += 1

    if events:
        write_csv(f'usage_data_{file_count}.csv', events)
        mgr.stage.upload_file(f'usage_data_{file_count}.csv', '/telco-fraud/data/')

if __name__ == "__main__":
    main()

In [None]:
%%sql
-- Create and use the main Database
CREATE DATABASE IF NOT EXISTS telco;
USE telco;

In [None]:
%%sql
-- Create usage Table
CREATE TABLE IF NOT EXISTS `usage` (
    UsageID VARCHAR(50) NOT NULL,
    SIMID VARCHAR(50),
    Location VARCHAR(100),
    UsageType VARCHAR(255),
    DataTransferAmount NUMERIC(18,2),
    CallDuration INT,
    Timestamp TIMESTAMP(6),
    Geo GEOGRAPHYPOINT,
    SHARD KEY (UsageID)
);

In [None]:
%%sql
-- Create potential_fraud Table
CREATE TABLE IF NOT EXISTS `potential_fraud` (
    FraudID BIGINT NOT NULL AUTO_INCREMENT,
    UsageID1 VARCHAR(50),
    UsageID2 VARCHAR(50),
    Timestamp TIMESTAMP(6),
    SHARD KEY (FraudID)
);

In [None]:
%%sql
-- Create Procedure to calculate Frauds
CREATE OR REPLACE PROCEDURE fraud (batch QUERY(
        UsageID VARCHAR(50),
        SIMID VARCHAR(50),
        Location VARCHAR(100),
        UsageType VARCHAR(255),
        DataTransferAmount NUMERIC(18,2),
        CallDuration INT,
        Timestamp TIMESTAMP(6),
        Geo GEOGRAPHYPOINT
        ))
RETURNS void
AS
BEGIN
    INSERT INTO `usage` (UsageID, SIMID, Location, Timestamp, Geo) select UsageID, SIMID, Location, Timestamp, Geo from batch;  
    INSERT INTO potential_fraud (UsageID1, UsageID2, Timestamp)
        (
            select UsageID1, UsageID2, Timestamp2
        from
            (SELECT UsageID as UsageID1, SIMID as ID1, Timestamp as Timestamp1, Geo as Geo1
            FROM `usage` 
            WHERE SIMID in (Select SIMID from batch)) as U
         join
            (Select UsageID as UsageID2, SIMID as ID2, Timestamp as Timestamp2, Geo as Geo2
            from batch) as F
            on U.ID1 = F.ID2
        where 
            (ROUND((GEOGRAPHY_DISTANCE(F.Geo2, U.Geo1)/1000),0)) > 2000
            AND MINUTE(TIMEDIFF(F.Timestamp2, U.Timestamp1)) < 20
        );
END

In [None]:
%%sql
-- Get CSV Data
CREATE OR REPLACE PIPELINE loadFrauds AS
LOAD DATA S3 's3://menglert-us/telco-fraud/'
CONFIG '{"region":"us-east-1"}'
INTO PROCEDURE fraud
IGNORE 1 LINES
FORMAT CSV
FIELDS TERMINATED BY ','
(
    UsageID,
    SIMID,
    Location,
    UsageType,
    DataTransferAmount,
    CallDuration,
    Timestamp,
    Geo
);

In [None]:
%%sql
TEST PIPELINE loadFrauds LIMIT 2;

In [None]:
%%sql
-- Create Mongo Link
CREATE LINK IF NOT EXISTS mongoTelcoUsers AS MONGODB
CONFIG '{"mongodb.hosts":"<redacted>:27017",
"collection.include.list": "telco.telco-users",
"mongodb.ssl.enabled":"true",
"mongodb.authsource":"admin"}'
CREDENTIALS '{"mongodb.user":"me",
"mongodb.password":"<redacted>"}';

In [None]:
%%sql
-- Use Mongo Link to create the pipeline
CREATE TABLES AS INFER PIPELINE AS LOAD DATA LINK mongoTelcoUsers '*' FORMAT AVRO;

In [None]:
%%sql
-- Run Pipelines
START ALL PIPELINES;

In [None]:
%%sql
-- Show the created Tables
SHOW TABLES;

In [22]:
%%sql
-- Check Usage Data
SELECT COUNT(DISTINCT(SIMID)) FROM `usage`;

COUNT(DISTINCT(SIMID))
4999


In [23]:
%%sql
-- Check Fraud Data
SELECT * FROM potential_fraud LIMIT 5;

FraudID,UsageID1,UsageID2,Timestamp
24,0fa6f5ef-b022-4cc5-90ad-d685ea7b4880,864bbd6c-7f05-4d79-88b4-3bff8963fa57,2024-06-28 08:51:27.540969
302,787e4355-aaad-4d5c-a244-17e50d4836ea,0a1f184c-62d8-409f-ac76-325b0ad1a7dc,2024-06-28 10:55:03.541677
415,78b98c29-0411-4c4f-a82a-99f6e435aaca,98324f7e-8754-49e9-bc69-743a367a1508,2024-06-28 11:50:13.541912
599,2caf12c4-cfdb-4300-ab09-b70aef3b4e1d,2bc66ad8-83c7-4fc3-bdab-1d4e27efc8e2,2024-06-28 09:27:37.541176
651,30246252-1b52-4042-aa24-aaa47027ca87,267d742b-7180-4bd6-9453-5a502f21254a,2024-06-28 14:38:56.541585


In [38]:
%%sql
-- Check Users
SELECT _id:>JSON, BSON_EXTRACT_STRING(_more, 'username') AS username 
    FROM `telco-users` LIMIT 5;

_id:>JSON,username
{'$oid': '667ef906db10d7d1cfaf34a2'},gpratt
{'$oid': '667ef96ddb10d7d1cfaf37a3'},carlahart
{'$oid': '667ef98cdb10d7d1cfaf389b'},brandycooper
{'$oid': '667ef990db10d7d1cfaf38c2'},megan74
{'$oid': '667ef9a5db10d7d1cfaf395e'},drodriguez


In [95]:
%%sql
-- Create Table with new Full-Text Index over JSON
CREATE TABLE `telco-users-fts` (
    id BSON,
    records JSON,
    FULLTEXT USING VERSION 2 rec_ft_index (records));

In [88]:
%%sql
SELECT _id:>JSON, _more:>JSON 
    FROM `telco-users` LIMIT 5;

_id:>JSON,_more:>JSON
{'$oid': '667ef940db10d7d1cfaf3655'},"{'email': 'scottmichael@example.org', 'sim_id': '83a902e2-e710-48dd-a0de-573ec319e21a', 'username': 'nsantos'}"
{'$oid': '667ef90fdb10d7d1cfaf34e8'},"{'email': 'zaguilar@example.org', 'sim_id': 'f84aacc8-3957-444a-b635-a9047f6a9986', 'username': 'andrewhall'}"
{'$oid': '667ef91fdb10d7d1cfaf3561'},"{'email': 'xgray@example.org', 'sim_id': '87d2026a-9789-4452-a88f-4596f88005e7', 'username': 'julierogers'}"
{'$oid': '667ef97adb10d7d1cfaf380b'},"{'email': 'csantana@example.com', 'sim_id': 'eb68d1e5-9288-4d0c-b940-d4e827a806c6', 'username': 'aramirez'}"
{'$oid': '667ef9c2db10d7d1cfaf3a44'},"{'email': 'ezhang@example.net', 'sim_id': 'b5f56e9c-63ab-434b-a5a0-0e238298ce22', 'username': 'slowe'}"


In [96]:
%%sql
-- Insert Data into new FTS Table
INSERT INTO `telco-users-fts`
    (SELECT _id, _more:>JSON FROM `telco-users`);

In [98]:
%%sql
-- Check Users in new Table
SELECT id:>JSON 
    FROM `telco-users-fts` LIMIT 5;

id:>JSON
{'$oid': '667ef90adb10d7d1cfaf34be'}
{'$oid': '667efacbdb10d7d1cfaf428a'}
{'$oid': '667efad6db10d7d1cfaf42df'}
{'$oid': '667ef91edb10d7d1cfaf355d'}
{'$oid': '667efa81db10d7d1cfaf404e'}


In [113]:
%%sql
-- Test the Full-Text Index
SELECT records, (
    MATCH (
    TABLE `telco-users-fts`) AGAINST ('records:julie*')) AS score
    FROM `telco-users-fts`
    WHERE score > 0
    ORDER BY score DESC;

records,score
"{'email': 'xgray@example.org', 'sim_id': '87d2026a-9789-4452-a88f-4596f88005e7', 'username': 'julierogers'}",1.0
"{'email': 'julie84@example.com', 'sim_id': '4cdaa8b2-b32d-4095-bab8-bd55aeda91c0', 'username': 'vhahn'}",1.0
"{'email': 'julie24@example.org', 'sim_id': 'cfa98723-c183-442c-a14b-986927fdb70f', 'username': 'susanwatkins'}",1.0
"{'email': 'ywolf@example.org', 'sim_id': '28efc7bf-92b2-4fa0-8c08-d64cc5c21dc4', 'username': 'julie20'}",1.0
"{'email': 'julienash@example.net', 'sim_id': 'a58f0ec3-7417-42e1-9a1d-73eef53327bc', 'username': 'hwinters'}",1.0
"{'email': 'juliesmith@example.net', 'sim_id': 'a44596bb-e993-4cb5-990a-e5a422d5f066', 'username': 'strongchristopher'}",1.0
"{'email': 'juliereyes@example.net', 'sim_id': '923f927b-b1ab-4e32-b9fb-29d9fc8f28d2', 'username': 'mguerrero'}",1.0
"{'email': 'zacharyjohnson@example.org', 'sim_id': 'b27d4b5f-7fdf-471e-a96d-9034ea997747', 'username': 'julie52'}",1.0
"{'email': 'freyes@example.com', 'sim_id': '3d3f4597-b8bc-4fc6-8793-7c7af9d08b34', 'username': 'julie55'}",1.0
"{'email': 'julie19@example.net', 'sim_id': '870128e1-5f93-4d96-bb80-36fafa95f3c0', 'username': 'alicia36'}",1.0


In [153]:
%%sql
SELECT 
    pf.FraudID as id, 
    CONCAT('Potential fraud for user ', JSON_EXTRACT_STRING(tuf.records, 'username')) as description,
    GREATEST(u1.Timestamp, u2.Timestamp) as date
    FROM potential_fraud pf
    JOIN `usage` u1 ON pf.UsageID1 = u1.UsageID
    JOIN `usage` u2 ON pf.UsageID2 = u2.UsageID
    JOIN `telco-users-fts` tuf ON JSON_EXTRACT_STRING(tuf.records, 'sim_id') = u1.SIMID
    WHERE MATCH (TABLE tuf) AGAINST ('records:julie88');

id,description,date
536,Potential fraud for user julie88,2024-06-28 14:50:40.541489
264,Potential fraud for user julie88,2024-06-28 14:50:40.541489
477,Potential fraud for user julie88,2024-06-28 14:26:05.541489
555,Potential fraud for user julie88,2024-06-28 14:15:35.541489
283,Potential fraud for user julie88,2024-06-28 13:15:04.541489
537,Potential fraud for user julie88,2024-06-28 13:49:42.541489
708,Potential fraud for user julie88,2024-06-28 13:49:42.541489
268,Potential fraud for user julie88,2024-06-28 12:31:40.541489
143,Potential fraud for user julie88,2024-06-28 14:47:43.541489
157,Potential fraud for user julie88,2024-06-28 14:50:40.541489
