#### The goal of this notebook is to show a user several basic PySpark commands and to calculate ...

In [None]:
%load_ext autoreload
%autoreload

#%autoreload
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
display(HTML("""<style>div.output_area{max-height:10000px;overflow:scroll;}</style>"""))

from timeUtils import clock, elapsed

import os
import findspark
from pyspark import SparkContext
from operator import add
 
import random
from pandas import DataFrame, Series

import pandas as pd
pd.set_option("display.max_rows",1000)
pd.set_option('precision', 3)

import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')

import warnings
warnings.filterwarnings('ignore')

# Check Python

In [None]:
import sys
# Show the Python version this kernel is running.
sys.version

# Setup Java

In [None]:
# Spark needs a Java 8 JDK; print where JAVA_HOME points so a misconfiguration is obvious.
javahome = os.environ.get("JAVA_HOME")
print("JAVA_HOME --> {0}".format(javahome))
## This should point to something that has 'whatever.jdk1.8.whatever' in the path. If not, nothing will work.

# Setup Spark

In [None]:
def setupSpark(debug=False):
    """Create (or reuse) the SparkContext used throughout this notebook.

    Reports JAVA_HOME and SPARK_HOME, initialises findspark with SPARK_HOME,
    builds a SparkConf and returns a SparkContext whose log level is ERROR.

    Parameters
    ----------
    debug : bool, optional
        Not used by this function; kept so existing calls keep working.

    Returns
    -------
    pyspark.SparkContext
        The active SparkContext. If one is already running it is returned
        unchanged (``SparkContext.getOrCreate`` does not apply ``conf`` to it).

    Raises
    ------
    ValueError
        If findspark or pyspark cannot be imported, or if findspark cannot
        initialise Spark.
    """
    import os

    # os.environ.get returns None rather than raising, so check for None
    # explicitly. Use print, not warnings.warn: the notebook silences warnings.
    javahome = os.environ.get("JAVA_HOME")
    print("Java home is {0}".format(javahome))
    if javahome is None:
        print("WARNING: There is no JAVA_HOME variable; Spark may fail to start")

    sparkhome = os.environ.get("SPARK_HOME")
    print("Spark home is {0}".format(sparkhome))
    if sparkhome is None:
        print("WARNING: There is no SPARK_HOME variable; findspark must locate Spark on its own")

    try:
        import findspark
    except ImportError as err:
        raise ValueError("Could not import findspark") from err
    try:
        findspark.init(sparkhome)
    except Exception as err:
        # Keep the ValueError contract, but chain the real cause instead of
        # reporting every failure as an import problem.
        raise ValueError("findspark could not initialise Spark (SPARK_HOME={0})".format(sparkhome)) from err

    try:
        import pyspark
    except ImportError as err:
        raise ValueError("Could not import pyspark") from err

    conf = (pyspark.SparkConf()
        .setAppName('My Spark Application')
#        .setMaster('yarn')
        .set('spark.driver.memory', '120g')
        .set('spark.shuffle.service.enabled', True)
        .set('spark.dynamicAllocation.enabled', True)
#        .set('spark.executor.heartbeatInterval', '3600s')
#        .set('spark.executor.memory', '5g')
#        .set('spark.yarn.executor.memoryOverhead', '4000m')
        .set('spark.dynamicAllocation.maxExecutors', 250)
#        .set('spark.dynamicAllocation.minExecutors', 10)
        .set('spark.kryoserializer.buffer.max', '2047m')
        .set('spark.speculation', True)
        .set('spark.sql.execution.arrow.enabled', False)
#        .set('spark.jars', hivejar)
        .set('spark.port.maxRetries', 100)
        .set('spark.driver.maxResultSize', '20g')
        .set('spark.sql.broadcastTimeout', 600))

    # Azure Data Lake (fs.adl.oauth2.*) settings must never be hard-coded here.
    # If they are needed, read them from the environment, e.g.
    #   conf.set("fs.adl.oauth2.credential", os.environ["ADL_CLIENT_SECRET"])
    # NOTE(review): a client id/secret was previously committed in this spot;
    # treat that credential as leaked and rotate it.

    # getOrCreate (instead of the constructor) makes re-running this cell safe:
    # the constructor raises if a SparkContext is already active.
    sc = pyspark.SparkContext.getOrCreate(conf=conf)
    sc.setLogLevel('ERROR')

    return sc

