In [1]:
import sys

# Make the Helios application packages importable from this notebook.
# The membership check keeps the cell idempotent: re-running it must not
# append the same directories to sys.path again.
for helios_path in ('/mnt/code/helios/app',
                    '/mnt/code/helios/app/common',
                    '/mnt/code/helios/app/protoutils'):
    if helios_path not in sys.path:
        sys.path.append(helios_path)
sys.path

['/opt/helios',
 '/opt/helios/bin',
 '/opt/helios/protoutils',
 '/opt/helios/spark/python',
 '/opt/helios/spark/python/lib/py4j-0.10.7-src.zip',
 '/usr/lib64/python36.zip',
 '/usr/lib64/python3.6',
 '/usr/lib64/python3.6/lib-dynload',
 '',
 '/usr/lib64/python3.6/site-packages',
 '/usr/lib/python3.6/site-packages',
 '/usr/lib/python3.6/site-packages/IPython/extensions',
 '/root/.ipython',
 '/mnt/code/helios/app',
 '/mnt/code/helios/app/common',
 '/mnt/code/helios/app/protoutils']

In [2]:
import datetime
import json
import time

import pyspark.sql.functions as func
from pyspark.sql.types import IntegerType, StringType, StructType, StructField

from common.log import INFO, ERROR
from spark_utils.spark_base import SparkBase
from spark_utils.time_utils import parse_time
from spark_utils.log_parser import filter_bro_logs, alter_col_name, extract_data_to_cs
from spark_utils.data_io_utils import load_kafka_batch, commit_kafka_offset, commit_log_time, write_data_cassandra
from spark_utils.dsaggr_process import aggr_mssql, aggr_mysql, dsaggr_process_sql, dsaggr_process_form,\
                                       aggr_ds_servers, aggr_ds_services
from spark_utils.ddn_config import LOGS_SCHEMA, MULTIHOME_DETECTION_WINDOW_LENGTH_DAYS
from spark_utils.constants import LOCAL_DATA_PATH, EXPECTED_QPS, NUM_ENTRIES_PER_PARTITION
from spark_utils.dataframe_utils import create_empty_dataframe
from spark_utils.multihome_detector import MultihomeDetector

import tags_pb2 as TAGS

