In [0]:

from pyspark.sql.session import SparkSession
import time
import boto3, botocore
import pandas as pd

# Return a data frame with the files in the source directory
def get_source_listing_df() -> pd.DataFrame:
    """List the keys of all objects in the bucket named by TWEET_BUCKET_NAME.

    The bucket is accessed with unsigned (anonymous) requests.

    Returns:
        A DataFrame with a single column, ``File Name``, holding one object
        key per row.
    """
    # Unsigned requests: no AWS credentials are attached
    unsigned_cfg = boto3.session.Config(signature_version=botocore.UNSIGNED)
    s3_resource = boto3.resource('s3', config=unsigned_cfg)

    # Walk the whole bucket, keeping only each object's key
    keys = []
    for summary in s3_resource.Bucket(TWEET_BUCKET_NAME).objects.all():
        keys.append(summary.key)

    return pd.DataFrame(keys, columns=['File Name'])

# Show the contents of a file stored in S3
def show_s3_file_contents(filename: str) -> bytes:
    """Fetch the full contents of one object from the source bucket.

    Args:
        filename: Key of the object inside the bucket named by
            ``TWEET_BUCKET_NAME``.

    Returns:
        The object's raw, undecoded bytes. Call ``.decode()`` on the result
        if text is needed.
    """
    # Create a boto3 resource for S3 using anonymous credentials
    s3 = boto3.resource('s3', config=boto3.session.Config(signature_version=botocore.UNSIGNED))
    # Read the entire object body (not just the first record)
    body = s3.Object(TWEET_BUCKET_NAME, filename).get()['Body']
    try:
        return body.read()
    finally:
        # Release the underlying HTTP response even if the read fails
        body.close()

# This routine requires the paths defined in the includes notebook
# and it clears data from the previous run.
def clear_previous_run() -> bool:
    """Delete the checkpoint and Delta paths left by a previous run.

    Relies on ``dbutils`` and the ``*_CHECKPOINT`` / ``*_DELTA`` path
    constants being defined outside this function.

    Returns:
        Always ``True``.
    """
    # Same removal order as before: bronze, silver, gold
    stale_paths = (
        BRONZE_CHECKPOINT,
        BRONZE_DELTA,
        SILVER_CHECKPOINT,
        SILVER_DELTA,
        GOLD_CHECKPOINT,
        GOLD_DELTA,
    )
    for path in stale_paths:
        # Second argument is the recurse flag of dbutils.fs.rm
        dbutils.fs.rm(path, True)
    return True

def stop_all_streams() -> bool:
    """Stop every streaming query currently active on the global ``spark``.

    Returns:
        ``True`` if at least one active query was found and stopped,
        ``False`` if there were none.
    """
    found_any = False
    for query in spark.streams.active:
        query.stop()
        found_any = True
    return found_any


def stop_named_stream(spark: SparkSession, namedStream: str) -> bool:
    """Stop every active streaming query whose name equals ``namedStream``.

    Args:
        spark: Session whose ``streams.active`` collection is scanned.
        namedStream: Query name to match exactly.

    Returns:
        ``True`` if at least one matching query was stopped, else ``False``.
    """
    hit = False
    for query in spark.streams.active:
        # Skip queries that do not carry the requested name
        if query.name != namedStream:
            continue
        query.stop()
        hit = True
    return hit

def wait_stream_start(
    spark: SparkSession,
    namedStream: str,
    *,
    max_attempts: int = 4,
    poll_interval: float = 10,
) -> bool:
    """Poll until a streaming query named ``namedStream`` is active.

    The active-query list is re-read on every poll, so a query that starts
    while this function is waiting is detected. The function returns as soon
    as the query is seen; it sleeps only between unsuccessful polls.

    Args:
        spark: Session whose ``streams.active`` collection is polled.
        namedStream: Query name to match exactly.
        max_attempts: Maximum number of polls before giving up. Values of
            zero or less return ``False`` without polling.
        poll_interval: Seconds to sleep between consecutive polls.

    Returns:
        ``True`` if a query with the given name became active within the
        allotted polls, otherwise ``False``.
    """
    for attempt in range(max_attempts):
        if any(stream.name == namedStream for stream in spark.streams.active):
            return True
        # Pause only between polls; no point delaying after the final one
        if attempt < max_attempts - 1:
            time.sleep(poll_interval)
    return False

