## MongoDB - API Processing

In [666]:
import json
import pymongo
import pandas as pd
import pprint
import os
from pymongo import MongoClient, UpdateOne
from bson.objectid import ObjectId
from datetime import datetime
import requests
import json

# Load the MongoDB connection string from the JSON secrets file
PATH_TO_SECRET_JSON = '/home/jovyan/keys/mongodb_key.json'
with open(PATH_TO_SECRET_JSON) as f:
    MONGODB_URI = json.load(f)['connection_string']

# Fail fast on a blank connection string. Merely printing a message and
# carrying on would only surface later as a NameError on `client`.
if not MONGODB_URI:
    print("Failed to initialize the MongoDB client.")
    raise ValueError(
        f"'connection_string' in {PATH_TO_SECRET_JSON} is empty; "
        "cannot create a MongoDB client."
    )

# Create a MongoDB client using the connection string
client = MongoClient(MONGODB_URI)
print("The MongoDB client has been initialized.")

# Print the version of the pymongo package
pymongo_version = pymongo.__version__
print(f"The version of the pymongo package is {pymongo_version}")

# Connect to the database named in the connection string and its collection.
# pymongo parses the URI itself here, so a URI without a database path raises
# a ConfigurationError instead of being mis-split into a bogus database name.
db = client.get_default_database()
db_name = db.name
collection = db.attractions

The MongoDB client has been initialized.
The version of the pymongo package is 4.4.1


In [667]:
# Aggregation pipeline, assembled from one named stage per step.

# Stage 1: keep only documents in which landmark.lpNumber exists.
match_stage = {"$match": {"landmark.lpNumber": {"$exists": True}}}

# Separator used while folding aliases: nothing before the first element
# (accumulator still ""), " | " before every later one.
alias_separator_expr = {"$cond": [{"$eq": ["$$value", ""]}, "", " | "]}

# Fold the `aliases` array into a single string, starting from "".
joined_aliases_expr = {
    "$reduce": {
        "input": "$aliases",
        "initialValue": "",
        "in": {"$concat": ["$$value", alias_separator_expr, "$$this"]},
    }
}

# Stage 2: flatten the nested landmark/loc fields and attach the joined aliases.
project_stage = {
    "$project": {
        "_id": 1,
        "title": 1,
        "lpNumber": "$landmark.lpNumber",
        "bbl": "$loc.bbl",
        "boroughCode": "$loc.boroughCode",
        "aliases": joined_aliases_expr,
    }
}

# Stage 3: order the output by lpNumber, ascending.
sort_stage = {"$sort": {"lpNumber": 1}}

pipeline = [match_stage, project_stage, sort_stage]


# Execute the aggregation pipeline
cursor = collection.aggregate(pipeline)

# Convert the result cursor to a pandas DataFrame and rename the projected
# 'bbl' column (rename returns a new frame, so re-running the cell is safe).
df = pd.DataFrame(list(cursor)).rename(columns={'bbl': 'loc.bbl'})

# Render the BBL as text and drop only a literal trailing ".0".
# str.rstrip('.0') strips a *set* of characters, so it would also remove
# genuine trailing zeros (e.g. "1000160100.0" -> "10001601").
df['loc.bbl'] = df['loc.bbl'].astype(str).str.replace(r'\.0$', '', regex=True)

# Debug toggle: uncomment to limit the run to the first 10 rows.
#df = df.head(10)

In [668]:
# Make sure the export folder is present (no error if it already exists)
output_directory = "../downloads/csv/"
os.makedirs(output_directory, exist_ok=True)

# Name the file after the current local date and time (second resolution)
current_datetime = format(datetime.now(), "%Y_%m_%d_%H_%M_%S")
csv_filename = "landmarks_" + current_datetime + ".csv"
csv_filepath = os.path.join(output_directory, csv_filename)

# Write the DataFrame without its index column, then report where it went
df.to_csv(csv_filepath, index=False)
print("CSV file saved to: " + csv_filepath)


CSV file saved to: ../downloads/csv/landmarks_2023_10_02_03_06_07.csv


In [669]:
import aiohttp
import asyncio
import json
import pandas as pd
from datetime import datetime

