In [1]:
import boto3
import json
from botocore.exceptions import ClientError

In [2]:
sqs = boto3.client("sqs")

In [3]:
queue_url = "https://sqs.us-east-1.amazonaws.com/491085411627/datalake-iceberg-dev-queue-dlq.fifo"

In [5]:
message_json = {
    "version":"0",
    "id":"48b21f9e-27fc-f7e4-f24b-20f4bd8ef0ac",
    "detail-type":"Object Created",
    "source":"aws.s3",
    "account":"010928219150",
    "time":"2024-09-05T20:46:10Z",
    "region":"us-east-1",
    "resources":[
        "arn:aws:s3:::raw-datalake-iceberg-2f88fdbce7e3"
    ],
    "detail":{
        "version":"0",
        "bucket":{
            "name":"raw-datalake-iceberg-2f88fdbce7e3"
        },
        "object":{
            "key":"icebergdatalake/sales/shop/files/House price.parquet",
            "size":9251,
            "etag":"1bbebf3d8830aebb7e23df098cdfaa30",
            "sequencer":"0066DA1892240E109C"
        },
        "request-id":"NB2SZ8NA4AMQPP46",
        "requester":"010928219150",
        "source-ip-address":"167.60.154.252",
        "reason":"PutObject"
    }
}

In [9]:
import uuid


'0753c0f4-63cd-4061-b734-06262a5a51e9'

In [11]:
error_code = "METADATA_FILE_NOT_FOUND"
message_json["error_code"] = error_code
message = json.dumps(message_json)
message_group_id = "datalake-iceberg"
message_ded_id = str(uuid.uuid4())
try:
    sqs.send_message(
        QueueUrl=queue_url, 
        MessageBody=message, 
        MessageGroupId=message_group_id, 
        MessageDeduplicationId=message_ded_id
    )
except ClientError as e:
    print(f"[ERROR] sending message to SQS-DLQ: {e}")

In [2]:
def create_bucket(bucket_name):
    s3 = boto3.client('s3')
    
    try:
        s3.create_bucket(
            Bucket=bucket_name
        )
        print(f"Bucket '{bucket_name}' created successfully.")
    except ClientError as e:
        if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
            print(f"Bucket '{bucket_name}' already exists.")
        else:
            raise e

    return s3

# Get the AccountId
account_id = boto3.client('sts').get_caller_identity().get('Account')
bucket_name = f"data-bucket-{account_id}"
s3_client = create_bucket(bucket_name)

Bucket 'data-bucket-981702786641' created successfully.


In [3]:
path = f"s3://{bucket_name}/iceberg_test/"
temp_path = f"s3://{bucket_name}/iceberg_test_temp/"
glue_database = "iceberg_catalog"
glue_catalog_uri = f"s3://{bucket_name}/catalog"  # Replace with your Glue Catalog URI
table_name = "iceberg_catalog.iceberg_data"

In [4]:
glue_client = boto3.client("glue")

try:
    glue_client.get_database(Name=glue_database)
    print(f"Database {glue_database} already exists")
except ClientError as err:
    glue_client.create_database(DatabaseInput={'Name': glue_database})
    print(f"Database {glue_database} created")

Database iceberg_catalog created


In [5]:
comments = {
    "vendorid": "Identifier for the vendor.",
    "pickup_datetime": "Date and time when the trip started.",
    "dropoff_datetime": "Date and time when the trip ended.",
    "ratecode": "Code indicating the rate type.",
    "passenger_count": "Number of passengers in the trip.",
    "trip_distance": "Distance of the trip in miles.",
    "fare_amount": "Fare amount charged for the trip.",
    "total_amount": "Total amount charged including extras.",
    "payment_type": "Code indicating the payment method."
}

