In [24]:
from pyspark.context import SparkContext, SparkConf
from awsglue.dynamicframe import DynamicFrame
import awsglue.transforms as T

In [2]:
# this is just so i can develop with local glue libs
jars = '/Users/joe/aws-glue-libs/jarsv1/*'
sc = SparkContext(conf=SparkConf().setAll([
    ('spark.executor.extraClassPath', jars),
    ('spark.driver.extraClassPath', jars)
]))

In [3]:
from awsglue.context import GlueContext

glueContext = GlueContext(sc)

spark = glueContext.spark_session

### Read in the data

Use a dynamicframe to deal with null values or variable types

In [4]:
data = spark.read.parquet('data/catalog.parquet')

In [5]:
datasource = DynamicFrame.fromDF(data, glueContext, 'datasource')

### Grab the Location records

This is the primary source of locations but there might be others in other resources.

In [25]:
locations = datasource.filter(
    lambda r: r['resourceType'] == 'Location'
)
locations = locations.select_fields(
    ['identifier','name','type','address','position']
)
locations = T.DropNullFields.apply(locations, "locations")

null_fields ['identifier.type', 'identifier.use', 'identifier.period', 'identifier.extension', 'identifier.assigner', 'name.array', 'type', 'address.array']


Let's have a look to see where we can find addresses

In [7]:
addresses = datasource.filter(
    lambda r: r['address']
)

In [9]:
addresses = T.DropNullFields.apply(addresses, "addresses")

null_fields ['created', 'serialNumber', 'encounter', 'agent', 'presentedForm', 'date', 'serviceProvider', 'effectiveDateTime', 'prescription', 'referral', 'insurer', 'activity', 'result', 'occurrenceDateTime', 'location', 'verificationStatus', 'recorded', 'intent', 'contained', 'serviceType', 'type.struct', 'issued', 'custodian', 'udiCarrier', 'total', 'onsetDateTime', 'component', 'valueQuantity', 'use', 'procedure', 'addresses', 'target', 'facility', 'careTeam', 'managingOrganization.array', 'length', 'deviceName', 'participant', 'outcome', 'supportingInfo', 'abatementDateTime', 'patient', 'claim', 'category', 'hospitalization', 'manufactureDate', 'expirationDate', 'reasonReference', 'primarySource', 'code', 'period', 'billablePeriod', 'vaccineCode', 'payment', 'priority', 'recordedDate', 'medicationCodeableConcept', 'provider', 'content', 'valueCodeableConcept', 'class', 'requester', 'lotNumber', 'clinicalStatus', 'author', 'item', 'authoredOn', 'diagnosis', 'insurance', 'distinctId

In [16]:
df = addresses.toDF()

In [17]:
df.select('resourceType').distinct().show()

+------------+
|resourceType|
+------------+
|     Patient|
|Organization|
|    Location|
|Practitioner|
+------------+



So we can see there are more locations in **Patient**, **Organization** and **Practitioner**

Let's have a look at the schema:

In [18]:
df.select('address').printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- struct: struct (nullable = true)
 |    |    |-- postalCode: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- line: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |-- array: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- postalCode: string (nullable = true)
 |    |    |    |-- district: string (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |    |-- country: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)
 |    |    |    |-- use: string (nullable = true)
 |    |    |    |-- line: array (nullable = true)
 |    |    |    |    |-- element: string (containsN

So there's a zero or more cardinality and it's either a struct or an array of structs. 

We need to normalize this data using the `explode()` method 

In [19]:
import pyspark.sql.functions as F

In [20]:
df = df.withColumn('address', F.explode(df.address.array))

In [21]:
df.select('address').printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- text: string (nullable = true)
 |    |-- postalCode: string (nullable = true)
 |    |-- district: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- use: string (nullable = true)
 |    |-- line: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- period: struct (nullable = true)
 |    |    |-- start: string (nullable = true)
 |    |-- extension: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |    |-- extension: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |-- valueDecimal: double (nullable = true)



In [26]:
loc = locations.toDF()

In [33]:
loc = loc.withColumn('address', F.col('address.struct'))

In [34]:
loc.select('address').printSchema()

root
 |-- address: struct (nullable = true)
 |    |-- postalCode: string (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- country: string (nullable = true)
 |    |-- state: string (nullable = true)
 |    |-- line: array (nullable = true)
 |    |    |-- element: string (containsNull = true)



Next we need to join the dataframes, keeping only the relevant fields. 

For this we could use an **left outer-join** on the `loc` dataframe. 

In [48]:
# drop irrelevant fields from struct
def filter_columns(df, root):
    cols = df.select(root).columns
    fields = filter(lambda x: x in ['postalCode', 'city', 'country', 'state', 'line'], cols)
    kept = list(map(lambda x: root[:-1] + x, fields))
    return df.select(kept)


other_address = filter_columns(df.select('address'), root='address.*')
loc_address = filter_columns(df.select('address'), root='address.*')

We now have the same schema, ready to be combined into one dataframe

In [49]:
other_address.printSchema()

loc_address.printSchema()

root
 |-- postalCode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- line: array (nullable = true)
 |    |-- element: string (containsNull = true)

root
 |-- postalCode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- line: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [90]:
new_df = loc_address.join(other_address, on=['postalCode','city','country','state','line'], how='left_outer')

In [91]:
new_df.printSchema()

root
 |-- postalCode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- state: string (nullable = true)
 |-- line: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [92]:
new_df = new_df.drop_duplicates()

In [93]:
new_df.show()

+----------+---------------+-------+--------------+--------------------+
|postalCode|           city|country|         state|                line|
+----------+---------------+-------+--------------+--------------------+
|      CB23|      Cambridge|     GB|Cambridgeshire|[625 Corwin Fort ...|
|       CT3|        Preston|     GB|          Kent|[669 Bahringer Mill]|
|       SY4|       Shawbury|     GB|    Shropshire|[822 Marquardt An...|
|       TF6|          Aston|     GB|    Shropshire|  [252 Lakin Corner]|
|       TF9|         Sutton|     GB|    Shropshire|[104 Rempel Light...|
|     SY3 5|     Shrewsbury|     GB|    Shropshire|[Mytton Oak Road ...|
|     SY8 1|         Ludlow|     GB|    Shropshire|   [Gravel Hill nan]|
|       TF9|         Sutton|     GB|    Shropshire|[789 Ferry Arcade...|
|       SM7|       Banstead|     GB|        Surrey|[459 Bruen Parade...|
|      SY99|     Shrewsbury|     GB|    Shropshire|    [154 Hills Fort]|
|      BS41|        Bristol|     GB|      Somerset|

In [94]:
df = new_df

# care_sites = df.na.drop(subset=["type"])

TODO: 

- [ ] Make identifier based on hash of row
- [ ] Transform to CareSite table based on type

In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
df = df.withColumn('city', F.col('address.struct.city'))\
       .withColumn('state', F.col('address.struct.state'))\
       .withColumn('zip', F.col('address.struct.postalCode'))\
       .withColumn('country', F.col('address.struct.country'))

In [None]:
# df = df.withColumn('exploded', F.explode('address.struct.line'))

In [None]:
df = df.withColumn('address_1', F.col('address.struct.line').getItem(0))
df = df.withColumn('address_2', F.col('address.struct.line').getItem(1))

In [None]:
df = df.withColumnRenamed('id', 'location_id')

In [None]:
df = df.drop(*['address','position','exploded','name','type'])

In [None]:
df.show()

In [None]:
locations = DynamicFrame.fromDF(df, glueContext, 'locations')