In [1]:
# импортируем необходимые библиотеки
import psycopg2
from datetime import datetime
import re
import httpagentparser
import csv
import os
from collections import Counter

In [2]:
# все что нужно для спарка
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pyspark.sql.functions as F

Создаем спарк-сессию

In [3]:
spark = SparkSession.builder \
  .master("local[1]") \
  .appName("SparkFirst") \
  .appName("Timeout Troubleshooting") \
  .config("spark.executor.memory", "12g")\
  .config("spark.executor.cores", 8) \
  .config("spark.dynamicAllocation.enabled", "true") \
  .config("spark.dynamicAllocation.maxExecutors", 100) \
  .config("spark.network.timeout", "600s") \
  .config("spark.shuffle.service.enabled", "true") \
.getOrCreate()

Читаем данные из файла .log

Для того чтобы обработка смогла вообще произойти log файл пришлось разделить, здесь будет представленна обработка только первых 50к строк

In [4]:
base_df = spark.read.text('data/mini_dataset.log')

В качестве "хорошего тона" проверим нет ли в датафрейме пустых строк

In [5]:
base_df.filter(base_df['value'].isNull()).count()

0

Используя следующие регулярные выражения спарсим необходимые нам данные из строк датафрейма

In [6]:
host_pattern = r'(^\S+\.[\S+\.]+\S+)\s'                     #регулярное выражение для хоста

method_uri_protocol_pattern = r'\"(\S+) (.*?)'              #регулярное выражение для метода

status_pattern = r'\s(\d{3})\s'                             #регулярное выражение для ответа

user_agent_end=r'\((.*) '                                   #регулярнео выражение для окончания UA-строки

In [7]:
logs_df = base_df.select(regexp_extract('value', host_pattern, 1).alias('host'),
                         regexp_extract('value', method_uri_protocol_pattern, 1).alias('method'),
                         regexp_extract('value', status_pattern, 1).cast('integer').alias('status'),
                         regexp_extract('value', user_agent_end, 1).alias('user_agent_end'))

logs_df.show(10, truncate=True)
print((logs_df.count(), len(logs_df.columns)))

+-------------+------+------+--------------------+
|         host|method|status|      user_agent_end|
+-------------+------+------+--------------------+
| 54.36.149.41|   GET|   200|compatible; Ahref...|
|  31.56.96.51|   GET|   200|Linux; Android 6....|
|  31.56.96.51|   GET|   200|Linux; Android 6....|
|40.77.167.129|   GET|   200|compatible; bingb...|
|  91.99.72.15|   GET|   200|Windows NT 6.2; W...|
|40.77.167.129|   GET|   200|compatible; bingb...|
|40.77.167.129|   GET|   200|compatible; bingb...|
|40.77.167.129|   GET|   200|compatible; bingb...|
|66.249.66.194|   GET|   200|compatible; Googl...|
|40.77.167.129|   GET|   200|compatible; bingb...|
+-------------+------+------+--------------------+
only showing top 10 rows

(50000, 4)


Отдельно проверим есть ли какие-то Null значения

In [8]:
logs_df.filter(logs_df['host'].isNull()| 
                logs_df['method'].isNull() |
                logs_df['status'].isNull() |
                logs_df['user_agent_end'].isNull()).count()

0

Создадим (по необходимости) и откроем два .csv файла и получим на их основе два новых датафрейма

In [9]:
def browser_def(x:str)->str:
    pars = httpagentparser.detect(x)
    if 'browser' in pars:
        return pars['browser']['name']
    else:
        return 'None'
    
def device_def(x:str)->str:
    pars = httpagentparser.detect(x)
    if pars['platform']['name'] != 'None':
        return f"{pars['platform']['name']} {pars['platform']['version']}" 
    else:
        return 'None'
    
