# MobileNet With Pandas Python UDFs


# Launch Spark

Arrow is disabled by default, so three configuration items have to be added to the Spark configuration to enable it. This can now be done without modifying SparkLauncher (set them on the configuration object it returns), but you can add them to SparkLauncher directly if you prefer.

```python
    # Apache Arrow Config
    conf.set('spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT', '1')
    conf.set('spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT', '1')
    conf.set('spark.sql.execution.arrow.enabled', 'true')
```

In [None]:
import os
import pyarrow as pa
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import venv_pack

def get_spark_conf():
    """
    Build the SparkConf used to run an application on YARN.

    Prints a progress message, creates a SparkConf with master 'yarn', and
    applies the environment, resource, JVM and classpath settings listed
    below. The application name, the distributed archive and the Python
    interpreter settings are derived from the USER environment variable.

    Returns:
        The populated SparkConf.

    Raises:
        KeyError: if the USER environment variable is not set.
    """
    print("Creating Spark Configuration")
    conf = SparkConf()
    conf.setMaster('yarn')

    # Values that are used by more than one setting.
    java_home = '/usr/java/jdk1.8.0_181-cloudera'
    native_lib_path = (
        '/opt/cloudera/parcels/CDH/lib64'
        ':/usr/java/jdk1.8.0_181-cloudera/jre/lib/amd64'
        ':/usr/java/jdk1.8.0_181-cloudera/jre/lib/amd64/server'
    )
    jvm_options = ('-XX:ReservedCodeCacheSize=256M -XX:MaxMetaspaceSize=512m '
                   '-XX:CompressedClassSpaceSize=512m')
    class_path = '/etc/hadoop/conf:/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars'

    # Everything user-specific hangs off the USER environment variable.
    user = os.environ["USER"]
    venv_python = f'"source {user}/bin/activate && {user}/bin/python3"'

    conf.setAppName(f'{user}_data603_spark')

    settings = [
        # Application Master environment variables
        ('spark.yarn.appMasterEnv.JAVA_HOME', java_home),
        ('spark.yarn.appMasterEnv.LD_LIBRARY_PATH', native_lib_path),

        # Executor environment variables
        ('spark.executorEnv.JAVA_HOME', java_home),
        ('spark.executorEnv.LD_LIBRARY_PATH', native_lib_path),
        ('spark.executorEnv.HADOOP_HOME', "/opt/cloudera/parcels/CDH"),
        ('spark.executorEnv.ARROW_LIBHDFS_DIR', "/opt/cloudera/parcels/CDH/lib64"),
        ('spark.executorEnv.HADOOP_CONF_DIR', "/etc/hadoop/conf"),

        # Packed virtual environment and the Python interpreter inside it
        ('spark.yarn.dist.archives', f'{user}.tar.gz#{user}'),
        ('spark.pyspark.driver.python', venv_python),
        ('spark.yarn.appMasterEnv.PYSPARK_PYTHON', venv_python),
        ('spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON', venv_python),

        ('spark.yarn.appMasterEnv.HIVE_CONF_DIR', '/etc/hive/conf'),

        # Resources. spark.dynamicAllocation.minExecutors is not set here
        # (the override is disabled).
        ('spark.dynamicAllocation.maxExecutors', '30'),
        ('spark.executor.cores', '30'),
        ('spark.executor.memory', '60g'),
        ('spark.executor.memoryOverhead', '10g'),
        ('spark.yarn.am.memoryOverhead', '6g'),
        ('spark.yarn.am.memory', '8g'),

        ('spark.driver.log.dfsDir', '/user/spark/driverLogs'),
        ('yarn.nodemanager.vmem-check-enabled', False),

        # JVM options and classpath (CDH-6.2.0-1.cdh6.2.0.p0.967373)
        ('spark.driver.extraJavaOptions', jvm_options),
        ('spark.executor.extraJavaOptions', jvm_options),
        ('spark.driver.extraClassPath', class_path),
        ('spark.executor.extraClassPath', class_path),

        ('spark.port.maxRetries', 100),
    ]
    for key, value in settings:
        conf.set(key, value)

    return conf

