In [0]:
# how do we check for something being mounted?

#dbutils.fs.unmount("/mnt/user-0af8d0adfd13-bucket") # this will unmount if it is mounted to prevent errors

In [0]:
# import URL processing (urllib.parse is imported explicitly: `import urllib` alone does not guarantee the submodule is loaded)
import urllib
import urllib.parse

# import pyspark functions
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [0]:
# List the contents of /FileStore/tables, the directory the credentials CSV is loaded from.
dbutils.fs.ls("/FileStore/tables")

In [0]:
# Reader settings for the credentials file: a comma-delimited CSV whose
# first row holds the column names.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","

# Load the credentials CSV into a Spark dataframe.
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Get the AWS access key and secret key from the spark dataframe.
# Both values are fetched with a single Spark job instead of one filter+collect per key.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)

# first() returns None when no row matches; fail with a clear message rather
# than an IndexError on an empty collect().
if credentials_row is None:
    raise ValueError(
        "No row with User name 'databricks-user' found in authentication_credentials.csv"
    )

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key; safe="" means '/' is percent-encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
##### we don't need this in this notebook, but I think we should put it in our final version for Kafka batch processing
# NOTE: everything between the triple quotes below is a string literal, so none of it runs.
# It is kept as a reference for unmounting (if already mounted) and mounting the S3 bucket.

'''
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0af8d0adfd13-bucket"
# Mount name for the bucket
MOUNT_NAME = "/mnt/user-0af8d0adfd13-bucket"
# Source url
SOURCE_URL = "s3n://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)

# checks for already mounted
def sub_unmount(str_path):
    if any(mount.mountPoint == str_path for mount in dbutils.fs.mounts()):
        dbutils.fs.unmount(str_path)

sub_unmount('/mnt/user-0af8d0adfd13-bucket')

# Mount the drive
dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)
'''

In [0]:
# Disabled experiment: the string literal below is not executed. It reads the single
# 'geo' Kinesis stream with the same options that get_stream() applies to each stream.
''' This was just a test
df = spark \
.readStream \
.format('kinesis') \
.option('streamName','streaming-0af8d0adfd13-geo') \
.option('initialPosition','earliest') \
.option('region','us-east-1') \
.option('awsAccessKey', ACCESS_KEY) \
.option('awsSecretKey', SECRET_KEY) \
.load()
'''

Get data from Kinesis for each of our streams, namely 'geo', 'pin' and 'user'.

In [0]:
def get_stream(stream_suffix):
    '''Opens a streaming read on the Kinesis stream ending in the given suffix

    Parameter:
        stream_suffix (string): final part of the stream name; it is appended
            to 'streaming-0af8d0adfd13-'

    Returns: the streaming dataframe produced by load(). Reading starts at
        the 'earliest' position in region 'us-east-1' and authenticates with
        the notebook-level ACCESS_KEY and SECRET_KEY. The payload still has
        to be deserialised and given a struct before it can be used.
    '''
    kinesis_options = {
        'streamName': f'streaming-0af8d0adfd13-{stream_suffix}',
        'initialPosition': 'earliest',
        'region': 'us-east-1',
        'awsAccessKey': ACCESS_KEY,
        'awsSecretKey': SECRET_KEY,
    }
    return spark.readStream.format('kinesis').options(**kinesis_options).load()


# One raw streaming source per Kinesis stream, opened in the order geo, pin, user.
stream_geo, stream_pin, stream_user = (
    get_stream(suffix) for suffix in ('geo', 'pin', 'user')
)


We need to deserialise the BLOB in the data column

In [0]:
# Superseded: the string literal below is not executed. The cast of the `data` column
# to STRING now happens inside create_dataframe().
''' doing this in the function now
df_user_cast = df_user.selectExpr("CAST(data as STRING)")
df_geo_cast = df_geo.selectExpr("CAST(data as STRING)")
df_pin_cast = df_pin.selectExpr("CAST(data as STRING)")
'''

