## Video Upload to Amazon S3

This section of the project focuses on uploading a video file to Amazon S3, preparing it for further analysis:

1. **AWS S3 Client Initialization**: Sets up a connection to Amazon S3 using boto3.

2. **Bucket Listing**: Retrieves and displays a list of available S3 buckets.

3. **Video Upload Process**: 
   - Specifies the local path of the video file.
   - Uploads the video to a designated S3 bucket.
   - Assigns a specific name to the uploaded file in S3.

4. **Error Handling**: Implements basic error catching to manage potential upload issues.

This step is crucial for preparing video content for subsequent analysis using AWS Rekognition or other video processing services, enabling scalable and efficient video data management in the cloud.

In [4]:
import boto3

ACCESS_KEY = "AKIA4SDNV4MK62J2HEBI"
SECRET_KEY = "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"
BUCKET_NAME = "video2-analysis-bucket"

# Initialize S3 Client
s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)

# List Buckets
response = s3.list_buckets()
print("Buckets:", [bucket['Name'] for bucket in response['Buckets']])


try:
    file_path = "/Users/pawanbtw/Desktop/videoplayback.mp4"
    s3.upload_file(file_path, BUCKET_NAME, "football.mp4")  # Upload with .mp4 extension
    print(f"File uploaded to {BUCKET_NAME}")
except Exception as e:
    print("Error:", e)

Buckets: ['video2-analysis-bucket']
File uploaded to video2-analysis-bucket


## Video Analysis Initiation

- Sets up Rekognition client
- Starts asynchronous label detection on S3 video
- Configures 50% confidence threshold
- Retrieves job ID for tracking

In [6]:
# Rekognition Configuration
rekognition_client = boto3.client(
    'rekognition', 
    aws_access_key_id=ACCESS_KEY, 
    aws_secret_access_key=SECRET_KEY,
    region_name='us-east-1'  
)

# Start Rekognition Video Label Detection
response = rekognition_client.start_label_detection(
    Video={'S3Object': {'Bucket': BUCKET_NAME, 'Name': 'football.mp4'}},
    MinConfidence=50  # adjust confidence threshold as needed
)

# Extract Job ID from the response
job_id = response['JobId']
print(f"Started video analysis job with JobId: {job_id}")

Started video analysis job with JobId: 7efb95709a398aa4517d6331f256a1335b37ef20b79a960d85b5f4baba7f5bb7


## Video Analysis Results Processing

- Sets up Rekognition and DynamoDB clients
- Creates DynamoDB table for results
- Polls Rekognition for job completion
- Retrieves and processes label detection results
- Stores results in DynamoDB for further analysis

In [7]:
import boto3
import time

# AWS Configuration
ACCESS_KEY = "AKIA4SDNV4MK62J2HEBI"
SECRET_KEY = "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"
REGION = 'us-east-1'

rekognition_client = boto3.client(
    'rekognition',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION
)

dynamodb_client = boto3.client(
    'dynamodb',
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
    region_name=REGION
)

# Step 1: Create DynamoDB Table
TABLE_NAME = 'RekognitionResults1'

