# Import Packages

In [1]:
import pandas as pd
import numpy as np
from py2neo import Graph, Node, Relationship
from collections import Counter
import csv
import geopy.distance as gdist
import os, sys, json, requests
import argparse
import jwt
from pytz import timezone, utc
from datetime import datetime, date
from dateutil.relativedelta import relativedelta
from dateutil.parser import parse
from urllib.parse import urljoin
import re
from jinja2 import Template
from time import time
from google.cloud import bigquery
from google.cloud import storage
import warnings
import csv
from gensim.models import KeyedVectors
from joblib import Parallel, delayed
import nodevectors
from scipy import spatial
from timeit import default_timer as timer
from datetime import timedelta
import csrgraph as cg
warnings.filterwarnings('ignore')
# graph=Graph(ip_addr = 'bolt://localhost:7687', username = 'neo4j', password = 'qwerty')
import pickle
import time

In [2]:
# BigQuery client; `data_fetching` uses it to run the rendered SQL query.
client_query = bigquery.Client(project="i-dss-cdm-data-dev")
# Cloud Storage client; `data_fetching` uses it to list, download and delete blobs.
client_storage = storage.Client(project="i-dss-cdm-data-dev")

 # Functions

<b>Function to split the main dataframe into a list of per-group dataframes</b>

In [3]:
def split(df, group):
    """Break ``df`` into one dataframe per group.

    Args:
        df: dataframe to partition.
        group: column name (or list of column names) passed to ``groupby``.

    Returns:
        A list with one dataframe per group key, in the order the keys appear
        in ``groupby(...).groups``. A key that cannot be retrieved is reported
        on stdout and left out.
    """
    grouped = df.groupby(group)
    pieces = []
    for key in grouped.groups:
        try:
            piece = grouped.get_group(key)
        except KeyError:
            # The key is listed but cannot be fetched: report it and move on.
            print("ping")
        else:
            pieces.append(piece)
    return pieces

<b>Function to find distance in miles between two geospatial points</b>

In [4]:
def distance(lat1,lon1,lat2,lon2):
    """Return the geodesic distance, in miles, between two (lat, lon) points."""
    origin, destination = (lat1, lon1), (lat2, lon2)
    miles = gdist.geodesic(origin, destination).miles
    return miles

<b>Function to fetch data using the BigQuery and GCS APIs</b>

In [5]:
def data_fetching(args):
    """Run the templated SQL on BigQuery and load the bucket files into a dataframe.

    The SQL file at ``args['sql_location']`` is rendered as a Jinja2 template
    with ``args`` as its variables and executed with ``client_query``. Every
    blob in ``args['bucket_name']`` whose name contains ``'sub_abuse/'``
    (except the bare ``'sub_abuse/'`` entry) is then downloaded into
    ``./data``, deleted from the bucket, and read as a gzip-compressed CSV.

    Args:
        args: configuration dict; must contain ``'sql_location'`` and
            ``'bucket_name'`` plus whatever variables the SQL template uses.

    Returns:
        A single dataframe with the rows of all downloaded files concatenated.

    Raises:
        ValueError: if no matching blob was found in the bucket.
    """
    # Close the SQL file deterministically instead of leaking the handle.
    with open(args['sql_location'], "r") as sql_file:
        sql_template = sql_file.read()
    data_sql = Template(sql_template).render(**args)

    query_job = client_query.query(data_sql)
    # `result()` blocks until the query job has finished.
    print(query_job.result())
    print('Query Done start load to bucket')

    # The download target may not exist on a fresh checkout.
    os.makedirs("./data", exist_ok=True)

    bucket = client_storage.get_bucket(args['bucket_name'])
    filenames = []
    for item in bucket.list_blobs():
        if 'sub_abuse/' in item.name and item.name != 'sub_abuse/':
            local_path = "./data/" + str(item.name).split('/')[1]
            blob = bucket.get_blob(item.name)
            print(f'Downloading {item.name}')
            blob.download_to_filename(local_path)
            # The remote copy is removed once it has been downloaded.
            blob.delete()
            filenames.append(local_path)

    if not filenames:
        # pd.concat([]) would raise a ValueError with a much less helpful message.
        raise ValueError(
            "No 'sub_abuse/' files found in bucket " + repr(args['bucket_name'])
        )

    main_df = pd.concat([pd.read_csv(fn, compression='gzip') for fn in filenames])
    return main_df

<b>Function to load dataframes to Neo4j</b>

