In [0]:
# URL processing
import urllib
import urllib.parse

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KinesisDataRead") \
    .getOrCreate()

# Define the path to the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the SAME row with a single Spark job.
# Two independent .first() calls each launch a job and are not guaranteed to
# return values from the same row if the table ever holds more than one.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    # .first() returns None on an empty DataFrame; fail with a clear message
    # instead of "'NoneType' object is not subscriptable".
    raise ValueError(
        "No AWS credentials found in Delta table at {}".format(delta_table_path)
    )

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']

# URL-encode the secret key; safe="" means "/" is percent-encoded as well
ENCODED_SECRET_KEY = urllib.parse.quote(string=SECRET_KEY, safe="")

# Set the AWS credentials in the Spark configuration
spark.conf.set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
spark.conf.set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)

# Define AWS region and Kinesis stream details
kinesis_region = "us-east-1"
kinesis_stream_name_pin = "streaming-0eaa2e755d1f-pin"
kinesis_stream_name_geo = "streaming-0eaa2e755d1f-geo"
kinesis_stream_name_user = "streaming-0eaa2e755d1f-user"

# --- pin stream ---------------------------------------------------------------
# Read data from the pin Kinesis stream
pin_kinesis_df = spark \
    .readStream \
    .format("kinesis") \
    .option("streamName", kinesis_stream_name_pin) \
    .option("region", kinesis_region) \
    .option("awsAccessKey", ACCESS_KEY) \
    .option("awsSecretKey", SECRET_KEY) \
    .load()

# Expose the record payload as a string column; no JSON parsing or schema
# inference happens at this stage.
pin_df = pin_kinesis_df \
    .selectExpr("CAST(data AS STRING) as json_data")

# Write the streaming data to the console (for testing)
pin_query = pin_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# --- geo stream ---------------------------------------------------------------
# Read data from the geo Kinesis stream
geo_kinesis_df = spark \
    .readStream \
    .format("kinesis") \
    .option("streamName", kinesis_stream_name_geo) \
    .option("region", kinesis_region) \
    .option("awsAccessKey", ACCESS_KEY) \
    .option("awsSecretKey", SECRET_KEY) \
    .load()

geo_df = geo_kinesis_df \
    .selectExpr("CAST(data AS STRING) as json_data")

# Write the streaming data to the console (for testing)
geo_query = geo_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# --- user stream --------------------------------------------------------------
# Read data from the user Kinesis stream
user_kinesis_df = spark \
    .readStream \
    .format("kinesis") \
    .option("streamName", kinesis_stream_name_user) \
    .option("region", kinesis_region) \
    .option("awsAccessKey", ACCESS_KEY) \
    .option("awsSecretKey", SECRET_KEY) \
    .load()

user_df = user_kinesis_df \
    .selectExpr("CAST(data AS STRING) as json_data")

# Write the streaming data to the console (for testing).
# This must read from user_df; the earlier version wrote geo_df a second time.
user_query = user_df \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# Wait for termination only AFTER all three queries have been started.
# awaitTermination() blocks the cell, so calling it right after the first
# start() would prevent the remaining streams from ever being created.
pin_query.awaitTermination()
geo_query.awaitTermination()
user_query.awaitTermination()





In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, expr, regexp_extract, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType

# Initialize Spark session
spark = SparkSession.builder \
    .appName("KinesisDataReadAndClean") \
    .getOrCreate()

# Define the path to the Delta table that stores the AWS credentials
delta_table_path = "dbfs:/user/hive/warehouse/authentication_credentials"

# Read the Delta table to a Spark DataFrame
aws_keys_df = spark.read.format("delta").load(delta_table_path)

# Fetch both credential columns from the SAME row with a single Spark job.
# Two independent .first() calls each launch a job and are not guaranteed to
# return values from the same row if the table ever holds more than one.
aws_keys_row = aws_keys_df.select('Access key ID', 'Secret access key').first()
if aws_keys_row is None:
    # .first() returns None on an empty DataFrame; fail with a clear message
    # instead of "'NoneType' object is not subscriptable".
    raise ValueError(
        "No AWS credentials found in Delta table at {}".format(delta_table_path)
    )

