# JupyterHub Notebook

### This notebook server is hosted on the OpenShift platform, which provides a separate server for each individual user. The platform takes care of provisioning the server and allocating the related storage.

In [2]:
import os
import json
from pyspark import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import from_json, col, to_json, struct

In [3]:
# Describe the Spark session for this pipeline. Only the builder is configured
# here; the session itself is obtained later by calling getOrCreate() on it.
#
# The chain is wrapped in parentheses instead of using backslash continuations:
# the previous version ended in a dangling "\" with no continued line after it,
# which only parses while a blank line happens to follow it.
sparkSessionBuilder = (
    SparkSession
    .builder
    .appName("Credit card data ingest Pipeline")
)

In [4]:
# Assemble the spark-submit arguments and publish them through
# PYSPARK_SUBMIT_ARGS. PySpark reads this variable when it launches the JVM, so
# this cell has to run before getOrCreate() is called.
#
# The object-store endpoint and credentials used to be hardcoded in the argument
# string. They are now read from the environment, with the former literals kept
# as defaults so the setup behaves as before when nothing is exported.
# NOTE(review): the defaults are still shared credentials -- remove them once the
# deployment injects S3_ACCESS_KEY / S3_SECRET_KEY.
try:
    sparkCluster = os.environ['SPARK_CLUSTER']
except KeyError:
    # Same exception type as before, but with a message that says what to do.
    raise KeyError(
        "SPARK_CLUSTER is not set; it must hold the host name of the Spark master"
    ) from None

s3Endpoint = os.environ.get('S3_ENDPOINT_URL', 'http://172.30.226.86:9000')
s3AccessKey = os.environ.get('S3_ACCESS_KEY', 'minio')
s3SecretKey = os.environ.get('S3_SECRET_KEY', 'minio123')

# Maven coordinates passed to --packages (joined with commas, no spaces).
sparkPackages = [
    'org.postgresql:postgresql:42.2.10',
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5',
    'org.apache.kafka:kafka-clients:2.4.0',
    'org.apache.spark:spark-streaming_2.11:2.4.5',
    'org.apache.hadoop:hadoop-aws:2.7.3',
]

# key/value pairs emitted as "--conf key=value", in this order.
sparkSubmitConf = [
    ('spark.jars.ivy', '/tmp'),
    ('spark.hadoop.fs.s3a.endpoint', s3Endpoint),
    ('spark.hadoop.fs.s3a.access.key', s3AccessKey),
    ('spark.hadoop.fs.s3a.secret.key', s3SecretKey),
    ('spark.hadoop.fs.s3a.path.style.access', 'true'),
    ('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem'),
]

os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(
    ['--packages', ','.join(sparkPackages)]
    + [f'--conf {confKey}={confValue}' for confKey, confValue in sparkSubmitConf]
    + ['--master', f'spark://{sparkCluster}:7077', 'pyspark-shell']
)



# Connect to Spark Cluster provided by OpenShift Platform

In [5]:
# Obtain the session from the builder configured earlier, then set the log
# level on its SparkContext to INFO.
spark = sparkSessionBuilder.getOrCreate()
sparkContext = spark.sparkContext
sparkContext.setLogLevel("INFO")
print("Spark context started.")

Spark context started.


# Fetch Data from CRM Database - Hosted by OpenShift Platform

In [6]:
### READ FROM POSTGRES ###
# Connection settings for the JDBC source read by the cells below.
postgresUrl = "jdbc:postgresql://postgres-ml-workshop:5432/ml-workshop"
tableName = "creditcard"

# Credentials are taken from the environment when provided. The defaults are
# the values that were previously hardcoded here, so behavior is unchanged when
# CRM_DB_USER / CRM_DB_PASSWORD are not exported.
# NOTE(review): remove the defaults once the deployment injects real secrets.
dbUser = os.environ.get("CRM_DB_USER", "postgres")
dbPassword = os.environ.get("CRM_DB_PASSWORD", "postgres")

# Partitioned-read settings: a value <= 0 for numberOfPartitions disables the
# partition options; partitionColumn names the column the read is split on.
numberOfPartitions = 200
partitionColumn = "row_number"


def getUpperBound(sqlContext, postgresUrl, tableName, dbUser, dbPassword):
    """Return the row count of ``tableName``, fetched through a JDBC read.

    The caller uses this count as the upper bound of its partitioned read.

    Args:
        sqlContext: object whose ``read`` attribute yields a JDBC-capable reader.
        postgresUrl: JDBC URL of the database.
        tableName: table to count; concatenated into the query text as-is.
        dbUser: database user name.
        dbPassword: database password.

    Returns:
        The first field of the first row returned by the count query.
    """
    # The count is wrapped as an aliased sub-select and passed as "dbtable".
    countQuery = "(select count(*) as result from " + tableName + ") as upperBound"
    jdbcOptions = (
        ("url", postgresUrl),
        ("driver", "org.postgresql.Driver"),
        ("dbtable", countQuery),
        ("user", dbUser),
        ("password", dbPassword),
    )

    countReader = sqlContext.read.format("jdbc")
    for optionName, optionValue in jdbcOptions:
        countReader = countReader.option(optionName, optionValue)

    firstRow = countReader.load().head()
    return firstRow[0]


