Skip to content

Commit

Permalink
Splitting out ACLs from Kafka and HBase setup (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinleet committed Apr 6, 2017
1 parent 06667d4 commit 6dc741f
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ class EnrichmentCommands:
__enrichment_error_topic = None
__threat_intel_error_topic = None
__kafka_configured = False
__kafka_acl_configured = False
__hbase_configured = False
__hbase_acl_configured = False
__geo_configured = False

def __init__(self, params):
Expand All @@ -42,29 +44,51 @@ def __init__(self, params):
self.__enrichment_topology = params.metron_enrichment_topology
self.__enrichment_topic = params.metron_enrichment_topic
self.__kafka_configured = os.path.isfile(self.__params.enrichment_kafka_configured_flag_file)
self.__kafka_acl_configured = os.path.isfile(self.__params.enrichment_kafka_acl_configured_flag_file)
self.__hbase_configured = os.path.isfile(self.__params.enrichment_hbase_configured_flag_file)
self.__hbase_acl_configured = os.path.isfile(self.__params.enrichment_hbase_acl_configured_flag_file)
self.__geo_configured = os.path.isfile(self.__params.enrichment_geo_configured_flag_file)

def is_kafka_configured(self):
return self.__kafka_configured

def is_kafka_acl_configured(self):
return self.__kafka_acl_configured

def set_kafka_configured(self):
Logger.info("Setting Kafka Configured to True")
File(self.__params.enrichment_kafka_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)

def set_kafka_acl_configured(self):
Logger.info("Setting Kafka ACL Configured to True")
File(self.__params.enrichment_kafka_acl_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)

def is_hbase_configured(self):
return self.__hbase_configured

def is_hbase_acl_configured(self):
return self.__hbase_acl_configured

def set_hbase_configured(self):
Logger.info("Setting HBase Configured to True")
File(self.__params.enrichment_hbase_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)

def set_hbase_acl_configured(self):
Logger.info("Setting HBase ACL Configured to True")
File(self.__params.enrichment_hbase_acl_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)

def is_geo_configured(self):
return self.__geo_configured

Expand Down Expand Up @@ -109,9 +133,9 @@ def remote_repo():
def init_geo(self):
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.metron_keytab_path,
self.__params.metron_jaas_principal,
self.__params.metron_user)
self.__params.metron_keytab_path,
self.__params.metron_jaas_principal,
self.__params.metron_user)