# Get the AWS access key and secret key from the fetched row
ACCESS_KEY = aws_keys_row['Access key ID']
SECRET_KEY = aws_keys_row['Secret access key']

# Define AWS region and Kinesis stream details
kinesis_region = "us-east-1"
kinesis_stream_name_pin = "streaming-0eaa2e755d1f-pin"
kinesis_stream_name_geo = "streaming-0eaa2e755d1f-geo"
kinesis_stream_name_user = "streaming-0eaa2e755d1f-user"

# Schemas used by from_json() to parse the JSON payload of each stream.
# Every field is declared nullable.

# Pin records: all eleven fields are read as strings.
pin_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in (
        "ind",
        "unique_id",
        "title",
        "description",
        "follower_count",
        "poster_name",
        "tag_list",
        "is_image_or_video",
        "image_src",
        "save_location",
        "category",
    )
])

# Geo records: latitude/longitude are doubles, the rest are strings.
geo_schema = (
    StructType()
    .add("ind", StringType(), True)
    .add("country", StringType(), True)
    .add("latitude", DoubleType(), True)
    .add("longitude", DoubleType(), True)
    .add("timestamp", StringType(), True)
)

# User records: age is an integer, the rest are strings.
user_schema = (
    StructType()
    .add("ind", StringType(), True)
    .add("first_name", StringType(), True)
    .add("last_name", StringType(), True)
    .add("age", IntegerType(), True)
    .add("date_joined", StringType(), True)
)

# Define the cleaning function for pin data
def clean_df_pin(df):
    """Parse and clean the pin stream.

    Args:
        df: DataFrame with a string column ``json_data`` holding one JSON
            document per row.

    Returns:
        DataFrame with the columns of ``pin_schema`` in a fixed order, where
        ``follower_count`` is an int and ``save_location`` is reduced to the
        text after its last ``/``.
    """
    # Parse JSON data and promote the parsed struct's fields to columns
    df = df.withColumn("parsed_json", from_json(col("json_data"), pin_schema))
    df = df.select("parsed_json.*")

    # Normalise follower_count to an int.
    # The numeric prefix must be extracted BEFORE scaling: multiplying the raw
    # string (e.g. "10k") by 1000 makes Spark cast "10k" to a number, which
    # yields NULL (or an error under ANSI mode), so suffixed values were lost.
    raw_count = col("follower_count")
    leading_digits = regexp_extract(raw_count, r"^([0-9]+)", 1).cast("int")
    df = df.withColumn(
        "follower_count",
        when(raw_count.rlike("^[0-9]+$"), leading_digits)
        .when(raw_count.rlike("^[0-9]+[kK]$"), leading_digits * 1000)
        .when(raw_count.rlike("^[0-9]+[mM]$"), leading_digits * 1000000)
        # No otherwise(): any value not matching the patterns becomes NULL.
        .cast("int"),
    )

    # Keep only the trailing run of non-"/" characters of save_location
    df = df.withColumn("save_location", regexp_extract(col("save_location"), r'[^/]+$', 0))

    # Reorder the DataFrame columns
    df = df.select("ind", "unique_id", "title", "description", "follower_count",
                   "poster_name", "tag_list", "is_image_or_video",
                   "image_src", "save_location", "category")

    return df

# Define the cleaning function for geo data
def clean_df_geo(df):
    """Parse and clean the geo stream.

    Args:
        df: DataFrame with a string column ``json_data`` holding one JSON
            document per row.

    Returns:
        DataFrame with columns ``ind``, ``country``, ``coordinates`` (an array
        built from latitude and longitude) and ``timestamp`` (cast to
        timestamp type).
    """
    # Parse the JSON payload against geo_schema and flatten the struct
    parsed = (
        df.withColumn("parsed_json", from_json(col("json_data"), geo_schema))
        .select("parsed_json.*")
    )

    return (
        parsed
        # Combine latitude/longitude into a single array column ...
        .withColumn("coordinates", expr("array(latitude, longitude)"))
        # ... and drop the now-redundant source columns
        .drop("latitude", "longitude")
        # Convert the timestamp string to timestamp type
        .withColumn("timestamp", col("timestamp").cast(TimestampType()))
        # Fix the output column order
        .select("ind", "country", "coordinates", "timestamp")
    )

