# Hybrid Data Engineering: Open Data Hub and Object Storage

This notebook shows how data engineers and data scientists can use Open Data Hub with object storage, specifically Ceph object storage, in much the same way they are accustomed to interacting with Amazon Simple Storage Service (S3). This is possible because Ceph's object storage gateway is highly compatible with the Amazon S3 API.

# Working with Boto

Boto (here, the `boto3` package) is the Python interface to the infrastructure services offered by Amazon Web Services, including Amazon S3. For lightweight analysis of data with Python tools like NumPy or pandas, it is handy to interact with data stored in object storage using pure Python. This is where Boto shines.

In [None]:
import sys

In [None]:
import os
import boto3


s3_endpoint_url = os.environ['S3_ENDPOINT_URL']
s3_access_key = os.environ['AWS_ACCESS_KEY_ID']
s3_secret_key = os.environ['AWS_SECRET_ACCESS_KEY']
s3_bucket_name = os.environ['JUPYTERHUB_USER']

print(s3_endpoint_url)
print(s3_bucket_name)
# Create an S3 client that talks to the endpoint configured above
s3 = boto3.client('s3', region_name='us-east-1',
                  endpoint_url=s3_endpoint_url,
                  aws_access_key_id=s3_access_key,
                  aws_secret_access_key=s3_secret_key)


# Interacting with S3

### Creating a bucket, uploading an object (put), and listing the bucket.

In the cell below we will use our boto3 connection, `s3`, to do the following: Create an S3 bucket, upload an object, and then display all of the contents of that bucket.  

In [None]:
s3.create_bucket(Bucket=s3_bucket_name)
s3.put_object(Bucket=s3_bucket_name,Key='object',Body='data')
for key in s3.list_objects(Bucket=s3_bucket_name)['Contents']:
    print(key['Key'])
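
The listing loop above indexes the response with `['Contents']`. A listing response for an empty bucket has no `Contents` entry, so that lookup would raise a `KeyError`. The sketch below is a small, hypothetical helper (`object_keys` is not part of boto3) that tolerates the empty case. It operates on plain dictionaries shaped like a `list_objects` response, so the sample inputs here are hand-written stand-ins rather than real responses.

```python
def object_keys(response):
    """Return the object keys from a list_objects-style response dict.

    Falls back to an empty list when the response has no 'Contents'
    entry, which is what an empty bucket produces.
    """
    return [entry["Key"] for entry in response.get("Contents", [])]


# Hand-written dictionaries standing in for real responses.
print(object_keys({"Contents": [{"Key": "object"}]}))
print(object_keys({}))
```

With a live client this would be called as `object_keys(s3.list_objects(Bucket=s3_bucket_name))`.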

### Exercise #1: Manage Remote Storage

Let's do something slightly more complicated and upload a small file to our new bucket.

Below we have used pandas to generate a small CSV file for you. Run the cell below, then upload the file to your S3 bucket. Finally, display the contents of your bucket as we did above.

This resource may be helpful: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-uploading-files.html

#### Objective

1) Upload a csv file to your s3 bucket using `s3.upload_file()`

2) List the objects currently in your bucket using `s3.list_objects()`

In [None]:
# Create a small pandas DataFrame and save it locally as a .csv file

import pandas as pd

x = [1,2,3,4]
y = [4,5,6,7]

df  = pd.DataFrame([x,y])
df.to_csv('new_data.csv')

In [None]:
# 1. Upload a csv file to your s3 bucket using s3.upload_file()

s3.upload_file(Filename='new_data.csv',Bucket=s3_bucket_name, Key='new_data.csv')

In [None]:
# 2. List the objects currently in your bucket using s3.list_objects()

for key in s3.list_objects(Bucket=s3_bucket_name)['Contents']:
    print(key['Key'])
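
To confirm that the upload round-trips, one option is to read the object back and parse it. The following is a minimal sketch under stated assumptions: `read_csv_rows` is a hypothetical helper name, and it relies only on the `get_object` call returning a dictionary whose `Body` supports `.read()`. It uses the standard-library `csv` module to stay dependency-free.

```python
import csv
import io


def read_csv_rows(client, bucket, key):
    """Download an object and parse it as CSV, returning a list of rows."""
    # get_object returns a dict; its 'Body' is a stream of the object's bytes.
    body = client.get_object(Bucket=bucket, Key=key)["Body"].read()
    return list(csv.reader(io.StringIO(body.decode("utf-8"))))


# With the client from this notebook, the call would look like:
# rows = read_csv_rows(s3, s3_bucket_name, 'new_data.csv')
```

Because `df.to_csv()` also writes the index, the first row should be a header with an empty first field followed by the column labels `0` to `3`.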


Great, now you know how to interact with and manage your data store with simple data types. But what if we needed to work with larger data sets and employ some more advanced analytics tools like Spark?  

# Working with Spark

The Open Data Hub operator also installs Spark, and each JupyterHub user gets a dedicated Spark cluster (master and workers). The first step is to connect to that cluster and get a Spark session.

In [None]:
import os
import pyspark
import socket

from pyspark.context import SparkContext
from pyspark.sql import SparkSession, SQLContext

# Retrieve the notebook pod clusterIP to pass to the spark cluster 
spark_driver_host = socket.gethostbyname(socket.gethostname())


os.environ["PYSPARK_SUBMIT_ARGS"] = f"--conf spark.jars.ivy={os.environ['HOME']} --conf spark.driver.host={spark_driver_host} --packages com.amazonaws:aws-java-sdk:1.8.0,org.apache.hadoop:hadoop-aws:2.8.5 pyspark-shell"
spark_cluster_url = f"spark://{os.environ['SPARK_CLUSTER']}:7077"
spark = SparkSession.builder.master(spark_cluster_url).getOrCreate()
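
The `PYSPARK_SUBMIT_ARGS` string above is long and easy to mistype, and it has to be set before the Spark session is created. As an illustration only, the sketch below assembles the same kind of string from a dictionary of `--conf` settings and a list of packages. `build_submit_args` is a hypothetical helper, and the home directory and IP address in the example are made-up placeholders.

```python
def build_submit_args(conf, packages):
    """Assemble a PYSPARK_SUBMIT_ARGS string from settings and packages."""
    parts = [f"--conf {key}={value}" for key, value in conf.items()]
    if packages:
        # --packages takes one comma-separated list of Maven coordinates.
        parts.append("--packages " + ",".join(packages))
    # The string must end with 'pyspark-shell'.
    parts.append("pyspark-shell")
    return " ".join(parts)


example = build_submit_args(
    {"spark.jars.ivy": "/home/user", "spark.driver.host": "10.0.0.5"},
    ["com.amazonaws:aws-java-sdk:1.8.0", "org.apache.hadoop:hadoop-aws:2.8.5"],
)
print(example)
```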

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.endpoint", s3_endpoint_url)
hadoopConf.set("fs.s3a.access.key", s3_access_key)
hadoopConf.set("fs.s3a.secret.key", s3_secret_key)
hadoopConf.set("fs.s3a.path.style.access", "true")
hadoopConf.set("fs.s3a.connection.ssl.enabled", "true")
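
The cell above hard-codes `fs.s3a.connection.ssl.enabled` to `"true"`, which presumes an HTTPS endpoint. A hedged alternative, assuming the endpoint URL carries an explicit `http://` or `https://` scheme, is to derive that flag from the URL. `s3a_settings` below is a hypothetical helper that only builds a dictionary; it does not touch Spark itself.

```python
from urllib.parse import urlparse


def s3a_settings(endpoint_url, access_key, secret_key):
    """Build the S3A settings used above, deriving the SSL flag from the URL."""
    use_ssl = urlparse(endpoint_url).scheme == "https"
    return {
        "fs.s3a.endpoint": endpoint_url,
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.path.style.access": "true",
        "fs.s3a.connection.ssl.enabled": "true" if use_ssl else "false",
    }


# Applying the settings to the Hadoop configuration from this notebook:
# for name, value in s3a_settings(s3_endpoint_url, s3_access_key, s3_secret_key).items():
#     hadoopConf.set(name, value)
```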

In [None]:
import socket
spark.range(100, numPartitions=100).rdd.map(lambda x: socket.gethostname()).distinct().collect()

### Exercise #2: Display Spark Data

Now that we are connected to our Spark cluster, let's go ahead and interact with our data. First, let's read the CSV file we uploaded above into a Spark data frame called `df0` using `spark.read.csv()`, then print the total number of rows in the data frame using the built-in `count()` method.

Let's also print the data frame's schema, show the entire data frame, and then filter it down to show just the values of one column.

#### Objectives:

1) Read in the data as a Spark data frame called `df0` using `spark.read.csv()` with the parameter `header=True`

2) Print the total number of rows using `df0.count()`

3) Display the data frame's schema using `df0.printSchema()`

4) Output the entire data frame using `df0.show()`

5) Filter the output to just the values using `df0.select()`


In [None]:
# 1. Read in the data as a spark data frame called df0

df0 = spark.read.csv(f"s3a://{s3_bucket_name}/new_data.csv", header=True)

In [None]:
# 2. Print the total number of rows using df0.count()

print("Total number of rows in df0: %d" % df0.count())

In [None]:
# 3. Display the data frame's schema using df0.printSchema()

df0.printSchema()

In [None]:
# 4. Output the entire data frame using df0.show()

df0.show()

In [None]:
# 5. Filter the output to just the values of one column using df0.select()

df0.select("1").show()

# Working with a Hybrid Data Context

As of Hadoop 2.8, S3A supports per-bucket configuration. This is very powerful: each bucket can have its own S3A configuration, with a different endpoint and a different set of credentials. With this, we can use a single Spark context to read a Parquet file from a bucket in the public cloud (Amazon S3) into a data frame, then turn around and write that data frame as a Parquet file into a bucket in the local Ceph (Rook) cluster installation.

In [None]:
hadoopConf=spark.sparkContext._jsc.hadoopConfiguration()
hadoopConf.set("fs.s3a.bucket.bd-dist.endpoint", "s3.amazonaws.com")
hadoopConf.set("fs.s3a.bucket.bd-dist.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
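
The two settings above follow the per-bucket naming pattern `fs.s3a.bucket.<bucket>.<option>`, where `<option>` is the usual S3A option name without its `fs.s3a.` prefix. As a rough sketch of that pattern, the hypothetical helper `per_bucket_options` below expands a bucket name and a dictionary of options into the full keys. It only builds strings and makes no Spark calls.

```python
def per_bucket_options(bucket, options):
    """Expand short S3A option names into per-bucket configuration keys."""
    prefix = f"fs.s3a.bucket.{bucket}."
    return {prefix + name: value for name, value in options.items()}


settings = per_bucket_options(
    "bd-dist",
    {
        "endpoint": "s3.amazonaws.com",
        "aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    },
)
for name, value in settings.items():
    print(name, "=", value)
```

Each resulting pair can then be passed to `hadoopConf.set(name, value)`, which reproduces the two calls in the cell above.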

### Exercise #3: Public to Private ETL

Now that we have our hadoop configuration set up, let's do some public to private ETL.

Simply read a tab-separated file from a bucket in Amazon S3 and write it back out to a bucket in our Ceph (Rook) service.

First, read the tab-separated file "s3a://bd-dist/trip_report.tsv" from our AWS bucket using Spark, then do what we did above: print the total number of rows, print the schema, and display the entire data frame.

After we've read the data into our notebook, we can write it back out to our local bucket.


#### Objectives:

1) Read in the TSV file using `spark.read.csv()` into the variable `tripreport`, making sure to pass the parameter `sep="\t"`.

2) Print the total number of rows in `tripreport` using `.count()`

3) Display the dataframe's schema with `.printSchema()`

4) Output the dataframe with `.show()`

5) Chain the `.read` and `.write` accessors to read data from AWS and write it to Ceph


In [None]:
# 1. Read in the tsv file using `spark.read.csv()` into the variable `tripreport`.

tripreport = spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t")

In [None]:
# 2. Print the total number of rows in `tripreport` using `.count()`
    
print("Total number of rows in tripreport: %d" % tripreport.count())    

In [None]:
# 3. Display the dataframe's schema with `.printSchema()`

tripreport.printSchema()

In [None]:
# 4. Output the dataframe with `.show()`

tripreport.show()

In [None]:
# 5. Chain the `.read` and `.write` accessors to read data from AWS and write it to Ceph

spark.read.csv("s3a://bd-dist/trip_report.tsv",sep="\t").write.csv(f"s3a://{s3_bucket_name}/trip_report.tsv",sep="\t",mode="overwrite")
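
Note that Spark writes its output as a directory-like prefix rather than a single object: under `trip_report.tsv/` you should expect one or more `part-...` files plus a `_SUCCESS` marker. The sketch below shows one way to pick the data files out of a key listing. `part_files` is a hypothetical helper, and the sample keys are made up for illustration.

```python
def part_files(keys):
    """Keep only keys whose final path component starts with 'part-'."""
    return [key for key in keys if key.rsplit("/", 1)[-1].startswith("part-")]


# Made-up keys shaped like the output of a Spark CSV write.
sample_keys = [
    "trip_report.tsv/_SUCCESS",
    "trip_report.tsv/part-00000-example.csv",
    "trip_report.tsv/part-00001-example.csv",
]
print(part_files(sample_keys))
```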

Now we will read all JSON files under a bucket prefix (pseudo-directory) in S3 into the data frame `jsonFile` and use it with SparkSQL in the next section.

In [None]:
jsonFile= spark.read.option("multiline", True).option("mode", "PERMISSIVE").json("s3a://bd-dist/kube-metrics")
print("Total number of rows in jsonFile: %d" % jsonFile.count())
jsonFile.printSchema()
jsonFile.show()

# Working with SparkSQL

__Import statistics libraries__

In [None]:
import pandas as pd
import json
import numpy as np
import seaborn as sns
import sys
import matplotlib.pyplot as plt
%matplotlib inline

from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

__Display schema of files__

In [None]:
print('Display schema:')
jsonFile.printSchema()

__Query the JSON data using filters__

In [None]:
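# Register the DataFrame as a temporary view so it can be queried with SQL.
# (registerTempTable is deprecated since Spark 2.0 in favor of this method.)
jsonFile.createOrReplaceTempView("kubelet_docker_operations_latency_microseconds")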

#Filter the results into a data frame
data = spark.sql("SELECT values, metric.operation_type FROM kubelet_docker_operations_latency_microseconds WHERE metric.quantile='0.9' AND metric.hostname='free-stg-master-03fb6'")

data.show()
data.printSchema()

In [None]:
# Convert the Spark DataFrame into a pandas DataFrame for analysis. (https://spark.apache.org/docs/1.3.0/api/python/pyspark.sql.html)
data_pd = data.toPandas()

# set the operation type.
OP_TYPE = 'list_images'

# create an empty dataframe with 3 columns to populate with json data
df2 = pd.DataFrame(columns = ['utc_timestamp','value', 'operation_type'])

# for each unique operation type collect its values, and place in the new data frame
for op in set(data_pd['operation_type']):
    dict_raw = data_pd[data_pd['operation_type'] == op]['values']
    list_raw = []
    for key in dict_raw.keys():
        list_raw.extend(dict_raw[key])
    temp_frame = pd.DataFrame(list_raw, columns = ['utc_timestamp','value'])
    temp_frame['operation_type'] = op
    
    # DataFrame.append was removed in pandas 2.0; pd.concat does the same job
    df2 = pd.concat([df2, temp_frame])

# Remove rows that contain nan values
df2 = df2[df2['value'] != 'NaN']

# convert all values to ints
df2['value'] = df2['value'].apply(lambda a: int(a))

# convert timestamp column of strings to timestamp objects
df2['timestamp'] = df2['utc_timestamp'].apply(lambda a : datetime.fromtimestamp(int(a)))

df2.head()
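
The reshaping above can be hard to follow without seeing the data. The sketch below replays the same steps in plain Python on a tiny made-up sample, assuming (as the column names above suggest) that each row's `values` field is a list of `[timestamp, value]` pairs with the value stored as a string. The `flatten` helper and the sample rows are illustrative only. Unlike the cell above, it passes `tz=timezone.utc` explicitly so the result does not depend on the local time zone of the machine.

```python
from datetime import datetime, timezone

# Made-up sample rows shaped like the query result above.
rows = [
    {"operation_type": "list_images",
     "values": [[1517875200, "1500"], [1517875260, "NaN"]]},
    {"operation_type": "version",
     "values": [[1517875200, "300"]]},
]


def flatten(rows):
    """Turn nested [timestamp, value] pairs into one flat record per sample."""
    records = []
    for row in rows:
        for ts, value in row["values"]:
            if value == "NaN":
                continue  # drop missing samples, as the notebook does
            records.append({
                "timestamp": datetime.fromtimestamp(int(ts), tz=timezone.utc),
                "value": int(value),
                "operation_type": row["operation_type"],
            })
    return records


records = flatten(rows)
for record in records:
    print(record)
```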

#### Verify Above Alerts

Store timestamp with data

In [None]:
df2.reset_index(inplace =True)

del df2['index']

df2['operation_type'].unique()
df2.head()

# Save DataFrame in Local Ceph (Rook)

Now that we have done some work with our data we want to save it back into our local Ceph bucket. 


### Exercise #4: Saving Your DataFrame

Let's go ahead and convert our pandas dataframe back into a Spark data frame and use `.write.csv()` to write our new file.

#### Objective

1) Use `dfSpark.write.csv()` to write the data as `operationinfo.csv` under the `kube-metrics` prefix in your Ceph bucket.

In [None]:
# Convert pandas DataFrame to Spark dataframe
dfSpark = spark.createDataFrame(df2)
dfSpark.printSchema()

In [None]:
# 1. Use `dfSpark.write.csv()` to write the data as `operationinfo.csv` under the `kube-metrics` prefix in your Ceph bucket.

dfSpark.write.csv(f"s3a://{s3_bucket_name}/kube-metrics/operationinfo.csv", sep="\t", mode="overwrite", header=True)

In [None]:
spark.stop()