In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Standard-library imports stay after the wildcard imports so they cannot be shadowed.
import urllib
# `import urllib` alone does not guarantee the `parse` submodule is loaded.
import urllib.parse

# Load the credentials CSV from the Databricks file store.
# The file has a header row and is comma-separated.
CREDENTIALS_CSV_OPTIONS = {"header": "true", "sep": ","}

aws_keys_df = (
    spark.read
    .format("csv")
    .options(**CREDENTIALS_CSV_OPTIONS)
    .load("/FileStore/tables/authentication_credentials.csv")
)

In [0]:
# Get the AWS access key and secret key from the Spark dataframe.
# Both values come from the same row, so filter once and fetch that single row
# instead of running the same filter as two separate Spark jobs.
credentials_row = (
    aws_keys_df
    .where(col('User name') == 'databricks-user')
    .select('Access key ID', 'Secret access key')
    .first()
)

# `first()` returns None when no row matches; fail with a clear message
# rather than an opaque indexing error.
if credentials_row is None:
    raise ValueError("No credentials row found for user 'databricks-user'")

ACCESS_KEY = credentials_row['Access key ID']
SECRET_KEY = credentials_row['Secret access key']

# URL-encode the secret key; safe="" means every reserved character
# (including "/") is percent-encoded.
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

In [0]:
def _read_kinesis_stream(stream_name, region='us-east-1', initial_position='earliest'):
    """Open a Kinesis stream as a streaming DataFrame.

    The three streams read below differ only in their stream name, so the
    shared reader configuration lives here once.

    Args:
        stream_name: Name of the Kinesis stream to read.
        region: AWS region passed to the reader as the `region` option.
        initial_position: Value passed as the `initialPosition` option.

    Returns:
        The DataFrame produced by `spark.readStream...load()` for the stream.
    """
    return (
        spark
        .readStream
        .format('kinesis')
        .option('streamName', stream_name)
        .option('initialPosition', initial_position)
        .option('region', region)
        # Credentials come from ACCESS_KEY / SECRET_KEY defined earlier in the notebook.
        .option('awsAccessKey', ACCESS_KEY)
        .option('awsSecretKey', SECRET_KEY)
        .load()
    )


df_geo_kinesis = _read_kinesis_stream('streaming-0e36c8cd403d-geo')
df_pin_kinesis = _read_kinesis_stream('streaming-0e36c8cd403d-pin')
df_user_kinesis = _read_kinesis_stream('streaming-0e36c8cd403d-user')

In [0]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType
import pyspark.sql.functions as F

# Schemas used to parse the JSON payload of each Kinesis stream.
#
# The fields are passed as *lists*: StructType expects an ordered sequence of
# StructField objects. A set literal ({...}) has no defined order, so the
# resulting column order would be arbitrary and equal fields would collapse.
#
# DoubleType is provided by the wildcard `pyspark.sql.types` import at the top
# of the notebook.

# User records; `date_joined` is read as a string and converted to a
# timestamp when df_user is built.
user_schema = StructType(
    [
        StructField("ind", IntegerType(), True),
        StructField("first_name", StringType(), True),
        StructField("last_name", StringType(), True),
        StructField("age", IntegerType(), True),
        StructField("date_joined", StringType(), True),
    ]
)

# Geolocation records; `timestamp` is read as a string and converted to a
# timestamp when df_geo is built.
geo_schema = StructType(
    [
        StructField("ind", IntegerType(), True),
        StructField("timestamp", StringType(), True),
        StructField("latitude", DoubleType(), True),
        StructField("longitude", DoubleType(), True),
        StructField("country", StringType(), True),
    ]
)

# Pin records; `follower_count` stays a string here because the cleaning step
# handles values with "k"/"M" suffixes and placeholder text before casting.
pin_schema = StructType(
    [
        StructField("category", StringType(), True),
        StructField("description", StringType(), True),
        StructField("downloaded", IntegerType(), True),
        StructField("follower_count", StringType(), True),
        StructField("image_src", StringType(), True),
        StructField("index", IntegerType(), True),
        StructField("is_image_or_video", StringType(), True),
        StructField("poster_name", StringType(), True),
        StructField("save_location", StringType(), True),
        StructField("tag_list", StringType(), True),
        StructField("title", StringType(), True),
        StructField("unique_id", StringType(), True),
    ]
)

In [0]:
def _decode_kinesis_payload(stream_df, schema, alias):
    """Turn a raw Kinesis stream into a DataFrame of parsed JSON fields.

    Args:
        stream_df: Streaming DataFrame with a `data` column holding the payload.
        schema: StructType describing the JSON document in `data`.
        alias: Name given to the intermediate struct column before expansion.

    Returns:
        A DataFrame with one column per field of `schema`.
    """
    # Cast the payload to a string so it can be parsed as JSON.
    payload_as_text = stream_df.selectExpr("CAST(data as STRING)")
    # Parse the JSON into a single struct column named `alias`.
    parsed = payload_as_text.select(F.from_json("data", schema).alias(alias))
    # Expand the struct so each field becomes a top-level column.
    return parsed.select(f"{alias}.*")


df_user_pre_cleaning = _decode_kinesis_payload(df_user_kinesis, user_schema, "user")

df_geo_pre_cleaning = _decode_kinesis_payload(df_geo_kinesis, geo_schema, "geo")

df_pin_pre_cleaning = _decode_kinesis_payload(df_pin_kinesis, pin_schema, "pin")

In [0]:
# display(df_pin_pre_cleaning)
# display(df_geo_pre_cleaning)
# display(df_user_pre_cleaning)