# Read the Postgres table into a dataframe.
sqlContext = SQLContext(spark.sparkContext)

# Base JDBC reader for the whole table; partition options are added below.
reader = sqlContext.read.format("jdbc")
for optionName, optionValue in (
    ("url", postgresUrl),
    ("driver", "org.postgresql.Driver"),
    ("dbtable", tableName),
    ("user", dbUser),
    ("password", dbPassword),
):
    reader = reader.option(optionName, optionValue)

# The row count doubles as the upper bound of the partitioned read.
upperBound = getUpperBound(sqlContext, postgresUrl, tableName, dbUser, dbPassword)
if upperBound == 0:
    # Only reported; the load below still runs.
    print("The JDBC source is empty.")

if numberOfPartitions > 0:
    print(f"Using JDBC number of partitions : {numberOfPartitions}")
    # NOTE(review): "lowerbound" is cased differently from "upperBound"; the
    # recorded run succeeded, so the key is presumably matched
    # case-insensitively -- confirm before normalizing it.
    partitionOptions = (
        ("partitionColumn", partitionColumn),
        ("lowerbound", 0),
        ("upperBound", upperBound),
        ("numPartitions", numberOfPartitions),
    )
    for optionName, optionValue in partitionOptions:
        reader = reader.option(optionName, optionValue)

print("Creating dataframe...")
dfPostgres = reader.load()
print("Dataframe created.")

Using JDBC number of partitions : 200
Creating dataframe...
Dataframe created.


# Fetch Data from Real Time Kafka Cluster - Provided by OpenShift Platform

In [7]:
# Kafka source settings.
srcKafkaBrokers = "kafka-ml-workshop-kafka-bootstrap:9092"
srcKakaTopic = "mlworkshop.creditcard"

# Batch read (spark.read, not readStream) of the topic from the earliest offset.
kafkaReader = spark.read.format("kafka")
kafkaReader = kafkaReader.option("kafka.bootstrap.servers", srcKafkaBrokers)
kafkaReader = kafkaReader.option("subscribe", srcKakaTopic)
kafkaReader = kafkaReader.option("startingOffsets", "earliest")
kafkaMessages = kafkaReader.load()

# Keep only the message value, cast to text and exposed as "jsonValue".
jsonColumn = kafkaMessages.selectExpr("CAST(value AS STRING) as jsonValue")

# Despite its name, dfKafka is an RDD of JSON strings, not a dataframe.
dfKafka = jsonColumn.rdd.map(lambda message: message["jsonValue"])

# Parse the JSON strings into a dataframe.
print("Loading Kafka JSON messages...")
dfObj = spark.read.json(dfKafka)
print("Dataframe created.")


Loading Kafka JSON messages...
Dataframe created.


# Join the data from CRM and Real Time Kafka and store it in S3. The S3 server is provided by OpenShift

In [8]:
### FULL OUTER JOIN OF THE TWO DATAFRAMES, THEN WRITE TO THE S3 BUCKET ###
# Rows are matched on "row_number"; a full join keeps rows that exist on only
# one side.
dfJoined = dfPostgres.join(dfObj, on="row_number", how="full")

print("Schema after join.")
dfJoined.printSchema()

# NOTE(review): "credticard" in the path looks like a typo of "creditcard". It
# is left unchanged because other jobs may already read from this location.
outputPath = "s3a://data/credticard_clean_csv"

print("Writing joined dataframe to S3 bucket...")
csvWriter = dfJoined.write.format("csv").mode("overwrite")
csvWriter.save(outputPath)
print("Done.")


Schema after join.
root
 |-- row_number: long (nullable = true)
 |-- Time: string (nullable = true)
 |-- V1: string (nullable = true)
 |-- V2: string (nullable = true)
 |-- V3: string (nullable = true)
 |-- V4: string (nullable = true)
 |-- V5: string (nullable = true)
 |-- V6: string (nullable = true)
 |-- V7: string (nullable = true)
 |-- V8: string (nullable = true)
 |-- V9: string (nullable = true)
 |-- V10: string (nullable = true)
 |-- V11: string (nullable = true)
 |-- V12: string (nullable = true)
 |-- V13: string (nullable = true)
 |-- V14: string (nullable = true)
 |-- V15: string (nullable = true)
 |-- V16: string (nullable = true)
 |-- V17: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- Class: string (nullable = true)
 |-- V18: string (nullable = true)
 |-- V19: string (nullable = true)
 |-- V20: string (nullable = true)
 |-- V21: string (nullable = true)
 |-- V22: string (nullable = true)
 |-- V23: string (nullable = true)
 |-- V24: string (nullable = 

In [9]:
### STOP SPARK SESSION ###
# Stop the session held in `spark`. After this cell runs, cells that use
# `spark` need the session to be created again first.
# NOTE(review): presumably this also releases the resources held on the Spark
# cluster -- confirm against the cluster UI.
spark.stop()