def logs_pars_to_file(logs_df, df_key:str, pars_key:str, file_name, pars_function):
    count = 0
    res = []
    keys = []

    for row in logs_df.toLocalIterator():
        count += 1
        row_dict = row.asDict()
        row_dict[pars_key] = pars_function(row_dict[df_key])
        res.append(row_dict)

        if count == 1:
            keys = res[0].keys()
            with open(file_name, 'w', newline='') as output_file:
                dict_writer = csv.DictWriter(output_file, keys)
                dict_writer.writeheader()
                
        if count % 500000 == 0:
            print(count)
            with open(file_name, 'a', newline='') as output_file:
                dict_writer = csv.DictWriter(output_file, keys)
                dict_writer.writerows(res)
                del res
                res = []

    with open(file_name, 'a', newline='') as output_file:
                dict_writer = csv.DictWriter(output_file, keys)
                dict_writer.writerows(res)
    del res

In [10]:
if not os.path.isfile('data/logs_browser.csv'):
    logs_pars_to_file(logs_df=logs_df, df_key='user_agent_end', pars_key='browser', file_name='data/logs_browser.csv', pars_function=browser_def)

logs_browser = spark.read.csv('data/logs_browser.csv', header=True, sep=',') 
logs_browser.show(5)    


if not os.path.isfile('data/logs_platform.csv'):
    logs_pars_to_file(logs_df, 'user_agent_end', 'platform', 'data/logs_platform.csv', device_def)

logs_platform = spark.read.csv('data/logs_platform.csv', header=True, sep=',')
logs_platform.show(5)

+-------------+------+------+--------------------+-------+
|         host|method|status|      user_agent_end|browser|
+-------------+------+------+--------------------+-------+
| 54.36.149.41|   GET|   200|compatible; Ahref...|   None|
|  31.56.96.51|   GET|   200|Linux; Android 6....| Chrome|
|  31.56.96.51|   GET|   200|Linux; Android 6....| Chrome|
|40.77.167.129|   GET|   200|compatible; bingb...|BingBot|
|  91.99.72.15|   GET|   200|Windows NT 6.2; W...|Firefox|
+-------------+------+------+--------------------+-------+
only showing top 5 rows

+-------------+------+------+--------------------+-----------+
|         host|method|status|      user_agent_end|   platform|
+-------------+------+------+--------------------+-----------+
| 54.36.149.41|   GET|   200|compatible; Ahref...|  None None|
|  31.56.96.51|   GET|   200|Linux; Android 6....|Android 6.0|
|  31.56.96.51|   GET|   200|Linux; Android 6....|Android 6.0|
|40.77.167.129|   GET|   200|compatible; bingb...|  None None|
|  

Удалим три колонки для облегчения дальнейшего объединения

In [11]:
logs_browser = logs_browser.drop('method').drop('status').drop('user_agent_end')
logs_browser.show(3)

+------------+-------+
|        host|browser|
+------------+-------+
|54.36.149.41|   None|
| 31.56.96.51| Chrome|
| 31.56.96.51| Chrome|
+------------+-------+
only showing top 3 rows



Из датафрейма logs_platform удалим колонку с юзер агентом, так как он уже выполнил то для чего мы его сохраняли

In [12]:
logs_platform = logs_platform.drop('user_agent_end')
logs_platform.show(3)

+------------+------+------+-----------+
|        host|method|status|   platform|
+------------+------+------+-----------+
|54.36.149.41|   GET|   200|  None None|
| 31.56.96.51|   GET|   200|Android 6.0|
| 31.56.96.51|   GET|   200|Android 6.0|
+------------+------+------+-----------+
only showing top 3 rows



Объединим данные датафреймы для дальнейшей обработки

In [13]:
del logs_df
del base_df

In [14]:
all_df = logs_platform.join(logs_browser, 'host', how = 'inner')
all_df.show(3)

+------------+------+------+---------+-------+
|        host|method|status| platform|browser|
+------------+------+------+---------+-------+
|54.36.149.41|   GET|   200|None None|   None|
|54.36.149.41|   GET|   200|None None|   None|
|54.36.149.41|   GET|   200|None None|   None|
+------------+------+------+---------+-------+
only showing top 3 rows



