### Start the kernel specific to SPARK

* conda env list
* conda activate spark
* in visual code IDE, select the environment as python-spark
* check that in the selected environment pyspark is installed

In [1]:
!pip3 show pyspark
!pip3 show findspark

Name: pyspark
Version: 2.4.4
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.7/site-packages
Requires: py4j
Required-by: 
Name: findspark
Version: 1.3.0
Summary: Find pyspark to make it importable.
Home-page: https://github.com/minrk/findspark
Author: Min RK
Author-email: benjaminrk@gmail.com
License: BSD (3-clause)
Location: /usr/local/lib/python3.7/site-packages
Requires: 
Required-by: 


In [2]:
# The following line help the Jupyter program to find the Spark binaries to run the job
import findspark
findspark.init()

In [5]:
# Sample program to validate pySpark library is available
import pyspark
sc = pyspark.SparkContext('local[*]')

txt = sc.textFile('file:////Users/s0m0158/github/spark-scala/README.md')
print(txt.count())

python_lines = txt.filter(lambda line: 'python' in line.lower())
print(python_lines.count())


#big_list = range(10000)
#>>> rdd = sc.parallelize(big_list, 2)
#>>> odds = rdd.filter(lambda x: x % 2 != 0)
#>>> odds.take(5)

11
0


In [6]:
# Another random code snippet to check if the Spark session is still alive after the previous cell execution
big_list = range(10000)
rdd = sc.parallelize(big_list, 2)
odds = rdd.filter(lambda x: x % 2 != 0)
odds.take(5)

[1, 3, 5, 7, 9]

In [3]:
# Load the CCM properties file
from os.path import expanduser
home = expanduser("~")

separator = "="
keys = {}

# I named your file conf and stored it 
# in the same directory as the script

with open(home+'/nexus.prop') as f:

    for line in f:
        if separator in line:

            # Find the name and value by splitting the string
            name, value = line.split(separator, 1)

            # Assign key value pair to dict
            # strip() removes white space from the ends of strings
            keys[name.strip()] = value.strip()

#print(keys)

In [4]:
#This section handles adding JDBC driver to the PYSPARK shell
import os
jdbc_connector_mysql=keys["jdbc-connector-mysql"]

os.environ["PYSPARK_SUBMIT_ARGS"] = f"--jars file://{jdbc_connector_mysql} pyspark-shell"


In [6]:
#Create sparksession instance

spark.stop()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [42]:
# Fetch 1 workflow records from the templates table
#database = 'reach'
#url='jdbc:mysql://{}/{}'.format(host, database)
#table = 'reach.tasks'

table = "(SELECT workflowId, REPLACE(CONVERT(REPLACE(workflow,'&amp;','&') using utf8),'','') as workflow FROM reach.templates LIMIT 3) AS t"

url = keys["reach-dev-url"]
#query='select taskid from tasks limit 10'
user = keys["reach-dev-userid"]
password = keys["reach-dev-password"]

properties = {
    'user': user,
    'password': password,
    'driver': 'com.mysql.jdbc.Driver',
    #'query': 'select taskid from tasks limit 10',
    'fetchsize': '10'
}

df = spark.read.jdbc(url, table, properties=properties)
#df = sqlContext.read.jdbc(url, query, properties=properties)
df.show()

