# Convert a day of data into parquet

We're interested in parsing `pg_dump` data from the mozaggregator database in the following form:

```
../data
├── build_id
│   └── 20191201
│       ├── 474306.dat.gz
│       └── toc.dat
└── submission
    └── 20191201
        ├── 474405.dat.gz
        ├── 474406.dat.gz
        ...
        ├── 474504.dat.gz
        └── toc.dat

4 directories, 103 files
```

In [1]:
data_dir="../data"

## Add flattened metadata to the aggregates

In [2]:
from pyspark.sql import functions as F, types as T

metadata_struct = T.StructType([
    T.StructField("aggregate_type", T.StringType(), False),
    T.StructField("ds_nodash", T.StringType(), False),
    T.StructField("table_id", T.IntegerType(), False),
])
@F.udf(metadata_struct)
def parse_filename(path):
    aggregate_type, ds_nodash, filename = path.split("/")[-3:]
    return aggregate_type, ds_nodash, int(filename.split(".")[0])

def read_pg_dump(input_dir):
    return (
        spark.read.csv(f"{input_dir}/*.dat.gz", sep="\t", schema="dimension string, aggregate string")
        .withColumn("file_name", parse_filename(F.input_file_name()))
        .select("dimension", "aggregate", "file_name.*")
    )

df = read_pg_dump(f"{data_dir}/submission_date/20191201")
df.show()
df.count()

