In [1]:
import boto3
from botocore.client import Config
from botocore.handlers import disable_signing
from datetime import datetime, timedelta
# The bucket is read anonymously: no credentials are supplied and requests are
# left unsigned. botocore expects its UNSIGNED sentinel object for this; the
# plain string 'UNSIGNED' is not a recognised signature version.
from botocore import UNSIGNED

s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))

# Name of the S3 bucket to read from. `bucket` is kept as an alias so that any
# code referring to either name keeps working.
bucket_name = 'noaa-goes17'
bucket = bucket_name

# Disable signing for paginated requests
s3.meta.events.register('choose-signer.s3.*', disable_signing)

# Session created with empty credentials; kept because other cells may use it.
session = boto3.Session(aws_access_key_id='',
                        aws_secret_access_key='')

In [2]:
import boto3
from botocore.client import Config
from botocore.handlers import disable_signing
from datetime import datetime, timedelta
import time
from kafka import KafkaProducer
import json

# Custom JSON serializer for datetime objects
def datetime_serializer(obj):
    """Fallback encoder for ``json.dumps(..., default=...)``.

    Args:
        obj: The value that the JSON encoder could not serialize on its own.

    Returns:
        The ISO-8601 string produced by ``obj.isoformat()`` when `obj` is a
        ``datetime`` instance.

    Raises:
        TypeError: If `obj` is anything other than a ``datetime``.
    """
    # Reject unsupported values first so the happy path is the last statement.
    if not isinstance(obj, datetime):
        raise TypeError("Type not serializable")
    return obj.isoformat()

# Kafka producer: every message value is JSON-encoded (datetime values are
# handled by datetime_serializer) and then turned into UTF-8 bytes.
producer = KafkaProducer(
    bootstrap_servers=['54.145.37.197:9092'],
    value_serializer=lambda payload: json.dumps(
        payload, default=datetime_serializer
    ).encode('utf-8'),
)

# S3 bucket that the listing helpers in this cell read from
bucket_name = 'noaa-goes17'

def is_data_available(prefix):
    """Report whether the bucket holds at least one object under `prefix`.

    Uses the module-level `s3` client and `bucket_name`.

    Args:
        prefix: Key prefix to look for.

    Returns:
        True if the listing response contains a 'Contents' entry, False if it
        does not or if the request raised (the error is printed, not re-raised).
    """
    try:
        # A single key is enough to prove existence, so do not ask S3 for a
        # full page of results.
        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, MaxKeys=1)
    except Exception as e:
        print(f"Error checking {prefix}: {e}")
        return False
    return 'Contents' in response

def get_hour_contents(product_name, year, day, hour):
    """List every object stored under ``{product_name}/{year}/{day}/{hour}/``.

    Uses the module-level `s3` client and `bucket_name`. All result pages are
    read, because a single list_objects_v2 response holds at most 1000 keys
    and anything beyond that would otherwise be dropped silently.

    Args:
        product_name: First component of the key prefix.
        year: Second component of the key prefix.
        day: Third component of the key prefix.
        hour: Fourth component of the key prefix.

    Returns:
        A list with the 'Contents' entries of all pages, or None when nothing
        was found or the request raised (the error is printed, not re-raised).
    """
    prefix = f"{product_name}/{year}/{day}/{hour}/"
    try:
        paginator = s3.get_paginator('list_objects_v2')
        objects = [
            entry
            for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix)
            # Pages without matches carry no 'Contents' key at all.
            for entry in page.get('Contents', [])
        ]
    except Exception as e:
        print(f"Error getting data for {product_name}/{year}/{day}/{hour}: {e}")
        return None
    # Callers treat "nothing found" as None, so never hand back an empty list.
    return objects or None

def scrape_data_and_send_to_kafka(product_id, topic, start_date, end_date,
                                  empty_wait_seconds=3600, send_interval_seconds=1):
    """Publish the S3 key of every object found between two hours to Kafka.

    Steps hour by hour from `start_date` to `end_date` (inclusive). For each
    hour the objects are looked up with `get_hour_contents` and each object's
    key is sent to `topic` as a one-element tuple ``(key,)`` through the
    module-level `producer`. Progress is printed as it goes.

    Args:
        product_id: Product prefix passed on to `get_hour_contents`.
        topic: Kafka topic to send to.
        start_date: datetime of the first hour to process.
        end_date: datetime of the last hour to process (inclusive).
        empty_wait_seconds: Seconds to sleep when an hour has no files. After
            the wait the loop moves on to the next hour; the empty hour is
            not retried. Defaults to 3600.
        send_interval_seconds: Seconds to sleep after each message.
            Defaults to 1.

    Returns:
        None.
    """
    current_date = start_date
    while current_date <= end_date:
        print(current_date)
        year_name = current_date.strftime('%Y')
        day_name = current_date.strftime('%j')  # day of year, zero-padded
        hour_name = current_date.strftime('%H')

        files = get_hour_contents(product_id, year_name, day_name, hour_name)

        if not files:
            print(f"No files available in {year_name}/{day_name}/{hour_name}. Waiting...")
            time.sleep(empty_wait_seconds)
        else:
            for obj in files:
                file_key = obj['Key']
                # The key is sent as-is; nothing is parsed out of the file
                # name, so keys of any shape are accepted.
                producer.send(topic, (file_key,))
                print(f"Sent data to Kafka for {product_id}: {file_key}")
                time.sleep(send_interval_seconds)

        current_date += timedelta(hours=1)

    # send() only queues messages; block until everything queued is delivered.
    producer.flush()

# Example usage: send the ABI-L2-FDCC object keys for every hour (00-23) of
# 9 September 2020 to the 'nasa-demo' topic.
product_name = "ABI-L2-FDCC"
start_date = datetime(year=2020, month=9, day=9)
end_date = start_date + timedelta(hours=23)  # last hour of the same day
scrape_data_and_send_to_kafka(
    product_id=product_name,
    topic='nasa-demo',
    start_date=start_date,
    end_date=end_date,
)

2020-09-09 00:00:00
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530001174_e20202530003547_c20202530004106.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530006174_e20202530008547_c20202530009101.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530011174_e20202530013547_c20202530014093.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530016174_e20202530018547_c20202530019129.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530021174_e20202530023547_c20202530024108.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530026174_e20202530028547_c20202530029108.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2020/253/00/OR_ABI-L2-FDCC-M6_G17_s20202530031174_e20202530033547_c20202530034101.nc
Sent data to Kafka for ABI-L2-FDCC: ABI-L2-FDCC/2