The goal of this file is to extract data from an S3 bucket in order to build machine learning models.

The extraction process is divided into 3 main parts: installing the required packages, preparing the data files, and creating the main function.

# 1. Install Required Packages
* Note on the bat package: it only needs to be installed once locally, but it must be reinstalled every time a new notebook instance is created on the AWS cloud platform.

In [6]:
# The following two lines install the bat package.
# When using this notebook instance for the first time, uncomment them to install bat.

# !conda install -y -c conda-forge pip
# !pip install bat
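
One way to decide whether the install lines need to be run on a fresh instance is to check whether the package can be found first. This is only a minimal sketch; the helper name `is_installed` is hypothetical and not part of the original notebook.

```python
import importlib.util

def is_installed(package_name):
    """Return True if a top-level package can be found on the import path."""
    return importlib.util.find_spec(package_name) is not None

# Remind the user to run the install cell when bat is not available yet.
if not is_installed("bat"):
    print("bat is missing: uncomment and run the two install lines above")
```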

In [3]:
# import the other required packages

import os
import re
import boto3
import pandas as pd
import numpy as np
from sagemaker import get_execution_role
from bat import bro_multi_log_reader

In [2]:
# get execution role from aws
role = get_execution_role()
region = boto3.Session().region_name
bucket='sagemaker-model-ml'

# 2. Data Preparation

## 2.1 Reading Files from S3 Bucket
### 2.1.1 define a class to read files
* Define a class, MultiLogToDataFrame, that reads log files from the S3 bucket directly into a pandas DataFrame

In [4]:
class MultiLogToDataFrame(pd.DataFrame):
    """MultiLogToDataFrame: Converts multiple Bro logs to a Pandas DataFrame
        Args:
            log_filename (string): The full path to the Bro log
            ts_index (bool): Set the index to the 'ts' field (default = False)
        Notes:
            This class is fairly simple right now but will probably have additional
            functionality for formal type specifications and performance enhancements
    """
    def __init__(self, log_filename, ts_index=False):
        """Initialize the MultiLogToDataFrame class"""

        # Create a bro reader on a given log file
        reader = bro_multi_log_reader.BroMultiLogReader(log_filename)

        # Create a Pandas dataframe from reader
        super(MultiLogToDataFrame, self).__init__(reader.readrows())

        # Set the index
        if ts_index:
            self.set_index('ts', inplace=True)
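
Subclassing `pd.DataFrame` works here, but slicing such an object typically returns a plain DataFrame, so a plain function can do the same job. The sketch below is an illustrative alternative, not the notebook's method; `rows_to_dataframe` is a hypothetical name, and it assumes the reader yields one dict per log row, as `readrows()` is used above.

```python
import pandas as pd

def rows_to_dataframe(rows, ts_index=False):
    """Build a DataFrame from an iterable of dict rows.

    rows: any iterable of dicts, e.g. the result of reader.readrows() (assumed).
    ts_index: if True and a 'ts' column exists, use it as the index.
    """
    df = pd.DataFrame(list(rows))
    if ts_index and 'ts' in df.columns:
        df = df.set_index('ts')
    return df

# Hypothetical usage with the reader from this notebook:
# conn = rows_to_dataframe(bro_multi_log_reader.BroMultiLogReader(path).readrows())
```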

### 2.1.2 download files from S3 to notebook instance
* Download the data and files needed for building models from S3

'/Users/Rachel/Desktop/AtomicIOT/Rachel/Sagemaker/Data/2018'

In [8]:
#download monthly log file from S3 and unzip it 

#s3 = boto3.resource('s3')
#s3.Bucket(bucket).download_file('2017-11.tgz','2017-11.tgz')

import tarfile
with tarfile.open("2017-11.tgz","r") as tar_ref:
    tar_ref.extractall("2017-11")

In [12]:
with tarfile.open("2018-01.tgz","r") as tar_ref:
    tar_ref.extractall("2018-01")

In [13]:
with tarfile.open("2018-02.tgz","r") as tar_ref:
    tar_ref.extractall("2018-02")
with tarfile.open("2018-03.tgz","r") as tar_ref:
    tar_ref.extractall("2018-03")
with tarfile.open("2018-04.tgz","r") as tar_ref:
    tar_ref.extractall("2018-04")
with tarfile.open("2018-05.tgz","r") as tar_ref:
    tar_ref.extractall("2018-05")
with tarfile.open("2018-06.tgz","r") as tar_ref:
    tar_ref.extractall("2018-06")
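
The repeated extraction cells above can be folded into one loop. A minimal sketch, assuming each archive is named `<month>.tgz` and is extracted into a folder named after the month, as in the cells above; `extract_archives` and its parameters are hypothetical names.

```python
import os
import tarfile

def extract_archives(months, src_dir=".", dest_dir="."):
    """Extract <month>.tgz into a folder named <month>; return the months extracted."""
    extracted = []
    for month in months:
        archive = os.path.join(src_dir, month + ".tgz")
        if not os.path.exists(archive):
            # Skip archives that have not been downloaded yet.
            print("skipping missing archive:", archive)
            continue
        with tarfile.open(archive, "r") as tar_ref:
            tar_ref.extractall(os.path.join(dest_dir, month))
        extracted.append(month)
    return extracted

# Example call covering the months used in this notebook:
# extract_archives(["2017-11", "2018-01", "2018-02", "2018-03",
#                   "2018-04", "2018-05", "2018-06"])
```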

## 2.2 Process Individual Files Downloaded
* Four kinds of files are processed here: conn.log, dns.log, http.log and ssl.log

### 2.2.1 Prepare conn.log files
* The CleanConn function defined below performs basic cleaning of conn.log files.

In [None]:
import datetime

def CleanConn(conn):
    #only external data
    #conn = conn[conn.local_resp.apply(lambda x: not x)]
    #keep TCP connections only; copy so the assignments below do not act on a slice
    conn = conn[conn.proto == 'tcp'].copy()
    #select column
    conn = conn.drop(['tunnel_parents', 'uid'], axis=1)
    #modify column: parse duration strings, keeping the hour, minute and second parts
    if conn.duration.dtypes == 'object':
        conn['duration'] = conn.duration.replace('-', '00:00:00.00').astype('str')
        conn['duration'] = [datetime.datetime.strptime(x, "%H:%M:%S.%f") if re.match(r'\d+:\d+:\d+\.\d+', x)
                            else datetime.datetime.strptime(x, "%d days, %H:%M:%S.%f") if 'days' in x
                            else datetime.datetime.strptime(x, "%d day, %H:%M:%S.%f") if 'day' in x
                            else datetime.datetime.strptime(x, "%H:%M:%S") for x in conn.duration]
        conn['duration'] = conn.duration.apply(lambda x :
                                               datetime.timedelta(hours=x.hour, minutes=x.minute, seconds=x.second))
    #treat the '-' placeholder in service as missing
    conn['service'] = conn.service.replace('-', np.nan)
    return conn
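
CleanConn keeps only the hour, minute and second parts of each duration. As an illustrative alternative, the sketch below parses the same string shapes into a `timedelta` that also keeps the day count and the fractional seconds. Whether that is wanted for the models is an assumption; `parse_duration` and `_DURATION` are hypothetical names, and the accepted formats are inferred from the format strings used in CleanConn.

```python
import datetime
import re

# Optional "N day(s), " prefix, then H:MM:SS with an optional fractional part.
_DURATION = re.compile(r'(?:(\d+) days?, )?(\d+):(\d+):(\d+)(?:\.(\d+))?$')

def parse_duration(text):
    """Parse '-', 'H:MM:SS[.ffffff]' or 'N day(s), H:MM:SS[.ffffff]' into a timedelta."""
    if text == '-':
        return datetime.timedelta(0)
    match = _DURATION.match(text)
    if match is None:
        raise ValueError('unrecognised duration: %r' % text)
    days, hours, minutes, seconds, fraction = match.groups()
    # Pad or cut the fractional part to exactly six digits (microseconds).
    micro = int((fraction or '0').ljust(6, '0')[:6])
    return datetime.timedelta(days=int(days or 0), hours=int(hours),
                              minutes=int(minutes), seconds=int(seconds),
                              microseconds=micro)
```

It could be applied with `conn['duration'] = conn['duration'].astype(str).apply(parse_duration)`.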

### 2.2.2 Prepare dns.log files
* The CleanDns function defined below performs basic cleaning of dns.log files.

In [15]:
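def CleanDns(dns):
    #select column (copy so the assignments below do not act on a slice)
    dns = dns[['id.orig_h', 'id.orig_p', 'id.resp_h', 'ts','id.resp_p', 'query']].copy()
    #modify column: replace '*' followed by backslash, 'x' or '0' characters with 'Unknown'
    dns['query'] = dns['query'].str.replace(r'\*[\\x00]{1,}', 'Unknown', regex=True)
    #keep the second-to-last label of the query, e.g. 'example' for 'www.example.com'
    dns['query'] = dns['query'].str.split('.').apply(lambda x: x[-2] if len(x)>1 else x[0])
    return dns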
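
The last step of CleanDns reduces each query to a single label. The standalone sketch below shows that rule on plain strings; `second_level_label` is a hypothetical name used only for illustration.

```python
def second_level_label(query):
    """Return the second-to-last dot-separated label, or the whole name if it has no dot."""
    parts = query.split('.')
    return parts[-2] if len(parts) > 1 else parts[0]

print(second_level_label('www.example.com'))  # example
print(second_level_label('localhost'))        # localhost
print(second_level_label('example.co.uk'))    # co
```

The last case shows a limitation of the rule: for names under a multi-part suffix such as `co.uk`, the extracted label is part of the suffix rather than the registered name.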

### 2.2.3 Prepare http.log files
* The CleanHttp function defined below performs basic cleaning of http.log files.

In [16]:
def CleanHttp(http):
    #select column (copy so later assignments do not act on a slice)
    http = http[['id.orig_h', 'id.orig_p', 'id.resp_h','ts', 'id.resp_p', 'uri', 'user_agent', 'host']].copy()
    return http

### 2.2.4 Prepare ssl.log files
* The CleanSsl function defined below performs basic cleaning of ssl.log files.

In [17]:
def CleanSsl(ssl):
    #select column
    ssl = ssl[['id.orig_h','id.orig_p','id.resp_h','id.resp_p','ts','subject','cert_chain_fuids']]
    return ssl
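
The cleaning functions above select columns by name, which raises a `KeyError` if a log happens to lack one of them. A possible safeguard, sketched here under the assumption that a missing column should simply be filled with NaN, is `DataFrame.reindex`; `select_columns` is a hypothetical helper.

```python
import pandas as pd

def select_columns(df, columns):
    """Return a new frame restricted to `columns`; absent columns are filled with NaN."""
    return df.reindex(columns=columns)

# Small illustration with made-up values.
sample = pd.DataFrame({'ts': [1], 'subject': ['x'], 'extra': [0]})
print(select_columns(sample, ['ts', 'subject', 'cert_chain_fuids']))
```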

## 2.3 Define Mapping Functions
* Four mapping functions are defined below: Aug_Mapping, Aug_Other, Oct_Mapping and Oct_Other.
* These functions create the training and testing data by mapping network traffic to the devices inside the atomic model, using the MAC address of each device as the primary key.

### 2.3.1 Aug_Mapping and Aug_Other

In [None]:
def Aug_Mapping(conn,Macone,MacExa,IPLook):
    # keep rows whose source IP is a known device; copy before adding columns
    conn2 = conn[[True if x in IPLook['IOT'].keys() else False for x in conn['id.orig_h']]].copy()
    conn2['mac'] = [Macone.mac[y].values for y in [Macone.assigned_ip == x for x in conn2['id.orig_h']]]
    conn2['IOT'] = [Macone.IOT[y].values for y in [Macone.assigned_ip == x for x in conn2['id.orig_h']]]
    conn2['Device'] = [Macone.Device[y].values for y in [Macone.assigned_ip == x for x in conn2['id.orig_h']]]
    # for IPs shared by several MACs, use the MacExa entry whose time window covers the row;
    # write through conn2.at, because assigning to the iterrows() row only changes a copy
    for index, row in conn2[['ts','id.orig_h']].iterrows():
        for i , r in MacExa.iterrows():
            if row['ts'] >= r['min_ts'] and row['ts'] <= r['max_ts'] and row['id.orig_h'] == r['assigned_ip']:
                conn2.at[index, 'mac'] = r['mac']
                conn2.at[index, 'IOT'] = r['IOT']
                conn2.at[index, 'Device'] = r['Device']
    return conn2

def Aug_Other(df, Macone, MacExa, IPLook):
    # keep rows whose source IP is a known device; copy before adding columns
    df2 = df[[True if x in IPLook['IOT'].keys() else False for x in df['id.orig_h']]].copy()
    df2['mac'] = [Macone.mac[y].values for y in [Macone.assigned_ip == x for x in df2['id.orig_h']]]
    # only 'ts' and 'id.orig_h' are needed for matching ('IOT' and 'Device' are not added here);
    # write through df2.at, because assigning to the iterrows() row only changes a copy
    for index, row in df2[['ts', 'id.orig_h']].iterrows():
        for i, r in MacExa.iterrows():
            if row['ts'] >= r['min_ts'] and row['ts'] <= r['max_ts'] and row['id.orig_h'] == r['assigned_ip']:
                df2.at[index, 'mac'] = r['mac']
    return df2
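
The nested `iterrows()` loops above compare every traffic row with every MacExa row, which can be slow on large logs. The sketch below shows a vectorized version of the same time-window match using a merge followed by a filter. It is an illustration under assumptions, not the notebook's method: `map_by_window` is a hypothetical name, the window table is assumed to have the columns `assigned_ip`, `mac`, `min_ts` and `max_ts`, and the input frame is assumed not to have a `mac` column yet.

```python
import pandas as pd

def map_by_window(df, windows):
    """Attach the MAC whose [min_ts, max_ts] window covers each row's ts."""
    merged = df.merge(windows, left_on='id.orig_h', right_on='assigned_ip', how='inner')
    in_window = (merged['ts'] >= merged['min_ts']) & (merged['ts'] <= merged['max_ts'])
    # Keep the original columns plus the matched MAC.
    return merged.loc[in_window, list(df.columns) + ['mac']].reset_index(drop=True)

# Made-up example: one IP address used by two MACs in different time windows.
traffic = pd.DataFrame({'ts': [5, 15], 'id.orig_h': ['192.168.0.2', '192.168.0.2']})
windows = pd.DataFrame({'assigned_ip': ['192.168.0.2', '192.168.0.2'],
                        'mac': ['aa', 'bb'], 'min_ts': [0, 11], 'max_ts': [10, 20]})
print(map_by_window(traffic, windows))
```

Rows that fall outside every window are dropped by this sketch, whereas the loop version keeps them with their initial values.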

### 2.3.2 Oct_Mapping and Oct_Other

In [19]:
def Oct_Mapping(conn,Oct_Mac):
    # keep rows whose source IP has a known MAC; copy to avoid SettingWithCopyWarning below
    conn2 = conn[[True if x in Oct_Mac.assigned_ip.values else False for x in conn['id.orig_h']]].copy()
    conn2['mac'] = [Oct_Mac.mac[y].values[0] for y in [Oct_Mac.assigned_ip == x for x in conn2['id.orig_h']]]
    conn2['IOT'] = [Oct_Mac.IOT[y].values[0] for y in [Oct_Mac.assigned_ip == x for x in conn2['id.orig_h']]]
    conn2['Device'] = [Oct_Mac.Device[y].values[0] for y in [Oct_Mac.assigned_ip == x for x in conn2['id.orig_h']]]
    return conn2

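def Oct_Other(df, Oct_Mac):
    # keep rows whose source IP has a known MAC; copy before adding the column
    df2 = df[[True if x in Oct_Mac.assigned_ip.values else False for x in df['id.orig_h']]].copy()
    # look up the MAC for the filtered rows (df2) so the lengths match
    df2['mac'] = [Oct_Mac.mac[y].values[0] for y in [Oct_Mac.assigned_ip == x for x in df2['id.orig_h']]]
    return df2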
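
The per-row lookups in the October functions can also be expressed as a single merge. A minimal sketch, assuming the device table has the columns `assigned_ip`, `mac`, `IOT` and `Device` as used above; `map_devices` is a hypothetical name. Keeping the first row per IP mirrors the `.values[0]` choice in Oct_Mapping.

```python
import pandas as pd

def map_devices(df, device_table):
    """Inner-join traffic rows with one device record per source IP."""
    lookup = device_table[['assigned_ip', 'mac', 'IOT', 'Device']]
    lookup = lookup.drop_duplicates(subset='assigned_ip')  # first record per IP
    merged = df.merge(lookup, left_on='id.orig_h', right_on='assigned_ip', how='inner')
    return merged.drop(columns='assigned_ip')

# Made-up example: the second row has an unknown IP and is dropped.
traffic = pd.DataFrame({'id.orig_h': ['192.168.0.2', '10.0.0.9'], 'ts': [1, 2]})
devices = pd.DataFrame({'assigned_ip': ['192.168.0.2'], 'mac': ['aa'],
                        'IOT': ['Yes'], 'Device': ['Camera']})
print(map_devices(traffic, devices))
```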

# 3. Create Main Function
* In the body of this function, two sets of parameters used in the mapping process are defined:
    * IPLook & DeviceLook
    * Macone & MacExa

In [18]:
def main(filepath,month):
    
    # prepare IPLook and DeviceLook
    IPrefer = pd.read_excel('IOT.xlsx', header=None)
    IPrefer.columns = ['refer', 'IOT', 'Manufactor', 'Device','Other']
    IPrefer.refer = IPrefer.refer.apply(lambda x: '192.168.0.' + x)
    IPrefer.loc[:, 'refer'] = IPrefer.refer.str.split('-', expand=True)[0]
    IPrefer = IPrefer.apply(lambda x: x.str.strip())
    IPLook = IPrefer[['refer', 'IOT']].set_index('refer').to_dict()
    DeviceLook = IPrefer[['refer', 'Device']].set_index('refer').to_dict()
    
    # prepare Macone and MacExa
    Oct_Mac = pd.read_csv('/Users/Rachel/Desktop/AtomicIOT/Rachel/Sagemaker/Data/OCT_mac.csv')
    Aug_Mac = pd.read_excel('/Users/Rachel/Desktop/AtomicIOT/Rachel/Sagemaker/Data/Aug_DHCP.xlsx')
    Aug_Mac.loc[:, 'IOT'] = [IPLook['IOT'][x] for x in Aug_Mac.assigned_ip]
    Aug_Mac.loc[:, 'Device'] = [DeviceLook['Device'][x] for x in Aug_Mac.assigned_ip]
    DupIP = Aug_Mac.groupby('assigned_ip').mac.count()[Aug_Mac.groupby('assigned_ip').mac.count() > 1].index.values
    DupMac = Aug_Mac.groupby('mac').assigned_ip.count()[Aug_Mac.groupby('mac').assigned_ip.count() > 1].index.values
    Aug_Mac.loc[Aug_Mac.mac == DupMac[0], 'Device'] = 'Gaming'
    Aug_Mac.loc[41, 'Device'] = 'Mobile'
    Macone = Aug_Mac[Aug_Mac.assigned_ip.apply(lambda x: x not in DupIP)]
    Macdup = Aug_Mac[Aug_Mac.assigned_ip.apply(lambda x: x in DupIP)]
    # DataFrame.append was removed in pandas 2.0; pd.concat gives the same result
    Macone = pd.concat([Macone, Macdup[Macdup.mac.apply(lambda x: x in DupMac[0])]], ignore_index=True)
    MacExa = Macdup[~Macdup.mac.apply(lambda x: x in DupMac[0])]
    
    for i in range(1, 32):
        print(i)
        # zero-pad the day number, e.g. 1 -> '01'
        i = str(i).zfill(2)
        conn = MultiLogToDataFrame(filepath+
                                   '2017-' + month + '/'
                                   '2017-' + month + '-' + i +'/conn.*.log.gz')
        conn = CleanConn(conn)
        if month == '08':
            conn2 = Aug_Mapping(conn,Macone,MacExa,IPLook)
        else:
            conn2 = Oct_Mapping(conn,Oct_Mac)
            
        dns = MultiLogToDataFrame(filepath +
                                  '2017-' + month + '/'
                                  '2017-' + month + '-' + i + '/dns.*.log.gz')
        dns = CleanDns(dns)
        if month == '08':
            dns2 = Aug_Mapping(dns,Macone,MacExa,IPLook)
        else:
            dns2 = Oct_Mapping(dns,Oct_Mac)
            
        http = MultiLogToDataFrame(filepath +
                                   '2017-' + month + '/'
                                   '2017-' + month + '-' + i + '/http.*.log.gz')
        http = CleanHttp(http)
        if month == '08':
            http2 = Aug_Mapping(http,Macone,MacExa,IPLook)
        else:
            http2 = Oct_Mapping(http,Oct_Mac)
            
        ssl = MultiLogToDataFrame(filepath +
                                  '2017-' + month + '/'
                                  '2017-' + month + '-' + i + '/ssl.*.log.gz')
        ssl = CleanSsl(ssl)
        if month == '08':
            ssl2 = Aug_Mapping(ssl,Macone,MacExa,IPLook)
        else:
            ssl2 = Oct_Mapping(ssl,Oct_Mac)

        conn2.to_csv('test_data/conn_'+month+'_'+i+'.csv',index=False)
        dns2.to_csv('test_data/dns_'+month+'_'+i+'.csv',index=False)
        http2.to_csv('test_data/http_'+month+'_'+i+'.csv',index=False)
        ssl2.to_csv('test_data/ssl_'+month+'_'+i+'.csv',index=False)
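
The loop in main tries days 1 to 31 for every month, so months with fewer days end on folders that do not exist. A minimal sketch of building only the real calendar days with the standard library is given below; `day_folders` is a hypothetical helper, and the folder layout `<filepath><year>-<month>/<year>-<month>-<day>/` is taken from the paths built in main.

```python
import calendar

def day_folders(filepath, year, month):
    """Return one folder prefix per real calendar day of the given month."""
    n_days = calendar.monthrange(int(year), int(month))[1]
    return ['%s%s-%s/%s-%s-%02d/' % (filepath, year, month, year, month, day)
            for day in range(1, n_days + 1)]

folders = day_folders('/data/', '2017', '11')
print(len(folders))   # 30
print(folders[0])     # /data/2017-11/2017-11-01/
```

Each returned prefix can then be combined with a pattern such as `conn.*.log.gz` before it is passed to MultiLogToDataFrame.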

In [19]:
%%time
# call the main function to write the processed data into the test_data folder
filepath = '/home/ec2-user/SageMaker/AtomicMole-Model/'
main(filepath, '11')

1
Successfully monitoring /tmp/tmpw9sdpqga...
Removed temporary file /tmp/tmpw9sdpqga...
Successfully monitoring /tmp/tmplr7ae7jc...
Removed temporary file /tmp/tmplr7ae7jc...
[similar "Successfully monitoring" / "Removed temporary file" lines repeat for every log file of each day; output truncated]

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: http://pandas.pydata.org/pandas-docs/stable/indexing.html#indexing-view-versus-copy

31


AttributeError: 'MultiLogToDataFrame' object has no attribute 'local_resp'