In [1]:
from json_schema_to_glue_columns import *

In [2]:
# Storage settings for each supported data format, keyed by lower-case format name.
# Each entry supplies the input format, output format, SerDe info and table
# parameters that later cells hand to GlueTableManager.
#
# The text-based formats (csv, json) share Hive's HiveIgnoreKeyTextOutputFormat.
# The class name is spelled with a capital "I" and lives in the
# `org.apache.hadoop.hive.ql.io` package; a misspelled class name is stored as-is
# in the catalog and only fails later, when an engine tries to load the class.
glue_data_formats_mapping = {
    "csv": {
        "input_format": "org.apache.hadoop.mapred.TextInputFormat",
        "output_format": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
        "serde_info": {
            "SerializationLibrary": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
            "Parameters": {
                "separatorChar": ","
            }
        },
        "parameters": {
            "classification": "csv"
        }
    },
    "json": {
        "input_format": "org.apache.hadoop.mapred.TextInputFormat",
        "output_format": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
        "serde_info": {
            "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
        },
        "parameters": {
            "classification": "json"
        }
    },
    "parquet": {
        "input_format": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
        "output_format": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
        "serde_info": {
            "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe",
            "Parameters": {
                "serialization.format": "1"
            }
        },
        "parameters": {
            "classification": "parquet"
        }
    }
}

In [3]:
# Inputs for this run: the JSON Schema describing the records and the YAML
# pipeline config, both given as paths relative to the notebook's working directory.
schema_file_location, config_file_location = (
    "./sample1.schema.json",
    "./sample1.config.yml",
)

In [4]:
# Load the JSON Schema from `schema_file_location` and display it.
# The recorded output below shows it comes back as a plain dict
# (title / type / required / properties ...).
schema = load_json_schema(schema_file_location)
schema

