Merge pull request #29 from intelematics/feature/add-many-files-solution
Feature/add many files solution
loganwang007 committed Jan 25, 2022
2 parents 3d69491 + 73b69e4 commit 531dd4a
Showing 6 changed files with 191 additions and 9 deletions.
53 changes: 49 additions & 4 deletions CHANGELOG.md
@@ -1,10 +1,56 @@
# Changelog

## 0.0.9 (Unreleased)
## 0.0.14 (Unreleased)

#### New Features

* Add schema code generation with Swagger specification.
* Add many-files solution aimed at highly partitioned data

#### Docs

* Updated README
* Updated CHANGELOG
* Updated examples with bricklayer usage

Full set of changes: [`Unreleased`](https://github.com/intelematics/bricklayer/compare/0.0.13...HEAD)

## 0.0.13 (2021-12-15)

#### New Features

* Add capability on job deletion

Full set of changes: [`0.0.12...0.0.13`](https://github.com/intelematics/bricklayer/compare/0.0.12...0.0.13)

## 0.0.12 (2021-12-03)

#### Others

* Fix job listing error

Full set of changes: [`0.0.11...0.0.12`](https://github.com/intelematics/bricklayer/compare/0.0.11...0.0.12)

## 0.0.11 (2021-12-03)

#### Others

* Fix naming error on api_version

Full set of changes: [`0.0.10...0.0.11`](https://github.com/intelematics/bricklayer/compare/0.0.10...0.0.11)

## 0.0.10 (2021-09-03)

#### Others

* Fix hardcoded path for the notebook operations

Full set of changes: [`0.0.9...0.0.10`](https://github.com/intelematics/bricklayer/compare/0.0.9...0.0.10)

## 0.0.9 (2021-09-01)

#### New Features

* Add schema code generation with Swagger specification

#### Docs

@@ -13,8 +59,7 @@
* Added a ROADMAP
* Added examples with bricklayer usage

Full set of changes: [`Unreleased`](https://github.com/intelematics/bricklayer/compare/0.0.8...HEAD)

Full set of changes: [`0.0.8...0.0.9`](https://github.com/intelematics/bricklayer/compare/0.0.8...0.0.9)

## 0.0.8 (2021-08-30)

2 changes: 1 addition & 1 deletion README.md
@@ -6,7 +6,7 @@ A Databricks utility for data engineers whose job is to farm jobs, build map lay

```
pip install https://github.com/intelematics/bricklayer/releases/download/0.0.12/bricklayer-0.0.12-py3-none-any.whl
pip install https://github.com/intelematics/bricklayer/releases/download/0.0.13/bricklayer-0.0.13-py3-none-any.whl
```

## Usage
2 changes: 1 addition & 1 deletion bricklayer/__version__.py
@@ -1 +1 @@
__version__ = '0.0.13'
__version__ = '0.0.14'
1 change: 1 addition & 0 deletions bricklayer/util/__init__.py
@@ -0,0 +1 @@
from . import parallel_fetch
95 changes: 95 additions & 0 deletions bricklayer/util/parallel_fetch.py
@@ -0,0 +1,95 @@
"""
Module to distribute the S3 download over a spark cluster
Useful when the data is highly partitioned and unable to be loaded by standard methods
Results end up in a table
Usage:
```
from parallel_fetch import DbricksParallelFetch
# define the aws_bucket and output_dir for the s3_fetch to start
aws_bucket = "service-trips"
output_dir = "/tmp/"
# define the target df awaiting to be parse the path
df = Spark.createDataFrame()
# export the fetched contents dataframe
output_df = DbricksParallelFetch.download_file(df, aws_bucket, output_dir, path_column)
```
"""
import csv
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial

import boto3
from pyspark.sql import DataFrame
from pyspark.sql.functions import pandas_udf, PandasUDFType


class DbricksParallelFetch:
@staticmethod
def download_file(df: DataFrame, aws_bucket: str, output_dir: str, path_column: str, max_workers: int = 32):
"""encapsulate the pandas udf function as a static method
Args:
df (DataFrame): target dataframe
aws_bucket (str): aws bucket stored all the small files
output_dir (str): temporary output dir
path_column (str): path column in the target dataframe
max_workers (int): number of processors
Returns:
[DataFrame]: [output dataframe with downloaded content]
"""
@pandas_udf('string', PandasUDFType.SCALAR)
def s3_fetch(paths):
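            # 'paths' arrives as a pandas Series of S3 object keys for one batch of rows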
def download_one_file(bucket: str, output: str, client: boto3.client, s3_file: str):
"""
Download a single file from S3
Args:
bucket (str): S3 bucket where images are hosted
output (str): Dir to store the images
client (boto3.client): S3 client
s3_file (str): S3 object name
"""
client.download_file(
Bucket=bucket, Key=s3_file,
Filename=os.path.join(output, s3_file.replace('/', '_'))
)

files_to_download = paths
# Creating only one session and one client
session = boto3.Session()
client = session.client("s3")
# The client is shared between threads
func = partial(download_one_file, aws_bucket, output_dir, client)

# List for storing possible failed downloads to retry later
failed_downloads = []

with ThreadPoolExecutor(max_workers) as executor:
                # Map each future back to its file path so failed
                # downloads can be recorded for a later retry
futures = {
executor.submit(func, file_to_download):
file_to_download for file_to_download in files_to_download
}
for future in as_completed(futures):
if future.exception():
failed_downloads.append(futures[future])
if len(failed_downloads) > 0:
with open(
os.path.join(output_dir, "failed_downloads.csv"), "w", newline=""
) as csvfile:
writer = csv.writer(csvfile, quoting=csv.QUOTE_ALL)
writer.writerow(failed_downloads)

            def read_file_and_return_contents(path):
                try:
                    with open(os.path.join(output_dir, path.replace('/', '_')), 'r') as file:
                        logging.info(f"Read {path} and returned its contents")
                        return file.read()
                except FileNotFoundError:
                    logging.warning(f"{path} failed to download from S3")
                    return None

return paths.apply(read_file_and_return_contents)

return df.withColumn('downloaded_content', s3_fetch(path_column))
47 changes: 44 additions & 3 deletions examples/README.md
@@ -1,7 +1,9 @@
[Concurrent Jobs](#concurrent_jobs) |
| [Concurrent Jobs](#concurrent_jobs) |
[Easy map rendering](#map) |
[Schema to spark table generator](#schema) |
[Copy/Backup notebook runs in the workspace](#workspace)
[Copy/Backup notebook runs in the workspace](#workspace) |
[Catalog](#catalog) |
[Parallel Fetch](#parallel_fetch) |


# Concurrent Jobs <a name="concurrent_jobs"/>
@@ -21,6 +23,11 @@ Stop the job.
```python
job.stop()
```
Delete the job.
```python
job_id = job.job_id
DBSApi().delete_job(job_id)
```
Can also reference a notebook with a relative path to the current notebook.
```python
job = DBSApi().create_job('./dummy_job')
@@ -305,7 +312,7 @@ dbapi.import_notebook(
'/Shared/backups/2021_09_02/mynotebook',
)
```
# Catalog
# Catalog <a name="catalog"/>

Walk the Databricks catalog programmatically.
```python
@@ -323,4 +330,38 @@ table_provider=delta
table_location=dbfs:/dbfs/delta/weather
is_view=False
table_created_time=Tue Aug 31 11:24:55 UTC 2021
```
# Parallel Fetch <a name="parallel_fetch"/>

Distribute the download of many small S3 files over the Spark cluster.
```python
from bricklayer.util.parallel_fetch import DbricksParallelFetch

aws_bucket = "trips-service"
output_dir = "/tmp/"

import subprocess
from datetime import datetime
import pyspark.sql.functions as F
import pandas as pd

files_df = spark.createDataFrame(pd.DataFrame(
    [{'datetime': datetime.fromisoformat(n[0:19]),
      'size': int(n[19:30]),
      'path': n[31:]}
     for n in subprocess.check_output(
         ['aws', 's3', 'ls', '--recursive', 's3://trips-service/TEST/']
     ).decode('WINDOWS-1252').split('\n')[:-1]]
))
# pass the name of the path column; download_file returns a new dataframe with the downloaded contents
res = DbricksParallelFetch.download_file(files_df, aws_bucket, output_dir, 'path')
res.display()
```
```
datetime size path downloaded_content
2020-08-04T01:54:57.000+0000 2460 TEST/0D31EEB4/trip.json {"reportedTimestamp":"", ...}
2020-08-14T08:54:57.000+0000 2200 TEST/1C0ACA63/trip.json {"accuracy":1,"validity":1, ...}
2020-08-24T11:54:57.000+0000 2299 TEST/20DD063D/trip.json {"startFuelLevel":50.00, ...}
```
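
Failed downloads are recorded in `failed_downloads.csv` under `output_dir`. A minimal retry sketch, not part of bricklayer itself, assuming `output_dir` is visible from the driver (for example a single-node cluster or a `/dbfs/` path):
```python
import csv
import os
import pandas as pd
from bricklayer.util.parallel_fetch import DbricksParallelFetch

failed_csv = os.path.join(output_dir, "failed_downloads.csv")
if os.path.exists(failed_csv):
    # the UDF writes every failed S3 key as one quoted CSV row
    with open(failed_csv, newline="") as csvfile:
        failed_paths = next(csv.reader(csvfile))
    # re-run the parallel fetch on just the failed keys
    retry_df = spark.createDataFrame(pd.DataFrame({"path": failed_paths}))
    retry_res = DbricksParallelFetch.download_file(retry_df, aws_bucket, output_dir, "path")
    retry_res.display()
```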
