Skip to content

Commit

Permalink
chore(pubsub): refactor pubsub event trigger
Browse files Browse the repository at this point in the history
Change the pubsub event trigger so that the event fires after an upload completes, instead of on each upload instance creation.
  • Loading branch information
kennedykori committed Aug 17, 2022
1 parent a58b100 commit 05548d2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 45 deletions.
38 changes: 1 addition & 37 deletions apps/sql_data/apiviews.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import os

from google.cloud import pubsub_v1
from rest_framework import mixins, status
from rest_framework.decorators import action
from rest_framework.views import Request, Response
from rest_framework.viewsets import GenericViewSet

from apps.core.apiviews import AuditBaseViewSet
from utils.core_events import AbstractEventPublisher

from .models import (
DataSourceVersion,
Expand All @@ -26,10 +22,6 @@
SQLUploadMetadataSerializer,
)

publisher = pubsub_v1.PublisherClient()
topic_id = "idr_incoming_extracts_metadata"
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")


class DataSourceVersionViewSet(AuditBaseViewSet):
"""Data Source Version API."""
Expand Down Expand Up @@ -75,42 +67,14 @@ class SQLUploadChunkViewSet(
serializer_class = SQLUploadChunkSerializer


class SQLUploadMetadataViewSet(AuditBaseViewSet, AbstractEventPublisher):
class SQLUploadMetadataViewSet(AuditBaseViewSet):
"""SQL Upload Metadata API."""

queryset = SQLUploadMetadata.objects.prefetch_related(
"upload_chunks"
).all()
serializer_class = SQLUploadMetadataSerializer

def publish_event(self, topic_path: str, data: bytes):
future = publisher.publish(topic_path, data)
print("Event publish result id ", future.result())

def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
self.perform_create(serializer)
headers = self.get_success_headers(serializer.data)

topic_path = publisher.topic_path(project_id, topic_id)
data = {
"org_unit_name": serializer.validated_data.get("org_unit_name"),
"org_unit_code": serializer.validated_data.get("org_unit_code"),
"content_type": serializer.validated_data.get("content_type"),
"extract_metadata": serializer.validated_data.get(
"extract_metadata"
),
"chunks": serializer.validated_data.get("chunks"),
}

# publish metadata upload success event
self.publish_event(topic_path, str(data).encode("utf-8"))

return Response(
serializer.data, status=status.HTTP_201_CREATED, headers=headers
)

@action(
detail=True,
methods=["PATCH"],
Expand Down
53 changes: 45 additions & 8 deletions apps/sql_data/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@


def sql_extracts_upload_to(instance: "SQLUploadChunk", filename: str) -> str:
ex_meta: SQLExtractMetadata = instance.upload_metadata.extract_metadata
extracts_group_name: str = ex_meta.preferred_uploads_name or ex_meta.name
return "%s/%s/%s/%s/%d__%s" % (
settings.BASE_EXTRACTS_UPLOAD_DIR_NAME,
settings.SQL_EXTRACTS_UPLOAD_DIR_NAME,
extracts_group_name,
instance.upload_metadata.name,
upload_meta: SQLUploadMetadata = instance.upload_metadata
return "%s/%d__%s" % (
upload_meta.upload_data_dir,
instance.chunk_index,
str(instance.pk),
)
Expand Down Expand Up @@ -175,9 +171,50 @@ def data_source_name(self) -> str:
"""Return the name of the source of thus upload."""
return self.extract_metadata.data_source.name

@property
def upload_data_dir(self) -> str:
ex_meta: SQLExtractMetadata = self.extract_metadata
extracts_group_name: str = (
ex_meta.preferred_uploads_name or ex_meta.name
)
return "%s/%s/%s/%s/%s__%s/" % (
settings.BASE_EXTRACTS_UPLOAD_DIR_NAME,
settings.SQL_EXTRACTS_UPLOAD_DIR_NAME,
extracts_group_name,
"%s__%s" % (self.org_unit_code, self.org_unit_name),
str(self.id),
str(self.start_time),
)

def mark_as_complete(self, user=None) -> None:
self.update(modifier=user, finish_time=timezone.now())
# TODO: Add an action to notify interested parties.
# TODO: This should be replaced with the observer pattern.
self._publish_to_pubsub()

def _publish_to_pubsub(self) -> None:
# TODO: Implement this functionality properly and DELETE this!!!
import os

from google.cloud import pubsub_v1

data = {
"org_unit_code": self.org_unit_code,
"org_unit_name": self.org_unit_name,
"content_type": self.content_type,
"extract_metadata": str(self.extract_metadata.id),
"extract_type": self.extract_metadata.name,
"chunks_count": self.chunks_count,
"upload_id": str(self.id),
"uploads_data_dir": self.upload_data_dir,
"start_time": str(self.start_time),
"finish_time": str(self.finish_time),
}

publisher = pubsub_v1.PublisherClient()
topic_id = "idr_incoming_extracts_metadata"
project_id = os.getenv("GOOGLE_CLOUD_PROJECT")
topic_path = publisher.topic_path(project_id, topic_id)
publisher.publish(topic_path, str(data).encode("utf-8"))

class Meta(AbstractExtractMetadata.Meta):
verbose_name_plural = "Sql upload metadata"
1 change: 1 addition & 0 deletions apps/sql_data/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ class SQLUploadMetadataSerializer(AuditBaseSerializer):
class Meta:
model = SQLUploadMetadata
fields = "__all__"
read_only_fields = ("finish_time",)

0 comments on commit 05548d2

Please sign in to comment.