### Environment Variables to be set:
- export PYSPARK_PYTHON=/opt/anaconda/bin/python3
- export PYSPARK_DRIVER_PYTHON="jupyter"
- export PYSPARK_DRIVER_PYTHON_OPTS="notebook"

### Script to run at command line to enable this notebook
- /opt/spark/bin/pyspark --master spark://spark-master-245:7077 --jars /opt/symetry/lib/sym-spark-assembly.jar,\
/PATH_TO_JARS/aws-java-sdk-1.7.4.jar,\
/PATH_TO_JARS/hadoop-aws-2.7.3.jar,\
/PATH_TO_JARS/jets3t-0.9.4.jar \
--driver-java-options -Dsym.lic.loc=/opt/symetry/sym.lic

In [1]:
%matplotlib inline

import os
import pyspark
import matplotlib.pyplot as plt
import numpy as np
import math
import pandas as pd

print("amazonS3Example.ipynb start")

amazonS3Example.ipynb start


In [2]:
# This function copies a Java list to a Python list.
# The converted list is totally seperated from its Java version.
def tolist(l):
    if type(l)==py4j.java_collections.JavaList:
        k = []
        for i in range(0,len(l)):
            k.append(tolist(l[i]))
    else:
        k = l
    return k

# Your code Starts from Here

In [5]:
# READ Amazon S3 Credentials from env variables
awsAccessKeyId = "toto" # os.environ['AWS_ACCESS_KEY']
awsSecretAccessKey = "momy" # os.environ['AWS_SECRET_KEY']

# print("awsAccessKeyId=" + awsAccessKeyId)
# print("awsSecretAccessKey=" + awsSecretAccessKey)

In [6]:
# Create an RDD from a CSV data file
from pyspark.mllib.common import _py2java, _java2py
from   py4j.java_collections import ListConverter

sc._jsc.hadoopConfiguration().set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key",awsAccessKeyId)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key",awsSecretAccessKey)

myrdd  = sc.textFile("s3a://sml-oregon/datasets/susy/SUSYmini.csv")
# Convert pyspark RDD to JavaRDD
myJavaRdd = _py2java(sc, myrdd)

In [None]:
# The first line of CSV file are the name of the attributes
attributeNames = myrdd.first().split(",")
# The attributeTypes has to be given
attributeTypes = ["B"]+["C"]*(len(attributeNames)-1)

In [10]:
# The IP address of the host if empty, project is not persisted. (Not Persisted Here)
gateway         = sc._gateway
sym             = gateway.jvm.com.sml.shell
sym.SymShellConfig.set("RedisHost","localhost")
sym.SymShellConfig.set("RedisPort",6379)

In [11]:
# 1) Create the Project here
projectName     = "amazonS3ExampleInNotebook"
userName        = "c1"
projectType     =  0         # 0: Using CPU, 11:using GPU
gateway         = sc._gateway
sym             = gateway.jvm.com.sml.shell
p               = sym.PySparkShellSymetryProject(userName,projectName, projectType)

Py4JJavaError: An error occurred while calling None.com.sml.shell.PySparkShellSymetryProject.
: java.lang.Exception: Persistence not available for Scala / Python projects
	at com.sml.shell.PySparkShellSymetryProject.<init>(PySparkShell.scala:125)
	at com.sml.shell.PySparkShellSymetryProject.<init>(PySparkShell.scala:90)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:238)
	at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
	at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [None]:
#Enable Histogram
bins = 100
subject = "lepton-2-pT"

p.setBuildHistogram(True, 1000);
p.learn(sc._jsc, myJavaRdd, attributeNames, attributeTypes, None)
density = p.getPDFForAttribute(subject, bins, False, True)
x = density.getHistogram()

sigma = math.sqrt(p.univariate(subject)['variance'])
mu = p.univariate(subject)['mean']

In [None]:
#Getting Width, Min, Max of Density
xw = density.getWidth()
xmin = density.getMin()
xmax = density.getMax()
x_width = []
#Building x array
for k in range(0,100):
    xr = k*xw
    x_width.append(xr+xmin)
    x_list = []
#Building y array
for i in x:
    x_list.append(i)

In [None]:
#Plotting of PDF
plt.figure(figsize=(8,4))
plt.bar(x_width, x_list, width = 0.04, color='#0504aa',alpha=0.5)
plt.xlim(min(x_width), max(x_width))
plt.grid(axis='y', alpha=0.5)
plt.xlabel('Values',fontsize=10)
plt.ylabel('Frequency',fontsize=15)
plt.xticks(fontsize=10)
plt.yticks(fontsize=10)
plt.ylabel('Frequency',fontsize=10)
plt.title(subject+" Distribution",fontsize=15)
plt.show()

