Imports, and setting up Spark sqlContext

In [2]:
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
# Wrap the SparkContext in a SQLContext; later cells use it to run SQL via sqlContext.sql(...)
# and pass its JVM-side handle (sqlContext._ssql_ctx) to the Tetration IO helpers.
# NOTE(review): `sc` is not defined anywhere in this notebook -- presumably the hosting
# environment pre-creates the SparkContext; confirm before running elsewhere.
sqlContext = SQLContext(sc)

### Information about Hourly Flow batches available in the Data Lake

In [3]:
# List the identifiers of every hourly PARQUET flow batch under /tetration/flows.
# NOTE(review): sorted() is assumed to give chronological order because the ids look like
# fixed-width YYYYMMDDHHMM values (see the sample output) -- confirm.
all_instances = sorted(sc._jvm.com.tetration.apps.IO.listInstances(sqlContext._ssql_ctx, "/tetration/flows", "PARQUET", "HOURLY"))
print("Number of hourly batches available: ", len(all_instances))
if all_instances:
    print("First available batch in data lake: ", all_instances[0])
    print("Most recent available batch in data lake: ", all_instances[-1])
else:
    # Indexing an empty list raises IndexError; report the situation instead of crashing the cell.
    print("No hourly batches found in data lake")

Number of hourly batches available:  862
First available batch in data lake:  201901230500
Most recent available batch in data lake:  201902280200


### Setting Up Query Filter, Date Range, and output location and format

In [29]:
from datetime import datetime, timedelta
# strftime/strptime patterns for batch identifiers (year, month, day, hour, then a literal "00").
# Keeps the hour of the datetime being formatted.
batch_format = "%Y%m%d%H00"
# Ignores the datetime's hour and always emits "0000": the first hourly batch of that day.
begin_day_format = "%Y%m%d0000"
# Ignores the datetime's hour and always emits "2300": the last hourly batch of that day.
end_day_format = "%Y%m%d2300"

In [30]:
# Date range to aggregate, parsed from batch identifiers.
# The start is inclusive and the end is exclusive: the daily loop below runs while
# day_batch < end_before_batch, so these values cover 2019-02-01 through 2019-02-28.
start_from_batch = datetime.strptime("201902010000", batch_format)
end_before_batch = datetime.strptime("201903010000", batch_format)

In [31]:
# SQL boolean expression selecting the flows of interest. It is substituted verbatim into
# the WHERE clause of the query built by get_conversations, so it must be a valid SQL
# condition over the columns of the flow data.
query_filter_selection = "dst_address = '10.226.6.109'"

In [32]:
# Location and format of the per-day aggregates. The monthly roll-up cell reads the daily
# results back from this location using this same format.
daily_output_location = "/user/daily_conversations"
daily_output_format = "PARQUET"  # Use CSV for easier daily download.  Use PARQUET if rolling up to monthly
# Location and format of the single aggregate written by the monthly roll-up cell.
monthly_output_location = "/user/monthly_conversations"
monthly_output_format = "CSV"

### Daily Aggregation
The following cell loops over each day in the configured date range, reads that day's hourly batches (00:00 through 23:00), filters and aggregates that data, and then writes the aggregated daily data out to the Data Lake.

In [33]:
def get_conversations(daily_flows, query_filter):
    """Aggregate one batch of flows into per-conversation flow counts.

    Registers ``daily_flows`` as the temp table ``daily_flows`` and runs a
    GROUP BY over (src_address, dst_address, dst_port, proto), counting the
    rows in each group as ``flow_count``.

    Args:
        daily_flows: object exposing ``registerTempTable(name)``; in this
            notebook it is the value returned by the Tetration ``IO.read`` call.
        query_filter: SQL boolean expression placed verbatim after ``WHERE``.
            ``None``, an empty string, or a whitespace-only string means
            "no filter": the WHERE clause is omitted rather than producing
            invalid SQL.

    Returns:
        Whatever ``sqlContext.sql`` returns for the aggregation query.

    Note:
        ``query_filter`` is spliced into the SQL text unescaped, so it must
        come from a trusted source (here, a notebook configuration cell).
    """
    daily_flows.registerTempTable("daily_flows")

    # A bare "WHERE" with nothing after it is a syntax error, so only emit the
    # clause when there is an actual condition to apply.
    has_filter = query_filter is not None and query_filter.strip() != ""
    where_clause = "WHERE {}".format(query_filter) if has_filter else ""

    conversations = sqlContext.sql("""
        SELECT src_address, dst_address, dst_port, proto, count(1) as flow_count
        FROM daily_flows
        {}
        GROUP BY src_address, dst_address, dst_port, proto
    """.format(where_clause))
    return conversations

In [None]:
# Walk the configured range one calendar day at a time; the end bound is exclusive.
tetration_io = sc._jvm.com.tetration.apps.IO
one_day = timedelta(days=1)

day_batch = start_from_batch
while day_batch < end_before_batch:
    # Identifiers of the first (00:00) and last (23:00) hourly batch of the current day.
    startBatch = day_batch.strftime(begin_day_format)
    endBatch = day_batch.strftime(end_day_format)
    print("Aggregating data for", startBatch, "to", endBatch)

    # Read the day's hourly flow batches, then filter and aggregate them.
    daily_flows = tetration_io.read(sqlContext._ssql_ctx, "/tetration/flows", "PARQUET", startBatch, endBatch)
    selected_flows = get_conversations(daily_flows, query_filter_selection)
    # Optional spot check of the busiest conversations:
    #   selected_flows.orderBy(desc("flow_count")).limit(10).show()

    # Persist the day's aggregate under the day's own batch identifier.
    # NOTE(review): the meaning of the trailing True argument is not visible here -- confirm.
    tetration_io.write(selected_flows._jdf, daily_output_location, daily_output_format, day_batch.strftime(batch_format), True)

    day_batch += one_day

Aggregating data for 201902010000 to 201902012300
Aggregating data for 201902020000 to 201902022300
Aggregating data for 201902030000 to 201902032300
Aggregating data for 201902040000 to 201902042300
Aggregating data for 201902050000 to 201902052300
Aggregating data for 201902060000 to 201902062300


### After the daily aggregations are written, then a monthly aggregation can be computed
Aggregation columns need to correspond with the daily aggregation columns.  If the select statement in `get_conversations` above is changed, then the select statement below needs to be changed as well.

In [None]:
# Roll the per-day aggregates up into one aggregate for the whole configured range.
tetration_io = sc._jvm.com.tetration.apps.IO

# Daily results were written under each day's batch id, so the range to read back runs
# from the first day through the last day, i.e. one day before the exclusive end bound.
last_day = end_before_batch - timedelta(days=1)
month_batch_start = start_from_batch.strftime(batch_format)
month_batch_end = last_day.strftime(batch_format)
print("Aggregating daily batches from {} to {}".format(month_batch_start, month_batch_end))

monthly_flows = tetration_io.read(sqlContext._ssql_ctx, daily_output_location, daily_output_format, month_batch_start, month_batch_end)
monthly_flows.registerTempTable("monthly_flows")

# Same grouping columns as the daily query; daily counts are summed rather than re-counted.
monthly_rollup_sql = """
        SELECT src_address, dst_address, dst_port, proto, sum(flow_count) as flow_count
        FROM monthly_flows
        GROUP BY src_address, dst_address, dst_port, proto
    """
monthly_flows_agg = sqlContext.sql(monthly_rollup_sql)

# The roll-up is stored under the batch id of the first day in the range.
tetration_io.write(monthly_flows_agg._jdf, monthly_output_location, monthly_output_format, month_batch_start, True)