The code in this notebook was added to the 'mount_s3_and_get_data' notebook on the Databricks platform, where it is used to clean the dataframes created by running that notebook.

## Clean df_pin dataframe

In [None]:
def add_nulls_to_dataframe_column(dataframe, column, value_to_replace):
    """Return `dataframe` with LIKE-matching entries of `column` set to null.

    Args:
        dataframe: DataFrame holding the column to clean.
        column: Name of the column to rewrite in place.
        value_to_replace: Pattern handed to `Column.like`; entries that match
            it become null, all other entries are kept unchanged.

    Returns:
        The DataFrame produced by replacing `column` with its cleaned version.
    """
    current_value = col(column)
    is_placeholder = current_value.like(value_to_replace)
    cleaned_value = when(is_placeholder, None).otherwise(current_value)
    return dataframe.withColumn(column, cleaned_value)

In [None]:
# Placeholder patterns that mark an entry as carrying no usable data, keyed by
# the column they appear in. Each is passed to add_nulls_to_dataframe_column,
# which matches it with LIKE and turns the matching entries into nulls.
columns_and_values_for_null = {
    "description": "No description available%",
    "follower_count": "User Info Error",
    "image_src": "Image src error.",
    "poster_name": "User Info Error",
    "tag_list": "N,o, ,T,a,g,s, ,A,v,a,i,l,a,b,l,e",
    "title": "No Title Data Available"
}
for column_name, placeholder in columns_and_values_for_null.items():
    df_pin = add_nulls_to_dataframe_column(df_pin, column_name, placeholder)

# Remaining column fixes, applied in order:
#   1. expand the "k" and "M" suffixes of follower_count into zeros
#   2. cast follower_count to an integer column
#   3. strip the "Local save in " prefix so save_location holds only the path
#   4. rename the index column to ind
# NOTE(review): a fractional count such as "1.5k" would expand to "1.5000";
# confirm the source data only contains whole-number prefixes.
df_pin = (
    df_pin
    .withColumn("follower_count", regexp_replace("follower_count", "k", "000"))
    .withColumn("follower_count", regexp_replace("follower_count", "M", "000000"))
    .withColumn("follower_count", col("follower_count").cast("int"))
    .withColumn("save_location", regexp_replace("save_location", "Local save in ", ""))
    .withColumnRenamed("index", "ind")
)

# Final column layout of the cleaned pin dataframe.
new_pin_column_order = [
    "ind",
    "unique_id",
    "title",
    "description",
    "follower_count",
    "poster_name",
    "tag_list",
    "is_image_or_video",
    "image_src",
    "save_location",
    "category"
]
df_pin = df_pin.select(new_pin_column_order)

In [None]:
# display changes: print a preview of the cleaned df_pin rows, then its schema
# so the renamed/reordered columns and the follower_count cast can be checked
df_pin.show()
df_pin.printSchema()

## Clean df_geo dataframe

In [None]:
# import types
from pyspark.sql.types import ArrayType, DoubleType
# helper that pairs a latitude with a longitude
def combine_lat_and_long(latitude, longitude):
    """Return the two arguments as a list: latitude first, longitude second."""
    coordinates = [latitude, longitude]
    return coordinates
# Build the coordinates column with Spark's native array() function rather
# than a Python UDF: the pairing is then evaluated inside the Spark engine
# instead of shipping every row to a Python worker and back.
from pyspark.sql.functions import array, col

# Cast both inputs to double so the new column is an array of doubles,
# latitude first and longitude second.
df_geo = df_geo.withColumn(
    "coordinates",
    array(col("latitude").cast("double"), col("longitude").cast("double")),
)
# drop the latitude and longitude columns now that coordinates holds both
cols_to_drop = ("latitude", "longitude")
df_geo = df_geo.drop(*cols_to_drop)
# convert timestamp column from type string to type timestamp
df_geo = df_geo.withColumn("timestamp", to_timestamp("timestamp"))
# change column order
new_geo_column_order = [
    "ind",
    "country",
    "coordinates",
    "timestamp",
]
df_geo = df_geo.select(new_geo_column_order)

In [None]:
# display changes: render at most 50 rows of the cleaned df_geo via display(),
# then print its schema to check the coordinates and timestamp column types
df_geo.limit(50).display()
df_geo.printSchema()

ind,country,coordinates,timestamp
9455,British Indian Ocean Territory (Chagos Archipelago),"List(-82.9272, -150.346)",2022-03-15T01:46:32.000+0000
6814,British Indian Ocean Territory (Chagos Archipelago),"List(-86.5675, -149.565)",2022-09-02T11:34:28.000+0000
5111,British Indian Ocean Territory (Chagos Archipelago),"List(-83.7472, 8.65953)",2021-04-01T00:56:57.000+0000
10073,Antarctica (the territory South of 60 deg S),"List(-32.8885, -170.295)",2021-06-29T19:56:04.000+0000
2418,Antarctica (the territory South of 60 deg S),"List(-88.4642, -171.061)",2022-05-27T11:30:59.000+0000
5162,Antarctica (the territory South of 60 deg S),"List(-71.6607, -149.206)",2019-09-27T19:06:43.000+0000
1335,Antarctica (the territory South of 60 deg S),"List(-77.9931, -175.682)",2022-03-19T17:29:42.000+0000
9185,Antarctica (the territory South of 60 deg S),"List(-10.3764, -22.9809)",2019-10-06T18:12:55.000+0000
9335,Antarctica (the territory South of 60 deg S),"List(-88.4642, -171.061)",2020-11-14T23:42:22.000+0000
6749,Antarctica (the territory South of 60 deg S),"List(-88.4642, -171.061)",2018-04-16T07:39:46.000+0000


## Clean df_user dataframe

In [None]:
# Source columns that are merged into user_name and then discarded.
cols_to_drop = ("first_name", "last_name")

# In order: join first_name and last_name with a single space into user_name,
# remove the two source columns, and convert date_joined to a timestamp.
df_user = (
    df_user
    .withColumn("user_name", concat_ws(" ", "first_name", "last_name"))
    .drop(*cols_to_drop)
    .withColumn("date_joined", to_timestamp("date_joined"))
)

# Final column layout of the cleaned user dataframe.
new_user_column_order = [
    "ind",
    "user_name",
    "age",
    "date_joined",
]
df_user = df_user.select(new_user_column_order)

In [None]:
# display changes
# render at most 50 rows of the cleaned df_user via display(), then print its
# schema to check the user_name column and the date_joined timestamp type
df_user.limit(50).display()
df_user.printSchema()

ind,user_name,age,date_joined
2015,Christopher Bradshaw,27,2016-03-08T13:38:37.000+0000
10673,Alexander Cervantes,59,2017-05-12T21:22:17.000+0000
1857,Christopher Hamilton,48,2016-02-27T16:57:44.000+0000
10020,Christopher Hawkins,45,2016-09-15T06:02:53.000+0000
2041,Christopher Campbell,35,2015-10-22T22:42:23.000+0000
7031,Christopher Anderson,48,2016-06-13T17:09:14.000+0000
6398,Christina Davenport,39,2016-06-29T20:43:59.000+0000
3599,Alexandria Alvarado,20,2015-10-23T04:13:23.000+0000
4256,Alexandria Alvarado,20,2015-10-23T04:13:23.000+0000
1901,Michelle Richardson,44,2016-12-18T16:05:39.000+0000