In [None]:
# 2) Learn the RDD
# sc : is the SparkContext which is automatically generated (sc._jsc: is its Java version)
p.learn(sc._jsc, myJavaRdd, attributeNames, attributeTypes, None)

In [None]:
# 3)  some data exploration (univariate, and bivariate Statistics)
print(p.univariate(4)) # exploration , see whether the project has been built correctly
print(p.univariate("lepton-2-pT")) # You may pass the name of the attribute as well
stats = p.univariate("lepton-2-pT")

In [None]:
# Measuring some univariate statistics
x = range(1,len(attributeNames))
attrVariance=[p.univariate(i-1)["variance"] for i in x]
attrMean=[p.univariate(i-1)["mean"] for i in x]

In [None]:
print("Plotting Distributions")
l1ptv = p.univariate("lepton-1-pT")['stddev']
l1ptm = p.univariate("lepton-1-pT")['mean']
l1ev = p.univariate("lepton-1-eta")['stddev']
l1em = p.univariate("lepton-1-eta")['mean']
l1pv = p.univariate("lepton-1-phi")['stddev']
l1pm = p.univariate("lepton-1-phi")['mean']
x = ['lepton-1-pT','lepton-1-eta','lepton-1-phi']
e = np.array([l1ptv,l1ev,l1pv])
y = np.array([l1ptm,l1em,l1pm])
plt.errorbar(x, y, e, linestyle='None', marker='^')
plt.title('Distribution of characteristics')
plt.show()

In [None]:
print(p.bivariate("lepton-2-pT","lepton-2-phi")["linCorr"])

In [None]:
# calculate the pairwise correlation coefficients
linCorr=[]
for attr1 in attributeNames:
    temp = []
    for attr2 in attributeNames:  
        b1=p.bivariate(attr1,attr2) #calculates two bivariate statistics
        temp.append(b1["linCorr"])
    linCorr.append(temp)

In [None]:
plt.xticks(range(0,len(attributeNames)))
plt.yticks(range(0,len(attributeNames)))
plt.imshow(linCorr,cmap="hot",interpolation='none')
plt.title("Linear Correlation")
_ = plt.colorbar()

In [None]:
# 4) Perform PCA
e1 = p.pca(range(1,19)) # returns a Tuple[eigenvalues,eigenvectors]
print(e1["EigenValues"][0])                                     # the first eigen-values
print(e1["EigenVectors"][0])                                      # the first eigen-vectors

In [None]:
v = e1["EigenVectors"][0:5]
v = tolist(v)
# in order to plot e1 ,we need to convert it to a List, we used predefined tolist function

plt.xticks(range(0,len(attributeNames)))
plt.yticks(range(0,len(attributeNames)))
plt.imshow(v,cmap="hot",interpolation='none')
plt.ylabel("PCA EigenVectos")
plt.colorbar()

In [None]:
e2 = p.pca([
    "lepton-1-pT",
    "lepton-1-eta", "lepton-1-phi",
    "lepton-2-pT", "lepton-2-eta"])

In [None]:
# 5) Build model
tar   = [0]                              # The forth Attribute is used as Target
input = range(1,19)                      # The first four attribute used as as Input
 
p.buildModel(input, tar, "lsvm", "mySvmModel")       # The Model is know built

In [None]:
# One row of data (Attributes are comma separated)
# Here in this model, the first Attribute is the Target,
# we put an arbitrary value as it will be ignored in the prediction

# The response for this example should be 1
test1 = ["-1","1.667973", "0.0641906","-1.2251714",
"0.506102", "-0.3389389", "1.6725428",
"3.475464", "-1.2191363", "0.0129545",
"3.775173", "1.0459771",  "0.5680512",
"0.481928", "0.0000000", "0.4484102",
"0.205355", "1.3218934", "0.3775840"]

df1 = sym.PyDataFrame() # The dataframe has to be type PyDataFrame
df1.setAttributeNames(attributeNames)
df1.setAttributeTypes(attributeTypes)
df1.addTuple(test1)
results1 = p.predict(df1,"mySvmModel")
print(results1)

In [None]:
# The response for this example should be 0
test2 = ["-1","1.001869","-0.471788","0.555614",
"1.233368","1.255548","-1.052491",
"0.437615","-1.333052","0.326858",
"-0.111678","1.435708","0.755201",
"0.466779","0.454541","1.446331",
"0.592259","1.325197","0.083014"]

df1.clear()
df1.addTuple(test2)
results2 = p.predict(df1,"mySvmModel")
print(results2)

In [None]:
# STEP 6) You can delete the Model to release the used memory
p.deleteModel("mySvmModel")

# STEP 7) Tou can delete the Project to release the used memory
p.deleteProject()

In [None]:
print("amazonS3Example.ipynb end")