In [6]:
def load_to_neo4j(graph,df):
    """Write users, their devices and their streamed videos from ``df`` to Neo4j.

    For every distinct ``v69_registration_id_nbr`` a ``User`` node is created.
    For every (``mc_visitor_id_nbr``, ``post_visitor_id``) pair of that user a
    ``Device`` node is merged with a ``BELONGS_TO`` relationship to the user,
    and for every row of that pair a ``Video`` node and a ``STREAMED``
    relationship from the device are merged.

    Args:
        graph: object exposing ``run(query, parameters)``; presumably a py2neo
            ``Graph`` (confirm against the caller).
        df: dataframe of stream rows containing the columns read below.

    Returns:
        None. Progress and per-device errors are printed.
    """
    # Schema statements, executed one by one below.
    # NOTE(review): the Device constraint is declared on `d.mcid`, while the
    # Device nodes written below (and the last index statement) use `mc_id`.
    # Confirm which property name is intended.
    ct="""CREATE CONSTRAINT ON (d:Device) ASSERT d.mcid IS UNIQUE ;
    CREATE CONSTRAINT ON (u:User) ASSERT u.user_id IS UNIQUE ;
    CREATE CONSTRAINT ON (v:Video) ASSERT v.mpx_guid IS UNIQUE ;
    CREATE INDEX ON :Device(user_id);
    CREATE INDEX ON :Device(mc_id)"""

    for line in ct.split(';'):
        graph.run(line)

    # One dataframe per registered user.
    ls=split(df=df,group=['v69_registration_id_nbr'])

    # Cypher templates; the `{name}` placeholders are filled from the parameter
    # dict passed to `graph.run`.
    create_user="""create (u:User{user_id:{reg_id},sub_package:{sub_package}})"""

    add_device="""match (u:User {user_id:{reg_id}}) merge (d:Device{mc_id:{mc_id},device_full_nm:{dnm},mobile_id:{mobile_id},freq_ip:{freq_ip},freq_locn:{home},freq_locn_st:{state},freq_locn_lat:{lat},freq_locn_lon:{lon},percent_home_streams:{percent_home_streams},device_type_nm:{dtype},device_os_nm:{os}})-[b:BELONGS_TO]->(u)"""

    add_show="""merge (v:Video {mpx_guid:{guid}})
    ON CREATE SET v.video_show_nm={sr_nm},v.video_title={title},v.video_episode_nbr={nbr},v.video_genre_nm={genre},v.VOD={vod}"""

    add_show_relationship="""match (d:Device {mc_id:{mc_id}}) match (v:Video {mpx_guid:{guid}}) merge (d)-[s:STREAMED {stream_start:{stream_st},stream_end:{stream_end},locn:{locn},locn_st:{sta}}]->(v)"""

    # `ct` is reused from here on as a running counter of processed users.
    ct=0
    for data in ls:
        ct=ct+1
        print(ct)
        reg_id=str(data['v69_registration_id_nbr'].values[0])
        # First subscription package whose string form is not 'nan'; the
        # literal 'nan' is used when there is none.
        pc=data['subscription_package_desc'].values
        pc=[x for x in pc if str(x) != 'nan']
        if pc==[]:
            pc=['nan']
        sub_package=pc[0]
        graph.run(create_user,{"reg_id":reg_id,"sub_package":sub_package})
        # One dataframe per device of this user.
        ls1=split(df=data,group=['mc_visitor_id_nbr','post_visitor_id'])
        for data1 in ls1:
            try:
                dc={}
                dc['reg_id']=reg_id
                dc['mc_id']=str(data1.mc_visitor_id_nbr.values[0])
                # An all-zero visitor id is replaced by the post_visitor_id.
                if str(dc['mc_id'])=="00000000000000000000000000000000000000":
                    dc['mc_id']=str(data1.post_visitor_id.values[0])
                # Most frequent IP and city of the device; the city is stored
                # as its "home" together with the share of streams from it.
                dc['freq_ip']=Counter(data1.post_ip_address_desc.values).most_common(1)[0][0]
                dc['home']=Counter(data1.geo_city_nm.values).most_common(1)[0][0]
                dc['percent_home_streams']=Counter(data1.geo_city_nm.values)[dc['home']]/len(data1.geo_city_nm.values)*100
                # State and coordinates come from the first row at the home city.
                dc['state']=str(data1[data1.geo_city_nm==dc['home']].state_code.values[0])
                dc['lat']=str(data1[data1.geo_city_nm==dc['home']].int_point_lat.values[0])
                dc['lon']=str(data1[data1.geo_city_nm==dc['home']].int_point_lon.values[0])
                dc['dtype']=str(data1.device_type_nm.values[0])
                dc['mobile_id']=str(data1.mobile_id.values[0])
                dc['os']=str(data1.device_os_nm.values[0])
                dc['dnm']=str(data1.device_full_nm.values[0])
                graph.run(add_device,dc)
                shows=data1.video_show_nm.values
                titles=data1.video_title.values
                eve_st=data1.strm_start_event_dt_ht.values
                eve_end=data1.strm_end_event_dt_ht.values
                mpx=data1.v31_mpx_reference_guid.values
                lcn=data1.geo_city_nm.values
                st=data1.state_code.values
                nbr=data1.video_episode_nbr.values
                genres=data1.video_genre_nm.values
                # `vod` is True for rows whose livetv_affiliate_nm is null.
                vod=pd.Series(data1.livetv_affiliate_nm.values)
                vod=vod.isnull().values
                # One Video merge and one STREAMED merge per stream row; `dc`
                # keeps the device fields and gets the stream fields overwritten.
                for x in range(len(data1)):
                    dc['stream_st']=pd.to_datetime(str(eve_st[x])).strftime('%Y-%m-%d %H:%M:%S')
                    dc['stream_end']=pd.to_datetime(str(eve_end[x])).strftime('%Y-%m-%d %H:%M:%S')
                    dc['guid']=str(mpx[x])
                    dc['locn']=lcn[x]
                    dc['sta']=st[x]
                    dc['sr_nm']=shows[x]
                    dc['title']=titles[x]
                    dc['nbr']=str(nbr[x])
                    dc['genre']=genres[x]
                    dc['vod']=str(vod[x])
                    graph.run(add_show,dc)
                    graph.run(add_show_relationship,dc)
            except Exception as e:
                # Any failure for this device is printed with the user counter
                # and the loop continues with the next device.
                print(ct,e)
        

<b>Functions to preprocess, map and aggregate our dataframe</b>

In [7]:
# Mapping Func
def create_mapping(main_df):
    """Assign an integer position to every user, device and video identifier.

    The distinct values of ``v69_registration_id_nbr``, ``mc_visitor_id_nbr``
    and ``v31_mpx_reference_guid`` are listed in that order and numbered from
    zero. Keys are the string form of each identifier; when two identifiers
    share the same string form, the later position is kept.

    Args:
        main_df: dataframe containing the three identifier columns.

    Returns:
        dict mapping ``str(identifier)`` to its integer position.
    """
    id_columns = (
        'v69_registration_id_nbr',
        'mc_visitor_id_nbr',
        'v31_mpx_reference_guid',
    )
    ordered_ids = []
    for column in id_columns:
        ordered_ids.extend(main_df[column].unique().tolist())
    return {str(raw_id): position for position, raw_id in enumerate(ordered_ids)}