In [0]:
# Clean the pin stream: de-duplicate, null out placeholder values, expand
# "k"/"M" follower counts, and reduce save_location to the path itself.
df_pin_cleaned = (
    df_pin_pre_cleaning
    # Drop rows that are exact duplicates across every column
    # (dropDuplicates() with no arguments compares all columns).
    .dropDuplicates()

    # Replace entries with no relevant data in each column with nulls
    .withColumn(
        "follower_count",
        F.when(F.col("follower_count") == "User Info Error", None)
        .otherwise(F.col("follower_count")),
    )
    .withColumn(
        "image_src",
        F.when(F.col("image_src") == "Image src error.", None)
        .otherwise(F.col("image_src")),
    )
    .withColumn(
        "poster_name",
        F.when(F.col("poster_name") == "User Info Error", None)
        .otherwise(F.col("poster_name")),
    )

    # Expand follower counts with a "k" (thousand) or "M" (million) suffix:
    # strip the last character and multiply. The cast to an integer type
    # happens later, when df_pin is built.
    .withColumn(
        "follower_count",
        F.when(
            F.col("follower_count").endswith("k"),
            F.expr("substring(follower_count, 1, length(follower_count)-1)") * 1000,
        ).otherwise(F.col("follower_count")),
    )
    .withColumn(
        "follower_count",
        F.when(
            F.col("follower_count").endswith("M"),
            F.expr("substring(follower_count, 1, length(follower_count)-1)") * 1000000,
        ).otherwise(F.col("follower_count")),
    )

    # Keep only the save location path: take the text after the last
    # "Local save in" marker and trim any whitespace left around it.
    .withColumn(
        "save_location",
        F.trim(F.element_at(F.split("save_location", "Local save in"), -1)),
    )

    # Use the same key name ("ind") as the geo and user data
    .withColumnRenamed("index", "ind")
)

In [0]:
# Final pin DataFrame: enforce integer types on the numeric columns and put
# the columns in their reporting order.
df_pin = (
    df_pin_cleaned
    .withColumn("follower_count", F.col("follower_count").cast(IntegerType()))
    # Cast "ind" in place. Writing the cast to a column named "index" would
    # only create a throwaway column that the select below drops.
    .withColumn("ind", F.col("ind").cast(IntegerType()))
    .select(
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )
)

In [0]:
# Final geo DataFrame: remove exact duplicate rows, combine latitude and
# longitude into one array column, and parse the timestamp string.
df_geo = (
    df_geo_pre_cleaning
    .dropDuplicates()
    .select(
        "ind",
        "country",
        F.array("latitude", "longitude").alias("coordinates"),
        F.to_timestamp("timestamp").alias("timestamp"),
    )
)

In [0]:
# Final user DataFrame: remove exact duplicate rows, join first and last name
# into a single user_name, and parse the date_joined string.
df_user = (
    df_user_pre_cleaning
    .dropDuplicates()
    .select(
        "ind",
        F.concat(F.col("first_name"), F.lit(" "), F.col("last_name")).alias("user_name"),
        "age",
        F.to_timestamp("date_joined").alias("date_joined"),
    )
)

In [0]:
# display(df_pin)
# display(df_geo)
# display(df_user)

ind,country,coordinates,timestamp
7733,Algeria,"List(-89.5173, -179.689)",2022-09-29T10:45:19.000+0000
159,Andorra,"List(-88.0812, -166.603)",2017-11-20T21:14:56.000+0000
8300,Chile,"List(-64.5259, 19.7049)",2018-02-14T23:50:10.000+0000
1198,Finland,"List(6.37207, -150.73)",2020-01-24T23:42:18.000+0000
5076,Germany,"List(-70.7195, -91.1935)",2019-05-05T16:22:23.000+0000
772,Monaco,"List(25.0708, -97.1964)",2022-09-22T05:59:55.000+0000
5293,Sao Tome and Principe,"List(-13.1463, -25.9649)",2019-05-31T20:49:36.000+0000
2015,Armenia,"List(-17.629, -177.685)",2018-09-22T13:52:19.000+0000
4315,Cote d'Ivoire,"List(-45.8508, 66.1003)",2019-12-15T03:51:28.000+0000
223,Isle of Man,"List(1.15509, -118.397)",2018-12-07T07:30:40.000+0000


In [0]:
# Checkpoint directory dedicated to the pin query. A Structured Streaming
# checkpoint stores the progress of one query, so it must not be shared with
# (or deleted by) any other streaming write.
PIN_CHECKPOINT_PATH = "/tmp/kinesis/pin/_checkpoints/"

# Remove any checkpoint left by a previous run so this query starts fresh.
dbutils.fs.rm(PIN_CHECKPOINT_PATH, True)
(
    df_pin.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", PIN_CHECKPOINT_PATH)
    .table("0e36c8cd403d_pin_table")
)

In [0]:
# Checkpoint directory dedicated to the geo query. A Structured Streaming
# checkpoint stores the progress of one query, so it must not be shared with
# (or deleted by) any other streaming write.
GEO_CHECKPOINT_PATH = "/tmp/kinesis/geo/_checkpoints/"

# Remove any checkpoint left by a previous run so this query starts fresh.
dbutils.fs.rm(GEO_CHECKPOINT_PATH, True)
(
    df_geo.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", GEO_CHECKPOINT_PATH)
    .table("0e36c8cd403d_geo_table")
)

In [0]:
# Checkpoint directory dedicated to the user query. A Structured Streaming
# checkpoint stores the progress of one query, so it must not be shared with
# (or deleted by) any other streaming write.
USER_CHECKPOINT_PATH = "/tmp/kinesis/user/_checkpoints/"

# Remove any checkpoint left by a previous run so this query starts fresh.
dbutils.fs.rm(USER_CHECKPOINT_PATH, True)
(
    df_user.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", USER_CHECKPOINT_PATH)
    .table("0e36c8cd403d_user_table")
)