In [0]:
# URL processing (`import urllib` alone does not guarantee the `urllib.parse`
# submodule is loaded, so import it explicitly)
import urllib
import urllib.parse

# pyspark functions
from pyspark.sql.functions import *

In [0]:
# List the files uploaded to DBFS FileStore (the credentials CSV read below lives here).
uploaded_tables = dbutils.fs.ls("/FileStore/tables")
uploaded_tables

In [0]:
# Read the uploaded AWS credentials CSV into a Spark dataframe.
# The file is comma-delimited and its first row holds the column headers.
file_type = "csv"
first_row_is_header = "true"
delimiter = ","
aws_keys_df = (
    spark.read.format(file_type)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Pull the AWS access key and secret key for 'databricks-user' out of the
# credentials dataframe. The row is filtered and fetched once, so only one
# Spark job runs for both keys.
# NOTE(review): Databricks recommends secret scopes (dbutils.secrets.get) over
# keeping plaintext credentials in a CSV under /FileStore — consider migrating.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)
if credentials_row is None:
    # Fail with a clear message instead of an IndexError from an empty collect()
    raise ValueError("No row with 'User name' == 'databricks-user' found in the AWS credentials file")
ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']
# URL-encode the secret key (safe="" also escapes '/') so it can be embedded in the S3 mount URL
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
# AWS S3 bucket name
AWS_S3_BUCKET = "user-0e0816526d11-bucket"
# DBFS mount point the bucket is exposed under
MOUNT_NAME = "/mnt/user-0e0816526d11-bucket"
# Source URL with the credentials inline (the secret key is URL-encoded above).
# s3a is the maintained S3 connector; the legacy s3n scheme is deprecated.
SOURCE_URL = "s3a://{0}:{1}@{2}".format(ACCESS_KEY, ENCODED_SECRET_KEY, AWS_S3_BUCKET)
# dbutils.fs.mount raises if the mount point is already in use, so only mount
# when it is absent; this keeps the cell safe to re-run.
if not any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(SOURCE_URL, MOUNT_NAME)

In [0]:
# List the topic directories stored on the mounted S3 bucket
# (derived from MOUNT_NAME so the path cannot drift from the mount point).
display(dbutils.fs.ls(f"{MOUNT_NAME}/topics"))

path,name,size,modificationTime
dbfs:/mnt/user-0e0816526d11-bucket/topics/0e0816526d11.geo/,0e0816526d11.geo/,0,1702493450799
dbfs:/mnt/user-0e0816526d11-bucket/topics/0e0816526d11.pin/,0e0816526d11.pin/,0,1702493450799
dbfs:/mnt/user-0e0816526d11-bucket/topics/0e0816526d11.user/,0e0816526d11.user/,0,1702493450799


In [0]:
# Load the Pinterest post records for partition 0 of the pin topic from the mounted bucket.
file_location = f"{MOUNT_NAME}/topics/0e0816526d11.pin/partition=0/*.json"
# specify file type
file_type = "json"
# NOTE(review): "inferSchema" is documented as a CSV option; the JSON source infers
# its schema on its own, so this option is likely a no-op — confirm and drop.
infer_schema = "true"
# load JSONs from mounted S3 bucket to Spark dataframe
df_pin = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .load(file_location)

# Placeholder strings that stand in for missing values; replace them all with real
# nulls in a single pass. (The comma-separated tag placeholder presumably reflects
# how tag_list was serialized upstream — it is matched exactly as written.)
missing_value_placeholders = [
    "No description available Story format",
    "null",
    "User Info Error",
    "Image src error",
    "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "No Title Data Available",
]
df_pin = df_pin.replace(missing_value_placeholders, None)

# Convert follower_count from text to integer, expanding a 'k' (thousand) or
# 'M' (million) suffix. The numeric part is parsed as a decimal so a fractional
# count such as "1.5k" becomes 1500 rather than being truncated to 1000; decimal
# (not double) arithmetic keeps the product exact before the integer cast.
follower_number = regexp_extract(col("follower_count"), r"(\d+(?:\.\d+)?)", 1).cast("decimal(18,3)")
df_pin = df_pin.withColumn(
    "follower_count",
    when(col("follower_count").rlike(r"\d+k"), (follower_number * 1000).cast("integer"))
    .when(col("follower_count").rlike(r"\d+M"), (follower_number * 1000000).cast("integer"))
    # no suffix: cast the value as-is (non-numeric text becomes null)
    .otherwise(col("follower_count").cast("integer")),
)

# strip the 'Local save in ' prefix so save_location holds just the path
df_pin = df_pin.withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))

# rename 'index' to 'ind', the key column name the geo and user frames also select
df_pin = df_pin.withColumnRenamed("index", "ind")

# rearranging the Pinterest columns
reorder_col = ["ind", "unique_id", "title", "description", "follower_count", "poster_name", "tag_list", "is_image_or_video", "image_src", "save_location", "category"]
df_pin = df_pin.select(reorder_col)

# Show the table of the Pinterest data
df_pin.show()

In [0]:
# Geolocation records for partition 0 of the geo topic, read from the mounted bucket.
file_location = "/mnt/user-0e0816526d11-bucket/topics/0e0816526d11.geo/partition=0/*.json"
file_type = "json"
infer_schema = "true"
# final column order for the geolocation frame
geo_reorder_col = ["ind", "country", "coordinates", "timestamp"]

# Read the JSON files, fold latitude/longitude into a single [latitude, longitude]
# array column, parse the timestamp string, and keep only the reordered columns.
df_geo = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .load(file_location)
    .withColumn("coordinates", array(col("latitude"), col("longitude")))
    .drop("latitude", "longitude")
    .withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))
    .select(geo_reorder_col)
)

# Show the table of the Geolocation data
df_geo.show()

In [0]:
# Load the user records for partition 0 of the user topic from the mounted bucket.
file_location = f"{MOUNT_NAME}/topics/0e0816526d11.user/partition=0/*.json"
# specify file type
file_type = "json"
# NOTE(review): "inferSchema" is documented as a CSV option; the JSON source infers
# its schema on its own, so this option is likely a no-op — confirm and drop.
infer_schema = "true"
# load JSONs from mounted S3 bucket to Spark dataframe
df_user = spark.read.format(file_type) \
    .option("inferSchema", infer_schema) \
    .load(file_location)

# Combine first and last name into user_name. concat_ws skips a null part, whereas
# concat would null the whole name if either part were missing; the name stays null
# only when both parts are null.
df_user = df_user.withColumn(
    "user_name",
    when(
        col("first_name").isNotNull() | col("last_name").isNotNull(),
        concat_ws(" ", col("first_name"), col("last_name")),
    ),
)
df_user = df_user.drop("first_name", "last_name", "index")
# parse the joined-date string into a timestamp
df_user = df_user.withColumn("date_joined", to_timestamp(df_user["date_joined"], "yyyy-MM-dd HH:mm:ss"))

user_reorder_col = ["ind", "user_name", "age", "date_joined"]
df_user = df_user.select(user_reorder_col)

# Show the table of the User data
df_user.show()

In [0]:
# Detach the S3 bucket, but only if it is currently mounted so the cell is safe to re-run.
# NOTE(review): Spark DataFrames are lazy — df_pin/df_geo/df_user re-read the mounted
# files on each action, so actions on them after this cell will likely fail unless they
# were cached/persisted first. Confirm no later cell depends on them.
if any(mount.mountPoint == MOUNT_NAME for mount in dbutils.fs.mounts()):
    dbutils.fs.unmount(MOUNT_NAME)