Skip to content

Commit

Permalink
bug-1886021: Implement GCS storage classes for GCP (#6572)
Browse files Browse the repository at this point in the history
  • Loading branch information
relud committed May 8, 2024
1 parent 0e44482 commit 6dc30d2
Show file tree
Hide file tree
Showing 24 changed files with 1,513 additions and 263 deletions.
49 changes: 49 additions & 0 deletions bin/gcs_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# Usage: ./bin/gcs_cli.py CMD

import os
from pathlib import Path, PurePosixPath

import click

Expand Down Expand Up @@ -119,6 +120,54 @@ def list_objects(bucket_name, details):
click.echo("No objects in bucket.")


@gcs_group.command("upload")
@click.argument("source")
@click.argument("destination")
def upload(source, destination):
"""Upload files to a bucket
SOURCE is a path to a file or directory of files. will recurse on directory trees
DESTINATION is a path to a file or directory in the bucket. If SOURCE is a
directory or DESTINATION ends with "/", then DESTINATION is treated as a directory.
"""

client = get_client()

# remove protocol from destination if present
destination = destination.split("://", 1)[-1]
bucket_name, _, prefix = destination.partition("/")
prefix_path = PurePosixPath(prefix)

try:
bucket = client.get_bucket(bucket_name)
except NotFound as e:
raise click.ClickException(f"GCS bucket {bucket_name!r} does not exist.") from e

source_path = Path(source)
if not source_path.exists():
raise click.ClickException(f"local path {source!r} does not exist.")
source_is_dir = source_path.is_dir()
if source_is_dir:
sources = [p for p in source_path.rglob("*") if not p.is_dir()]
else:
sources = [source_path]
if not sources:
raise click.ClickException(f"No files in directory {source!r}.")
for path in sources:
if source_is_dir:
# source is a directory so treat destination as a directory
key = str(prefix_path / path.relative_to(source_path))
elif prefix == "" or prefix.endswith("/"):
# source is a file but destination is a directory, preserve file name
key = str(prefix_path / path.name)
else:
key = prefix
blob = bucket.blob(key)
blob.upload_from_filename(path)
click.echo(f"Uploaded gs://{bucket_name}/{key}")


def main(argv=None):
argv = argv or []
gcs_group(argv)
Expand Down
17 changes: 12 additions & 5 deletions bin/process_crashes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

# Pulls down crash data for specified crash ids, syncs to the S3 bucket, and
# sends the crash ids to the Pub/Sub queue.
# Pulls down crash data for specified crash ids, syncs to the cloud storage
# bucket, and sends the crash ids to the queue.
#
# Usage: ./bin/process_crashes.sh
#
Expand Down Expand Up @@ -47,9 +47,16 @@ mkdir "${DATADIR}" || echo "${DATADIR} already exists."
./socorro-cmd fetch_crash_data "${DATADIR}" $@

# Make the bucket and sync contents
./bin/socorro_aws_s3.sh mb s3://dev-bucket/
./bin/socorro_aws_s3.sh cp --recursive "${DATADIR}" "s3://${CRASHSTORAGE_S3_BUCKET}/"
./bin/socorro_aws_s3.sh ls --recursive "s3://${CRASHSTORAGE_S3_BUCKET}/"
# ^^ returns CLOUD_PROVIDER value as uppercase
if [[ "${CLOUD_PROVIDER^^}" == "GCP" ]]; then
./socorro-cmd gcs create "${CRASHSTORAGE_GCS_BUCKET}"
./socorro-cmd gcs upload "${DATADIR}" "${CRASHSTORAGE_GCS_BUCKET}"
./socorro-cmd gcs list_objects "${CRASHSTORAGE_GCS_BUCKET}"
else
./bin/socorro_aws_s3.sh mb "s3://${CRASHSTORAGE_S3_BUCKET}/"
./bin/socorro_aws_s3.sh cp --recursive "${DATADIR}" "s3://${CRASHSTORAGE_S3_BUCKET}/"
./bin/socorro_aws_s3.sh ls --recursive "s3://${CRASHSTORAGE_S3_BUCKET}/"
fi

# Add crash ids to queue
# ^^ returns CLOUD_PROVIDER value as uppercase
Expand Down
9 changes: 7 additions & 2 deletions socorro/external/boto/connection_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,17 +204,22 @@ def load_file(self, bucket, path):
f"(bucket={bucket!r} key={path}) not found, no value returned"
) from exc

def list_objects_paginator(self, bucket, prefix):
def list_objects_paginator(self, bucket, prefix, page_size=None):
"""Returns S3 client paginator of objects with key prefix in bucket
:arg bucket: the name of the bucket
:arg prefix: the key prefix
:arg page_size: the size of pages to request
:returns: S3 paginator
"""
paginator = self.client.get_paginator("list_objects_v2")
page_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
page_iterator = paginator.paginate(
Bucket=bucket,
Prefix=prefix,
PaginationConfig={} if page_size is None else {"PageSize": page_size},
)
return page_iterator

