# Imports

In [2]:
import dask.dataframe as dd
from datetime import datetime, timedelta
import os

In [5]:
# Directories (all relative to the notebook's working directory)
raw_external_dir = os.path.join("..", "data", "raw_external")
answers_dir = os.path.join(raw_external_dir, 'answers')
r52_dir = os.path.join(raw_external_dir, 'r52')
processed_dir = os.path.join("..", "data", "processed")

# Raw input CSVs, located under r52_dir
file_file_path = os.path.join(r52_dir, 'file.csv')
http_file_path = os.path.join(r52_dir, 'http.csv')
psychometric_file_path = os.path.join(r52_dir, 'psychometric.csv')

# Pipeline outputs, located under processed_dir
filedf_parquet_directory_path = os.path.join(processed_dir, 'file_feature_engineered.parquet')
httpdf_parquet_directory_path = os.path.join(processed_dir, 'http_feature_engineered.parquet')
final_dataset_path = os.path.join(processed_dir, 'FEData_For_Modelling.parquet')

# Echo every resolved (relative) path so a wrong working directory is spotted early
for path_label, path_value in (
    ("File File Path:", file_file_path),
    ("HTTP File Path:", http_file_path),
    ("Psychometric File Path:", psychometric_file_path),
    ("FileDF Parquet Directory Path:", filedf_parquet_directory_path),
    ("HTTPDF Parquet Directory Path:", httpdf_parquet_directory_path),
    ("Final Dataset Path:", final_dataset_path),
):
    print(path_label, path_value)

File File Path: ..\data\raw_external\r52\file.csv
HTTP File Path: ..\data\raw_external\r52\http.csv
Psychometric File Path: ..\data\raw_external\r52\psychometric.csv
FileDF Parquet Directory Path: ..\data\processed\file_feature_engineered.parquet
HTTPDF Parquet Directory Path: ..\data\processed\http_feature_engineered.parquet
Final Dataset Path: ..\data\processed\FEData_For_Modelling.parquet


# Helper Functions

In [6]:
def outside_work_hours(dt):
    """Return True when the time of day of `dt` lies outside 08:00-17:00.

    Both boundaries count as working time: exactly 08:00 and exactly 17:00
    return False. Only the time component of `dt` is inspected; the date
    (and therefore the day of the week) is ignored.
    """
    workday_start = datetime.strptime("8:00", "%H:%M").time()
    workday_end = datetime.strptime("17:00", "%H:%M").time()
    time_of_day = dt.time()
    return not (workday_start <= time_of_day <= workday_end)


def weekend(dt):
    """Return True when `dt` falls on a Saturday or a Sunday.

    Relies on strftime("%w"), which numbers the days Sunday='0' .. Saturday='6'.
    """
    day_code = dt.strftime("%w")
    return day_code == '0' or day_code == '6'

In [7]:
# Group By and Count Function as tested below
def grpbycount(ddf,name):
    """Count the rows per 'user' and return them as a two-column frame.

    Parameters:
        ddf:  frame containing a 'user' column.
        name: label given to the resulting count column.

    Returns:
        A frame with the columns 'user' and `name`, one row per distinct user.
    """
    # size() yields an unnamed series, so reset_index() labels the counts 0;
    # that placeholder label is then replaced with the requested name.
    return ddf.groupby('user').size().reset_index().rename(columns={0: name})

# Merge and set Na to fill_value as tested below
def mergeddf(*ddfs,fill_value):
    """Outer-merge any number of frames on 'user', filling the gaps after each merge.

    Parameters:
        *ddfs:      frames to combine; each must carry a 'user' column.
        fill_value: keyword-only value (or per-column mapping) handed to
                    fillna() after every merge step.

    Returns:
        The combined frame. Each newly merged frame is placed on the left,
        so the columns of later arguments appear before those of earlier ones.
    """
    # The first frame seeds the accumulator; nothing is merged for it.
    combined = ddfs[0]
    for next_frame in ddfs[1:]:
        # how='outer' keeps users present on either side; an inner merge
        # would keep only the intersection.
        joined = dd.merge(next_frame, combined, on='user', how='outer')
        combined = joined.fillna(fill_value)
    return combined


In [8]:
def check_cloud_domain(domain):
    """Return True when `domain` contains any cloud-storage / file-transfer keyword.

    Matching is a plain, case-sensitive substring test.
    """
    cloud_markers = ('dropbox', 'drive.google', 'mega.co.nz', 'download', 'upload')
    for marker in cloud_markers:
        if marker in domain:
            return True
    return False

def check_job_domain(domain):
    """Return True when `domain` contains any job-site or job-search keyword.

    Matching is a plain, case-sensitive substring test.
    """
    job_site_markers = ('linkedin.com', 'indeed.com', 'careerbuilder.com',
                        'simplyhired.com', 'monster.com', 'job', 'recruit')
    # Broader terms: riskier matches, but still likely to indicate a job site.
    search_markers = ('hunt', 'search')
    for marker in job_site_markers + search_markers:
        if marker in domain:
            return True
    return False

def check_sus_domain(domain):
    """Return True when `domain` contains a keyword associated with hacking tools.

    Matching is a plain, case-sensitive substring test, so short keywords such
    as 'soft' also match longer words that merely contain them.
    """
    suspicious_markers = ('watcher', 'alert', 'activity', 'soft', 'spy',
                          'leak', 'keylog', 'secret', 'hack')
    for marker in suspicious_markers:
        if marker in domain:
            return True
    return False

# Notes About Files

In [None]:
'''
FILES
['id' 'date' 'user' 'pc' 'filename' 'activity' 'to_removable_media'
 'from_removable_media' 'content']
 
HTTP
['id' 'date' 'user' 'pc' 'url' 'content']
 
LOGON 
['id' 'date' 'user' 'pc' 'activity']

EMAIL
['id' 'date' 'user' 'pc' 'to' 'cc' 'bcc' 'from' 'activity' 'size'
 'attachments' 'content']

DEVICE
['id' 'date' 'user' 'pc' 'file_tree' 'activity']

PSYCHO
['employee_name' 'user_id' 'O' 'C' 'E' 'A' 'N']

LDAP
[employee_name,user_id,email,role,projects,business_unit,functional_unit,department,team,supervisor]

 '''

# FILES PREPROCESSING

In [None]:
# Explicit dtypes for every column: the CSV import format is unstable, so
# nothing is left to type inference.
file_dtypes = {
    'id': str,
    'date': str,
    'user': str,
    'pc': str,
    'filename': str,
    'activity': str,
    'to_removable_media': bool,
    'from_removable_media': bool,
    'content': str
}
file_df = dd.read_csv(file_file_path, dtype=file_dtypes)
# Drop columns that are not needed for the file-copy features
file_df = file_df.drop(columns=['content', 'from_removable_media', 'pc'])
# Keep only file writes that went to removable media
file_df = file_df[(file_df['activity'] == 'File Write') &
                  (file_df['to_removable_media'] == True)]
# The filter columns are constant after filtering, so drop them as well
file_df = file_df.drop(columns=['activity', 'to_removable_media']) #add 'filename' if needed
# Parse the date strings into datetime objects
file_df['date'] = dd.to_datetime(file_df['date'])
# Flag copies made outside working hours, one partition at a time.
# meta is the (name, dtype) description of the resulting series; the previous
# meta=('bool') was just the string 'bool' (parentheses without a comma do not
# build a tuple), which is not a documented way to describe a series.
file_df['file_copy_outside_work_hours'] = file_df['date'].map_partitions(
    lambda partition: partition.apply(outside_work_hours),
    meta=('date', 'bool'),
)
# Same approach for the weekend flag
file_df['file_copy_during_weekend'] = file_df['date'].map_partitions(
    lambda partition: partition.apply(weekend),
    meta=('date', 'bool'),
)
# The date and id columns are no longer needed once the flags exist
file_df = file_df.drop(columns=['date', 'id'])

Write Parquet

In [None]:
# Write the filtered, flagged file-copy events (file_df) to the Parquet
# location filedf_parquet_directory_path using the pyarrow engine.
file_df.to_parquet(filedf_parquet_directory_path, engine='pyarrow')

# HTTP PREPROCESSING

In [None]:
# Every column is read as text: the CSV import format is unstable, so nothing
# is left to type inference.
http_dtypes = {
    'id': str,
    'date': str,
    'user': str,
    'pc': str,
    'url': str,
    'content': str
}


http_df = dd.read_csv(http_file_path, dtype=http_dtypes)
# Drop columns that are not needed for the HTTP features
http_df = http_df.drop(columns=['content', 'pc'])
# Parse the date strings into datetime objects
http_df['date'] = dd.to_datetime(http_df['date'])

# Flag every URL per domain category, one partition at a time. The job flag is
# meant to catch people looking for a new job, at the risk of also catching the
# company's own recruiters.
# meta is the (name, dtype) description of the resulting series; the previous
# meta=('boolean') was just a plain string (parentheses without a comma do not
# build a tuple), which is not a documented way to describe a series.
http_df['is_job_domain'] = http_df['url'].map_partitions(
    lambda partition: partition.apply(check_job_domain), meta=('url', 'bool'))
http_df['is_cloud_domain'] = http_df['url'].map_partitions(
    lambda partition: partition.apply(check_cloud_domain), meta=('url', 'bool'))
http_df['is_sus_domain'] = http_df['url'].map_partitions(
    lambda partition: partition.apply(check_sus_domain), meta=('url', 'bool'))
# Flag the time of access in the same way
http_df['link_accessed_outside_work_hours'] = http_df['date'].map_partitions(
    lambda partition: partition.apply(outside_work_hours), meta=('date', 'bool'))
http_df['link_accessed_during_weekend'] = http_df['date'].map_partitions(
    lambda partition: partition.apply(weekend), meta=('date', 'bool'))

# Keep only the links that matter for the scenario
http_df = http_df[(http_df['is_job_domain'] == True) |
                  (http_df['is_cloud_domain'] == True) |
                  (http_df['is_sus_domain'] == True)]
# The date, id and url columns are no longer needed once the flags exist
http_df = http_df.drop(columns=['date', 'id', 'url'])

### Mass Filtering ###

# Helper columns removed from every per-category slice
http_flag_columns = ['link_accessed_during_weekend', 'link_accessed_outside_work_hours',
                     'is_job_domain', 'is_cloud_domain', 'is_sus_domain']

# Three mutually exclusive time buckets. A weekend access always lands in the
# weekend bucket, whatever the time of day.
in_work_hours = ((http_df['link_accessed_outside_work_hours'] == False) &
                 (http_df['link_accessed_during_weekend'] == False))
after_work_hours = ((http_df['link_accessed_outside_work_hours'] == True) &
                    (http_df['link_accessed_during_weekend'] == False))
on_weekend = (http_df['link_accessed_during_weekend'] == True)


def _domain_hits(flag_column, time_mask):
    """Return the rows of http_df flagged by `flag_column` within `time_mask`, without helper columns."""
    return http_df[(http_df[flag_column] == True) & time_mask].drop(columns=http_flag_columns)


# Filter per (category, time bucket), then count the hits per user
job_domains_work_hours = grpbycount(_domain_hits('is_job_domain', in_work_hours), 'JDwh')
job_domains_outside_work_hours = grpbycount(_domain_hits('is_job_domain', after_work_hours), 'JDowh')
job_domains_weekend = grpbycount(_domain_hits('is_job_domain', on_weekend), 'JDwke')
cloud_domains_work_hours = grpbycount(_domain_hits('is_cloud_domain', in_work_hours), 'CDwh')
cloud_domains_outside_work_hours = grpbycount(_domain_hits('is_cloud_domain', after_work_hours), 'CDowh')
cloud_domains_weekend = grpbycount(_domain_hits('is_cloud_domain', on_weekend), 'CDwke')
sus_domains_work_hours = grpbycount(_domain_hits('is_sus_domain', in_work_hours), 'SDwh')
sus_domains_outside_work_hours = grpbycount(_domain_hits('is_sus_domain', after_work_hours), 'SDowh')
sus_domains_weekend = grpbycount(_domain_hits('is_sus_domain', on_weekend), 'SDwke')

# Missing counts become 0: a user with no rows in a bucket made no such visits.
# The argument order is kept because it determines the output column order.
merged_ddf_http_counts = mergeddf(
    job_domains_work_hours, job_domains_outside_work_hours, job_domains_weekend,
    cloud_domains_work_hours, cloud_domains_outside_work_hours, cloud_domains_weekend,
    sus_domains_work_hours, sus_domains_outside_work_hours, sus_domains_weekend,
    fill_value=0,
)


In [None]:
# Write the per-user HTTP hit counts (merged_ddf_http_counts, not the raw
# http_df) to the Parquet location httpdf_parquet_directory_path.
merged_ddf_http_counts.to_parquet(httpdf_parquet_directory_path, engine='pyarrow')

# Combining all the different CSVs into a single feature engineered set

Read the FILE and HTTP Parquet files back in (an advantage of Parquet is that data types are preserved, so they do not have to be specified again).

In [6]:
# Load the file-copy events written by the FILES preprocessing step
# (a multi-part Parquet dataset).
ddf_file = dd.read_parquet(filedf_parquet_directory_path)
'''[user	filename	file_copy_outside_work_hours	file_copy_during_weekend]'''
# Load the per-user HTTP hit counts written by the HTTP preprocessing step
# (a multi-part Parquet dataset).
ddf_http = dd.read_parquet(httpdf_parquet_directory_path)
'''[user	SDwke	SDowh	SDwh	CDwke	CDowh	CDwh	JDwke	JDowh	JDwh]'''

'[user\tSDwke\tSDowh\tSDwh\tCDwke\tCDowh\tCDwh\tJDwke\tJDowh\tJDwh]'

Read the PSYCHOMETRIC CSV

In [13]:
# Every column is read as text: the CSV import format is unstable, so nothing
# is left to type inference.
pyscho_dtypes = dict.fromkeys(['employee_name', 'user_id', 'O', 'C', 'E', 'A', 'N'], str)

# CSV schema: [employee_name, user_id, O, C, E, A, N]
pyscho_df = (
    dd.read_csv(psychometric_file_path, dtype=pyscho_dtypes)
    # The employee name is not used as a feature
    .drop(columns=['employee_name'])
    # Use the same key column name as the rest of the dataset
    .rename(columns={'user_id': 'user'})
)

pyscho_df.head()

Unnamed: 0,user,O,C,E,A,N
0,MMK1532,17,17,16,22,28
1,NTB0710,46,37,32,22,21
2,MTD0971,22,38,17,39,40
3,NHB1529,41,40,20,44,25
4,HBW0057,35,33,43,44,29


Adjust FILE to the correct format

In [None]:
# drop() returns a new frame, so the result has to be bound to a name; the
# previous bare call discarded it and 'filename' was never removed. A new name
# is used (instead of rebinding ddf_file) so this cell can be re-run safely.
file_flags = ddf_file.drop(columns=['filename'])

# Helper columns removed from every time-bucket slice
file_flag_columns = ['file_copy_outside_work_hours', 'file_copy_during_weekend']

# Split the copy events into three mutually exclusive time buckets; a weekend
# copy always lands in the weekend bucket, whatever the time of day.
filecopy_work_hours = file_flags[(file_flags['file_copy_outside_work_hours'] == False) &
                                 (file_flags['file_copy_during_weekend'] == False)].drop(columns=file_flag_columns)
filecopy_outside_work_hours = file_flags[(file_flags['file_copy_outside_work_hours'] == True) &
                                         (file_flags['file_copy_during_weekend'] == False)].drop(columns=file_flag_columns)
filecopy_weekend = file_flags[(file_flags['file_copy_during_weekend'] == True)].drop(columns=file_flag_columns)

# Count the copy events per user within each bucket
filecopy_work_hours = grpbycount(filecopy_work_hours,'FCwh')
filecopy_outside_work_hours = grpbycount(filecopy_outside_work_hours,'FCowh')
filecopy_weekend = grpbycount(filecopy_weekend,'FCwke')

# Merge the three counts. Missing counts become 0: a user with no rows in a
# bucket copied no files in that bucket.
merged_ddf_file_counts = mergeddf(filecopy_work_hours,filecopy_outside_work_hours,filecopy_weekend,fill_value=0)
'''[user	FCwke	FCowh	FCwh]'''

Merge FILE and HTTP

In [9]:
# Outer-merge the HTTP counts with the file-copy counts on 'user'. Missing
# counts become 0: a user absent from one side has no recorded events of that
# kind.
merged_master_ddf = mergeddf(ddf_http,merged_ddf_file_counts,fill_value=0) 
'''[user	FCwke	FCowh	FCwh	SDwke	SDowh	SDwh	CDwke	CDowh	CDwh	JDwke	JDowh	JDwh]'''
merged_master_ddf.compute()

Unnamed: 0,user,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh
0,AAB1302,23.0,37.0,142.0,0.0,1.0,10.0,0.0,0.0,0.0,40.0,81.0,246.0
1,AAB1762,0.0,194.0,654.0,0.0,9.0,14.0,0.0,0.0,4.0,0.0,243.0,679.0
2,AAC0904,0.0,0.0,0.0,0.0,22.0,144.0,0.0,5.0,34.0,0.0,499.0,4025.0
3,AAC1033,0.0,0.0,0.0,0.0,14.0,72.0,0.0,0.0,1.0,0.0,1.0,2.0
4,AAC1489,0.0,0.0,0.0,0.0,1.0,175.0,0.0,0.0,2.0,0.0,101.0,5058.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
1991,ZRF1980,0.0,9.0,1272.0,0.0,16.0,298.0,0.0,0.0,11.0,0.0,28.0,494.0
1992,ZUR1449,0.0,0.0,0.0,0.0,1.0,7.0,0.0,65.0,296.0,0.0,293.0,1322.0
1993,ZVW1475,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,7.0
1994,ZWS0755,0.0,0.0,0.0,0.0,58.0,266.0,0.0,0.0,0.0,0.0,148.0,810.0


Merge PSYCHOMETRY with Merged_FILE_HTTP

In [10]:
# Per-column fill values for the outer merge: a user without a psychometric
# record gets 50 for every OCEAN trait, a user without activity gets 0 counts.
# NOTE(review): 50 is intended as a neutral score, but the max-value check
# later in this notebook reports 50 as the highest observed O/C/E/A value --
# confirm the score scale before relying on this default.
# NOTE(review): this cell rebinds merged_master_ddf from itself, so running it
# twice merges the psychometric columns a second time.
fill_values = {'O': 50, 'C': 50, 'E': 50, 'A': 50, 'N': 50,'FCwke':0,'FCowh':0,'FCwh':0,'SDwke':0,'SDowh':0,'SDwh':0,'CDwke':0,'CDowh':0,'CDwh':0,'JDwke':0,'JDowh':0,'JDwh':0}
merged_master_ddf = mergeddf(merged_master_ddf,pyscho_df,fill_value=fill_values) 
'''[user	O	C	E	A	N	FCwke	FCowh	FCwh	SDwke	SDowh	SDwh	CDwke	CDowh	CDwh	JDwke	JDowh	JDwh]'''
merged_master_ddf.compute()

Unnamed: 0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh
0,AAB1302,38,41,10,38,28,23.0,37.0,142.0,0.0,1.0,10.0,0.0,0.0,0.0,40.0,81.0,246.0
1,AAB1762,32,39,18,18,33,0.0,194.0,654.0,0.0,9.0,14.0,0.0,0.0,4.0,0.0,243.0,679.0
2,AAC0904,31,30,36,18,28,0.0,0.0,0.0,0.0,22.0,144.0,0.0,5.0,34.0,0.0,499.0,4025.0
3,AAC1033,42,40,40,22,29,0.0,0.0,0.0,0.0,14.0,72.0,0.0,0.0,1.0,0.0,1.0,2.0
4,AAC1489,24,13,42,24,38,0.0,0.0,0.0,0.0,1.0,175.0,0.0,0.0,2.0,0.0,101.0,5058.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,ZRF1980,43,41,26,16,30,0.0,9.0,1272.0,0.0,16.0,298.0,0.0,0.0,11.0,0.0,28.0,494.0
1996,ZUR1449,31,42,12,36,17,0.0,0.0,0.0,0.0,1.0,7.0,0.0,65.0,296.0,0.0,293.0,1322.0
1997,ZVW1475,38,35,18,24,37,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,2.0,7.0
1998,ZWS0755,45,47,42,39,28,0.0,0.0,0.0,0.0,58.0,266.0,0.0,0.0,0.0,0.0,148.0,810.0


Read ANSWERS File to get malicious users for labelling later

In [12]:
# Each answer file named 'r5.2-2-<USER>.csv' identifies one malicious user;
# strip the prefix and the extension to recover the user ID.
malicious_filenames = os.listdir(answers_dir)
malicious_users = [
    answer_file.replace('r5.2-2-', "").replace('.csv', "")
    for answer_file in malicious_filenames
    if answer_file.startswith('r5.2-2-') and answer_file.endswith('.csv')
]

malicious_users

['BYO1846',
 'CHP1711',
 'CIF1430',
 'CKP0630',
 'DCC1119',
 'GWG0497',
 'HIS1394',
 'HMS1658',
 'HSN0675',
 'HXP0976',
 'ICB1354',
 'ITA0159',
 'JAL0811',
 'KSS1005',
 'LVF1626',
 'MCP0611',
 'MDS0680',
 'MGB1235',
 'NAH1366',
 'OKM1092',
 'OSS1463',
 'RRS0056',
 'SIS0042',
 'SNK1280',
 'TMT0851',
 'TNB1616',
 'TRC1838',
 'VCF1602',
 'WDT1634',
 'ZIE0741']

Add malicious labels to users

In [31]:
# Add a boolean 'malicious' column: True when the value in the 'user' column
# appears in the malicious_users list.
merged_master_ddf['malicious'] = merged_master_ddf['user'].isin(malicious_users)

In [29]:
# Sanity check: materialise and display only the rows labelled as malicious.
merged_master_ddf[merged_master_ddf['malicious']==True].compute()

Unnamed: 0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh,malicious
264,BYO1846,35,28,35,47,31,0,0,237,0,0,65,0,0,2,0,0,835,True
341,CHP1711,43,25,37,14,28,0,24,183,0,15,69,0,0,0,0,34,234,True
345,CIF1430,14,37,41,24,35,0,0,201,0,0,0,0,0,0,0,1,168,True
364,CKP0630,36,26,43,50,36,0,7,174,0,0,2,0,0,0,0,7,164,True
449,DCC1119,47,39,26,36,22,0,880,90,0,128,807,0,32,202,0,12,47,True
718,GWG0497,12,35,22,24,28,0,14,187,0,99,612,0,0,0,0,174,1216,True
766,HIS1394,36,43,39,17,27,0,27,205,0,7,43,0,42,265,0,238,1301,True
795,HMS1658,36,26,37,13,30,0,14,258,0,1,5,0,0,3,0,28,285,True
822,HSN0675,43,15,11,19,38,0,25,323,0,35,234,0,62,433,0,252,1324,True
836,HXP0976,50,33,14,46,34,0,7,251,0,15,168,0,0,0,0,11,157,True


## Casting the datatypes for export

Checking max values

In [32]:
# Column-wise maximum, used to pick the smallest integer dtype per column
max_values = merged_master_ddf.max(axis=0)

# Display the maximum value of each column
print(max_values.compute())
# The boolean column reports max = True, which is expected because True (1) >
# False (0). It is irrelevant here: this step only sizes the numeric columns.

user         ZXR1452
O                 50
C                 50
E                 50
A                 50
N                 48
FCwke           1060
FCowh           1204
FCwh            4007
SDwke           1702
SDowh           1122
SDwh            4467
CDwke            492
CDowh            488
CDwh            2054
JDwke           3037
JDowh           1779
JDwh           13126
malicious       True
dtype: object


Check current dtypes

In [33]:
# Display the lazy frame (no compute) to inspect the current column dtypes
merged_master_ddf

Unnamed: 0_level_0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh,malicious
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
,string,int8,int8,int8,int8,int8,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,bool
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


To note :

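8-bit: 256 distinct values, so a signed `int8` holds -128 to 127

16-bit: 65,536 distinct values, so a signed `int16` holds -32,768 to 32,767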

The `user` column could be shrunk further by mapping each user ID string to an `int16` code (1…n) and storing the lookup table separately, but for simplicity it is left as a string.

In [35]:
# OCEAN scores are bounded by 0-100, which fits a signed int8 (max 127).
# The count columns exceed that range (largest observed value: 13126) but fit
# a signed int16 (max 32767).
ocean_columns = ['O', 'C', 'E', 'A', 'N']
count_columns = ['FCwke', 'FCowh', 'FCwh',
                 'SDwke', 'SDowh', 'SDwh',
                 'CDwke', 'CDowh', 'CDwh',
                 'JDwke', 'JDowh', 'JDwh']

# Target dtype per column, in the frame's column order
data_types = {
    'user': 'string',
    **{column: 'int8' for column in ocean_columns},
    **{column: 'int16' for column in count_columns},
    'malicious': 'bool',
}

merged_master_ddf = merged_master_ddf.astype(data_types)
merged_master_ddf


Unnamed: 0_level_0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh,malicious
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1
,string,int8,int8,int8,int8,int8,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,bool
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [36]:
# Materialise the frame to check the values after the dtype cast
merged_master_ddf.compute()

Unnamed: 0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh,malicious
0,AAB1302,38,41,10,38,28,23,37,142,0,1,10,0,0,0,40,81,246,False
1,AAB1762,32,39,18,18,33,0,194,654,0,9,14,0,0,4,0,243,679,False
2,AAC0904,31,30,36,18,28,0,0,0,0,22,144,0,5,34,0,499,4025,False
3,AAC1033,42,40,40,22,29,0,0,0,0,14,72,0,0,1,0,1,2,False
4,AAC1489,24,13,42,24,38,0,0,0,0,1,175,0,0,2,0,101,5058,False
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,ZRF1980,43,41,26,16,30,0,9,1272,0,16,298,0,0,11,0,28,494,False
1996,ZUR1449,31,42,12,36,17,0,0,0,0,1,7,0,65,296,0,293,1322,False
1997,ZVW1475,38,35,18,24,37,0,0,0,0,0,1,0,0,0,0,2,7,False
1998,ZWS0755,45,47,42,39,28,0,0,0,0,58,266,0,0,0,0,148,810,False


Export Final Dataset (Feature Engineered Data For Modelling)

In [16]:
# Write the final feature-engineered dataset to final_dataset_path.
# NOTE(review): the read-back shown below has no 'malicious' column, so the
# saved output appears to predate the labelling step -- re-run the notebook
# top to bottom before using the exported file.
merged_master_ddf.to_parquet(final_dataset_path, engine='pyarrow')

In [17]:
# Read the exported dataset back to verify the export; head(4000) requests
# more rows than the frame displayed above contains, so everything is shown.
fds = dd.read_parquet(final_dataset_path)
fds.head(4000)

Unnamed: 0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh
0,AAB1302,38,41,10,38,28,23,37,142,0,1,10,0,0,0,40,81,246
1,AAB1762,32,39,18,18,33,0,194,654,0,9,14,0,0,4,0,243,679
2,AAC0904,31,30,36,18,28,0,0,0,0,22,144,0,5,34,0,499,4025
3,AAC1033,42,40,40,22,29,0,0,0,0,14,72,0,0,1,0,1,2
4,AAC1489,24,13,42,24,38,0,0,0,0,1,175,0,0,2,0,101,5058
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1995,ZRF1980,43,41,26,16,30,0,9,1272,0,16,298,0,0,11,0,28,494
1996,ZUR1449,31,42,12,36,17,0,0,0,0,1,7,0,65,296,0,293,1322
1997,ZVW1475,38,35,18,24,37,0,0,0,0,0,1,0,0,0,0,2,7
1998,ZWS0755,45,47,42,39,28,0,0,0,0,58,266,0,0,0,0,148,810


In [18]:
# Display the lazy frame (no compute) to confirm the dtypes survived the round trip
fds

Unnamed: 0_level_0,user,O,C,E,A,N,FCwke,FCowh,FCwh,SDwke,SDowh,SDwh,CDwke,CDowh,CDwh,JDwke,JDowh,JDwh
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1
,string,int8,int8,int8,int8,int8,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16,int16
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


# Test codes (DO NOT RUN THESE)

## Test Function for the groupby count

In [116]:
# Scratch test for the group-by-and-count logic on a tiny frame.
import pandas as pd

# Sample data: user1 appears 4 times, user2 and user3 twice each
data = {
    'users': ['user1', 'user2', 'user1', 'user3', 'user2', 'user1', 'user1', 'user3']
}

# Create a pandas DataFrame
df = pd.DataFrame(data)

# Display the DataFrame
print(df)
# Convert the pandas DataFrame to a Dask DataFrame with two partitions
ddf = dd.from_pandas(df, npartitions=2)

# Group by the 'users' column and count the occurrences of each user;
# size() yields an unnamed series, so reset_index() labels the counts 0
user_counts = ddf.groupby('users').size().reset_index()
user_counts = user_counts.rename(columns={0:'count'})


# Compute the result
result = user_counts.compute()
print(result)


   users
0  user1
1  user2
2  user1
3  user3
4  user2
5  user1
6  user1
7  user3
   users  count
0  user1      4
1  user2      2
2  user3      2


In [None]:
# Group By and Count Function as tested below
def grpbycount(ddf,name):
    """Scratch variant of grpbycount that groups by 'users' instead of 'user'.

    WARNING: defining this rebinds the name grpbycount, replacing the pipeline
    helper defined earlier in the notebook (which groups by 'user').

    Parameters:
        ddf:  frame containing a 'users' column.
        name: label given to the resulting count column.

    Returns:
        A frame with the columns 'users' and `name`, one row per distinct user.
    """
    # size() yields an unnamed series, so reset_index() labels the counts 0
    return ddf.groupby('users').size().reset_index().rename(columns={0: name})

In [94]:
# Run the scratch grpbycount on the sample Dask frame and display the counts
x = grpbycount(ddf,'hello')
x.compute()

Unnamed: 0,users,hello
0,user1,4
1,user2,2
2,user3,2


## Test Function for the merge

In [88]:
# Scratch test for the outer merge on two tiny frames.
# Sample DataFrame 1: user activity 1 count
data1 = {
    'user': ['user1', 'user3'],
    'activity_1_count': [3, 2]
}
df1 = dd.from_pandas(pd.DataFrame(data1), npartitions=2)

# Sample DataFrame 2: user activity 2 count
data2 = {
    'user': ['user1', 'user2'],
    'activity_2_count': [5, 4]
}
df2 = dd.from_pandas(pd.DataFrame(data2), npartitions=2)

# Merge the two DataFrames on the 'user' column.
# how='outer' keeps users present on either side (an inner merge would keep
# only the intersection); the resulting gaps are filled with 0.
merged_df = dd.merge(df1, df2, on='user', how='outer').fillna(0)

# Compute the result
result = merged_df.compute()

print(result)

    user  activity_1_count  activity_2_count
0  user3               2.0               0.0
0  user1               3.0               5.0
1  user2               0.0               4.0


In [89]:
# Third sample frame, used to check that a further merge still works
data3 = {
    'user': ['user1', 'user4'],
    'activity_3_count': [7, 2]
}
df3 = dd.from_pandas(pd.DataFrame(data3), npartitions=2)

# Merge the new frame (left) with the previous result (right).
# NOTE: merged_df is rebound from itself, so re-running this cell merges df3 again.
merged_df = dd.merge(df3, merged_df, on='user', how='outer').fillna(0)


# Compute the result
result = merged_df.compute()
print(result)

    user  activity_3_count  activity_1_count  activity_2_count
0  user3               0.0               2.0               0.0
0  user1               7.0               3.0               5.0
1  user2               0.0               0.0               4.0
2  user4               2.0               0.0               0.0


In [105]:
# Merge and set Na to 0 as tested below
def mergeddf(*ddfs, fill_value=0):
    """Outer-merge any number of frames on 'user', filling the gaps after each merge.

    This scratch copy shares its name with the pipeline helper defined earlier
    in the notebook, so running this cell rebinds mergeddf. It therefore
    accepts the same keyword-only `fill_value` the pipeline calls pass (it
    previously rejected that keyword); the default of 0 keeps the original
    scratch behaviour.

    Parameters:
        *ddfs:      frames to combine; each must carry a 'user' column.
        fill_value: keyword-only value (or per-column mapping) handed to
                    fillna() after every merge step. Defaults to 0.

    Returns:
        The combined frame. Each newly merged frame is placed on the left,
        so the columns of later arguments appear before those of earlier ones.
    """
    # The first frame seeds the accumulator; nothing is merged for it.
    merged_ddf = ddfs[0]
    for ddf in ddfs[1:]:
        # how='outer' keeps users present on either side; an inner merge
        # would keep only the intersection.
        merged_ddf = dd.merge(ddf, merged_ddf, on='user', how='outer').fillna(fill_value)
    return merged_ddf

# Merge the three sample frames with the scratch helper and display the result
mergeddfx = mergeddf(df1,df2,df3)

mergeddfx.compute()

Unnamed: 0,user,activity_3_count,activity_2_count,activity_1_count
0,user3,0.0,0.0,2.0
0,user1,7.0,5.0,3.0
1,user2,0.0,4.0,0.0
2,user4,2.0,0.0,0.0