In [8]:
#Parallel Preprocessing Func
def parallel_preprocessing(data):
    """Summarise one user's streams into device records and stream records.

    ``data`` is grouped by (``mc_visitor_id_nbr``, ``post_visitor_id``). Each
    group yields one device record (most frequent IP and city, share of
    streams from that city, coordinates of that city, stream count, device
    attributes) and one stream record per row.

    A group that cannot be processed (for example when its most frequent city
    is missing, so no "home" row can be found) is reported on stdout and
    skipped; the remaining groups are still returned.

    Args:
        data: dataframe holding the stream rows of a single registered user.

    Returns:
        Tuple ``(u_dev, d_show)``: a list of device dicts and a list of
        stream dicts.
    """
    u_dev=[]
    d_show=[]

    # First subscription package whose string form is not 'nan'.
    packages=[x for x in data['subscription_package_desc'].values if str(x) != 'nan']
    sub_package=packages[0] if packages else 'nan'

    # Used in every stream record and in error messages. It is None only for
    # an empty frame, which has no groups to process anyway.
    reg_id=str(data['v69_registration_id_nbr'].values[0]) if len(data) else None

    for data1 in split(df=data,group=['mc_visitor_id_nbr','post_visitor_id']):
        try:
            cities=data1.geo_city_nm.values
            city_counts=Counter(cities)
            home=city_counts.most_common(1)[0][0]
            # Rows streamed from the most frequent city; empty when that
            # "city" is a missing value, which raises IndexError below.
            home_rows=data1[data1.geo_city_nm==home]

            dc={}
            dc['reg_id']=str(data1.v69_registration_id_nbr.values[0])
            dc['mc_id']=str(data1.mc_visitor_id_nbr.values[0])
            dc['freq_ip']=Counter(data1.post_ip_address_desc.values).most_common(1)[0][0]
            dc['home']=home
            dc['percent_home_streams']=city_counts[home]/len(cities)*100
            dc['state']=str(home_rows.state_code.values[0])
            dc['lat']=str(home_rows.int_point_lat.values[0])
            dc['lon']=str(home_rows.int_point_lon.values[0])
            dc['cnt_strms']=len(data1)
            dc['dtype']=str(data1.device_type_nm.values[0])
            dc['mobile_id']=str(data1.mobile_id.values[0])
            dc['os']=str(data1.device_os_nm.values[0])
            dc['dnm']=str(data1.device_full_nm.values[0])

            shows=data1.video_show_nm.values
            titles=data1.video_title.values
            eve_st=data1.strm_start_event_dt_ht.values
            eve_end=data1.strm_end_event_dt_ht.values
            mpx=data1.v31_mpx_reference_guid.values
            st=data1.state_code.values
            nbr=data1.video_episode_nbr.values
            genres=data1.video_genre_nm.values
            # True for rows whose livetv_affiliate_nm is null.
            vod=pd.Series(data1.livetv_affiliate_nm.values).isnull().values

            u_dev.append(dc)
            for x in range(len(data1)):
                row={}
                row['reg_id']=reg_id
                row['sub_package']=sub_package
                row['mc_id']=dc['mc_id']
                row['freq_ip']=dc['freq_ip']
                row['freq_locn']=home
                row['dtype']=dc['dtype']
                row['stream_st']=pd.to_datetime(str(eve_st[x])).strftime('%Y-%m-%d %H:%M:%S')
                row['stream_end']=pd.to_datetime(str(eve_end[x])).strftime('%Y-%m-%d %H:%M:%S')
                row['guid']=str(mpx[x])
                row['locn']=cities[x]
                row['sta']=st[x]
                row['sr_nm']=shows[x]
                row['title']=titles[x]
                row['nbr']=str(nbr[x])
                row['genre']=genres[x]
                row['vod']=str(vod[x])
                d_show.append(row)
        except Exception as e:
            # Report the user id with the error; the previous handler printed
            # an undefined name and so raised NameError instead of skipping.
            print(reg_id,e)
    return u_dev,d_show

In [9]:
# Processing Func
def preprocess(main_df): 
    """Build the per-stream and per-device dataframes from the raw stream rows.

    The raw rows are split per registered user, each user is processed by
    ``parallel_preprocessing`` on a thread pool, and the per-user results are
    flattened into two dataframes. Timings of every stage are printed.

    Args:
        main_df: raw stream dataframe with a ``v69_registration_id_nbr`` column.

    Returns:
        Tuple ``(mdf, ddf)``: the stream-level dataframe and the device-level
        dataframe. In ``ddf`` the columns ``home``, ``lat`` and ``lon`` are
        replaced by ``freq_locn``, ``freq_locn_lat`` and ``freq_locn_lon``.
    """
    print('start the function')
    tick=timer()
    per_user=split(df=main_df,group=['v69_registration_id_nbr'])
    print('Data Split Done',timedelta(seconds=timer()-tick))

    print('start parallel')
    tick=timer()
    pool=Parallel(n_jobs=500,verbose=3,prefer="threads")
    res=pool(delayed(parallel_preprocessing)(frame) for frame in per_user)
    print('Parallel Processing',timedelta(seconds=timer()-tick))

    tick=timer()
    print('start loading')
    # Each result is a (device records, stream records) pair; flatten both.
    u_dev=[device for devices,_ in res for device in devices]
    d_show=[stream for _,streams in res for stream in streams]
    print('Data Load into list Done',timedelta(seconds=timer()-tick))

    tick=timer()
    print('start saving')
    mdf=pd.DataFrame(d_show)
    ddf=pd.DataFrame(u_dev)
    print('Saving Data Done',timedelta(seconds=timer()-tick))

    tick=timer()
    print('start Manipulation')
    # Move each home column to the end under its new name.
    for new_name,old_name in (('freq_locn','home'),
                              ('freq_locn_lat','lat'),
                              ('freq_locn_lon','lon')):
        ddf[new_name]=ddf.pop(old_name)
    print('Manipulate ddf Done',timedelta(seconds=timer()-tick))

    return mdf,ddf


<b>Function to create edge list for our Embeddings algorithm</b>

