In [2]:
import os
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pickle
from functools import reduce
from pyspark.sql import DataFrame
import numpy as np
import json

In [3]:
spark = SparkSession.builder.master("local[20]").config("spark.executor.memory", "512g") \
.config("spark.driver.memory", "512g") \
.config("spark.memory.offHeap.size", "512g") \
.config("spark.memory.offHeap.enabled", True) \
.appName("tc_dataprocessing").getOrCreate()

23/03/24 12:30:04 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [4]:
full_pickles_dir = '/mnt/raid0_ssd_8tb/dataset/trace/new_full_graphs/full_pickles'
kg_dir = '/mnt/raid0_ssd_8tb/dataset/trace/new_full_graphs/benign'
out_dir = '/home/ykim/workspace/cyber_spring23/preprocess/raw_text/'
graph_dir = '/home/ykim/workspace/cyber_spring23/preprocess/graph/'
node_path = '/home/ykim/workspace/cyber_spring23/data/all_entities.txt'

root_data = "/mnt/raid0_ssd_8tb/dataset/trace/data/phishing/trace_phishing"

In [4]:
print("Loading files pickle...")
with open(os.path.join(full_pickles_dir, 'files.pkl'), 'rb') as f:
    files = pickle.load(f)
print("Loading processes pickle...")
with open(os.path.join(full_pickles_dir, 'processes.pkl'), 'rb') as f:
    processes = pickle.load(f)
print("Loading sockets pickle...")
with open(os.path.join(full_pickles_dir, 'sockets.pkl'), 'rb') as f:
    sockets = pickle.load(f)

Loading files pickle...
Loading processes pickle...
Loading sockets pickle...


In [8]:
with open(os.path.join(root_data, 'entities.pkl'), 'rb') as f:
    entity = pickle.load(f)

In [9]:
entity

{'FE2A7FA8-7CB7-009D-CCA0-43C49990008F': {'path': '/com/ubuntu/upstart',
  'type': 'file',
  'label': 0},
 '9D056036-D5F2-4B92-6C76-1E9D23114D22': {'path': '/var/run/dbus/system_bus_socket',
  'type': 'file',
  'label': 0},
 '992E9916-AE71-8CF6-5F99-8807961A3B28': {'remoteAddress': '128.55.12.170',
  'remotePort': 5353,
  'type': 'socket',
  'label': 0},
 '0D0B7E42-66ED-FC67-D5D3-2FDAE7467BDD': {'remoteAddress': '128.55.12.171',
  'remotePort': 5353,
  'type': 'socket',
  'label': 0},
 '30E80A46-BFD1-6E72-AC6B-EA6D752707FE': {'remoteAddress': '128.55.12.170',
  'remotePort': 5353,
  'type': 'socket',
  'label': 0},
 'A7D37930-7B6A-01CB-805E-6D9F7D72CF30': {'remoteAddress': '128.55.12.171',
  'remotePort': 5353,
  'type': 'socket',
  'label': 0},
 'FC81CA04-15FB-232E-FF5A-08ADDFCEABCC': {'remoteAddress': '128.55.12.172',
  'remotePort': 5353,
  'type': 'socket',
  'label': 0},
 'AEA6F6BA-CDFB-EEC8-165B-3C273D5E69C2': {'remoteAddress': '128.55.12.170',
  'remotePort': 5353,
  'type': 'so

In [44]:
df = spark.read.options(delimiter='\t').csv(node_path, mode="PERMISSIVE", inferSchema=True).distinct()

In [49]:
df1 = df.filter(F.col('_c0').isin({'process'}))
test_df = df1.select(F.col("_c1").alias("key"), F.col("_c0").alias("type"), F.col("_c3").alias("cmd"))
cmd_dict = {row['key']: (row['cmd'], row['type']) for row in test_df.collect()}
cmd_dict

{-7860785410850191020: ('firefox', 'process'),
 -5796564299011278881: ('firefox', 'process'),
 2256094522604729729: ('firefox', 'process'),
 6221283964386447004: ('firefox', 'process'),
 -7993234438770321689: ('firefox', 'process'),
 -8868165518105199706: ('firefox', 'process'),
 -8742907472247777390: ('firefox', 'process'),
 -8849023626888934555: ('firefox', 'process'),
 -2154421563623497176: ('firefox', 'process'),
 4516474927082471376: ('firefox', 'process'),
 -7069258009479899437: ('firefox', 'process'),
 -1250652446815909700: ('firefox', 'process'),
 -5061235318522724978: ('firefox', 'process'),
 7125501918538734615: ('firefox', 'process'),
 3405486570157663374: ('firefox', 'process'),
 5680677225121902121: ('firefox', 'process'),
 3218021364635296023: ('firefox', 'process'),
 -6125056958167334313: ('firefox', 'process'),
 4710910454189288633: ('firefox', 'process'),
 -5260077331260755931: ('firefox', 'process'),
 -4225869841109117944: ('firefox', 'process'),
 -2357195972826751398

In [50]:
df1 = df.filter(F.col('_c0').isin({'file'}))
test_df = df1.select(F.col("_c1").alias("key"), F.col("_c0").alias("type"), F.col("_c2").alias("file"))
file_dict = {row['key']: (row['file'], row['type']) for row in test_df.collect()}
file_dict

{837889173181973256: ('/dev/shm/org.chromium.Y0uedP', 'file'),
 7844353071669721358: ('/dev/shm/org.chromium.7i7ROk', 'file'),
 -2008692790499641820: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/doomed/2103229471',
  'file'),
 -6906152592439566824: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/safebrowsing-updating/testexcept-flashallow-simple.sbstore',
  'file'),
 -9008946197495610167: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/entries/4ABFC86E818B0C3C7E7CB35BFDB9C0B81B4902C4',
  'file'),
 -7959402225900344675: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/doomed/164170469',
  'file'),
 1057921697713958626: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/entries/4ABFC86E818B0C3C7E7CB35BFDB9C0B81B4902C4',
  'file'),
 -7798092677035437413: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/safebrowsing-updating',
  'file'),
 -2789436160720205328: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/safebrowsing-up

