# Automagically Shipping Custom Libraries in Spark Using OarphPy

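This notebook demonstrates how `oarphpy` can automatically package a custom Python library into an Egg and ship it to Spark workers, so that Spark jobs can import code from the library.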

## Setup

To run this notebook locally, try using the `oarphpy/full` dockerized environment:
`docker run -it --rm --net=host oarphpy/full:0.0.2 jupyter notebook --allow-root`

**COLAB ONLY** If you're running this notebook in [Google Colab](https://colab.research.google.com/), you'll need to install `oarphpy`, Spark, and Java.  Running the cell below will take care of that for you.  You might need to restart the runtime (Use the menu option: *Runtime* > *Restart runtime ...*) in order for Colab to recognize the new modules.

In [1]:
import os
import sys
if 'google.colab' in sys.modules:
    !pip install oarphpy[spark]==0.0.2
    !pip install pyspark==2.4.4
    !apt-get update && apt-get install -y openjdk-8-jdk
    os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'

## Our Custom Library: A Scraper

Our custom library will scrape images and 'tags' from simple HTML tables like this one:

<table>
<tr>
<td><img src="https://pwais.github.io/oarphpy-demo-assets/image_with_tags/beach.jpg"> beach</td>
<td><img src="https://pwais.github.io/oarphpy-demo-assets/image_with_tags/bridge.jpg"> bridge</td>
<td><img src="https://pwais.github.io/oarphpy-demo-assets/image_with_tags/car.jpg"> car</td>
</tr>
</table>

We'll store scraped images in a table where each row stores a single image and its annotations.  We'll use Spark here as a tool to **E**xtract data from webpages, **T**ransform it into our own data structure(s) / schemas, and **L**oad the transformed rows into some data store (e.g. the local filesystem).

### Source Code for the Library

The code for our custom library could live anywhere, but for this demo we're going to put the code in an arbitrary temporary directory.  The cell below sets that up.  

*Aside:* When running a Jupyter notebook, the current working directory of the Jupyter process is implicitly included in the import path (`sys.path`).  (This is a feature, not a bug!)  We put our custom library in a random temporary directory to isolate it from Jupyter and to simulate keeping one's own code in a separate directory (e.g. a repository that keeps library code apart from a `notebooks` directory).
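
To see why the library needs an explicit `sys.path` entry once it lives outside the notebook's directory, here is a small standard-library sketch. It writes a throwaway package (the name `demo_isolated_pkg` is made up for illustration) into a fresh temporary directory and tries to import it before and after adding that directory to `sys.path`.

```python
import importlib
import os
import sys
import tempfile


def demo_isolated_package(pkg_name='demo_isolated_pkg'):
    """Write a tiny package (hypothetical name) into a new temp dir and report
    whether it can be imported before and after adding the dir to sys.path."""
    tmp_dir = tempfile.mkdtemp(suffix='_sys_path_demo')
    pkg_dir = os.path.join(tmp_dir, pkg_name)
    os.makedirs(pkg_dir)
    with open(os.path.join(pkg_dir, '__init__.py'), 'w') as f:
        f.write('GREETING = "hello from %s"\n' % pkg_name)

    # Before: the temp dir is not on sys.path, so the import should fail
    importlib.invalidate_caches()
    try:
        importlib.import_module(pkg_name)
        importable_before = True
    except ImportError:
        importable_before = False

    # After: appending the *parent* of the package dir makes it importable
    sys.path.append(tmp_dir)
    importlib.invalidate_caches()
    module = importlib.import_module(pkg_name)
    return importable_before, module.GREETING


result = demo_isolated_package()
print(result)  # (False, 'hello from demo_isolated_pkg')
```

Note that it is the directory *containing* the package that goes on `sys.path`, which is why the test cell later appends `CUSTOM_LIB_SRC_DIR` rather than the `mymodule` directory itself.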

In [2]:
# Create a random temporary directory for our library
import os
import tempfile
old_cwd = os.getcwd()
tempdir = tempfile.TemporaryDirectory(suffix='_oarphpy_demo')
CUSTOM_LIB_SRC_DIR = tempdir.name
print("Putting demo assets in %s" % CUSTOM_LIB_SRC_DIR)
os.chdir(CUSTOM_LIB_SRC_DIR)
!mkdir -p mymodule
!touch mymodule/__init__.py

Putting demo assets in /tmp/tmpu4zpxr8b_oarphpy_demo


Now let's write the library.  Here's our webpage-scraping code, which simply uses `BeautifulSoup` to parse image URLs and tags from the pages:

In [3]:
!pip3 install beautifulsoup4


In [4]:
%%writefile mymodule/scrape.py
def get_image_tag_pairs(url):
    import urllib.request
    from urllib.parse import urljoin
    from urllib.parse import urlparse
    from bs4 import BeautifulSoup
    
    raw_page = urllib.request.urlopen(url).read()
    soup = BeautifulSoup(raw_page, features="html.parser")
    for td in soup.find_all('td'):
        tag = td.text.strip()
        img_src = td.find('img')['src']
        if not urlparse(img_src).scheme:
            img_src = urljoin(url, img_src)
        yield tag, img_src

Writing mymodule/scrape.py
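
If installing BeautifulSoup is inconvenient, the same extraction can be sketched with the standard library's `html.parser` instead. This is a swapped-in technique, not what `mymodule.scrape` uses, and the names `TdImageTagParser` and `image_tag_pairs_from_html` are ours. It parses an HTML string rather than fetching a URL so that it runs offline. It calls `urljoin` unconditionally, which is equivalent to the original's scheme check because `urljoin` returns absolute URLs unchanged.

```python
from html.parser import HTMLParser
from urllib.parse import urljoin


class TdImageTagParser(HTMLParser):
    """Collect (tag_text, img_src) pairs from <td> cells using the stdlib parser."""

    def __init__(self):
        super().__init__()
        self.pairs = []
        self._in_td = False
        self._text = []
        self._src = None

    def handle_starttag(self, tag, attrs):
        if tag == 'td':
            # Start a fresh cell: reset the accumulated text and image source
            self._in_td, self._text, self._src = True, [], None
        elif tag == 'img' and self._in_td:
            self._src = dict(attrs).get('src')

    def handle_data(self, data):
        if self._in_td:
            self._text.append(data)

    def handle_endtag(self, tag):
        if tag == 'td' and self._in_td:
            self._in_td = False
            if self._src is not None:
                self.pairs.append((''.join(self._text).strip(), self._src))


def image_tag_pairs_from_html(html, base_url):
    """Return (tag, absolute_img_url) pairs for every <td> holding an <img>."""
    parser = TdImageTagParser()
    parser.feed(html)
    parser.close()
    # Relative srcs are resolved against the page URL; absolute ones pass through
    return [(tag, urljoin(base_url, src)) for tag, src in parser.pairs]


html = ('<table><tr>'
        '<td><img src="beach.jpg"> beach</td>'
        '<td><img src="https://example.com/car.jpg"> car</td>'
        '</tr></table>')
pairs = image_tag_pairs_from_html(
    html, 'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html')
print(pairs)
# [('beach', 'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/beach.jpg'),
#  ('car', 'https://example.com/car.jpg')]
```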


We'll abstract scraped image-tag pairs into a class `ImageWithAnno`, which will also represent a single row in our final table.  In this simple demo, this class mainly helps us encapsulate the code for constructing a table row.

In [5]:
%%writefile mymodule/imagewithanno.py
class ImageWithAnno(object):
    def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
        self.url = url
        self.tag = tag
        self.image_bytes = image_bytes
        self.width = width
        self.height = height
    
    @staticmethod
    def create_from_url(url):
        import urllib.request
        image_bytes = bytearray(urllib.request.urlopen(url).read())

        # Read the image dimensions without actually decoding the jpeg
        from oarphpy.util import get_jpeg_size
        width, height = get_jpeg_size(image_bytes)
        return ImageWithAnno(
            url=url,
            image_bytes=image_bytes,
            width=width,
            height=height)

Writing mymodule/imagewithanno.py
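
The comment in `create_from_url` says the dimensions are read without decoding the JPEG. We haven't checked how `oarphpy.util.get_jpeg_size` does this internally, but the usual approach is to walk the file's marker segments until reaching a start-of-frame (SOFn) header, which stores the height and width as big-endian 16-bit integers. Here is a standalone sketch of that approach (the name `jpeg_size_sketch` is ours); like the call site above, it returns `(width, height)`.

```python
import struct

# SOFn markers carry the frame size; 0xC4 (DHT), 0xC8 (JPG) and 0xCC (DAC) do not
SOF_MARKERS = {0xC0, 0xC1, 0xC2, 0xC3, 0xC5, 0xC6, 0xC7,
               0xC9, 0xCA, 0xCB, 0xCD, 0xCE, 0xCF}
# Markers that stand alone without a length field: TEM, RST0-RST7, SOI, EOI
STANDALONE_MARKERS = {0x01} | set(range(0xD0, 0xDA))


def jpeg_size_sketch(data):
    """Return (width, height) by walking JPEG marker segments up to the first
    SOFn header, without decoding any pixels."""
    data = bytes(data)
    if data[:2] != b'\xff\xd8':
        raise ValueError('not a JPEG: missing SOI marker')
    i = 2
    while i + 4 <= len(data):
        if data[i] != 0xFF:
            raise ValueError('expected a marker at offset %d' % i)
        marker = data[i + 1]
        if marker == 0xFF:  # fill byte before a marker
            i += 1
            continue
        if marker in STANDALONE_MARKERS:
            i += 2
            continue
        if marker == 0xDA:  # start of scan: entropy-coded data follows
            break
        # Big-endian segment length; counts its own 2 bytes but not the marker
        (seg_len,) = struct.unpack('>H', data[i + 2:i + 4])
        if marker in SOF_MARKERS:
            # SOF payload: sample precision (1 byte), height (2), width (2)
            height, width = struct.unpack('>HH', data[i + 5:i + 9])
            return width, height
        i += 2 + seg_len
    raise ValueError('no SOF marker found before image data')


# Tiny synthetic header: SOI, an empty APP0 segment, then a SOF0 segment
# declaring height=53 and width=120 (no pixel data; enough for the scan)
sample = (b'\xff\xd8'
          + b'\xff\xe0' + struct.pack('>H', 4) + b'\x00\x00'
          + b'\xff\xc0' + struct.pack('>HBHHB', 8, 8, 53, 120, 0)
          + b'\xff\xd9')
print(jpeg_size_sketch(bytearray(sample)))  # (120, 53)
```

Because only the header bytes are inspected, this stays cheap even for large images, which is presumably the motivation for avoiding a full decode here.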


That's the whole library!  Now let's switch the working directory back to where the notebook started.

In [6]:
print("Changing working directory back to %s" % old_cwd)
os.chdir(old_cwd)

Changing working directory back to /opt/oarphpy/notebooks


### Test the Library

Let's do a brief test of the library in this notebook.  As is customary, we modify `sys.path` to make our library importable in the notebook's Python process.

In [7]:
sys.path.append(CUSTOM_LIB_SRC_DIR)

from mymodule.scrape import get_image_tag_pairs
test_pairs = list(get_image_tag_pairs('https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html'))
test_labels = [l for l, img in test_pairs]
assert set(['beach', 'bridge', 'car']) == set(test_labels), \
    "Got unexpected labels %s" % (test_labels,)

## Scraping with Spark and OarphPy

Now we're ready to scrape!  For this demo, we'll scrape these pages:

In [8]:
PAGE_URLS = (
    'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html',
    'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page2.html',
)

### Magic Egg-ification

Now we'll start a Spark session using `oarphpy`, which will automagically Egg-ify our custom library and ship that Python Egg with our job.  By default `oarphpy` tries to Egg-ify the library surrounding the calling code; this feature is ideal when running Spark jobs from scripts where the scripts and the library code live in the same Python module.  For this demo, we just need to point `oarphpy` at our temp directory:

In [9]:
from oarphpy.spark import NBSpark

NBSpark.SRC_ROOT = os.path.join(CUSTOM_LIB_SRC_DIR, 'mymodule')
spark = NBSpark.getOrCreate()

2020-02-01 06:21:00,035	oarph 7711 : Using source root /tmp/tmpu4zpxr8b_oarphpy_demo 
2020-02-01 06:21:00,064	oarph 7711 : Generating egg to /tmp/op_spark_eggs_5cf835b4-f6d7-4625-aa8f-de728c898b08 ...
2020-02-01 06:21:00,133	oarph 7711 : ... done.  Egg at /tmp/op_spark_eggs_5cf835b4-f6d7-4625-aa8f-de728c898b08/tmpu4zpxr8b_oarphpy_demo-0.0.0-py3.6.egg

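At its core, an Egg is a zip archive (plus packaging metadata), and Python can import packages straight out of a zip file on `sys.path` through the standard `zipimport` machinery. The sketch below skips the metadata and whatever build tooling `oarphpy` actually uses; it just zips a made-up package, `demo_zipped_pkg`, into a file with an `.egg` name and imports from it. Note how the module's `__file__` points *inside* the archive, much like the `.egg/mymodule/__init__.py` path that the worker check reports.

```python
import importlib
import os
import sys
import tempfile
import zipfile


def build_and_import_zip_egg(pkg_name='demo_zipped_pkg'):
    """Zip a tiny package (hypothetical name) into an .egg-named archive, put the
    archive on sys.path, and import the package from it via zipimport."""
    tmp_dir = tempfile.mkdtemp(suffix='_egg_demo')
    egg_path = os.path.join(tmp_dir, pkg_name + '-0.0.0.egg')
    with zipfile.ZipFile(egg_path, 'w') as zf:
        # Archive paths mirror the on-disk package layout: <pkg>/__init__.py
        zf.writestr(pkg_name + '/__init__.py', 'VALUE = 42\n')

    # Any zip file on sys.path is searched by zipimport, whatever its extension
    sys.path.insert(0, egg_path)
    importlib.invalidate_caches()
    module = importlib.import_module(pkg_name)
    return egg_path, module


egg_path, module = build_and_import_zip_egg()
print(module.VALUE)     # 42
print(module.__file__)  # a path inside the .egg archive
```
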

Running the cell above should log messages confirming that `oarphpy` Egg-ified our code and handed it to Spark.  Let's now prove that step worked.  We'll show that when we import code from our library on a Spark worker (which in this case is a local Python process separate from the notebook), the import works and the imported code comes from the `oarphpy`-generated Egg.  (This feature even has an explicit [unit test](https://github.com/pwais/oarphpy/blob/28ed5764e3cdd67ae18aa2ecec241c789398ce50/oarphpy_test/fixtures/test_spark_with_custom_library.py#L57) in `oarphpy`!)

In [10]:
def test_mymodule_is_included():
    import mymodule
    return mymodule.__file__

from oarphpy import spark as S
mod_paths = S.for_each_executor(spark, test_mymodule_is_included)
print("Loaded mymodule from %s" % (mod_paths,))
assert all('.egg' in p for p in mod_paths)

2020-02-01 06:21:02,460	oarph 7711 : egg time 1580538060.1287572
2020-02-01 06:21:02,462	oarph 7711 : src time 1580538059.7767618


Loaded mymodule from ['/tmp/spark-a7df888c-36e1-4177-902b-bedb2dcbfd04/userFiles-206956ce-059d-4773-b917-97ecb02e6725/tmpu4zpxr8b_oarphpy_demo-0.0.0-py3.6.egg/mymodule/__init__.py']
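
For intuition about what `for_each_executor` checked, here is a rough local analogue without Spark: a brand-new Python process (standing in for a Spark worker) gets only an archive on its `PYTHONPATH`, imports a package from it, and reports where the module came from. The package name `demo_worker_pkg` is made up, and Spark's actual mechanism for shipping files to workers (e.g. `SparkContext.addPyFile`) is not modeled here.

```python
import os
import subprocess
import sys
import tempfile
import zipfile


def module_file_in_fresh_process(pkg_name='demo_worker_pkg'):
    """Build a one-file .egg-named zip, then ask a separate Python process
    (standing in for a worker) to import the package and print its __file__."""
    tmp_dir = tempfile.mkdtemp(suffix='_worker_demo')
    egg_path = os.path.join(tmp_dir, pkg_name + '-0.0.0.egg')
    with zipfile.ZipFile(egg_path, 'w') as zf:
        zf.writestr(pkg_name + '/__init__.py', '')

    # The child sees the archive only through PYTHONPATH, not our sys.path
    env = dict(os.environ, PYTHONPATH=egg_path)
    snippet = 'import {0}; print({0}.__file__)'.format(pkg_name)
    result = subprocess.run([sys.executable, '-c', snippet], env=env,
                            stdout=subprocess.PIPE, check=True,
                            universal_newlines=True)
    return egg_path, result.stdout.strip()


egg_path, reported = module_file_in_fresh_process()
print(reported)  # a path inside the .egg archive, as in the Spark check above
```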


### Run a Scraping Job

We'll now run a scraping job using Spark's RDD API, which is the easiest way to leverage our custom library.  
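
It may help to see the shape of this pipeline in plain Python first: `flatMap` calls a function that returns zero or more items per input and concatenates the results, while `map` produces exactly one output per input. The sketch below swaps the real scraper for a hypothetical offline stand-in, `fake_get_image_tag_pairs`, so it runs without network access; the split of tags between the two pages is assumed from the test above and the final results.

```python
from itertools import chain

# Hypothetical offline stand-in for the two demo pages (no network access).
# page1's tags match the library test above; page2's are assumed from the results.
FAKE_PAGES = {
    'page1.html': ['beach', 'bridge', 'car'],
    'page2.html': ['challah', 'pizza'],
}


def fake_get_image_tag_pairs(page_url):
    """Yield (tag, img_url) pairs like get_image_tag_pairs, but from FAKE_PAGES."""
    for tag in FAKE_PAGES[page_url]:
        yield tag, tag + '.jpg'


def to_row(tag, img_url):
    """Stand-in for to_image_anno: one row per (tag, img_url) pair, no download."""
    return {'tag': tag, 'url': img_url}


def run_locally(page_urls):
    # flatMap: each page yields zero or more pairs; the results are concatenated
    pairs = chain.from_iterable(map(fake_get_image_tag_pairs, page_urls))
    # map: exactly one output per input; the lambda unpacks each (tag, url) tuple
    return list(map(lambda pair: to_row(*pair), pairs))


rows = run_locally(['page1.html', 'page2.html'])
print(len(rows))  # 5
```

On Spark, the same two steps run lazily and in parallel, and nothing executes until an action such as `count()` is called.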

In [11]:
url_rdd = spark.sparkContext.parallelize(PAGE_URLS)
print("RDD of pages to scrape: %s" % (url_rdd.collect()))

from mymodule.scrape import get_image_tag_pairs
tag_img_url_rdd = url_rdd.flatMap(get_image_tag_pairs)
  # NB: `get_image_tag_pairs` gets sent to Spark workers via `cloudpickle`, which
  # pickles it by reference, so each worker must import `mymodule` (from the Egg)

def to_image_anno(tag, img_url):
    from mymodule.imagewithanno import ImageWithAnno
    imganno = ImageWithAnno.create_from_url(img_url)
    imganno.tag = tag
    return imganno
image_anno_rdd = tag_img_url_rdd.map(lambda pair: to_image_anno(*pair))
num_images = image_anno_rdd.count()

print("Scraped %s images" % num_images)
assert num_images == 5, "Unexpectedly saw %s images" % num_images

2020-02-01 06:21:03,344	oarph 7711 : egg time 1580538060.1287572
2020-02-01 06:21:03,347	oarph 7711 : src time 1580538059.7767618


RDD of pages to scrape: ['https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page1.html', 'https://pwais.github.io/oarphpy-demo-assets/image_with_tags/page2.html']


Scraped 5 images
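
The `NB` comment in the cell above is the crux of why the Egg matters. Python's standard `pickle` serializes a module-level function *by reference*: the payload records only the module and attribute name, and unpickling re-imports that module. As we understand it, `cloudpickle` does the same for functions defined in importable modules (like `mymodule.scrape`), and serializes by value the functions it can't import by name, such as those defined in the notebook itself (like `to_image_anno` and the `lambda`). The stdlib-only check below (the helper name `pickled_by_reference` is ours) shows the by-reference behavior on `json.dumps`.

```python
import json
import pickle


def pickled_by_reference(func):
    """Hypothetical helper: True if pickle stores func as a module/name reference
    that resolves back to the very same object when unpickled."""
    payload = pickle.dumps(func)
    names_present = (func.__module__.encode() in payload
                     and func.__qualname__.encode() in payload)
    # Unpickling re-imports the module, so the receiving process needs it available
    return names_present and pickle.loads(payload) is func


print(pickled_by_reference(json.dumps))  # True
```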


### View / Save the results

Now let's create and save that table!  Spark can automatically convert some simple Python objects to DataFrame table rows.  (`oarphpy` offers tools for slotted classes, `numpy` arrays, and more in [`RowAdapter`](https://github.com/pwais/oarphpy/blob/d62bdca8b5743be97f74b8b45fec71f72c90aa47/oarphpy/spark.py#L682)).  We'll leverage Spark's built-in type munging and Parquet support below:

In [12]:
df = spark.createDataFrame(image_anno_rdd)

# Save the results
df.write.parquet('/tmp/demo_results', mode='overwrite')

# Show the results using df.show() or as a Pandas Dataframe, which has pretty printing support in Jupyter.
df.toPandas()

2020-02-01 06:21:05,016	oarph 7711 : egg time 1580538060.1287572
2020-02-01 06:21:05,017	oarph 7711 : src time 1580538059.7767618


|   | height | image_bytes | tag | url | width |
|---|---|---|---|---|---|
| 0 | 53 | [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... | beach | https://pwais.github.io/oarphpy-demo-assets/im... | 120 |
| 1 | 133 | [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... | bridge | https://pwais.github.io/oarphpy-demo-assets/im... | 100 |
| 2 | 75 | [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... | car | https://pwais.github.io/oarphpy-demo-assets/im... | 100 |
| 3 | 84 | [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... | challah | https://pwais.github.io/oarphpy-demo-assets/im... | 80 |
| 4 | 90 | [255, 216, 255, 224, 0, 16, 74, 70, 73, 70, 0,... | pizza | https://pwais.github.io/oarphpy-demo-assets/im... | 120 |

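Notice that the table's columns come out in alphabetical order (`height`, `image_bytes`, `tag`, `url`, `width`) rather than in constructor order. To our understanding, when PySpark 2.x infers a schema from a plain Python object it reads the object's `__dict__` sorted by attribute name and maps each Python type to a Spark SQL type (the `bytearray` becomes a binary column, shown as hex in `df.show()` output). The stdlib-only sketch below mimics just the "sorted `__dict__`" step; it is not Spark's code, and `ImageWithAnnoLike` and `sorted_attribute_columns` are hypothetical names.

```python
class ImageWithAnnoLike(object):
    """Local stand-in with the same attributes as mymodule's ImageWithAnno."""

    def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
        self.url = url
        self.tag = tag
        self.image_bytes = image_bytes
        self.width = width
        self.height = height


def sorted_attribute_columns(obj):
    """Hypothetical helper: (name, Python type name) for each attribute in
    obj.__dict__, sorted by attribute name."""
    return [(name, type(value).__name__)
            for name, value in sorted(vars(obj).items())]


row = ImageWithAnnoLike(url='https://example.com/beach.jpg', tag='beach',
                        image_bytes=bytearray(b'\xff\xd8\xff\xe0'),
                        width=120, height=53)
print(sorted_attribute_columns(row))
# [('height', 'int'), ('image_bytes', 'bytearray'), ('tag', 'str'),
#  ('url', 'str'), ('width', 'int')]
```
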

### Editing the Library Mid-Session

`NBSpark` also keeps the Egg in sync with the library's source: before each cell runs, it compares the Egg's timestamp (`egg time` in the logs) with the source's (`src time`) and rebuilds the Egg if the source is newer.  Below, we edit `ImageWithAnno` to add a `moof` attribute, switch back to the notebook's directory, and watch the logs report `rebuilding egg`.

In [13]:
os.chdir(CUSTOM_LIB_SRC_DIR)

2020-02-01 06:21:07,513	oarph 7711 : egg time 1580538060.1287572
2020-02-01 06:21:07,514	oarph 7711 : src time 1580538059.7767618


In [14]:
%%writefile mymodule/imagewithanno.py
class ImageWithAnno(object):
    def __init__(self, url='', tag='', image_bytes=None, width=None, height=None):
        self.url = url
        self.tag = tag
        self.image_bytes = image_bytes
        self.width = width
        self.height = height
        self.moof = 'cow'
    
    @staticmethod
    def create_from_url(url):
        import urllib.request
        image_bytes = bytearray(urllib.request.urlopen(url).read())
        
        # Read the image dimensions without actually decoding the jpeg
        from oarphpy.util import get_jpeg_size
        width, height = get_jpeg_size(image_bytes)
        return ImageWithAnno(
            url=url,
            image_bytes=image_bytes,
            width=width,
            height=height)

2020-02-01 06:21:07,534	oarph 7711 : egg time 1580538060.1287572
2020-02-01 06:21:07,535	oarph 7711 : src time 1580538059.7767618


Overwriting mymodule/imagewithanno.py


In [15]:
os.chdir(old_cwd)

2020-02-01 06:21:07,544	oarph 7711 : egg time 1580538060.1287572
2020-02-01 06:21:07,545	oarph 7711 : src time 1580538067.5326605
2020-02-01 06:21:07,545	oarph 7711 : rebuilding egg
2020-02-01 06:21:07,546	oarph 7711 : Using source root /tmp/tmpu4zpxr8b_oarphpy_demo 
2020-02-01 06:21:07,548	oarph 7711 : Generating egg to /tmp/op_spark_eggs_6c918b32-b629-49d9-9024-b8c48ac8d288 ...
2020-02-01 06:21:07,561	oarph 7711 : ... done.  Egg at /tmp/op_spark_eggs_6c918b32-b629-49d9-9024-b8c48ac8d288/tmpu4zpxr8b_oarphpy_demo-0.0.0-py3.6.egg
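
The log lines above show the rebuild decision: while `src time` is older than `egg time` nothing happens, and once the edited source is newer, `oarphpy` logs `rebuilding egg`. We haven't verified how `oarphpy` computes its `src time`; a minimal sketch of this kind of staleness check, assuming it takes the newest file modification time under the source root, could look like the following (`newest_file_mtime` and `egg_is_stale` are hypothetical names).

```python
import os
import tempfile


def newest_file_mtime(src_root):
    """Newest modification time of any file under src_root (0.0 if none)."""
    newest = 0.0
    for dirpath, _dirnames, filenames in os.walk(src_root):
        for name in filenames:
            newest = max(newest, os.path.getmtime(os.path.join(dirpath, name)))
    return newest


def egg_is_stale(src_root, egg_path):
    """Hypothetical helper: rebuild if the egg is missing or older than the source."""
    if not os.path.exists(egg_path):
        return True
    return newest_file_mtime(src_root) > os.path.getmtime(egg_path)


# Usage with synthetic timestamps (seconds since the epoch)
src_root = tempfile.mkdtemp(suffix='_src')
egg_path = os.path.join(tempfile.mkdtemp(suffix='_eggs'), 'demo-0.0.0.egg')
print(egg_is_stale(src_root, egg_path))  # True: no egg yet

module_path = os.path.join(src_root, 'imagewithanno.py')
open(module_path, 'w').close()
open(egg_path, 'w').close()
os.utime(module_path, (1000, 1000))
os.utime(egg_path, (2000, 2000))
print(egg_is_stale(src_root, egg_path))  # False: egg is newer than the source

os.utime(module_path, (3000, 3000))  # simulate editing the source
print(egg_is_stale(src_root, egg_path))  # True: source edited after the build
```

Keeping the Egg outside the source tree (here, a separate temp directory) matters: otherwise writing the Egg would itself bump the "newest source" time.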


In [16]:
# url_rdd = spark.sparkContext.parallelize(PAGE_URLS)
# print("RDD of pages to scrape: %s" % (url_rdd.collect()))

# from mymodule.scrape import get_image_tag_pairs
# tag_img_url_rdd = url_rdd.flatMap(get_image_tag_pairs)
#   # NB: `get_image_tag_pairs` gets sent to Spark workers via `cloudpickle`

# # def yay(x):
# #     import zipimport
# #     zipimport._zip_directory_cache = {}
# #     return x
# # tag_img_url_rdd = tag_img_url_rdd.map(yay)
    
# def to_image_anno(tag, img_url):
#     import mymodule
#     from mymodule.imagewithanno import ImageWithAnno
#     imganno = ImageWithAnno.create_from_url(img_url)
#     imganno.tag = tag
#     return imganno
# image_anno_rdd = tag_img_url_rdd.map(lambda pair: to_image_anno(*pair))
# num_images = image_anno_rdd.count()

# print("Scraped %s images" % num_images)
# assert num_images == 5, "Unexpectedly saw %s images" % num_images

df = spark.createDataFrame(image_anno_rdd)
df.show()

2020-02-01 06:21:07,576	oarph 7711 : egg time 1580538067.5566602
2020-02-01 06:21:07,578	oarph 7711 : src time 1580538067.5326605


+------+--------------------+-------+--------------------+-----+
|height|         image_bytes|    tag|                 url|width|
+------+--------------------+-------+--------------------+-----+
|    53|[FF D8 FF E0 00 1...|  beach|https://pwais.git...|  120|
|   133|[FF D8 FF E0 00 1...| bridge|https://pwais.git...|  100|
|    75|[FF D8 FF E0 00 1...|    car|https://pwais.git...|  100|
|    84|[FF D8 FF E0 00 1...|challah|https://pwais.git...|   80|
|    90|[FF D8 FF E0 00 1...|  pizza|https://pwais.git...|  120|
+------+--------------------+-------+--------------------+-----+