In [6]:
# Define the Iceberg schema
schema = Schema(
    NestedField(field_id=1, name="vendorid", field_type=StringType(), required=False, doc=comments['vendorid']),
    NestedField(field_id=2, name="pickup_datetime", field_type=TimestampType(), required=False, doc=comments['pickup_datetime']),
    NestedField(field_id=3, name="dropoff_datetime", field_type=TimestampType(), required=False, doc=comments['dropoff_datetime']),
    NestedField(field_id=4, name="ratecode", field_type=IntegerType(), required=False, doc=comments['ratecode']),
    NestedField(field_id=5, name="passenger_count", field_type=IntegerType(), required=False, doc=comments['passenger_count']),
    NestedField(field_id=6, name="trip_distance", field_type=DoubleType(), required=False, doc=comments['trip_distance']),
    NestedField(field_id=7, name="fare_amount", field_type=DoubleType(), required=False, doc=comments['fare_amount']),
    NestedField(field_id=8, name="total_amount", field_type=DoubleType(), required=False, doc=comments['total_amount']),
    NestedField(field_id=9, name="payment_type", field_type=IntegerType(), required=False, doc=comments['payment_type']),
    NestedField(field_id=10, name="year", field_type=IntegerType(), required=False),
    NestedField(field_id=11, name="month", field_type=IntegerType(), required=False),
    NestedField(field_id=12, name="type", field_type=StringType(), required=False)
)

In [7]:
# Define the partitioning specification with year, month, and day
partition_spec = PartitionSpec(
    PartitionField(field_id=10, source_id=10, transform=IdentityTransform (), name="year"),
    PartitionField(field_id=11, source_id=11, transform=IdentityTransform (), name="month"),
    PartitionField(field_id=12, source_id=12, transform=IdentityTransform(), name="type"),
)

In [8]:
# Define the sorting order using validTimeUtc field
sort_order = SortOrder(SortField(source_id=3, transform=IdentityTransform()))

In [9]:
# Instantiate glue catalog
catalog = load_catalog("glue", **{"type": "glue"})

In [10]:
try:
    tbl = catalog.load_table(table_name)
    schema = tbl.schema()
except NoSuchTableError:
    # If the table doesn't exist, create it
    tbl = catalog.create_table(
            identifier=table_name,
            location=glue_catalog_uri,
            schema=schema,
            partition_spec=partition_spec,
            sort_order=sort_order,
            properties={
                "Description": "Sample table data"
            }
    )
    print(f"Iceberg table: {table_name} created using AWS Glue Catalog.")
except Exception as e:
    print(f"Error loading table {table_name}: {e}")
    raise e
else:
    print(f"Table {table_name} already exists.")

Iceberg table: iceberg_catalog.iceberg_data created using AWS Glue Catalog.


In [12]:
part = ds.partitioning(pa.schema([("year", pa.int32()),
                                  ("month", pa.int32()),
                                  ("type", pa.string())]))

df = pq.read_table(
    source="./data_taxti0004.parquet",
    coerce_int96_timestamp_unit="us",
    partitioning=part,
    schema=schema.as_arrow(),
)

tbl.append(df)

In [None]:
convert_options = csv.ConvertOptions(
    column_types=schema.as_arrow()
)

In [None]:
df_csv = csv.read_csv("data_taxti.csv", convert_options=convert_options)

In [None]:
tbl.append(df_csv)

In [17]:
file_metadata = tbl.metadata_location
file_metadata = "/".join(file_metadata.split("/")[3:])

s3 = boto3.client('s3')
s3.download_file(bucket_name, file_metadata, "metadata_file.json")

with open ("metadata_file.json", "r") as f:
    data = json.load(f)

data