In [None]:
def setupHive(sc):
    """Return a pyspark HiveContext wrapping the SparkContext ``sc``."""
    from pyspark.sql import HiveContext
    return HiveContext(sc)


def setupSQL(sc):
    """Return a pyspark SQLContext wrapping the SparkContext ``sc``."""
    from pyspark.sql import SQLContext
    return SQLContext(sc)

In [None]:
# Create the SparkContext used by the rest of the notebook (setupSpark does not read `debug`).
sc = setupSpark(debug=True)

In [None]:
# Hive and plain SQL contexts, both built on the same SparkContext.
hc   = setupHive(sc)
sqlc = setupSQL(sc)

# Hello World For Spark

In [None]:
# Count each character of "Hello World" (the space included), most frequent first.
data = sc.parallelize(list("Hello World"))
counts = (data.map(lambda ch: (ch, 1))
              .reduceByKey(add)
              .sortBy(lambda kv: kv[1], ascending=False)
              .collect())
for char, freq in counts:
    print("{}: {}".format(char, freq))
# Stopping the context here is why the next section has to create a new one.
sc.stop()

# Odd Number Generator

### The previous cell called `sc.stop()`, so we must recreate the SparkContext (and the Hive/SQL contexts built on it)

In [None]:
# The Hello World cell ended with sc.stop(), so a fresh SparkContext (and the
# Hive/SQL contexts built on it) is needed before running more jobs.
sc = setupSpark(debug=True)
hc   = setupHive(sc)
sqlc = setupSQL(sc)

In [None]:
# parallelize/filter are lazy transformations, so the timed region must include an
# action (take); otherwise the timer only measures building the execution plan.
start, cmt = clock("Testing Odd Number Generator")
big_list = range(10000)
rdd = sc.parallelize(big_list, 2)
odds = rdd.filter(lambda x: x % 2 != 0)
first_odds = odds.take(5)
elapsed(start, cmt)

first_odds

# Pi Estimation

In [None]:
# IPython.display is the public module for Image/display (IPython.core.display is deprecated).
from IPython.display import Image, display
# Illustration of the Monte Carlo pi estimate used below.
display(Image('pi.png', unconfined=True))

#### Define a test for whether a random point in the unit square lies inside the circle of radius 1 (the fraction of hits estimates $\pi/4$)

In [None]:
def inside(p):
    """Return True if a uniformly random point in the unit square lies inside the unit circle.

    ``p`` is ignored; it exists only so the function can be passed to ``map``/``filter``
    over a range of sample indices. The fraction of True results estimates pi/4.
    """
    px = random.random()
    py = random.random()
    return px * px + py * py < 1

In [None]:
def computePiWithPython(N):
    """Estimate pi from N Monte Carlo samples in plain Python.

    Prints the estimate and returns the runtime reported by ``elapsed``.
    """
    label = "Testing Python with {0} events".format(N)
    start, cmt = clock(label)
    inside_count = sum(map(inside, range(N)))
    estimate = 4.0 * inside_count / N
    print("Pi is roughly %f" % estimate)
    return elapsed(start, cmt, returnTime=True)

def computePiWithSpark(N):
    """Estimate pi from N Monte Carlo samples distributed over the global SparkContext ``sc``.

    Prints the estimate and returns the runtime reported by ``elapsed``.
    """
    label = "Testing PySpark with {0} events".format(N)
    start, cmt = clock(label)
    samples = sc.parallelize(range(N))
    inside_count = samples.filter(inside).count()
    estimate = 4.0 * inside_count / N
    print("Pi is roughly %f" % estimate)
    return elapsed(start, cmt, returnTime=True)

#### Set up timing data

In [None]:
from numpy import linspace

# 11 sample sizes, evenly spaced in log10 from 1e3 to 1e8.
exponents = linspace(3, 8, 11)
Ns = [int(10**exponent) for exponent in exponents]
# One empty dict per sample size; the runs below add {"Python": t, "Spark": t}.
runtimes = {N: {} for N in Ns}

