From 5e400defe5ff3e3905d59c47327a0d58b8071cc0 Mon Sep 17 00:00:00 2001
From: Harry Hands
Date: Thu, 15 Jun 2023 12:24:34 +0100
Subject: [PATCH 1/3] new json parsing functionality to add timestamps and id fields
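
Exports downloaded via download_export() now gain an export_date
timestamp column plus flattened item_id, item_name, dataset_id,
dataset_slug and team_slug columns, ordered ahead of any remaining
columns. A minimal usage sketch (illustrative only; the API key and the
team/dataset/export names below are placeholders):

    from darwinpyspark import DarwinPyspark

    dp = DarwinPyspark("API_KEY", "my-team", "my-dataset")
    export_df = dp.download_export("my-export")
    export_df.select("item_id", "item_name", "export_date").show()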
---
 darwinpyspark.py | 56 +++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 48 insertions(+), 8 deletions(-)

diff --git a/darwinpyspark.py b/darwinpyspark.py
index 3ec7403..dff112c 100644
--- a/darwinpyspark.py
+++ b/darwinpyspark.py
@@ -6,7 +6,7 @@
 import requests
 
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import from_json
+from pyspark.sql.functions import from_json, current_timestamp, col
 
 
 class DarwinPyspark:
@@ -30,7 +30,7 @@ def __init__(self, API_KEY, team_slug, dataset_slug):
         }
         self.team_slug = team_slug.lower().strip().replace(" ", "-")
         self.dataset_slug = dataset_slug.lower().strip().replace(" ", "-")
-
+
     def upload_items(self, df):
         """
         Method to upload a pyspark dataframes data to V7
@@ -46,7 +46,7 @@ def upload_items(self, df):
         df.select("file_name", "object_url").foreach(
             lambda row: self._upload_item(row[0], row[1])
         )
-
+
     def download_export(self, export_name):
         """
         Calls all download methods to get and write an export to a pyspark dataframe
@@ -62,7 +62,28 @@ def download_export(self, export_name):
         export_url = self._get_export_url(export_name)
         # create a SparkSession object
         spark = SparkSession.builder.appName("darwinpyspark").getOrCreate()
-        return self._extract_export(self._download_export_zip(export_url), spark)
+        export_df = self._extract_export(
+            self._download_export_zip(export_url), spark
+        ).withColumn("export_date", current_timestamp())
+        col_order = [
+            "item_id",
+            "item_name",
+            "dataset_id",
+            "dataset_slug",
+            "team_slug",
+            "annotations",
+            "item",
+            "schema_ref",
+            "export_date",
+        ]
+        return export_df.select(
+            *col_order,
+            *[
+                col(col_name)
+                for col_name in export_df.columns
+                if col_name not in col_order
+            ],
+        )
 
     def _data_registration(self, item_name):
         """
@@ -200,7 +221,9 @@ def _get_export_url(self, export_name):
         url = f"https://darwin.v7labs.com/api/v2/teams/{self.team_slug}/datasets/{self.dataset_slug}/exports"
         response = requests.get(url, headers=self.headers)
         if not response.ok:
-            raise RuntimeError(f"Failed to fetch export '{export_name}': {response.status_code} - {response.content}")
+            raise RuntimeError(
+                f"Failed to fetch export '{export_name}': {response.status_code} - {response.content}"
+            )
         exports_json = response.json()
 
         # get the export zip url
@@ -248,8 +271,25 @@ def _extract_export(self, zipfile, spark):
         json_files = []
         for filename in zipfile.namelist():
             if filename.endswith(".json"):
-                data = zipfile.read(filename)
-                json_files.append(data.decode("utf-8"))
+                data = json.loads(zipfile.read(filename).decode("utf-8"))
+                try:
+                    data["dataset_slug"] = data["item"]["source_info"]["dataset"]["slug"]
+                    data["dataset_id"] = data["item"]["source_info"]["dataset"]["dataset_management_url"].split("/")[-2]
+                    data["item_id"] = data["item"]["source_info"]["item_id"]
+                    data["item_name"] = data["item"]["name"]
+                    data["team_slug"] = data["item"]["source_info"]["team"]["slug"]
+                    del (
+                        data["item"]["source_info"]["dataset"]["slug"],
+                        data["item"]["source_info"]["item_id"],
+                        data["item"]["name"],
+                        data["item"]["source_info"]["team"]["slug"],
+                        data["version"],
+                    )
+                except KeyError as e:
+                    raise KeyError(
+                        f"Required keys are missing in the dictionary, try creating the export again and specify version='2.0': {str(e)} key missing"
+                    )
+                json_files.append(json.dumps(data))
 
         # Define the schema for the JSON data
         schema = "struct<"
@@ -261,4 +301,4 @@ def _extract_export(self, zipfile, spark):
         df = spark.createDataFrame(json_files, "string")
         df = df.select(from_json(df.value, schema).alias("data")).select("data.*")
 
-        return df
+        return df
\ No newline at end of file

From a0265ba2dc5ad3b58da991cf70d41c520dfaf51f Mon Sep 17 00:00:00 2001
From: Harry Hands
Date: Thu, 15 Jun 2023 13:28:02 +0100
Subject: [PATCH 2/3] update setup.py

---
 setup.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index c7cf175..0bfcea0 100644
--- a/setup.py
+++ b/setup.py
@@ -2,11 +2,11 @@
 
 setup(
     name="darwinpyspark",
-    version="0.0.1",
+    version="0.0.3",
     author="Harry Hands",
     author_email="harry.hands@v7labs.com",
     description="A package for interacting with the V7 platform via Pyspark",
-    url="https://github.com/v7labs/databricks",
+    url="https://github.com/v7labs/darwinpyspark",
     packages=find_packages(),
     install_requires=[
         "requests"

From 7130c50bda5a78fb428ee080eb969c69fa0eeebd Mon Sep 17 00:00:00 2001
From: Harry Hands
Date: Tue, 20 Jun 2023 17:00:27 +0100
Subject: [PATCH 3/3] structure change

---
 __init__.py => darwinpyspark/__init__.py           | 0
 darwinpyspark.py => darwinpyspark/darwinpyspark.py | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename __init__.py => darwinpyspark/__init__.py (100%)
 rename darwinpyspark.py => darwinpyspark/darwinpyspark.py (100%)

diff --git a/__init__.py b/darwinpyspark/__init__.py
similarity index 100%
rename from __init__.py
rename to darwinpyspark/__init__.py
diff --git a/darwinpyspark.py b/darwinpyspark/darwinpyspark.py
similarity index 100%
rename from darwinpyspark.py
rename to darwinpyspark/darwinpyspark.py
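
Usage note on PATCH 3/3: with darwinpyspark.py moved under the
darwinpyspark/ package directory, the fully qualified import path
becomes darwinpyspark.darwinpyspark. A minimal sketch (this series does
not show whether darwinpyspark/__init__.py re-exports the class, so the
explicit module path is used here):

    from darwinpyspark.darwinpyspark import DarwinPyspark

    dp = DarwinPyspark("API_KEY", "my-team", "my-dataset")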