+--------------------+--------------------+---------------+---------+--------+
|           dimension|           aggregate| aggregate_type|ds_nodash|table_id|
+--------------------+--------------------+---------------+---------+--------+
|{"os": "Darwin", ...|{0,0,0,0,0,0,0,1,...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{0,0,0,0,0,0,0,0,...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{0,0,0,0,2,0,0,0,...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{0,0,4,1,2,0,0,1,...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{17,0,0,0,0,0,0,0...|submission_date| 20191201|  474922|
|{"os": "Darwin", ...|{0,11,0,0,0,0,0,0...|submissio

3123817

In [3]:
df.limit(1).show(vertical=True, truncate=False)

-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 dimension      | {"os": "Darwin", "child": "true", "label": "", "metric": "SCALARS_MEDIA.AUTOPLAY_WOULD_NOT_BE_ALLOWED_COUNT", "osVersion": "16.5.0", "application": "Firefox", "architecture": "x86-64"}                       
 aggregate      | {0,0,0,0,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,16,2} 
 aggregate_type | submission_date                                                                                                                                                                                                
 ds_nodash      | 20191201                                                                      

In [4]:
df.select("table_id").distinct().count()

100

## Parse the table of contents

In [5]:
toc_file = f"{data_dir}/submission_date/20191201/toc.dat"

with open(toc_file, "rb") as f:
    data = f.read()

In [6]:
dat = [x for x in data.split(b"\n") if b".dat" in x]
dat[:3]

[b'\x00\x00\x00474879.dat\x00\xd0>\x07\x00\x00\x01\x00\x00\x00\x00\x01\x00\x00\x000\x00\x08\x00\x00\x0090014330\x00"\x00\x00\x00submission_date_aurora_40_20191201\x00',
 b'\x00\x00\x00474832.dat\x00\xe0>\x07\x00\x00\x01\x00\x00\x00\x00\x01\x00\x00\x000\x00\x08\x00\x00\x0090014699\x00"\x00\x00\x00submission_date_aurora_41_20191201\x00',
 b'\x00\x00\x00474848.dat\x00\xcd>\x07\x00\x00\x01\x00\x00\x00\x00\x01\x00\x00\x000\x00\x08\x00\x00\x0090014321\x00"\x00\x00\x00submission_date_aurora_42_20191201\x00']

In [7]:
def extract_mapping(line):
    """Parse the binary toc files for the table and the table name."""
    
    # We rely on the padding in the binary file to extract the necessary information
    # b'\x00\x00\x00474424.dat\x00%=\x07\x00\x00\x01\x00\x00\x00\x00\x01\x00\x00\x000
    # \x00\x08\x00\x00\x0090014321\x00"\x00\x00\x00submission_date_aurora_42_20191201\x00'
    processed = line.replace(b"\x00", b" ").strip().split()
    
    # [b'474455.dat', b'(=\x07', b'\x01', b'\x01', b'0', b'\x08', b'90014330', b'"', 
    # b'submission_date_aurora_40_20191201']
    table_name = processed[-1].decode()
    if b"CREATE INDEX" in line:
        # this is an indexed table, get the actual name
        for element in processed:
            if b"public." not in element:
                continue
            table_name = element.split(b"public.")[-1].decode()
            
    return {"table_id": processed[0].split(b".")[0].decode(), "table_name": table_name}

def parse_toc(data):
    return [extract_mapping(line) for line in data.split(b"\n") if b".dat" in line]

parse_toc(data)[:10]

[{'table_id': '474879', 'table_name': 'submission_date_aurora_40_20191201'},
 {'table_id': '474832', 'table_name': 'submission_date_aurora_41_20191201'},
 {'table_id': '474848', 'table_name': 'submission_date_aurora_42_20191201'},
 {'table_id': '474829', 'table_name': 'submission_date_aurora_43_20191201'},
 {'table_id': '474845', 'table_name': 'submission_date_aurora_44_20191201'},
 {'table_id': '474889', 'table_name': 'submission_date_aurora_45_20191201'},
 {'table_id': '474864', 'table_name': 'submission_date_aurora_46_20191201'},
 {'table_id': '474875', 'table_name': 'submission_date_aurora_47_20191201'},
 {'table_id': '474881', 'table_name': 'submission_date_aurora_48_20191201'},
 {'table_id': '474858', 'table_name': 'submission_date_aurora_49_20191201'}]

In [8]:
spark.createDataFrame(parse_toc(data)).show(truncate=False, n=5)

+--------+----------------------------------+
|table_id|table_name                        |
+--------+----------------------------------+
|474879  |submission_date_aurora_40_20191201|
|474832  |submission_date_aurora_41_20191201|
|474848  |submission_date_aurora_42_20191201|
|474829  |submission_date_aurora_43_20191201|
|474845  |submission_date_aurora_44_20191201|
+--------+----------------------------------+
only showing top 5 rows





### TOC for `build_id`s

In [9]:
toc_file = f"{data_dir}/build_id/20191220/toc.dat"

with open(toc_file, "rb") as f:
    data = f.read()

In [10]:
list(filter(lambda x: b".dat" in x, data.split(b"\n")))

[b"\x00\x00\x00496168.dat\x00'\x92\x07\x00\x00\x01\x00\x00\x00\x00\x01\x00\x00\x000\x00\x08\x00\x00\x0091418126\x00\x1c\x00\x00\x00build_id_nightly_73_20191220\x00",
 b'\x00\x00\x00496167.dat\x00\xb9\x91\x07\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x001259\x00\x08\x00\x00\x0091420008\x00+\x00\x00\x00build_id_nightly_68_20191220_dimensions_idx\x00\x05\x00\x00\x00INDEX\x00\x04\x00\x00\x00\x00\x87\x00\x00\x00CREATE INDEX build_id_nightly_68_20191220_dimensions_idx ON public.build_id_nightly_68_20191220 USING gin (dimensions jsonb_path_ops);']

In [11]:
dat = [x for x in data.split(b"\n") if b".dat" in x]
processed = dat[1].replace(b"\x00", b" ").strip().split()
processed

[b'496167.dat',
 b'\xb9\x91\x07',
 b'\x04',
 b'1259',
 b'\x08',
 b'91420008',
 b'+',
 b'build_id_nightly_68_20191220_dimensions_idx',
 b'\x05',
 b'INDEX',
 b'\x04',
 b'\x87',
 b'CREATE',
 b'INDEX',
 b'build_id_nightly_68_20191220_dimensions_idx',
 b'ON',
 b'public.build_id_nightly_68_20191220',
 b'USING',
 b'gin',
 b'(dimensions',
 b'jsonb_path_ops);']

In [12]:
parse_toc(data)

[{'table_id': '496168', 'table_name': 'build_id_nightly_73_20191220'},
 {'table_id': '496167', 'table_name': 'build_id_nightly_68_20191220'}]

## Putting everything together

In [13]:
from pyspark.sql import Row

def main(input_dir, output_dir):
    # parse the table of contents
    toc_file = f"{input_dir}/toc.dat"
    with open(toc_file, "rb") as f:
        data = f.read()
    toc_df = spark.createDataFrame([Row(**d) for d in parse_toc(data)])

    df = read_pg_dump(input_dir)
    # join, reorder, and write to a single parquet partition
    joined = df.join(toc_df, on="table_id")
    columns = ["table_name", "aggregate_type", "ds_nodash", "dimension", "aggregate"]

    out_df = joined.select(*columns)
    # NOTE: strip out this jupyter magic if copied into a script
    %time out_df.repartition(1).write.parquet(output_dir, mode="overwrite")


input_directory = f"{data_dir}/submission_date/20191201"
output_directory = f"{data_dir}/parquet/submission_date/20191201"
main(input_directory, output_directory)

CPU times: user 3.96 ms, sys: 1.35 ms, total: 5.31 ms
Wall time: 16.6 s


In [14]:
! du -h ../data/submission_date/20191201/
! du -h ../data/parquet/submission_date/20191201/

118M	../data/submission_date/20191201/
210M	../data/parquet/submission_date/20191201/


In [15]:
read_pg_dump(input_directory).count() == spark.read.parquet(output_directory).count()

True

### And again, for build id aggregates

In [16]:
input_directory = f"{data_dir}/build_id/20191201"
output_directory = f"{data_dir}/parquet/build_id/20191201"
main(input_directory, output_directory)

CPU times: user 1.43 ms, sys: 480 µs, total: 1.91 ms
Wall time: 2.28 s


In [17]:
! du -h ../data/build_id/20191201/
! du -h ../data/parquet/build_id/20191201/

8.5M	../data/build_id/20191201/
 14M	../data/parquet/build_id/20191201/


In [18]:
read_pg_dump(input_directory).count() == spark.read.parquet(output_directory).count()

True