### Run with Python
#### You will see only one process running

In [None]:
# Record the single-process Python runtime for every sample size.
for N in Ns:
    runtimes[N]["Python"] = computePiWithPython(N)

### Run with Spark
#### You will see lots of processes running

In [None]:
# Record the PySpark runtime for every sample size.
for N in Ns:
    runtimes[N]["Spark"] = computePiWithSpark(N)

In [None]:
# Rows = sample sizes, columns = backend; one line per backend, both axes log-scaled.
ax = DataFrame(runtimes).T.plot(logx=True, logy=True)
ax.set_xlabel("Number of samples (N)")
ax.set_ylabel("Runtime")
ax.set_title("Pi estimation: Python vs. PySpark");

# DataFrame Example

In [None]:
from ioUtils import getFile

# Location of the IMDB movie data. Set MOVIE_DATA_PATH to point elsewhere instead
# of editing this user-specific absolute path.
# NOTE(review): if getFile unpickles this ".p" file, only load trusted data.
MOVIE_DATA_PATH = os.environ.get(
    "MOVIE_DATA_PATH",
    "/Users/tgadfort/Documents/code/network_lunch_and_learn/data.p",
)
moviedata = getFile(MOVIE_DATA_PATH)
moviedata.head()

In [None]:
# Enable Arrow-based columnar data transfers
# NOTE: setupSpark sets spark.sql.execution.arrow.enabled=False, and `spark` is only
# defined later in the notebook, so this line cannot simply be uncommented here.
#spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Create a Spark DataFrame from a pandas DataFrame
start, cmt = clock("Creating IMDB Spark DataFrame")
spdf = hc.createDataFrame(moviedata)
# cache() only marks spdf for caching; the data is stored by the first action on it.
spdf.cache()
elapsed(start, cmt)

In [None]:
# Use the F namespace for max/min: `from pyspark.sql.functions import max, min`
# would shadow Python's builtins for every later cell.
from pyspark.sql import functions as F
from pyspark.sql.functions import countDistinct, col

print("Total Rows = {0}".format(spdf.count()))
print("Total Cols = {0}".format(len(spdf.columns)))
# Compute all three summaries in a single Spark job.
spdf.agg(
    countDistinct("Actor").alias("distinct_actors"),
    F.min(col("Year")).alias("min_year"),
    F.max(col("Year")).alias("max_year"),
).show()

### Count Number of Rows with Column Value (i.e., how many movies were made each year) 

In [None]:
# Spark's groupBy output has no guaranteed order; sort by Year so the first rows
# line up with the pandas groupby below, which is sorted by key.
spdf.groupBy('Year').count().orderBy('Year').show(3)
## Somewhat equivalent Pandas command
moviedata.groupby('Year').size().head(3)

#### Try Actor/Actress With The Most Films

In [None]:
# Actors with at least 10 films, most films first. This only builds a lazy plan;
# nothing runs until an action such as show() is called.
result = spdf.groupBy('Actor').count() \
             .filter("`count` >= 10") \
             .sort(col("count").desc())

#### Spark transformations are lazy: nothing is computed until an action such as `show()` needs the result

In [None]:
# First action on `result`: this triggers the groupBy/filter/sort computation.
result.show(5)

#### Try Caching Result And See Speed Increase

In [None]:
# cache() is lazy. count() forces every partition to be computed and stored, so the
# next show() reads from the cache instead of recomputing the aggregation.
result = result.cache()
result.count();

In [None]:
# Same query again; once the cache on `result` has been populated, this reads from it.
result.show(5)

# Spark SQL (Using KDD '99 Data)

#### Load 2MB GZIP File With KDD Data

In [None]:
# This timer is stopped several cells later (after the DataFrame is built), so it
# also counts any time spent between running those cells.
start, cmt = clock("Creating Spark DataFrame From GZIP -> RDD -> Rows -> Spark")

In [None]:
data_file = "kddcup.data_10_percent.gz"
# One RDD element per line of the file; cached because the next cells reuse it.
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(1)

#### Creating a Spark DataFrame Directly From The RDD

In [None]:
# Split each comma-separated line into a list of field strings.
csv_rdd = raw_rdd.map(lambda row: row.split(","))
csv_rdd.take(1)

