# JupyterHub Notebook

### This notebook server is hosted on the OpenShift platform, which provides a separate server for each individual user. The platform takes care of provisioning the server and allocating the related storage.

### First, install and import required libraries, watermark our file, initialise our Spark Session Builder and initialise our environment with required configuration

In [2]:
# Install the extra packages used by this notebook into the kernel's environment.
# NOTE(review): no versions are pinned here, so a later re-run may resolve to
# different releases than the ones shown in the captured output — consider pinning.
%pip install watermark
%pip install Minio
%pip install matplotlib


Collecting watermark
  Downloading watermark-2.2.0-py2.py3-none-any.whl (6.8 kB)
Installing collected packages: watermark
Successfully installed watermark-2.2.0
You should consider upgrading via the '/opt/app-root/bin/python3.8 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.
You should consider upgrading via the '/opt/app-root/bin/python3.8 -m pip install --upgrade pip' command.[0m
Note: you may need to restart the kernel to use updated packages.
Collecting matplotlib
  Downloading matplotlib-3.5.0-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (11.3 MB)
[K     |████████████████████████████████| 11.3 MB 26.8 MB/s eta 0:00:01
[?25hCollecting cycler>=0.10
  Downloading cycler-0.11.0-py3-none-any.whl (6.4 kB)
Collecting kiwisolver>=1.0.1
  Downloading kiwisolver-1.3.2-cp38-cp38-manylinux_2_5_x86_64.manylinux1_x86_64.whl (1.2 MB)
[K     |████████████████████████████████| 1.2 MB 118.6 MB/s eta 0:00:01
[?25hCollecting se

In [3]:
import os
import json
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import from_json, col, to_json, struct
import watermark
from minio import Minio

# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Load the watermark IPython extension so the %watermark magic can be used below.
%load_ext watermark

In [4]:
# Print details of the execution environment (Python/IPython versions, machine
# information, git hash and versions of imported packages) for reproducibility.
%watermark -n -v -m -g -iv


Python implementation: CPython
Python version       : 3.8.6
IPython version      : 7.22.0

Compiler    : GCC 8.4.1 20200928 (Red Hat 8.4.1-1)
OS          : Linux
Release     : 4.18.0-240.22.1.el8_3.x86_64
Machine     : x86_64
Processor   : x86_64
CPU cores   : 32
Architecture: 64bit

Git hash: d93a18cffaf35f2871b99f5a3dc16990dfb92072

json     : 2.0.9
watermark: 2.2.0



## Load hyperparameters

In [None]:
from hyper_parameters import get_hyper_paras

# get_hyper_paras() returns four values, unpacked here in order; `user_id` is
# used further down to build the output location of the combined data set.
(
    user_id,
    PROJECT_NAME,
    EXPERIMENT_NAME,
    experiment_name,
) = get_hyper_paras()

In [5]:
# Session builder carrying the application name for the ingest pipeline.
# NOTE(review): the cells visible below create the session through spark_util
# instead, so this builder does not appear to be used afterwards — confirm.
sparkSessionBuilder = SparkSession.builder.appName("Customer Churn ingest Pipeline")

In [6]:
# S3A credentials for the MinIO object store. They can be overridden through the
# MINIO_ACCESS_KEY / MINIO_SECRET_KEY environment variables so that real secrets
# never have to be written into the notebook; the fallbacks are the values this
# notebook used originally, so the default behaviour is unchanged.
s3a_access_key = os.environ.get("MINIO_ACCESS_KEY", "minio")
s3a_secret_key = os.environ.get("MINIO_SECRET_KEY", "minio123")

# Space-separated submit arguments passed to spark_util.getOrCreateSparkSession:
# Ivy cache location, S3A endpoint/credentials/path-style access for MinIO, the
# S3A filesystem implementation, and the hadoop-aws package that provides it.
submit_args = " ".join([
    "--conf spark.jars.ivy=/tmp",
    "--conf spark.hadoop.fs.s3a.endpoint=http://minio-ml-workshop:9000",
    "--conf spark.hadoop.fs.s3a.access.key=" + s3a_access_key,
    "--conf spark.hadoop.fs.s3a.secret.key=" + s3a_secret_key,
    "--conf spark.hadoop.fs.s3a.path.style.access=true",
    "--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
    "--packages org.apache.hadoop:hadoop-aws:3.2.0",
])


# submit_args = "--conf spark.jars.ivy=/tmp \
# --conf spark.hadoop.fs.s3a.endpoint=http://minio-ml-workshop:9000 \
# --conf spark.hadoop.fs.s3a.access.key=minio \
# --conf spark.hadoop.fs.s3a.secret.key=minio123 \
# --conf spark.hadoop.fs.s3a.path.style.access=true \
# --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
# --packages org.apache.hadoop:hadoop-aws:3.2.0,\
# org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,\
# org.apache.kafka:kafka-clients:2.8.0,\
# org.apache.spark:spark-streaming_2.12:3.0.1"


### Connect to the Spark cluster provided by the OpenShift platform

In [7]:
import spark_util

# Obtain a Spark session named "ML Ops Demo" through the project helper, passing
# the submit arguments assembled earlier in the notebook.
spark = spark_util.getOrCreateSparkSession(
    "ML Ops Demo",
    submit_args,
)

# Set the Spark context's log level to INFO, then confirm start-up to the reader.
spark.sparkContext.setLogLevel("INFO")
print('Spark context started.')

Initializing environment variables for Spark
Creating a spark session...
Spark session created


Spark context started.


###  Declare our input data sources, import and combine them

In [8]:
# Read the customer CSV from the `rawdata` bucket, treating the first row as the
# header and letting Spark infer column types.
# The separator option is spelled `delimiter`; the previous spelling `delimeter`
# is not an option name Spark recognises, so the reader was silently falling
# back to its default separator (a comma, which is also what is requested here).
dataFrame_Customer = (
    spark.read
    .options(delimiter=',', inferSchema='True', header='True')
    .csv("s3a://rawdata/customers/Customer-Churn_P1.csv")
)
dataFrame_Customer.printSchema()

root
 |-- customerID: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- SeniorCitizen: integer (nullable = true)
 |-- Partner: string (nullable = true)
 |-- Dependents: string (nullable = true)
 |-- tenure: integer (nullable = true)



In [9]:
# Read the products CSV from the `rawdata` bucket, treating the first row as the
# header and letting Spark infer column types.
# The separator option is spelled `delimiter`; the previous spelling `delimeter`
# is not an option name Spark recognises, so the reader was silently falling
# back to its default separator (a comma, which is also what is requested here).
dataFrame_Products = (
    spark.read
    .options(delimiter=',', inferSchema='True', header='True')
    .csv("s3a://rawdata/products/Customer-Churn_P2.csv")
)
dataFrame_Products.printSchema()


root
 |-- customerID: integer (nullable = true)
 |-- PhoneService: string (nullable = true)
 |-- MultipleLines: string (nullable = true)
 |-- InternetService: string (nullable = true)
 |-- OnlineSecurity: string (nullable = true)
 |-- OnlineBackup: string (nullable = true)
 |-- DeviceProtection: string (nullable = true)
 |-- TechSupport: string (nullable = true)
 |-- StreamingTV: string (nullable = true)
 |-- StreamingMovies: string (nullable = true)
 |-- Contract: string (nullable = true)
 |-- PaperlessBilling: string (nullable = true)
 |-- PaymentMethod: string (nullable = true)
 |-- MonthlyCharges: double (nullable = true)
 |-- TotalCharges: double (nullable = true)
 |-- Churn: string (nullable = true)



In [10]:
# from pyspark.sql.types import *
# from  pyspark.sql.functions import *

# srcKafkaBrokers = "odh-message-bus-kafka-bootstrap:9092"
# srcKakaTopic = "datatelco"



# schema = StructType()\
#     .add("customerID", IntegerType())\
#     .add("PhoneService", StringType())\
#     .add("MultipleLines", StringType())\
#     .add("InternetService", StringType())\
#     .add("OnlineSecurity", StringType())\
#     .add("OnlineBackup", StringType())\
#     .add("DeviceProtection", StringType())\
#     .add("TechSupport", StringType())\
#     .add("StreamingTV", StringType())\
#     .add("StreamingMovies", StringType())\
#     .add("Contract", StringType())\
#     .add("PaperlessBilling", StringType())\
#     .add("PaymentMethod", StringType())\
#     .add("MonthlyCharges", StringType())\
#     .add("TotalCharges", DoubleType())\
#     .add("Churn", StringType())



# #Read from JSON Kafka messages into a dataframe
# dfKafka = spark.read.format("kafka")\
#     .option("kafka.bootstrap.servers", srcKafkaBrokers)\
#     .option("subscribe", srcKakaTopic)\
#     .option("startingOffsets", "earliest")\
#     .load()\
#     .withColumn("value", regexp_replace(col("value").cast("string"), "\\\\", "")) \
#     .withColumn("value", regexp_replace(col("value"), "^\"|\"$", "")) \
#     .selectExpr("CAST(value AS STRING) as jsonValue")\
#     .rdd.map(lambda row: row["jsonValue"])

# dfObj = spark.read.schema(schema).json(dfKafka)
# dfObj.printSchema()
# dfObj.show(n=2)


In [11]:
# Combine the customer and product frames on their shared `customerID` column.
# A full join is requested, so rows present in only one of the two inputs are kept.
dataFrom_All = dataFrame_Customer.join(
    dataFrame_Products,
    on="customerID",
    how="full",
)

###  Push prepared data to object storage and stop Spark cluster to save resources
###  Note - `user_id` is loaded together with the hyper parameters above. To override it, uncomment the `user_id` line in the next cell and set it to your username (something in the range user1 ... user30)

In [12]:


# Uncomment to override the user id loaded earlier in the notebook:
# user_id = "user29"

# Output location: the fixed prefix with the user id appended directly to it.
file_location = "s3a://data/full_data_csv" + user_id

# Collapse the combined data to a single partition and write it as CSV with a
# header row, replacing whatever already exists at the target location.
(
    dataFrom_All
    .repartition(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .format("csv")
    .save(file_location)
)

In [13]:
# Stop the Spark session now that the data has been written, to save cluster resources.
spark.stop()