In [10]:
def create_edgelist(mdf,ddf,args,mapping):
    """Write the device-user and device-video edges as a CSV of mapped node ids.

    The file ``./emb/<args['file_name']>.csv`` receives one ``source,target``
    row per device record in ``ddf`` (device -> user) followed by one row per
    stream record in ``mdf`` (device -> video). Identifiers are translated to
    integers through ``mapping``.

    Args:
        mdf: stream-level dataframe with ``mc_id`` and ``guid`` columns.
        ddf: device-level dataframe with ``mc_id`` and ``reg_id`` columns.
        args: configuration dict; only ``'file_name'`` is read.
        mapping: dict from ``str(identifier)`` to integer node id.

    Raises:
        KeyError: if an identifier is missing from ``mapping``.
    """
    # The output directory may not exist on a fresh checkout.
    os.makedirs("./emb", exist_ok=True)
    # newline="" is required by the csv module so that the writer's own line
    # terminator is not translated a second time on some platforms.
    with open("./emb/"+args['file_name']+".csv", "w", newline="") as edges_file:
        writer = csv.writer(edges_file, delimiter=",")
        # device -> user edges
        for source,target in zip(ddf['mc_id'].values, ddf['reg_id'].values):
            writer.writerow([mapping[str(source)], mapping[str(target)]])
        # device -> video edges
        for source,target in zip(mdf['mc_id'].values, mdf['guid'].values):
            writer.writerow([mapping[str(source)], mapping[str(target)]])

<b>CheckMul Module</b>

In [11]:
def checkMul(dfs,dev_th,loc_th,no_ip_ott,home):
    """Flag a user who streams from many devices or locations on the same day.

    Streams are grouped by calendar day (the part of ``stream_st`` before the
    first whitespace). A day counts as a multi-device day when it has more
    than ``dev_th`` distinct devices and more than one distinct ``freq_ip``;
    it counts as a multi-location day when it has more than ``loc_th``
    distinct locations, more than one device and more than one ``freq_ip``.
    The user is also flagged when the number of distinct ``freq_ip`` values
    of 'Set Top Box' devices whose ``freq_locn`` equals ``home`` exceeds
    ``no_ip_ott``.

    The input dataframe is not modified.

    Args:
        dfs: stream-level dataframe of one user (columns ``stream_st``,
            ``mc_id``, ``freq_ip``, ``locn``, ``freq_locn``, ``dtype``).
        dev_th: device-count threshold per day.
        loc_th: location-count threshold per day.
        no_ip_ott: threshold on distinct home set-top-box IPs.
        home: the user's home location.

    Returns:
        Tuple ``(flagged, stats)`` where ``stats`` holds the counters
        ``mult_dev_same_day_cnt``, ``max_mult_dev``, ``mult_loc_same_day_cnt``,
        ``max_mult_loc`` and ``no_ott_ip_home``.
    """
    dc={
        'mult_dev_same_day_cnt':0,
        'max_mult_dev':1,
        'mult_loc_same_day_cnt':0,
        'max_mult_loc':1
    }

    def _home_stb_ip_count(city):
        # Distinct frequent IPs of set-top boxes located at `city`.
        at_home=dfs[(dfs['freq_locn']==city) & (dfs['dtype']=='Set Top Box')]
        return len(at_home['freq_ip'].unique())

    try:
        dc['no_ott_ip_home']=_home_stb_ip_count(home)
    except ValueError:
        # Fall back to the most frequent location when `home` cannot be
        # compared against the column.
        home=Counter(dfs['freq_locn'].values).most_common(1)[0][0]
        dc['no_ott_ip_home']=_home_stb_ip_count(home)

    # Attach the day to a copy so the caller's dataframe keeps its columns.
    days=[stamp.split()[0] for stamp in dfs['stream_st'].values]
    flagged=False
    for dat in split(df=dfs.assign(day=days),group=['day']):
        n_dev=len(dat['mc_id'].unique())
        n_ip=len(dat['freq_ip'].unique())
        n_loc=len(dat['locn'].unique())

        if n_dev>dev_th:
            # Above the threshold the day only counts (and only updates the
            # maximum) when more than one IP was involved.
            if n_ip>1:
                dc['mult_dev_same_day_cnt']+=1
                flagged=True
                dc['max_mult_dev']=max(dc['max_mult_dev'],n_dev)
        else:
            dc['max_mult_dev']=max(dc['max_mult_dev'],n_dev)

        if n_loc>loc_th:
            if n_dev>1 and n_ip>1:
                dc['mult_loc_same_day_cnt']+=1
                flagged=True
                dc['max_mult_loc']=max(dc['max_mult_loc'],n_loc)
        else:
            dc['max_mult_loc']=max(dc['max_mult_loc'],n_loc)

    if flagged:
        return True,dc
    if dc['no_ott_ip_home']>no_ip_ott:
        return True,dc
    return False,dc


<b>CheckAlt Module</b>

In [12]:
def checkAlt(dfs,alt_cnt_th):
    """Flag a user whose streams alternate between places and devices in a day.

    Streams are ordered by ``stream_st``. A stream counts as an alternation
    when both its predecessor and its successor happened on the same day but
    from a different location, a different device and a different frequent IP.

    Args:
        dfs: stream-level dataframe of one user (columns ``stream_st``,
            ``locn``, ``mc_id``, ``freq_ip``).
        alt_cnt_th: number of alternations that must be exceeded to flag.

    Returns:
        Tuple ``(flagged, {'alt_stream_cnt': count})``.
    """
    ordered=dfs.sort_values(by=['stream_st'])
    cities=ordered['locn'].values
    devices=ordered['mc_id'].values
    ips=ordered['freq_ip'].values
    # Calendar day = text before the first whitespace of the timestamp.
    days=[stamp.split()[0] for stamp in ordered['stream_st'].values]

    def _switched(mid,other):
        # Same day, but everything else about the stream differs.
        return (cities[mid]!=cities[other] and days[mid]==days[other]
                and devices[mid]!=devices[other] and ips[mid]!=ips[other])

    alternations=0
    for mid in range(1,len(ordered)-1):
        if _switched(mid,mid-1) and _switched(mid,mid+1):
            alternations+=1

    dc={'alt_stream_cnt':alternations}
    if alternations>alt_cnt_th:
        return True,dc
    return False,dc

