<img src="https://ibin.co/3hXAuMuHhEeK.png" alt="Cisco Systems" width="200" height="600"
style="float:left;width:372px;height:118px;">

# Custom alert
## This User App raises an alert when unauthorized service servers are used
### It looks for other servers providing the same service, but not listed in the official list
### Returns rogue providers and misconfigured consumers
### It is expected to be scheduled every hour

In [None]:
"""
Copyright (c) 2018 Cisco and/or its affiliates.
This software is licensed to you under the terms of the Cisco Sample
Code License, Version 1.0 (the "License"). You may obtain a copy of the
License at
               https://developer.cisco.com/docs/licenses
All use of the material herein must be in accordance with the terms of
the License. All rights not expressly granted by the License are
reserved. Unless required by applicable law or agreed to separately in
writing, software distributed under the License is distributed on an "AS
IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
"""

__author__ = "Damien Gouju"
__copyright__ = "Copyright (c) 2018 Cisco and/or its affiliates."
__license__ = "Cisco Sample Code License, Version 1.0"

In [None]:
# pylint: disable=invalid-name
# Import required libs
import json
from datetime import datetime
from pyspark.sql import SQLContext
# NOTE(review): `sc` is not defined in this notebook; presumably it is the
# SparkContext injected by the runtime that executes the app - confirm.
sqlContext = SQLContext(sc)

# Configuration
# Everything below is read by the following cells; adapt it to the service
# that has to be monitored.

# Kafka TAP to send the alert to.
# Keep it empty ("") if you don't want to send an alert and just get the result.
kafka_target = "Kafka-TAP"

# Service name, used to identify the service in the printed result and in the
# alert message description.
service_type = "DNS"

# Data Lake directory where the official list is uploaded using "Data platform --> Data Lake" in the shared zone
directory = "/shared/services-list/dns"

# The CSV file must have a single column with this header. Example of a valid
# CSV file (the triple quotes below are not part of the file):
"""
dns_servers
8.8.8.8
192.168.0.1
"""
field_name = "dns_servers"

# Port of the service (53 for DNS, 123 for NTP, ...).
# Kept as a string because it is concatenated into the SQL query text.
port = "53"

# Protocol, UDP vs TCP. Compared with the `proto` column of the flows.
protocol = "UDP"

In [None]:
# Load the latest version of the official providers list from the Data Lake
# directory set in the configuration cell.
tetration_io = sc._jvm.com.tetration.apps.IO
jvm_sql_context = sqlContext._ssql_ctx

# The last entry of the directory listing is taken as the latest version.
latest_version = tetration_io.list(jvm_sql_context, directory, "CSV")[-1]
print("Latest version: "+latest_version)
dns_list = tetration_io.read(jvm_sql_context, directory, "CSV", latest_version)

# Go through a short-lived temporary table to turn the loaded list into a
# pandas DataFrame, then drop the table as it is not needed afterwards.
providers_table = "valid_providers"
dns_list.registerTempTable(providers_table)
valid_providers_dataset = sqlContext.sql("SELECT * FROM " + providers_table).toPandas()
sqlContext.dropTempTable(providers_table)

print("Valid providers:")
print(valid_providers_dataset)

In [None]:
# Find the flows of the last hour that use the monitored service on a server
# that is not part of the official providers list.
#
# Result: `incorrect_dns_flows_dataset`, a pandas DataFrame with one row per
# unique (Consumer, Provider) couple. The amount of flows is not relevant here.

# Normalize the official addresses before touching the flow data:
# - empty cells (NaN) are ignored instead of breaking the concatenation
# - surrounding whitespace is removed, otherwise an official server would
#   never match and would be reported as rogue
official_providers = [
    str(address).strip()
    for address in valid_providers_dataset[field_name].dropna()
]
official_providers = [address for address in official_providers if address]

# Without any official provider the query cannot be built. Fail with an
# explicit message rather than with an undefined variable error.
if not official_providers:
    raise ValueError(
        "The official " + service_type + " providers list is empty: "
        "check the CSV file uploaded in " + directory
    )

# The addresses are embedded in the SQL text as quoted literals: refuse any
# value that could terminate or escape the literal.
for address in official_providers:
    if "'" in address or "\\" in address:
        raise ValueError("Invalid provider address in the official list: " + repr(address))

# Same filter as "NOT (dst_address = a OR dst_address = b ...)"
where_clause = "dst_address NOT IN (" + ", ".join(
    "'" + address + "'" for address in official_providers
) + ")"
#print(where_clause)

# Look for Consumer / Provider couples that:
# - match the port
# - match the protocol
# - do not match the valid list of providers
# grouped by unique consumer / provider couple
query = (
    "SELECT src_address as Consumer, dst_address as Provider FROM flows1h"
    " WHERE dst_port = '" + port + "' AND proto = '" + protocol + "'"
    " AND " + where_clause +
    " GROUP BY src_address, dst_address"
)

# Look for last hour data
lasthour_data = sc._jvm.com.tetration.apps.IO.read(sqlContext._ssql_ctx, "/tetration/flows/", "PARQUET", "LASTHOUR")
lasthour_data.registerTempTable("flows1h")
try:
    incorrect_dns_flows_dataset = sqlContext.sql(query).toPandas()
finally:
    # Always release the temporary table, even when the query fails
    sqlContext.dropTempTable("flows1h")

#print(incorrect_dns_flows_dataset.to_string(index=False))
print("Unofficial "+service_type+" providers: "+str(incorrect_dns_flows_dataset['Provider'].unique()))
print("Badly configured consumers: "+str(incorrect_dns_flows_dataset['Consumer'].unique()))

In [None]:
#print(incorrect_dns_flows_dataset.to_json(orient='records'))

# Send an alert only if rogue flows were found and a Kafka TAP is configured
# (an empty or unset `kafka_target` disables the alert).
if not incorrect_dns_flows_dataset.empty and kafka_target:
    kafka_json = {
        # One {"Consumer": ..., "Provider": ...} record per rogue couple
        "src_dst": json.loads(incorrect_dns_flows_dataset.to_json(orient='records')),
        # Tag message with categories
        "message_type": "alert",
        "message_desc": "rogue "+service_type+" servers usage",
        # The timestamp is labelled "UTC", so it has to be taken from the UTC
        # clock and not from the local time of the host running the app.
        "message_timestamp": datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')+" UTC",
    }

    # Send to Kafka
    sc._jvm.com.tetration.apps.DataTaps.sendMessage(json.dumps(kafka_json), kafka_target)

    print("Message sent to Kafka server "+kafka_target)
else:
    print("No message sent to Kafka")