Merge pull request #14 from moj-analytical-services/ut-fix
trying to see if I need to run pytest via poetry
isichei committed Jan 10, 2020
2 parents 0645733 + d2ece06 commit b9cf5a4
Showing 5 changed files with 169 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pythonpackage.yml
@@ -41,6 +41,6 @@ jobs:
run: |
pip install pytest pytest-cov poetry boto3 moto coveralls
poetry install
pytest --cov-report term-missing --cov=dataengineeringutils3 tests/
poetry run pytest --cov-report term-missing --cov=dataengineeringutils3 tests/
export GIT_BRANCH=$(echo "${{ github.ref }}" | sed "s/refs\/heads\///")
coveralls
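
The change above switches the test step from a bare pytest call to `poetry run pytest`, so the test session runs inside the virtualenv that `poetry install` populated rather than the runner's global interpreter. As a rough illustration (hypothetical, not part of this commit), a diagnostic along these lines shows which interpreter a test process is using and whether the package resolves from it:

# Hypothetical diagnostic, not part of the repository: reports the interpreter
# in use and whether dataengineeringutils3 is importable from it.
import importlib.util
import sys

print("interpreter:", sys.executable)
print(
    "dataengineeringutils3 importable:",
    importlib.util.find_spec("dataengineeringutils3") is not None,
)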
89 changes: 85 additions & 4 deletions dataengineeringutils3/writer.py
Expand Up @@ -21,7 +21,7 @@ class BaseSplitFileWriter:
:param s3_basepath: The base path to the s3 location you want to write to S3://...
:param filename_prefix: The filename that you want to keep constant. Every written file is prefixed with this string.
S3 objects written will end in the file number and the extension.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3 (default True). Note does not affect
the file_extension parameter.
:param file_extension: String representing the file extension. Should not be prefixed with a '.'.
@@ -136,7 +136,7 @@ class BytesSplitFileWriter(BaseSplitFileWriter):
:param s3_basepath: The base path to the s3 location you want to write to S3://...
:param filename_prefix: The filename that you want to keep constant. Every written file is prefixed with this string.
S3 objects written will end in the file number and the extension.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3 (default True). Note does not affect
the file_extension parameter.
:param file_extension: String representing the file extension. Should not be prefixed with a '.'.
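
The docstring of the new JsonNlSplitFileWriter further down recommends pairing a JSON writer such as jsonlines with BytesSplitFileWriter when scaling to larger volumes. A rough sketch of that pattern, assuming BytesSplitFileWriter behaves as a writable binary file object and accepts the constructor arguments documented above (the sketch is not part of this diff):

# Illustrative only: stream dicts as JSON lines through a BytesSplitFileWriter,
# which starts a new S3 object each time max_bytes is exceeded.
import jsonlines
from dataengineeringutils3.writer import BytesSplitFileWriter

rows = ({"id": i} for i in range(10000))
split_writer = BytesSplitFileWriter(
    "s3://test/", "test-file", max_bytes=1024, file_extension="jsonl"
)
json_writer = jsonlines.Writer(split_writer)
json_writer.write_all(rows)
json_writer.close()
split_writer.close()  # expected to flush any remaining buffered bytes to S3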
@@ -171,11 +171,10 @@ class StringSplitFileWriter(BaseSplitFileWriter):
writing to s3. Data is written to a StringIO file buffer until it hits a
max_bytes limit at which point the data is written to S3
as a single file.
:param s3_basepath: The base path to the s3 location you want to write to S3://...
:param filename_prefix: The filename that you want to keep constant. Every written file is prefixed with this string.
S3 objects written will end in the file number and the extension.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3 (default True). Note does not affect
the file_extension parameter.
:param file_extension: String representing the file extension. Should not be prefixed with a '.'.
@@ -208,3 +207,85 @@ def _compress_data(self, data):
Converts string data to bytes and then compresses
"""
return gzip.compress(bytes(data, "utf-8"))


class JsonNlSplitFileWriter(BaseSplitFileWriter):
"""
Class for writing json line into large datasets in to chunks and writing to s3.
This class writes to a string (rather than fileIO) and does smaller checks for a speedier
read write. Espeicially when writing multiple lines. However, if scaling to large amounts of data
it is probably better to use a json writer like jsonlines with the BytesSplitFileWriter.
The extension and the _write methods are defined in classes which extend this class
lines = [
'{"key": "value"}'
]
with JsonNlSplitFileWriter("s3://test/", "test-file") as writer:
for line in lines:
writer.write_line(line)
"""

def __init__(
self, s3_basepath, filename_prefix, max_bytes=1000000000, chunk_size=1000
):
super(JsonNlSplitFileWriter, self).__init__(
s3_basepath=s3_basepath,
filename_prefix=filename_prefix,
max_bytes=max_bytes,
compress_on_upload=True,
file_extension="jsonl.gz",
)

self.chunk_size = chunk_size
self.total_lines = 0
self.num_lines = 0

def __enter__(self):
self.mem_file = self.get_new_mem_file()
self.num_lines = 0
self.num_files = 0
return self

def __exit__(self, *args):
self.close()

def write_line(self, line):
"""Writes line as string"""
self.mem_file += f"{line}\n"
self.num_lines += 1
self.total_lines += 1
if self.file_size_limit_reached():
self.write_to_s3()

def file_size_limit_reached(self):
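# The buffer size is only checked when num_lines is a multiple of chunk_size,
# so files can overshoot max_bytes slightly in exchange for fewer size checks.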
if (
not self.num_lines % self.chunk_size
and sys.getsizeof(self.mem_file) > self.max_bytes
):
return True
else:
return False

def write_lines(self, lines, line_transform=lambda x: x):
"""
Writes multiple lines, then checks whether the file size limit has been hit.
Quicker than writing line by line, but less accurate at breaking up files.
"""
self.mem_file += "\n".join(line_transform(l) for l in lines) + "\n"
self.num_lines += len(lines)
self.total_lines += len(lines)
if self.file_size_limit_reached():
self.write_to_s3()

def reset_file_buffer(self):
self.num_files += 1
self.num_lines = 0
self.mem_file = self.get_new_mem_file()

def write_to_s3(self):
gzip_string_write_to_s3(self.mem_file, self.get_s3_filepath())
self.reset_file_buffer()

def close(self):
"""Write all remaining lines to a final file"""
if self.num_lines:
self.write_to_s3()
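
Based on the constructor and file naming above (and the key assertions in the new test further down), a minimal usage sketch of the new writer, assuming the target bucket already exists (the tests create it via moto's mocked S3):

# Minimal sketch, not part of the commit: with a small max_bytes the buffer is
# gzipped and uploaded repeatedly, producing keys such as
# s3://test/example-0.jsonl.gz, s3://test/example-1.jsonl.gz, ...
from dataengineeringutils3.writer import JsonNlSplitFileWriter

records = (f'{{"i": {i}}}' for i in range(150))
with JsonNlSplitFileWriter("s3://test/", "example", max_bytes=1024, chunk_size=2) as writer:
    for record in records:
        writer.write_line(record)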
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "dataengineeringutils3"
version = "1.0.0"
version = "1.0.1"
description = "Data engineering utils python 3 version"
authors = ["Josh Rowe <josh.rowe@digital.justice.gov.uk>"]
license = "MIT"
30 changes: 13 additions & 17 deletions tests/test_integration.py
@@ -10,20 +10,24 @@
from tests.mocks import MockQs


FILE_KEY = "test-key"
BUCKET_NAME = "test"
S3_BASEPATH = f"s3://{BUCKET_NAME}/"
MAX_BYTES = 80000
CHUNK_SIZE = 1000


def test_large_select_queryset_with_writer(s3, large_select_queryset):
"""
Test file writer and queryset with large result set.
"""
file_key = "test-key"
bucket_name = "test"
s3_path = f"s3://{bucket_name}/{file_key}"
s3.meta.client.create_bucket(Bucket=bucket_name)
bucket = s3.Bucket(bucket_name)
with JsonNlSplitFileWriter(s3_path, 1024, 10000) as writer:
s3.meta.client.create_bucket(Bucket=BUCKET_NAME)
bucket = s3.Bucket(BUCKET_NAME)
with JsonNlSplitFileWriter(S3_BASEPATH, FILE_KEY, 1024, 10000) as writer:
for rows in large_select_queryset.iter_chunks():
writer.write_lines(rows)

keys_in_bucket = [f"s3://{bucket_name}/{o.key}" for o in bucket.objects.all()]
keys_in_bucket = [f"s3://{BUCKET_NAME}/{o.key}" for o in bucket.objects.all()]
files_in_bucket = len(keys_in_bucket)
assert files_in_bucket == 10

@@ -44,16 +44,10 @@ def test_large_select_queryset_with_writer(s3, large_select_queryset):
)


MAX_BYTES = 80000
CHUNK_SIZE = 1000


def write_with_writer_and_qs(result_set):
select_queryset = SelectQuerySet(MockQs(result_set), "", 10000)

with JsonNlSplitFileWriter(
"s3://test/test-file.josnl.gz", MAX_BYTES, CHUNK_SIZE
) as writer:
with JsonNlSplitFileWriter(S3_BASEPATH, FILE_KEY, MAX_BYTES, CHUNK_SIZE) as writer:
for results in select_queryset.iter_chunks():
writer.write_lines(results)

Expand All @@ -64,9 +62,7 @@ def write_with_write_to_file(result_set):
def transform_line(l):
return l

with JsonNlSplitFileWriter(
"s3://test/test-file.josnl.gz", MAX_BYTES, CHUNK_SIZE
) as writer:
with JsonNlSplitFileWriter(S3_BASEPATH, FILE_KEY, MAX_BYTES, CHUNK_SIZE) as writer:
select_queryset.write_to_file(writer, transform_line)


70 changes: 69 additions & 1 deletion tests/test_writer.py
@@ -6,12 +6,80 @@
from io import StringIO, BytesIO

from dataengineeringutils3.s3 import gzip_string_write_to_s3
from dataengineeringutils3.writer import BytesSplitFileWriter, StringSplitFileWriter
from dataengineeringutils3.writer import (
BytesSplitFileWriter,
StringSplitFileWriter,
JsonNlSplitFileWriter,
)
from tests.helpers import time_func

import jsonlines


def test_json_split_file_writer(s3):
"""Test Writer splits files, gzips and sends to s3"""
file_key = "test-key"
bucket_name = "test"
s3_basepath = f"s3://{bucket_name}/"

s3.meta.client.create_bucket(Bucket=bucket_name)
bucket = s3.Bucket(bucket_name)
with JsonNlSplitFileWriter(s3_basepath, file_key, 1024, 2) as writer:
for i in range(150):
writer.write_line(f"{i}. This test line number {i + 1}")
assert writer.total_lines == 150
keys_in_bucket = [f"s3://{bucket_name}/{o.key}" for o in bucket.objects.all()]
files_in_bucket = len(keys_in_bucket)
assert files_in_bucket == 5
assert keys_in_bucket == [
f"{s3_basepath}{file_key}-{i}.jsonl.gz" for i in range(files_in_bucket)
]


MAX_BYTES = 80000
CHUNK_SIZE = 1000


def write_with_writer(result_set):
with JsonNlSplitFileWriter(
"s3://test/", "test-file", MAX_BYTES, CHUNK_SIZE
) as writer:
writer.write_lines(result_set)


def write_manually(result_set):
string = ""
num_files = 0
num_lines = 0
while True:
for l in result_set:
string += f"{l}"
if not num_lines % CHUNK_SIZE and sys.getsizeof(string) > MAX_BYTES:
gzip_string_write_to_s3(
string, f"s3://test/test-file-two-{num_files}.jsonl.gz"
)
num_files += 1
num_lines = 0
string = ""
num_lines += 1
break
if string:
gzip_string_write_to_s3(string, f"s3://test/test-file-two-{num_files}.jsonl.gz")


def test_speed_of_writer(result_set, s3):
"""
Test that writing via JsonNlSplitFileWriter is faster than building and gzipping the strings manually
"""
s3.meta.client.create_bucket(Bucket="test")

range_time = time_func(write_manually, result_set)

qs_time = time_func(write_with_writer, result_set)

assert qs_time < range_time
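
time_func comes from tests.helpers, which is not shown in this diff; judging from its use here it presumably times a call and returns the elapsed seconds, roughly along these lines (an assumption, not the actual helper):

# Hypothetical stand-in for tests.helpers.time_func: run a callable with the
# given arguments and return the wall-clock time it took, in seconds.
import time

def time_func(func, *args, **kwargs):
    start = time.perf_counter()
    func(*args, **kwargs)
    return time.perf_counter() - start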


@pytest.mark.parametrize(
"folder,filename,compress", [("test-csv/", "test-file", False), ("", "a", True)]
)