In [51]:
df1 = df.filter(F.col('_c0').isin({'socket'}))
test_df = df1.select(F.col("_c1").alias("key"), F.col("_c0").alias("type"), F.col("_c2").alias("ip"))
ip_dict = {row['key']: (row['ip'], row['type']) for row in test_df.collect()}
ip_dict

{6624547607655422634: ('162.93.202.80:80', 'socket'),
 -1922223636371594440: ('128.55.12.10:53', 'socket'),
 1144423403721197188: ('128.55.12.10:53', 'socket'),
 3373205248532375613: ('128.55.12.171:5353', 'socket'),
 -4796313797603920807: ('128.55.12.171:5353', 'socket'),
 -5805684431444990418: ('128.55.12.10:53', 'socket'),
 4601667438186288242: ('216.251.114.10:80', 'socket'),
 4885659859401539516: ('128.55.12.170:5353', 'socket'),
 -7270476452769910362: ('128.55.12.172:5353', 'socket'),
 -3633478456183905688: ('128.55.12.170:5353', 'socket'),
 -5165842772261656687: ('128.55.12.170:5353', 'socket'),
 2959596435331558559: ('128.55.12.172:5353', 'socket'),
 -8401117607528421448: ('128.55.12.10:53', 'socket'),
 -8604459350864847460: ('128.55.12.171:5353', 'socket'),
 -6826784511248396313: ('128.55.12.170:5353', 'socket'),
 5229045615547013967: ('128.55.12.170:5353', 'socket'),
 -1279360949648447183: ('209.132.177.50:80', 'socket'),
 -1667187478053452472: ('128.55.12.172:5353', 'socket'

In [52]:
def Merge(dict1, dict2):
    return(dict2.update(dict1))

In [53]:
Merge(cmd_dict, file_dict)

In [56]:
Merge(ip_dict, file_dict)

In [59]:
file_dict

{837889173181973256: ('/dev/shm/org.chromium.Y0uedP', 'file'),
 7844353071669721358: ('/dev/shm/org.chromium.7i7ROk', 'file'),
 -2008692790499641820: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/doomed/2103229471',
  'file'),
 -6906152592439566824: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/safebrowsing-updating/testexcept-flashallow-simple.sbstore',
  'file'),
 -9008946197495610167: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/entries/4ABFC86E818B0C3C7E7CB35BFDB9C0B81B4902C4',
  'file'),
 -7959402225900344675: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/doomed/164170469',
  'file'),
 1057921697713958626: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/entries/4ABFC86E818B0C3C7E7CB35BFDB9C0B81B4902C4',
  'file'),
 -7798092677035437413: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/safebrowsing-updating',
  'file'),
 -2789436160720205328: ('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/safebrowsing-up

In [60]:
import pickle

with open("/home/ykim/workspace/cyber_spring23/data/entity_dict.pkl", "wb") as file:
    pickle.dump(file_dict, file)

In [63]:
with open('/home/ykim/workspace/cyber_spring23/data/entity_dict.pkl', 'rb') as handle:
    unserialized_data = pickle.load(handle)

In [64]:
unserialized_data[-2008692790499641820]

('/home/admin/.cache/mozilla/firefox/wyd8o2f2.default/cache2/doomed/2103229471',
 'file')

In [5]:
cmd2id_dict = dict()
with open(os.path.join(graph_dir, 'cmd2id.txt'), 'r') as f:
    for line in f:
        elements = line.split("\t")
        cmd2id_dict[elements[0]] = int(elements[1])

In [10]:
  loaded_embed = np.load('/home/ykim/workspace/cyber_spring23/embeddings/transformer/pid2emb64.npy', allow_pickle=True)

In [63]:
loaded_embed.item()[1]

tensor([ 0.2321, -0.2491,  0.0024,  0.3135, -0.2358, -0.0157,  0.1031,  0.2764,
         0.1309, -0.1493, -0.0917,  0.4228,  0.2153, -0.3874,  0.1360, -0.2161,
        -0.1215, -0.0221, -0.2618,  0.0744, -0.2102, -0.1663,  0.3763, -0.1711,
        -0.2540, -0.0928,  0.3583,  0.1085, -0.0950, -0.0432, -0.2318, -0.1204,
         0.0057,  0.1603, -0.0299, -0.1708,  0.1509,  0.1450, -0.0272, -0.0420,
         0.3397, -0.0504, -0.1881,  0.0171,  0.2132,  0.0105,  0.0104, -0.0243,
        -0.1195, -0.0336,  0.0288, -0.0959, -0.2449,  0.2291,  0.1941, -0.3336,
         0.4879, -0.0907, -0.0043,  0.4021, -0.0929,  0.1060,  0.1700, -0.2232])

In [64]:
from torch.nn.functional import normalize
t2 = normalize(loaded_embed.item()[1], p=2.0, dim = 0)

In [69]:
t2 = normalize(torch.from_numpy(np.array([1.0, 1.0, 0.0, 0.0])), p=2.0, dim = 0)

In [70]:
t2

tensor([0.7071, 0.7071, 0.0000, 0.0000], dtype=torch.float64)

In [7]:
with open(graph_dir + 'cmd2idx.txt', 'w') as fp:
    for k, v in processes.items():
        if v['cmdLine'] in cmd2id_dict.keys():
            fp.write(str(k) + '\t' + str(cmd2id_dict[v['cmdLine']]) + '\n')
        else:
            print(v['cmdLine'])
        # print(k, cmd2id_dict[v['cmdLine']])

In [24]:
with open(os.path.join(kg_dir, 'kg.txt'), 'r') as f:
    uniq_entity = set()
    for line in f:
        elements = line.split("\t")
        uniq_entity.add(elements[0])
        uniq_entity.add(elements[2])

In [27]:
processes

{'D5391408-5348-7AC2-494D-E2AB6816B8E3': {'pid': 21790,
  'cmdLine': '/usr/local/firefox-54.0.1/obj-x86_64-pc-linux-gnu/dist/bin/firefox -contentproc -childID 1 -isForBrowser -intPrefs 5:50|6:-1|18:0|28:1000|33:20|34:10|43:128|44:10000|48:0|50:400|51:1|52:0|53:0|58:0|59:120|60:120|133:2|134:1|147:5000|157:0|159:0|170:10000|195:24|196:32768|198:0|199:0|207:5|211:1048576|212:100|213:5000|215:600|216:4|217:1|226:2|241:60000| -boolPrefs 1:0|2:0|4:0|26:1|27:1|30:0|35:1|36:0|37:0|38:0|39:1|40:0|41:1|42:1|45:0|46:0|47:0|49:0|54:1|55:1|56:0|57:1|61:1|62:1|63:0|64:1|65:1|66:0|67:1|70:0|71:0|74:1|75:1|79:1|80:1|81:0|82:0|84:0|85:0|86:1|87:0|90:0|91:1|92:1|93:1|94:1|95:1|96:0|97:0|98:1|99:0|100:0|101:0|102:1|103:1|104:0|105:1|106:1|107:0|108:0|109:1|110:1|111:1|112:0|113:1|114:1|115:1|116:1|117:1|118:1|119:1|120:1|122:0|123:0|124:0|125:1|126:0|127:1|131:1|132:1|135:1|136:0|141:0|146:0|149:0|151:1|152:1|154:1|158:0|160:0|162:0|164:1|165:1|171:0|172:0|173:1|175:0|186:0|193:0|194:0|197:1|200:0|202:0

In [30]:
file_list = set()
socket_list = set()
process_list = set()
for x in list(uniq_entity):
    if x in files:
        file_list.add(files[x]['path'])
    elif x in sockets:
        socket_list.add(str(sockets[x]['remoteAddress']) + ":" + str(sockets[x]['remotePort']))
    elif x in processes:
        process_list.add(processes[x]['cmdLine'])

In [48]:
files['ADCB354C-BAA9-51E3-2FD8-11B149324A3F']

{'path': '/lib/x86_64-linux-gnu/libc.so.6'}

In [31]:
# with open(out_dir + 'file_raw.txt', 'w') as fp:
#     fp.write('\n'.join(list(file_list)))
with open(out_dir + 'socket_raw.txt', 'w') as fp:
    fp.write('\n'.join(list(socket_list)))
# with open(out_dir + 'process_raw.txt', 'w') as fp:
#     fp.write('\n'.join(list(process_list)))

In [None]:
out_dir + 'file_raw.txt'

In [None]:
df = spark.read.options(delimiter='\t').csv(os.path.join(kg_dir, 'kg.txt'))\
.withColumnRenamed("_c0","src_uid")\
.withColumnRenamed("_c1","relation")\
.withColumnRenamed("_c2","dst_uid")\
.withColumnRenamed("_c3","timestamp")

In [None]:
src_df = df.select('src_uid').alias('uid').distinct()
dst_df = df.select('dst_uid').alias('uid').distinct()

In [None]:
def union_all(*dfs):
    return reduce(DataFrame.union, dfs)
  
union_df = union_all(*[src_df, dst_df])

In [None]:
union_df.count()

In [None]:
union_df = union_df.distinct()

In [None]:
union_df.count()

In [None]:
src_list = [data[0] for data in union_df.select('src_uid').distinct().collect()]

In [None]:
dst_list = [data[0] for data in df.select('dst_uid').distinct().collect()]

In [None]:
# src_df.show()
def extract_entity_info(x):
    try:
        if x in files:
            return "file"
        elif x in sockets:
            return "socket"
        elif x in processes:
            return "process"
    except Exception as e:
        raise Exception(x,e)
src_df = src_df.withColumn('type', F.udf(lambda x: extract_entity_info(x), T.StringType())(F.col('src_uid')))

In [None]:
schema = T.StructType([ \
   T.StructField("type", T.StringType(),True), \
   T.StructField("raw", T.StringType(),True), \
  ])
def extract_entity_info(x):
    try:
        if x in files:
            return {
                "type": "file",
                "raw": files[x],
            }
        elif x in sockets:
            return {
                "type": "socket",
                "raw": sockets[x],
            }
        elif x in processes:
            return {
                "type": "process",
                "raw": processes[x],
            }
    except Exception as e:
        raise Exception(x,e)
        
src_df = src_df.withColumn('src_info', F.udf(lambda x: extract_entity_info(x), schema)(F.col('src_uid')))

In [None]:
df

In [50]:
values = [1, 0, 3, 4]
n_ips = 256
n_ports = 65535
np.eye(n_ips)[values]

array([[0., 1., 0., ..., 0., 0., 0.],
       [1., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

In [53]:
def decimal_to_binary(dec, bit=8):
    if bit == 32:
        return [int(i) for i in list('{0:032b}'.format(dec))]
    elif bit == 8:
        return [int(i) for i in list('{0:012b}'.format(dec))]

In [54]:
decimal_to_binary(255)

[0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1]

23/03/06 16:30:06 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 136093 ms exceeds timeout 120000 ms
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 12288 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /home/ykim/workspace/cyber_spring23/preprocess/hs_err_pid2345469.log
[thread 140294662002432 also had an error]




In [55]:
# self.myvectors = gensim.models.KeyedVectors.load_word2vec_format(cfg.vec_dir)
# self.vec_weights = torch.FloatTensor(self.myvectors.vectors)
# self.embeds = torch.nn.Embedding.from_pretrained(self.vec_weights)
# self.embeds = torch.nn.Embedding.from_pretrained(self.vec_weights, freeze=False)


import torch
import torch.nn as nn

# FloatTensor containing pretrained weights
weight = torch.FloatTensor([[1, 2.3, 3], [4, 5.1, 6.3]])
embedding = nn.Embedding.from_pretrained(weight)
# Get embeddings for index 1
input = torch.LongTensor([1])
embedding(input)

tensor([[4.0000, 5.1000, 6.3000]])

In [81]:
embedding = nn.Embedding(4, 2)

In [82]:
input = torch.LongTensor([[0,1,2,3], [1,2,3,0]])
embedding(input)

tensor([[[-1.0155, -0.2160],
         [-0.1263, -0.0656],
         [-0.6380, -0.9855],
         [ 0.8145,  0.1153]],

        [[-0.1263, -0.0656],
         [-0.6380, -0.9855],
         [ 0.8145,  0.1153],
         [-1.0155, -0.2160]]], grad_fn=<EmbeddingBackward>)

In [77]:
t2 = normalize(torch.from_numpy(np.array([0.0013, 0.4233, -1.1136, -0.6648])), p=2.0, dim = 0)

In [78]:
t2

tensor([ 0.0010,  0.3103, -0.8163, -0.4873], dtype=torch.float64)