#### Create List of Spark Rows From Split CSV Data

In [None]:
from pyspark.sql import Row


def _parse_kdd_row(r):
    """Convert one split KDD '99 line into a pyspark Row.

    ``r`` is the list of 42 field strings from ``line.split(",")``: 41 features
    (positions 0-40, same order as the pandas column list used later) plus the label.
    Count-like fields become int, rate fields float, and categorical fields stay str.
    """
    return Row(
        duration=int(r[0]),
        protocol_type=r[1],
        service=r[2],
        flag=r[3],
        src_bytes=int(r[4]),
        dst_bytes=int(r[5]),
        land=int(r[6]),                # field 6; without it the schema silently lost a column
        wrong_fragment=int(r[7]),
        urgent=int(r[8]),
        hot=int(r[9]),
        num_failed_logins=int(r[10]),
        logged_in=int(r[11]),
        num_compromised=int(r[12]),
        root_shell=int(r[13]),
        su_attempted=int(r[14]),       # a counter like its neighbours, so parse as int too
        num_root=int(r[15]),
        num_file_creations=int(r[16]),
        num_shells=int(r[17]),
        num_access_files=int(r[18]),
        num_outbound_cmds=int(r[19]),
        is_host_login=int(r[20]),
        is_guest_login=int(r[21]),
        count=int(r[22]),
        srv_count=int(r[23]),
        serror_rate=float(r[24]),
        srv_serror_rate=float(r[25]),
        rerror_rate=float(r[26]),
        srv_rerror_rate=float(r[27]),
        same_srv_rate=float(r[28]),
        diff_srv_rate=float(r[29]),
        srv_diff_host_rate=float(r[30]),
        dst_host_count=int(r[31]),
        dst_host_srv_count=int(r[32]),
        dst_host_same_srv_rate=float(r[33]),
        dst_host_diff_srv_rate=float(r[34]),
        dst_host_same_src_port_rate=float(r[35]),
        dst_host_srv_diff_host_rate=float(r[36]),
        dst_host_serror_rate=float(r[37]),
        dst_host_srv_serror_rate=float(r[38]),
        dst_host_rerror_rate=float(r[39]),
        dst_host_srv_rerror_rate=float(r[40]),
        label=r[-1],
    )


parsed_rdd = csv_rdd.map(_parse_kdd_row)
parsed_rdd.take(1)

In [None]:
# Build a Spark DataFrame from the RDD of Rows (schema inferred from the Row fields).
spdf = sqlc.createDataFrame(parsed_rdd)
print("Total Rows = {0}".format(spdf.count()))
print("Total Cols = {0}".format(len(spdf.columns)))

In [None]:
# Stop the timer started before the gzip file was read into an RDD.
elapsed(start, cmt)

#### Load GZIP Data And Convert To Python String Object

In [None]:
# Timer for the gzip -> pandas -> Spark route; it is read twice in the cells below.
start, cmt = clock("Creating Spark DataFrame From GZIP -> CSV -> Pandas -> Spark")

In [None]:
import gzip

# The `with` block closes the file handle even if the read fails.
with gzip.open("kddcup.data_10_percent.gz", 'rb') as gbuffer:  ## Returns a buffer
    gbytesdata = gbuffer.read()  ## Reads the data into a `bytes` object
gstrdata = gbytesdata.decode("utf-8")  ## Decode the encoded bytes object using UTF-8. It's now a string object
print("1st 50 Characters  ==>  {0}".format(gstrdata[:50]))

#### Create Pandas DataFrame From CSV String

In [None]:
from pandas import DataFrame

