# Start a Job 
1. Look for file(s) to process in landing_zone
2. Copy to node
3. Extract/unzip into working zone
4. Run the notebook named in the extracted job.yaml, passing it the parameters from that file
5. Cleanup

Conventions
* Every zip file contains a job.yaml file whose `Job` section includes `Notebook` and `Parameters` entries

In [0]:
# For now, pick the first zip archive listed in the landing zone and process it.
# Only this one archive is handled per run.
landing_zone_path = 's3://cg-test-bucket-1000/databricks/landing-zone/'
lz_files = dbutils.fs.ls(landing_zone_path)
lz_zip_files = [f for f in lz_files if f.name.endswith('.zip')]
if not lz_zip_files:
    # Fail with an explicit message rather than a bare IndexError from [0].
    raise FileNotFoundError('no .zip files found in ' + landing_zone_path)
# NOTE: this name shadows the stdlib `zipfile` module; it is kept because
# later cells read `zipfile.name` and `zipfile.path`.
zipfile = lz_zip_files[0]
print(zipfile)

FileInfo(path='s3://cg-test-bucket-1000/databricks/landing-zone/Ethiopia_Bean.zip', name='Ethiopia_Bean.zip', size=532594, modificationTime=1694179670000)


### Make a unique identifier for each job run

In [0]:
import uuid

# Random (version 4) UUID identifying this job run; later cells embed it in paths.
jobid = uuid.uuid4()
print(f'{jobid}')

126f593f-3f5a-483d-be66-3d1a3a4554b1


### Extract zip file to temporary directory

In [0]:
# Stage the archive in a per-job folder under DBFS /tmp
# (open question: move this to an S3 working-zone instead?)
jobfolder = f'/tmp/{jobid}'
dbfs_zip_path = f'{jobfolder}/{zipfile.name}'
dbutils.fs.mkdirs(jobfolder)
dbutils.fs.cp(zipfile.path, dbfs_zip_path)
# Reported after the copy has completed.
print(f'copying {zipfile.path}  into  {dbfs_zip_path}')

copying s3://cg-test-bucket-1000/databricks/landing-zone/Ethiopia_Bean.zip  into  /tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/Ethiopia_Bean.zip


In [0]:
# Unzip the staged archive into an 'unzipped' subfolder of the job folder.
# The '/dbfs' prefix addresses the DBFS job folder as a local filesystem path.
import os
import subprocess

unzipped_folder = '/dbfs' + jobfolder + '/unzipped'
local_zip_path = '/dbfs' + jobfolder + '/' + zipfile.name

# Pass the arguments as a list (no shell) so an archive name containing spaces
# or shell metacharacters cannot break or alter the command.
unzip_result = subprocess.run(
    ['unzip', local_zip_path, '-d', unzipped_folder],
    capture_output=True,
    text=True,
)

# unzip exits 0 on success and 1 when it finished with warnings only;
# any other status means the extraction did not complete.
if unzip_result.returncode not in (0, 1):
    raise RuntimeError(
        'unzip failed with exit status ' + str(unzip_result.returncode)
        + ' for ' + local_zip_path + ': ' + unzip_result.stderr
    )

# Cell output: the unzip log (the list of inflated files).
unzip_result.stdout

Out[4]: 'Archive:  /dbfs/tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/Ethiopia_Bean.zip\n  inflating: /dbfs/tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/unzipped/FieldSamples_Ethiopia_Bean.csv  \n  inflating: /dbfs/tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/unzipped/GeneticReference_Ethiopia_Bean.csv  \n  inflating: /dbfs/tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/unzipped/job.yaml  \n'

In [0]:
# List the contents of DBFS '/tmp/' as the cell output
# (presumably a sanity check that this job's folder exists - confirm).
dbutils.fs.ls('/tmp/')