Logger.info("Creating HDFS location for GeoIP database")
self.__params.HdfsResource(self.__params.geoip_hdfs_dir,
Expand Down Expand Up @@ -153,13 +177,6 @@ def init_kafka_topics(self):
--replication-factor {4} \
--config retention.bytes={5}"""

acl_template = """{0}/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect={1} \
--add \
--allow-principal User:{2} \
--topic {3}"""

num_partitions = 1
replication_factor = 1
retention_gigabytes = int(self.__params.metron_topic_retention)
Expand All @@ -176,14 +193,36 @@ def init_kafka_topics(self):
replication_factor,
retention_bytes),
user=self.__params.kafka_user)

Logger.info("Done creating Kafka topics")
self.set_kafka_configured()

def init_kafka_acls(self):
Logger.info('Creating Kafka topics')
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.kafka_keytab_path,
self.__params.kafka_principal_name,
self.__params.kafka_user)

acl_template = """{0}/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect={1} \
--add \
--allow-principal User:{2} \
--topic {3}"""

topics = [self.__enrichment_topic]
for topic in topics:
Logger.info("Setting ACL for topic'{0}'".format(topic))
Execute(acl_template.format(self.__params.kafka_bin_dir,
self.__params.zookeeper_quorum,
self.__params.storm_principal_name,
topic),
user=self.__params.kafka_user)

Logger.info("Done creating Kafka topics")
self.set_kafka_configured()
self.set_kafka_acl_configured()

def start_enrichment_topology(self):
Logger.info("Starting Metron enrichment topology: {0}".format(self.__enrichment_topology))
Expand Down Expand Up @@ -248,32 +287,45 @@ def create_hbase_tables(self):
user=self.__params.metron_user
)

add_enrichment_acl_cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n".format(self.__params.storm_principal, self.__params.enrichment_table)
Execute(add_enrichment_acl_cmd,
add_threatintel_cmd = "echo \"create '{0}','{1}'\" | hbase shell -n".format(self.__params.threatintel_table, self.__params.threatintel_cf)
Execute(add_threatintel_cmd,
tries=3,
try_sleep=5,
logoutput=False,
path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
user=self.__params.metron_user
)

add_threatintel_cmd = "echo \"create '{0}','{1}'\" | hbase shell -n".format(self.__params.threatintel_table, self.__params.threatintel_cf)
Execute(add_threatintel_cmd,
Logger.info("Done creating HBase Tables")
self.set_hbase_configured()

def set_hbase_acls(self):
Logger.info("Setting HBase ACLs")
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.hbase_keytab_path,
self.__params.hbase_principal_name,
self.__params.metron_user)

add_enrichment_acl_cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n".format(self.__params.storm_principal, self.__params.enrichment_table)
Execute(add_enrichment_acl_cmd,
tries=3,
try_sleep=5,
logoutput=False,
path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
user=self.__params.metron_user
)

add_enrichment_acl_cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n".format(self.__params.storm_principal, self.__params.threatintel_table)
Execute(add_enrichment_acl_cmd,
add_threatintel_acl_cmd = "echo \"grant '{0}', 'RW', '{1}'\" | hbase shell -n".format(self.__params.storm_principal, self.__params.threatintel_table)
Execute(add_threatintel_acl_cmd,
tries=3,
try_sleep=5,
logoutput=False,
path='/usr/sbin:/sbin:/usr/local/bin:/bin:/usr/bin',
user=self.__params.metron_user
)

Logger.info("Done creating HBase Tables")
self.set_hbase_configured()
Logger.info("Done setting HBase ACLs")
self.set_hbase_acl_configured()


Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ def start(self, env, upgrade_type=None):

if not commands.is_kafka_configured():
commands.init_kafka_topics()
if params.security_enabled and not commands.is_kafka_acl_configured():
commands.init_kafka_acls()
if not commands.is_hbase_configured():
commands.create_hbase_tables()
if params.security_enabled and not commands.is_hbase_acl_configured():
commands.set_hbase_acls()
if not commands.is_geo_configured():
commands.init_geo()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,34 @@ class IndexingCommands:
__params = None
__indexing = None
__configured = False
__acl_configured = False

def __init__(self, params):
if params is None:
raise ValueError("params argument is required for initialization")
self.__params = params
self.__indexing = params.metron_indexing_topology
self.__configured = os.path.isfile(self.__params.indexing_configured_flag_file)
self.__acl_configured = os.path.isfile(self.__params.indexing_acl_configured_flag_file)

def is_configured(self):
return self.__configured

def is_acl_configured(self):
return self.__acl_configured

def set_configured(self):
File(self.__params.indexing_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)

def set_acl_configured(self):
File(self.__params.indexing_acl_configured_flag_file,
content="",
owner=self.__params.metron_user,
mode=0775)

def setup_repo(self):
def local_repo():
Logger.info("Setting up local repo")
Expand Down Expand Up @@ -89,13 +100,6 @@ def init_kafka_topics(self):
--replication-factor {4} \
--config retention.bytes={5}"""

acl_template = """{0}/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect={1} \
--add \
--allow-principal User:{2} \
--topic {3}"""

num_partitions = 1
replication_factor = 1
retention_gigabytes = int(self.__params.metron_topic_retention)
Expand All @@ -110,12 +114,30 @@ def init_kafka_topics(self):
replication_factor,
retention_bytes),
user=self.__params.kafka_user)
Logger.info("Done creating Kafka topics")