# KDD Cup '99 column names: 41 features followed by the label.
KDD_COLUMNS = [
    "duration", "protocol_type", "service", "flag", "src_bytes", "dst_bytes", "land",
    "wrong_fragment", "urgent", "hot", "num_failed_logins", "logged_in", "num_compromised",
    "root_shell", "su_attempted", "num_root", "num_file_creations", "num_shells",
    "num_access_files", "num_outbound_cmds", "is_host_login", "is_guest_login", "count",
    "srv_count", "serror_rate", "srv_serror_rate", "rerror_rate", "srv_rerror_rate",
    "same_srv_rate", "diff_srv_rate", "srv_diff_host_rate", "dst_host_count",
    "dst_host_srv_count", "dst_host_same_srv_rate", "dst_host_diff_srv_rate",
    "dst_host_same_src_port_rate", "dst_host_srv_diff_host_rate", "dst_host_serror_rate",
    "dst_host_srv_serror_rate", "dst_host_rerror_rate", "dst_host_srv_rerror_rate", "label",
]
# Skip empty lines: if the file ends with a newline, split("\n") would yield a final ""
# that became a bogus row of None values (and an extra row in every count).
splitdata = [line.split(",") for line in gstrdata.splitlines() if line]
# All values stay strings here; nothing in this cell converts types.
pddf = DataFrame(splitdata, columns=KDD_COLUMNS)
elapsed(start, cmt)

#### Create Spark DataFrame From Pandas DataFrame

In [None]:
# pddf holds only strings (built from split text), so every Spark column is a string too.
spdf = hc.createDataFrame(pddf)
print("Total Rows = {0}".format(spdf.count()))
print("Total Cols = {0}".format(len(spdf.columns)))

In [None]:
# Measured from the clock() call before the gzip read, so this includes the pandas step.
elapsed(start, cmt)

## KDD '99 Spark DataFrame SQL Access 

#### Collect results using PySpark statements

In [None]:
# Number of connections per protocol, most frequent first (DataFrame API).
protocols = spdf.groupBy('protocol_type').count().orderBy('count', ascending=False)
protocols.show()

#### Collect results using SQL statements

In [None]:
## Register the DataFrame as a temporary view so SQL statements can refer to it.
## createOrReplaceTempView replaces registerTempTable, deprecated since Spark 2.0.
spdf.createOrReplaceTempView("connections")

#### Run previous command, but through a SQL statement

In [None]:
# Same aggregation as the DataFrame version, written in SQL; `ORDER BY 2` sorts by freq.
protocols = sqlc.sql("""SELECT protocol_type, count(*) as freq
                        FROM connections
                        GROUP BY protocol_type
                        ORDER BY 2 DESC
                        """)
protocols.show()

#### Another SQL example

In [None]:
# Connections per protocol, split by whether the label is 'normal.' (no attack) or not.
attack_protocol = sqlc.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as freq
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_protocol.show()

#### Yet another SQL example

In [None]:
# Per protocol and attack state: counts, mean bytes/duration, and totals of the
# login / compromise / root counters.
# NOTE(review): the `connections` view was built from string columns, so AVG/SUM
# presumably rely on Spark's implicit string-to-number cast. The alias
# `total_root_acceses` is misspelled but kept, since attack_stats_df exposes it.
attack_stats = sqlc.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_acceses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
attack_stats.show()

#### Bring everything back to Pandas

In [None]:
# toPandas() collects the small aggregated result onto the driver as a pandas DataFrame.
attack_stats_df = attack_stats.toPandas()
attack_stats_df

# Running Python Code In PySpark

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() reuses the session/context already running in this notebook if there is one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

## Load a CSV with header and ';' delimiter
spdf = spark.read.load(
    "bank.csv",
    format="csv",
    sep=";",
    inferSchema="true",
    header="true",
    quote="\"",
)
spdf.show(1)

## Define Python functions and let Spark know about them via a UDF (user defined function)

In [None]:
def square(z):
    """Return ``z**2``, or None if ``z`` cannot be squared (e.g. None or a string)."""
    try:
        return z**2
    except (TypeError, ValueError, ArithmeticError):
        return None


# squareF was a verbatim copy of square; keep the name as an alias so existing
# references still work without maintaining two identical definitions.
squareF = square


def squareFconv(z):
    """Return ``float(z**2)``, or None if squaring or the float conversion fails."""
    try:
        return float(z**2)
    except (TypeError, ValueError, ArithmeticError):
        return None


## Import UDF From PySpark Functions
from pyspark.sql.functions import udf
## Import Types
from pyspark.sql.types import FloatType, IntegerType


