In [1]:
## Set Python - Spark environment.
import os
import sys
#Setting the path for Spark
# os.environ['SPARK_HOME'] = "/home/Kunal/Downloads/spark-2.4.7-bin-hadoop2.7"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.7,org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.7 pyspark-shell'
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
#import packages
from sparktorch import serialize_torch_obj, SparkTorch
import torch
import torch.nn as nn
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sparktorch import SparkTorch, serialize_torch_obj_lazy
from pyspark.sql.functions import rand
## Create SparkContext, SparkSession
from pyspark.sql import *
import pyspark
from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, column

In [3]:
#setting configuration
conf = SparkConf().setAppName("Python_Spark_example").setMaster('yarn-client').set("spark.executor.memory","5g")
sc = SparkContext(conf=conf)
# sc=SparkContext.getOrCreate(conf=create_spark_conf().setMaster("local[4]").set("spark.driver.memory","8g").set("spark.executor.memory", '8g').set('spark.executor.cores', 4))
sc.setLogLevel("ERROR")
# spark = SparkSession(sc)
spark = SparkSession \
            .builder \
            .appName("Python Spark SQL basic example") \
            .config("spark.sql.catalogImplementation=hive").enableHiveSupport() \
            .getOrCreate()

In [4]:
# spark = SparkSession.builder.appName("examples").master('yarn-client').getOrCreate()
df = spark.read.option("inferSchema", "true").csv('hdfs://localhost:9000//user/kunal/examples/mnist_train.csv').coalesce(2)

In [5]:
#Typecast features into double
for col_name in df.columns:
    df = df.withColumn(col_name, col(col_name).cast('Double'))

In [6]:
#Filter null value
df=df.filter("_c13 is not null")

In [7]:
#number of record
df.count()

60000

In [8]:
#print Schema
df.printSchema()

root
 |-- _c0: double (nullable = true)
 |-- _c1: double (nullable = true)
 |-- _c2: double (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: double (nullable = true)
 |-- _c8: double (nullable = true)
 |-- _c9: double (nullable = true)
 |-- _c10: double (nullable = true)
 |-- _c11: double (nullable = true)
 |-- _c12: double (nullable = true)
 |-- _c13: double (nullable = true)
 |-- _c14: double (nullable = true)
 |-- _c15: double (nullable = true)
 |-- _c16: double (nullable = true)
 |-- _c17: double (nullable = true)
 |-- _c18: double (nullable = true)
 |-- _c19: double (nullable = true)
 |-- _c20: double (nullable = true)
 |-- _c21: double (nullable = true)
 |-- _c22: double (nullable = true)
 |-- _c23: double (nullable = true)
 |-- _c24: double (nullable = true)
 |-- _c25: double (nullable = true)
 |-- _c26: double (nullable = true)
 |-- _c27: double (nullable = tru

In [9]:
#Define Neural Network
network = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 256),
    nn.ReLU(),
    nn.Linear(256, 10),
    nn.Softmax(dim=1)
)

In [10]:
# Build the pytorch object
torch_obj = serialize_torch_obj(
    model=network,
    criterion=nn.CrossEntropyLoss(),
    optimizer=torch.optim.Adam,
    lr=0.0001
)

In [11]:
# Setup features
vector_assembler = VectorAssembler(inputCols=df.columns[1:785], outputCol='features')

In [12]:
# Create a SparkTorch Model with torch distributed. Barrier execution is on by default for this mode.
spark_model = SparkTorch(
    inputCol='features',
    labelCol='_c0',
    predictionCol='predictions',
    torchObj=torch_obj,
    iters=1000,
    verbose=1,
    miniBatch=256,
    earlyStopPatience=40,
    validationPct=0.2
)

In [13]:
# Can be used in a pipeline and saved.
p = Pipeline(stages=[vector_assembler, spark_model]).fit(df)

In [14]:
#save
#p.save('simple_dnn')

In [15]:
# Run predictions and evaluation
predictions = p.transform(df).persist()
evaluator = MulticlassClassificationEvaluator(labelCol="_c0", predictionCol="predictions", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Train accuracy = %g" % accuracy)

Train accuracy = 0.787133


In [16]:
#END