Merge pull request #25 from moj-analytical-services/boto
Boto
Karik Isichei committed Feb 11, 2021
2 parents 2dd8509 + 42c5533 commit a724628
Showing 11 changed files with 804 additions and 707 deletions.
53 changes: 53 additions & 0 deletions .github/workflows/lint.yml
@@ -0,0 +1,53 @@
name: Lint
on:
- pull_request

jobs:
preview:
name: Test
runs-on: ubuntu-latest
strategy:
max-parallel: 4
matrix:
python-version: [3.6, 3.7, 3.8]
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 1

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}

- name: Check for flake8 config
run: |
if [ -e ./.flake8 ]; then
echo "A .flake8 config file has been found in the root of this branch and will be used in this workflow."
else
echo "No .flake8 config file has been found in the root of this branch and so the standard Data Engineering config will used."
curl https://moj-analytical-services.github.io/.github/configs/data-engineering/flake8 > .flake8
fi
- name: Check for yamllint config
run: |
if [ -e ./.yamllint ]; then
echo "A .yamllint config file has been found in the root of this branch and will be used in this workflow."
else
echo "No .yamllint config file has been found in the root of this branch and so the standard Data Engineering config will used."
curl https://moj-analytical-services.github.io/.github/configs/data-engineering/yamllint > .yamllint
fi
- name: Install dependencies
run: |
python -m pip install --upgrade pip==20.3.3
pip install flake8 yamllint
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
- name: Lint Python files with flake8
run: >
flake8 .
- name: Lint YAML files with yamllint
run: >
yamllint .
16 changes: 3 additions & 13 deletions .github/workflows/pythonpackage.yml
@@ -12,26 +12,16 @@ jobs:
python-version: [3.6, 3.7, 3.8]

steps:
- uses: actions/checkout@v1
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v1
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install --upgrade pip==20.3.3
pip install poetry
poetry install
- name: Check compliance with black
uses: lgeiger/black-action@master
with:
args: --check dataengineeringutils3/ tests/
- name: Lint with flake8
run: |
pip install flake8
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=88 --statistics
- name: Test with pytest
env:
COVERALLS_REPO_TOKEN: ${{ secrets.github_token }}
13 changes: 10 additions & 3 deletions dataengineeringutils3/db.py
@@ -17,21 +17,28 @@ class SelectQuerySet:
1000,
)
# Write out json string
with JsonNlSplitFileWriter("s3://test/test-file.jsonl.gz") as writer:
column_names = select_queryset.headers
for row in select_queryset:
writer.write_line(json.dumps(zip(column_names, row), cls=DateTimeEncoder))
json_line_str = json.dumps(
dict(zip(column_names, row)),
cls=DateTimeEncoder
)
writer.write_line(json_line_str)
# Use a function to convert row to json
with JsonNlSplitFileWriter("s3://test/test-file.jsonl.gz") as writer:
column_names = select_queryset.headers
def transform_line(row):
return json.dumps(zip(column_names, row), cls=DateTimeEncoder)
return json.dumps(dict(zip(column_names, row)), cls=DateTimeEncoder)
select_queryset.write_to_file(writer, transform_line)
# Use a function to convert row to json but write multiple lines at once
with JsonNlSplitFileWriter("s3://test/test-file.jsonl.gz") as writer:
column_names = select_queryset.headers
def transform_line(row):
return json.dumps(zip(column_names, row), cls=DateTimeEncoder)
return json.dumps(dict(zip(column_names, row)), cls=DateTimeEncoder)
for results in select_queryset.iter_chunks():
writer.write_lines(results, transform_line)
"""
14 changes: 9 additions & 5 deletions dataengineeringutils3/s3.py
@@ -28,7 +28,8 @@ def s3_path_to_bucket_key(s3_path):

def bucket_key_to_s3_path(bucket, key):
"""
Takes an S3 bucket and key combination and returns the full S3 path to that location.
Takes an S3 bucket and key combination and returns the
full S3 path to that location.
"""
return f"s3://{bucket}/{key}"

@@ -44,7 +45,9 @@ def get_filepaths_from_s3_folder(
s3_folder_path, file_extension=None, exclude_zero_byte_files=True
):
"""
Get a list of filepaths from a bucket. If extension is set to a string then only return files with that extension otherwise if set to None (default) all filepaths are returned.
Get a list of filepaths from a bucket. If file_extension is set to a string
then only return files with that extension; otherwise, if set to None (default),
all filepaths are returned.
:param s3_folder_path: "s3://...."
:param file_extension: file extension, e.g. .json
:param exclude_zero_byte_files: Whether to filter out results of zero size: True
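A short usage sketch, assuming the extension is passed with its leading dot as in the docstring example (bucket and prefix are illustrative):

# List all .json files under a prefix, skipping zero-byte "folder" objects
json_paths = get_filepaths_from_s3_folder(
    "s3://my-bucket/my-folder/",
    file_extension=".json",
    exclude_zero_byte_files=True,
)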
@@ -116,7 +119,8 @@ def copy_s3_folder_contents_to_new_folder(
from_s3_folder_path, to_s3_folder_path, exclude_zero_byte_files=False
):
"""
Copies complete folder structure within from_s3_folder_path to the to_s3_folder_path.
Copies complete folder structure within from_s3_folder_path
to the to_s3_folder_path.
Note any s3 objects in the destination folder will be overwritten if they match the
name of an object being written.
:param from_s3_folder_path: Folder path that you want to copy "s3://...."
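A usage sketch for the copy helper; note it now honours exclude_zero_byte_files thanks to the fix a few lines below (paths are illustrative):

copy_s3_folder_contents_to_new_folder(
    "s3://my-bucket/raw/2021-02-11/",
    "s3://my-bucket/archive/2021-02-11/",
    exclude_zero_byte_files=True,
)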
@@ -126,7 +130,7 @@
to_s3_folder_path = _add_slash(to_s3_folder_path)

all_from_filepaths = get_filepaths_from_s3_folder(
from_s3_folder_path, exclude_zero_byte_files=False
from_s3_folder_path, exclude_zero_byte_files=exclude_zero_byte_files
)
for afp in all_from_filepaths:
tfp = afp.replace(from_s3_folder_path, to_s3_folder_path)
@@ -205,7 +209,7 @@ def write_local_file_to_s3(local_file_path, s3_path, overwrite=False):
bucket, key = s3_path_to_bucket_key(s3_path)
s3_resource = boto3.resource("s3")

if check_for_s3_file(s3_path) and overwrite == False:
if check_for_s3_file(s3_path) and overwrite is False:
raise ValueError("File already exists. Pass overwrite = True to overwrite")
else:
resp = s3_resource.meta.client.upload_file(local_file_path, bucket, key)
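For completeness, a usage sketch of write_local_file_to_s3 showing the guarded call and the explicit overwrite (paths are illustrative):

# Raises ValueError if the object already exists and overwrite is left as False
write_local_file_to_s3("./outputs/report.csv", "s3://my-bucket/reports/report.csv")

# Explicitly allow an existing object to be replaced
write_local_file_to_s3(
    "./outputs/report.csv",
    "s3://my-bucket/reports/report.csv",
    overwrite=True,
)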
87 changes: 51 additions & 36 deletions dataengineeringutils3/writer.py
@@ -14,17 +14,20 @@ class BaseSplitFileWriter:
This acts as a "file like object". Data is written to an in memory file
(note this file is defined in subclasses and not set in this base class)
until it hits a max_bytes limit, at which point the data is written to S3
as a single file. The in memory file is defined by the subclasses
BytesSplitFileWriter and StringSplitFileWriter. These subclasses attempt
to mimic the expected response of BytesIO and StringIO.
:param s3_basepath: The base path to the s3 location you want to write to S3://...
:param filename_prefix: The filename that you want to keep constant. Every written file is prefixed with this string.
:param filename_prefix: The filename that you want to keep constant. Every written
file is prefixed with this string.
S3 objects written will end in the file number and the extension.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3 (default True). Note does not affect
the file_extension parameter.
:param file_extension: String representing the file extension. Should not be prefixed with a '.'.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size)
default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3
(default True). Note does not affect the file_extension parameter.
:param file_extension: String representing the file extension.
Should not be prefixed with a '.'.
"""

def __init__(
@@ -117,12 +120,15 @@ class BytesSplitFileWriter(BaseSplitFileWriter):
file is suffixed with an integer (first file is suffixed with 0, the next 1, etc)
:param s3_basepath: The base path to the s3 location you want to write to S3://...
:param filename_prefix: The filename that you want to keep constant. Every written file is prefixed with this string.
:param filename_prefix: The filename that you want to keep constant. Every written
file is prefixed with this string.
S3 objects written will end in the file number and the extension.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3 (default True). Note does not affect
the file_extension parameter.
:param file_extension: String representing the file extension. Should not be prefixed with a '.'.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size)
default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3
(default True). Note does not affect the file_extension parameter.
:param file_extension: String representing the file extension.
Should not be prefixed with a '.'.
:Example:
@@ -142,12 +148,13 @@ class BytesSplitFileWriter(BaseSplitFileWriter):
) as f:
f.write(b"This is some text")
f.write(b"This is some other text")
# Example 2 - Using it with a writing package
# The following example uses jsonlines to write the data to a BytesSplitFileWriter
# when data written to the in memory buffer exceeds BytesSplitFileWriter then the data
# in the buffer is written to S3. With the first file being "s3://test/folder/test-file-0.jsonl.gz"
# when data written to the in memory buffer exceeds the max_bytes limit then
# the data in the buffer is written to S3. With the first file being
# "s3://test/folder/test-file-0.jsonl.gz"
# and the next "s3://test/folder/test-file-1.jsonl.gz", etc.
from dataengineeringutils3.writer import BytesSplitFileWriter
@@ -158,7 +165,7 @@ class BytesSplitFileWriter(BaseSplitFileWriter):
{"col1": 1, "col2": "y"},
{"col1": 2, "col2": "z"}
]
bsfw = BytesSplitFileWriter("s3://test/folder/",
"test-file",
max_bytes=30,
@@ -186,17 +193,21 @@ class StringSplitFileWriter(BaseSplitFileWriter):
StringIO file like object for splitting large datasets in to chunks and
writing to s3. Data is written to a StringIO file buffer until it hits a
max_bytes limit at which point the data is written to S3 as a
as single file. Then data continues to be written to a new StringIO buffer until that
hits the size limit which results in a new single file being written to S3. Each S3
file is suffixed with an integer (first file is suffixed with 0, the next 1, etc)
single file. Then data continues to be written to a new StringIO buffer
until that hits the size limit which results in a new single file being
written to S3. Each S3 file is suffixed with an integer (first file is
suffixed with 0, the next 1, etc)
:param s3_basepath: The base path to the s3 location you want to write to S3://...
:param filename_prefix: The filename that you want to keep constant. Every written file is prefixed with this string.
:param filename_prefix: The filename that you want to keep constant. Every written
file is prefixed with this string.
S3 objects written will end in the file number and the extension.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size) default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3 (default True). Note does not affect
the file_extension parameter.
:param file_extension: String representing the file extension. Should not be prefixed with a '.'.
:param max_bytes: The maximum number of bytes for each file (uncompressed file size)
default set at 1GB.
:param compress_on_upload: If the file should be compressed before writing to S3
(default True). Note does not affect the file_extension parameter.
:param file_extension: String representing the file extension. Should not be
prefixed with a '.'.
:Example:
@@ -216,12 +227,15 @@ class StringSplitFileWriter(BaseSplitFileWriter):
) as f:
f.write("This is some text")
f.write("This is some other text")
# Example 2 - Using it with a writing package
# The following example uses jsonlines to write the data to a BytesSplitFileWriter
# when data written to the in memory buffer exceeds BytesSplitFileWriter then the data
# in the buffer is written to S3. With the first file being "s3://test/folder/test-file-0.jsonl.gz"
# The following example uses jsonlines to write the
# data to a StringSplitFileWriter;
# when data written to the in memory buffer exceeds
# the max_bytes limit, the data in the buffer
# is written to S3. With the first file being
# "s3://test/folder/test-file-0.jsonl.gz"
# and the next "s3://test/folder/test-file-1.jsonl.gz", etc.
from dataengineeringutils3.writer import StringSplitFileWriter
@@ -232,7 +246,7 @@ class StringSplitFileWriter(BaseSplitFileWriter):
{"col1": 1, "col2": "y"},
{"col1": 2, "col2": "z"}
]
ssfw = StringSplitFileWriter("s3://test/folder/",
"test-file",
max_bytes=30,
@@ -264,10 +278,11 @@ def _compress_data(self, data):
class JsonNlSplitFileWriter(BaseSplitFileWriter):
"""
Class for writing json lines from large datasets into chunks and writing to s3.
This class writes to a string (rather than fileIO) and does smaller checks for a speedier
read write. Espeicially when writing multiple lines. However, if scaling to large amounts of data
it is probably better to use a json writer like jsonlines with the BytesSplitFileWriter.
The extension and the _write methods are defined in classes which extend this class
This class writes to a string (rather than fileIO) and does smaller checks for
a speedier read/write, especially when writing multiple lines. However,
if scaling to large amounts of data it is probably better to use a json writer
like jsonlines with the BytesSplitFileWriter. The extension and the _write
methods are defined in classes which extend this class
lines = [
'{"key": "value"}'
]
@@ -322,7 +337,7 @@ def write_lines(self, lines, line_transform=lambda x: x):
Writes multiple lines then checks if file limit hit.
So will be quicker but less accurate on breaking up files.
"""
self.mem_file += "\n".join(line_transform(l) for l in lines) + "\n"
self.mem_file += "\n".join(line_transform(line) for line in lines) + "\n"
self.num_lines += len(lines)
self.total_lines += len(lines)
if self.file_size_limit_reached():
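Tying the writer changes back to the db.py docstring above, a minimal self-contained sketch of the write_lines pattern (column names and row values are illustrative):

import json
from dataengineeringutils3.writer import JsonNlSplitFileWriter

column_names = ["col1", "col2"]
rows = [(1, "x"), (2, "y"), (3, "z")]

with JsonNlSplitFileWriter("s3://test/test-file.jsonl.gz") as writer:
    # line_transform turns each row tuple into a json string before writing
    writer.write_lines(rows, lambda row: json.dumps(dict(zip(column_names, row))))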