## Create 3 UDFs. Each declares a Spark return type; the cells below show what
## happens when the Python value returned does or does not match it.
square_udf_int = udf(square, IntegerType())
square_udf_float = udf(squareF, FloatType())
square_udf_float_conv = udf(squareFconv, FloatType())

#### Show Only Certain Columns of PySpark DataFrame

In [None]:
# Preview the three columns the UDF examples below operate on.
spdf.select(["age", "education", "balance"]).show(5)

##### Apply The "Square an Integer" UDF to the 'Age' column

In [None]:
# 'age' is presumably inferred as an integer column, so the IntegerType UDF should give valid squares.
dummy = spdf.withColumn('ageSquared', square_udf_int('age'))
dummy.select(["age", "education", "balance", "ageSquared"]).show(5)

##### Apply The "Square an Integer" UDF to the 'Education' column

In [None]:
# 'education' is a categorical (string) column, so square() returns None and the
# UDF produces nulls. (Column name spelling fixed: educationSquared.)
dummy = spdf.withColumn('educationSquared', square_udf_int('education'))
dummy.select(["age", "education", "balance", "educationSquared"]).show(5)

##### Apply The "Square a Float" UDF to the 'Balance' column

In [None]:
# squareF returns whatever type z**2 has; if 'balance' is inferred as an integer
# column, the result does not match the declared FloatType. Compare with the next cell.
dummy = spdf.withColumn('balanceSquared', square_udf_float('balance'))
dummy.select(["age", "education", "balance", "balanceSquared"]).show(5)

##### Apply The "Square a Float and Force Conversion to Type Float" UDF to the 'Balance' column

In [None]:
# squareFconv converts the square to a Python float, matching the declared FloatType.
dummy = spdf.withColumn('balanceConvSquared', square_udf_float_conv('balance'))
dummy.select(["age", "education", "balance", "balanceConvSquared"]).show(5)

# Machine Learning (Extra Credit)

## Using MLlib For Classification

In [None]:
from pyspark.sql import SparkSession

# Reload bank.csv so the ML pipeline below starts from the raw columns.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

## Load a CSV with header and ';' delimiter
spdf = spark.read.load(
    "bank.csv", format="csv", sep=";", inferSchema="true", header="true", quote="\""
)
# Original column names; they are re-selected after the pipeline runs.
cols = spdf.columns

In [None]:
# OneHotEncoderEstimator (Spark 2.3/2.4) was renamed OneHotEncoder in Spark 3.0, where
# the old name no longer exists; import whichever this Spark version provides.
try:
    from pyspark.ml.feature import OneHotEncoderEstimator as OneHotEncoder
except ImportError:
    from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import StringIndexer, VectorAssembler

categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan', 'contact', 'poutcome']
stages = []
for categoricalCol in categoricalColumns:
    # Index each category string, then one-hot encode the index.
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
    encoder = OneHotEncoder(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]
# Target: index the 'y' column into a numeric 'label'.
label_stringIdx = StringIndexer(inputCol='y', outputCol='label')
stages += [label_stringIdx]
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']
# Concatenate the one-hot vectors and the raw numeric columns into one feature vector.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [None]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(spdf)
# Store the output under a new name instead of overwriting spdf. Otherwise re-running
# this cell fits the pipeline on data that already contains its output columns.
spdf_transformed = pipelineModel.transform(spdf)
selectedCols = ['label', 'features'] + cols
df = spdf_transformed.select(selectedCols)
df.printSchema()

In [None]:
# 70/30 random split with a fixed seed so the split is reproducible.
train, test = df.randomSplit([0.7, 0.3], seed=2018)
print("Training Dataset Count: {}".format(train.count()))
print("Test Dataset Count: {}".format(test.count()))

In [None]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=10)
lrModel = lr.fit(train)

# The fitted coefficients are exposed on the LogisticRegressionModel. They are
# plotted in ascending order, which drops which feature each one belongs to.
import matplotlib.pyplot as plt
import numpy as np

beta = np.sort(lrModel.coefficients)
fig, ax = plt.subplots()
ax.plot(beta)
ax.set_ylabel('Beta Coefficients')
plt.show()

In [None]:
# Area under the ROC curve on the training split (not on the held-out test split).
trainingSummary = lrModel.summary
print('Training set areaUnderROC: ' + str(trainingSummary.areaUnderROC))

