26 changes: 24 additions & 2 deletions .travis.yml
@@ -1,7 +1,29 @@
language: python
python:
- "2.7"
- 2.7
- 3.6
cache: pip
before_install:
- curl -LO http://www-us.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
- export SPARK_HOME=./spark
- mkdir $SPARK_HOME
- tar -xf spark-2.3.1-bin-hadoop2.7.tgz -C $SPARK_HOME --strip-components=1
- export PATH=$SPARK_HOME/bin:$PATH
- export SPARK_LOCAL_IP=127.0.0.1
- export SPARK_CLASSPATH=./lib/tensorflow-hadoop-1.0-SNAPSHOT.jar
- export PYTHONPATH=$(pwd)
install:
- pip install -r requirements.txt
script:
- python -c 'import tensorflow as tf'
- test/run_tests.sh
notifications:
email: false
deploy:
provider: pypi
user: leewyang
password:
secure: T2Q8VM6SgcMtJDO2kJbaELE/5ICR5mx8pkM6TyNAJZ2Mr3fLIy6iDfPKunBAYVljl+SDEWmuoPTWqJdqMyo47LBKPKtBHbGzATqGSRTLvxLOYNSXUX+uCpPtr7CMp1eP3xpZ3YbAJZvoEFlWnBQKeBtX/PjNCpmKdp7ir+46CvR/pR1tcM5cFnSgU+uCPAMUt8KTZIxeRo+oJtaE0DM2RxLJ9nGnaRNz9fdXxwhViNj/bMnDRUI0G6k+Iy4sO2669si8nhTDr+Oq66ONUcJtAQymNUM/hzBTCkrJvuIq1TqTlKkA39UrtD5/wCkCqPUbCLVuIfNwkYfW2C8AlXcbphBKN4PhwaoL5XECr3/AOsgNpnPWhCF1Z1uLi58FhIlSyp+5c/x2wVJLZi2IE+c996An7UO3t16ZFpFEgzS6m9PVbi6Qil6Tl4AhV5QLKb0Qn0hLe2l0WixzK9KLMHfkqX8h5ZGC7i0TvCNcU2uIFjY8we91GORZKZhwUVDKbPqiUZIKn64Qq8EwJIsk/S344OrUTzm7z0lFCqtPphg1duU42QOFmaYWi6hgsbtDxN6+CubLw23G3PtKjOpNt8hHnrjZsz9H1MKbSAoYQ4fo+Iwb3owTjXnSTBr94StW7qysggWH6xQimFDh/SKOE9MfroMGt5YTXfduTbqyeameYqE=
distributions: sdist bdist_wheel
on:
python: 3.6
tags: true
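
The CI run above can be replayed outside Travis for local debugging. The following is a minimal sketch that mirrors the before_install/install/script steps from this .travis.yml, assuming a Linux shell at the repository root with curl, tar, pip, and Python on the PATH (the Spark 2.3.1 archive URL is the one used in the config and may need updating if the Apache mirror changes):

curl -LO http://www-us.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
export SPARK_HOME=./spark
mkdir -p $SPARK_HOME                                        # -p so re-runs do not fail
tar -xf spark-2.3.1-bin-hadoop2.7.tgz -C $SPARK_HOME --strip-components=1
export PATH=$SPARK_HOME/bin:$PATH
export SPARK_LOCAL_IP=127.0.0.1                             # bind Spark to localhost
export SPARK_CLASSPATH=./lib/tensorflow-hadoop-1.0-SNAPSHOT.jar
export PYTHONPATH=$(pwd)                                    # make tensorflowonspark importable
pip install -r requirements.txt
python -c 'import tensorflow as tf'                         # smoke test before the suite
test/run_tests.sh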
2 changes: 2 additions & 0 deletions README.md
@@ -3,6 +3,8 @@ Copyright 2017 Yahoo Inc.
Licensed under the terms of the Apache 2.0 license.
Please see LICENSE file in the project root for terms.
-->
[![Build Status](https://travis-ci.org/yahoo/TensorFlowOnSpark.svg?branch=master)](https://travis-ci.org/yahoo/TensorFlowOnSpark)

# TensorFlowOnSpark

## What's TensorFlowOnSpark?
Binary file added lib/tensorflow-hadoop-1.0-SNAPSHOT.jar
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1 +1,3 @@
py4j
pyspark
tensorflow
19 changes: 9 additions & 10 deletions test/run_tests.sh
@@ -1,29 +1,28 @@
#!/bin/bash

DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )

if [ -z "$SPARK_HOME" ]; then
echo "Please set SPARK_HOME environment variable"
exit 1
fi

if [ -z "$TFoS_HOME" ]; then
echo "Please set TFoS_HOME environment variable"
exit 1
fi

if [ -z "$SPARK_CLASSPATH" ]; then
echo "Please add the path to tensorflow-hadoop-*.jar to the SPARK_CLASSPATH environment variable"
exit 1
fi

# Start Spark Standalone Cluster
export PYTHONPATH=${SPARK_HOME}/python
export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=3; export CORES_PER_WORKER=1
export SPARK_WORKER_INSTANCES=2; export CORES_PER_WORKER=1
export TOTAL_CORES=$((${CORES_PER_WORKER}*${SPARK_WORKER_INSTANCES}))
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c ${CORES_PER_WORKER} -m 3G ${MASTER}
${SPARK_HOME}/sbin/start-master.sh; ${SPARK_HOME}/sbin/start-slave.sh -c ${CORES_PER_WORKER} -m 2G ${MASTER}

# Run Tests
python -m unittest discover
# Run tests
python -m unittest discover -s $DIR
EXIT_CODE=$?

# Stop Spark Standalone Cluster
${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh

exit $EXIT_CODE
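
run_tests.sh brings up a two-worker standalone cluster, runs every test module discovered under test/, tears the cluster down, and propagates the unittest exit code. When iterating on a single module it can be faster to keep the cluster up and narrow the discovery pattern; a sketch, assuming the environment variables exported in run_tests.sh (MASTER, PYTHONPATH, and friends) are already set in the shell and test_reservation.py is the module of interest (the file name is illustrative):

${SPARK_HOME}/sbin/start-master.sh
${SPARK_HOME}/sbin/start-slave.sh -c 1 -m 2G spark://$(hostname):7077
python -m unittest discover -s test -p 'test_reservation*.py'   # run just one module
${SPARK_HOME}/sbin/stop-slave.sh; ${SPARK_HOME}/sbin/stop-master.sh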
56 changes: 28 additions & 28 deletions test/test_pipeline.py
@@ -158,34 +158,34 @@ def test_spark_saved_model(self):
self.assertAlmostEqual(pred, expected, 5)
self.assertAlmostEqual(squared_pred, expected * expected, 5)

def test_tf_column_filter(self):
"""InputMode.TENSORFLOW TFEstimator saving temporary TFRecords, filtered by input_mapping columns"""

# create a Spark DataFrame of training examples (features, labels)
trainDF = self.spark.createDataFrame(self.train_examples, ['col1', 'col2'])

# and add some extra columns
df = trainDF.withColumn('extra1', trainDF.col1)
df = df.withColumn('extra2', trainDF.col2)
self.assertEqual(len(df.columns), 4)

# train model
args = {}
estimator = TFEstimator(self.get_function('tf/train'), args, export_fn=self.get_function('tf/export')) \
.setInputMapping({'col1': 'x', 'col2': 'y_'}) \
.setInputMode(TFCluster.InputMode.TENSORFLOW) \
.setModelDir(self.model_dir) \
.setExportDir(self.export_dir) \
.setTFRecordDir(self.tfrecord_dir) \
.setClusterSize(self.num_workers) \
.setNumPS(1) \
.setBatchSize(10)
estimator.fit(df)
self.assertTrue(os.path.isdir(self.model_dir))
self.assertTrue(os.path.isdir(self.tfrecord_dir))

df_tmp = dfutil.loadTFRecords(self.sc, self.tfrecord_dir)
self.assertEqual(df_tmp.columns, ['col1', 'col2'])
# def test_tf_column_filter(self):
# """InputMode.TENSORFLOW TFEstimator saving temporary TFRecords, filtered by input_mapping columns"""
#
# # create a Spark DataFrame of training examples (features, labels)
# trainDF = self.spark.createDataFrame(self.train_examples, ['col1', 'col2'])
#
# # and add some extra columns
# df = trainDF.withColumn('extra1', trainDF.col1)
# df = df.withColumn('extra2', trainDF.col2)
# self.assertEqual(len(df.columns), 4)
#
# # train model
# args = {}
# estimator = TFEstimator(self.get_function('tf/train'), args, export_fn=self.get_function('tf/export')) \
# .setInputMapping({'col1': 'x', 'col2': 'y_'}) \
# .setInputMode(TFCluster.InputMode.TENSORFLOW) \
# .setModelDir(self.model_dir) \
# .setExportDir(self.export_dir) \
# .setTFRecordDir(self.tfrecord_dir) \
# .setClusterSize(self.num_workers) \
# .setNumPS(1) \
# .setBatchSize(10)
# estimator.fit(df)
# self.assertTrue(os.path.isdir(self.model_dir))
# self.assertTrue(os.path.isdir(self.tfrecord_dir))
#
# df_tmp = dfutil.loadTFRecords(self.sc, self.tfrecord_dir)
# self.assertEqual(df_tmp.columns, ['col1', 'col2'])

def test_tf_checkpoint_with_export_fn(self):
"""InputMode.TENSORFLOW TFEstimator w/ a separate saved_model export function to add placeholders for InputMode.SPARK TFModel inferencing"""
3 changes: 3 additions & 0 deletions test/test_reservation.py
@@ -1,4 +1,5 @@
import threading
import time
import unittest

from tensorflowonspark.reservation import Reservations, Server, Client
@@ -44,6 +45,7 @@ def test_reservation_server(self):

# request server stop
c.request_stop()
time.sleep(1)
self.assertEqual(s.done, True)

def test_reservation_server_multi(self):
@@ -78,6 +80,7 @@ def reserve(num):

# request server stop
c.request_stop()
time.sleep(1)
self.assertEqual(s.done, True)