<b>Community Check Module</b>

In [13]:
def comm_detect(dfd,model,th,strm_cd_th,home,mapping,Nmap):
    """Flag a user who has a distant device that is an outlier in embedding space.

    Every device gets a score: the mean cosine similarity between its
    embedding and the embeddings of all of the user's devices (itself
    included); devices whose embedding cannot be looked up are skipped and a
    device with no usable pair scores 0. The reference score is the best
    score among devices located at ``home`` with more than ``strm_cd_th``
    streams. The user is flagged when some device with more than
    ``strm_cd_th`` streams scores below the reference, is not located at
    ``home``, does not use the home IP and lies more than ``th`` miles from
    the home coordinates.

    A user with fewer than three device records is never flagged, so that
    case (including an empty frame) returns ``False`` immediately.

    Args:
        dfd: device-level dataframe of one user (columns ``mc_id``,
            ``freq_locn``, ``freq_locn_lat``, ``freq_locn_lon``, ``freq_ip``,
            ``cnt_strms``).
        model: embedding container indexed by node index.
        th: distance threshold in miles.
        strm_cd_th: minimum stream count for a device to be considered.
        home: the user's home location; the most frequent ``freq_locn`` is
            used when no device is located there.
        mapping: dict from ``str(identifier)`` to node id.
        Nmap: lookup from node id to the index used by ``model``.

    Returns:
        ``True`` when the user is flagged, ``False`` otherwise.
    """
    devid=list(dfd['mc_id'].values)
    if len(devid)<=2:
        # The final rule requires more than two devices; skip all the work.
        return False

    def _home_reference(city):
        # Coordinates and IP of the first device located at `city`.
        rows=dfd[dfd['freq_locn']==city]
        return (rows['freq_locn_lat'].values[0],
                rows['freq_locn_lon'].values[0],
                rows['freq_ip'].values[0])

    try:
        homelat,homelon,homeip=_home_reference(home)
    except IndexError:
        home=Counter(dfd['freq_locn'].values).most_common(1)[0][0]
        homelat,homelon,homeip=_home_reference(home)

    # Look every embedding up once; devices without one are simply absent.
    vectors={}
    for dev in devid:
        try:
            vectors[str(dev)]=model[Nmap[(mapping[str(dev)])]]
        except Exception:
            continue

    bk=[]
    columns=zip(devid,dfd['freq_locn'].values,dfd['freq_locn_lat'].values,
                dfd['freq_locn_lon'].values,dfd['freq_ip'].values,dfd['cnt_strms'].values)
    for dev,loc,lat,lon,ip,cnt in columns:
        similarities=[]
        if str(dev) in vectors:
            for other in devid:
                if str(other) not in vectors:
                    continue
                try:
                    similarities.append(1 - spatial.distance.cosine(vectors[str(dev)],vectors[str(other)]))
                except Exception:
                    # Best effort: a pair that cannot be compared is ignored.
                    continue
        score=sum(similarities)/len(similarities) if similarities else 0
        bk.append({'id':dev,'score':score,'ct':cnt,'devloc':loc,
                   'devlat':lat,'devlon':lon,'devip':ip})

    # Best score among sufficiently active devices at home.
    mx=0
    for entry in bk:
        if entry['devloc']==home and entry['ct']>strm_cd_th and entry['score']>mx:
            mx=entry['score']

    for entry in bk:
        if entry['score']<mx and entry['devloc']!=home and entry['devip']!=homeip and entry['ct']>strm_cd_th:
            if distance(homelat,homelon,entry['devlat'],entry['devlon'])>th:
                return True
    return False


<b>Profile Check Module</b>

In [14]:
# Profile Module Func
def _count_concurrent_distant_streams(pr_df):
    """Count consecutive streams that look like two people watching the same thing apart."""
    ordered=pr_df.sort_values(by=['strm_start_event_dt_ht'])
    strm_st=ordered.strm_start_event_dt_ht.values
    content=ordered.video_title.values
    day=ordered.day_dt.values
    prf_id=ordered.v126_profile_id.values
    strm_lon=ordered.int_point_lon.values
    strm_lat=ordered.int_point_lat.values
    count=0
    for c in range(len(ordered)-1):
        # A pair counts only when it shares day, hour and title, comes from
        # two different profiles and is not within 10 miles of each other.
        if day[c] != day[c+1]:
            continue
        if pd.to_datetime(strm_st[c]).hour!=pd.to_datetime(strm_st[c+1]).hour:
            continue
        if content[c]!= content[c+1]:
            continue
        if prf_id[c]==prf_id[c+1]:
            continue
        if distance(strm_lat[c],strm_lon[c],strm_lat[c+1],strm_lon[c+1])<10:
            continue
        count+=1
    return count


def _frequent_ip_count(frame,min_streams=3):
    """Number of distinct IPs in ``frame`` that appear on at least ``min_streams`` rows."""
    # Working on the counts Series directly avoids depending on the column
    # names produced by `value_counts().reset_index()`, which changed in
    # pandas 2.0.
    ip_counts=frame['post_ip_address_desc'].value_counts()
    return int((ip_counts>=min_streams).sum())


