## Data Processing Using Apache Spark on Openshift

This notebook server is hosted on the OpenShift platform which provides a dedicated notebook server for each individual user. The platform takes care of provisioning the cluster resources including the allocation related to storage resources.

### Manually set variables in notebook mode

In [None]:
# import os
# os.environ['S3_ENDPOINT'] = "http://minio-ml-workshop:9000"

###  Prepare S3 URL
Define a function that will convert an S3 URL into URL that works with MINIO

In [1]:
import os, socket
from urllib.parse import urlparse

# Get the S3 URL information and use it in Spark Context
# NOTE: S3 Hadoop API for spark does not work with domain name, use IP address instead
def domain_to_ip(url):
    domain = urlparse(url).netloc.split(":")[0]
    ip_address = socket.gethostbyname(domain)
    ip_url = url.replace(domain, ip_address)
    return ip_url

###  Connect to Spark Cluster provided by OpenShift Platform
Using the given spark_util library, create a Spark session that connects to a Spark cluster dedicated for this notebook. You may add additional Spark submit arguments in the second argument of spark_util.getOrCreateSparkSession() such as additional packages and or override some configuration items.

In [2]:
import spark_util

submit_args = f"--conf spark.hadoop.fs.s3a.endpoint={domain_to_ip(os.environ['S3_ENDPOINT_URL'])} \
--conf spark.hadoop.fs.s3a.access.key=minio \
--conf spark.hadoop.fs.s3a.secret.key=minio123 \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.hadoop.fs.s3a.multipart.size=104857600 \
--conf spark.jars.ivy=/tmp \
--packages org.apache.hadoop:hadoop-aws:3.2.0"
# ,com.amazonaws:aws-java-sdk:1.11.968"

spark = spark_util.getOrCreateSparkSession("ANZ Join Tables", submit_args)

Initializing environment variables for Spark
Cluter name: spark-cluster-rbrigoli
PYSPARK_SUBMIT_ARGS: --conf spark.hadoop.fs.s3a.endpoint=http://172.30.29.255:9000 --conf spark.hadoop.fs.s3a.access.key=minio --conf spark.hadoop.fs.s3a.secret.key=minio123 --conf spark.hadoop.fs.s3a.path.style.access=true --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem --conf spark.hadoop.fs.s3a.multipart.size=104857600 --conf spark.jars.ivy=/tmp --packages org.apache.hadoop:hadoop-aws:3.2.0 --master spark://spark-cluster-rbrigoli:7077 pyspark-shell 
Driver IP address: 10.130.3.188
Creating a spark session...
Spark session created


In [None]:
print(spark.sparkContext.getConf().getAll())
spark

###  Create dataframes from CSV files

Using Spark, read the CSV filed from S3 storage and load them as Spark dataframes.

In [3]:
df_cfms_cncrn = spark.read\
                .options(delimeter=',', inferSchema='True', header='True') \
                .csv("s3a://raw-data-anz/CFMS_CNCRN.csv")
#df_cfms_cncrn.printSchema()

df_cfms_issue = spark.read\
                .options(delimeter=',', inferSchema='True', header='True') \
                .csv("s3a://raw-data-anz/CFMS_ISSUE.csv")
#df_cfms_issue.printSchema()

df_salesforce = spark.read\
                .options(delimeter=',', inferSchema='True', header='True') \
                .csv("s3a://raw-data-anz/SALESFORCECMOSAU_CASE.csv")
#df_salesforce.printSchema()

### Join dataframes
Perform a full outer join on two dataframes using ```CNCRN_ID``` as key

In [67]:
from pyspark.sql.functions import col

df_cfms_issue_alias = df_cfms_issue.select(col("CNCRN_ID"), col("ID").alias("CF_ID"), \
                                           col("ISSUE_DS").alias("CF_ISSUE_DS"), col("END_DATE").alias("CF_END_DATE"))

df_cfms_joined = df_cfms_cncrn.join(df_cfms_issue_alias, "CNCRN_ID", how="left_outer")
#df_cfms_joined.printSchema()

df_salesforce_alias = df_salesforce.select(col("CASENUMBER_C").alias("CNCRN_ID"), col("ID").alias("SF_ID"), \
                                           col("ISSUE_DS").alias("SF_ISSUE_DS"), col("END_DATE").alias("SF_END_DATE"))

df_cfms_salesforce_joined = df_cfms_joined.join(df_salesforce_alias, "CNCRN_ID", how="left_outer")
#df_cfms_salesforce_joined.printSchema()


df_cfms_salesforce_joined.show()