## Using MLlib For Clustering

### Generate some random "blobs" for clustering

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D  # registers the '3d' projection on older matplotlib
# sklearn.datasets.samples_generator was removed in scikit-learn 0.24;
# make_blobs is exported from sklearn.datasets.
from sklearn.datasets import make_blobs
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SQLContext

nCenters = 25
n_features = 3  # must match the ['x', 'y', 'z'] column names below

for (n_samples, name) in [(int(1e5), "small.csv"), (int(1e6), "mid.csv"), (int(3e6), "large.csv")]:
    X, y = make_blobs(n_samples=n_samples, centers=nCenters, n_features=n_features, random_state=42)
    if name == "small.csv":
        # Keep the smallest data set (points, true blob labels) for the 3-D plot.
        plotData = [X, y]

    # add a string row id: "row0", "row1", ...
    pddf = pd.DataFrame(X, columns=['x', 'y', 'z'])
    pddf['id'] = 'row' + pddf.index.astype(str)

    # move the id column first (left). DataFrame.ix was removed in pandas 1.0; use .loc.
    cols = list(pddf)
    cols.insert(0, cols.pop(cols.index('id')))
    pddf = pddf.loc[:, cols]

    # save the generated points as a csv file
    print("Saving {0} data values to {1}".format(n_samples, name))
    pddf.to_csv(name, index=False)

### Plot data to see what we generated

In [None]:
# Figure.gca(projection=...) was deprecated in matplotlib 3.4 and removed in 3.6;
# add_subplot(111, projection='3d') is the supported way to create 3-D axes.
fig = plt.figure(figsize=(12, 10))
threedee = fig.add_subplot(111, projection='3d')
# plotData = [points, true blob labels] for small.csv; colour each point by its blob.
threedee.scatter(plotData[0][:, 0], plotData[0][:, 1], plotData[0][:, 2], c=plotData[1])
threedee.set_xlabel('x')
threedee.set_ylabel('y')
threedee.set_zlabel('z')
plt.show()

## KMeans Clustering With PySpark

In [None]:
# Which generated file to cluster ("small.csv", "mid.csv" or "large.csv").
csvfile = "small.csv"
# The 3-D cluster plots below are skipped unless this is True.
plotClusters = False
# Timer for the PySpark KMeans section, stopped by a later elapsed() call.
start, cmt = clock("Running KMeans With PySpark")

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

## Load a CSV with header and ',' delimiter (the generated blob files are comma-separated)
spdf = spark.read.load(csvfile, format="csv", sep=",", inferSchema="true", header="true") #, quote="\"")
cols = spdf.columns
# Coordinate columns used as clustering features.
FEATURES_COL = ['x', 'y', 'z']
print(spdf.dtypes)
spdf.show(3)

#### Set Format For MLlib

In [None]:
# MLlib estimators take a single vector column: pack x, y, z into "features".
vecAssembler = VectorAssembler(inputCols=FEATURES_COL, outputCol="features")
spdf_kmeans = vecAssembler.transform(spdf).select('id', 'features')
spdf_kmeans.show(3)

#### Run a 'cost' analysis to determine optimal number of clusters

In [None]:
## Runs Spark KMeans for k = 2..19 on a 10% sample and records each model's cost.
## Disabled by default because it fits 18 models.
runCost = False
if runCost:
    cost = np.zeros(20)
    for k in range(2, 20):
        kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
        model = kmeans.fit(spdf_kmeans.sample(False, 0.1, seed=42))
        if hasattr(model, "computeCost"):
            # Spark 2.x: sum of squared distances to the nearest center, on the full data.
            cost[k] = model.computeCost(spdf_kmeans)
        else:
            # computeCost was removed in Spark 3.0. trainingCost is the same quantity,
            # but measured on the 10% sample the model was fit on.
            cost[k] = model.summary.trainingCost

    fig, ax = plt.subplots(1, 1, figsize=(8, 6))
    ax.plot(range(2, 20), cost[2:20])
    ax.set_xlabel('k')
    ax.set_ylabel('cost')

#### Choose `nCenters` (25) clusters because that's how many blob centers we actually generated