{'location': 's3://data-bucket-010928219262/catalog',
 'table-uuid': '6bd7286d-f3a3-43e2-a459-2f0fdce8039e',
 'last-updated-ms': 1725399054834,
 'last-column-id': 12,
 'schemas': [{'type': 'struct',
   'fields': [{'id': 1,
     'name': 'vendorid',
     'type': 'string',
     'required': False,
     'doc': 'Identifier for the vendor.'},
    {'id': 2,
     'name': 'pickup_datetime',
     'type': 'timestamp',
     'required': False,
     'doc': 'Date and time when the trip started.'},
    {'id': 3,
     'name': 'dropoff_datetime',
     'type': 'timestamp',
     'required': False,
     'doc': 'Date and time when the trip ended.'},
    {'id': 4,
     'name': 'ratecode',
     'type': 'int',
     'required': False,
     'doc': 'Code indicating the rate type.'},
    {'id': 5,
     'name': 'passenger_count',
     'type': 'int',
     'required': False,
     'doc': 'Number of passengers in the trip.'},
    {'id': 6,
     'name': 'trip_distance',
     'type': 'double',
     'required': False,
    

In [43]:
schema_json = json.dumps(data["schemas"][0])
schema_data = Schema.model_validate_json(schema_json)

partition_spec_json = json.dumps(data["partition-specs"][0])
partition_spec_data = PartitionSpec.model_validate_json(partition_spec_json)

In [55]:
tbl = catalog.create_table(
        identifier="iceberg_catalog.iceberg_newdata",
        location=data["location"],
        schema=schema_data,
        partition_spec=partition_spec_data,
        properties=data["properties"]
)
print(f"Iceberg table: {table_name} created using AWS Glue Catalog.")

Iceberg table: iceberg_catalog.iceberg_data created using AWS Glue Catalog.


In [16]:
example_metadata = {
   "schemas":[
      {
         "type":"struct",
         "fields":[
            {
               "id":1,
               "name":"vendorid",
               "type":"string",
               "required":False,
               "doc":"Identifier for the vendor."
            },
            {
               "id":2,
               "name":"pickup_datetime",
               "type":"timestamp",
               "required":False,
               "doc":"Date and time when the trip started."
            },
            {
               "id":3,
               "name":"dropoff_datetime",
               "type":"timestamp",
               "required":False,
               "doc":"Date and time when the trip ended."
            },
            {
               "id":4,
               "name":"ratecode",
               "type":"int",
               "required":False,
               "doc":"Code indicating the rate type."
            },
            {
               "id":5,
               "name":"passenger_count",
               "type":"int",
               "required":False,
               "doc":"Number of passengers in the trip."
            },
            {
               "id":6,
               "name":"trip_distance",
               "type":"double",
               "required":False,
               "doc":"Distance of the trip in miles."
            },
            {
               "id":7,
               "name":"fare_amount",
               "type":"double",
               "required":False,
               "doc":"Fare amount charged for the trip."
            },
            {
               "id":8,
               "name":"total_amount",
               "type":"double",
               "required":False,
               "doc":"Total amount charged including extras."
            },
            {
               "id":9,
               "name":"payment_type",
               "type":"int",
               "required":False,
               "doc":"Code indicating the payment method."
            },
            {
               "id":10,
               "name":"year",
               "type":"int",
               "required":False
            },
            {
               "id":11,
               "name":"month",
               "type":"int",
               "required":False
            },
            {
               "id":12,
               "name":"type",
               "type":"string",
               "required":False
            }
         ],
         "schema-id":0,
         "identifier-field-ids":[
            
         ]
      }
   ],
   "partition-specs":[
      {
         "spec-id":0,
         "fields":[
            {
               "source-id":10,
               "field-id":1000,
               "transform":"identity",
               "name":"year"
            },
            {
               "source-id":11,
               "field-id":1001,
               "transform":"identity",
               "name":"month"
            },
            {
               "source-id":12,
               "field-id":1002,
               "transform":"identity",
               "name":"type"
            }
         ]
      }
   ],
   "properties":{
      "Description":"Sample table data"
   }
}

In [17]:
client = boto3.client('ssm')

response = client.put_parameter(
    Name='MetadataTableSample',
    Description='Metadata table',
    Value=json.dumps(example_metadata, indent=2),
    Type='String',
    Overwrite=True
)

In [8]:
client = boto3.client('ssm')

response = client.get_parameter(
    Name='MetadataTableSample',
    WithDecryption=False
)

In [9]:
meta_data = json.loads(response['Parameter']['Value'])

In [10]:
schema_json = json.dumps(meta_data["schemas"][0])
schema_data = Schema.model_validate_json(schema_json)

partition_spec_json = json.dumps(meta_data["partition-specs"][0])
partition_spec_data = PartitionSpec.model_validate_json(partition_spec_json)

In [12]:
# Instantiate glue catalog
catalog = load_catalog("glue", **{"type": "glue"})

In [18]:
tbl = catalog.create_table(
        identifier="iceberg_catalog.iceberg_newborndata",
        location=f"s3://{bucket_name}/catalog",
        schema=schema_data,
        partition_spec=partition_spec_data,
        properties=meta_data["properties"]
)
print("Iceberg table: created using AWS Glue Catalog.")

Iceberg table: created using AWS Glue Catalog.


In [29]:
df = pq.read_table(
    source="./data_taxti0001.parquet",
    coerce_int96_timestamp_unit="us",
    partitioning=partition_spec_data,
    schema=schema_data.as_arrow(),
)

tbl.append(df)