Для дальнейшего анализа закешируем новый датафрейм

In [15]:
all_df.cache()

DataFrame[host: string, method: string, status: string, platform: string, browser: string]

In [16]:
df_p = all_df.filter(all_df.platform != 'None None') 
clean_df = df_p.filter(df_p.browser != 'None')

In [17]:
# clean_df.show(3)

In [18]:
clean_df.cache()

DataFrame[host: string, method: string, status: string, platform: string, browser: string]

In [19]:
total_users = clean_df.select('host').count()
total_users

37455707

In [20]:
device_name = clean_df.select('platform').distinct().show()

+--------------------+
|            platform|
+--------------------+
|       Android 4.2.1|
|       Mac OS X 10.9|
|          Windows 10|
|           Windows 8|
|     Mac OS X 10.7.3|
|       Android 4.2.2|
|          iOS 11.2.5|
|       Android 4.0.4|
|    Mac OS X 10.13.6|
|             iOS 7.0|
|       Android 4.4.4|
|       Android 5.0.1|
|          iOS 11.2.2|
| like Gecko) Vers...|
|         Windows 8.1|
| like Gecko) Chro...|
|    Mac OS X 10.12.6|
|    Mac OS X 10.10.2|
|            Windows |
|Android 4.1.2 Per...|
+--------------------+
only showing top 20 rows



Количество пользователей устройства

In [21]:
device_users=clean_df.groupby('platform').agg(count('host').alias('device_users'))     

device_users.show(5)

+---------------+------------+
|       platform|device_users|
+---------------+------------+
|  Android 4.2.1|        1944|
|  Mac OS X 10.9|          12|
|     Windows 10|      315455|
|      Windows 8|      733530|
|Mac OS X 10.7.3|      745200|
+---------------+------------+
only showing top 5 rows



Доля пользователей по устройствам

In [22]:
df0 = clean_df.groupby('platform')\
                .agg(count('platform').alias('device_users'))\
                .withColumn('part_device_users', round(F.col('device_users')/total_users * 100, 2))

df0.show(10)

+----------------+------------+-----------------+
|        platform|device_users|part_device_users|
+----------------+------------+-----------------+
|   Android 4.2.1|        1944|             0.01|
|   Mac OS X 10.9|          12|              0.0|
|      Windows 10|      315455|             0.84|
|       Windows 8|      733530|             1.96|
| Mac OS X 10.7.3|      745200|             1.99|
|   Android 4.2.2|       50856|             0.14|
|      iOS 11.2.5|         441|              0.0|
|   Android 4.0.4|         324|              0.0|
|Mac OS X 10.13.6|           8|              0.0|
|         iOS 7.0|      129109|             0.34|
+----------------+------------+-----------------+
only showing top 10 rows



Количество совершенных действий для данного устройства

*Посчитаны просто все действия, без разделения на типы*

In [23]:
device_actions=clean_df.groupby(col('platform')).agg(count('method').alias('device_actions'))

device_actions.show(5)

+---------------+--------------+
|       platform|device_actions|
+---------------+--------------+
|  Android 4.2.1|          1944|
|  Mac OS X 10.9|            12|
|     Windows 10|        315455|
|      Windows 8|        733530|
|Mac OS X 10.7.3|        745200|
+---------------+--------------+
only showing top 5 rows



Всего совершенных действий

In [24]:
total_actions = clean_df.select('method').count()
total_actions

37455707

Доля совершенных действий с данного устройства относительно других устройств

In [25]:
df1 = clean_df.groupby(col('platform'))\
                .agg(count("method").alias("device_actions"))\
                .withColumn('part_device_actions', round(F.col('device_actions')/total_actions * 100, 2))

df1.show(5)