In [None]:
# Fit with k equal to the number of generated blob centers (nCenters).
k = nCenters
kmeans = KMeans().setK(k).setSeed(1).setFeaturesCol("features")
model = kmeans.fit(spdf_kmeans)
centers = model.clusterCenters()

print("Cluster Centers: ")
for center in centers:
    print(center)

#### Predict cluster for generated data

In [None]:
# Keep predictions as a distributed DataFrame. Collecting every row to the driver
# only to rebuild a DataFrame would push the whole data set through the driver,
# which is costly for mid.csv / large.csv.
transformed = model.transform(spdf_kmeans).select('id', 'prediction')
spdf_pred = transformed
spdf_pred.show(5)

#### Join with generated data

In [None]:
# Attach the original x, y, z coordinates to each prediction via the shared 'id'.
# NOTE: this reassigns spdf_pred, so re-running it joins twice; re-run the previous cell first.
spdf_pred = spdf_pred.join(spdf, 'id')
spdf_pred.show(5)

#### Convert to Pandas and set index

In [None]:
# Collects every row to the driver; fine for small.csv, heavy for the 3e6-row large.csv.
pddf_pred = spdf_pred.toPandas().set_index('id')
pddf_pred.head()

#### Plot everything

In [None]:
if plotClusters:
    # add_subplot(111, projection='3d') replaces Figure.gca(projection=...), which
    # matplotlib deprecated in 3.4 and removed in 3.6.
    fig = plt.figure(figsize=(12, 10))
    threedee = fig.add_subplot(111, projection='3d')
    # Colour each point by its Spark KMeans cluster.
    threedee.scatter(pddf_pred.x, pddf_pred.y, pddf_pred.z, c=pddf_pred.prediction)
    threedee.set_xlabel('x')
    threedee.set_ylabel('y')
    threedee.set_zlabel('z')
    plt.show()

In [None]:
# Total time for the PySpark KMeans section (including time between cells).
elapsed(start, cmt)

In [None]:
## Just sleep to cool off: a short pause between the two timed sections
## (the Spark timer is already stopped and the sklearn timer has not started).
from time import sleep
sleep(2)

## KMeans Clustering With Sklearn + Pandas (For Comparison)

In [None]:
#csvfile = "small.csv"
# Reuses csvfile from the PySpark section so both runs cluster the same data.
start, cmt = clock("Running KMeans With Sklearn + Pandas")

In [None]:
from pandas import read_csv

# Use the 'id' column as the index directly; the remaining columns are x, y, z.
clusterData = read_csv(csvfile, index_col='id')
clusterData.head()

#### Create clusters and predict

In [None]:
from sklearn.cluster import KMeans as SklearnKMeans

# Fix random_state so the sklearn clustering is reproducible run to run,
# mirroring setSeed(1) on the Spark model.
kmeans = SklearnKMeans(n_clusters=nCenters, random_state=1)
kmeans.fit(clusterData)
y_kmeans = kmeans.predict(clusterData)

#### Join everything together

In [None]:
from pandas import Series

# Wrap the sklearn labels in a frame that shares clusterData's index, then attach
# the coordinates; the join aligns rows on that identical index.
pddf_pred_sk = DataFrame(y_kmeans, index=clusterData.index, columns=['prediction'])
pddf_pred_sk = pddf_pred_sk.join(clusterData)
pddf_pred_sk.head()

#### Same plot, but doing it all with Pandas + Sklearn

In [None]:
if plotClusters:
    # add_subplot(111, projection='3d') replaces Figure.gca(projection=...), which
    # matplotlib deprecated in 3.4 and removed in 3.6.
    fig = plt.figure(figsize=(12, 10))
    threedee = fig.add_subplot(111, projection='3d')
    # Colour each point by its sklearn KMeans cluster.
    threedee.scatter(pddf_pred_sk.x, pddf_pred_sk.y, pddf_pred_sk.z, c=pddf_pred_sk.prediction)
    threedee.set_xlabel('x')
    threedee.set_ylabel('y')
    threedee.set_zlabel('z')
    plt.show()

In [None]:
# Total time for the sklearn + pandas KMeans section, for comparison with the Spark run.
elapsed(start, cmt)