+--------------------+--------------------+
|          workflowId|            workflow|
+--------------------+--------------------+
|61cb537d-6af8-4bb...|{"id":"61cb537d-6...|
|61cb537d-6af8-4bb...|{"id":"61cb537d-6...|
|61cb537d-6af8-4bb...|{"id":"61cb537d-6...|
+--------------------+--------------------+



In [7]:
#Parse the json column
df.select("workflow").show()

+--------------------+
|            workflow|
+--------------------+
|{"id":"61cb537d-6...|
|{"id":"61cb537d-6...|
|{"id":"61cb537d-6...|
+--------------------+



In [None]:
#from pyspark.sql.functions import from_json, col
#json_schema = spark.read.json(df.select("workflow").rdd.map(lambda row: row.json)).schema
#df.withColumn('json', from_json(col('json'), json_schema))

workflowRDD = df.select("workflow").rdd

workflowRDD.map(lambda r: r.).take(2)
#workflowRDD.map(lambda r: r.toJSON).take(2)
#workflowRDD.take(2)

#new_df = spark.read.json(.map(lambda r: r.json))
#new_df.printSchema()

#df.show()

In [23]:
workflowRDD2 = df.select("workflow").toJSON()


In [24]:
workflowRDD2.map(lambda r: type(r)).take(2)

[str, str]

In [26]:
workflowRDD2.map(lambda r: r[1:30]).take(2)

['"workflow":"{\\"id\\":\\"61cb537', '"workflow":"{\\"id\\":\\"61cb537']

In [27]:
json_schema = spark.read.json(workflowRDD2).schema

In [30]:
print(json_schema.fields)

[StructField(workflow,StringType,true)]


In [33]:
from pyspark.sql.functions import from_json, col
df.withColumn('json', from_json(col('workflow'), json_schema)).printSchema

<bound method DataFrame.printSchema of DataFrame[workflowId: string, workflow: string, json: struct<workflow:string>]>

In [36]:
workflowRDD3 = df.select("workflow").rdd.map(list)
workflowRDD3.cache()

PythonRDD[72] at RDD at PythonRDD.scala:53

In [38]:
workflowRDD3.map(lambda r: type(r)).take(2)

[list, list]

In [48]:
workflowRDD4 = workflowRDD3.map(lambda r: r[0])

In [49]:
workflowRDD4.map(lambda r: r[1:30]).take(2)

['"id":"61cb537d-6af8-4bb5-8443', '"id":"61cb537d-6af8-4bb5-8443']

In [50]:
json_schema2 = spark.read.json(workflowRDD4).schema

In [51]:
print(json_schema2.fields)

[StructField(adGroups,ArrayType(StringType,true),true), StructField(createdBy,StringType,true), StructField(createdTs,LongType,true), StructField(description,StringType,true), StructField(id,StringType,true), StructField(isActive,BooleanType,true), StructField(languageCode,StringType,true), StructField(languages,ArrayType(StringType,true),true), StructField(lastPublishedBy,StringType,true), StructField(lastPublishedTs,LongType,true), StructField(lastUpdatedBy,StringType,true), StructField(lastUpdatedTs,LongType,true), StructField(lockedBy,StringType,true), StructField(name,StringType,true), StructField(roleGroups,ArrayType(StringType,true),true), StructField(roles,ArrayType(StructType(List(StructField(groups,ArrayType(StringType,true),true),StructField(name,StringType,true))),true),true), StructField(rules,ArrayType(StringType,true),true), StructField(sections,ArrayType(StructType(List(StructField(description,StringType,true),StructField(fields,ArrayType(StructType(List(StructField(cus

In [52]:
from pyspark.sql.functions import from_json, col
df.withColumn('json', from_json(col('workflow'), json_schema2)).printSchema

<bound method DataFrame.printSchema of DataFrame[workflowId: string, workflow: string, json: struct<adGroups:array<string>,createdBy:string,createdTs:bigint,description:string,id:string,isActive:boolean,languageCode:string,languages:array<string>,lastPublishedBy:string,lastPublishedTs:bigint,lastUpdatedBy:string,lastUpdatedTs:bigint,lockedBy:string,name:string,roleGroups:array<string>,roles:array<struct<groups:array<string>,name:string>>,rules:array<string>,sections:array<struct<description:string,fields:array<struct<customData:struct<affectedId:string,options:struct<attachmentName:string,options:array<struct<options:array<struct<options:array<struct<options:array<string>,value:string>>,value:string>>,value:string>>,placeholders:array<string>>>,helperText:string,hidden:boolean,id:string,includeYear:boolean,isFilterable:boolean,label:string,maxFiles:bigint,options:array<struct<order:bigint,value:string>>,order:bigint,placeholder:string,prefix:string,required:boolean,responses:array<stri

In [54]:
new_df=df.withColumn('json', from_json(col('workflow'), json_schema2))
new_df.cache()

DataFrame[workflowId: string, workflow: string, json: struct<adGroups:array<string>,createdBy:string,createdTs:bigint,description:string,id:string,isActive:boolean,languageCode:string,languages:array<string>,lastPublishedBy:string,lastPublishedTs:bigint,lastUpdatedBy:string,lastUpdatedTs:bigint,lockedBy:string,name:string,roleGroups:array<string>,roles:array<struct<groups:array<string>,name:string>>,rules:array<string>,sections:array<struct<description:string,fields:array<struct<customData:struct<affectedId:string,options:struct<attachmentName:string,options:array<struct<options:array<struct<options:array<struct<options:array<string>,value:string>>,value:string>>,value:string>>,placeholders:array<string>>>,helperText:string,hidden:boolean,id:string,includeYear:boolean,isFilterable:boolean,label:string,maxFiles:bigint,options:array<struct<order:bigint,value:string>>,order:bigint,placeholder:string,prefix:string,required:boolean,responses:array<string>,sectionId:string,specifiedFileType:

In [72]:
from pyspark.sql.functions import explode
new_df.select("json.id", "json.description", explode("json.startADGroups").alias("start_adgroup")).show(truncate=False)

+------------------------------------+-----------------------------------------------------------+-------------------------------------------------------------------------------------------+
|json.id                             |json.description                                           |start_adgroup                                                                              |
+------------------------------------+-----------------------------------------------------------+-------------------------------------------------------------------------------------------+
|61cb537d-6af8-4bb5-8443-aedb2acdeaa1|Track & monitor government agency contacts at US facilities|CN=CMAlcoholTobaccoSME,OU=Archer,OU=Apps,DC=homeoffice,DC=Wal-Mart,DC=com                  |
|61cb537d-6af8-4bb5-8443-aedb2acdeaa1|Track & monitor government agency contacts at US facilities|CN=CMHealthSafetySME,OU=Archer,OU=Apps,DC=homeoffice,DC=Wal-Mart,DC=com                    |
|61cb537d-6af8-4bb5-8443-aedb2acdeaa1|Track &

In [110]:
new_df2 = new_df.select(col("json.id").alias("id"), explode("json.sections").alias("section")).select("id", explode("section.fields").alias("field")).select(col("field.id").alias("fieldId")).distinct().limit(5)

In [115]:
new_df2.show(10)

+--------------------+
|             fieldId|
+--------------------+
|d5e5f3d5-d22d-48d...|
|7330c3ba-d3ab-4d5...|
|15fa0c80-06ac-43d...|
|39bcdbcd-1a45-44d...|
|7e3e4d3f-e30c-441...|
+--------------------+



In [111]:
from pyspark.sql.functions import lit
new_df3 = new_df2.select("fieldId", lit(1).alias("id"))

In [112]:
new_df3.show()

+--------------------+---+
|             fieldId| id|
+--------------------+---+
|d5e5f3d5-d22d-48d...|  1|
|7330c3ba-d3ab-4d5...|  1|
|15fa0c80-06ac-43d...|  1|
|39bcdbcd-1a45-44d...|  1|
|7e3e4d3f-e30c-441...|  1|
+--------------------+---+



In [113]:
from pyspark.sql.functions import lit
pivotDF = new_df3.groupBy("id").pivot("fieldId").min("id")

In [114]:
pivotDF.show()

+---+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
| id|15fa0c80-06ac-43df-8193-4fbde9603de3|39bcdbcd-1a45-44de-b16b-0266aac67979|7330c3ba-d3ab-4d5a-847e-7d106693e9bb|7e3e4d3f-e30c-4414-970a-d7c46af06347|d5e5f3d5-d22d-48df-831e-f0a5d99db2bd|
+---+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|  1|                                   1|                                   1|                                   1|                                   1|                                   1|
+---+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+



In [130]:
#new_df3.join(pivotDF,on="id", how="inner").show()
cross_df = new_df3.crossJoin(pivotDF)

In [131]:
cross_df.show()

+--------------------+---+---+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|             fieldId| id| id|15fa0c80-06ac-43df-8193-4fbde9603de3|39bcdbcd-1a45-44de-b16b-0266aac67979|7330c3ba-d3ab-4d5a-847e-7d106693e9bb|7e3e4d3f-e30c-4414-970a-d7c46af06347|d5e5f3d5-d22d-48df-831e-f0a5d99db2bd|
+--------------------+---+---+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|d5e5f3d5-d22d-48d...|  1|  1|                                   1|                                   1|                                   1|                                   1|                                   1|
|7330c3ba-d3ab-4d5...|  1|  1|                                   1|                                   1|                                

In [135]:
column_list = cross_df.drop("fieldId", "id").columns

In [136]:
print(column_list)

['15fa0c80-06ac-43df-8193-4fbde9603de3', '39bcdbcd-1a45-44de-b16b-0266aac67979', '7330c3ba-d3ab-4d5a-847e-7d106693e9bb', '7e3e4d3f-e30c-4414-970a-d7c46af06347', 'd5e5f3d5-d22d-48df-831e-f0a5d99db2bd']


In [145]:
pre_fin_df = cross_df.drop("id").select(*[lit(column).alias(column) if column in column_list else column for column in cross_df.drop("id").columns])

In [146]:
pre_fin_df.show()

+--------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|             fieldId|15fa0c80-06ac-43df-8193-4fbde9603de3|39bcdbcd-1a45-44de-b16b-0266aac67979|7330c3ba-d3ab-4d5a-847e-7d106693e9bb|7e3e4d3f-e30c-4414-970a-d7c46af06347|d5e5f3d5-d22d-48df-831e-f0a5d99db2bd|
+--------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|d5e5f3d5-d22d-48d...|                15fa0c80-06ac-43d...|                39bcdbcd-1a45-44d...|                7330c3ba-d3ab-4d5...|                7e3e4d3f-e30c-441...|                d5e5f3d5-d22d-48d...|
|7330c3ba-d3ab-4d5...|                15fa0c80-06ac-43d...|                39bcdbcd-1a45-44d...|                7330c3ba-d3ab-4d5...|                7e3e4d3f-e30c-441..

In [148]:
fin_df = pre_fin_df.drop("fieldId").distinct()

In [149]:
fin_df.show()

+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|15fa0c80-06ac-43df-8193-4fbde9603de3|39bcdbcd-1a45-44de-b16b-0266aac67979|7330c3ba-d3ab-4d5a-847e-7d106693e9bb|7e3e4d3f-e30c-4414-970a-d7c46af06347|d5e5f3d5-d22d-48df-831e-f0a5d99db2bd|
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+
|                15fa0c80-06ac-43d...|                39bcdbcd-1a45-44d...|                7330c3ba-d3ab-4d5...|                7e3e4d3f-e30c-441...|                d5e5f3d5-d22d-48d...|
+------------------------------------+------------------------------------+------------------------------------+------------------------------------+------------------------------------+



In [7]:
# Load data from templates table for workflowid=a611477c-41f1-4a9e-9721-d7afeaf55099
#REPLACE(CONVERT(REPLACE(workflow,'&amp;','&') using utf8),'','') as workflow
table = "(SELECT workflowId, workflowVersion, publishStatus, isActive, lastUpdatedTs, lastPublishedTs, REPLACE(CONVERT(REPLACE(lastUpdatedBy,'&amp;','&') using utf8),'','') as lastUpdatedBy, REPLACE(CONVERT(REPLACE(lastPublishedBy,'&amp;','&') using utf8),'','') as lastPublishedBy, REPLACE(CONVERT(REPLACE(createdBy,'&amp;','&') using utf8),'','') as createdBy, createdTs, name, description, REPLACE(CONVERT(REPLACE(workflow,'&amp;','&') using utf8),'','') as workflow FROM reach.templates where workflowid='a611477c-41f1-4a9e-9721-d7afeaf55099' and workflowversion < 11) AS t"

#table = "(SELECT workflowid, name, count(1) as cnt from reach.templates group by workflowid, name) AS t"

url = keys["reach-dev-url"]
user = keys["reach-dev-userid"]
password = keys["reach-dev-password"]

properties = {
    'user': user,
    'password': password,
    'driver': 'com.mysql.jdbc.Driver',
    'fetchsize': '10'
}

templatesDF = spark.read.jdbc(url, table, properties=properties)
templatesDF.cache()

DataFrame[workflowId: string, workflowVersion: int, publishStatus: string, isActive: int, lastUpdatedTs: timestamp, lastPublishedTs: timestamp, lastUpdatedBy: string, lastPublishedBy: string, createdBy: string, createdTs: timestamp, name: string, description: string, workflow: string]

In [8]:
templatesDF.select("workflowid", "workflowversion", "publishstatus", "isactive", "lastupdatedts", "lastpublishedts", "name", "description").show(truncate=False)

+------------------------------------+---------------+-------------+--------+-------------------+-------------------+---------------------------------------+-------------------------------------------------------------------------------------------+
|workflowid                          |workflowversion|publishstatus|isactive|lastupdatedts      |lastpublishedts    |name                                   |description                                                                                |
+------------------------------------+---------------+-------------+--------+-------------------+-------------------+---------------------------------------+-------------------------------------------------------------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |published    |0       |2019-01-22 16:13:32|2019-01-22 16:13:32|Health and Wellness Regulatory Contacts|Track and monitor government agency contacts for Health and Wellness areas in US facilities|


In [9]:
#Flatten out the workflowMetadata information from templates table
#templatesFlatten1 = templatesDF.select("workflowid", "workflowversion", "workflow")
templatesFlatten1 = templatesDF.select("workflow")
templatesFlatten2 = templatesFlatten1.rdd.map(list)
templatesFlatten2.cache()

PythonRDD[15] at RDD at PythonRDD.scala:53

In [10]:
templatesFlatten3 = templatesFlatten2.map(lambda r : r[0])
#templatesFlatten3.map(lambda r : type(r)).take(2)
workflow_schema = spark.read.json(templatesFlatten3).schema
print(workflow_schema.fields)

[StructField(adGroups,ArrayType(StringType,true),true), StructField(createdBy,StringType,true), StructField(createdTs,LongType,true), StructField(description,StringType,true), StructField(id,StringType,true), StructField(isActive,BooleanType,true), StructField(languageCode,StringType,true), StructField(languages,ArrayType(StringType,true),true), StructField(lastPublishedBy,StringType,true), StructField(lastPublishedTs,LongType,true), StructField(lastUpdatedBy,StringType,true), StructField(lastUpdatedTs,LongType,true), StructField(lockedBy,StringType,true), StructField(name,StringType,true), StructField(roleGroups,ArrayType(StringType,true),true), StructField(roles,ArrayType(StructType(List(StructField(groups,ArrayType(StringType,true),true),StructField(name,StringType,true))),true),true), StructField(rules,ArrayType(StringType,true),true), StructField(sections,ArrayType(StructType(List(StructField(description,StringType,true),StructField(fields,ArrayType(StructType(List(StructField(cus

In [11]:
from pyspark.sql.functions import from_json, col, explode
templateParsed = templatesFlatten1.withColumn('json', from_json(col('workflow'), workflow_schema))
templateParsed.printSchema

<bound method DataFrame.printSchema of DataFrame[workflow: string, json: struct<adGroups:array<string>,createdBy:string,createdTs:bigint,description:string,id:string,isActive:boolean,languageCode:string,languages:array<string>,lastPublishedBy:string,lastPublishedTs:bigint,lastUpdatedBy:string,lastUpdatedTs:bigint,lockedBy:string,name:string,roleGroups:array<string>,roles:array<struct<groups:array<string>,name:string>>,rules:array<string>,sections:array<struct<description:string,fields:array<struct<customData:struct<affectedId:string,options:string>,helperText:string,hidden:boolean,id:string,includeYear:boolean,isFilterable:boolean,label:string,maxFiles:bigint,options:array<struct<order:bigint,value:string>>,order:bigint,placeholder:string,prefix:string,required:boolean,responses:array<string>,sectionId:string,specifiedFileType:array<string>,type:string>>,hidden:boolean,id:string,order:bigint,title:string,type:string,workflowId:string>>,startADGroups:array<string>,steps:array<struct<add

In [12]:
###****************DEBUG****************###
#templateParsed.count()
templateParsed.select(col("json.id").alias("workflowid"), col("json.version").alias("workflowVersion"), explode("json.sections").alias("section")).select("workflowid", "workflowversion", col("section.id").alias("sectionid")).filter(col("workflowversion")==10).show(truncate=False)

+------------------------------------+---------------+------------------------------------+
|workflowid                          |workflowversion|sectionid                           |
+------------------------------------+---------------+------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |25b4d30e-21b0-452f-aaa7-3a13dd71cb3e|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |590f3e27-ecfc-44dc-a286-dfd5a5434ba0|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |7429d574-210a-4f23-a674-887973473efd|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |8381161a-10cb-4013-9f7e-07916b083093|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |3f472a0e-dcef-4f55-aeae-b49c686b5fd1|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |f76f8151-f799-4929-904e-c386743c31e3|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |2fff901d-e085-49f4-a3c9-1cff08d79ef9|
|a611477c-41f1-4a9e-9721-d7afeaf55099|10             |4c6e329f-5821-4403-8649-74

In [13]:
###****************DEBUG****************###
from pyspark.sql.functions import collect_set, sort_array
templateSectionsDF = templateParsed.select(col("json.id").alias("workflowid"), col("json.version").alias("workflowVersion"), col("json.sections").alias("sections"))

templateSectionExplodedDF = templateSectionsDF.select("workflowid", "workflowVersion", explode("sections").alias("section")).select("workflowid", "workflowversion", col("section.id").alias("sectionid"), col("section.description").alias("sectiondesc"), col("section.order").alias("sectionorder"), col("section.title").alias("sectiontitle"), col("section.type").alias("sectiontype"), col("section.fields").alias("fields"))

templateSectionExplodedDF.groupBy(col("workflowid"), col("sectionid")).agg(sort_array(collect_set(col("workflowversion")))).show(truncate=False)

+------------------------------------+------------------------------------+----------------------------------------------+
|workflowid                          |sectionid                           |sort_array(collect_set(workflowversion), true)|
+------------------------------------+------------------------------------+----------------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|3f472a0e-dcef-4f55-aeae-b49c686b5fd1|[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]               |
|a611477c-41f1-4a9e-9721-d7afeaf55099|590f3e27-ecfc-44dc-a286-dfd5a5434ba0|[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]               |
|a611477c-41f1-4a9e-9721-d7afeaf55099|8381161a-10cb-4013-9f7e-07916b083093|[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]               |
|a611477c-41f1-4a9e-9721-d7afeaf55099|7429d574-210a-4f23-a674-887973473efd|[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]               |
|a611477c-41f1-4a9e-9721-d7afeaf55099|ebac1f96-a692-4e86-84aa-93bf0b6a67ae|[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]               |
|a611477c-41f1-4

In [14]:
###****************DEBUG****************###
templateSectionExplodedDF.select("workflowid", "workflowversion", "sectionid", "sectionorder", "sectiontitle", "sectiontype").filter(col("workflowversion").isin({1, 2})).show(truncate=False)

+------------------------------------+---------------+------------------------------------+------------+-----------------------------------+-----------+
|workflowid                          |workflowversion|sectionid                           |sectionorder|sectiontitle                       |sectiontype|
+------------------------------------+---------------+------------------------------------+------------+-----------------------------------+-----------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |25b4d30e-21b0-452f-aaa7-3a13dd71cb3e|1           |Facility Instructions              |input      |
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |590f3e27-ecfc-44dc-a286-dfd5a5434ba0|1           |Facility Information               |input      |
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |7429d574-210a-4f23-a674-887973473efd|1           |Regulatory Agency Contact          |input      |
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |8381161a-10cb-4013-9f7e-0791

In [23]:
###****************DEBUG****************###
from pyspark.sql.functions import substring
templateFieldExplodedDF = templateSectionExplodedDF.select("workflowid", "workflowversion", "sectionid", "sectionorder", "sectiontitle", "sectiontype", explode("fields").alias("field")).select("workflowid", "workflowversion", "sectionid", "sectionorder", "sectiontitle", "sectiontype", col("field.id").alias("fieldid"), col("field.helpertext").alias("fieldhelpertext"), col("field.hidden").alias("fieldishidden"), col("field.includeyear").alias("fieldincludeyear"), col("field.isfilterable").alias("fieldisfilterable"), col("field.label").alias("fieldlabel"), col("field.options").alias("fieldoptions"), col("field.order").alias("fieldorder"), col("field.placeholder").alias("fieldplaceholder"), col("field.prefix").alias("fieldprefix"), col("field.required").alias("fieldidrequired"), col("field.responses").alias("fieldresponses"), col("field.type").alias("fieldtype"))

#templateFieldExplodedDF.select(col("workflowversion").alias("wid"), substring("sectionid", 0, 3).alias("sid"), col("sectionorder").alias("sord"), substring("sectiontitle",0, 20).alias("sttl"), col("sectiontype").alias("styp"), substring("fieldid", 0, 10).alias("fid"), substring("fieldhelpertext", 0, 15).alias("fhlp"), col("fieldishidden").alias("fhid"), col("fieldincludeyear").alias("fiy"), col("fieldisfilterable").alias("fif"), substring("fieldlabel", 0, 15).alias("flbl"), substring(col("fieldoptions").cast("string"), 0, 30).alias("fopt"), "fieldorder", "fieldidrequired", "fieldresponses", "fieldtype").filter(col("wid") == 1).distinct().sort(col("sid"), col("fieldorder")).show(50, truncate=False)

templateFieldExplodedDF.select(col("workflowversion").alias("wid"), substring("sectionid", 0, 3).alias("sid"), col("sectionorder").alias("sord"), substring("sectiontitle",0, 50).alias("sttl"), col("sectiontype").alias("styp"), substring("fieldid", 0, 10).alias("fid"), substring("fieldlabel", 0, 15).alias("flbl"), "fieldorder", "fieldtype").filter(col("wid") == 1).distinct().sort(col("sid"), col("fieldorder")).show(50, truncate=False)
#sectiontitle_coalesce(fieldlabel, fieldtype)

+---+---+----+-----------------------------------+-----+----------+---------------+----------+---------------------+
|wid|sid|sord|sttl                               |styp |fid       |flbl           |fieldorder|fieldtype            |
+---+---+----+-----------------------------------+-----+----------+---------------+----------+---------------------+
|1  |2ff|1   |Observation                        |task |3143aaff-c|Observation Nam|1         |shortAnswer          |
|1  |2ff|1   |Observation                        |task |33a5a586-3|Observation Des|2         |longAnswer           |
|1  |2ff|1   |Observation                        |task |969c925d-1|Assign To Role |3         |rolesAndGroups       |
|1  |2ff|1   |Observation                        |task |480e2bff-6|Status         |4         |radioButton          |
|1  |2ff|1   |Observation                        |task |8ce662ae-4|null           |5         |dependentDropdownList|
|1  |2ff|1   |Observation                        |task |a65efc78

In [16]:
#Printing a single JSON workflow metadata to look at all records in full
tmpdir = keys["tmp-dir"]
templateParsed.select("json").limit(1).write.mode("overwrite").json(tmpdir + "/templates")

In [83]:
parsedWorkflowMeta1 = templateParsed.select(col("json.id").alias("workflowid"), col("json.version").alias("workflowVersion"), explode("json.sections").alias("section")).select("workflowid", "workflowversion", col("section.id").alias("sectionid"), explode("section.fields").alias("field")).select("workflowid", "workflowversion", "sectionid", col("field.id").alias("fieldId")).distinct()

In [85]:
parsedWorkflowMeta1.show(truncate= False)

+------------------------------------+---------------+------------------------------------+------------------------------------+
|workflowid                          |workflowversion|sectionid                           |fieldId                             |
+------------------------------------+---------------+------------------------------------+------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |2fff901d-e085-49f4-a3c9-1cff08d79ef9|a65efc78-cd63-41bf-bee6-1ea0808b2d50|
|a611477c-41f1-4a9e-9721-d7afeaf55099|5              |2fff901d-e085-49f4-a3c9-1cff08d79ef9|50df1dd0-4eea-45b0-8a52-af78ffdef8f6|
|a611477c-41f1-4a9e-9721-d7afeaf55099|8              |2fff901d-e085-49f4-a3c9-1cff08d79ef9|57c211f6-9b84-439f-8984-538534537a66|
|a611477c-41f1-4a9e-9721-d7afeaf55099|9              |7429d574-210a-4f23-a674-887973473efd|15088ebb-2cd2-40c2-9ba6-58bdad9255ee|
|a611477c-41f1-4a9e-9721-d7afeaf55099|5              |7429d574-210a-4f23-a674-887973473efd|6996e8

In [129]:
import re
from pyspark.sql.functions import udf, StringType, lit, isnull
#Create a UDF to transform the sectiontitle, fieldlable, fieldtype columns into a concatenated column
def generate_col(sectiontitle, fieldlabel, fieldtype):
    #''.join(char for char in sectiontitle if char.isalnum())
    sectiontitle_fmt = re.sub('[ ]+', '_', re.sub('[^A-Za-z0-9 ]+', '', sectiontitle)).lower()
    
    fieldlabel_fmt = ''
    if fieldlabel is None:
        fieldlabel_fmt = re.sub('[ ]+', '_', re.sub('[^A-Za-z0-9 ]+', '', fieldtype)).lower()
    else:
        fieldlabel_fmt = re.sub('[ ]+', '_', re.sub('[^A-Za-z0-9 ]+', '', fieldlabel)).lower()

    return 'dyn_'+sectiontitle_fmt+'_'+fieldlabel_fmt
#print(generate_col("Inspector's Contact Information", "null", "c"))    


generate_col_udf = udf(generate_col, StringType())
#templateFieldExplodedDF.withColumn("title", generate_col_udf(col("sectiontitle"), col("fieldlabel"), col("fieldtype")))\
#    .select("workflowversion", "sectionid", "fieldid", "fieldorder", "title", "fieldlabel")\
#    .filter(col("workflowversion") == 1)\
#    .show(50, truncate=False)

#.filter((col("workflowversion") == 1) & (isnull(col("fieldlabel"))))\

workflowTemplateSchema = templateFieldExplodedDF.withColumn("title", generate_col_udf(col("sectiontitle"), col("fieldlabel"), col("fieldtype")))\
    .select("workflowid", "workflowversion", "fieldid", "title")

workflowTemplateSchema.filter(col("workflowversion")==1).show(50, truncate=False)

+------------------------------------+---------------+------------------------------------+----------------------------------------------------------------------------+
|workflowid                          |workflowversion|fieldid                             |title                                                                       |
+------------------------------------+---------------+------------------------------------+----------------------------------------------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |ba494455-3cf8-443e-a5f4-b1a119aaaf23|dyn_facility_information_facilitystore                                      |
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |5e233f83-6348-4d7d-8f30-91a5a76997df|dyn_facility_information_associateinformation                               |
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |a0da99e6-5f30-48d4-a8f9-ae892e4c93d9|dyn_regulatory_agency_contact_date_of_contact                   

In [72]:
#Checking if the fieldId and title are PK combination for a given workflow version
from pyspark.sql.functions import countDistinct, count
workflowTemplateSchema.groupBy("workflowid", "workflowversion")\
    .agg(countDistinct("fieldId"), countDistinct("title"), count("title"))\
    .sort(col("workflowversion"))\
    .show(truncate=False)
#(countDistinct("fieldid").alias("fieldid_cnt"), count(col("fieldid")).alias("tot_cnt")).show()

+------------------------------------+---------------+-----------------------+---------------------+------------+
|workflowid                          |workflowversion|count(DISTINCT fieldId)|count(DISTINCT title)|count(title)|
+------------------------------------+---------------+-----------------------+---------------------+------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |41                     |41                   |41          |
|a611477c-41f1-4a9e-9721-d7afeaf55099|2              |41                     |41                   |41          |
|a611477c-41f1-4a9e-9721-d7afeaf55099|3              |42                     |42                   |42          |
|a611477c-41f1-4a9e-9721-d7afeaf55099|4              |42                     |42                   |42          |
|a611477c-41f1-4a9e-9721-d7afeaf55099|5              |42                     |42                   |42          |
|a611477c-41f1-4a9e-9721-d7afeaf55099|6              |42                     |42        

In [130]:
#Pivoting the workflowTemplateSchema dataframe
from pyspark.sql.functions import first

workflowSchemaPivot = workflowTemplateSchema\
    .groupBy("workflowid", "workflowversion")\
    .pivot("title")\
    .agg(first("fieldid"))\
    .sort("workflowversion")

workflowSchemaPivot.select("workflowid", 'workflowversion', 'dyn_observation_status','dyn_observation_dependentdropdownlist').show(truncate=False)

+------------------------------------+---------------+------------------------------------+-------------------------------------+
|workflowid                          |workflowversion|dyn_observation_status              |dyn_observation_dependentdropdownlist|
+------------------------------------+---------------+------------------------------------+-------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |480e2bff-6879-4b2d-8539-842b9cfbe42f|8ce662ae-4f5d-46f8-a4f2-3447ca74ace7 |
|a611477c-41f1-4a9e-9721-d7afeaf55099|2              |480e2bff-6879-4b2d-8539-842b9cfbe42f|8ce662ae-4f5d-46f8-a4f2-3447ca74ace7 |
|a611477c-41f1-4a9e-9721-d7afeaf55099|3              |480e2bff-6879-4b2d-8539-842b9cfbe42f|8ce662ae-4f5d-46f8-a4f2-3447ca74ace7 |
|a611477c-41f1-4a9e-9721-d7afeaf55099|4              |480e2bff-6879-4b2d-8539-842b9cfbe42f|8ce662ae-4f5d-46f8-a4f2-3447ca74ace7 |
|a611477c-41f1-4a9e-9721-d7afeaf55099|5              |480e2bff-6879-4b2d-8539-842b9cfbe42f

In [88]:
#Creating a dataframe for workflow attributes
templatesNonSchemaAttribute = templatesDF.select("workflowid", "workflowversion", col("name").alias("workflowname"), col("description").alias("workflowdescription"))

In [76]:
# Load data from submissions table for workflowid='a0ab07fa-fe40-4eb5-bdef-5b505defd91a'
#table = "(SELECT workflowid, workflowversion, count(1) as cnt from reach.submissions where workflowid='a611477c-41f1-4a9e-9721-d7afeaf55099' group by workflowid, workflowversion) AS t"
table = "(SELECT submissionid, currentstep, totalsteps, workflowid, workflowversion, createdts, lastupdatedts, createdby, lastupdatedname, createdbyname, recordid, countrycode, REPLACE(CONVERT(REPLACE(stepmetadata,'&amp;','&') using utf8),'','') as stepmetadata, tzoffset from reach.submissions where workflowid='a611477c-41f1-4a9e-9721-d7afeaf55099' and workflowversion<11) AS t"

submissionsDF = spark.read.jdbc(url, table, properties=properties)
submissionsDF.cache()

DataFrame[submissionid: string, currentstep: int, totalsteps: int, workflowid: string, workflowversion: int, createdts: timestamp, lastupdatedts: timestamp, createdby: string, lastupdatedname: string, createdbyname: string, recordid: string, countrycode: string, stepmetadata: string, tzoffset: int]

In [89]:
submissionsDF.select("submissionid", col("currentstep").alias("submissionCurrentStep"), "totalsteps", "workflowid", "workflowversion", col("createdts").alias("submissionCreatedTs"), col("lastupdatedts").alias("submissionLastUpdatedTs"), "recordid", "countrycode", "tzoffset").show(11, truncate=False)

+------------------------------------+---------------------+----------+------------------------------------+---------------+-------------------+-----------------------+---------+-----------+--------+
|submissionid                        |submissionCurrentStep|totalsteps|workflowid                          |workflowversion|submissionCreatedTs|submissionLastUpdatedTs|recordid |countrycode|tzoffset|
+------------------------------------+---------------------+----------+------------------------------------+---------------+-------------------+-----------------------+---------+-----------+--------+
|05855f2d-e9f7-4d7b-b9d2-28b45da429d0|4                    |4         |a611477c-41f1-4a9e-9721-d7afeaf55099|9              |2019-02-12 19:51:04|2019-04-15 14:47:11    |ba16-20f8|US         |-21600  |
|1af76439-45c4-492f-87c8-09c03781259e|4                    |4         |a611477c-41f1-4a9e-9721-d7afeaf55099|10             |2019-02-13 20:29:45|2019-06-07 13:02:14    |9d35-2658|US         |-18000  |


In [101]:
submissionAttribute = submissionsDF.select("submissionid", col("currentstep").alias("submissionCurrentStep"), col("workflowid").alias("submissionWorkflowId"), col("workflowversion").alias("submissionWorkflowVersion"), col("createdts").alias("submissionCreatedTs"), col("lastupdatedts").alias("submissionLastUpdatedTs"), "recordid", col("countrycode").alias("submissionCountryCode"))

submissionAttribute.show(truncate=False)
submissionAttribute.count()

+------------------------------------+---------------------+------------------------------------+-------------------------+-------------------+-----------------------+---------+---------------------+
|submissionid                        |submissionCurrentStep|submissionWorkflowId                |submissionWorkflowVersion|submissionCreatedTs|submissionLastUpdatedTs|recordid |submissionCountryCode|
+------------------------------------+---------------------+------------------------------------+-------------------------+-------------------+-----------------------+---------+---------------------+
|05855f2d-e9f7-4d7b-b9d2-28b45da429d0|4                    |a611477c-41f1-4a9e-9721-d7afeaf55099|9                        |2019-02-12 19:51:04|2019-04-15 14:47:11    |ba16-20f8|US                   |
|1af76439-45c4-492f-87c8-09c03781259e|4                    |a611477c-41f1-4a9e-9721-d7afeaf55099|10                       |2019-02-13 20:29:45|2019-06-07 13:02:14    |9d35-2658|US                   |


18

In [82]:
# Load data from responses table for workflowid='a0ab07fa-fe40-4eb5-bdef-5b505defd91a'
#table = "(SELECT workflowid, count(1) as cnt from reach.responses group by workflowid) AS t"
table = "(SELECT responseid, submissionid, fieldid, submittedby, submitter, lastupdatedts, siteid, REPLACE(CONVERT(REPLACE(value,'&amp;','&') using utf8),'','') as value, `order`, currentstep, createdTs, submittedByName, lastUpdatedBy, taskid from reach.responses where workflowid='a611477c-41f1-4a9e-9721-d7afeaf55099' and workflowVersion<11) AS t"

responsesDF = spark.read.jdbc(url, table, properties=properties)
responsesDF.cache()

DataFrame[responseid: string, submissionid: string, fieldid: string, submittedby: string, submitter: string, lastupdatedts: timestamp, siteid: string, value: string, order: int, currentstep: string, createdTs: timestamp, submittedByName: string, lastUpdatedBy: string, taskid: string]

In [83]:
responsesDF.select("responseid", "submissionid", "fieldid", "lastupdatedts", "siteid", col("value").alias("responseValue"), "order", "currentstep", "taskid").show(truncate=False)
responsesDF.count()

+------------------------------------+------------------------------------+------------------------------------+-------------------+-------+--------------------------------------------------------------------------------+-----+-----------+------------------------------------+
|responseid                          |submissionid                        |fieldid                             |lastupdatedts      |siteid |responseValue                                                                   |order|currentstep|taskid                              |
+------------------------------------+------------------------------------+------------------------------------+-------------------+-------+--------------------------------------------------------------------------------+-----+-----------+------------------------------------+
|003dd09a-2604-4488-a4b2-20df2c3c73f4|5dc916e3-aa2b-48a1-a86d-2b4eafa2c546|a0da99e6-5f30-48d4-a8f9-ae892e4c93d9|2019-02-13 20:49:06|5105   |["02/13/2019"]               

598

In [102]:
responseAttribute = responsesDF.select("responseid", col("submissionid").alias("responseSubmissionId"), "fieldid", col("value").alias("responseValue"), col("order").alias("responseOrder"), col("taskid").alias("responseTaskId"))
responseAttribute.show(truncate=False)

+------------------------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------------+-------------+------------------------------------+
|responseid                          |responseSubmissionId                |fieldid                             |responseValue                                                                   |responseOrder|responseTaskId                      |
+------------------------------------+------------------------------------+------------------------------------+--------------------------------------------------------------------------------+-------------+------------------------------------+
|003dd09a-2604-4488-a4b2-20df2c3c73f4|5dc916e3-aa2b-48a1-a86d-2b4eafa2c546|a0da99e6-5f30-48d4-a8f9-ae892e4c93d9|["02/13/2019"]                                                                  |1            |null                                |
|00e268f3-9bea-4509-

In [84]:
# Load data from tasks table for workflowid='a0ab07fa-fe40-4eb5-bdef-5b505defd91a'
#table = "(SELECT workflowid, count(1) as cnt from reach.tasks group by workflowid) AS t"
table = "(SELECT taskId, title, description, status, submissionid, sectionId from reach.tasks where workflowid='a611477c-41f1-4a9e-9721-d7afeaf55099' and workflowVersion<11) AS t"

tasksDF = spark.read.jdbc(url, table, properties=properties)
tasksDF.cache()

DataFrame[taskId: string, title: string, description: string, status: string, submissionid: string, sectionId: string]

In [85]:
tasksDF.select("taskId", col("title").alias("taskTitle"), col("description").alias("taskDescription"), col("status").alias("taskStatus"), "submissionId").show(truncate=False)
tasksDF.count()

+------------------------------------+----------------------------------------------+----------------+----------+------------------------------------+
|taskId                              |taskTitle                                     |taskDescription |taskStatus|submissionId                        |
+------------------------------------+----------------------------------------------+----------------+----------+------------------------------------+
|12c51745-5dd3-43fb-8fa1-13c889ce076d|Passed inspection                             |Completed/Closed|completed |1af76439-45c4-492f-87c8-09c03781259e|
|1b9a7b1e-f0ed-411c-9987-a02cb727bd61|Inspection                                    |Completed/Closed|completed |c548e4b9-d45b-4913-8da9-cfb8a9c3e077|
|33e52299-ab8d-4c4a-9ca4-e112e872234e|alarm system                                  |Completed/Closed|completed |5dc916e3-aa2b-48a1-a86d-2b4eafa2c546|
|3e150b21-a9d7-47af-929e-15f35a5dca5a|Fire Department Inspection                    |Completed

8

In [103]:
taskAttribute = tasksDF.select("taskId", col("title").alias("taskTitle"), col("description").alias("taskDescription"), col("status").alias("taskStatus"), col("submissionId").alias("taskSubmissionId"))
taskAttribute.show(truncate=False)

+------------------------------------+----------------------------------------------+----------------+----------+------------------------------------+
|taskId                              |taskTitle                                     |taskDescription |taskStatus|taskSubmissionId                    |
+------------------------------------+----------------------------------------------+----------------+----------+------------------------------------+
|12c51745-5dd3-43fb-8fa1-13c889ce076d|Passed inspection                             |Completed/Closed|completed |1af76439-45c4-492f-87c8-09c03781259e|
|1b9a7b1e-f0ed-411c-9987-a02cb727bd61|Inspection                                    |Completed/Closed|completed |c548e4b9-d45b-4913-8da9-cfb8a9c3e077|
|33e52299-ab8d-4c4a-9ca4-e112e872234e|alarm system                                  |Completed/Closed|completed |5dc916e3-aa2b-48a1-a86d-2b4eafa2c546|
|3e150b21-a9d7-47af-929e-15f35a5dca5a|Fire Department Inspection                    |Completed

In [131]:
print(workflowSchemaPivot.printSchema)
print(templatesNonSchemaAttribute.printSchema)
print(submissionAttribute.printSchema)
print(responseAttribute.printSchema)
print(taskAttribute.printSchema)

<bound method DataFrame.printSchema of DataFrame[workflowid: string, workflowversion: bigint, dyn_additional_details_additional_comments: string, dyn_additional_details_contact_type: string, dyn_additional_details_date_fees_paid: string, dyn_additional_details_date_fines_paid: string, dyn_additional_details_fees_issued: string, dyn_additional_details_fees_paid: string, dyn_additional_details_fines_issued: string, dyn_additional_details_fines_paid: string, dyn_additional_documents_additional_documents: string, dyn_assignment_notes_assigned_team: string, dyn_assignment_notes_assigned_to: string, dyn_assignment_notes_insufficient_paperwork: string, dyn_assignment_notes_non_health_and_wellness_contact: string, dyn_board_orders_discipline_removed: string, dyn_board_orders_permit_discipline: string, dyn_board_orders_type_of_discipline_removed: string, dyn_documents_from_regulator_documents: string, dyn_facility_information_associateinformation: string, dyn_facility_information_facilitystore:

In [124]:
#workflowSchemaPivot
#templatesNonSchemaAttribute
#submissionAttribute
#responseAttribute
#taskAttribute

#Join templatesNonSchemaAttribute with submissionAttribute
templateSubmissionJoin = templatesNonSchemaAttribute.join(submissionAttribute, (templatesNonSchemaAttribute.workflowid == submissionAttribute.submissionWorkflowId) & (templatesNonSchemaAttribute.workflowversion == submissionAttribute.submissionWorkflowVersion), how="inner")\
    .select("workflowid", "workflowversion", "workflowname", "workflowdescription", "submissionid", "submissionCurrentStep", "submissionCreatedTs", "submissionLastUpdatedTs", "recordId", "submissionCountryCode")

templateSubmissionJoin.show(10, truncate=False)

+------------------------------------+---------------+---------------------------------------+-------------------------------------------------------------------------------------------+------------------------------------+---------------------+-------------------+-----------------------+---------+---------------------+
|workflowid                          |workflowversion|workflowname                           |workflowdescription                                                                        |submissionid                        |submissionCurrentStep|submissionCreatedTs|submissionLastUpdatedTs|recordId |submissionCountryCode|
+------------------------------------+---------------+---------------------------------------+-------------------------------------------------------------------------------------------+------------------------------------+---------------------+-------------------+-----------------------+---------+---------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55

In [115]:
from pyspark.sql.functions import concat, coalesce
#Self aggregate response to create a map
responseAttributeConcat = responseAttribute.withColumn("fieldIdValue", concat('fieldId', lit(':'), 'responseValue'))\
    .withColumn("responseTaskIdCoalesced", coalesce("responseTaskId", lit("")))\
    .select('responseId', 'responseSubmissionId', 'responseOrder', 'responseTaskIdCoalesced', 'fieldIdValue')
#responseAttribute.filter(isnull("responseTaskId")).show(truncate=False)

responseAttributeConcat.show(truncate=False)


+------------------------------------+------------------------------------+-------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
|responseId                          |responseSubmissionId                |responseOrder|responseTaskIdCoalesced             |fieldIdValue                                                                                                         |
+------------------------------------+------------------------------------+-------------+------------------------------------+---------------------------------------------------------------------------------------------------------------------+
|003dd09a-2604-4488-a4b2-20df2c3c73f4|5dc916e3-aa2b-48a1-a86d-2b4eafa2c546|1            |                                    |a0da99e6-5f30-48d4-a8f9-ae892e4c93d9:["02/13/2019"]                                                                  |
|00e268f3-9bea-4509-

In [123]:
from pyspark.sql.functions import collect_set
#Aggregate all the fieldIdValue for a combination of submissionId, taskId into a single record
responseAggregateRecord = responseAttributeConcat.limit(20).groupBy("responseSubmissionId", "responseTaskIdCoalesced")\
                            .agg(collect_set("fieldIdValue").alias("fieldIdValues"))
                        
responseAggregateRecord.show(truncate=False)

+------------------------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|responseSubmissionId                |responseTaskIdCoalesced             |fieldIdValues                                                                                                                                                                       |
+------------------------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|5dc916e3-aa2b-48a1-a86d-2b4eafa2c546|                                    |[a0da99e6-5f30-48d4-a8f9-ae892e4c93d9:["02/13/2019"]]                                                                                                     

In [140]:
#Join templateSubmissionJoin with responseAggregateRecord
submissionResponseJoin = templateSubmissionJoin.join(responseAggregateRecord, (templateSubmissionJoin.submissionid == responseAggregateRecord.responseSubmissionId), how='inner')\
    .select(col("workflowId").alias('submissionWorkflowId'), col("workflowVersion").alias('submissionWorkflowVersion'), "submissionid", "responseTaskIdCoalesced", "fieldIdValues")

submissionResponseJoin.show(truncate=False)

+------------------------------------+-------------------------+------------------------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|submissionWorkflowId                |submissionWorkflowVersion|submissionid                        |responseTaskIdCoalesced             |fieldIdValues                                                                                                                                                                       |
+------------------------------------+-------------------------+------------------------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|7 

In [132]:
#Trying to flatten the table for following 3 columns
#4f842804-7da0-4a18-afb3-696c7b1a0991 > dyn_inspector39s_contact_information_inspector_phone_number
#57c50f5e-df29-4a11-af9d-06e29ab11747 > dyn_assignment_notes_insufficient_paperwork
#b98305e1-e911-4a98-9541-df6094937a75 > dyn_additional_details_contact_type

#Need to figure out the column names for following fieldIds
workflowTemplateSchema.filter(col("fieldid").isin('4f842804-7da0-4a18-afb3-696c7b1a0991','57c50f5e-df29-4a11-af9d-06e29ab11747','b98305e1-e911-4a98-9541-df6094937a75'))\
    .show(10, truncate=False)

+------------------------------------+---------------+------------------------------------+-----------------------------------------------------------+
|workflowid                          |workflowversion|fieldid                             |title                                                      |
+------------------------------------+---------------+------------------------------------+-----------------------------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |4f842804-7da0-4a18-afb3-696c7b1a0991|dyn_inspector39s_contact_information_inspector_phone_number|
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |57c50f5e-df29-4a11-af9d-06e29ab11747|dyn_assignment_notes_insufficient_paperwork                |
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |b98305e1-e911-4a98-9541-df6094937a75|dyn_additional_details_contact_type                        |
|a611477c-41f1-4a9e-9721-d7afeaf55099|2              |4f842804-7da0-4a18-afb3-696c7b1a09

In [134]:
###*************************DEBUG*************************###
workflowSchemaPivot.select("workflowid", "workflowversion", "dyn_inspector39s_contact_information_inspector_phone_number", "dyn_assignment_notes_insufficient_paperwork", "dyn_additional_details_contact_type").show(truncate=False)

+------------------------------------+---------------+-----------------------------------------------------------+-------------------------------------------+------------------------------------+
|workflowid                          |workflowversion|dyn_inspector39s_contact_information_inspector_phone_number|dyn_assignment_notes_insufficient_paperwork|dyn_additional_details_contact_type |
+------------------------------------+---------------+-----------------------------------------------------------+-------------------------------------------+------------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|1              |4f842804-7da0-4a18-afb3-696c7b1a0991                       |57c50f5e-df29-4a11-af9d-06e29ab11747       |b98305e1-e911-4a98-9541-df6094937a75|
|a611477c-41f1-4a9e-9721-d7afeaf55099|2              |4f842804-7da0-4a18-afb3-696c7b1a0991                       |57c50f5e-df29-4a11-af9d-06e29ab11747       |b98305e1-e911-4a98-9541-df6094937a75|
|a611477c-41f1-4a9e-

In [139]:
###*************************DEBUG*************************###
for elem in workflowSchemaPivot.schema.names:
    print(elem)

workflowid
workflowversion
dyn_additional_details_additional_comments
dyn_additional_details_contact_type
dyn_additional_details_date_fees_paid
dyn_additional_details_date_fines_paid
dyn_additional_details_fees_issued
dyn_additional_details_fees_paid
dyn_additional_details_fines_issued
dyn_additional_details_fines_paid
dyn_additional_documents_additional_documents
dyn_assignment_notes_assigned_team
dyn_assignment_notes_assigned_to
dyn_assignment_notes_insufficient_paperwork
dyn_assignment_notes_non_health_and_wellness_contact
dyn_board_orders_discipline_removed
dyn_board_orders_permit_discipline
dyn_board_orders_type_of_discipline_removed
dyn_documents_from_regulator_documents
dyn_facility_information_associateinformation
dyn_facility_information_facilitystore
dyn_inspector39s_contact_information_inspector_email
dyn_inspector39s_contact_information_inspector_name
dyn_inspector39s_contact_information_inspector_phone_number
dyn_observation_assign_to_role
dyn_observation_corrective_action

In [219]:
###*************************DEBUG*************************###
debugDF01 = submissionResponseJoin.join(workflowSchemaPivot, (submissionResponseJoin.submissionWorkflowId == workflowSchemaPivot.workflowid) & (submissionResponseJoin.submissionWorkflowVersion == workflowSchemaPivot.workflowversion), how='inner')\
    .drop('submissionWorkflowId')\
    .drop('submissionWorkflowVersion')
    

debugDF01.select('workflowId', 'workflowVersion','submissionId', 'responseTaskIdCoalesced', 'fieldIdValues', 'dyn_inspector39s_contact_information_inspector_phone_number', 'dyn_assignment_notes_insufficient_paperwork', 'dyn_additional_details_contact_type')\
    .show(30, truncate=False)

+------------------------------------+---------------+------------------------------------+------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------+-------------------------------------------+------------------------------------+
|workflowId                          |workflowVersion|submissionId                        |responseTaskIdCoalesced             |fieldIdValues                                                                                                                                                                       |dyn_inspector39s_contact_information_inspector_phone_number|dyn_assignment_notes_insufficient_paperwork|dyn_additional_details_contact_type |
+------------------------------------+---------------+------------------------------------+-------

In [183]:
#Map the fieldId value to the respective column
def search_and_return(fieldId, listKeyValue):
    returnValue = ''
    for keyValue in listKeyValue:
        key = keyValue.split(':')[0]
        value = keyValue.split(':')[1]
        if str(fieldId) == key:
            returnValue = value
    return returnValue
    


sampleKeyValueList = ['12:a', '11:b', '13:c']
print(search_and_return(13, sampleKeyValueList))

c


In [185]:
#Create a UDF out of the function
search_and_return_udf = udf(search_and_return, StringType())

debugDF01.withColumn("dyn_inspector39s_contact_information_inspector_phone_number", search_and_return_udf(col('dyn_inspector39s_contact_information_inspector_phone_number'), 'fieldIdValues'))\
    .withColumn("dyn_assignment_notes_insufficient_paperwork", search_and_return_udf(col('dyn_assignment_notes_insufficient_paperwork'), 'fieldIdValues'))\
    .withColumn("dyn_additional_details_contact_type", search_and_return_udf(col('dyn_additional_details_contact_type'), 'fieldIdValues'))\
    .select('workflowid', 'workflowversion', 'submissionId', 'responseTaskIdCoalesced', 'dyn_inspector39s_contact_information_inspector_phone_number', 'dyn_assignment_notes_insufficient_paperwork', 'dyn_additional_details_contact_type')\
    .show(30, truncate=False)

+------------------------------------+---------------+------------------------------------+------------------------------------+-----------------------------------------------------------+-------------------------------------------+-----------------------------------+
|workflowid                          |workflowversion|submissionId                        |responseTaskIdCoalesced             |dyn_inspector39s_contact_information_inspector_phone_number|dyn_assignment_notes_insufficient_paperwork|dyn_additional_details_contact_type|
+------------------------------------+---------------+------------------------------------+------------------------------------+-----------------------------------------------------------+-------------------------------------------+-----------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|7              |edcd8325-31eb-43a0-9f7c-a76aa30f54d9|                                    |                                                           |["No"

In [172]:
#Making the replace process automated
workflowSchemaDFColumnList = workflowSchemaPivot.schema.names
dynamicColumnList = list(filter(lambda x: 'dyn_' in x, workflowSchemaDFColumnList))

for colName in dynamicColumnList:
    print(colName)

dyn_additional_details_additional_comments
dyn_additional_details_contact_type
dyn_additional_details_date_fees_paid
dyn_additional_details_date_fines_paid
dyn_additional_details_fees_issued
dyn_additional_details_fees_paid
dyn_additional_details_fines_issued
dyn_additional_details_fines_paid
dyn_additional_documents_additional_documents
dyn_assignment_notes_assigned_team
dyn_assignment_notes_assigned_to
dyn_assignment_notes_insufficient_paperwork
dyn_assignment_notes_non_health_and_wellness_contact
dyn_board_orders_discipline_removed
dyn_board_orders_permit_discipline
dyn_board_orders_type_of_discipline_removed
dyn_documents_from_regulator_documents
dyn_facility_information_associateinformation
dyn_facility_information_facilitystore
dyn_inspector39s_contact_information_inspector_email
dyn_inspector39s_contact_information_inspector_name
dyn_inspector39s_contact_information_inspector_phone_number
dyn_observation_assign_to_role
dyn_observation_corrective_action
dyn_observation_dependentd

In [220]:
#debugDF01.withColumn(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), search_and_return_udf(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), 'fieldIdValues'))\
debugDF03 = debugDF01

for rName in dynamicColumnList:
    debugDF03 = debugDF03.withColumn(rName, search_and_return_udf(col(rName), 'fieldIdValues'))
#debugDF03 = debugDF03.withColumn(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), search_and_return_udf(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), 'fieldIdValues'))
#debugDF03 = debugDF03.withColumn(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), search_and_return_udf(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), 'fieldIdValues'))

#debugDF03 = debugDF03.withColumn('workflowId', lit('workflowId'))
#debugDF03 = debugDF03.withColumn('workflowId', col('workflowId'))

#debugDF03.withColumn('dyn_inspector39s_contact_information_inspector_phone_number', search_and_return_udf(F.col('dyn_inspector39s_contact_information_inspector_phone_number'), 'fieldIdValues'))\
debugDF03.select('workflowid', 'workflowversion', 'submissionId', 'responseTaskIdCoalesced', 'dyn_inspector39s_contact_information_inspector_phone_number', 'dyn_assignment_notes_insufficient_paperwork', 'dyn_additional_details_contact_type')\
    .show(30, truncate=False)

+------------------------------------+---------------+------------------------------------+------------------------------------+-----------------------------------------------------------+-------------------------------------------+-----------------------------------+
|workflowid                          |workflowversion|submissionId                        |responseTaskIdCoalesced             |dyn_inspector39s_contact_information_inspector_phone_number|dyn_assignment_notes_insufficient_paperwork|dyn_additional_details_contact_type|
+------------------------------------+---------------+------------------------------------+------------------------------------+-----------------------------------------------------------+-------------------------------------------+-----------------------------------+
|a611477c-41f1-4a9e-9721-d7afeaf55099|7              |edcd8325-31eb-43a0-9f7c-a76aa30f54d9|                                    |                                                           |["No"