+---------------+--------------+-------------------+
|       platform|device_actions|part_device_actions|
+---------------+--------------+-------------------+
|  Android 4.2.1|          1944|               0.01|
|  Mac OS X 10.9|            12|                0.0|
|     Windows 10|        315455|               0.84|
|      Windows 8|        733530|               1.96|
|Mac OS X 10.7.3|        745200|               1.99|
+---------------+--------------+-------------------+
only showing top 5 rows



In [26]:
total_browser_cnt = clean_df.select('browser').count() # ну мало ли количество вдруг разное

browser_cnt = clean_df.groupby('platform').agg(count('browser').alias('browser_cnt'))

print(total_browser_cnt)
browser_cnt.show(5)

37455707
+---------------+-----------+
|       platform|browser_cnt|
+---------------+-----------+
|  Android 4.2.1|       1944|
|  Mac OS X 10.9|         12|
|     Windows 10|     315455|
|      Windows 8|     733530|
|Mac OS X 10.7.3|     745200|
+---------------+-----------+
only showing top 5 rows



Доля использования для данного браузера относительно остальных браузеров

In [27]:
df2 = clean_df.groupby(col('platform'))\
                .agg(count('browser').alias('browser_cnt'))\
                .withColumn('part_browser', round(F.col('browser_cnt')/total_browser_cnt * 100, 2))  

df2.show(5)

#данные по частоте такаие же как и на браузере, так как браузер на платферме один и тот же

+---------------+-----------+------------+
|       platform|browser_cnt|part_browser|
+---------------+-----------+------------+
|  Android 4.2.1|       1944|        0.01|
|  Mac OS X 10.9|         12|         0.0|
|     Windows 10|     315455|        0.84|
|      Windows 8|     733530|        1.96|
|Mac OS X 10.7.3|     745200|        1.99|
+---------------+-----------+------------+
only showing top 5 rows



Соединим то что уже есть и посмотрим на это

In [28]:
df3=df0.join(df1, on='platform', how='left').join(df2, on='platform', how='left').drop('count').drop('method')
df3.show(3)

+-------------+------------+-----------------+--------------+-------------------+-----------+------------+
|     platform|device_users|part_device_users|device_actions|part_device_actions|browser_cnt|part_browser|
+-------------+------------+-----------------+--------------+-------------------+-----------+------------+
|Android 4.2.1|        1944|             0.01|          1944|               0.01|       1944|        0.01|
|Mac OS X 10.9|          12|              0.0|            12|                0.0|         12|         0.0|
|   Windows 10|      315455|             0.84|        315455|               0.84|     315455|        0.84|
+-------------+------------+-----------------+--------------+-------------------+-----------+------------+
only showing top 3 rows



Рассмотрим статистику по кодам статусов

In [29]:
status=clean_df.groupBy('status').count().show()

+------+--------+
|status|   count|
+------+--------+
|   200|32448454|
|   302| 2424037|
|   502|    6638|
|   404| 1039048|
|   403|      42|
|   500|    6414|
|   304| 1270663|
|   499|  107431|
|   301|  152980|
+------+--------+



Отдельно рассчитаем следующие группы ответов

200

In [30]:
answers_200 = clean_df.filter(clean_df.status == '200')\
                        .groupBy('platform')\
                        .agg(count("status").alias("answers_200"))

answers_200.show(5)

+---------------+-----------+
|       platform|answers_200|
+---------------+-----------+
|  Android 4.2.1|       1894|
|  Mac OS X 10.9|          8|
|     Windows 10|     306035|
|      Windows 8|     729630|
|Mac OS X 10.7.3|     741750|
+---------------+-----------+
only showing top 5 rows



!200

In [31]:
answers_ne200 = clean_df.filter(clean_df.status != '200')\
                        .groupBy(col('platform'))\
                        .agg(count("status").alias("answers_ne200"))

answers_ne200.show(5)

+---------------+-------------+
|       platform|answers_ne200|
+---------------+-------------+
|  Android 4.2.1|           50|
|  Mac OS X 10.9|            4|
|     Windows 10|         9420|
|      Windows 8|         3900|
|Mac OS X 10.7.3|         3450|
+---------------+-------------+
only showing top 5 rows



