In [1]:
# !pip install geopy
# !pip install boto3
# !pip install requests
# !pip install mysql-connector-python
# !pip install arcgis
# !pip install -U SQLAlchemy

In [2]:
import warnings
warnings.filterwarnings("ignore")
import pandas as pd
import numpy as np
import geopy as gp
import json
import requests
import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError
from io import StringIO, BytesIO
from sqlalchemy import create_engine

In [3]:
# Function to fetch data from an API  
def fetch_data_from_api(api_url, query_params=None):
    try:
        # Send GET request to the API with optional query parameters
        response = requests.get(api_url, params=query_params)
        response.raise_for_status()  # Check if the request was successful

        # Return the JSON data
        return response.json()

    except requests.exceptions.RequestException as e:
        print(f"Error fetching data from API: {e}")
        return None

# Function to upload data to an S3 bucket
def upload_to_s3(data, bucket_name, object_key):
    try:
        # Initialize the S3 client
        s3 = boto3.client("s3",
        aws_access_key_id='',
        aws_secret_access_key='')

        # Convert the data to a CSV string
        csv_buffer = StringIO()
        if isinstance(data, list):
            # If data is a list of dictionaries, convert to DataFrame and then to CSV
            df = pd.DataFrame(data)
            df.to_csv(csv_buffer, index=False)
        else:
            # If data is a dictionary, assume it's a single record
            df = pd.DataFrame([data])
            df.to_csv(csv_buffer, index=False)

        # Upload the CSV string to S3
        s3.put_object(Bucket=bucket_name, Key=object_key, Body=csv_buffer.getvalue())
        print(f"Data uploaded to '{bucket_name}/{object_key}'.")
    
    except NoCredentialsError:
        print("AWS credentials not found. Please configure your AWS credentials.")
    except PartialCredentialsError:
        print("Incomplete AWS credentials. Please check your configuration.")
    except Exception as e:
        print(f"Error uploading to S3: {e}")

In [4]:
# Validate the files uploaded in S3 bucket        
def print_bucket_content(bucketname):
    s3 = boto3.client('s3',
        aws_access_key_id='',
        aws_secret_access_key='')
    response = s3.list_objects_v2(Bucket = bucketname)
    if 'Contents' in response:
        print('Contents of the {} bucket'.format(bucketname))
        for obj in response['Contents']:
            print(obj['Key'])
    else:
        print('bucket is empty')
        
# Function to read all files from an S3 bucket and convert to DataFrames
def read_s3_files_as_dataframes(bucket_name, s3_folder=None, file_extension=".csv"):
    s3 = boto3.client("s3",
        aws_access_key_id='',
        aws_secret_access_key='')

    dataframes = []

    try:
        # List objects in the specified S3 bucket (and folder, if specified)
        if s3_folder:
            response = s3.list_objects_v2(Bucket=bucket_name, Prefix=s3_folder)
        else:
            response = s3.list_objects_v2(Bucket=bucket_name)

        # Check if the bucket contains files
        if "Contents" not in response:
            print(f"No files found in the specified S3 '{bucket_name}'/'{s3_folder}'.")
            return dataframes

        # Iterate over the objects in the bucket
        for item in response["Contents"]:
            key = item["Key"]

            # Filter by file extension (if required)
            if key.endswith(file_extension):
                # Get the file content from S3
                file_obj = s3.get_object(Bucket=bucket_name, Key=key)

                # Read the content into a pandas DataFrame
                file_content = file_obj["Body"].read().decode("utf-8")
                dataframe = pd.read_csv(StringIO(file_content))

                # Store the DataFrame
                dataframes.append(dataframe)
                print(f"File '{key}' read and stored as DataFrame.")

    except NoCredentialsError:
        print("AWS credentials not found. Please configure your AWS credentials.")
    except PartialCredentialsError:
        print("Incomplete AWS credentials. Please check your configuration.")
    except Exception as e:
        print(f"Error reading from S3: {e}")

    return dataframes

In [5]:
# Load Drinking Water Quality data folder into S3 bucket
bucket_name = "mission-green-bucket"  # Name of the S3 bucket

# Load Drinking Water Quality data from API to S3 bucket
api_url = "https://data.cityofnewyork.us/resource/gjm4-k24g.json"  # API endpoint
object_key = "Drinking_Water_Quality_Data/Self_Reported_Drinking_Water_Tank_Inspection_Results.csv"  # Desired key for the S3 object