+--------+-------------+----------+--------------------+-------+-----+--------------------+-----------+-----+--------------------+-----------+
|CNCRN_ID|SOURCE_SYSTEM|   RECVD_D|            CNCRN_DS|   STAT|CF_ID|         CF_ISSUE_DS|CF_END_DATE|SF_ID|         SF_ISSUE_DS|SF_END_DATE|
+--------+-------------+----------+--------------------+-------+-----+--------------------+-----------+-----+--------------------+-----------+
|    A111|         CFMS|2019-06-14|Something's wrong...|   Open|   11|Something's wrong...| 2050-01-01| null|                null|       null|
|    A111|         CFMS|2019-06-14|Something's wrong...|   Open|    1|Something's wrong...| 2025-01-01| null|                null|       null|
|    A112|         CFMS|2020-06-12|Something's wrong...|   Open|   12|Something's wrong...| 2050-01-01| null|                null|       null|
|    A112|         CFMS|2020-06-12|Something's wrong...|   Open|    2|Something's wrong...| 2025-01-01| null|                null|       null|

###  Push the prepared data to the object storage and stop the Spark application
Write the joined dataframe to an S3 bucket as CSV and as Parquet file.

In [68]:
file_location = "s3a://data-anz/cfms_concern"
df_cfms_salesforce_joined.write.mode("overwrite")\
    .option("header", "true")\
    .format("csv").save(file_location)

In [69]:
file_location = "s3a://data-anz/cfms_concern_parquet"
df_cfms_salesforce_joined.write.mode("overwrite").parquet(file_location)

## Query Parquet file

In [70]:
from pyspark.sql.functions import *  

file_location = "s3a://data-anz/cfms_concern_parquet"
df_parquet = spark.read.parquet(file_location)

#df_queried.show()

df_formatted = df_parquet.select(col("CNCRN_ID"), col("SOURCE_SYSTEM").alias("Source"),\
                                col("RECVD_D").alias("Date Recvd"), col("CNCRN_DS").alias("Concern Description"),\
                                col("STAT").alias("Status"), concat_ws("", df_parquet.CF_ISSUE_DS, df_parquet.SF_ISSUE_DS).alias("Issue Description"),\
                                concat_ws("", df_parquet.CF_END_DATE, df_parquet.SF_END_DATE).alias("End Date"))

df_formatted.show()


+--------+----------+----------+--------------------+-------+--------------------+-----------+
|CNCRN_ID|    Source|Date Recvd| Concern Description| Status|   Issue Description|   End Date|
+--------+----------+----------+--------------------+-------+--------------------+-----------+
|    A111|      CFMS|2019-06-14|Something's wrong...|   Open|Something's wrong...| 2050-01-01|
|    A111|      CFMS|2019-06-14|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A112|      CFMS|2020-06-12|Something's wrong...|   Open|Something's wrong...| 2050-01-01|
|    A112|      CFMS|2020-06-12|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A113|      CFMS|2021-06-01|Something's wrong...|   Open|Something's wrong...| 2050-01-01|
|    A113|      CFMS|2021-06-01|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A114|      CFMS|2019-07-23|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A115|      CFMS|2019-08-24|Something's wrong.

### Display formatted data using SQL syntax

In [71]:
df_parquet.createOrReplaceTempView("concerns")
sqldf = spark.sql("SELECT CNCRN_ID, SOURCE_SYSTEM as SOURCE, RECVD_D as RECVD, CNCRN_DS, STAT,\
concat_ws('',CF_ISSUE_DS, SF_ISSUE_DS) as ISSUE_DS, concat_ws('',CF_END_DATE,SF_END_DATE) as END_DATE FROM concerns")
sqldf.show()

+--------+----------+----------+--------------------+-------+--------------------+-----------+
|CNCRN_ID|    SOURCE|     RECVD|            CNCRN_DS|   STAT|            ISSUE_DS|   END_DATE|
+--------+----------+----------+--------------------+-------+--------------------+-----------+
|    A111|      CFMS|2019-06-14|Something's wrong...|   Open|Something's wrong...| 2050-01-01|
|    A111|      CFMS|2019-06-14|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A112|      CFMS|2020-06-12|Something's wrong...|   Open|Something's wrong...| 2050-01-01|
|    A112|      CFMS|2020-06-12|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A113|      CFMS|2021-06-01|Something's wrong...|   Open|Something's wrong...| 2050-01-01|
|    A113|      CFMS|2021-06-01|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A114|      CFMS|2019-07-23|Something's wrong...|   Open|Something's wrong...| 2025-01-01|
|    A115|      CFMS|2019-08-24|Something's wrong.

### Pivot dataframe on ISSUE_ID

In [108]:
df_col_merged = df_parquet.select(col("CNCRN_ID"), col("SOURCE_SYSTEM"), col("RECVD_D"), col("CNCRN_DS"), col("STAT"),\
                                  concat_ws("F",df_parquet.CF_ID, df_parquet.SF_ID).alias("ISSUE_ID"),\
                                  concat_ws("", df_parquet.CF_ISSUE_DS, df_parquet.SF_ISSUE_DS).alias("ISSUE_DS"),\
                                  concat_ws("", df_parquet.CF_END_DATE, df_parquet.SF_END_DATE).alias("END_DATE"))

