In [1]:
# importing necessary libraries
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import os
import sys
import unittest
import datetime

# explicitly stating pyspark python env var
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# initializing spark session
spark = SparkSession.builder.appName('adlumin_data_challenge').getOrCreate()

# Exercise 1

### Part (a): Rewriting Transformations using Pyspark

In [2]:
# loading only necessary columns into pyspark df from the partitioned parquet folder
def read_data(df):
    # if location str
    if isinstance(df, str):
        df = spark.read.parquet(df)
        
    # would add more input validation (cols to select are present, data types)
    # would also add other file input options
        
    return df\
    .select("process_owner", "process_name", "parent_process_name", "computername", "parent_executablepath", "executable_path", "year", "month", "day", "hour", "elastic_id")\
    .dropna().withColumn("date", expr("make_date(year, month, day)")) # dropped rows with null values and created data col from year, month, day cols

In [3]:
# replicating given sql selection with pyspark operations
def sql_tf(pysp_df):
    return pysp_df\
    .filter((pysp_df["date"] >= '2021-10-14') & (pysp_df["date"] < '2021-10-15'))\
    .groupBy("process_owner", "process_name", "parent_process_name", "computername", "parent_executablepath", "executable_path", "hour")\
    .agg(count("*").alias("count"), collect_list("elastic_id").alias("elastic_id"))\
    .select("process_owner", "process_name", "parent_process_name", "computername", "parent_executablepath", "executable_path", "hour", "count", "elastic_id")\
    .distinct()

In [4]:
# replicating given python/pandas function with pyspark operations
def preprocess(grouped, cols=['process_owner', 'process_name', 'parent_process_name', 'computername', 'parent_executablepath', 'executable_path', 'hour'], elastic_ids=False, generate=False):
    grouped = grouped.withColumn('process_owner', regexp_replace('process_owner', '\d+', '*'))
    grouped = grouped.withColumn('computername', regexp_replace('computername', '\d+', '*'))

    navBckLvlUDF = udf(lambda x: '\\'.join(x.split('\\')[:-1]), StringType()) 
    grouped = grouped.withColumn("executable_path", navBckLvlUDF(col("executable_path")))
    if 'parent_executablepath' in cols:
        grouped = grouped.withColumn("parent_executablepath", navBckLvlUDF(col("executable_path")))

        grouped = grouped.withColumn('parent_executablepath', regexp_replace('parent_executablepath', '[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}', 'UUID'))
        grouped = grouped.withColumn('parent_executablepath', regexp_replace('parent_executablepath', '0[xX][0-9a-fA-F]+', 'HEX'))
  
    grouped = grouped.withColumn('executable_path', regexp_replace('executable_path', '[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}', 'UUID'))
    grouped = grouped.withColumn('executable_path', regexp_replace('executable_path', '0[xX][0-9a-fA-F]+', 'HEX'))

    grouped = grouped.withColumn('process_name', regexp_replace('process_name', '[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}', 'UUID'))
    grouped = grouped.withColumn('process_name', regexp_replace('process_name', '0[xX][0-9a-fA-F]+', 'HEX'))

    grouped = grouped.withColumn('parent_process_name', regexp_replace('parent_process_name', '[0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12}', 'UUID'))
    grouped = grouped.withColumn('parent_process_name', regexp_replace('parent_process_name', '0[xX][0-9a-fA-F]+', 'HEX'))

    if 'parent_executablepath' in cols:
        grouped = grouped.withColumn('parent_executablepath', regexp_replace('parent_executablepath', '\d+', '*'))
                                     
    grouped = grouped.withColumn('executable_path', regexp_replace('executable_path', '\d+', '*'))
    grouped = grouped.withColumn('process_name', regexp_replace('process_name', '\d+', '*'))
    grouped = grouped.withColumn('parent_process_name', regexp_replace('parent_process_name', '\d+', '*'))


    if not generate:
        if elastic_ids:
            grouped = grouped.groupBy("process_owner", "process_name", "parent_process_name", "computername", "parent_executablepath", "executable_path", "hour")\
            .agg(sum("count").alias("count"), flatten(collect_list('elastic_id')).alias("elastic_id"), min("count").alias("min_time"), max("count").alias("max_time"))
        else:
            grouped = grouped.groupBy("process_owner", "process_name", "parent_process_name", "computername", "parent_executablepath", "executable_path", "hour")\
            .agg(sum("count").alias("count"), min("count").alias("min_time"), max("count").alias("max_time"))

    return grouped

In [5]:
# both transformation combined for ease of use
def full_tf(input_df, fcols=['process_owner', 'process_name', 'parent_process_name', 'computername', 'parent_executablepath', 'executable_path', 'hour'], felastic_ids=False, fgenerate=False):
    return preprocess(sql_tf(input_df), cols=fcols, elastic_ids=felastic_ids, generate=fgenerate)