def head_object(self, bucket, key):
Expand Down
57 changes: 12 additions & 45 deletions socorro/external/boto/crashstorage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

import datetime
import json
import logging

Expand All @@ -12,6 +11,10 @@
CrashStorageBase,
CrashIDNotFound,
MemoryDumpsMapping,
get_datestamp,
dict_to_str,
list_to_str,
str_to_list,
)
from socorro.external.boto.connection_context import S3Connection
from socorro.lib.libjsonschema import JsonSchemaReducer
Expand All @@ -21,7 +24,6 @@
SocorroDataReducer,
transform_schema,
)
from socorro.lib.libooid import date_from_ooid
from socorro.schemas import TELEMETRY_SOCORRO_CRASH_SCHEMA


Expand All @@ -32,25 +34,6 @@ def wait_time_generator():
yield from [1, 1, 1, 1, 1]


class CrashIDMissingDatestamp(Exception):
"""Indicates the crash id is invalid and missing a datestamp."""


def get_datestamp(crashid):
"""Parses out datestamp from a crashid.
:returns: datetime
:raises CrashIDMissingDatestamp: if the crash id has no datestamp at the end
"""
datestamp = date_from_ooid(crashid)
if datestamp is None:
# We should never hit this situation unless the crashid is not valid
raise CrashIDMissingDatestamp(f"{crashid} is missing datestamp")
return datestamp


def build_keys(name_of_thing, crashid):
"""Builds a list of s3 pseudo-filenames
Expand Down Expand Up @@ -81,25 +64,6 @@ def build_keys(name_of_thing, crashid):
return [f"v1/{name_of_thing}/{crashid}"]


class JSONISOEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.date):
return obj.isoformat()
raise NotImplementedError(f"Don't know about {obj!r}")


def dict_to_str(a_mapping):
return json.dumps(a_mapping, cls=JSONISOEncoder)


def list_to_str(a_list):
return json.dumps(list(a_list))


def str_to_list(a_string):
return json.loads(a_string)


class BotoS3CrashStorage(CrashStorageBase):
"""Saves and loads crash data to S3"""

Expand Down Expand Up @@ -195,15 +159,18 @@ def save_processed_crash(self, raw_crash, processed_crash):
path = build_keys("processed_crash", crash_id)[0]
self.save_file(path, data)

def list_objects_paginator(self, prefix):
"""Return generator of objects in the bucket that have a specified key prefix
def list_objects_paginator(self, prefix, page_size=None):
"""Yield pages of object keys in the bucket that have a specified key prefix
:arg prefix: the prefix to look at
:arg page_size: the number of results to return per page
:returns: generator of keys
:returns: generator of pages (lists) of object keys
"""
return self.connection.list(bucket=self.bucket, prefix=prefix)
for page in self.connection.list_objects_paginator(
bucket=self.bucket, prefix=prefix, page_size=page_size
):
yield [item["Key"] for item in page.get("Contents", [])]

def exists_object(self, key):
"""Returns whether the object exists in the bucket
Expand Down
42 changes: 42 additions & 0 deletions socorro/external/crashstorage_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
"""Base classes for crashstorage system."""

from contextlib import suppress
import datetime
import json
import logging
import os

from socorro.lib.libooid import date_from_ooid


class MemoryDumpsMapping(dict):
"""there has been a bifurcation in the crash storage data throughout the
Expand Down Expand Up @@ -262,3 +266,41 @@ def remove(self, crash_id):

with suppress(KeyError):
del self._processed_crash_data[crash_id]


class CrashIDMissingDatestamp(Exception):
"""Indicates the crash id is invalid and missing a datestamp."""


def get_datestamp(crashid):
"""Parses out datestamp from a crashid.
:returns: datetime
:raises CrashIDMissingDatestamp: if the crash id has no datestamp at the end
"""
datestamp = date_from_ooid(crashid)
if datestamp is None:
# We should never hit this situation unless the crashid is not valid
raise CrashIDMissingDatestamp(f"{crashid} is missing datestamp")
return datestamp


class JSONISOEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, datetime.date):
return obj.isoformat()
raise NotImplementedError(f"Don't know about {obj!r}")


def dict_to_str(a_mapping):
return json.dumps(a_mapping, cls=JSONISOEncoder)


def list_to_str(a_list):
return json.dumps(list(a_list))


def str_to_list(a_string):
return json.loads(a_string)
3 changes: 3 additions & 0 deletions socorro/external/gcs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at https://mozilla.org/MPL/2.0/.

0 comments on commit 6dc30d2

Please sign in to comment.