query_params = {
    "$limit": 49800 # Example parameter to limit the number of results
}

# Fetch data from the API
api_data = fetch_data_from_api(api_url, query_params)

if api_data:
    # Upload the data to S3
    upload_to_s3(api_data, bucket_name, object_key)

Data uploaded to 'mission-green-bucket/Drinking_Water_Quality_Data/Self_Reported_Drinking_Water_Tank_Inspection_Results.csv'.


In [6]:
# Validate the files stored in s3 bucket
print_bucket_content("mission-green-bucket")

Contents of the mission-green-bucket bucket
Air_Quality_Data/CO/ad_viz_plotval_data_CO_2020.csv
Air_Quality_Data/CO/ad_viz_plotval_data_CO_2021.csv
Air_Quality_Data/CO/ad_viz_plotval_data_CO_2022.csv
Air_Quality_Data/CO/ad_viz_plotval_data_CO_2023.csv
Air_Quality_Data/NO2/ad_viz_plotval_data_NO2_2020.csv
Air_Quality_Data/NO2/ad_viz_plotval_data_NO2_2021.csv
Air_Quality_Data/NO2/ad_viz_plotval_data_NO2_2022.csv
Air_Quality_Data/NO2/ad_viz_plotval_data_NO2_2023.csv
Air_Quality_Data/Ozone/ad_viz_plotval_data_Ozone_2020.csv
Air_Quality_Data/Ozone/ad_viz_plotval_data_Ozone_2021.csv
Air_Quality_Data/Ozone/ad_viz_plotval_data_Ozone_2022.csv
Air_Quality_Data/Ozone/ad_viz_plotval_data_Ozone_2023.csv
Air_Quality_Data/Ozone/ad_viz_plotval_data_Ozone_2024.csv
Air_Quality_Data/PM10/ad_viz_plotval_data_PM10_2020.csv
Air_Quality_Data/PM10/ad_viz_plotval_data_PM10_2021.csv
Air_Quality_Data/PM10/ad_viz_plotval_data_PM10_2022.csv
Air_Quality_Data/PM10/ad_viz_plotval_data_PM10_2023.csv
Air_Quality_Data/P

In [11]:
dataframes = read_s3_files_as_dataframes('mission-green-bucket','Drinking_Water_Quality_Data')
for i, df in enumerate(dataframes):
    final_water_df = df

File 'Drinking_Water_Quality_Data/Self_Reported_Drinking_Water_Tank_Inspection_Results.csv' read and stored as DataFrame.


In [12]:
pd.set_option('display.max_columns', 50)
final_water_df