In [6]:
# sample read_data output
ped_df = read_data('data_parquet/tenant_id=tenant_c')


-RECORD 0----------------------------------------------------------------------------------------
 process_owner         | name_511                                                                
 process_name          | executable_287                                                          
 parent_process_name   | executable_343                                                          
 computername          | name_74                                                                 
 parent_executablepath | c:\dir_a_j\dir_b_j32\executable_343                                     
 executable_path       | c:\dir_a_j\dir_b_j32\executable_287                                     
 year                  | 2021                                                                    
 month                 | 10                                                                      
 day                   | 13                                                                      
 hour               

In [9]:
# sample sql_tf output
ped_df = sql_tf(ped_df)
ped_df.show(truncate=False, vertical=True)

-RECORD 0------------------------------------------------------------------
 process_owner         | name_440                                          
 process_name          | executable_162                                    
 parent_process_name   | asdf_0x197ab829c_executable_7                     
 computername          | name_84                                           
 parent_executablepath | asdf_0x197ab829c_executable_7                     
 executable_path       | c:\dir_a_j\dir_b_j32\executable_162               
 hour                  | 10                                                
 count                 | 1                                                 
 elastic_id            | [s1pzfnwB0-fRXDnoxutL]                            
-RECORD 1------------------------------------------------------------------
 process_owner         | name_440                                          
 process_name          | executable_162                                    
 parent_proc