In [3]:
class SparkRawDataProcessor(SparkBase):
    '''
    This Spark job serves as the raw log parser / processor for all following jobs.
    It reads data from upstream (Kafka), processes the raw data per topic, and
    prepares it for downstream jobs (parquet files plus Cassandra tables).
    '''

    @staticmethod
    def _target_partition_count():
        '''
        Return the number of partitions used when writing a topic dataframe.

        Derived from EXPECTED_QPS * 3600 / NUM_ENTRIES_PER_PARTITION
        (presumably one hour of traffic - TODO confirm), plus one so the
        result is never zero.
        '''
        return int(EXPECTED_QPS * 3600 / NUM_ENTRIES_PER_PARTITION) + 1

    @staticmethod
    def _build_topic_df(data, topic, field_list):
        '''
        Extract and normalize the rows of a single topic from the raw Kafka dataframe.

        :param data: raw dataframe with at least the columns `topic` and `value` (JSON string).
        :param topic: topic name to select.
        :param field_list: JSON field names of that topic; every field is parsed as a nullable string.
        :return: dataframe after `alter_col_name`, `parse_time` and `filter_bro_logs` were applied.
        '''
        json_schema = StructType([StructField(field, StringType(), True)
                                  for field in field_list])
        topic_df = (data
                    .filter(func.col('topic') == topic)
                    .withColumn("value", func.from_json("value", json_schema))
                    .select('topic', 'value.*'))
        topic_df = alter_col_name(topic_df)
        topic_df = parse_time(topic_df)
        return filter_bro_logs(topic_df, topic)

    def output_data(self, data):
        '''
        Batch mode: parse every topic in LOGS_SCHEMA and append it to parquet under LOCAL_DATA_PATH.

        :param data: raw Kafka batch dataframe (columns `topic`, `value`, ...).
        :return: dict mapping topic name -> parsed dataframe of that topic.
        '''
        res_df_dict = dict()
        new_partition_number = self._target_partition_count()
        for topic, field_list in LOGS_SCHEMA.items():
            INFO("processing topic %s" % topic)
            topic_df = self._build_topic_df(data, topic, field_list)
            topic_df.repartition(new_partition_number) \
                .write \
                .partitionBy('topic', 'year', 'month', 'day', 'hour') \
                .parquet(LOCAL_DATA_PATH, mode='append')
            res_df_dict[topic] = topic_df
            self.increment(topic + '.count', topic_df.count())
        return res_df_dict

    def output_data_stream(self, data):
        '''
        Streaming mode: start one Structured Streaming parquet query per topic in LOGS_SCHEMA.

        :param data: raw streaming dataframe (columns `topic`, `value`, ...).
        :return: an empty dict; the started queries are not tracked here
                 (kept for backward compatibility with existing callers).
        '''
        res_df_dict = dict()
        new_partition_number = self._target_partition_count()
        for topic, field_list in LOGS_SCHEMA.items():
            # this is to throttle connections by Spark Structured Streaming
            time.sleep(10)

            INFO("processing topic %s" % topic)
            topic_df = self._build_topic_df(data, topic, field_list)
            # The trigger interval must be set through .trigger(); an
            # .option("Trigger interval", ...) on the writer has no effect and the
            # query would run micro-batches as fast as possible.
            topic_df.repartition(new_partition_number) \
                .writeStream \
                .queryName(topic) \
                .format("parquet") \
                .option("checkpointLocation", "/mnt/check_point/" + topic) \
                .trigger(processingTime='60 seconds') \
                .partitionBy('year', 'month', 'day', 'hour') \
                .option('path', LOCAL_DATA_PATH + 'topic=' + topic) \
                .start()
        return res_df_dict

    def save_to_cs(self, res_df_dict):
        '''
        Push every non-empty topic dataframe through `extract_data_to_cs`.

        A failure for one topic is logged (together with up to roughly ten sampled
        rows) and does not stop the remaining topics from being saved.

        :param res_df_dict: dict mapping topic name -> dataframe (or None).
        '''
        for topic, topic_df in res_df_dict.items():
            if topic_df is None:
                continue
            # Counted once and reused in the error path below.
            row_count = topic_df.count()
            if row_count == 0:
                continue
            INFO("saving topic %s" % topic)
            try:
                extract_data_to_cs(topic_df, topic)
                # TODO: turn on the counter when POC starts
                # self.increment(topic + '.count', topic_df.count())
            except Exception as exc:
                ERROR("Error saving data for %s with count %d: %r" % (topic, row_count, exc))
                # Sample about ten rows (all rows when there are fewer than ten).
                sample_rate = min(1.0, 10.0 / row_count)
                for row in topic_df.sample(False, sample_rate).collect():
                    ERROR(str(row.asDict()))

    def output_storage_info(self, output_df_dict):
        '''
        Aggregate storage information from the parsed topics and write the form
        results to Cassandra (`data.storageinfo` and `data.scannerform`).

        :param output_df_dict: dict mapping topic name -> parsed dataframe, as returned by `output_data`.
        '''
        mssql_join_df = aggr_mssql(output_df_dict)
        if mssql_join_df is not None:
            mssql_df = dsaggr_process_sql(mssql_join_df)
            # Cassandra write for MSSQL is currently disabled:
            #write_data_cassandra(mssql_df, 'data', 'storageinfo')

        mysql_join_df = aggr_mysql(output_df_dict)
        if mysql_join_df is not None:
            mysql_df = dsaggr_process_sql(mysql_join_df)
            # Cassandra write for MySQL is currently disabled:
            #write_data_cassandra(mysql_df, 'data', 'storageinfo')

        # (source passed to dsaggr_process_form, counter name, Cassandra table)
        form_outputs = (
            ('bro', 'storageinfo.filesystem.invalid.storagetype.count', 'storageinfo'),
            ('scanner', 'scannerform.filesystem.invalid.storagetype.count', 'scannerform'),
        )
        for source, counter_name, table in form_outputs:
            form_df = dsaggr_process_form(output_df_dict, source)
            # Check for None before touching the dataframe; calling .show()
            # first would raise AttributeError on a None result.
            if form_df is None:
                continue
            form_df.show()
            invalid_count = form_df.filter(func.col('storage_type') == TAGS.DB_TYPE_DEFAULT).count()
            self.increment(counter_name, invalid_count)
            write_data_cassandra(form_df, 'data', table)

    def output_dsaggr_info(self):
        '''
        Write the aggregated datastore servers / services to Cassandra
        (`api.ds_servers` and `api.ds_services`).
        '''
        df_ds_servers = aggr_ds_servers()
        write_data_cassandra(df_ds_servers, 'api', 'ds_servers')
        df_ds_services = aggr_ds_services()
        df_ds_services.show()
        write_data_cassandra(df_ds_services, 'api', 'ds_services')

    def output_multihome_info(self,
                              detection_log_end_time,
                              detection_window_length_days=MULTIHOME_DETECTION_WINDOW_LENGTH_DAYS):
        '''
        Output multihome related info to Cassandra (`data.detected_multihome`, overwrite mode).
        The multihome instances will be detected in time period
        [`detection_log_end_time` - `detection_window_length_days`, `detection_log_end_time`).

        Detection is skipped (an empty dataframe is written) when either
        `detection_window_length_days` or the configured
        MULTIHOME_DETECTION_WINDOW_LENGTH_DAYS is None.

        :param detection_log_end_time: datetime marking the exclusive end of the detection window.
        :param detection_window_length_days: length of the detection window in days, or None.
        :return: the dataframe that was written, mainly for trouble-shooting purpose.
        '''
        detection_enabled = (detection_window_length_days is not None
                             and MULTIHOME_DETECTION_WINDOW_LENGTH_DAYS is not None)

        if detection_window_length_days is not None:
            detection_log_start_time = detection_log_end_time - datetime.timedelta(days=detection_window_length_days)
        else:
            # timedelta(days=None) raises TypeError, which made the "disabled"
            # branch below unreachable. With no window length, record the
            # empty window [end, end).
            detection_log_start_time = detection_log_end_time

        def get_multihome_info_detected(log_start_time, log_end_time):
            '''
            Return a Spark dataframe representing the multihome instances
            detected in time range [`log_start_time`, `log_end_time`).
            '''
            if not detection_enabled:
                # NOTE(review): this placeholder schema should match what
                # MultihomeDetector.run_from_raw_tables returns - confirm.
                schema = StructType([
                    StructField('app_name', StringType(), False),
                    StructField('frontend_ip', StringType(), False),
                    StructField('frontend_port', IntegerType(), False),
                    StructField('backend_ip', StringType(), False),
                    StructField('datastore_ip', StringType(), False),
                    StructField('datastore_port', IntegerType(), False),
                    StructField('storage_type', IntegerType(), False)])
                return create_empty_dataframe(schema)

            return MultihomeDetector.run_from_raw_tables(log_start_time, log_end_time)

        # Currently only support detected multihome info.
        # TODO add a mechanism to include hard-coded known multihome instances.
        multihome_info_detected = get_multihome_info_detected(detection_log_start_time,
                                                              detection_log_end_time)
        # Midnight of the current day, stored as the run date.
        today = datetime.date.today()
        today = datetime.datetime(today.year, today.month, today.day)

        appended_df = (multihome_info_detected
                       .withColumn('run_date', func.lit(today))
                       .withColumn('log_start_time', func.lit(detection_log_start_time))
                       .withColumn('log_end_time', func.lit(detection_log_end_time)))
        write_data_cassandra(appended_df, 'data', 'detected_multihome', mode='overwrite')
        # Return dataframe mainly for trouble-shooting purpose.
        return appended_df

    def entry(self):
        '''
        Run one batch: load from Kafka, write parquet, save to Cassandra, output
        storage / dsaggr / multihome info, then commit the Kafka offset and log time.
        Returns early without committing anything when the batch is empty.
        '''
        data = load_kafka_batch()
        if data.count() == 0:
            # No data loaded. Abort.
            return

        output_df_dict = self.output_data(data)

        # The following section is batch mode
        INFO("Saving to CS")
        self.save_to_cs(output_df_dict)

        # Output storage info
        INFO("Saving to Storage Info")
        self.output_storage_info(output_df_dict)

        # Output dsaggr info
        INFO("Saving to DSAGGR Info")
        self.output_dsaggr_info()

        # Output multihome info
        INFO("Saving to Multihome Info")
        # The newest `timestamp` in the batch is the end of the detection window.
        max_log_time = (data
                        .agg(func.max('timestamp').alias('max_time'))
                        .collect()[0]
                        .max_time)
        self.output_multihome_info(detection_log_end_time=max_log_time)

        # commit kafka offset and log_time only when all computations succeeded
        #TODO: Currently there is a low chance that when offset gets committed, log_time does not
        # To fix it, we will need a checkpoint system which does not rely solely on Cassandra data store.
        # It would be most reliable to store the checkpoints (offset, log_time) on a redundant file system
        # like HDFS (on-prem) or S3 (cloud). For now, we will use CS as a short-term solution and see how it goes.
        commit_kafka_offset(data)
        commit_log_time(data, table='kafka_log_time')