Unnamed: 0,bin,borough,zip,house_num,street_name,block,lot,confirmation_num,reporting_year,tank_num,inspection_by_firm,inspection_performed,inspection_date,gi_req_internal_structure,gi_result_internal_structure,gi_req_external_structure,gi_result_external_structure,gi_req_overflow_pipes,gi_result_overflow_pipes,gi_req_access_ladders,gi_result_access_ladders,gi_req_air_vents,gi_result_air_vents,gi_req_roof_access,gi_result_roof_access,si_req_sediment,si_result_sediment,si_req_biological_growth,si_result_biological_growth,si_req_debris_insects,si_result_debris_insects,si_req_rodent_bird,si_result_rodent_bird,sample_collected,lab_name,nys_certified,analytes,coliform,ecoli,deleted,latitude,longitude,community_board,council_district,census_tract,bbl,nta,batch_date,meet_standards
0,1035071.0,MANHATTAN,10019.0,9,West 57th Street,1273,22,WTI8972067865,2020,1,Isseks Bros. Inc.,Y,05/30/2020,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,"ENVIRONMENTAL BUILDING SOLUTIONS, LLC",Y,B,A,A,No,40.763255,-73.974633,5.0,4.0,11201.0,1.012730e+09,Midtown-Midtown South,2024-05-03T12:15:19.000,
1,1080609.0,MANHATTAN,10018.0,1372,BROADWAY,813,23,WTI0835829410,2021,1,ISSEKS BROS INC,Y,12/11/2021,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,ENVIRONMENTAL BUILDING SOLUTIONS LLC,Y,B,A,A,No,40.752270,-73.987454,5.0,4.0,109.0,1.008130e+09,Midtown-Midtown South,2024-05-03T12:15:36.000,
2,1087665.0,MANHATTAN,10128.0,333,East 91 Street,1554,23,WTI2822978287,2020,1,"Isseks Bros., Inc.",Y,08/24/2020,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,enviromental building solutions,Y,B,A,A,No,40.780692,-73.948521,8.0,5.0,154.0,1.015540e+09,Yorkville,2024-05-03T12:15:31.000,
3,1003881.0,MANHATTAN,10002.0,50,Eldridge street,300,8,WTI8346971831,2018,1,Isseks Bros Inc,Y,06/26/2018,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,Environmental Building Solutions,Y,B,A,A,No,40.716012,-73.993052,3.0,1.0,16.0,1.003000e+09,Chinatown,2024-05-03T12:15:18.000,
4,4030340.0,QUEENS,11377.0,40-33,69TH STREET,1301,10,WTI9251583006,2016,1,American Pipe & Tank,Y,07/18/2016,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,Enviromental Building Solutions,Y,B,A,A,No,40.745678,-73.896189,2.0,26.0,263.0,4.013010e+09,Woodside,2024-05-03T12:15:09.000,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49795,1014412.0,MANHATTAN,10001.0,212,WEST 35 STREET,784,51,WTI4702638830,2022,1,ISSEKS BROS INC,Y,12/17/2022,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,ENVIRONMENTAL BUILDING SOLUTIONS LLC,Y,B,A,A,No,40.751933,-73.990807,5.0,3.0,109.0,1.007840e+09,Midtown-Midtown South,2024-05-03T12:15:28.000,
49796,1085671.0,MANHATTAN,10017.0,450,Lexington Avenue,1280,90,WTI5762173305,2024,1,Nalco Company,Y,02/07/2024,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,EMSL,Y,B,A,A,No,40.753102,-73.974854,5.0,4.0,92.0,1.012800e+09,Turtle Bay-East Midtown,2024-05-03T12:15:22.000,
49797,1015862.0,MANHATTAN,10118.0,350,5th Ave,835,41,WTI1896490394,2019,6,Ambient Group,Y,09/26/2019,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,Ambient Group,Y,B,A,A,No,40.748276,-73.984690,5.0,4.0,76.0,1.008350e+09,Midtown-Midtown South,2024-05-03T12:15:18.000,
49798,1043850.0,MANHATTAN,10065.0,210,EAST 63 STREET,1417,42,WTI0125126928,2019,1,Rosenwach Tank Co. LLC,Y,10/30/2019,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,EMSL Analytical,Y,B,A,A,No,40.763840,-73.964266,8.0,4.0,110.0,1.014170e+09,Lenox Hill-Roosevelt Island,2024-05-03T12:15:36.000,Y


In [13]:
final_water_df = final_water_df.rename(columns={'zip': 'zipcode'})
final_water_df = final_water_df.dropna(subset=["zipcode"])
final_water_df['zipcode']=final_water_df['zipcode'].astype(int)
final_water_df

