'''
<br><br>
@Author: Shivraj Yelave<br>
@Date: 12-09-24<br>
@Last modified by: Shivraj Yelave<br>
@Last modified time: 12-09-24<br>
@Title: CRUD Operation in RDS Using boto3<br><br>
'''

In [1]:
import boto3
from dotenv import load_dotenv

# Load variables from a .env file into the process environment.
load_dotenv()

# Module-level S3 client used by upload_file_to_s3 and download_file_from_s3.
s3 = boto3.client('s3')

In [2]:

def upload_file_to_s3(file_name, bucket, object_name=None):
    """
    Description:
    Sends a local file to an Amazon S3 bucket through the module-level ``s3`` client.

    Parameters:
    file_name (str): Path of the local file to upload.
    bucket (str): Name of the destination S3 bucket.
    object_name (str, optional): Key to store the object under. When omitted
        (or otherwise falsy), file_name is used as the key.

    Returns:
    None

    Prints:
    A confirmation line when the upload succeeds.
    "Error: <details>" when any exception is raised; the exception is not re-raised.
    """
    # Fall back to the local path as the S3 key when no explicit key is given.
    destination_key = object_name if object_name else file_name

    try:
        s3.upload_file(file_name, bucket, destination_key)
    except Exception as err:
        # Best-effort helper: report the failure instead of propagating it.
        print(f"Error: {err}")
    else:
        print(f"File {file_name} uploaded to bucket {bucket}.")

# Example: upload the local COVID-19 CSV and store it under the key 'dataset.csv'.
upload_file_to_s3(
    file_name='covid_19_clean_complete.csv',
    bucket='lambda.busket',
    object_name='dataset.csv',
)


File covid_19_clean_complete.csv uploaded to bucket lambda.busket.


In [3]:

def download_file_from_s3(bucket, object_name, file_name=None):
    """
    Description:
    Fetches an object from an Amazon S3 bucket through the module-level ``s3``
    client and writes it to a local file.

    Parameters:
    bucket (str): Name of the S3 bucket that holds the object.
    object_name (str): Key (name) of the object to download.
    file_name (str, optional): Local path to write the object to. When omitted
        (or otherwise falsy), object_name is used as the local path.

    Returns:
    None

    Prints:
    A confirmation line when the download succeeds.
    "Error: <details>" when any exception is raised; the exception is not re-raised.
    """
    # Save under the object's own key when no local path is supplied.
    local_path = file_name if file_name else object_name

    try:
        s3.download_file(bucket, object_name, local_path)
    except Exception as err:
        # Best-effort helper: report the failure instead of propagating it.
        print(f"Error: {err}")
    else:
        print(f"File {object_name} downloaded from bucket {bucket}.")

# Example: fetch the object 'dataset.csv' and save it locally under the same name.
download_file_from_s3(
    bucket='lambda.busket',
    object_name='dataset.csv',
    file_name='dataset.csv',
)


File dataset.csv downloaded from bucket lambda.busket.


In [1]:
import pyodbc
import boto3
from io import StringIO
import pandas as pd
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

