# Read an S3 Delta table

The following code reads data from a data lake table stored on S3 in the Delta Lake format (Parquet data files plus a `_delta_log` transaction log).

We will read the table into a DataFrame using PySpark.

In [1]:
import os
import ast
import logging
import pprint
import yaml
import json
import time
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.column import *
import datetime as dt
import pandas as pd
import numpy as np

import boto3
import pprint
import botocore.session
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr

# Load Spark connection settings and AWS credentials from creds.json, which
# lives one directory above the notebook's working directory.
# The `with` block closes the file on exit; no explicit close() is needed.
# NOTE: creds holds secrets -- never print or display it in a shared notebook.
with open("./../creds.json", "r") as g:
    creds = json.load(g)

pp = pprint.PrettyPrinter(indent = 1)

# Values consumed by the Spark session cell below.
spark_host = creds["spark_host"]
spark_port = creds["spark_port"]
aws_client = creds["aws-access-key"]
aws_secret = creds["aws-secret-access-key"]
extra_jar_list = creds["extra_jar_list"]
print("imported modules.")

# boto3 clients pick up credentials from boto3's default credential chain,
# not from creds.json.
s3 = boto3.client("s3")
athena = boto3.client("athena")
glue = boto3.client("glue")
my_lambda = boto3.client("lambda")
dynamo = boto3.client("dynamodb")
dynamo_resource = boto3.resource("dynamodb")
print("connected to boto3 clients.")

imported modules.
connected to boto3 clients.


## List bucket contents with boto3

In [6]:
# s3 bucket: list *every* object. A single list_objects_v2 call returns at
# most 1,000 keys, so page through the bucket with the boto3 paginator and
# merge the pages into one response-shaped dict (later cells read "Contents").
bucket_name = "reddit-streaming-stevenhurwitt-new"
bucket_pages = list(s3.get_paginator("list_objects_v2").paginate(Bucket = bucket_name))
reddit_bucket = dict(bucket_pages[0])
reddit_bucket.pop("NextContinuationToken", None)
# An empty page omits "Contents", hence .get with a default.
reddit_bucket["Contents"] = [obj for page in bucket_pages for obj in page.get("Contents", [])]
reddit_bucket["KeyCount"] = len(reddit_bucket["Contents"])
reddit_bucket["IsTruncated"] = False
# Summarize instead of dumping thousands of object records into the output.
print("{}: {} objects".format(bucket_name, reddit_bucket["KeyCount"]))

# dynamo
table = dynamo_resource.Table("aws")
print(table)

In [7]:
# Top-level "folders" are the first path segment of each object key.
# An empty bucket's list_objects_v2 response has no "Contents" key, so default to [].
my_folders = np.unique([obj["Key"].split("/")[0] for obj in reddit_bucket.get("Contents", [])])
print(my_folders)

['BikiniBottomTwitter' 'BikiniBottomTwitter_clean' 'BlackPeopleTwitter'
 'BlackPeopleTwitter_clean' 'ProgrammerHumor']


## Delta table S3 path

In [2]:
# S3A URI of the cleaned delta table for one subreddit.
# Build the URI explicitly: os.path.join uses the OS path separator
# (a backslash on Windows), which is wrong for URIs.
bucket = "reddit-streaming-stevenhurwitt-new"
subreddit = "aws"
filepath = "s3a://{}/{}_clean".format(bucket, subreddit)
print(filepath)

s3a://reddit-streaming-stevenhurwitt-new/aws_clean


## Create the Spark session

In [8]:
# Create (or reuse) a SparkSession on the standalone cluster. S3A credentials
# and the Delta Lake SQL extension/catalog are configured so that delta tables
# on S3 can be read.
# NOTE(review): extra_jar_list is passed straight to spark.jars.packages, which
# expects a comma-separated string of Maven coordinates -- confirm that
# creds.json stores it in that form.
try:
    spark = SparkSession.builder.appName("twitter") \
        .master("spark://{}:{}".format(spark_host, spark_port)) \
        .config("spark.executor.memory", "2048m") \
        .config("spark.executor.cores", "2") \
        .config("spark.streaming.concurrentJobs", "8") \
        .config("spark.local.dir", "/opt/workspace/tmp/driver/") \
        .config("spark.worker.dir", "/opt/workspace/tmp/executor/") \
        .config("spark.eventLog.enabled", "true") \
        .config("spark.eventLog.dir", "/opt/workspace/tmp/events/") \
        .config("spark.sql.debug.maxToStringFields", 1000) \
        .config("spark.jars.packages", extra_jar_list) \
        .config("spark.hadoop.fs.s3a.access.key", aws_client) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.buffer.dir", "/opt/workspace/tmp/blocks") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
        .enableHiveSupport() \
        .getOrCreate()

    sc = spark.sparkContext
    sc.setLogLevel("WARN")
    print("imported modules, created spark.")

except Exception as exc:
    print("EXCEPTION: ")
    print(exc)
    # Re-raise: every later cell needs `spark`. Swallowing the error only
    # defers it to a misleading NameError in the next cell and lets
    # "Run All" continue past a broken session.
    raise

:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/steven/.ivy2/cache
The jars for the packages stored in: /home/steven/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-client added as a dependency
io.delta#delta-core_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b0825680-98be-4e7b-9ccf-7d1cd5a70d9a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.1 in central
	found org.apache.kafka#kafka-clients;2.8.1 in local-m2-cache
	found org.lz4#lz4-java;1.8.0 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.8.4 in local-m2-cache
	found org.slf4j#slf4j-api;1.7.32 in local-m2-cache
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in local-m2-cache
	fo

EXCEPTION: 
Java gateway process exited before sending its port number


In [30]:
# Load the cleaned subreddit delta table from S3 and preview its first rows.
# Delta tables carry their own schema, so no CSV-style "header" option applies.
try:
    df = spark.read.format("delta").load(filepath)
    df.show()

except Exception as exc:
    # `exc`, not `g`: Python unbinds the except-target name afterwards, which
    # would delete the notebook-level `g` bound by the creds cell.
    print("EXCEPTION: {}".format(exc))
    raise

EXCEPTION: name 'spark' is not defined