In [10]:
# sample preprocess output
ped_df = preprocess(ped_df)
ped_df.show(truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------------------------
 process_owner         | name_*                                                      
 process_name          | executable_*                                                
 parent_process_name   | executable_*                                                
 computername          | name_*                                                      
 parent_executablepath | c:\dir_a_j                                                  
 executable_path       | c:\dir_a_j\dir_b_j*                                         
 hour                  | 3                                                           
 count                 | 27                                                          
 min_time              | 1                                                           
 max_time              | 1                                                           
-RECORD 1---------------------------------------------

In [11]:
# sample final output
#full_tf(read_data('data_parquet/tenant_id=tenant_c')).show(truncate=False, vertical=True)

### Part (b): Unit Testing for Transformation Accuracy

In [12]:
# test case class
class PySparkTfTestCase(unittest.TestCase):
    #
    @classmethod
    def setUpClass(cls):
        cls.spark = (SparkSession
                     .builder
                     .master("local[*]")
                     .appName("Unit-tests")
                     .getOrCreate())
        
    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()
        
    # test sql_tf
    def test_sql_tf(self):
        # preparing input df that matches function input
        input_schema = StructType([
            StructField('process_owner', StringType(), True),
            StructField('process_name', StringType(), True),
            StructField('parent_process_name', StringType(), True),
            StructField('computername', StringType(), True),
            StructField('parent_executablepath', StringType(), True),
            StructField('executable_path', StringType(), True),
            StructField('year', IntegerType(), True),
            StructField('month', IntegerType(), True),
            StructField('day', IntegerType(), True),
            StructField('hour', IntegerType(), True),
            StructField('elastic_id', StringType(), True),
            StructField('date', DateType(), True)])
        input_data = [("name_622", "executable_174", "executable_163", "name_47", "c:\dir_a_j\dir_b_j32\executable_163", "c:\dir_a_j\dir_b_j32\executable_174", 2021, 10, 13, 21, 'lZ-Ze3wBV1WtlBxDx3Rt', datetime.datetime.strptime('2021-10-13', "%Y-%m-%d").date()),
                      ("name_526", "executable_160", "executable_160", "name_47", "c:\dir_a_b\dir_b_ce\dir_c_i\dir_d_d\dir_e_j\dir_f_j\dir_g_j\executable_160", "c:\dir_a_b\dir_b_ce\dir_c_i\dir_d_d\dir_e_j\dir_f_j\dir_g_j\executable_160", 2021, 10, 14, 22, 'LzL4gHwBqhyjhem8EVRC', datetime.datetime.strptime('2021-10-14', "%Y-%m-%d").date())                                              ,
                      ("name_444", "executable_175", "executable_166", "name_650", "c:\dir_a_a86)\dir_b_b\dir_c_a\dir_d_a\executable_166", "c:\dir_a_a86)\dir_b_b\dir_c_a\dir_d_a\94dir_e_d0.992.47\executable_175", 2021, 10, 14, 20, 'Fe2JgHwBV1WtlBxDNEN5', datetime.datetime.strptime('2021-10-14', "%Y-%m-%d").date()),
                      ("name_457", "executable_163", "executable_398", "name_10", "c:\dir_a_j\dir_b_j32\executable_398", "c:\dir_a_j\dir_b_j32\executable_163", 2021, 10, 13, 17, 'hWO-enwBNBIFWOYHJzSP', datetime.datetime.strptime('2021-10-13', "%Y-%m-%d").date()),
                      ("name_533", "executable_201", "executable_201", "name_44", "c:\dir_a_a86)\dir_b_bg\dir_c_ac\dir_d_g\dir_e_b1\executable_201", "c:\dir_a_a86)\dir_b_bg\dir_c_ac\dir_d_g\dir_e_b1\executable_201", 2021, 10, 13, 19, '1l07e3wBEhvPAxw3KTWi', datetime.datetime.strptime('2021-10-13', "%Y-%m-%d").date()),
                      ("name_482", "executable_166", "executable_166", "name_30", "c:\dir_a_a86)\dir_b_b\dir_c_a\dir_d_a\executable_166", "c:\dir_a_a86)\dir_b_b\dir_c_a\dir_d_a\executable_166", 2021, 10, 13, 13, 'Z0HfeXwBEhvPAxw3WBmg', datetime.datetime.strptime('2021-10-13', "%Y-%m-%d").date())]
        input_df = self.spark.createDataFrame(data=input_data, schema=input_schema)

        # preparing expected df that matches function output
        expected_schema = StructType([
            StructField('process_owner', StringType(), True),
            StructField('process_name', StringType(), True),
            StructField('parent_process_name', StringType(), True),
            StructField('computername', StringType(), True),
            StructField('parent_executablepath', StringType(), True),
            StructField('executable_path', StringType(), True),
            StructField('hour', IntegerType(), True),
            StructField('count', LongType(), False),
            StructField('elastic_id', ArrayType(StringType(), False), False)])

        expected_data = [("name_526", "executable_160", "executable_160", "name_47", "c:\dir_a_b\dir_b_ce\dir_c_i\dir_d_d\dir_e_j\dir_f_j\dir_g_j\executable_160", "c:\dir_a_b\dir_b_ce\dir_c_i\dir_d_d\dir_e_j\dir_f_j\dir_g_j\executable_160", 22, 1, ['LzL4gHwBqhyjhem8EVRC']),
                        ("name_444", "executable_175", "executable_166", "name_650", "c:\dir_a_a86)\dir_b_b\dir_c_a\dir_d_a\executable_166", "c:\dir_a_a86)\dir_b_b\dir_c_a\dir_d_a\94dir_e_d0.992.47\executable_175", 20, 1, ['Fe2JgHwBV1WtlBxDNEN5'])]
        expected_df = self.spark.createDataFrame(data=expected_data, schema=expected_schema)
        
        sql_tf_df = sql_tf(input_df)

        # compare schema of sql_tf_df and expected_df
        field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
        fields1 = [*map(field_list, sql_tf_df.schema.fields)]
        fields2 = [*map(field_list, expected_df.schema.fields)]
        res = set(fields1) == set(fields2)
        self.assertTrue(res)
        
        # compare data in transformed_df and expected_df
        self.assertEqual(sorted(expected_df.collect()), sorted(sql_tf_df.collect()))

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!please read!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

#     def read_data(self):
#         # similarly to above, would check if input df (assumed to have been checked for necessary cols, minus date)
#         # matches the expected df (has no rows with nas and a date col)
        
#         # input cols: [parent_process_md5, parent_process_sha1, parent_process_sha256, parent_process_creation_date, process_id, agentid, elastic_id, usernames_id, agent_id, event_type, year, month, day, hour, parent_executablepath, executable_path, computername, parent_process_owner, process_owner, process_domain, parent_process_name, process_name, tenant_id, command_line, parent_command_line]
#         # output cols: [process_owner, process_name, parent_process_name, computername, parent_executablepath, executable_path, year, month, day, hour, elastic_id, date]
#         pass
        
#     def test_preprocess(self):
#         # similarly to above, would check if input df (output of sql_tf, having the groupby cols + count + elastic_id list)
#         # matches the expected df (4 diff possibilities of output depending on if elastic_ids and generate is True/False,
#         # would create 4 expected cases and check if tf_df matches any)
        
#         # input cols: [process_owner, process_name, parent_process_name, computername, parent_executablepath, executable_path, hour, count, elastic_id]
#         # output cols: [process_owner, process_name, parent_process_name, computername, parent_executablepath, executable_path, hour, count, min_time, max_time, ~elastic_id]
#         pass

# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

unittest.main(argv=[''], verbosity=2, exit=False)

  self._sock = None
  self._sock = None
ok

----------------------------------------------------------------------
Ran 1 test in 12.669s

OK


<unittest.main.TestProgram at 0x25b60150310>

I believe there are three main components to this transformation process: changes to data when loading it in, changes with sql transformation (pyspark replication), and changes with python transformation (pyspark replication). For this reason I included three main tests (six really if schema tests are counted separately) to make sure the input and output schema/data within and between each function/transformation match properly.