# Define the cleaning function for user data
def clean_df_user(df):
    """Parse and clean the user stream.

    Args:
        df: DataFrame with a string column ``json_data`` holding one JSON
            document per row.

    Returns:
        DataFrame with columns ``ind``, ``user_name`` (first and last name
        joined by a space), ``age`` and ``date_joined`` (cast to timestamp
        type).
    """
    # Parse JSON data and promote the parsed struct's fields to columns
    df = df.withColumn("parsed_json", from_json(col("json_data"), user_schema))
    df = df.select("parsed_json.*")

    # Create user_name column.
    # String concatenation must use concat_ws: the "+" operator on Columns is
    # numeric addition, which casts the names to numbers and produces NULL.
    df = df.withColumn("user_name", expr("concat_ws(' ', first_name, last_name)"))

    # Drop first_name and last_name columns
    df = df.drop("first_name", "last_name")

    # Convert date_joined to timestamp type
    df = df.withColumn("date_joined", col("date_joined").cast(TimestampType()))

    # Reorder the DataFrame columns
    df = df.select("ind", "user_name", "age", "date_joined")

    return df

def _read_kinesis_stream(stream_name):
    """Open a streaming read on the named Kinesis stream.

    Uses the module-level ``spark`` session, ``kinesis_region`` and the
    ``ACCESS_KEY`` / ``SECRET_KEY`` credentials.
    """
    return (
        spark.readStream
        .format("kinesis")
        .option("streamName", stream_name)
        .option("region", kinesis_region)
        .option("awsAccessKey", ACCESS_KEY)
        .option("awsSecretKey", SECRET_KEY)
        .load()
    )


def _start_delta_sink(cleaned_df, checkpoint_location, table_path):
    """Start an append-mode streaming write of ``cleaned_df`` to a Delta path.

    Returns the started streaming query handle.
    """
    return (
        cleaned_df.writeStream
        .outputMode("append")
        .format("delta")
        .option("checkpointLocation", checkpoint_location)
        .option("path", table_path)
        .start()
    )


# Read, clean, and process the pin Kinesis stream
pin_kinesis_df = _read_kinesis_stream(kinesis_stream_name_pin)
pin_df = pin_kinesis_df.selectExpr("CAST(data AS STRING) as json_data")
cleaned_pin_df = clean_df_pin(pin_df)
pin_query = _start_delta_sink(
    cleaned_pin_df,
    "dbfs:/tmp/checkpoints/pin",
    "dbfs:/user/hive/warehouse/0eaa2e755d1f_pin_table",
)

# Read, clean, and process the geo Kinesis stream
geo_kinesis_df = _read_kinesis_stream(kinesis_stream_name_geo)
geo_df = geo_kinesis_df.selectExpr("CAST(data AS STRING) as json_data")
cleaned_geo_df = clean_df_geo(geo_df)
geo_query = _start_delta_sink(
    cleaned_geo_df,
    "dbfs:/tmp/checkpoints/geo",
    "dbfs:/user/hive/warehouse/0eaa2e755d1f_geo_table",
)

# Read, clean, and process the user Kinesis stream
user_kinesis_df = _read_kinesis_stream(kinesis_stream_name_user)
user_df = user_kinesis_df.selectExpr("CAST(data AS STRING) as json_data")
cleaned_user_df = clean_df_user(user_df)
user_query = _start_delta_sink(
    cleaned_user_df,
    "dbfs:/tmp/checkpoints/user",
    "dbfs:/user/hive/warehouse/0eaa2e755d1f_user_table",
)

# Wait for the termination of all queries. All three are already running at
# this point; the awaits only keep the cell alive.
pin_query.awaitTermination()
geo_query.awaitTermination()
user_query.awaitTermination()