def profile_check(pr_df,prf_cnt_th,sub_ip_cnt_th,kid_ip_cnt_th,ccrnt_strm_cnt_th):
    """Flag a user based on how the account's profiles are used.

    The checks run in this order and stop at the first one that flags:

    1. An account with exactly one profile is never flagged.
    2. More than ``prf_cnt_th`` profiles flags the account.
    3. If no row belongs to a non-master profile: more than
       ``ccrnt_strm_cnt_th`` concurrent distant streams of the same content
       flags the account.
    4. Otherwise, with "home" taken from the master profile's most frequent
       profile id, IP and coordinates (or the first row when there is no
       master row): more than ``sub_ip_cnt_th`` IPs seen at least 3 times
       more than 50 miles from home flags the account.
    5. Otherwise, among non-adult rows that use neither the home IP nor the
       home profile: more than ``kid_ip_cnt_th`` IPs seen at least 3 times
       more than 10 miles from home flags the account.

    The input dataframe is not modified.

    Args:
        pr_df: raw stream rows of one user.
        prf_cnt_th: profile-count threshold.
        sub_ip_cnt_th: threshold on distant frequently used IPs.
        kid_ip_cnt_th: threshold on distant frequently used IPs of non-adult profiles.
        ccrnt_strm_cnt_th: threshold on concurrent distant streams.

    Returns:
        Tuple ``(flagged, details)``; ``details`` has the keys ``p_cnt``,
        ``sub_ip_cnt``, ``kid_ip_cnt`` and ``ccrn_strm_cnt``, each an empty
        string unless that check flagged the account.
    """
    dc={"p_cnt":"",
        "sub_ip_cnt":"",
        "kid_ip_cnt":"",
        "ccrn_strm_cnt":""}

    #Profile Count Check
    p_cnt=pr_df['v126_profile_id'].nunique()
    if p_cnt==1:
        return False,dc
    if p_cnt>prf_cnt_th:
        dc["p_cnt"]=p_cnt
        return True,dc

    sb_df=pr_df[(pr_df['master_profile_ind']==False)]
    if len(sb_df)==0:
        #Concurrent Streaming Check
        ccrn_strm_cnt=_count_concurrent_distant_streams(pr_df)
        if ccrn_strm_cnt>ccrnt_strm_cnt_th:
            dc["ccrn_strm_cnt"]=ccrn_strm_cnt
            return True,dc
        return False,dc

    #Distant Sub profiles ip count Check
    home={}
    ms_df=pr_df[pr_df['master_profile_ind']==True]
    try:
        home['ms_id']=Counter(ms_df['v126_profile_id'].values).most_common(1)[0][0]
        home['ms_ip']=Counter(ms_df['post_ip_address_desc'].values).most_common(1)[0][0]
        home['lat']=Counter(ms_df['int_point_lat'].values).most_common(1)[0][0]
        home['lon']=Counter(ms_df['int_point_lon'].values).most_common(1)[0][0]
    except IndexError:
        # No master-profile rows: use the first row as the home reference.
        home['ms_id']=pr_df['v126_profile_id'].values[0]
        home['ms_ip']=pr_df['post_ip_address_desc'].values[0]
        home['lat']=pr_df['int_point_lat'].values[0]
        home['lon']=pr_df['int_point_lon'].values[0]

    # Distance of every stream from home, attached to a copy of the frame.
    miles_from_home=pr_df.apply(lambda x: distance(home['lat'],home['lon'],x['int_point_lat'],x['int_point_lon']),axis=1)
    located=pr_df.assign(distance=miles_from_home)

    sub_ip_cnt=_frequent_ip_count(located[located['distance']>50])
    if sub_ip_cnt>sub_ip_cnt_th:
        dc["sub_ip_cnt"]=sub_ip_cnt
        return True,dc

    #Distant Kid profiles ip count Check
    kid_df=located[(located['profile_type_cd'] !='ADULT')&(located['post_ip_address_desc']!=home['ms_ip'])&(located['v126_profile_id']!=home['ms_id'])]
    if len(kid_df)==0:
        return False,dc
    kid_ip_cnt=_frequent_ip_count(kid_df[kid_df['distance']>10])
    if kid_ip_cnt>kid_ip_cnt_th:
        dc["kid_ip_cnt"]=kid_ip_cnt
        return True,dc
    return False,dc


<b>Defining Arguments</b>

In [15]:
# Adjustment 3 args
args = {
    # Query window, sample and run options
    'start_dt': '2022-03-01',
    'end_dt': '2022-03-01',
    'sample_size': 5000,
    'min_streams': 50,
    'load_neo4j_first': False,
    'file_name': '20220301_5k',
    'load_neo4j_only_abusers': False,
    'sql_location': './data_sql.sql',
    # comm_detect thresholds
    'comm_detect_miles_threshold': 50,
    'comm_detect_stream_cnt_threshold_device': 10,
    # checkMul thresholds
    'checkmul_device_cnt_threshold': 4,
    'checkmul_locn_cnt_threshold': 3,
    'checkmul_ip_ott_threshold': 1,
    # checkAlt threshold
    'checkalt_threshold': 4,
    # profile_check thresholds
    'profile_cnt_threshold': 5,
    'sub_ip_cnt_threshold': 8,
    'kid_ip_cnt_threshold': 1,
    'concurrent_distant_same_content_streaming_cnt_threshold': 1,
    # Storage locations
    'table_name': 'sub_abuse_20220301_5k_sample',
    'bucket_name': 'jay_staging',
}

<b>Function that runs all the detection modules for the entire sample</b>