def load_data_to_rds_from_s3(max_rows=127):
    """
    Description:
    Downloads a CSV file from S3, processes the data, and loads it into an RDS instance.

    This function:
    - Reads its settings from the environment variables RDS_HOST, RDS_USER,
      RDS_PASSWORD, DB_NAME, S3_BUCKET and S3_KEY.
    - Downloads the CSV object from S3 and parses it into a pandas DataFrame.
    - Keeps the first max_rows rows, then drops every row that contains a missing value.
    - Creates the covid table in the SQL Server database if it does not already exist.
    - Inserts the remaining rows one at a time; a row that fails to insert is
      reported and skipped.
    - Commits once after every row has been attempted, and always closes the
      cursor and connection, even when an error interrupts the load.

    Parameters:
    max_rows (int, optional): Number of leading CSV rows to consider before rows
        with missing values are dropped. Defaults to 127, the limit previously
        hard-coded for testing. Pass None to consider every row.

    Returns:
    None

    Raises:
    ValueError: If any required environment variable is missing or empty.

    Prints:
    The DataFrame columns, the first row that will be inserted, and an error
    message plus the row data for every row that fails to insert.
    """
    # Get environment variables and fail fast: a missing value would otherwise
    # travel on as None into boto3 or into the connection string.
    required = ('RDS_HOST', 'RDS_USER', 'RDS_PASSWORD', 'DB_NAME', 'S3_BUCKET', 'S3_KEY')
    settings = {name: os.getenv(name) for name in required}
    missing = [name for name in required if not settings[name]]
    if missing:
        raise ValueError(
            "Missing required environment variables: " + ", ".join(missing)
        )

    rds_host = settings['RDS_HOST']
    rds_user = settings['RDS_USER']
    rds_password = settings['RDS_PASSWORD']
    db_name = settings['DB_NAME']
    bucket = settings['S3_BUCKET']
    key = settings['S3_KEY']

    # Download file from S3
    s3 = boto3.client('s3')
    response = s3.get_object(Bucket=bucket, Key=key)
    data = response['Body'].read().decode('utf-8')

    # Convert CSV data to DataFrame
    df = pd.read_csv(StringIO(data))

    # Print DataFrame columns to see what's in the file
    print("DataFrame Columns:", df.columns)

    # Limit rows first, then drop incomplete rows (same order as before, so
    # fewer than max_rows rows may remain).
    if max_rows is not None:
        df = df.head(max_rows)
    df = df.dropna()

    # Nothing left to insert: stop before df.iloc[0] would raise IndexError.
    if df.empty:
        print("No complete rows to load; nothing inserted.")
        return

    print(df.iloc[0])  # Print the first row to debug

    # CSV column names, in the same order as the INSERT column list below.
    csv_columns = (
        'Province/State', 'Country/Region', 'Lat', 'Long', 'Date',
        'Confirmed', 'Deaths', 'Recovered', 'Active', 'WHO Region',
    )
    # Built once instead of on every loop iteration.
    insert_sql = """
    INSERT INTO covid (Province_State, Country_Region, Lat, Long, Date, Confirmed, Deaths, Recovered, Active, WHO_Region)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """

    # Connect to RDS
    conn_str = f"DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={rds_host};DATABASE={db_name};UID={rds_user};PWD={rds_password}"
    conn = pyodbc.connect(conn_str)
    try:
        cursor = conn.cursor()
        try:
            # Create table if not exists
            cursor.execute("""
    IF NOT EXISTS (SELECT * FROM sysobjects WHERE name='covid' AND xtype='U')
    CREATE TABLE covid (
        Province_State  nvarchar(255),
        Country_Region  nvarchar(255),
        Lat   decimal(10, 8),
        Long   decimal(11, 8),
        Date    date,
        Confirmed  int,
        Deaths     int,
        Recovered   int,
        Active      int,
        WHO_Region   nvarchar(255)
    )
    """)

            # Load data into RDS, one row at a time so a bad row can be
            # reported and skipped without aborting the whole load.
            for index, row in df.iterrows():
                try:
                    cursor.execute(insert_sql, *(row[column] for column in csv_columns))
                except pyodbc.Error as e:
                    print(f"Error inserting row {index}: {e}")
                    print(f"Row data: {row}")  # Print row data for debugging

            conn.commit()
        finally:
            cursor.close()
    finally:
        # Always release the connection, even if table creation or an
        # unexpected error interrupted the load.
        conn.close()

# Run the S3 -> RDS load when this cell is executed.
load_data_to_rds_from_s3()


DataFrame Columns: Index(['Province/State', 'Country/Region', 'Lat', 'Long', 'Date', 'Confirmed',
       'Deaths', 'Recovered', 'Active', 'WHO Region'],
      dtype='object')
Province/State    Australian Capital Territory
Country/Region                       Australia
Lat                                   -35.4735
Long                                  149.0124
Date                                2020-01-22
Confirmed                                    0
Deaths                                       0
Recovered                                    0
Active                                       0
WHO Region                     Western Pacific
Name: 8, dtype: object