# Endpoint that every lookup request is POSTed to
url = 'https://microservice-api-w6zlqlyoma-uk.a.run.app/api/v1/FeatureService/attributelookup'

# Request headers: JSON is both what we accept and what we send
headers = dict.fromkeys(('accept', 'Content-Type'), 'application/json')

async def fetch(lp_number, session):
    """Look up one landmark by LP number through the attribute-lookup API.

    Args:
        lp_number: Value sent as the ``LPNumber`` attribute in the request.
        session: Object whose ``post(...)`` returns an async context manager
            yielding a response with ``status`` and an awaitable ``json()``
            (an ``aiohttp.ClientSession`` in this notebook).

    Returns:
        dict with keys ``lpNumber``, ``areaName``, ``boroName`` and ``bbl``.
        The three looked-up fields are ``""`` when the status is not 200, the
        request or JSON decoding fails, or the payload is not a non-empty list
        whose first element is a dict. A single bad lookup therefore never
        aborts the surrounding batch.
    """
    payload = {
        "key": "IndividualLandmarkHistoricDistricts",
        "attributes": [{"key": "LPNumber", "value": lp_number}]
    }
    # Blank record returned on every failure path; filled in on success.
    record = {"lpNumber": lp_number, "areaName": "", "boroName": "", "bbl": ""}

    try:
        async with session.post(url, headers=headers, data=json.dumps(payload)) as response:
            if response.status != 200:
                print(f"API call for {lp_number} failed with status code {response.status}.")
                return record
            data = await response.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, ValueError) as exc:
        # Transport errors, timeouts and undecodable bodies get the same
        # blank-record treatment as a non-200 status.
        print(f"API call for {lp_number} failed: {exc!r}")
        return record

    print(f"API call for {lp_number} was successful!")

    # Only the first match is used; tolerate an empty or unexpectedly shaped
    # payload instead of raising on data[0].
    first_match = data[0] if isinstance(data, list) and data else None
    if isinstance(first_match, dict):
        for field in ("areaName", "boroName", "bbl"):
            record[field] = first_match.get(field, "")
    return record

async def fetch_all(lp_numbers):
    """Run ``fetch`` for every LP number over one shared aiohttp session.

    All lookups are awaited together with ``asyncio.gather``, which returns
    the results as a list in the same order as ``lp_numbers``.
    """
    async with aiohttp.ClientSession() as session:
        pending_lookups = []
        for lp_number in lp_numbers:
            pending_lookups.append(fetch(lp_number, session))
        gathered = await asyncio.gather(*pending_lookups)
        return gathered

# Main code
results = await fetch_all(df['lpNumber'].tolist())

# Convert results to a DataFrame
results_df = pd.DataFrame(results)

# Merge the two DataFrames on 'lpNumber'
merged_df = pd.merge(df, results_df, on='lpNumber', how='left')

# Add 'bbl_match' column
merged_df['bbl_match'] = merged_df['loc.bbl'] == merged_df['bbl']

# Save the merged DataFrame to a CSV file
current_datetime = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
csv_filename = f"../downloads/csv/landmarks_api_{current_datetime}.csv"  
merged_df.to_csv(csv_filename, index=False)


API call for LP-00008 was successful!
API call for LP-00004 was successful!
API call for LP-00009 was successful!
API call for LP-00007 was successful!
API call for LP-00006 was successful!
API call for LP-00027 was successful!
API call for LP-00002 was successful!
API call for LP-00001 was successful!
API call for LP-00039 was successful!
API call for LP-00057 was successful!
API call for LP-00055 was successful!
API call for LP-00152 was successful!
API call for LP-00112 was successful!
API call for LP-00153 was successful!
API call for LP-00156 was successful!
API call for LP-00155 was successful!
API call for LP-00148 was successful!
API call for LP-00161 was successful!
API call for LP-00162 was successful!
API call for LP-00164 was successful!
API call for LP-00165 was successful!
API call for LP-00167 was successful!
API call for LP-00168 was successful!
API call for LP-00169 was successful!
API call for LP-00170 was successful!
API call for LP-00170E was successful!
API call fo