In [1]:
# Notebook that shows how to connect SageMaker to Delta Lake through the "Athena" connector in SageMaker Data Wrangler

In [2]:
# install needed libraries

In [3]:
pip install delta-spark

You should consider upgrading via the '/home/ec2-user/anaconda3/envs/python3/bin/python -m pip install --upgrade pip' command.
Note: you may need to restart the kernel to use updated packages.


In [4]:
# get spark session that automatically downloads delta jars
import pyspark
from delta import *

# Session builder with the Delta Lake SQL extension and Delta catalog configured.
builder = (
    pyspark.sql.SparkSession.builder
    .appName("MyApp")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

# configure_spark_with_delta_pip wraps the builder before the session is created
# (per the cell comment, this is what pulls in the Delta jars).
spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [5]:
# load the input csv file
# Read the sample CSV: the first row supplies column names and column types are
# inferred from the data. Both options are passed as real booleans.
df = spark.read.csv("./SampleSuperstore.csv", header=True, inferSchema=True)

In [6]:
# Preview: restrict to five rows first, then convert just those rows to a pandas DataFrame for display
df.limit(5).toPandas()

Unnamed: 0,ShipMode,Segment,Country,City,State,PostalCode,Region,Category,Sub-Category,Sales,Quantity,Discount,Profit
0,Second Class,Consumer,United States,Henderson,Kentucky,42420,South,Furniture,Bookcases,261.96,2,0.0,41.9136
1,Second Class,Consumer,United States,Henderson,Kentucky,42420,South,Furniture,Chairs,731.94,3,0.0,219.582
2,Second Class,Corporate,United States,Los Angeles,California,90036,West,Office Supplies,Labels,14.62,2,0.0,6.8714
3,Standard Class,Consumer,United States,Fort Lauderdale,Florida,33311,South,Furniture,Tables,957.5775,5,0.45,-383.031
4,Standard Class,Consumer,United States,Fort Lauderdale,Florida,33311,South,Office Supplies,Storage,22.368,2,0.2,2.5164


In [7]:
# Write dataframe in delta format
# Persist the DataFrame in Delta format under ./delta/, overwriting whatever is already there.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .save("./delta/")
)

In [8]:
# Generate MANIFEST file for Athena/Catalog
# Open the Delta table stored at ./delta/ using the active Spark session
deltaTable = DeltaTable.forPath(spark, "./delta/")
# Generate the symlink-format manifest for the table; a later cell edits
# ./delta/_symlink_format_manifest/manifest, so the output presumably lands there
deltaTable.generate("symlink_format_manifest")

In [9]:
# Update local manifest file to link to S3
# Name of the destination S3 bucket (presumably a placeholder — replace with your own bucket)
s3_bucket = "mybucket"
# Key prefix under which the Delta files live in the bucket; it is concatenated
# directly with file names, so it must end with "/"
s3_prefix = "datawrangler/athena/deltalake/delta/"
def point_manifestfile_to_s3(fn, s3_bucket, s3_prefix):
    """Rewrite a local symlink manifest in place so its entries point at S3.

    Every non-blank line of ``fn`` is treated as the path/URI of one data file.
    The directory part of each entry is dropped and replaced with
    ``s3://<s3_bucket>/<s3_prefix>``, keeping only the file name.

    All entries are rewritten, so a table made of several data files keeps
    every file in the manifest.

    Args:
        fn: Path of the manifest file to rewrite.
        s3_bucket: Destination bucket name (without the ``s3://`` scheme).
        s3_prefix: Key prefix inside the bucket. It is concatenated directly
            with the file name, so it should end with ``/``.
    """
    s3_base = "s3://" + s3_bucket + "/" + s3_prefix

    # Read everything first; the file is closed before it is reopened for writing.
    with open(fn) as manifest:
        entries = [line.strip() for line in manifest if line.strip()]

    # Keep only the final path component: slicing from the first "part" substring
    # breaks when a parent directory name happens to contain "part".
    rewritten = [s3_base + entry.rsplit("/", 1)[-1] for entry in entries]

    with open(fn, "w") as manifest:
        for entry in rewritten:
            manifest.write(entry + "\n")
# Rewrite the generated manifest in place so it references the S3 bucket/prefix defined at the top of this cell
point_manifestfile_to_s3("./delta/_symlink_format_manifest/manifest", s3_bucket, s3_prefix)

In [10]:
# Upload local files to S3 bucket
import os
import boto3

# Module-level S3 resource used by upload_objects; the region is hard-coded to us-east-1
s3_resource = boto3.resource("s3", region_name="us-east-1")

