# Working with unpredictably changing schemas

This notebook contains sample code snippets for handling source schema changes in AWS Glue.

DISCLAIMER: This is sample code, provided 'as is'. It should be used only as a reference, and we assume no responsibility for errors.

In [1]:
import sys
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

glueContext = GlueContext(SparkContext.getOrCreate())

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
6,application_1581610574402_0007,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


## The source dataset can gain new columns over time

One option is to treat new columns as custom fields. To keep the output dataset consistent, we can add them as a single field in a self-describing format (e.g. JSON). Keep in mind that the set of new columns cannot grow indefinitely, because there may be limits on the size of the JSON string/document.
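The idea can be tried outside Glue on a plain Python dict. The following is a minimal sketch, not Glue code: `pack_custom_fields`, `STATIC_FIELDS`, and the `MAX_JSON_BYTES` limit are hypothetical names and values chosen for illustration, and the real size limit depends on the target store.

```python
import json

# Assumption: these are the columns the output dataset always carries.
STATIC_FIELDS = ["date", "categories", "geo", "revenue"]
# Assumption: a made-up size limit; use whatever the target store allows.
MAX_JSON_BYTES = 4096


def pack_custom_fields(record, static_fields=STATIC_FIELDS, max_bytes=MAX_JSON_BYTES):
    """Return a new record with all non-static columns packed into one JSON string."""
    packed = {k: v for k, v in record.items() if k in static_fields}
    extras = {k: v for k, v in record.items() if k not in static_fields}
    payload = json.dumps(extras, sort_keys=True)
    # Fail early instead of silently writing an oversized document.
    if len(payload.encode("utf-8")) > max_bytes:
        raise ValueError("custom fields exceed %d bytes" % max_bytes)
    packed["custom_fields"] = payload
    return packed


row = {"date": "1/1/17", "categories": "Digital", "geo": "Turkey",
       "revenue": 1738.04848, "store": "MyStore", "cost": 463.4264144}
packed_row = pack_custom_fields(row)
print(packed_row["custom_fields"])
```

Raising on oversized payloads is only one possible policy; truncating or routing such records elsewhere would work as well.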

1. Use a DynamicFrame to read the latest schema directly from the Glue Data Catalog. An AWS Glue crawler adds new columns to the Catalog, assuming the crawler configuration allows updates when new columns are identified.

In [7]:
sales_source = glueContext.create_dynamic_frame.from_catalog(database="sales", table_name="sales")
sales_source.printSchema()

root
|-- date: string
|-- categories: string
|-- geo: string
|-- revenue: double
|-- store: string
|-- cost: double
|-- profit: string
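Step 1 depends on the crawler being allowed to update the table definition. As a hedged sketch, the crawler's schema change policy could be set through boto3's `update_crawler` call as below. The helper names and any crawler name passed in are hypothetical, and `allow_schema_updates` needs boto3 and valid AWS credentials, so it is defined here but not called.

```python
def schema_change_policy():
    """Policy asking the crawler to write detected schema changes to the Catalog."""
    return {
        "UpdateBehavior": "UPDATE_IN_DATABASE",  # update the table when the schema changes
        "DeleteBehavior": "LOG",                 # only log objects that disappeared
    }


def allow_schema_updates(crawler_name):
    """Apply the policy to an existing crawler (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the policy helper works without boto3

    glue = boto3.client("glue")
    glue.update_crawler(Name=crawler_name, SchemaChangePolicy=schema_change_policy())


policy = schema_change_policy()
print(policy)
```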

2. Use the Map transform to rebuild the schema.

In [8]:
import json

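# Columns that are always expected in the source.
static_fields = ['date', 'categories', 'geo', 'revenue']
# Names of the extra columns encountered so far.
custom_fields = []
# Name of the struct field that will hold the extra columns.
custom_fields_nm = 'custom_fields'

def build_struct(rec):
    # Collect every non-static column, JSON-encoding its value.
    extras = {}
    for key in list(rec):
        if key in static_fields or key == custom_fields_nm:
            continue
        extras[key] = json.dumps(rec[key])
        if key not in custom_fields:
            custom_fields.append(key)
        # Remove the custom field from the root; only keys present in this
        # record are deleted, so records with fewer columns do not raise KeyError.
        del rec[key]
    rec[custom_fields_nm] = extras
    return rec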

sales_with_struct = Map.apply(frame = sales_source, f = build_struct)
sales_with_struct.printSchema()


root
|-- date: string
|-- categories: string
|-- revenue: double
|-- custom_fields: struct
|    |-- cost: string
|    |-- store: string
|    |-- profit: string
|-- geo: string
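Because `build_struct` stores each value with `json.dumps`, every entry in `custom_fields` is a string, whatever its original type. A downstream consumer can recover the native values with `json.loads`. The sketch below uses a plain dict shaped like one `custom_fields` struct; `decode_custom_fields` is a hypothetical helper name, not part of Glue.

```python
import json


def decode_custom_fields(custom_fields):
    """Turn JSON-encoded struct values back into native Python values."""
    return {name: json.loads(value) for name, value in custom_fields.items()}


# Values in the shape build_struct produces: cost was a double in the source,
# while store and profit were strings, so their encoded forms keep the quotes.
encoded = {"cost": "463.4264144", "store": "\"MyStore\"", "profit": "\"1274.622066\""}
decoded = decode_custom_fields(encoded)
print(decoded)
```

Note that `profit` stays a string after decoding, because it was already a string in the source schema.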

In [9]:
sales_with_struct.toDF().show(20,False)

+------+---------------+----------+---------------------------------------+------+
|date  |categories     |revenue   |custom_fields                          |geo   |
+------+---------------+----------+---------------------------------------+------+
|1/1/17|Digital        |1738.04848|[463.4264144, "MyStore", "1274.622066"]|Turkey|
|1/1/17|Movies         |3359.74848|[534.1761529, "MyStore", "2825.572327"]|Turkey|
|1/1/17|Industrial     |3553.54288|[1171.128561, "MyStore", "2382.414319"]|Turkey|
|1/1/17|Games          |257.30696 |[66.10850685, "MyStore", "191.1984531"]|Turkey|
|1/1/17|Office Supplies|7479.57508|[6356.354677, "MyStore", "1123.220403"]|Turkey|
|1/1/17|Computers      |787.6508  |[370.4022929, "MyStore", "417.2485071"]|Turkey|
|1/1/17|Books          |1091.6226 |[105.8353184, "MyStore", "985.7872816"]|Turkey|
|1/1/17|Health         |540.40948 |[465.8638668, "MyStore", "74.5456132"] |Turkey|
|1/1/17|Outdoors       |548.13056 |[403.3166732, "MyStore", "144.8138868"]|Turkey|
|1/1