In [4]:
# Build the processor against a fixed cluster address
# (cs_* presumably configures the Cassandra connection - TODO confirm).
spark_raw_data_processor = SparkRawDataProcessor(
    cs_consistency_level='LOCAL_ONE',
    cs_cluster="192.168.7.51",
)

In [5]:
# Load one Kafka batch for the "pii_ie" topic only, then run the batch
# parser, which appends parquet output and returns {topic: dataframe}.
data = load_kafka_batch(
    topics=["pii_ie"],
    kafka_host="192.168.7.51",
    kafka_offset=None,
)
output_df_dict = spark_raw_data_processor.output_data(data)

In [6]:
# Preview the parsed rows of the 'pii_ie' topic.
# NOTE(review): the rendered output may contain real PII - clear it before sharing.
output_df_dict['pii_ie'].show()
# Disabled preview of another topic, kept for ad-hoc debugging:
#output_df_dict['mysql'].show()

+------+----------+--------+--------------------+--------------------+--------------------+--------------------+------+-------------+------+-------------+------+-----------+--------------------+---------+-------+---------+--------------------+--------------------+------------+----------+-------------------+----+----+-----+----+
| topic|@timestamp|@version|       timestamp_utc|  processed_filename|       pii_json_list|                sha1|source|       orig_h|orig_p|       resp_h|resp_p|source_uuid|             ie_time|ie_exists|service| smb_name|            smb_path|            filename|storage_type|        ts|       timestamp_dt|hour| day|month|year|
+------+----------+--------+--------------------+--------------------+--------------------+--------------------+------+-------------+------+-------------+------+-----------+--------------------+---------+-------+---------+--------------------+--------------------+------------+----------+-------------------+----+----+-----+----+
|pii_ie|  

