In [None]:
# Kinesis to S3 with AWS Glue (streaming job)

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from pyspark.sql import DataFrame, Row
import datetime
from awsglue import DynamicFrame

# Resolve the job parameters passed on the command line.
# 'TempDir' is requested together with 'JOB_NAME' because the streaming
# checkpoint location passed to forEachBatch is built from args["TempDir"];
# resolving only 'JOB_NAME' makes that lookup fail with a KeyError at the
# point where the stream is started.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

# Initialize Spark and Glue contexts
sc = SparkContext()  # Create a SparkContext
glueContext = GlueContext(sc)  # Create a GlueContext to interact with AWS Glue
spark = glueContext.spark_session  # Get the Spark session from GlueContext
job = Job(glueContext)  # Initialize a Glue job
job.init(args['JOB_NAME'], args)  # Start the job with its name

# Create a streaming DataFrame from the Kinesis stream.
# The records are declared as CSV, read from the earliest available position,
# and the schema is inferred rather than declared.
dataframe_AmazonKinesis_node1726555925249 = glueContext.create_data_frame.from_options(
    connection_type="kinesis",
    connection_options={
        "typeOfData": "kinesis",
        "streamARN": "arn:aws:kinesis:us-east-1:484907529427:stream/sample_stream",
        "classification": "csv",
        "startingPosition": "earliest",
        "inferSchema": "true"
    },
    transformation_ctx="dataframe_AmazonKinesis_node1726555925249"
)

# Define a function to process each batch of data
def processBatch(data_frame, batchId):
    """Write one micro-batch to S3 as uncompressed CSV.

    Empty batches are skipped. A non-empty batch is converted to a Glue
    DynamicFrame and written under a folder whose ingest_year / ingest_month /
    ingest_day / ingest_hour segments come from the wall-clock time at which
    this function runs.

    Args:
        data_frame: DataFrame holding the records of the current batch.
        batchId: Batch identifier supplied by the caller; not used here.

    Returns:
        None.
    """
    # Nothing to persist when the batch carries no records.
    if data_frame.count() <= 0:
        return

    # The Glue writer below takes a DynamicFrame, so wrap the DataFrame first.
    batch_dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")

    # Zero-padded partition folders derived from the current date and hour.
    stamp = datetime.datetime.now()
    partition_suffix = "".join([
        "/ingest_year=" + str(stamp.year).rjust(4, "0"),
        "/ingest_month=" + str(stamp.month).rjust(2, "0"),
        "/ingest_day=" + str(stamp.day).rjust(2, "0"),
        "/ingest_hour=" + str(stamp.hour).rjust(2, "0"),
    ])
    target_path = "s3://lambda-etl-vp-v4/Error/2025" + partition_suffix + "/"

    # Emit the batch as CSV; no additional partition keys, no compression.
    glueContext.write_dynamic_frame.from_options(
        frame=batch_dynamic_frame,
        connection_type="s3",
        format="csv",
        connection_options={"path": target_path, "partitionKeys": []},
        format_options={"compression": "uncompressed"},
        transformation_ctx="AmazonS3_node1726556007033",
    )

# Drive the stream: every batch of the Kinesis DataFrame is handed to
# processBatch. Checkpoints are kept under <TempDir>/<JOB_NAME>/checkpoint/.
checkpoint_dir = args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/"
batch_options = {
    "windowSize": "100 seconds",  # batching window
    "checkpointLocation": checkpoint_dir,
}

glueContext.forEachBatch(
    frame=dataframe_AmazonKinesis_node1726555925249,
    batch_function=processBatch,
    options=batch_options,
)

# Finalize the Glue job once stream processing returns
job.commit()


In [None]:
# Kinesis to S3 in plain PySpark (Structured Streaming)

In [None]:
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Obtain (or reuse) the Spark session for this streaming application
spark = (
    SparkSession.builder
    .appName("KinesisToS3Streaming")
    .getOrCreate()
)

# Layout of the incoming records: nullable string fields.
# Extend the tuple of field names to match the actual data.
schema = StructType(
    [
        StructField(field_name, StringType(), True)
        for field_name in ("column1", "column2")
    ]
)

# Open a streaming source on the Kinesis stream, starting from the latest position
kinesis_stream = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "sample_stream")
    .option("region", "us-east-1")
    .option("startingPosition", "latest")
    .load()
)

# The payload is treated as a CSV line: cast the `data` column to a string,
# split it on commas, and pick the positional values out as named columns
# (adjust the indices to the real schema). How the payload must be decoded
# depends on how the records are written to Kinesis.
# Finally, stamp each record with an ingestion timestamp.
kinesis_data = (
    kinesis_stream
    .selectExpr("CAST(data AS STRING)")
    .selectExpr("split(data, ',') AS csv_values")
    .selectExpr("csv_values[0] AS column1", "csv_values[1] AS column2")
    .withColumn("ingestion_time", current_timestamp())
)

# Root folder of the sink. The ingest_year=/ingest_month=/ingest_day=/
# ingest_hour= sub-folders are produced by partitionBy below instead of being
# baked into the path with fixed placeholder values, which would send every
# record of the stream to the same 2025/01/01/01 folder regardless of when it
# was ingested.
output_path = "s3://lambda-etl-vp-v4/Error/2025/"

# Derive zero-padded partition values from each record's ingestion_time.
# date_format is evaluated through selectExpr so no extra import is needed.
partitioned_data = kinesis_data.selectExpr(
    "*",
    "date_format(ingestion_time, 'yyyy') AS ingest_year",
    "date_format(ingestion_time, 'MM') AS ingest_month",
    "date_format(ingestion_time, 'dd') AS ingest_day",
    "date_format(ingestion_time, 'HH') AS ingest_hour",
)

# Write the streaming DataFrame to S3 as CSV, one folder per ingestion hour.
# The partition columns become folder names and are not repeated inside the
# CSV files.
# NOTE(review): the checkpoint lives under /tmp; confirm this resolves to
# durable storage in the target environment, otherwise progress is lost on
# restart.
query = (
    partitioned_data.writeStream
    .outputMode("append")
    .format("csv")
    .partitionBy("ingest_year", "ingest_month", "ingest_day", "ingest_hour")
    .option("path", output_path)
    .option("checkpointLocation", "/tmp/checkpoints/kinesis_to_s3")
    .start()
)

# Block until the streaming query terminates
query.awaitTermination()