def init_kafka_acls(self):
Logger.info('Creating Kafka ACLs')
if self.__params.security_enabled:
kinit(self.__params.kinit_path_local,
self.__params.kafka_keytab_path,
self.__params.kafka_principal_name,
self.__params.kafka_user)

acl_template = """{0}/kafka-acls.sh \
--authorizer kafka.security.auth.SimpleAclAuthorizer \
--authorizer-properties zookeeper.connect={1} \
--add \
--allow-principal User:{2} \
--topic {3}"""

Logger.info("Creating ACL for topic'{0}'".format(self.__indexing))
Execute(acl_template.format(self.__params.kafka_bin_dir,
self.__params.zookeeper_quorum,
self.__params.storm_principal_name,
self.__indexing),
user=self.__params.kafka_user)
Logger.info("Done creating Kafka topics")
Logger.info("Done creating Kafka ACLs")

def init_hdfs_dir(self):
Logger.info('Creating HDFS indexing directory')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def configure(self, env, upgrade_type=None, config_dir=None):
commands.init_kafka_topics()
commands.init_hdfs_dir()
commands.set_configured()
if params.security_enabled and not commands.is_acl_configured():
commands.init_kafka_acls()
commands.set_acl_configured()

def start(self, env, upgrade_type=None):
from params import params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,14 @@
metron_zookeeper_config_dir = status_params.metron_zookeeper_config_dir
metron_zookeeper_config_path = status_params.metron_zookeeper_config_path
parsers_configured_flag_file = status_params.parsers_configured_flag_file
parsers_acl_configured_flag_file = status_params.parsers_acl_configured_flag_file
enrichment_kafka_configured_flag_file = status_params.enrichment_kafka_configured_flag_file
enrichment_kafka_acl_configured_flag_file = status_params.enrichment_kafka_acl_configured_flag_file
enrichment_hbase_configured_flag_file = status_params.enrichment_hbase_configured_flag_file
enrichment_hbase_acl_configured_flag_file = status_params.enrichment_hbase_acl_configured_flag_file
enrichment_geo_configured_flag_file = status_params.enrichment_geo_configured_flag_file
indexing_configured_flag_file = status_params.indexing_configured_flag_file
indexing_acl_configured_flag_file = status_params.indexing_acl_configured_flag_file
global_json_template = config['configurations']['metron-env']['global-json']
global_properties_template = config['configurations']['metron-env']['elasticsearch-properties']

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
metron_zookeeper_config_dir = config['configurations']['metron-env']['metron_zookeeper_config_dir']
metron_zookeeper_config_path = format('{metron_home}/{metron_zookeeper_config_dir}')
parsers_configured_flag_file = metron_zookeeper_config_path + '/../metron_parsers_configured'
parsers_acl_configured_flag_file = metron_zookeeper_config_path + '/../metron_parsers_acl_configured'

# Enrichment
metron_enrichment_topology = 'enrichment'
Expand All @@ -44,10 +45,13 @@
# Indexing
metron_indexing_topology = config['configurations']['metron-env']['metron_indexing_topology']
indexing_configured_flag_file = metron_zookeeper_config_path + '/../metron_indexing_configured'
indexing_acl_configured_flag_file = metron_zookeeper_config_path + '/../metron_indexing_acl_configured'

# Enrichment
enrichment_kafka_configured_flag_file = metron_zookeeper_config_path + '/../metron_enrichment_kafka_configured'
enrichment_kafka_acl_configured_flag_file = metron_zookeeper_config_path + '/../metron_enrichment_kafka_acl_configured'
enrichment_hbase_configured_flag_file = metron_zookeeper_config_path + '/../metron_enrichment_hbase_configured'
enrichment_hbase_acl_configured_flag_file = metron_zookeeper_config_path + '/../metron_enrichment_hbase_acl_configured'
enrichment_geo_configured_flag_file = metron_zookeeper_config_path + '/../metron_enrichment_geo_configured'

# Storm
Expand Down

0 comments on commit 6dc741f

Please sign in to comment.