In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *

Waiting for a Spark session to start...

Waiting for a Spark session to start...

In [2]:
# Create the SparkContext.
# Some kernels pre-create a context named `sc`; stop it first so a fresh one is
# started, but do not fail with NameError when no such variable exists.
if 'sc' in globals():
    sc.stop()
# getOrCreate() is a classmethod: call it on the class. Calling SparkContext()
# first runs the constructor, which raises if another context is still active.
sc = SparkContext.getOrCreate()

# Create (or reuse) the SparkSession.
ss = SparkSession.builder.getOrCreate()

In [6]:
# Read the raw log from HDFS as an RDD of text lines.
textFile = sc.textFile("hdfs://192.168.70.132:9000/data/tony/http.log")


# Parse the data rows.
# Only lines that *start* with '#' are skipped. The previous test
# ('#' not in x) also discarded every data row that merely contained a '#'
# somewhere in one of its fields.
s1 = textFile.filter(lambda line: not line.startswith('#'))
# Fields are tab-separated.
s2 = s1.map(lambda line: line.split('\t'))
# One tuple of whitespace-trimmed strings per row; strip() already returns a
# string, so no extra str() conversion is needed.
s3 = s2.map(lambda row: tuple(field.strip() for field in row))

In [7]:
# Build the schema from the log's own '#fields' header line.
# The header is located by its prefix rather than by a fixed line position, so
# a log with a different number of leading metadata lines still yields the
# right column names. first() raises ValueError if no such line exists.
FIELDS_PREFIX = '#fields\t'
fields_header = textFile.filter(lambda line: line.startswith(FIELDS_PREFIX)).first()

# Every column is a nullable string. Dots in field names are replaced with
# underscores (a name such as 'id.orig_h' becomes 'id_orig_h').
fields = [
    StructField(f_name.replace('.', '_').strip(), StringType(), True)
    for f_name in fields_header[len(FIELDS_PREFIX):].split('\t')
]
schema = StructType(fields)

# Create the DataFrame from the parsed rows and the schema, and register it as
# the temporary view "Req" for SQL access.
schemaReq = ss.createDataFrame(s3, schema)
schemaReq.createOrReplaceTempView("Req")

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType


def replace_str(tmpStr):
    """Normalise a URL-like string so http/https variants compare equal.

    Every occurrence of 'https' is replaced with 'http', then any leading and
    trailing '/' characters are removed.

    Args:
        tmpStr: The string to normalise, or None.

    Returns:
        The normalised string, or None when the input is None (a Spark UDF
        receives None for null column values, which previously raised
        AttributeError).
    """
    if tmpStr is None:
        return None
    return tmpStr.replace('https', 'http').strip('/')

def chk_head(tmp_resp, tmp_agent):
    """Flag a request whose response type and user agent both match the lists below.

    Args:
        tmp_resp: Response MIME type string (or None).
        tmp_agent: User-agent string (or None).

    Returns:
        1 if tmp_resp is one of the listed content types and the lower-cased
        user agent contains one of the listed browser names; otherwise 0.
        A None user agent never matches (previously it raised AttributeError).
    """
    chk_con_type = {'text/html', 'text/css', 'application/x-javascript', 'application/x-shockwave-flash'}
    browsers = {'chrome', 'firefox', 'safari', 'waterfox', 'mozilla'}

    # The agent is only inspected once the content type has matched.
    if tmp_resp not in chk_con_type or tmp_agent is None:
        return 0

    # Lower-case once instead of once per browser name.
    agent = tmp_agent.lower()
    return 1 if any(name in agent for name in browsers) else 0

def chk_agent(tmpStr):
    """Label a user-agent string as coming from a browser or from something else.

    Args:
        tmpStr: User-agent string (or None).

    Returns:
        'browser' if the lower-cased string contains one of the listed browser
        names, otherwise 'background app'. A None user agent is labelled
        'background app' (previously it raised AttributeError).
    """
    browsers = {'chrome', 'firefox', 'safari', 'waterfox'}
    if tmpStr is None:
        return 'background app'

    # Lower-case once instead of once per browser name.
    agent = tmpStr.lower()
    if any(name in agent for name in browsers):
        return 'browser'
    return 'background app'
    
    

# Expose the plain-Python helpers as Spark UDFs with explicit return types.
test_udf_str = udf(lambda referrer: replace_str(referrer), returnType=StringType())
head_udf = udf(lambda mime_type, agent: chk_head(mime_type, agent), returnType=IntegerType())
label_udf = udf(lambda agent: chk_agent(agent), returnType=StringType())

In [9]:
# Project the raw log columns and add three derived ones:
#   referrer - the original referrer passed through test_udf_str
#   head     - 0/1 flag computed by head_udf from resp_mime_types and user_agent
#   label    - label computed by label_udf from user_agent
# Each source column is listed exactly once: 'user_agent' and
# 'resp_mime_types' were previously selected a second time at the end, which
# left the result with duplicate column names.
res_df = schemaReq.select(
    "ts", "uid", "id_orig_h", "id_orig_p", "id_resp_h",
    "id_resp_p", "trans_depth", "method", "host", "uri",
    "version", "user_agent", "request_body_len",
    "response_body_len", "status_code", "status_msg", "info_code",
    "info_msg", "tags", "username", "password", "proxied",
    "orig_fuids", "orig_filenames", "orig_mime_types",
    "resp_fuids", "resp_filenames", "resp_mime_types",

    test_udf_str(schemaReq['referrer']).alias('referrer'),
    head_udf(schemaReq['resp_mime_types'], schemaReq['user_agent']).alias('head'),
    label_udf(schemaReq['user_agent']).alias('label'),
)



In [15]:
# Preview rows whose response type is 'text/html' OR whose user agent is
# anything other than '-', showing only the columns of interest.
# NOTE(review): '-' looks like the log's placeholder for an unset field - confirm.
(
    res_df
    .where((res_df.resp_mime_types == 'text/html') | (res_df.user_agent != '-'))
    .select('host', 'uri', 'user_agent', 'label', 'referrer', 'head')
    .show()
)

+------------------+--------------------+--------------------+--------------+--------------------+----+
|              host|                 uri|          user_agent|         label|            referrer|head|
+------------------+--------------------+--------------------+--------------+--------------------+----+
|      www.yolo.com|                   /|python-requests/2...|background app|                   -|   0|
|           qoo.com|                   /|         curl/7.54.0|background app|                   -|   0|
|       www.qoo.com|                   /|         curl/7.54.0|background app|                   -|   0|
| ocsp.digicert.com|/MFYwVKADAgEAME0w...|trustd (unknown v...|background app|                   -|   0|
| www.pchome.com.tw|                   /|Mozilla/5.0 (Maci...|       browser|http://www.ecosia...|   1|
| www.pchome.com.tw|/js/index.js?1519...|Mozilla/5.0 (Maci...|       browser|http://www.pchome...|   0|
|adcl.pchome.com.tw|/getAd2JS.html?ty...|Mozilla/5.0 (Maci...|  