In [16]:
def sub_ab_detection(mapping,Nmap,main_df,mdf,ddf,model,th=args['comm_detect_miles_threshold'],\
                     strm_cd_th=args['comm_detect_stream_cnt_threshold_device'],\
                     dev_th=args['checkmul_device_cnt_threshold'],loc_th=args['checkmul_locn_cnt_threshold'],\
                     alt_cnt_th=args['checkalt_threshold'],no_ip_ott=args['checkmul_ip_ott_threshold'],prf_cnt_th=args['profile_cnt_threshold'],\
                     sub_ip_cnt_th=args['sub_ip_cnt_threshold'],kid_ip_cnt_th=args['kid_ip_cnt_threshold'],\
                     ccrnt_strm_cnt_th=args['concurrent_distant_same_content_streaming_cnt_threshold']):
    """Run ``parallel_sub_abuse`` for every user on a thread pool.

    For each user in ``mdf`` the matching device rows from ``ddf`` and raw
    rows from ``main_df`` are looked up and handed, with all thresholds, to
    ``parallel_sub_abuse``. The elapsed wall-clock time is printed.

    Args:
        mapping: dict from ``str(identifier)`` to node id.
        Nmap: lookup from node id to embedding index.
        main_df: raw stream dataframe keyed by ``v69_registration_id_nbr``.
        mdf: stream-level dataframe keyed by ``reg_id``.
        ddf: device-level dataframe keyed by ``reg_id``.
        model: node embeddings.
        th, strm_cd_th, dev_th, loc_th, alt_cnt_th, no_ip_ott, prf_cnt_th,
        sub_ip_cnt_th, kid_ip_cnt_th, ccrnt_strm_cnt_th: detection thresholds;
            the defaults are read from the module-level ``args`` when the
            function is defined.

    Returns:
        List with one ``parallel_sub_abuse`` result per user.
    """
    started=time.time()
    streams_by_user=split(mdf,group=['reg_id'])
    devices_by_user=ddf.groupby(['reg_id'])
    raw_by_user=main_df.groupby(['v69_registration_id_nbr'])

    def _job(user_streams):
        # `reg_id` is stored as text in mdf/ddf; the raw frame is looked up
        # with its integer form.
        reg_id=user_streams['reg_id'].values[0]
        return delayed(parallel_sub_abuse)(
            user_streams,
            devices_by_user.get_group(reg_id),
            raw_by_user.get_group(int(reg_id)),
            mapping,Nmap,model,th,strm_cd_th,dev_th,loc_th,alt_cnt_th,
            no_ip_ott,prf_cnt_th,sub_ip_cnt_th,kid_ip_cnt_th,ccrnt_strm_cnt_th)

    pool=Parallel(n_jobs=500,verbose=3,prefer="threads")
    fd=pool(_job(user_streams) for user_streams in streams_by_user)
    print(time.time()-started)
    return fd

<b>Parallel detection Function</b>

In [17]:
def parallel_sub_abuse(dfs,dfd,pr_df,mapping,Nmap,model,th, strm_cd_th, dev_th, loc_th, alt_cnt_th, no_ip_ott,prf_cnt_th,sub_ip_cnt_th,kid_ip_cnt_th,ccrnt_strm_cnt_th):
    """Run the detection modules for one user and report the first that flags.

    The modules are tried in the order checkAlt, checkMul, comm_detect,
    profile_check; evaluation stops at the first one that flags the user.

    Args:
        dfs: stream-level rows of the user.
        dfd: device-level rows of the user.
        pr_df: raw stream rows of the user.
        mapping, Nmap, model: identifier mapping and embeddings for comm_detect.
        th, strm_cd_th, dev_th, loc_th, alt_cnt_th, no_ip_ott, prf_cnt_th,
        sub_ip_cnt_th, kid_ip_cnt_th, ccrnt_strm_cnt_th: module thresholds.

    Returns:
        A dict with ``ud``, ``sub_package``, ``reg_id``, ``reason`` and the
        flagging module's details when the user is flagged; the integer ``1``
        otherwise.
    """
    reg_id=dfs['reg_id'].values[0]
    dc={
        'ud':mapping[str(reg_id)],
        'sub_package':dfs['sub_package'].values[0],
        'reg_id':reg_id,
    }
    # The user's home is the most frequent streaming location.
    home=Counter(dfs['locn'].values).most_common(1)[0][0]
    dc['reason']=""

    hit,details=checkAlt(dfs,alt_cnt_th)
    if hit==True:
        dc['reason']='checkAlt'
        return dict(dc, **details)

    hit,details=checkMul(dfs,dev_th,loc_th,no_ip_ott,home)
    if hit==True:
        dc['reason']='checkMul'
        return dict(dc, **details)

    # comm_detect reports only a verdict, no details.
    hit=comm_detect(dfd,model,th,strm_cd_th,home,mapping,Nmap)
    if hit==True:
        dc['reason']='comm_detect'
        return dc

    hit,details=profile_check(pr_df,prf_cnt_th,sub_ip_cnt_th,kid_ip_cnt_th,ccrnt_strm_cnt_th)
    if hit==True:
        dc['reason']='ProfileCheck'
        return dict(dc, **details)

    # Sentinel for "not flagged"; the pipeline filters these out.
    return 1

<b>Running the entire pipeline</b>

In [21]:

def pipeline(args):
    """Run the whole subscription-abuse detection flow and save the flagged users.

    Stages, each timed and reported on stdout: fetch the data, build the
    identifier mapping, preprocess into stream/device dataframes, write and
    read the edge list, train GGVec node embeddings, run the detection
    modules for every user, and write the flagged users to
    ``./result/<file_name>_abusers.csv``. When
    ``args['load_neo4j_only_abusers']`` is true the raw rows of the flagged
    users are additionally loaded into Neo4j through the module-level
    ``graph`` connection.

    Args:
        args: configuration dict (paths, bucket, thresholds, flags).

    Returns:
        Dataframe with one row per flagged user (empty when nobody is flagged).

    Raises:
        NameError: if Neo4j loading is requested but no module-level
            ``graph`` connection has been created.
    """
    start=timer()
    print("fetching data")
    main_df = data_fetching(args)
    end=timer()
    print('Fetching Data Done',timedelta(seconds=end-start))

    start=timer()
    print("Creating Mapping")
    mapping=create_mapping(main_df)
    end=timer()
    print('Mapping Creating Done',timedelta(seconds=end-start))

    start=timer()
    print("preprocessing data")
    mdf,ddf=preprocess(main_df)
    end=timer()
    print('Data Processing Done',timedelta(seconds=end-start))

    start=timer()
    print("creating edgelist")
    create_edgelist(mdf,ddf,args,mapping)
    G= cg.read_edgelist("./emb/"+args['file_name']+".csv", directed=False, sep=',')
    # Node name -> position in the graph's node list, used to index the embeddings.
    Nmap=pd.Series(G.nodes().index.values,index=G.nodes())
    end=timer()
    print('Edgelist Done',timedelta(seconds=end-start))

    start=timer()
    print("Try Embeddings")
    model=nodevectors.GGVec(n_components=30,
        order=1,
        learning_rate=0.1, tol_samples=30,
        negative_ratio=0.05,
        max_epoch=200).fit_transform(G)
    end=timer()
    print('Train Embeddings Done',timedelta(seconds=end-start))

    start=timer()
    print("detecting abusers")
    fds=sub_ab_detection(mapping,Nmap,main_df,mdf,ddf,model,th=args['comm_detect_miles_threshold'],\
                     strm_cd_th=args['comm_detect_stream_cnt_threshold_device'],\
                     dev_th=args['checkmul_device_cnt_threshold'],loc_th=args['checkmul_locn_cnt_threshold'],\
                     alt_cnt_th=args['checkalt_threshold'],no_ip_ott=args['checkmul_ip_ott_threshold'],prf_cnt_th=args['profile_cnt_threshold'],\
                     sub_ip_cnt_th=args['sub_ip_cnt_threshold'],kid_ip_cnt_th=args['kid_ip_cnt_threshold'],\
                     ccrnt_strm_cnt_th=args['concurrent_distant_same_content_streaming_cnt_threshold'])
    end=timer()
    print('Detection Done',timedelta(seconds=end-start))

    start=timer()
    print('Saving Results')
    # Users that were not flagged are reported as the sentinel 1.
    res=pd.DataFrame([x for x in fds if x !=1])
    # The output directory may not exist on a fresh checkout.
    os.makedirs("./result", exist_ok=True)
    res.to_csv("./result/"+args['file_name']+"_abusers.csv")
    end=timer()

    print("results available in "+"./result/"+args['file_name']+"_abusers.csv",timedelta(seconds=end-start))

    if args['load_neo4j_only_abusers']==True:
        if res.empty:
            # With no flagged users the result frame has no `reg_id` column.
            print("no abusers detected, nothing to load to Neo4j")
        else:
            print("loading abusers to Neo4j")
            if 'graph' not in globals():
                raise NameError(
                    "name 'graph' is not defined: create the Neo4j connection "
                    "before enabling 'load_neo4j_only_abusers'"
                )
            # `reg_id` in the results is text while the raw column is numeric,
            # so both sides are compared as strings.
            flagged_ids=set(res['reg_id'].astype(str))
            abusers=main_df[main_df['v69_registration_id_nbr'].astype(str).isin(flagged_ids)]
            load_to_neo4j(graph=graph,df=abusers)

    return res

In [19]:
# Run the full pipeline with the configuration in `args`; `res` is the dataframe it returns.
res=pipeline(args)

fetching data
<google.cloud.bigquery.table._EmptyRowIterator object at 0x7fdaef65a3c8>
Query Done start load to bucket
Downloading sub_abuse/sub_abuse_20220301_5k_sample_000000000000.csv.gz
Fetching Data Done 0:00:20.332432
Creating Mapping
Mapping Creating Done 0:00:00.014877
preprocessing data
start the function
Data Split Done 0:00:01.172435
start parallel


[Parallel(n_jobs=500)]: Using backend ThreadingBackend with 500 concurrent workers.
[Parallel(n_jobs=500)]: Done 218 tasks      | elapsed:   27.6s
[Parallel(n_jobs=500)]: Done 634 tasks      | elapsed:   31.3s
[Parallel(n_jobs=500)]: Done 1114 tasks      | elapsed:   38.3s
[Parallel(n_jobs=500)]: Done 1658 tasks      | elapsed:   52.8s
[Parallel(n_jobs=500)]: Done 2883 out of 2911 | elapsed:  1.0min remaining:    0.6s
[Parallel(n_jobs=500)]: Done 2911 out of 2911 | elapsed:  1.0min finished


Parallel Processing 0:01:00.689007
start loading
Data Load into list Done 0:00:00.010289
start saving
Saving Data Done 0:00:00.102725
start Manipulation
Manipulate ddf Done 0:00:00.005912
Data Processing Done 0:01:02.049610
creating edgelist
Edgelist Done 0:00:00.420953
Try Embeddings
Train Embeddings Done 0:00:05.609643
detecting abusers


[Parallel(n_jobs=500)]: Using backend ThreadingBackend with 500 concurrent workers.
[Parallel(n_jobs=500)]: Done 152 tasks      | elapsed:   13.1s
[Parallel(n_jobs=500)]: Done 568 tasks      | elapsed:   14.2s
[Parallel(n_jobs=500)]: Done 1048 tasks      | elapsed:   19.8s
[Parallel(n_jobs=500)]: Done 1592 tasks      | elapsed:   25.3s


68.23240685462952
Detection Done 0:01:08.267532
Saving Results
results available in ./results/20220301_5k_abusers.csv 0:00:00.008973


[Parallel(n_jobs=500)]: Done 2883 out of 2911 | elapsed:  1.1min remaining:    0.7s
[Parallel(n_jobs=500)]: Done 2911 out of 2911 | elapsed:  1.1min finished


In [20]:
# Display the dataframe returned by the pipeline.
res

Unnamed: 0,ud,sub_package,reg_id,reason,alt_stream_cnt,mult_dev_same_day_cnt,max_mult_dev,mult_loc_same_day_cnt,max_mult_loc,no_ott_ip_home
0,470,LC,47895387,checkAlt,8.0,,,,,
1,606,LC,48315640,checkAlt,12.0,,,,,
2,780,LC,48892385,checkAlt,20.0,,,,,
3,839,LC,49032778,checkAlt,7.0,,,,,
4,907,LC,49258960,checkMul,,0.0,2.0,1.0,4.0,0.0
5,1307,LC,49551024,checkAlt,5.0,,,,,
6,1748,LC,49996661,checkAlt,5.0,,,,,
7,1876,LC,50200949,checkMul,,1.0,5.0,0.0,2.0,0.0
8,2087,LC,50670945,checkAlt,5.0,,,,,
9,2468,LC,52394034,checkAlt,13.0,,,,,