Unnamed: 0,bin,borough,zipcode,house_num,street_name,block,lot,confirmation_num,reporting_year,tank_num,inspection_by_firm,inspection_performed,inspection_date,gi_req_internal_structure,gi_result_internal_structure,gi_req_external_structure,gi_result_external_structure,gi_req_overflow_pipes,gi_result_overflow_pipes,gi_req_access_ladders,gi_result_access_ladders,gi_req_air_vents,gi_result_air_vents,gi_req_roof_access,gi_result_roof_access,si_req_sediment,si_result_sediment,si_req_biological_growth,si_result_biological_growth,si_req_debris_insects,si_result_debris_insects,si_req_rodent_bird,si_result_rodent_bird,sample_collected,lab_name,nys_certified,analytes,coliform,ecoli,deleted,latitude,longitude,community_board,council_district,census_tract,bbl,nta,batch_date,meet_standards
0,1035071.0,MANHATTAN,10019,9,West 57th Street,1273,22,WTI8972067865,2020,1,Isseks Bros. Inc.,Y,05/30/2020,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,"ENVIRONMENTAL BUILDING SOLUTIONS, LLC",Y,B,A,A,No,40.763255,-73.974633,5.0,4.0,11201.0,1.012730e+09,Midtown-Midtown South,2024-05-03T12:15:19.000,
1,1080609.0,MANHATTAN,10018,1372,BROADWAY,813,23,WTI0835829410,2021,1,ISSEKS BROS INC,Y,12/11/2021,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,ENVIRONMENTAL BUILDING SOLUTIONS LLC,Y,B,A,A,No,40.752270,-73.987454,5.0,4.0,109.0,1.008130e+09,Midtown-Midtown South,2024-05-03T12:15:36.000,
2,1087665.0,MANHATTAN,10128,333,East 91 Street,1554,23,WTI2822978287,2020,1,"Isseks Bros., Inc.",Y,08/24/2020,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,enviromental building solutions,Y,B,A,A,No,40.780692,-73.948521,8.0,5.0,154.0,1.015540e+09,Yorkville,2024-05-03T12:15:31.000,
3,1003881.0,MANHATTAN,10002,50,Eldridge street,300,8,WTI8346971831,2018,1,Isseks Bros Inc,Y,06/26/2018,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,Environmental Building Solutions,Y,B,A,A,No,40.716012,-73.993052,3.0,1.0,16.0,1.003000e+09,Chinatown,2024-05-03T12:15:18.000,
4,4030340.0,QUEENS,11377,40-33,69TH STREET,1301,10,WTI9251583006,2016,1,American Pipe & Tank,Y,07/18/2016,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,Enviromental Building Solutions,Y,B,A,A,No,40.745678,-73.896189,2.0,26.0,263.0,4.013010e+09,Woodside,2024-05-03T12:15:09.000,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
49795,1014412.0,MANHATTAN,10001,212,WEST 35 STREET,784,51,WTI4702638830,2022,1,ISSEKS BROS INC,Y,12/17/2022,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,ENVIRONMENTAL BUILDING SOLUTIONS LLC,Y,B,A,A,No,40.751933,-73.990807,5.0,3.0,109.0,1.007840e+09,Midtown-Midtown South,2024-05-03T12:15:28.000,
49796,1085671.0,MANHATTAN,10017,450,Lexington Avenue,1280,90,WTI5762173305,2024,1,Nalco Company,Y,02/07/2024,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,EMSL,Y,B,A,A,No,40.753102,-73.974854,5.0,4.0,92.0,1.012800e+09,Turtle Bay-East Midtown,2024-05-03T12:15:22.000,
49797,1015862.0,MANHATTAN,10118,350,5th Ave,835,41,WTI1896490394,2019,6,Ambient Group,Y,09/26/2019,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,Ambient Group,Y,B,A,A,No,40.748276,-73.984690,5.0,4.0,76.0,1.008350e+09,Midtown-Midtown South,2024-05-03T12:15:18.000,
49798,1043850.0,MANHATTAN,10065,210,EAST 63 STREET,1417,42,WTI0125126928,2019,1,Rosenwach Tank Co. LLC,Y,10/30/2019,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,N,Y,EMSL Analytical,Y,B,A,A,No,40.763840,-73.964266,8.0,4.0,110.0,1.014170e+09,Lenox Hill-Roosevelt Island,2024-05-03T12:15:36.000,Y


In [14]:
import dateutil.parser

def parse_date(date):
    if isinstance(date, str):
        return dateutil.parser.parse(date).strftime('%Y-%m-%d')
    else:
        return date
    
final_water_df['inspection_date'] = final_water_df['inspection_date'].apply(parse_date)

# Store the Air data into RDS

In [15]:
# Connection details for RDS (PostgreSQL example)
db_type = "mysql"
db_user = ""
db_password = ""
db_host = "database-1.cnq4kwysqjdb.us-east-1.rds.amazonaws.com"  # RDS endpoint
db_port = 3306  # Default PostgreSQL port
db_name = "mission_green"

# Create a database engine using SQLAlchemy
connection_string = f"{db_type}://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"
engine = create_engine(connection_string)

table_name2 = "Drinking_Water_Quality_Data"  # Name of the RDS table to write to

# Write DataFrame to the SQL database
final_water_df.to_sql(table_name2, engine, if_exists="replace", index=False)  # index=False to avoid creating index column

print(f"DataFrame written to {table_name2} in RDS database.")

DataFrame written to Drinking_Water_Quality_Data in RDS database.