Once we deserialise the data column, we then need to give it a schema as the json object will [only have string values](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.from_json.html). When we were doing this from Kafka we inferred the schema,  but here we need to be more explicit as [we discovered in this article](https://medium.com/road-to-data-engineering/stream-data-from-kinesis-to-databricks-with-pyspark-813c516b4233) and [this article](https://medium.com/@uzzaman.ahmed/tk-tk-understanding-pysparks-structtype-and-structfield-for-complex-data-structures-98cdb2ab242b). With reference also to [this older article](https://www.databricks.com/blog/2017/08/09/apache-sparks-structured-streaming-with-amazon-kinesis-on-databricks.html).

In [0]:
#### Struct definitions and function for creating dataframes

# Schema of the JSON payload carried by the 'pin' stream.
# follower_count stays a string here: it is cleaned with regexes and cast to
# int later in the notebook.
struct_pin = StructType([
    StructField("index", IntegerType()),
    StructField("unique_id", StringType()),
    StructField("title", StringType()),
    StructField("description", StringType()),
    StructField("poster_name", StringType()),
    StructField("follower_count", StringType()),
    StructField("tag_list", StringType()),
    StructField("is_image_or_video", StringType()),
    StructField("image_src", StringType()),
    StructField("downloaded", IntegerType()),
    StructField("save_location", StringType()),
    StructField("category", StringType()),
])

# Schema of the JSON payload carried by the 'user' stream.
struct_user = StructType([
    StructField("ind", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("age", StringType()),
    StructField("date_joined", TimestampType()),
])

# Schema of the JSON payload carried by the 'geo' stream.
struct_geo = StructType([
    StructField("ind", IntegerType()),
    StructField("timestamp", TimestampType()),
    StructField("latitude", FloatType()),
    StructField("longitude", FloatType()),
    StructField("country", StringType()),
])

def create_dataframe(stream_name, struct_name):
    ''' Turns a raw stream into a dataframe with one column per struct field

    Parameters:
        stream_name: the stream dataframe itself (not a string, despite the
            parameter name); it must expose a `data` column
        struct_name: the struct (schema) handed to from_json to parse the
            JSON text held in `data`

    Returns:
        dataframe containing the parsed fields as top-level columns
    '''
    # Step 1: expose the `data` payload as a string column called jsonData.
    json_strings = stream_name.selectExpr("cast (data as STRING) jsonData")
    # Step 2: parse that JSON text into a single struct column.
    parsed = json_strings.select(from_json("jsonData", struct_name).alias("our_data"))
    # Step 3: flatten the struct so each field becomes its own column.
    return parsed.select("our_data.*")



In [0]:
# Parse each raw stream with its matching struct.

df_user = create_dataframe(stream_name=stream_user, struct_name=struct_user)
df_geo = create_dataframe(stream_name=stream_geo, struct_name=struct_geo)
df_pin = create_dataframe(stream_name=stream_pin, struct_name=struct_pin)

# Uncomment to inspect the parsed streams:
# display(df_user)
# display(df_geo)
# display(df_pin)

We have made a mistake and we can't remove the data in the Kinesis stream. But we can clean it out by removing lines with nulls in them. So let's do that!

In [0]:
## Legacy clean-up, still needed for testing: a mistake in the ingestion script
## left bad records in the streams. A row is dropped only when every one of the
## listed columns is null.

geo_required_columns = ['timestamp', 'latitude', 'longitude', 'country']
pin_required_columns = ['unique_id', 'title', 'description', 'poster_name']

df_geo = df_geo.dropna(how='all', subset=geo_required_columns)
df_pin = df_pin.dropna(how='all', subset=pin_required_columns)


In [0]:
# Show df_pin as it stands after the all-null rows have been dropped.
display(df_pin)

In [0]:
## Clean the streaming data in the same way you have previously cleaned the batch data.

# so we need to bring in the cleaning functions that are in the TEST file (which annoyingly is the main file for our work for the batch processing)

General cleaning functions

In [0]:
def clean_column(df, column_name, string, debug=True):
    ''' Replaces every value of a column that matches a LIKE pattern with None

    The column is read from the dataframe passed in as `df`, so the function
    works for any dataframe rather than only for the notebook-level df_pin.

    Arguments:
        df: DataFrame to clean
        column_name: string, name of the column to clean
        string: string formatted as a LIKE condition (e.g. "No Title Data%")
        debug: currently unused; kept so existing calls keep working

    Returns: DataFrame in which matching entries of `column_name` are None and
        all other entries are unchanged
    '''
    target = df[column_name]
    # when(...) without a match falls through to otherwise(), which keeps the original value.
    return df.withColumn(column_name, when(target.like(string), None).otherwise(target))
    

def clean_column_regex(df, column_name, regex, replacement, debug=True):
    ''' Substitutes every regex match inside a column with a replacement

    Arguments:
        df: Dataframe
        column_name: string, name of the column to rewrite
        regex: regular expression to search for
        replacement: string that takes the place of each match
        debug: not used by this function

    Returns: Dataframe with the rewritten column
    '''
    cleaned_values = regexp_replace(column_name, regex, replacement)
    return df.withColumn(column_name, cleaned_values)


Start with df_pin

In [0]:


# Replace placeholder / error values with None, column by column.
df_pin = clean_column(df_pin, "description", "No description available%")
df_pin = clean_column(df_pin, "poster_name", "User Info Error%")
df_pin = clean_column(df_pin, "image_src", "Image src error%") # prob need to use a regex to keep the right ones here
df_pin = clean_column(df_pin, "save_location", "%.jpg") # still need to have a strategy for random text - need a negative somewhere
df_pin = clean_column(df_pin, "tag_list", "N,o, ,T,a,g,s,%")
df_pin = clean_column(df_pin, "title", "No Title Data%")

# Strip the "Local save in " prefix and expand the k/M suffixes of follower_count
# into zeros so that the column can be cast to an integer.
df_pin = clean_column_regex(df_pin, "save_location", "Local save in ", "")
df_pin = clean_column_regex(df_pin, "follower_count", "[kK]", "000")
df_pin = clean_column_regex(df_pin, "follower_count", "[mM]", "000000")
df_pin = df_pin.withColumn("follower_count", df_pin.follower_count.cast("int"))
# `index` needs no cast: struct_pin already declares it as IntegerType.

# The result has to be assigned back: clean_column returns a new dataframe and
# leaves its input untouched, so a bare call has no effect.
df_pin = clean_column(df_pin, "save_location", "%pinimg%")
df_pin = clean_column(df_pin, "save_location", "image")

## finally, there are now garbage rows, so let's remove these
df_pin = df_pin.dropna(how='all', subset=['title', 'description', 'poster_name'])


df_pin = df_pin.withColumnRenamed('index', 'ind') # change name
df_pin = df_pin.select("ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category") # change order
# printSchema is a method: without the parentheses nothing is printed.
df_pin.printSchema()

Now df_geo

In [0]:
from pyspark.sql.functions import *

# Latitude and longitude are merged into one array column called "coordinates"
# (a concatenated string was considered, but it needs to be an array).
df_geo = df_geo.select(
    "ind",
    "country",
    array("latitude", "longitude").alias("coordinates"),
    "timestamp",
)
df_geo = df_geo.withColumn("timestamp", to_timestamp("timestamp"))
df_geo.printSchema()

Finally df_user

In [0]:
# Build user_name from first and last name, make sure date_joined is a timestamp,
# and keep only the final columns in their final order. The select() leaves
# first_name and last_name out, so no separate drop() is required.
df_user = (
    df_user
    .withColumn("user_name", concat(col("first_name"), lit(" "), col("last_name")))
    .withColumn("date_joined", to_timestamp("date_joined"))
    .select("ind", "user_name", "age", "date_joined")
)
# printSchema is a method: without the parentheses nothing is printed.
df_user.printSchema()

In [0]:
# Show the cleaned user stream.
display(df_user)

In [0]:
# Show the cleaned pin stream.
display(df_pin)

In [0]:
# Show the cleaned geo stream.
display(df_geo)

Once the streaming data has been cleaned, you should save each stream in a Delta Table. You should save the following tables: <your_UserId>_pin_table, <your_UserId>_geo_table and <your_UserId>_user_table.

In [0]:
# df_pin.writeStream \
#   .format("delta") \
#   .outputMode("append") \
#   .option("checkpointLocation", "/tmp/kinesis/_checkpoints/") \
#   .table("0af8d0adfd13_pin_table")

In [0]:
def write_dataframe(df, name):
    ''' Write stream dataframe in a delta table

    Starts an append-mode streaming write of `df` into the table
    0af8d0adfd13_<name>_table.

    Each stream gets its own checkpoint directory
    (/tmp/kinesis/_checkpoints/<name>/). A checkpoint directory records the
    progress of one streaming query, so the pin, geo and user writes must not
    share one. Progress recorded earlier under the shared
    /tmp/kinesis/_checkpoints/ directory is not picked up from the new
    location.

    Parameters:
        df: the streaming dataframe to write (the dataframe itself, not its name)
        name (string): name that needs to be used, one of geo, user or pin
    '''
    checkpoint_location = f"/tmp/kinesis/_checkpoints/{name}/"

    (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_location)
        .table(f"0af8d0adfd13_{name}_table")
    )
    

In [0]:
# Start the streaming write of df_pin into the 0af8d0adfd13_pin_table delta table.
write_dataframe(df_pin, "pin")

In [0]:
# Start the streaming write of df_geo into the 0af8d0adfd13_geo_table delta table.
write_dataframe(df_geo, "geo")


In [0]:
# Start the streaming write of df_user into the 0af8d0adfd13_user_table delta table.
write_dataframe(df_user, "user")

In [0]:
# Show df_geo again; the rows captured from this cell's output follow below.
display(df_geo)

ind,country,coordinates,timestamp
7528,Albania,"List(-89.9787, -173.293)",2020-08-28T03:52:47.000+0000
2863,Armenia,"List(-5.34445, -177.924)",2020-04-27T13:34:16.000+0000
5730,Colombia,"List(-77.015, -101.437)",2021-04-19T17:37:03.000+0000
8304,French Guiana,"List(-28.8852, -164.87)",2019-09-13T04:50:29.000+0000
8731,Aruba,"List(-83.104, -171.302)",2020-07-17T04:39:09.000+0000
1313,Maldives,"List(77.0447, 61.9119)",2018-06-26T02:39:25.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
10794,Cocos (Keeling) Islands,"List(-89.5236, -154.567)",2022-01-01T02:26:50.000+0000
5494,Bulgaria,"List(-82.6768, -129.202)",2021-07-21T02:02:35.000+0000
5069,Azerbaijan,"List(-63.0063, -157.474)",2021-03-20T09:32:44.000+0000


In [0]:
#### testing purposes
# NOTE(review): this repeats the geo write that write_dataframe(df_geo, "geo")
# already starts, targeting the same 0af8d0adfd13_geo_table table.

(
    df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/kinesis/_checkpoints/")
    .table("0af8d0adfd13_geo_table")
)