# JupyterHub Notebook

### This notebook server is hosted on the OpenShift platform, which provides a separate server for each individual user. The platform takes care of provisioning the server and allocating the related storage.

### First, install and import required libraries, watermark our file, initialise our Spark Session Builder and initialise our environment with required configuration

In [None]:
# Install the notebook's extra dependencies into the running kernel's environment.
# watermark supplies the %watermark magic loaded below; Minio is the package
# behind the `from minio import Minio` import.
%pip install watermark
%pip install Minio

In [None]:
import os
import json
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import from_json, col, to_json, struct
import watermark
from minio import Minio

%matplotlib inline
%load_ext watermark

In [None]:
# Stamp the notebook with its run environment: date (-n), Python/IPython
# versions (-v), machine info (-m), git hash (-g) and versions of the
# imported packages (-iv).
%watermark -n -v -m -g -iv


In [None]:
# Configure the session builder only; no session is started here.
# The session itself is created later via getOrCreate().
sparkSessionBuilder = (
    SparkSession.builder
    .appName("Customer Churn ingest Pipeline")
)

In [None]:
# Build the spark-submit arguments that PySpark reads from PYSPARK_SUBMIT_ARGS
# when the session is created.
#
# The S3A (object storage) endpoint and credentials default to the values this
# notebook has always used, but can be overridden with the S3A_ENDPOINT,
# S3A_ACCESS_KEY and S3A_SECRET_KEY environment variables so that secrets do
# not have to be edited into the notebook.
#
# SPARK_CLUSTER (host name of the Spark master) must be set in the environment;
# a missing value raises KeyError.
_spark_packages = ','.join([
    'org.postgresql:postgresql:42.2.10',
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5',
    'org.apache.kafka:kafka-clients:2.4.0',
    'org.apache.spark:spark-streaming_2.11:2.4.5',
    'org.apache.hadoop:hadoop-aws:2.7.3',
])

_spark_conf = [
    'spark.jars.ivy=/tmp',
    'spark.hadoop.fs.s3a.endpoint='
    + os.environ.get('S3A_ENDPOINT', 'http://172.30.141.91:9000'),
    'spark.hadoop.fs.s3a.access.key='
    + os.environ.get('S3A_ACCESS_KEY', 'minio'),
    'spark.hadoop.fs.s3a.secret.key='
    + os.environ.get('S3A_SECRET_KEY', 'minio123'),
    'spark.hadoop.fs.s3a.path.style.access=true',
    'spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem',
]

# Argument order matches the original string: packages, confs, master, shell.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(
    ['--packages', _spark_packages]
    + ['--conf ' + setting for setting in _spark_conf]
    + ['--master', 'spark://' + os.environ['SPARK_CLUSTER'] + ':7077']
    + ['pyspark-shell']
)



###  Connect to the Spark cluster provided by the OpenShift platform

In [None]:
# Create the Spark session from the builder configured earlier, or reuse one
# that already exists in this kernel.
spark = sparkSessionBuilder.getOrCreate()

# INFO is verbose; switch to "WARN" if the log output becomes too noisy.
spark.sparkContext.setLogLevel("INFO")

print("Spark context started.")

###  Declare our input data sources, import and combine them

In [None]:
# Load the customer CSV from the "rawdata" bucket. The first row is treated as
# the header and column types are inferred from the data.
#
# Fix: the separator option is spelled "delimiter". The previous "delimeter"
# key is not a recognised CSV option and was ignored, so the reader only
# worked because its default separator is also ",".
dataFrame_Customer = (
    spark.read
    .options(delimiter=',', inferSchema='True', header='True')
    .csv("s3a://rawdata/Customer-Churn_P1.csv")
)
dataFrame_Customer.printSchema()

In [None]:
# Disabled: optional second CSV source (Customer-Churn_P2.csv). Uncomment to load it.
# dataFrame_Products = spark.read\
#                 .options(delimiter=',', inferSchema='True', header='True') \
#                 .csv("s3a://rawdata/Customer-Churn_P2.csv")
# dataFrame_Products.printSchema()


In [None]:
from pyspark.sql.types import *
from  pyspark.sql.functions import *

# Kafka connection settings used by the reader further down in this cell.
# NOTE: "srcKakaTopic" (sic) keeps its original spelling because the reader
# refers to it by exactly this name.
srcKafkaBrokers = "odh-message-bus-kafka-bootstrap:9092"  # bootstrap host:port
srcKakaTopic = "data"  # topic to subscribe to



# Explicit schema applied to the JSON payload of each Kafka message.
# customerID is an integer and TotalCharges a double; every other field,
# including MonthlyCharges, is read as a string. All fields are nullable.
schema = StructType([
    StructField("customerID", IntegerType()),
    StructField("Premium", StringType()),
    StructField("RelationshipManager", StringType()),
    StructField("PrimaryChannel", StringType()),
    StructField("HasCreditCard", StringType()),
    StructField("DebitCard", StringType()),
    StructField("IncomeProtection", StringType()),
    StructField("WealthManagement", StringType()),
    StructField("HomeEquityLoans", StringType()),
    StructField("MoneyMarketAccount", StringType()),
    StructField("CreditRating", StringType()),
    StructField("PaperlessBilling", StringType()),
    StructField("AccountType", StringType()),
    StructField("MonthlyCharges", StringType()),
    StructField("TotalCharges", DoubleType()),
    StructField("Churn", StringType()),
])



# Batch-read the Kafka topic from its earliest offset and turn the message
# payloads into JSON strings.
#
# Payload cleanup, in order:
#   1. cast the binary "value" column to a string and remove every backslash;
#   2. strip one leading and one trailing double quote.
# NOTE(review): this looks intended to unwrap JSON that arrives as an escaped,
# quoted string. It also removes backslashes that are legitimately part of a
# field value - confirm the producer never emits them.
#
# Despite its name, dfKafka is an RDD of JSON strings, not a DataFrame.
dfKafka = (
    spark.read.format("kafka")
    .options(**{
        "kafka.bootstrap.servers": srcKafkaBrokers,
        "subscribe": srcKakaTopic,
        "startingOffsets": "earliest",
    })
    .load()
    .withColumn("value", regexp_replace(col("value").cast("string"), r"\\", ""))
    .withColumn("value", regexp_replace(col("value"), r'^"|"$', ""))
    .selectExpr("CAST(value AS STRING) as jsonValue")
    .rdd.map(lambda record: record["jsonValue"])
)

# Parse the JSON strings with the explicit schema instead of inferring one,
# then print the schema and the first two rows as a sanity check.
dfObj = spark.read.schema(schema).json(dfKafka)
dfObj.printSchema()
dfObj.show(n=2)


In [None]:
# Full outer join on customerID: rows that appear in only one of the two
# sources are kept alongside those present in both.
dataFrom_All = dataFrame_Customer.join(dfObj, on="customerID", how="full")

###  Push the prepared data to object storage, then stop the Spark session to save resources
###  Note: be sure to change `user_id` in the next cell to your own username (one of user1 ... user30)

In [None]:
# Workshop user name. It is appended directly, with no separator, to the
# output folder name, e.g. "s3a://data/full_data_csvuser29".
user_id = "user29"
file_location = "s3a://data/full_data_csv" + user_id

# repartition(1) collapses the data into a single partition before writing.
# Any existing output at file_location is overwritten, and a header row is
# included in the CSV.
(
    dataFrom_All.repartition(1)
    .write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save(file_location)
)

In [None]:
# Stop the Spark session now that the output has been written, so it no
# longer holds resources.
spark.stop()