add the related docs

loganwang007 committed Dec 17, 2021
1 parent 35a6b17 commit 73b69e4
Showing 6 changed files with 98 additions and 12 deletions.
53 changes: 49 additions & 4 deletions CHANGELOG.md
@@ -1,10 +1,56 @@
# Changelog

-## 0.0.9 (Unreleased)
+## 0.0.14 (Unreleased)

#### New Features

-* Add schema code generation with Swagger specification.
+* Add many-files solution aimed at highly partitioned data

#### Docs

* Updated README
* Updated CHANGELOG
* Updated examples with bricklayer usage

Full set of changes: [`Unreleased`](https://github.com/intelematics/bricklayer/compare/0.0.13...HEAD)

## 0.0.13 (2021-12-15)

#### New Features

* Add capability on job deletion

Full set of changes: [`0.0.12...0.0.13`](https://github.com/intelematics/bricklayer/compare/0.0.12...0.0.13)

## 0.0.12 (2021-12-03)

#### Others

* Fix job listing error

Full set of changes: [`0.0.11...0.0.12`](https://github.com/intelematics/bricklayer/compare/0.0.11...0.0.12)

## 0.0.11 (2021-12-03)

#### Others

* Fix naming error on api_version

Full set of changes: [`0.0.10...0.0.11`](https://github.com/intelematics/bricklayer/compare/0.0.10...0.0.11)

## 0.0.10 (2021-09-03)

#### Others

* Fix hardcoded path for the notebook operations

Full set of changes: [`0.0.9...0.0.10`](https://github.com/intelematics/bricklayer/compare/0.0.9...0.0.10)

## 0.0.9 (2021-09-01)

#### New Features

* Add schema code generation with Swagger specification

#### Docs

@@ -13,8 +59,7 @@
* Added a ROADMAP
* Added examples with bricklayer usage

-Full set of changes: [`Unreleased`](https://github.com/intelematics/bricklayer/compare/0.0.8...HEAD)
+Full set of changes: [`0.0.8...0.0.9`](https://github.com/intelematics/bricklayer/compare/0.0.8...0.0.9)

## 0.0.8 (2021-08-30)

2 changes: 1 addition & 1 deletion README.md
@@ -6,7 +6,7 @@ A Databricks utility for data engineers whose job is to farm jobs, build map lay…

```
-pip install https://github.com/intelematics/bricklayer/releases/download/0.0.12/bricklayer-0.0.12-py3-none-any.whl
+pip install https://github.com/intelematics/bricklayer/releases/download/0.0.13/bricklayer-0.0.13-py3-none-any.whl
```

## Usage
1 change: 0 additions & 1 deletion bricklayer/catalog/__init__.py
@@ -1,3 +1,2 @@
from . import dbricks_catalog
from . import crawler
-from . import file_scanner
1 change: 1 addition & 0 deletions bricklayer/util/__init__.py
@@ -0,0 +1 @@
+from . import parallel_fetch
3 changes: 3 additions & 3 deletions bricklayer/catalog/file_scanner.py → bricklayer/util/parallel_fetch.py
@@ -4,14 +4,14 @@
Results end up in a table
Usage:
```
-from file_scanner import DbricksFileScanner
+from parallel_fetch import DbricksParallelFetch
# define the aws_bucket and output_dir for the S3 fetch
aws_bucket = "service-trips"
output_dir = "/tmp/"
# define the target dataframe whose path column will be parsed
df = spark.createDataFrame(...)
# returns a new dataframe holding the fetched contents
-output_df = DbricksFileScanner.download_file(df, aws_bucket, output_dir, path_column)
+output_df = DbricksParallelFetch.download_file(df, aws_bucket, output_dir, path_column)
```
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -24,7 +24,7 @@
import csv


-class DbricksFileScanner:
+class DbricksParallelFetch:
@staticmethod
def download_file(df: DataFrame, aws_bucket: str, output_dir: str, path_column: str, max_workers: int = 32):
"""encapsulate the pandas udf function as a static method
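The body of `download_file` is collapsed in this view. As a rough illustration only, a thread-pooled S3 fetch of the kind the docstring describes could look like the sketch below; `fetch_all`, `fetch_one`, and the use of boto3 are assumptions for illustration, not the module's actual pandas-UDF implementation.
```python
# Illustrative sketch only, not the actual download_file body.
# Assumes boto3 credentials are already configured on the cluster.
from concurrent.futures import ThreadPoolExecutor
import boto3

def fetch_all(bucket: str, keys: list, max_workers: int = 32) -> list:
    """Download every key from `bucket` concurrently; results keep the order of `keys`."""
    s3 = boto3.client('s3')

    def fetch_one(key: str) -> str:
        # S3 GETs are I/O-bound, so a thread pool gives near-linear speedup.
        return s3.get_object(Bucket=bucket, Key=key)['Body'].read().decode('utf-8')

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch_one, keys))
```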
47 changes: 44 additions & 3 deletions examples/README.md
@@ -1,7 +1,9 @@
-[Concurrent Jobs](#concurrent_jobs) |
+| [Concurrent Jobs](#concurrent_jobs) |
[Easy map rendering](#map) |
[Schema to spark table generator](#schema) |
-[Copy/Backup notebook runs in the workspace](#workspace)
+[Copy/Backup notebook runs in the workspace](#workspace) |
+[Catalog](#catalog) |
+[Parallel Fetch](#parallel_fetch) |


# Concurrent Jobs <a name="concurrent_jobs"/>
@@ -21,6 +23,11 @@ Stop the job.
```python
job.stop()
```
Delete the job.
```python
job_id = job.job_id
DBSApi().delete_job(job_id)
```
Can also reference a notebook with a relative path to the current notebook.
```python
job = DBSApi().create_job('./dummy_job')
@@ -305,7 +312,7 @@ dbapi.import_notebook(
'/Shared/backups/2021_09_02/mynotebook',
)
```
-# Catalog
+# Catalog <a name="catalog"/>

Walk the Databricks catalog programmatically.
```python
@@ -323,4 +330,38 @@
table_location=dbfs:/dbfs/delta/weather
is_view=False
table_created_time=Tue Aug 31 11:24:55 UTC 2021
```
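The body of this example is collapsed in the diff. A minimal sketch of such a walk, assuming a `DbricksCatalog` class in `bricklayer.catalog.dbricks_catalog` whose `get_databases()`/`get_tables()` iterators yield objects carrying the fields printed above (all of these names are inferred from the output, not confirmed by the diff):
```python
from bricklayer.catalog.dbricks_catalog import DbricksCatalog

# Walk every database and print each table's metadata.
# Attribute names mirror the fields in the sample output above (assumed).
for database in DbricksCatalog().get_databases():
    for table in database.get_tables():
        print(f'table_provider={table.table_provider}')
        print(f'table_location={table.table_location}')
        print(f'is_view={table.is_view}')
        print(f'table_created_time={table.table_created_time}')
```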
# Parallel Fetch <a name="parallel_fetch"/>

Download the contents of many S3 files in parallel, collecting the results into a dataframe.
```python
from bricklayer.util.parallel_fetch import DbricksParallelFetch

import subprocess
from datetime import datetime
import pyspark.sql.functions as F
import pandas as pd

aws_bucket = "trips-service"
output_dir = "/tmp/"

# Each line of `aws s3 ls --recursive` output looks like
#   2020-08-04 01:54:57       2460 TEST/0D31EEB4/trip.json
# so fixed offsets slice out the timestamp, size and key.
listing = subprocess.check_output(
    ['aws', 's3', 'ls', '--recursive', 's3://trips-service/TEST/']
).decode('WINDOWS-1252').split('\n')[:-1]

files_df = spark.createDataFrame(pd.DataFrame([
    {'datetime': datetime.fromisoformat(n[0:19]),
     'size': int(n[19:30]),
     'path': n[31:]}
    for n in listing
]))

# pick the path column; download_file returns a new dataframe with the fetched contents
res = DbricksParallelFetch.download_file(files_df, aws_bucket, output_dir, 'path')
res.display()
```
```
datetime size path downloaded_content
2020-08-04T01:54:57.000+0000 2460 TEST/0D31EEB4/trip.json {"reportedTimestamp":"", ...}
2020-08-14T08:54:57.000+0000 2200 TEST/1C0ACA63/trip.json {"accuracy":1,"validity":1, ...}
2020-08-24T11:54:57.000+0000 2299 TEST/20DD063D/trip.json {"startFuelLevel":50.00, ...}
```
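The example imports `pyspark.sql.functions` without using it; a natural follow-up is to parse the fetched JSON bodies into columns. A short sketch, where the field names are taken from the sample rows above and the types are guesses:
```python
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Field names come from the sample output above; their types are assumptions.
trip_schema = StructType([
    StructField('reportedTimestamp', StringType()),
    StructField('startFuelLevel', DoubleType()),
])

parsed = res.withColumn('trip', F.from_json('downloaded_content', trip_schema))
parsed.select('path', 'trip.reportedTimestamp', 'trip.startFuelLevel').display()
```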