Out[5]: [FileInfo(path='dbfs:/tmp/0a65eb1d-da56-48bd-b61c-304fd6db9b49/', name='0a65eb1d-da56-48bd-b61c-304fd6db9b49/', size=0, modificationTime=1694179897968),
 FileInfo(path='dbfs:/tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/', name='126f593f-3f5a-483d-be66-3d1a3a4554b1/', size=0, modificationTime=1694179897968),
 FileInfo(path='dbfs:/tmp/27ef151d-371a-4b9e-9a06-62d1a28cf62d/', name='27ef151d-371a-4b9e-9a06-62d1a28cf62d/', size=0, modificationTime=1694179897968),
 FileInfo(path='dbfs:/tmp/3153b9d1-7d8e-4a38-ab7f-ac90e73d34e4/', name='3153b9d1-7d8e-4a38-ab7f-ac90e73d34e4/', size=0, modificationTime=1694179897968),
 FileInfo(path='dbfs:/tmp/453de1ef-9da8-48aa-95c9-ed65197f55c6/', name='453de1ef-9da8-48aa-95c9-ed65197f55c6/', size=0, modificationTime=1694179897968),
 FileInfo(path='dbfs:/tmp/4e8b9afa-d1f5-48b7-8172-2e3530f9cf82/', name='4e8b9afa-d1f5-48b7-8172-2e3530f9cf82/', size=0, modificationTime=1694179897968),
 FileInfo(path='dbfs:/tmp/52faf005-fdd5-468b-83e9-e4822504df97/', name='52

### Read the yaml file for parameters

In [0]:
%sh
# Install PyYAML for the job.yaml parsing cell that follows.
# Pinned to the release this cell previously resolved to, so every run
# installs the same version.
# NOTE(review): a notebook-scoped `%pip install` at the top of the notebook may
# be preferable, but it must not be placed here if it resets Python state
# (jobid / zipfile are already defined above) - confirm before changing.
pip install pyyaml==6.0.1

Collecting pyyaml
  Downloading PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (738 kB)
Installing collected packages: pyyaml
Successfully installed pyyaml-6.0.1
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-46aaa81a-bb69-4924-a835-2967b82a9a23/bin/python -m pip install --upgrade pip' command.


In [0]:
import yaml

# Load the job definition extracted from the archive.
job_yaml_path = f'/dbfs/tmp/{jobid}/unzipped/job.yaml'
with open(job_yaml_path, mode='r') as job_yaml:
    # safe_load builds plain Python objects only (dicts, lists, scalars).
    job = yaml.safe_load(job_yaml)

print(job)

{'Job': {'Description': 'runs IMAGE IBS Analysis', 'Notebook': 'IBS_Analysis', 'Parameters': [{'ref_file': 'GeneticReference_Ethiopia_Bean.csv'}, {'field_file': 'FieldSamples_Ethiopia_Bean.csv'}, {'country': 'Ethiopia'}, {'crop': 'Bean'}]}}


### Add extra 'standard' parameters

In [0]:
from collections import ChainMap
# job.yaml lists parameters as single-key mappings; flatten them into one dict.
# ChainMap gives precedence to the earliest mapping if a key is repeated.
parameter_entries = job['Job']['Parameters']
merged_parameters = ChainMap(*parameter_entries)
params_as_dict = dict(merged_parameters)

In [0]:
# Add the run-specific values to the parameters read from job.yaml:
# the DBFS folder holding the extracted files and the job id.
job_key = str(jobid)
params_as_dict.update({
    'unzipped_folder': f'/tmp/{job_key}/unzipped',
    'jobid': job_key,
})
print(params_as_dict)

{'crop': 'Bean', 'country': 'Ethiopia', 'field_file': 'FieldSamples_Ethiopia_Bean.csv', 'ref_file': 'GeneticReference_Ethiopia_Bean.csv', 'unzipped_folder': '/tmp/126f593f-3f5a-483d-be66-3d1a3a4554b1/unzipped', 'jobid': '126f593f-3f5a-483d-be66-3d1a3a4554b1'}


### Execute Notebook

In [0]:
# Run the notebook named in job.yaml, handing it every parameter as a single
# "params" argument holding the dict's string form.
notebook_name = job['Job']['Notebook']
timeout_seconds = 600
notebook_arguments = {"params": str(params_as_dict)}
dbutils.notebook.run(notebook_name, timeout_seconds, notebook_arguments)

### Remove from temp, save to archive, remove from landing zone

In [0]:
# Cleanup: delete this job's folder under /tmp, including everything inside it.
job_tmp_folder = f'/tmp/{jobid}'
dbutils.fs.rm(job_tmp_folder, True)

Out[11]: True

In [0]:
# Copy the processed archive from the landing zone to a per-job archive prefix.
# (The landing-zone original is not removed by this cell.)
archive_path = f's3://cg-test-bucket-1000/databricks/archive/{jobid}/{zipfile.name}'
landing_zone_zip = landing_zone_path + zipfile.name
dbutils.fs.cp(landing_zone_zip, archive_path)

Out[12]: True

In [0]:
# Cleanup: delete the processed archive from the S3 landing zone.
processed_zip = f'{landing_zone_path}{zipfile.name}'
dbutils.fs.rm(processed_zip)

Out[13]: True