In [7]:
# Aggregate storage info from the parsed topics; this shows the form
# dataframes and writes them to Cassandra (data.storageinfo / data.scannerform).
spark_raw_data_processor.output_storage_info(output_df_dict)

+--------------------+-------------+----+----------+--------------------+------------+--------------------+--------------------+------+--------------------+-------------------+-------------+----------+
|                 loc|           ip|port| file_name|          field_name|storage_type|                sha1|     entity_pii_list|source|             ie_time|      log_timestamp|       orig_h| file_path|
+--------------------+-------------+----+----------+--------------------+------------+--------------------+--------------------+------+--------------------+-------------------+-------------+----------+
|192.168.8.134445...|192.168.8.134| 445|/form2.pdf|          FIRST_NAME|           3|50ac3a7fc58a4ccfe...|[0A AA 03 0A 26 0...|   bro|2019-02-23 03:03:...|2019-02-23 03:03:22|192.168.8.230|/form2.pdf|
|192.168.8.134445...|192.168.8.134| 445|/form2.pdf|           LAST_NAME|           3|50ac3a7fc58a4ccfe...|[0A AA 03 0A 26 0...|   bro|2019-02-23 03:03:...|2019-02-23 03:03:22|192.168.8.230|/f

In [8]:
# Write the aggregated datastore servers/services to Cassandra
# (api.ds_servers / api.ds_services); the services dataframe is shown below.
spark_raw_data_processor.output_dsaggr_info()

+-------------+----+------------+---+------+-----+
|           ip|port|storage_type|dbs|tables|files|
+-------------+----+------------+---+------+-----+
| 192.168.8.74|3306|           2|  6|   146|    0|
|192.168.8.155|1433|           1|  3|     8|    0|
| 192.168.8.86|3306|           2| 10|   157|    0|
|192.168.8.134| 445|           3|  0|     0|    5|
+-------------+----+------------+---+------+-----+