{'title': 'Root Schema',
 'type': 'object',
 'default': {},
 'required': ['id', 'isbn', 'author', 'editor', 'title', 'category', 'tags'],
 'additionalProperties': True,
 'properties': {'ID': {'title': 'The id Schema',
   'type': 'number',
   'default': 0,
   'examples': [1]},
  'isbn': {'title': 'The isbn Schema',
   'type': 'string',
   'default': '',
   'examples': ['123-456-222']},
  'author': {'title': 'The author Schema',
   'type': 'object',
   'default': {},
   'required': ['lastname', 'firstname'],
   'additionalProperties': True,
   'properties': {'last name': {'title': 'The lastname Schema',
     'type': 'string',
     'default': '',
     'examples': ['Doe']},
    'first-name': {'title': 'The firstname Schema',
     'type': 'string',
     'default': '',
     'examples': ['Jane']}},
   'examples': [{'lastname': 'Doe', 'firstname': 'Jane'}]},
  'editor': {'title': 'The editor Schema',
   'type': 'object',
   'default': {},
   'required': ['lastname', 'firstname'],
   'additiona

In [5]:
# Placeholder tokens and the values they take for this run.
# NOTE(review): presumably load_yaml_config substitutes these tokens inside the
# YAML text (the loaded config below contains "dev" and "abc" in its database
# names and S3 locations) - confirm in the helper.
placeholders = {}
placeholders["[aws_env]"] = "dev"
placeholders["[logical_env]"] = "abc"

# Load the pipeline config with the placeholders above and display it.
config = load_yaml_config(config_file_location, placeholders)
config

{'data_format': 'json',
 'pipeline_type': 'SCD1',
 'storage_zones': {'raw': {'database': 'abc_source_raw',
   'table': 'sample',
   's3_location': 's3://org-dev-abc-raw/prefix'},
  'staging': {'database': 'abc_source_staging',
   'table': 'sample',
   's3_location': 's3://org-dev-abc-staging/prefix'}}}

In [6]:
# Partition key specs in "name:type" form, coarsest unit first; every key is an int.
partition_key_list = [
    f"{time_unit}:int"
    for time_unit in ("year", "month", "day", "hour", "minute")
]

# Convert the specs into the Name/Type dicts shown in the output below.
partition_keys = create_partition_keys(partition_key_list)
partition_keys

[{'Name': 'year', 'Type': 'int'},
 {'Name': 'month', 'Type': 'int'},
 {'Name': 'day', 'Type': 'int'},
 {'Name': 'hour', 'Type': 'int'},
 {'Name': 'minute', 'Type': 'int'}]

In [7]:
# Columns for the raw table: the schema converted with the helper's default
# options. Per the recorded output below, nested objects stay nested as STRUCT<...>
# and arrays as ARRAY<...>, and property names come out normalised
# (e.g. 'ID' -> 'id', 'last name' -> 'last_name', 'first-name' -> 'first_name').
raw_glue_columns = convert_json_schema_to_glue_columns(schema)
raw_glue_columns

[{'Name': 'id', 'Type': 'DOUBLE'},
 {'Name': 'isbn', 'Type': 'STRING'},
 {'Name': 'author', 'Type': 'STRUCT<last_name:STRING,first_name:STRING>'},
 {'Name': 'editor', 'Type': 'STRUCT<last_name:STRING,first_name:STRING>'},
 {'Name': 'title', 'Type': 'STRING'},
 {'Name': 'category', 'Type': 'ARRAY<STRING>'},
 {'Name': 'tags', 'Type': 'ARRAY<STRUCT<key:STRING,value:STRING>>'}]

In [8]:
# Raw-zone target (database, table, S3 location) taken from the loaded config.
raw_zone_config = config["storage_zones"]["raw"]
raw_database, raw_table, raw_s3_location = (
    raw_zone_config[field] for field in ("database", "table", "s3_location")
)

# The raw table uses the storage settings of the configured data format;
# the lookup key is lower-cased to match the keys of glue_data_formats_mapping.
raw_glue_data_formats = glue_data_formats_mapping[config["data_format"].lower()]
(
    raw_input_format,
    raw_output_format,
    raw_serde_info,
    raw_parameters,
) = (
    raw_glue_data_formats[field]
    for field in ("input_format", "output_format", "serde_info", "parameters")
)

In [9]:
# Everything that defines the raw table, gathered in one mapping so the
# constructor call below stays a one-liner.
raw_table_definition = {
    "table_type": 'EXTERNAL_TABLE',
    "description": 'This is a sample table',
    "database_name": raw_database,
    "table_name": raw_table,
    "columns": raw_glue_columns,
    "location": raw_s3_location,
    "input_format": raw_input_format,
    "output_format": raw_output_format,
    "serde_info": raw_serde_info,
    "partition_keys": partition_keys,
    "parameters": raw_parameters,
}

# Manager for the raw-zone table; nothing is displayed by this cell.
raw_table_manager = GlueTableManager(**raw_table_definition)

In [10]:
# Create or update the raw table through its manager and display the returned
# response (the recorded output below shows HTTP status 200).
# NOTE(review): presumably this issues the Glue create/update table call against
# the live catalog - confirm in GlueTableManager before re-running.
raw_table_manager.create_or_update_table()

{'ResponseMetadata': {'RequestId': 'bbdaad04-0ca1-4657-b618-f25bf71152a8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 07 Jun 2023 10:02:42 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'bbdaad04-0ca1-4657-b618-f25bf71152a8'},
  'RetryAttempts': 0}}

In [11]:
# Columns for the staging table: the same schema converted with flatten=True and
# '__' as the delimiter. Per the recorded output below, object properties become
# top-level columns named <parent>__<child> (e.g. author__last_name), while array
# columns (category, tags) keep their ARRAY<...> types.
staging_glue_columns = convert_json_schema_to_glue_columns(schema, flatten=True, delimiter='__')
staging_glue_columns

[{'Name': 'id', 'Type': 'DOUBLE'},
 {'Name': 'isbn', 'Type': 'STRING'},
 {'Name': 'author__last_name', 'Type': 'STRING'},
 {'Name': 'author__first_name', 'Type': 'STRING'},
 {'Name': 'editor__last_name', 'Type': 'STRING'},
 {'Name': 'editor__first_name', 'Type': 'STRING'},
 {'Name': 'title', 'Type': 'STRING'},
 {'Name': 'category', 'Type': 'ARRAY<STRING>'},
 {'Name': 'tags', 'Type': 'ARRAY<STRUCT<key:STRING,value:STRING>>'}]

In [12]:
# Add the pipeline-specific extra columns to the flattened staging columns.
# For the 'SCD1' pipeline configured in this run, the recorded output shows a
# single extra column, `last_updated`.
pipeline_type = config["pipeline_type"]
additional_columns = get_additional_columns(pipeline_type)

# Rebind to a new list instead of extending in place, and skip names that are
# already present. With the previous in-place `+=`, re-running this cell appended
# the same columns again and left the table definition with duplicate column names.
existing_column_names = {column["Name"] for column in staging_glue_columns}
staging_glue_columns = staging_glue_columns + [
    column
    for column in additional_columns
    if column["Name"] not in existing_column_names
]
staging_glue_columns

[{'Name': 'id', 'Type': 'DOUBLE'},
 {'Name': 'isbn', 'Type': 'STRING'},
 {'Name': 'author__last_name', 'Type': 'STRING'},
 {'Name': 'author__first_name', 'Type': 'STRING'},
 {'Name': 'editor__last_name', 'Type': 'STRING'},
 {'Name': 'editor__first_name', 'Type': 'STRING'},
 {'Name': 'title', 'Type': 'STRING'},
 {'Name': 'category', 'Type': 'ARRAY<STRING>'},
 {'Name': 'tags', 'Type': 'ARRAY<STRUCT<key:STRING,value:STRING>>'},
 {'Name': 'last_updated', 'Type': 'STRING'}]

In [13]:
# Staging-zone target (database, table, S3 location) taken from the loaded config.
staging_zone_config = config["storage_zones"]["staging"]
staging_database, staging_table, staging_s3_location = (
    staging_zone_config[field] for field in ("database", "table", "s3_location")
)

# The staging table always uses the "parquet" entry of the mapping, independent
# of the data_format configured for the raw zone.
staging_glue_data_formats = glue_data_formats_mapping["parquet"]
(
    staging_input_format,
    staging_output_format,
    staging_serde_info,
    staging_parameters,
) = (
    staging_glue_data_formats[field]
    for field in ("input_format", "output_format", "serde_info", "parameters")
)

In [14]:
# Everything that defines the staging table, gathered in one mapping so the
# constructor call below stays a one-liner.
staging_table_definition = {
    "table_type": 'EXTERNAL_TABLE',
    "description": 'This is a sample table',
    "database_name": staging_database,
    "table_name": staging_table,
    "columns": staging_glue_columns,
    "location": staging_s3_location,
    "input_format": staging_input_format,
    "output_format": staging_output_format,
    "serde_info": staging_serde_info,
    "partition_keys": partition_keys,
    "parameters": staging_parameters,
}

# Manager for the staging-zone table; nothing is displayed by this cell.
staging_table_manager = GlueTableManager(**staging_table_definition)

In [15]:
# Create or update the staging table through its manager and display the returned
# response (the recorded output below shows HTTP status 200).
# NOTE(review): presumably this issues the Glue create/update table call against
# the live catalog - confirm in GlueTableManager before re-running.
staging_table_manager.create_or_update_table()

{'ResponseMetadata': {'RequestId': '97ce08af-fbe0-426b-bb51-b924a3176a2f',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 07 Jun 2023 10:02:46 GMT',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': '97ce08af-fbe0-426b-bb51-b924a3176a2f'},
  'RetryAttempts': 0}}