diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7be9caf..9dce6f2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,10 +1,56 @@
 # Changelog
 
-## 0.0.9 (Unreleased)
+## 0.0.14 (Unreleased)
 
 #### New Features
 
-* Add schema code generation with Swagger specification.
+* Add a parallel many-files fetch solution aimed at highly partitioned data
+
+#### Docs
+
+* Updated README
+* Updated CHANGELOG
+* Updated examples with bricklayer usage
+
+Full set of changes: [`Unreleased`](https://github.com/intelematics/bricklayer/compare/0.0.13...HEAD)
+
+## 0.0.13 (2021-12-15)
+
+#### New Features
+
+* Add job deletion capability
+
+Full set of changes: [`0.0.12...0.0.13`](https://github.com/intelematics/bricklayer/compare/0.0.12...0.0.13)
+
+## 0.0.12 (2021-12-03)
+
+#### Others
+
+* Fix job listing error
+
+Full set of changes: [`0.0.11...0.0.12`](https://github.com/intelematics/bricklayer/compare/0.0.11...0.0.12)
+
+## 0.0.11 (2021-12-03)
+
+#### Others
+
+* Fix naming error on api_version
+
+Full set of changes: [`0.0.10...0.0.11`](https://github.com/intelematics/bricklayer/compare/0.0.10...0.0.11)
+
+## 0.0.10 (2021-09-03)
+
+#### Others
+
+* Fix hardcoded path for the notebook operations
+
+Full set of changes: [`0.0.9...0.0.10`](https://github.com/intelematics/bricklayer/compare/0.0.9...0.0.10)
+
+## 0.0.9 (2021-09-01)
+
+#### New Features
+
+* Add schema code generation with Swagger specification
 
 #### Docs
 
@@ -13,8 +59,7 @@
 * Added a ROADMAP
 * Added examples with bricklayer usage
 
-Full set of changes: [`Unreleased`](https://github.com/intelematics/bricklayer/compare/0.0.8...HEAD)
-
+Full set of changes: [`0.0.8...0.0.9`](https://github.com/intelematics/bricklayer/compare/0.0.8...0.0.9)
 
 ## 0.0.8 (2021-08-30)
diff --git a/README.md b/README.md
index ca0b7a1..67291df 100644
--- a/README.md
+++ b/README.md
@@ -6,7 +6,7 @@ A Databricks utility for data engineers whose job is to farm jobs, build map lay
 
 ```
-pip install https://github.com/intelematics/bricklayer/releases/download/0.0.12/bricklayer-0.0.12-py3-none-any.whl
+pip install https://github.com/intelematics/bricklayer/releases/download/0.0.13/bricklayer-0.0.13-py3-none-any.whl
 ```
 
 ## Usage
diff --git a/bricklayer/catalog/__init__.py b/bricklayer/catalog/__init__.py
index f7f5d7f..d69301e 100644
--- a/bricklayer/catalog/__init__.py
+++ b/bricklayer/catalog/__init__.py
@@ -1,3 +1,2 @@
 from . import dbricks_catalog
 from . import crawler
-from . import file_scanner
diff --git a/bricklayer/util/__init__.py b/bricklayer/util/__init__.py
new file mode 100644
index 0000000..5da58fe
--- /dev/null
+++ b/bricklayer/util/__init__.py
@@ -0,0 +1 @@
+from . import parallel_fetch
diff --git a/bricklayer/catalog/file_scanner.py b/bricklayer/util/parallel_fetch.py
similarity index 95%
rename from bricklayer/catalog/file_scanner.py
rename to bricklayer/util/parallel_fetch.py
index b1eb2c3..77ae2c5 100644
--- a/bricklayer/catalog/file_scanner.py
+++ b/bricklayer/util/parallel_fetch.py
@@ -4,14 +4,14 @@ Results end up in a table
 Usage:
 ```
-    from file_scanner import DbricksFileScanner
+    from parallel_fetch import DbricksParallelFetch
     # define the aws_bucket and output_dir for the s3_fetch to start
     aws_bucket = "service-trips"
     output_dir = "/tmp/"
     # define the target dataframe whose path column will be parsed
     df = spark.createDataFrame(...)
     # export the fetched contents dataframe
-    output_df = DbricksFileScanner.download_file(df, aws_bucket, output_dir, path_column)
+    output_df = DbricksParallelFetch.download_file(df, aws_bucket, output_dir, path_column)
 ```
 """
 from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -24,7 +24,7 @@
 import csv
 
 
-class DbricksFileScanner:
+class DbricksParallelFetch:
     @staticmethod
     def download_file(df: DataFrame, aws_bucket: str, output_dir: str, path_column: str, max_workers: int = 32):
         """encapsulate the pandas udf function as a static method
diff --git a/examples/README.md b/examples/README.md
index 60a22d0..b535040 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,7 +1,9 @@
-[Concurrent Jobs](#concurrent_jobs) |
+| [Concurrent Jobs](#concurrent_jobs) |
 [Easy map rendering](#map) |
 [Schema to spark table generator](#schema) |
-[Copy/Backup notebook runs in the workspace](#workspace)
+[Copy/Backup notebook runs in the workspace](#workspace) |
+[Catalog](#catalog) |
+[Parallel Fetch](#parallel_fetch) |
 
 # Concurrent Jobs
 
@@ -21,6 +23,11 @@ Stop the job.
 ```python
 job.stop()
 ```
+Delete the job.
+```python
+job_id = job.job_id
+DBSApi().delete_job(job_id)
+```
 Can also reference a notebook with a relative path to the current notebook.
 ```python
 job = DBSApi().create_job('./dummy_job')
@@ -305,7 +312,7 @@ dbapi.import_notebook(
     '/Shared/backups/2021_09_02/mynotebook',
 )
 ```
-# Catalog
+# Catalog 
 
 Walk the databricks catalog programmatically.
 ```python
@@ -323,4 +330,38 @@
 table_provider=delta
 table_location=dbfs:/dbfs/delta/weather
 is_view=False
 table_created_time=Tue Aug 31 11:24:55 UTC 2021
+```
+# Parallel Fetch
+
+Fetch files from S3 in parallel programmatically.
+```python
+from bricklayer.util.parallel_fetch import DbricksParallelFetch
+
+aws_bucket = "trips-service"
+output_dir = "/tmp/"
+
+import subprocess
+from datetime import datetime
+import pyspark.sql.functions as F
+import pandas as pd
+
+files_df = spark.createDataFrame(pd.DataFrame([{'datetime': datetime.fromisoformat(n[0:19]),
+                                                'size': int(n[19:30]),
+                                                'path': n[31:]
+                                                }
+                                               for n in subprocess.check_output(
+                                                   ['aws',
+                                                    's3',
+                                                    'ls',
+                                                    '--recursive',
+                                                    's3://trips-service/TEST/']).decode('WINDOWS-1252').split('\n')[:-1]]))
+# read paths from the path column and return a new dataframe with the downloaded contents
+res = DbricksParallelFetch.download_file(files_df, aws_bucket, output_dir, 'path')
+res.display()
+```
+```
+datetime                      size  path                     downloaded_content
+2020-08-04T01:54:57.000+0000  2460  TEST/0D31EEB4/trip.json  {"reportedTimestamp":"", ...}
+2020-08-14T08:54:57.000+0000  2200  TEST/1C0ACA63/trip.json  {"accuracy":1,"validity":1, ...}
+2020-08-24T11:54:57.000+0000  2299  TEST/20DD063D/trip.json  {"startFuelLevel":50.00, ...}
```
\ No newline at end of file
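
The final hunk of `bricklayer/util/parallel_fetch.py` is truncated right after the `download_file` signature. For context, below is a minimal sketch of the pattern that the signature and the module's imports (`ThreadPoolExecutor`, `as_completed`, a pandas UDF) suggest. This is an assumption, not bricklayer's actual implementation: the `_fetch_one` helper and the boto3 calls are hypothetical stand-ins for whatever the real module does per file.

```python
# Minimal sketch only -- NOT bricklayer's actual implementation.
# Assumes boto3 credentials for aws_bucket are available on the cluster.
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import boto3
import pandas as pd
from pyspark.sql import DataFrame
from pyspark.sql.functions import pandas_udf


def _fetch_one(bucket: str, key: str, output_dir: str) -> str:
    """Hypothetical helper: download one S3 object and return its text."""
    local_path = os.path.join(output_dir, key.replace('/', '_'))
    boto3.client('s3').download_file(bucket, key, local_path)
    with open(local_path) as handle:
        return handle.read()


def download_file(df: DataFrame, aws_bucket: str, output_dir: str,
                  path_column: str, max_workers: int = 32) -> DataFrame:
    """Return df plus a downloaded_content column, fetching paths in parallel."""

    @pandas_udf('string')
    def fetch_udf(paths: pd.Series) -> pd.Series:
        # Fan each batch of paths out to a thread pool; S3 downloads are
        # I/O bound, so threads scale well up to max_workers.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            futures = {pool.submit(_fetch_one, aws_bucket, p, output_dir): p
                       for p in paths}
            contents = {futures[f]: f.result() for f in as_completed(futures)}
        return paths.map(contents)

    return df.withColumn('downloaded_content', fetch_udf(df[path_column]))
```

If the real module follows this shape, each Spark task runs its own thread pool, so effective parallelism is roughly `max_workers` times the number of concurrently running tasks; repartitioning `files_df` before calling `download_file` would be the natural throughput knob.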