def create_table():
    try:
        dynamodb_client.create_table(
            TableName=TABLE_NAME,
            KeySchema=[
                {'AttributeName': 'JobId', 'KeyType': 'HASH'},  # Partition key
                {'AttributeName': 'LabelName', 'KeyType': 'RANGE'},  # Sort key
            ],
            AttributeDefinitions=[
                {'AttributeName': 'JobId', 'AttributeType': 'S'},
                {'AttributeName': 'LabelName', 'AttributeType': 'S'},
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )
        print(f"Creating table: {TABLE_NAME}.")
        dynamodb_client.get_waiter('table_exists').wait(TableName=TABLE_NAME)
        print("Table created and active!")
    except dynamodb_client.exceptions.ResourceInUseException:
        print(f"Table {TABLE_NAME} already exists.")

# Step 2: Store Rekognition Results in the Table
def store_results(job_id, labels):
    try:
        for label in labels:
            label_name = label['Label']['Name']
            confidence = label['Label']['Confidence']

            dynamodb_client.put_item(
                TableName=TABLE_NAME,
                Item={
                    'JobId': {'S': job_id},
                    'LabelName': {'S': label_name},
                    'Confidence': {'N': str(confidence)}
                }
            )
        print("Rekognition results stored successfully in DynamoDB!")
    except Exception as e:
        print(f"Error storing results: {e}")

# Step 3: Run Rekognition Label Detection and Store Results
def detect_labels_and_store(job_id):
    while True:
        try:
            response = rekognition_client.get_label_detection(JobId=job_id)
            status = response['JobStatus']
            if status in ['SUCCEEDED', 'FAILED']:
                break
            print('Job is still in progress...')
            time.sleep(5)
        except Exception as e:
            print(f"Error fetching job status: {e}")
            return

    if status == 'SUCCEEDED':
        print("Label detection job completed successfully!")
        labels = response.get('Labels', [])
        for label in labels:
            print(f"Label: {label['Label']['Name']}, Confidence: {label['Label']['Confidence']}")
        store_results(job_id, labels)
    else:
        print("Label detection job failed or was not found.")

# Main Execution
if __name__ == "__main__":
    job_id = '7efb95709a398aa4517d6331f256a1335b37ef20b79a960d85b5f4baba7f5bb7'  # Replace with your JobId
    create_table()
    detect_labels_and_store(job_id)


Creating table: RekognitionResults1.
Table created and active!
Label detection job completed successfully!
Label: Adult, Confidence: 90.30839538574219
Label: American Football, Confidence: 60.17186737060547
Label: American Football (Ball), Confidence: 52.428977966308594
Label: Arm, Confidence: 50.7934455871582
Label: Astronomy, Confidence: 88.83755493164062
Label: Ball, Confidence: 95.50232696533203
Label: Body Part, Confidence: 51.604217529296875
Label: Field, Confidence: 50.07183074951172
Label: Football, Confidence: 95.50232696533203
Label: Hand, Confidence: 51.25117111206055
Label: Head, Confidence: 66.55892181396484
Label: Kicking, Confidence: 55.832069396972656
Label: Male, Confidence: 90.42467498779297
Label: Man, Confidence: 90.30839538574219
Label: Moon, Confidence: 88.83755493164062
Label: Nature, Confidence: 88.83755493164062
Label: Night, Confidence: 88.83755493164062
Label: Outdoors, Confidence: 88.83755493164062
Label: Person, Confidence: 90.42467498779297
Label: Playing 

In [8]:
# AWS Configuration
ACCESS_KEY = "AKIA4SDNV4MK62J2HEBI"
SECRET_KEY = "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"
REGION = 'us-east-1'
BUCKET_NAME = "video2-analysis-bucket"
TABLE_NAME = 'RekognitionResults1'

# CloudWatch Configuration
log_group = 'ImageProcessingLogs'
log_stream = 'ImageProcessingStream'

# Initialize AWS Clients
s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
rekognition_client = boto3.client('rekognition', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION)
dynamodb_client = boto3.client('dynamodb', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION)
logs_client = boto3.client('logs', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION)

# CloudWatch Logging Functions
def create_log_group_and_stream():
    try:
        logs_client.create_log_group(logGroupName=log_group)
    except logs_client.exceptions.ResourceAlreadyExistsException:
        pass

    try:
        logs_client.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
    except logs_client.exceptions.ResourceAlreadyExistsException:
        pass

def log_to_cloudwatch(message):
    timestamp = int(round(time.time() * 1000))
    logs_client.put_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        logEvents=[{
            'timestamp': timestamp,
            'message': message
        }]
    )

# Step 1: Upload Video to S3
def upload_video_to_s3(file_path):
    try:
        log_to_cloudwatch("Uploading video to S3...")
        s3.upload_file(file_path, BUCKET_NAME, "Pawan_Karthik_Elevator_Pitch.mp4")
        log_to_cloudwatch(f"Video uploaded to S3 bucket: {BUCKET_NAME}")
    except Exception as e:
        log_to_cloudwatch(f"Error uploading video to S3: {e}")
        raise

# Step 2: Start Rekognition Video Label Detection
def start_label_detection():
    try:
        log_to_cloudwatch("Starting Rekognition label detection...")
        response = rekognition_client.start_label_detection(
            Video={'S3Object': {'Bucket': BUCKET_NAME, 'Name': 'Pawan_Karthik_Elevator_Pitch.mp4'}},
            MinConfidence=50
        )
        job_id = response['JobId']
        log_to_cloudwatch(f"Started Rekognition job with JobId: {job_id}")
        return job_id
    except Exception as e:
        log_to_cloudwatch(f"Error starting label detection: {e}")
        raise

