# Shared Data Context

The intent of this notebook is to provide examples of how data scientists can use object storage, and more specifically Ceph object storage, in much the same way they are accustomed to interacting with Amazon Simple Storage Service (S3). This is possible because Ceph's object storage gateway closely follows the Amazon S3 API.

# Working with Boto

Boto is an integrated interface to current and future infrastructure services offered by Amazon Web Services. Among the services it provides interfaces for is Amazon S3. For lightweight analysis of data using Python tools like NumPy or pandas, it is handy to interact with data stored in object storage using pure Python. This is where Boto shines.

In [None]:
import os
import boto3

s3 = boto3.client('s3','us-east-1', endpoint_url= os.environ['RGW_API_ENDPOINT'],
                       aws_access_key_id = os.environ['ACCESSKEY'],
                       aws_secret_access_key = os.environ['SECRETKEY'])


Creating a bucket, uploading an object (put), and listing the bucket.

In [None]:
s3.create_bucket(Bucket='ceph-bucket')
s3.put_object(Bucket='ceph-bucket',Key='object',Body='data')
for key in s3.list_objects(Bucket='ceph-bucket')['Contents']:
    print(key['Key'])

# Working with Spark

When running an application you can either establish a Spark session locally in the notebook pod, or point it to a remote Spark cluster. Oshinko is a collection of components from the radanalyticsio community that aid in the provisioning and scaling of Spark clusters for intelligent applications.

In [None]:
import os
import pyspark

from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

spark = SparkSession.builder.master("local[3]").getOrCreate()

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.endpoint", os.environ['RGW_API_ENDPOINT'])
hadoopConf.set("fs.s3a.access.key", os.environ['ACCESSKEY'])
hadoopConf.set("fs.s3a.secret.key", os.environ['SECRETKEY'])
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.connection.ssl.enabled", "false")

In [None]:
import socket
spark.range(100, numPartitions=100).rdd.map(lambda x: socket.gethostname()).distinct().collect()

In [None]:
df0 = spark.read.text("s3a://ceph-bucket/object")

In [None]:
df0

# Working with a Hybrid Data Context

As of Hadoop 2.8, S3A supports per-bucket configuration. This is very powerful: each bucket can have a distinct S3A configuration, with a different endpoint and a different set of credentials. With this, a single Spark context can read a Parquet file from a bucket in the public cloud (Amazon S3) into a data frame, then write that data frame as a Parquet file into a bucket in the Ceph Nano service running in Minishift.

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.bucket.bd-dist.endpoint", "s3.amazonaws.com")
hadoopConf.set("fs.s3a.bucket.bd-dist.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
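
As a rough sketch of how the per-bucket property names in the cell above are formed, the helper below prefixes generic S3A option suffixes with `fs.s3a.bucket.<bucket>.`. The function name `per_bucket_conf` is an assumption made for illustration; it is not part of Hadoop or Spark.

```python
def per_bucket_conf(bucket, options):
    """Map generic S3A option suffixes to per-bucket property names.

    `options` uses the suffix that follows "fs.s3a." in the global
    property name, e.g. "endpoint" or "access.key".
    """
    prefix = "fs.s3a.bucket.{}.".format(bucket)
    return {prefix + suffix: value for suffix, value in options.items()}


# Values copied from the cell above.
conf = per_bucket_conf("bd-dist", {
    "endpoint": "s3.amazonaws.com",
    "aws.credentials.provider":
        "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
})

for name, value in sorted(conf.items()):
    print(name, "=", value)
    # In the notebook this would be: hadoopConf.set(name, value)
```

Buckets without a per-bucket entry keep using the global `fs.s3a.*` settings, which is why `ceph-bucket` still resolves to the Ceph endpoint configured earlier.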

__Public to Private ETL__

Simply read tab-separated data from a bucket in Amazon S3 and write it back out to a bucket in our Ceph Nano service.

In [None]:
spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t").write.csv("s3a://ceph-bucket/trip_report.tsv",sep="\t")

Extract all JSON files from a bucket prefix (pseudo directory) in Amazon S3 and write them back out to a bucket in our Ceph Nano service with the same bucket prefix.

In [None]:
spark.read.option("multiline", True).option("mode", "PERMISSIVE").json("s3a://bd-dist/kube-metrics").repartition(76).write.option("compression", "bzip2").json("s3a://ceph-bucket/kube-metrics")

# Working with SparkSQL

Load Prometheus data set from Ceph Nano into a data frame.

In [None]:
jsonFile = spark.read.json("s3a://ceph-bucket/kube-metrics")

__Import statistics libraries__

In [None]:
import pandas as pd
import json
import numpy as np
import seaborn as sns
import sys
import matplotlib.pyplot as plt
%matplotlib inline

from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

__Display schema of files__

In [None]:
print('Display schema:')
jsonFile.printSchema()

__Query the JSON data using filters__

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# createOrReplaceTempView supersedes the deprecated registerTempTable.
jsonFile.createOrReplaceTempView("kubelet_docker_operations_latency_microseconds")

#Filter the results into a data frame
data = spark.sql("SELECT values, metric.operation_type FROM kubelet_docker_operations_latency_microseconds WHERE metric.quantile='0.9' AND metric.hostname='free-stg-master-03fb6'")

data.show()

In [None]:
data_pd = data.toPandas()

spark.stop()

OP_TYPE = 'list_images'

# Build one frame per operation type, then concatenate them in a single step
frames = []
for op in set(data_pd['operation_type']):
    dict_raw = data_pd[data_pd['operation_type'] == op]['values']
    list_raw = []
    for key in dict_raw.keys():
        list_raw.extend(dict_raw[key])
    temp_frame = pd.DataFrame(list_raw, columns = ['utc_timestamp','value'])
    temp_frame['operation_type'] = op
    frames.append(temp_frame)

# DataFrame.append was removed in pandas 2.0; pd.concat works on old and new versions
df2 = pd.concat(frames)


df2 = df2[df2['value'] != 'NaN']

df2['value'] = df2['value'].apply(lambda a: int(a))

df2['timestamp'] = df2['utc_timestamp'].apply(lambda a : datetime.fromtimestamp(int(a)))

df2.head()
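
The reshaping in the cell above can be illustrated without Spark or pandas on a tiny sample. The records below are hypothetical and only mimic the assumed layout (a list of `[timestamp, value]` pairs per operation type, with values stored as strings). The sketch also passes `tz=timezone.utc`, since `datetime.fromtimestamp` without a time zone returns local time.

```python
from datetime import datetime, timezone

# Hypothetical sample: each record holds [timestamp, value] pairs
records = [
    {"operation_type": "list_images",
     "values": [[1520000000, "120"], [1520000060, "NaN"]]},
    {"operation_type": "pull_image",
     "values": [[1520000000, "3400"]]},
]

rows = []
for record in records:
    for ts, value in record["values"]:
        if value == "NaN":          # skip samples with no measurement
            continue
        rows.append({
            "operation_type": record["operation_type"],
            "utc_timestamp": int(ts),
            "value": int(value),
            # tz=timezone.utc keeps the result independent of the local zone
            "timestamp": datetime.fromtimestamp(int(ts), tz=timezone.utc),
        })

for row in rows:
    print(row)
```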

__Objective - Verify Above Alerts__

Store timestamp with data

In [None]:
df2.reset_index(inplace =True)

del df2['index']

df2['operation_type'].unique()

Segregate the values by operation type in separate variables as Series

In [None]:
def get_filtered_op_frame(op_type):
    temp = df2[df2.operation_type == op_type]
    temp = temp.sort_values(by='timestamp')
    return temp

operation_type_value = {}
for temp in list(df2.operation_type.unique()):
    operation_type_value[temp] = get_filtered_op_frame(temp)['value']

__Descriptive Stats__

Descriptive statistics is the portion of statistics dedicated to summarizing a total population.

_Mean_

Arithmetic average of a range of values or quantities, computed by dividing the total of all values by the number of values.

In [None]:
for temp in operation_type_value.keys():
    print("Mean of: ",temp, " - ", np.mean(operation_type_value[temp]))

_Variance_

In the same way that the mean is used to describe the central tendency, variance is intended to describe the spread. The term xi – μ is called the “deviation from the mean”, so the variance is the sum of the squared deviations multiplied by 1 over the number of samples. Because the variance is expressed in squared units, its square root, σ, is used as well and is called the standard deviation.
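
The definition can be checked by hand on a small, made-up sample. This is a minimal sketch using only the standard library; the numbers are not taken from the data set.

```python
import math
import statistics

values = [2, 4, 4, 4, 5, 5, 7, 9]   # made-up sample

mu = sum(values) / len(values)
deviations = [x - mu for x in values]
# Mean of the squared deviations (population variance)
variance = sum(d ** 2 for d in deviations) / len(values)
sigma = math.sqrt(variance)

print("mean:", mu)
print("variance:", variance)
print("standard deviation:", sigma)

# statistics.pvariance uses the same divide-by-N definition
assert math.isclose(variance, statistics.pvariance(values))
```

`np.var` and `np.std` also divide by N by default (`ddof=0`), so they follow this population definition rather than the sample (N - 1) version.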

In [None]:
for temp in operation_type_value.keys():
    print("Variance of: ",temp, " - ", np.var(operation_type_value[temp]))

_Standard Deviation_

Standard deviation (SD, also represented by the Greek letter sigma σ or the Latin letter s) is a measure that is used to quantify the amount of variation or dispersion of a set of data values.[1] A low standard deviation indicates that the data points tend to be close to the mean (also called the expected value) of the set, while a high standard deviation indicates that the data points are spread out over a wider range of values.
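
To make the contrast concrete, the sketch below compares two made-up data sets that share the same mean but differ in spread.

```python
import statistics

tight = [9, 10, 11]     # made-up values close to the mean
wide = [0, 10, 20]      # made-up values spread far from the mean

for name, data in (("tight", tight), ("wide", wide)):
    print(name,
          "mean:", statistics.mean(data),
          "std:", round(statistics.pstdev(data), 3))
```

Both lists have a mean of 10, yet the standard deviation of `wide` is ten times that of `tight`.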

In [None]:
for temp in operation_type_value.keys():
    print("Standard Deviation of: ",temp, " - ", np.std(operation_type_value[temp]))

_Median_

The median is the value lying at the midpoint of a frequency distribution of observed values, such that there is an equal probability of falling above or below it. Simply put, it is the middle value in the sorted list of numbers. The median is a better choice than the mean when the indicator can be affected by outliers.
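
A short illustration of that robustness, using made-up latency values and a single extreme sample:

```python
import statistics

latencies = [100, 110, 120, 130, 140]        # made-up values
with_outlier = latencies + [10000]           # one extreme sample added

print("without outlier:",
      statistics.mean(latencies), statistics.median(latencies))
print("with outlier:   ",
      statistics.mean(with_outlier), statistics.median(with_outlier))
```

The outlier moves the mean by more than an order of magnitude, while the median shifts only from 120 to 125.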

In [None]:
for temp in operation_type_value.keys():
    print("Median of: ",temp, " - ", np.median(operation_type_value[temp]))

__Histogram__

The most common representation of a distribution is a histogram, which is a graph that shows the frequency or probability of each value. One plot will be generated per operation type.

We will use the Seaborn module for this. A __Kernel Density Estimation__ curve will be added for smoothing:

* In statistics, kernel density estimation (KDE) is a non-parametric way to estimate the probability density function of a random variable. Kernel density estimation is a fundamental data smoothing problem where inferences about the population are made, based on a finite data sample.
* The kernel density estimate may be less familiar, but it can be a useful tool for plotting the shape of a distribution. Like the histogram, the KDE plot encodes the density of observations on one axis with height along the other axis.
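
To show what the smoothing does, here is a minimal Gaussian KDE written from scratch. It is a sketch under assumptions: the sample values are made up and the bandwidth is fixed by hand, whereas Seaborn selects a bandwidth automatically.

```python
import math

def gaussian_kde(samples, bandwidth):
    """Return a function that estimates the density of `samples`."""
    n = len(samples)
    norm = 1.0 / (n * bandwidth * math.sqrt(2.0 * math.pi))

    def density(x):
        # Sum one Gaussian bump per observation, centred on that observation
        return norm * sum(
            math.exp(-0.5 * ((x - s) / bandwidth) ** 2) for s in samples
        )
    return density

samples = [1.0, 1.2, 1.9, 2.1, 2.3, 5.0]   # made-up observations
density = gaussian_kde(samples, bandwidth=0.5)

# Riemann sum over a wide grid; a valid density integrates to about 1
step = 0.01
grid = [-5.0 + i * step for i in range(1500)]
area = sum(density(x) for x in grid) * step
print("area under the curve:", round(area, 3))
print("density near the cluster:", density(2.0))
print("density in the gap:", density(4.0))
```

A larger bandwidth gives a smoother but flatter curve; a smaller one follows individual observations more closely.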

In [None]:
sns.set(color_codes = True)

for temp in operation_type_value.keys():
    values = operation_type_value[temp]
    fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15,12))
    # histplot replaces distplot, which is deprecated in recent Seaborn releases
    sns.histplot(values, kde=True, stat="density", ax=ax[0])
    ax[0].set_xlabel(temp)
    sns.histplot(np.log(values), kde=True, stat="density", ax=ax[1])
    ax[1].set_xlabel("Log transformed " + temp)
    plt.show()


__Understanding__

The distributions all appear to be log-normal, which is plausible because the values are always greater than 0.
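
One informal way to check this kind of claim: if values are log-normal, their logarithm is normally distributed and therefore roughly symmetric. The sketch below uses synthetic data (not the notebook's data set) and a hand-written `skewness` helper, both of which are assumptions for illustration.

```python
import math
import random
import statistics

def skewness(data):
    """Population skewness: the mean of the cubed z-scores."""
    mu = statistics.fmean(data)
    sigma = statistics.pstdev(data)
    return sum(((x - mu) / sigma) ** 3 for x in data) / len(data)

rng = random.Random(0)                       # fixed seed for repeatability
raw = [rng.lognormvariate(0.0, 1.0) for _ in range(5000)]   # synthetic data
logged = [math.log(x) for x in raw]

print("skewness of raw values:", round(skewness(raw), 2))
print("skewness of log values:", round(skewness(logged), 2))
```

A strongly positive skew before the transformation and a skew near zero afterwards is consistent with a log-normal shape, although it does not prove it.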

In [None]:
df2.columns

__Box-Whisker__

Box plots may also have lines extending vertically from the boxes (whiskers) indicating variability outside the upper and lower quartiles, hence the terms box-and-whisker plot and box-and-whisker diagram. Outliers may be plotted as individual points.
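
The quantities behind a box plot can be computed by hand. The sketch below uses a made-up sample and the common 1.5 × IQR whisker rule, which is also the default in matplotlib and Seaborn.

```python
import statistics

data = [1, 2, 3, 4, 5, 6, 7, 8, 100]   # made-up sample with one extreme value

# method="inclusive" interpolates linearly between data points
q1, q2, q3 = statistics.quantiles(data, n=4, method="inclusive")
iqr = q3 - q1
lower_fence = q1 - 1.5 * iqr
upper_fence = q3 + 1.5 * iqr
outliers = [x for x in data if x < lower_fence or x > upper_fence]

print("quartiles:", q1, q2, q3)
print("fences:", lower_fence, upper_fence)
print("outliers:", outliers)
```

Here the box spans 3 to 7, the fences sit at -3 and 13, and only the value 100 is drawn as an individual outlier point.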

A log transformation is required because the values for different operations are on very different scales.

In [None]:
df_whisker =  df2
df_whisker['log_transformed_value'] = np.log(df2['value'])

In [None]:
df_whisker.head()

In [None]:
plt.figure(figsize=(20,15))
ax = sns.boxplot(x="operation_type", y="log_transformed_value", hue="operation_type", data=df_whisker)  # RUN PLOT   
plt.show()

plt.clf()
plt.close()

__Finding a trend in the time series, if there is any__

A trend means that the values show an increasing or decreasing pattern over time. In this example we see a trend of a slow and steady increase followed by a sharp drop.
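
A simple moving average is one common way to make such a pattern easier to see through sample-to-sample noise. The series below is made up to mimic a slow rise followed by a sharp drop, and the `moving_average` helper is an illustrative assumption, not part of the notebook.

```python
def moving_average(values, window):
    """Average each run of `window` consecutive values."""
    return [
        sum(values[i:i + window]) / window
        for i in range(len(values) - window + 1)
    ]

# Made-up series: slow rise, sharp drop, slow rise again
series = [10, 11, 13, 12, 14, 16, 15, 17, 5, 6, 8, 7]
smoothed = moving_average(series, window=3)

for position, value in enumerate(smoothed):
    print(position, round(value, 2))
```

With pandas, `Series.rolling(window).mean()` computes the same kind of average directly on a data frame column.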

In [None]:
for temp in operation_type_value.keys():
    temp_frame = get_filtered_op_frame(temp)
    temp_frame = temp_frame.set_index(temp_frame.timestamp)
    temp_frame = temp_frame[['log_transformed_value']]
    # DataFrame.plot creates its own figure, so no plt.subplots call is needed
    temp_frame.plot(figsize=(15,12),title=temp)