def upload_objects(bucket_name, prefix, root_path):
    """Upload every file under ``root_path`` to S3, mirroring the directory tree.

    The object key of each file is ``prefix`` followed by the file's path
    relative to ``root_path`` (always with ``/`` separators). The relative path
    is computed with ``os.path.relpath``, so the result does not depend on
    whether ``root_path`` ends with a slash.

    Args:
        bucket_name: Destination bucket name.
        prefix: Key prefix; concatenated directly with the relative path, so it
            should end with ``/``.
        root_path: Local directory to walk recursively.

    Errors are reported with ``print`` and not re-raised (best-effort upload,
    same as before); a failure stops the remaining uploads.
    """
    try:
        my_bucket = s3_resource.Bucket(bucket_name)

        for path, _subdirs, files in os.walk(root_path):
            # "." means we are at root_path itself: no sub-directory component in the key.
            rel_dir = os.path.relpath(path, root_path)
            key_dir = "" if rel_dir == "." else rel_dir.replace(os.sep, "/") + "/"
            for file in files:
                my_bucket.upload_file(os.path.join(path, file), prefix + key_dir + file)

    except Exception as err:
        print(err)
# Reuse the bucket/prefix defined in the manifest cell so the uploaded keys
# always match the S3 paths written into the manifest.
upload_objects(s3_bucket, s3_prefix, "./delta/")

In [11]:
# Run Athena now

In [12]:
import boto3
import time
import json
import pprint
import sys

# Defaults
# S3 location handed to Athena as the OutputLocation for query results
query_output = 's3://mybucket/datawrangler/athena/deltalake/delta/outputs'
# Pretty-printer helper; NOTE(review): not referenced anywhere in the visible cells
pp = pprint.PrettyPrinter(indent=2)
# Mutable dict through which run_athena_query hands the execution id back to its caller
queryparams = {}
queryparams['execution_id']=''
# Athena client shared by the functions below (no region is passed explicitly here)
athena = boto3.client('athena')

# functions
# queryparams is a mutable dict: run_athena_query stores the execution_id in it so the caller can read it back for further processing
def run_athena_query(query, queryparams):
    """Submit ``query`` to Athena and block until it reaches a final state.

    The execution id is stored in ``queryparams['execution_id']`` as soon as the
    query is submitted. The status is then polled every 200 ms.

    Returns:
        The final state string: 'SUCCEEDED', 'FAILED' or 'CANCELLED'.
    """
    print("Executing query:\n{0}".format(query))
    start_response = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={'OutputLocation': query_output},
    )
    query_id = start_response['QueryExecutionId']
    queryparams['execution_id'] = query_id

    final_states = ('SUCCEEDED', 'FAILED', 'CANCELLED')
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    while state not in final_states:
        time.sleep(0.2)  # 200ms between polls
        state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    return state

# Print the results of the query execution
def print_results(execution_id):
    """Fetch the results of an Athena query execution and print them as indented, key-sorted JSON."""
    response = athena.get_query_results(QueryExecutionId=execution_id)
    rendered = json.dumps(response, indent=4, sort_keys=True)
    print(rendered)

def run_athena():
    """Create the Athena external table over the Delta manifest, then query it.

    Runs two statements in order: a CREATE EXTERNAL TABLE for
    ``default.superstore2`` and a ``SELECT *`` from that table. After each
    statement the raw result payload is printed. If a statement does not end in
    'SUCCEEDED', its final state is printed and the process exits with status 1.
    """
    query = r'''CREATE EXTERNAL TABLE IF NOT EXISTS default.superstore2 ( 
shipmode STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postalcode int,
region STRING,
category STRING,
subcategory STRING,
sales double,
quantity int,
discount double,
profit double
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://temp-rp-2020/datawrangler/athena/deltalake/delta2/_symlink_format_manifest/'
    '''
    # NOTE(review): LOCATION above uses bucket "temp-rp-2020" and prefix ".../delta2/",
    # while the upload cell targets "mybucket" and ".../delta/" — confirm which is intended.
    queryparams['execution_id']=''
    # Capture the status first: the failure branch needs it for the message.
    ret = run_athena_query(query, queryparams)
    if ret != 'SUCCEEDED':
        print(ret)
        sys.exit(1)

    print_results(queryparams['execution_id'])

    # Now interrogate the data
    query = r'''
    SELECT
        * 
    FROM default.superstore2
    '''

    queryparams['execution_id']=''
    ret = run_athena_query(query, queryparams)
    if ret != 'SUCCEEDED':
        print(ret)
        sys.exit(1)
    print_results(queryparams['execution_id'])

# end functions
# Create the external table and run the sample SELECT; results are printed below
run_athena()

Executing query:
CREATE EXTERNAL TABLE IF NOT EXISTS default.superstore2 ( 
shipmode STRING,
segment STRING,
country STRING,
city STRING,
state STRING,
postalcode int,
region STRING,
category STRING,
subcategory STRING,
sales double,
quantity int,
discount double,
profit double
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://temp-rp-2020/datawrangler/athena/deltalake/delta2/_symlink_format_manifest/'
    
{
    "ResponseMetadata": {
        "HTTPHeaders": {
            "connection": "keep-alive",
            "content-length": "108",
            "content-type": "application/x-amz-json-1.1",
            "date": "Sun, 05 Dec 2021 23:38:18 GMT",
            "x-amzn-requestid": "e0d44558-e102-4c4e-a085-ce28b7b6264a"
        },
        "HTTPStatusCode": 200,
        "RequestId": "e0d44558-e102-4c

In [13]:
# Your Athena table is now ready. You can go to SageMaker Data Wrangler and connect to this Delta table through Athena