# Parse zipped PostgreSQL logs and save them to a Parquet file

I administer a large number of PostgreSQL servers and receive their logs as zip files. To analyze them, I wrote a Spark job that:

1. Unzips the files
2. Parses the PostgreSQL logs
3. Saves (appends) the data to a Parquet file

In a follow-up notebook I will show how to query them to get useful information.


### PostgreSQL logs

The log line prefix configured in the PostgreSQL configuration file (`postgresql.conf`) is:

`log_line_prefix = '%t %a %u %d %c '`

The escape sequences mean:

* %a = application name
* %u = user name
* %d = database name
* %t = timestamp without milliseconds (followed by the time zone abbreviation, e.g. `CEST`)
* %c = session ID
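The `%c` session ID is what the parser below stores as `trans_id`. The PostgreSQL documentation describes it as two hexadecimal numbers separated by a dot: the backend process start time (Unix epoch) and its process ID. A minimal sketch to decode it; the helper `decode_session_id` and the sample value `5bb33a84.1f40` are hypothetical, for illustration only:

```python
from datetime import datetime, timezone


def decode_session_id(session_id: str):
    """Split a PostgreSQL %c session ID into (process start time, PID).

    Both parts are hexadecimal numbers without leading zeros.
    """
    start_hex, pid_hex = session_id.split('.')
    start_time = datetime.fromtimestamp(int(start_hex, 16), tz=timezone.utc)
    return start_time, int(pid_hex, 16)


start_time, pid = decode_session_id('5bb33a84.1f40')
print(start_time, pid)  # 2018-10-02 09:29:40+00:00 8000
```

This also explains why the parsing regex expects exactly 8 hexadecimal characters before the dot: any process start time after 1978 needs 8 hex digits.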

Import the required packages:

```python
from datetime import datetime
import io
import re
import zipfile

from pyspark.sql import Row, SparkSession
```

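Create the Spark session:

```python
spark = (SparkSession
         .builder
         .appName('PostgreSQL_logs_into_Parquet')
         # Arrow only speeds up pandas <-> Spark conversions; since Spark 3.0 this key
         # is deprecated in favour of spark.sql.execution.arrow.pyspark.enabled
         .config("spark.sql.execution.arrow.enabled", "true")
         .getOrCreate())
```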

---

## Functions 

### Read ZIP files

Spark has no native function to read zip files, so I wrote one, based on an idea from [StackOverflow](https://stackoverflow.com/questions/28569788/how-to-open-stream-zip-files-through-spark).
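Since `binaryFiles()` hands each task a whole zip file as bytes, the trick is to wrap those bytes in `io.BytesIO` so that `zipfile` can read them as if they were a file on disk. A minimal, Spark-free sketch of that idea, using a throwaway archive built in memory (the log file names are illustrative):

```python
import io
import zipfile

# Build a small zip archive entirely in memory, standing in for the
# bytes that Spark's binaryFiles() gives to each task.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'w') as archive:
    archive.writestr('postgresql-2018-01-01_000000.log', 'first log\n')
    archive.writestr('postgresql-2018-01-02_000000.log', 'second log\n')
zip_bytes = buffer.getvalue()

# Re-open the archive from the raw bytes, without touching the file system.
with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
    contents = {name: archive.read(name).decode('utf-8')
                for name in archive.namelist()}

print(contents)
```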

```python
def extract_postgresql_zip(zip_filename: str, zip_bytes: bytes):
    """Unzip zip bytes (already in memory, not a file) and add fields derived from the zip filename.

    The zip filename is expected to follow the pattern TYPE__SERVERNAME__DATE.zip.

    Parameters
    ----------
    zip_filename : str
        Full path of the zip file
    zip_bytes : bytes
        Bytes of the zip file

    Returns
    -------
    result : list
        List of tuples (info_file, raw_log_file)
        where info_file is a dict with the zip filename and its parts,
        and raw_log_file is a str
    Example:
        [({'type': 'POSTGRESQL',
           'servername': 'myserver',
           'log_filename': 'postgresql-2018-01-01_000000.log',
           'zip_filename': 'file:/zip_files/POSTGRESQL__myserver__2018-10-02_120852.zip'},
          'raw_postgresql_log_data_HERE')]
    """
    raw_log_files = zip_extract(zip_bytes)
    # Drop the directory and only the final ".zip" extension, so dots in the server name survive
    base_name = zip_filename.split('/')[-1].rsplit('.', 1)[0]
    zip_filename_splitted = base_name.split('__')
    zip_filename_parts = {
        'type': zip_filename_splitted[0],
        'servername': zip_filename_splitted[1]
    }

    return [({**zip_filename_parts, 'log_filename': file, 'zip_filename': zip_filename}, content)
            for file, content in raw_log_files]


def zip_extract(zip_bytes: bytes):
    """Unzip zip bytes (already in memory, not a file)

    Parameters
    ----------
    zip_bytes : bytes
        Bytes of the zip file

    Returns
    -------
    result : list
        List of tuples (postgresql_log_filename, raw_log_content)
    """
    files_content = []
    # The context manager closes the archive once all members are read
    with zipfile.ZipFile(io.BytesIO(zip_bytes), "r") as file_obj:
        for file in file_obj.namelist():
            if file.endswith('/'):  # skip directory entries
                continue
            try:
                files_content.append((file, file_obj.read(file).decode('UTF-8')))
            except Exception as error:
                # Printed on the executor, so it ends up in the executor's stdout log
                print('Failed to process file {file}: {error}'.format(file=file, error=error))
    return files_content
```
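The functions above rely on zip files being named `TYPE__SERVERNAME__DATE.zip`. A sketch of a standalone parser for that convention (`parse_zip_filename` is a hypothetical helper, not used elsewhere in the notebook) that also keeps the date part and uses `PurePosixPath.stem`, which strips only the last extension, so a server name containing dots, such as a fully qualified host name, is not truncated:

```python
from pathlib import PurePosixPath


def parse_zip_filename(zip_filename: str) -> dict:
    """Split a name such as 'POSTGRESQL__myserver__2018-10-02_120852.zip'.

    Assumes the TYPE__SERVERNAME__DATE.zip naming convention.
    """
    stem = PurePosixPath(zip_filename).stem  # file name without the final '.zip'
    log_type, servername, date = stem.split('__')
    return {'type': log_type, 'servername': servername, 'date': date}


print(parse_zip_filename('file:/zip_files/POSTGRESQL__db1.example.com__2018-10-02_120852.zip'))
```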

### To parse PostgreSQL logs

`parse_postgresql_log_file()` detects every log entry. Note that an entry can span several lines: for example, when PostgreSQL logs DDL statements, it writes the full SQL used to create tables, functions, etc. An entry therefore starts at a line beginning with a timestamp and runs until the next such line.

Each log entry is then parsed with `parse_postgresql_log_entry()`.
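The splitting rule can be sketched on its own: find every line that starts with a timestamp, then cut the text between consecutive starts. The final entry has no successor, so it must be closed at the end of the text. The log lines below (user `alice`, database `sales`, the session ID) are invented for illustration:

```python
import re

# An entry starts at any line beginning with "YYYY-MM-DD HH:MM:SS ".
ENTRY_START = re.compile(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} ', re.MULTILINE)


def split_entries(raw_log: str) -> list:
    starts = [m.start() for m in ENTRY_START.finditer(raw_log)]
    # Pair every start with the next one; the last entry runs to the end of the text.
    ends = starts[1:] + [len(raw_log)]
    return [raw_log[s:e] for s, e in zip(starts, ends)]


sample = (
    "2018-03-27 09:25:22 CEST psql alice sales 5aba0f42.2f3e LOG:  statement: CREATE TABLE t (\n"
    "    id integer\n"
    ");\n"
    "2018-03-27 09:25:23 CEST psql alice sales 5aba0f42.2f3e LOG:  duration: 3.1 ms\n"
)
entries = split_entries(sample)
print(len(entries))  # 2
```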

```python
# Compile the regular expressions once, at module level, instead of on every call.
# Raw strings avoid invalid-escape warnings for \d, \S, \w.
# The time zone is hard-coded to CEST; adjust it for other log_timezone settings.
ENTRY_START = re.compile(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} CEST ', re.MULTILINE)
# Severity levels plus the labels PostgreSQL uses for secondary lines (DETAIL, HINT, ...)
MSG_TYPES = r'DEBUG[1-5]|INFO|NOTICE|WARNING|ERROR|LOG|FATAL|PANIC|DETAIL|HINT|QUERY|CONTEXT|LOCATION|STATEMENT'
# [\s\S]* lets the message span several lines (e.g. DDL statements)
FULL_ENTRY = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) CEST (?P<app>.*) (?P<user>\S*) (?P<db>\S*) '
    r'(?P<trans_id>\w{8}\.\w*) (?P<msg_type>' + MSG_TYPES + r'):  (?P<msg>[\s\S]*)')
# Fallback: keep at least the timestamp and the whole message
TIMESTAMP_ONLY = re.compile(r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) CEST (?P<msg>[\s\S]*)')


def parse_postgresql_log_file(info_file: dict, raw_file: str):
    """Parse a PostgreSQL log file written with the following log line prefix:
                log_line_prefix = '%t %a %u %d %c '

    Parameters
    ----------
    info_file : dict
        Information related to the raw_file (type, servername, etc.)
    raw_file : str
        The PostgreSQL log to parse

    Returns
    -------
    result : list
        List of parsed entries, one dict per entry
    """
    starts = [entry.start() for entry in ENTRY_START.finditer(raw_file)]
    # Each entry runs until the next one starts; the last one runs to the end of the file
    ends = starts[1:] + [len(raw_file)]
    return [parse_postgresql_log_entry(info_file, raw_file[start:end])
            for start, end in zip(starts, ends)]


def parse_postgresql_log_entry(info_file: dict, log_entry: str):
    """Parse an entry of a PostgreSQL log file. Usually one entry is one line,
            but it can span multiple lines in case of DDL.

    Parameters
    ----------
    info_file : dict
        Information related to the raw_file (type, servername, etc.)
    log_entry : str
        Log entry to parse. Must have the following log line prefix:
                log_line_prefix = '%t %a %u %d %c '

    Returns
    -------
    result : dict
        A dictionary with the parsed information
    """
    # Defaults for the fields that cannot be parsed; every entry gets the same keys
    dict_mappings = {'timestamp': None, 'app': '', 'user': '', 'db': '', 'trans_id': '',
                     'msg_type': 'NOT_PARSED', 'msg': log_entry,
                     'log_type': info_file['type'], 'servername': info_file['servername']}
    match = FULL_ENTRY.match(log_entry) or TIMESTAMP_ONLY.match(log_entry)
    if match:
        dict_mappings.update(match.groupdict())
        # e.g. '2018-03-27 09:25:22'
        dict_mappings['timestamp'] = datetime.strptime(dict_mappings['timestamp'], '%Y-%m-%d %H:%M:%S')
    return dict_mappings
```
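A quick check of the entry regex on a single multi-line DDL entry. This is a trimmed, standalone copy of the pattern with the `CEST` time zone and a subset of message types; `[\s\S]*` lets the message group span newlines. The application, user, database and session values are invented:

```python
import re
from datetime import datetime

ENTRY = re.compile(
    r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) CEST '
    r'(?P<app>.*) (?P<user>\S*) (?P<db>\S*) (?P<trans_id>\w{8}\.\w*) '
    r'(?P<msg_type>INFO|STATEMENT|ERROR|LOG|DETAIL|FATAL):  (?P<msg>[\s\S]*)'
)

entry = ("2018-03-27 09:25:22 CEST psql alice sales 5aba0f42.2f3e "
         "LOG:  statement: CREATE TABLE t (\n    id integer\n);\n")
fields = ENTRY.match(entry).groupdict()
fields['timestamp'] = datetime.strptime(fields['timestamp'], '%Y-%m-%d %H:%M:%S')
print(fields['app'], fields['user'], fields['db'], fields['msg_type'])  # psql alice sales LOG
```

Because `app` is matched with `.*` (no `re.DOTALL`), it cannot run past the first line; only the message group is allowed to cross newlines.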

---

##  Process zip files

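`binaryFiles()` returns an RDD in which each element is a `(zip_filename, zip_content)` tuple: the file path and the whole file content as bytes. Each zip is read as a single record, so it must fit in an executor's memory.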
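To try the functions without a cluster, the same `(path, bytes)` pairs can be produced locally. A sketch, assuming the zips sit in a local directory; `local_binary_files` is a hypothetical helper. Spark reports paths as URIs (e.g. `file:/...`) while `glob` returns plain paths, but `extract_postgresql_zip()` only uses the part after the last `/`, so both work:

```python
import glob
import os
import tempfile


def local_binary_files(pattern: str):
    """Yield (path, bytes) pairs, mirroring the elements of binaryFiles()."""
    for path in sorted(glob.glob(pattern)):
        with open(path, 'rb') as handle:
            yield path, handle.read()


# Usage: write one dummy file to a temporary directory and read it back.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, 'POSTGRESQL__myserver__2018-10-02_120852.zip'), 'wb') as f:
        f.write(b'not really a zip')
    pairs = list(local_binary_files(os.path.join(tmp, 'POSTGRESQL__*.zip')))

print(len(pairs))  # 1
```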

```python
zips = spark.sparkContext.binaryFiles("/zips/POSTGRESQL__*.zip")
zips.count()
```

I use `flatMap()` for the zip extraction because I want each element to be one log file rather than one zip file (a zip can contain several logs).
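`flatMap()` behaves like `map()` followed by flattening one level. A plain-Python analogy with `itertools.chain`, on toy data and without Spark:

```python
from itertools import chain

# Toy data: two zip files, the first holding two log files, the second one.
zips = [
    ('a.zip', ['log1', 'log2']),
    ('b.zip', ['log3']),
]


def extract(zip_item):
    name, logs = zip_item
    return [(name, log) for log in logs]


# map(): one element per zip, each element being a list of log files
mapped = [extract(z) for z in zips]
# flatMap(): the lists are flattened, giving one element per log file
flat_mapped = list(chain.from_iterable(extract(z) for z in zips))

print(len(mapped), len(flat_mapped))  # 2 3
```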

Then `raw_pg_logs` is an RDD of tuples, for example:

```python
({'type': 'POSTGRESQL',
  'servername': 'myserver',
  'log_filename': 'postgresql-2018-01-01_000000.log',
  'zip_filename': 'file:/zip_files/POSTGRESQL__myserver__2018-10-02_120852.zip'},
 'raw_postgresql_log_data_HERE')
```

```python
raw_pg_logs = zips.flatMap(lambda x: extract_postgresql_zip(x[0], x[1]))
```

Likewise, `flatMap()` the parsing of the log files so that each element is a single log entry:

```python
pg_logs = raw_pg_logs.flatMap(lambda log: parse_postgresql_log_file(log[0], log[1]))
```

---

## Save logs to a Parquet file

When writing Parquet, Spark can partition the output by one or more columns, creating one subdirectory per distinct value; here I partition by `servername`. About file sizes, the Databricks Support [recommendation](https://forums.databricks.com/questions/101/what-is-an-optimal-size-for-file-partitions-using.html) on Parquet partition size is:

> This all depends on the dataset size and specific use cases, but, in general, we've seen that Parquet partitions of about 1GB are optimal.

To save the RDD as Parquet, I first convert it to a Spark DataFrame:

```python
%%time
# Convert each dict to a Row, then append the entries partitioned by server name
df = pg_logs.map(lambda r: Row(**r)).toDF()
df.write.partitionBy('servername').mode("append").parquet('pg_logs.parquet')
```