df_col_merged.show(20, False)

df_grouped = df_col_merged.groupBy("CNCRN_ID", "SOURCE_SYSTEM", "RECVD_D","CNCRN_DS","STAT").count()
df_grouped.show(20, False)

df_pivoted = df_col_merged.groupBy("CNCRN_ID", "SOURCE_SYSTEM", "RECVD_D","CNCRN_DS","STAT")\
.pivot("ISSUE_ID").agg(first("ISSUE_DS").alias("ISSUE_DS"), first("END_DATE").alias("END_DATE"))

df_pivoted.head()

+--------+-------------+----------+----------------------+-------+--------+---------------------------------------+-----------+
|CNCRN_ID|SOURCE_SYSTEM|RECVD_D   |CNCRN_DS              |STAT   |ISSUE_ID|ISSUE_DS                               |END_DATE   |
+--------+-------------+----------+----------------------+-------+--------+---------------------------------------+-----------+
|A111    |CFMS         |2019-06-14|Something's wrong here|Open   |11      |Something's wrong in CRFMS - A111      |2050-01-01 |
|A111    |CFMS         |2019-06-14|Something's wrong here|Open   |1       |Something's wrong in CRFMS - A111      |2025-01-01 |
|A112    |CFMS         |2020-06-12|Something's wrong here|Open   |12      |Something's wrong in CRFMS - A112      |2050-01-01 |
|A112    |CFMS         |2020-06-12|Something's wrong here|Open   |2       |Something's wrong in CRFMS - A112      |2025-01-01 |
|A113    |CFMS         |2021-06-01|Something's wrong here|Open   |13      |Something's wrong in CRFMS - 

Row(CNCRN_ID='A112', SOURCE_SYSTEM='CFMS', RECVD_D='2020-06-12', CNCRN_DS="Something's wrong here", STAT='Open', 1_ISSUE_DS=None, 1_END_DATE=None, 10_ISSUE_DS=None, 10_END_DATE=None, 11_ISSUE_DS=None, 11_END_DATE=None, 12_ISSUE_DS="Something's wrong in CRFMS - A112", 12_END_DATE='2050-01-01', 13_ISSUE_DS=None, 13_END_DATE=None, 14_ISSUE_DS=None, 14_END_DATE=None, 15_ISSUE_DS=None, 15_END_DATE=None, 16_ISSUE_DS=None, 16_END_DATE=None, 2_ISSUE_DS="Something's wrong in CRFMS - A112", 2_END_DATE='2025-01-01', 3_ISSUE_DS=None, 3_END_DATE=None, 4_ISSUE_DS=None, 4_END_DATE=None, 5_ISSUE_DS=None, 5_END_DATE=None, 6_ISSUE_DS=None, 6_END_DATE=None, 7_ISSUE_DS=None, 7_END_DATE=None, 8_ISSUE_DS=None, 8_END_DATE=None, 9_ISSUE_DS=None, 9_END_DATE=None)

### Concateneate column values of rows into a singale column 

In [107]:
df_parent = df_col_merged.select("CNCRN_ID", "SOURCE_SYSTEM", "RECVD_D","CNCRN_DS","STAT").distinct()
df_sublist = df_col_merged.groupBy("CNCRN_ID").agg(collect_list("ISSUE_DS").alias("ISSUE_DS"), collect_list("END_DATE").alias("END_DATES"))

df_parent.join(df_sublist, "CNCRN_ID", how="inner").show(20, True)

+--------+-------------+----------+--------------------+-------+--------------------+--------------------+
|CNCRN_ID|SOURCE_SYSTEM|   RECVD_D|            CNCRN_DS|   STAT|            ISSUE_DS|           END_DATES|
+--------+-------------+----------+--------------------+-------+--------------------+--------------------+
|    A116|   SALESFORCE|2019-09-11|Something's wrong...| Closed|[Something's wron...|        [2029-05-01]|
|    A126|   SALESFORCE|2019-09-11|Something's wrong...| Closed|[Something's wron...|        [2029-08-01]|
|    A130|   SALESFORCE|2020-11-14|Something's wrong...|   Open|[Something's wron...|[2029-08-210, 202...|
|    A125|         CFMS|2019-08-24|Something's wrong...| Closed|[Something's wron...|       [2026-01-210]|
|    A117|   SALESFORCE|2019-10-12|Something's wrong...|Pending|[Something's wron...|        [2029-05-01]|
|    A127|   SALESFORCE|2019-10-12|Something's wrong...|Pending|[Something's wron...|        [2029-08-01]|
|    A121|         CFMS|2019-06-14|So

### Stop Spark Session
Because this is last step of our data preparation, we don't need the Spark cluster anymore. We will stop the Spark context which will remove the Spark application from the cluster.

In [None]:
spark.stop()