# Step 3: Create DynamoDB Table
def create_table():
    try:
        log_to_cloudwatch("Creating DynamoDB table...")
        dynamodb_client.create_table(
            TableName=TABLE_NAME,
            KeySchema=[
                {'AttributeName': 'JobId', 'KeyType': 'HASH'},
                {'AttributeName': 'LabelName', 'KeyType': 'RANGE'},
            ],
            AttributeDefinitions=[
                {'AttributeName': 'JobId', 'AttributeType': 'S'},
                {'AttributeName': 'LabelName', 'AttributeType': 'S'},
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )
        dynamodb_client.get_waiter('table_exists').wait(TableName=TABLE_NAME)
        log_to_cloudwatch("DynamoDB table created successfully!")
    except dynamodb_client.exceptions.ResourceInUseException:
        log_to_cloudwatch(f"DynamoDB table {TABLE_NAME} already exists.")
    except Exception as e:
        log_to_cloudwatch(f"Error creating DynamoDB table: {e}")
        raise

# Step 4: Store Rekognition Results in DynamoDB
def store_results(job_id, labels):
    try:
        log_to_cloudwatch("Storing Rekognition results in DynamoDB...")
        for label in labels:
            label_name = label['Label']['Name']
            confidence = label['Label']['Confidence']

            dynamodb_client.put_item(
                TableName=TABLE_NAME,
                Item={
                    'JobId': {'S': job_id},
                    'LabelName': {'S': label_name},
                    'Confidence': {'N': str(confidence)}
                }
            )
        log_to_cloudwatch("Rekognition results stored successfully in DynamoDB!")
    except Exception as e:
        log_to_cloudwatch(f"Error storing results in DynamoDB: {e}")
        raise

# Step 5: Detect Labels and Store Results
def detect_labels_and_store(job_id):
    while True:
        try:
            response = rekognition_client.get_label_detection(JobId=job_id)
            status = response['JobStatus']
            if status in ['SUCCEEDED', 'FAILED']:
                break
            log_to_cloudwatch("Rekognition job is still in progress...")
            time.sleep(5)
        except Exception as e:
            log_to_cloudwatch(f"Error fetching job status: {e}")
            return

    if status == 'SUCCEEDED':
        log_to_cloudwatch("Rekognition label detection completed successfully!")
        labels = response.get('Labels', [])
        for label in labels:
            log_to_cloudwatch(f"Detected Label: {label['Label']['Name']}, Confidence: {label['Label']['Confidence']}")
        store_results(job_id, labels)
    else:
        log_to_cloudwatch("Rekognition job failed or was not found.")

# Main Execution
if __name__ == "__main__":
    create_log_group_and_stream()
    file_path = "/Users/pawanbtw/Desktop/videoplayback.mp4"
    
    upload_video_to_s3(file_path)
    
    create_table()
    
    job_id = start_label_detection()
    
    detect_labels_and_store(job_id)

In [9]:
import cv2
import numpy as np
from PIL import Image
import io

# AWS Configuration
ACCESS_KEY = "AKIA4SDNV4MK62J2HEBI"
SECRET_KEY = "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"
REGION = 'us-east-1'
BUCKET_NAME = "video2-analysis-bucket"
TABLE_NAME = 'RekognitionResults1'

# CloudWatch Configuration
log_group = 'ImageProcessingLogs'
log_stream = 'ImageProcessingStream'

# Initialize AWS Clients
s3 = boto3.client('s3', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY)
rekognition_client = boto3.client('rekognition', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION)
dynamodb_client = boto3.client('dynamodb', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION)
logs_client = boto3.client('logs', aws_access_key_id=ACCESS_KEY, aws_secret_access_key=SECRET_KEY, region_name=REGION)

# CloudWatch Logging Functions
def create_log_group_and_stream():
    try:
        logs_client.create_log_group(logGroupName=log_group)
    except logs_client.exceptions.ResourceAlreadyExistsException:
        pass

    try:
        logs_client.create_log_stream(logGroupName=log_group, logStreamName=log_stream)
    except logs_client.exceptions.ResourceAlreadyExistsException:
        pass

def log_to_cloudwatch(message):
    timestamp = int(round(time.time() * 1000))
    logs_client.put_log_events(
        logGroupName=log_group,
        logStreamName=log_stream,
        logEvents=[{
            'timestamp': timestamp,
            'message': message
        }]
    )

# Step 1: Upload Video to S3
def upload_video_to_s3(file_path):
    try:
        log_to_cloudwatch("Uploading video to S3...")
        s3.upload_file(file_path, BUCKET_NAME, "Pawan_Karthik_Elevator_Pitch.mp4")
        log_to_cloudwatch(f"Video uploaded to S3 bucket: {BUCKET_NAME}")
    except Exception as e:
        log_to_cloudwatch(f"Error uploading video to S3: {e}")
        raise

# Step 2: Start Rekognition Video Label Detection
def start_label_detection():
    try:
        log_to_cloudwatch("Starting Rekognition label detection...")
        response = rekognition_client.start_label_detection(
            Video={'S3Object': {'Bucket': BUCKET_NAME, 'Name': 'Pawan_Karthik_Elevator_Pitch.mp4'}},
            MinConfidence=50
        )
        job_id = response['JobId']
        log_to_cloudwatch(f"Started Rekognition job with JobId: {job_id}")
        return job_id
    except Exception as e:
        log_to_cloudwatch(f"Error starting label detection: {e}")
        raise

# Step 3: Create DynamoDB Table
def create_table():
    try:
        log_to_cloudwatch("Creating DynamoDB table...")
        dynamodb_client.create_table(
            TableName=TABLE_NAME,
            KeySchema=[
                {'AttributeName': 'JobId', 'KeyType': 'HASH'},
                {'AttributeName': 'LabelName', 'KeyType': 'RANGE'},
            ],
            AttributeDefinitions=[
                {'AttributeName': 'JobId', 'AttributeType': 'S'},
                {'AttributeName': 'LabelName', 'AttributeType': 'S'},
            ],
            ProvisionedThroughput={
                'ReadCapacityUnits': 5,
                'WriteCapacityUnits': 5
            }
        )
        dynamodb_client.get_waiter('table_exists').wait(TableName=TABLE_NAME)
        log_to_cloudwatch("DynamoDB table created successfully!")
    except dynamodb_client.exceptions.ResourceInUseException:
        log_to_cloudwatch(f"DynamoDB table {TABLE_NAME} already exists.")
    except Exception as e:
        log_to_cloudwatch(f"Error creating DynamoDB table: {e}")
        raise

# Step 4: Store Rekognition Results in DynamoDB
def store_results(job_id, labels):
    try:
        log_to_cloudwatch("Storing Rekognition results in DynamoDB...")
        for label in labels:
            label_name = label['Label']['Name']
            confidence = label['Label']['Confidence']

            dynamodb_client.put_item(
                TableName=TABLE_NAME,
                Item={
                    'JobId': {'S': job_id},
                    'LabelName': {'S': label_name},
                    'Confidence': {'N': str(confidence)}
                }
            )
        log_to_cloudwatch("Rekognition results stored successfully in DynamoDB!")
    except Exception as e:
        log_to_cloudwatch(f"Error storing results in DynamoDB: {e}")
        raise

# Step 5: Detect Labels and Store Results
def detect_labels_and_store(job_id):
    while True:
        try:
            response = rekognition_client.get_label_detection(JobId=job_id)
            status = response['JobStatus']
            if status in ['SUCCEEDED', 'FAILED']:
                break
            log_to_cloudwatch("Rekognition job is still in progress...")
            time.sleep(5)
        except Exception as e:
            log_to_cloudwatch(f"Error fetching job status: {e}")
            return

    if status == 'SUCCEEDED':
        log_to_cloudwatch("Rekognition label detection completed successfully!")
        labels = response.get('Labels', [])
        for label in labels:
            log_to_cloudwatch(f"Detected Label: {label['Label']['Name']}, Confidence: {label['Label']['Confidence']}")
        store_results(job_id, labels)
    else:
        log_to_cloudwatch("Rekognition job failed or was not found.")

# New function to process frames and store in S3
def process_and_store_frames(job_id):
    log_to_cloudwatch("Processing frames and storing in S3...")
    
    # Get video metadata
    response = s3.head_object(Bucket=BUCKET_NAME, Key="Pawan_Karthik_Elevator_Pitch.mp4")
    content_length = int(response['ContentLength'])
    
    # Download video from S3
    s3.download_file(BUCKET_NAME, "Pawan_Karthik_Elevator_Pitch.mp4", "temp_video.mp4")
    
    # Open video file
    cap = cv2.VideoCapture("temp_video.mp4")
    
    frame_count = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Get labels for this frame
        response = rekognition_client.get_label_detection(JobId=job_id, MaxResults=10, NextToken='')
        labels = response.get('Labels', [])
        
        # Draw bounding boxes
        for label in labels:
            if 'Instances' in label['Label']:
                for instance in label['Label']['Instances']:
                    bbox = instance['BoundingBox']
                    left = int(bbox['Left'] * frame.shape[1])
                    top = int(bbox['Top'] * frame.shape[0])
                    width = int(bbox['Width'] * frame.shape[1])
                    height = int(bbox['Height'] * frame.shape[0])
                    
                    cv2.rectangle(frame, (left, top), (left + width, top + height), (0, 255, 0), 2)
                    cv2.putText(frame, label['Label']['Name'], (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (36,255,12), 2)
        
        # Convert frame to PNG
        success, buffer = cv2.imencode(".png", frame)
        io_buf = io.BytesIO(buffer)
        
        # Upload to S3
        s3.upload_fileobj(io_buf, BUCKET_NAME, f"processed_frames/frame_{frame_count}.png")
        
        frame_count += 1
    
    cap.release()
    log_to_cloudwatch(f"Processed and stored {frame_count} frames in S3.")

# Main Execution
if __name__ == "__main__":
    create_log_group_and_stream()
    file_path = "/Users/pawanbtw/Desktop/videoplayback.mp4"
    
    upload_video_to_s3(file_path)
    
    create_table()
    
    job_id = start_label_detection()
    
    detect_labels_and_store(job_id)
    
    # Process and store frames
    process_and_store_frames(job_id)