300 - 399

In [32]:
answers_3XX = clean_df.filter((clean_df.status >= '300') & (clean_df.status < '400'))\
                        .groupby('platform')\
                        .agg(count("status").alias("answers_3XX"))

answers_3XX.show(5)

+-------------+-----------+
|     platform|answers_3XX|
+-------------+-----------+
|Android 4.2.1|         50|
|Mac OS X 10.9|          4|
|   Windows 10|       6258|
|    Windows 8|        450|
|Android 4.2.2|        443|
+-------------+-----------+
only showing top 5 rows



400 - 499

In [33]:
answers_4XX = clean_df.filter((clean_df.status >= '400') & (clean_df.status < '500'))\
                        .groupby('platform')\
                        .agg(count("status").alias("answers_4XX"))

answers_4XX.show(5)

+---------------+-----------+
|       platform|answers_4XX|
+---------------+-----------+
|     Windows 10|       3162|
|      Windows 8|       3450|
|Mac OS X 10.7.3|       3450|
|  Android 4.2.2|       1034|
|     iOS 11.2.5|         84|
+---------------+-----------+
only showing top 5 rows



500+

In [34]:
answers_5XX = clean_df.filter(clean_df.status >= '500')\
                        .groupby('platform')\
                        .agg(count("status").alias("answers_5XX"))

answers_5XX.show(5)

+-------------+-----------+
|     platform|answers_5XX|
+-------------+-----------+
|Android 6.0.1|      13052|
+-------------+-----------+



Объединение answers фреймов

In [35]:
df_mart = df3.join(answers_200, on ='platform', how='left')\
                .join(answers_ne200, on='platform', how='left')\
                .join(answers_3XX, on ='platform', how='left')\
                .join(answers_4XX, on ='platform', how='left')\
                .join(answers_5XX, on ='platform', how='left')

df_mart.show(5)

+---------------+------------+-----------------+--------------+-------------------+-----------+------------+-----------+-------------+-----------+-----------+-----------+
|       platform|device_users|part_device_users|device_actions|part_device_actions|browser_cnt|part_browser|answers_200|answers_ne200|answers_3XX|answers_4XX|answers_5XX|
+---------------+------------+-----------------+--------------+-------------------+-----------+------------+-----------+-------------+-----------+-----------+-----------+
|  Android 4.2.1|        1944|             0.01|          1944|               0.01|       1944|        0.01|       1894|           50|         50|       null|       null|
|  Mac OS X 10.9|          12|              0.0|            12|                0.0|         12|         0.0|          8|            4|          4|       null|       null|
|     Windows 10|      315455|             0.84|        315455|               0.84|     315455|        0.84|     306035|         9420|       6258

Запишем получившуюся "витрину" в файл

In [36]:
df = df_mart.toPandas()

df['answers_200'] = df['answers_200'].fillna(0)
df['answers_ne200'] = df['answers_ne200'].fillna(0)
df['answers_3XX'] = df['answers_3XX'].fillna(0) 
df['answers_4XX'] = df['answers_4XX'].fillna(0) 
df['answers_5XX'] = df['answers_5XX'].fillna(0)

In [None]:
id_count = len(df)
id = [x+1 for x in list(range(id_count))]

df.insert(0, 'ID', id)

In [None]:
df.to_csv('data/df_mart.csv', index=False, sep="|")

Запишем данные в Postgres DataBase (поднимается из докера в папке postgres)

In [None]:
db_con = psycopg2.connect(database='testdb',     
                        user='postgres',
                        password='postgres',
                        host='localhost',
                        port=5432)
cur = db_con.cursor()

Создадим таблицу в соответствии с схемой в csv

In [None]:
db_con.commit()
with open('data/df_mart.csv', 'r') as file:
    next(file) 
    cur.copy_from(file, 'log_mart', sep='|')
    db_con.commit()

In [None]:
db_con.close()