In [None]:
def get_spark_session(pack_venv=True, conf=None):
    """
    Launches Spark Context using UMBC Big Data Cluster YARN and returns a Spark Session.

    Args:
        pack_venv: when true, pack the current virtual environment into
            "<USER>.tar.gz" (overwriting any existing file) before launching.
        conf: the SparkConf to launch with. When None (the default), a fresh
            configuration is built with get_spark_conf() at call time.

    Returns:
        The SparkSession obtained from SparkSession.builder.getOrCreate(),
        with Hive support enabled.

    Raises:
        KeyError: if the USER or PATH environment variable is not set.
    """
    # Build the default configuration per call. A call expression in the
    # signature is evaluated once, when the function is defined: that ran
    # get_spark_conf() on import and shared one SparkConf between all calls.
    if conf is None:
        conf = get_spark_conf()

    user = os.environ['USER']

    # Pack Virtual Environment
    if pack_venv:
        packed_environment_file = f"{user}.tar.gz"
        print(f"Packing Virtual Environment: {packed_environment_file}")
        venv_pack.pack(output=packed_environment_file, force=True)

    # Set local environment variables
    # for people that just won't follow directions and setup BASH
    java_home = "/usr/java/jdk1.8.0_181-cloudera"
    os.environ['JAVA_HOME'] = java_home
    os.environ['CLASSPATH'] = "/etc/hadoop/conf:/opt/cloudera/parcels/CDH/jars"
    os.environ['PATH'] = f"{os.environ['PATH']}:{java_home}/bin"
    # NOTE: any pre-existing LD_LIBRARY_PATH is replaced, not extended.
    os.environ['LD_LIBRARY_PATH'] = ":".join([
        "/opt/cloudera/parcels/CDH/lib64",
        f"{java_home}/jre/lib/amd64",
        f"{java_home}/jre/lib/amd64/server",
    ])

    print("Setting Environment Variables")
    os.environ['HADOOP_HOME'] = "/opt/cloudera/parcels/CDH"
    os.environ['SPARK_HOME'] = "/opt/cloudera/parcels/CDH/lib/spark"
    os.environ['HIVE_HOME'] = "/opt/cloudera/parcels/CDH/lib/hive"

    os.environ['HADOOP_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['YARN_CONF_DIR'] = "/etc/hadoop/conf"
    os.environ['SPARK_CONF_DIR'] = "/etc/spark/conf"
    os.environ['HIVE_CONF_DIR'] = "/etc/hive/conf"

    # Relative path: resolved inside the unpacked "<USER>" archive.
    os.environ['PYSPARK_PYTHON'] = f'{user}/bin/python3'

    # Create SparkSession
    session_name = f"{user}_data603_spark_session"
    print(f"Creating Spark Session: {session_name}")
    spark = SparkSession.builder\
        .config(conf=conf)\
        .appName(session_name)\
        .enableHiveSupport()\
        .getOrCreate()

    return spark

Creating Spark Configuration


In [None]:
import import_ipynb
from data603 import SparkLauncher

# Start from the launcher's configuration object.
conf = SparkLauncher.get_spark_conf()

# Register a file with the configuration so that it is copied to all the
# nodes on the cluster.
conf.set(
    'spark.yarn.dist.files',
    'keras_data/mobilenet_1_0_224_tf.h5',
)

# Launch the cluster with that configuration, without re-packing the
# virtual environment.
spark = SparkLauncher.get_spark_session(conf=conf, pack_venv=False)


importing Jupyter notebook from /scratch/data603/a280/data603/SparkLauncher.ipynb
Creating Spark Configuration
Creating Spark Configuration
Setting Environment Variables
Creating Spark Session: a280_data603_spark_session


# Read Dataframe

This must be done _BEFORE_ the UDF is defined because the UDF needs the schemas of the dataframes it will be using. I have written out a parquet file containing the image chips (the contents of the bounding boxes) extracted for several animal types; the rows shown below are big cats.

In [None]:
# Read the chip table and discard the 'data' column in one expression.
image_chips = (
    spark.read
    .parquet("/user/has1/chips_image.parquet")
    .drop('data')  # remove the full-image data.
)

In [None]:
# Drop the 'hdfs_path' column, which is not needed below.
image_chips = image_chips.drop('hdfs_path')

In [None]:
# Check that the data is there by printing the first rows.
image_chips.show()

+----------------+---------+---------------+--------------------+
|         ImageID|LabelName|      LabelText|           chip_data|
+----------------+---------+---------------+--------------------+
|025d25975e4275a2| /m/0c29q|        Leopard|[FF D8 FF E0 00 1...|
|025d25975e4275a2| /m/0cd4d|        Cheetah|[FF D8 FF E0 00 1...|
|025d25975e4275a2| /m/0449p|Jaguar (Animal)|[FF D8 FF E0 00 1...|
|03bacd7be83b721e| /m/096mb|           Lion|[FF D8 FF E0 00 1...|
|078bfcf1afb210ae| /m/096mb|           Lion|[FF D8 FF E0 00 1...|
|078bfcf1afb210ae| /m/096mb|           Lion|[FF D8 FF E0 00 1...|
|078bfcf1afb210ae| /m/096mb|           Lion|[FF D8 FF E0 00 1...|
|0c9f40ea3014c553| /m/096mb|           Lion|[FF D8 FF E0 00 1...|
|0e0e38e4ffb1b727| /m/0c29q|        Leopard|[FF D8 FF E0 00 1...|
|0e0e38e4ffb1b727| /m/0cd4d|        Cheetah|[FF D8 FF E0 00 1...|
|1465d9f311a8ef8c| /m/0c29q|        Leopard|[FF D8 FF E0 00 1...|
|192a3715b2f2c738| /m/04g2r|           Lynx|[FF D8 FF E0 00 1...|
|193d7fdc2

In [None]:
# What are the datatypes? Displaying the DataFrame shows column names and types.
image_chips

DataFrame[ImageID: string, LabelName: string, LabelText: string, chip_data: binary]

In [None]:
def evaluate_chip(chip_data):
    """
    Classify one image chip with MobileNet and return its top prediction.

    Args:
        chip_data: the encoded image bytes of one chip, or None.

    Returns:
        [label, score] for the highest-ranked ImageNet class, where label is
        the class name and score is a Python float. Returns None when
        chip_data is None, so a null input row yields a null prediction
        instead of failing the job.
    """
    # Imports are function-local so that they are resolved in the process
    # that actually runs the function.
    import io
    import os
    import sys
    import types

    from tensorflow.keras.applications.mobilenet import MobileNet
    from tensorflow.keras.applications.mobilenet import preprocess_input
    from tensorflow.keras.applications.mobilenet import decode_predictions
    from tensorflow.keras.preprocessing.image import load_img
    from tensorflow.keras.preprocessing.image import img_to_array

    if chip_data is None:
        return None

    # Build the model once per Python process instead of once per row.
    # The cache is a private entry in sys.modules rather than a global of this
    # function: it is only reached through `sys`, so it is not serialized
    # together with the function, and it lives as long as the process does.
    cache_name = '_evaluate_chip_model_cache'
    cache = sys.modules.get(cache_name)
    if cache is None:
        cache = types.ModuleType(cache_name)
        sys.modules[cache_name] = cache

    model = getattr(cache, 'model', None)
    if model is None:
        # Prefer a weights file in the working directory. Presumably this is
        # where the file listed in 'spark.yarn.dist.files' lands on the
        # executors; TODO confirm on the cluster. Otherwise fall back to the
        # original behavior of letting Keras fetch the 'imagenet' weights.
        local_weights = 'mobilenet_1_0_224_tf.h5'
        weights = local_weights if os.path.isfile(local_weights) else 'imagenet'

        # Load Model Data
        model = MobileNet(weights=weights,
                          include_top=True, alpha=1.0)
        cache.model = model

    # Load the image
    img = load_img(io.BytesIO(chip_data), target_size=(224, 224))

    # Prepare Image: add a leading batch dimension of size 1
    image = img_to_array(img)
    image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
    image = preprocess_input(image)

    # Run prediction
    yhat = model.predict(image)

    # Decode Predictions: keep the first entry for the single image
    label = decode_predictions(yhat)
    label = label[0][0]

    ret = [label[1], float(label[2])]

    return ret



In [None]:
# make a UDF
from pyspark.sql.types import *
from pyspark.sql.functions import udf

# The UDF returns an array of strings: evaluate_chip yields [label, score],
# and the resulting column type shown further down is array<string>.
schema = ArrayType(elementType=StringType())

udf_evaluate_chip = udf(f=evaluate_chip, returnType=schema)

In [None]:
# Evaluate image chips: add a "prediction" column by applying the UDF to "chip_data".
image_chips = image_chips.withColumn("prediction", udf_evaluate_chip("chip_data"))

In [None]:
# Show what the model predicted (first 5 rows).
image_chips.select('prediction').show(5)

+--------------------+
|          prediction|
+--------------------+
|[jaguar, 0.946791...|
|[jaguar, 0.981539...|
|[jaguar, 0.959353...|
|[cougar, 0.984374...|
|[skunk, 0.2050361...|
+--------------------+
only showing top 5 rows



In [None]:
image_chips

DataFrame[ImageID: string, LabelName: string, LabelText: string, chip_data: binary, prediction: array<string>]

In [None]:
# Split the prediction array into its two elements: index 0 and index 1.
first_element = image_chips.prediction[0]
second_element = image_chips.prediction[1]

image_chips1 = image_chips.select(
    'ImageID',
    'LabelText',
    first_element,
    second_element,
)

image_chips1.show(5)

In [None]:
spark.stop()