# Extracting and Ingresting Data

## Extract JSON Data from Web 

### Reading Data into local JSON storage 

#### Define Func to Fetch and Store the JSON locally

In [10]:
import os
import requests
import json
import zipfile
import sys
sys.path.append('../../../Common')
from func_Fetch_Data import fetch_store_zip_json_data

def fetch_store_zip_json_data(limit, offset, url, chunk_size, storage_dir, storage_file, where_clause, compress=False, mode='w'):
    """
    Fetches data from a specified URL in chunks and stores it in a JSON file.
    Parameters:
    limit (int): The maximum number of records to fetch.
    offset (int): The starting point for fetching records.
    url (str): The URL to fetch data from, with placeholders for chunk_size, offset, and where_clause.
    chunk_size (int): The number of records to fetch in each request.
    storage_dir (str): The directory where the JSON file will be stored.
    storage_file (str): The name of the JSON file to store the fetched data.
    where_clause (str): The where clause to filter records, formatted as a string.
    compress (bool): Whether to compress the JSON file into a zip file.
    mode (str): The mode to open the JSON file, either 'w' for write or 'a' for append.
    Returns:
    None
    """
    final_url = url.format(chunk_size, offset, where_clause)
    print(f"Final URL: {final_url}")
    all_data = []

    while offset < limit:
        response = requests.get(url.format(chunk_size, offset, where_clause))
        data = response.json()
        if not data:
            print("No more data to fetch.")
            break
        all_data.extend(data)
        print(f"Fetched {len(data) + offset} out of {limit} requested records, offset is now {offset}")
        offset += chunk_size
        
    # Create the directory if it does not exist
    os.makedirs(storage_dir, exist_ok=True)
    print(f"Storage directory '{storage_dir}' is ready.")

    # Write or append the data to a JSON file
    json_path = os.path.join(storage_dir, storage_file)
    if mode == 'a' and os.path.exists(json_path):
        with open(json_path, mode='r') as file:
            existing_data = json.load(file)
        all_data = existing_data + all_data
        print(f"Existing data loaded from '{json_path}'.")

    # If no new data was fetched, ensure existing data is preserved
    if not all_data and mode == 'a' and os.path.exists(json_path):
        with open(json_path, mode='r') as file:
            all_data = json.load(file)
        mode = 'w'  # Change mode to 'w' to overwrite with existing data
        print("No new data fetched, preserving existing data.")

    if all_data:  # Only write to file if there is data to write
        with open(json_path, mode=mode) as file:
            json.dump(all_data, file, indent=4)
        print(f"Data written to '{json_path}'.")

    # Compress the JSON file if compress is True
    if compress:
        zip_path = json_path + '.zip'
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(json_path, os.path.basename(json_path))
        os.remove(json_path)
        print(f"Data compressed to '{zip_path}'.")

    print("Data fetching and storing process completed.")


#### Perform an Initial Load of Data

In [11]:
from datetime import datetime
from dateutil.relativedelta import relativedelta

from_date_str = (datetime.now() - relativedelta(months=3)).isoformat()
where_clause = f"trip_start_timestamp>='{from_date_str}'"

fetch_store_zip_json_data(
    limit=200000,
    offset=0,
    url="https://data.cityofchicago.org/resource/ajtu-isnz.json?$limit={}&$offset={}&$where={}",
    chunk_size=1000,
    storage_dir='C:/Users/mike/Data/Public',
    storage_file='Chi_Taxi_Trips.json',
    where_clause=where_clause,
    compress=False,
    mode='w'
)

Final URL: https://data.cityofchicago.org/resource/ajtu-isnz.json?$limit=1000&$offset=0&$where=trip_start_timestamp>='2024-10-17T15:58:56.645269'
Fetched 1000 out of 200000 requested records, offset is now 0
Fetched 2000 out of 200000 requested records, offset is now 1000
Fetched 3000 out of 200000 requested records, offset is now 2000
Fetched 4000 out of 200000 requested records, offset is now 3000
Fetched 5000 out of 200000 requested records, offset is now 4000
Fetched 6000 out of 200000 requested records, offset is now 5000
Fetched 7000 out of 200000 requested records, offset is now 6000
Fetched 8000 out of 200000 requested records, offset is now 7000
Fetched 9000 out of 200000 requested records, offset is now 8000
Fetched 10000 out of 200000 requested records, offset is now 9000
Fetched 11000 out of 200000 requested records, offset is now 10000
Fetched 12000 out of 200000 requested records, offset is now 11000
Fetched 13000 out of 200000 requested records, offset is now 12000
Fetch

#### Define Func to Retrive the Latest Timestamp from the JSON to be used for incremental loading

In [8]:
import json
from datetime import datetime

def get_latest_timestamp_from_json(json_file_path):
    """
    Extracts the latest timestamp from the JSON file and returns it as a string in ISO format.
    
    Parameters:
    json_file_path (str): The path to the JSON file.
    
    Returns:
    str: The latest timestamp in ISO format.
    """
    timestamps = []
    with open(json_file_path, 'r') as file:
        try:
            data = json.load(file)
            for record in data:
                if 'trip_start_timestamp' in record:
                    timestamps.append(datetime.fromisoformat(record['trip_start_timestamp']))
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
    
    if not timestamps:
        raise ValueError("No valid timestamps found in the JSON file.")
    
    # Get the latest timestamp
    latest_timestamp = max(timestamps)
    
    return latest_timestamp.isoformat()

# Example usage
latest_timestamp = get_latest_timestamp_from_json('C:/Users/mike/Data/Public/Chi_Taxi_Trips.json')
print(latest_timestamp)

2025-01-01T00:00:00


####  Call the Fetch Function with the 'a'ppend option to load data incrementally
This is the function that you would use in a scheduled Orchestration Workflow Tools, etc. Airflow, NiFi, or even Windows Scheduler 

In [30]:
# Format the where clause to filter records based on the latest timestamp
where_clause = f"trip_start_timestamp>'{latest_timestamp}'"

fetch_store_zip_json_data(
    limit=1000,
    offset=0,
    url="https://data.cityofchicago.org/resource/ajtu-isnz.json?$limit={}&$offset={}&$where={}",
    chunk_size=1000,
    storage_dir='C:/Users/mike/Data/Public/Data',
    storage_file='Chi_Taxi_Trips.json',
    where_clause=where_clause,
    mode='a'
)

Final URL: https://data.cityofchicago.org/resource/ajtu-isnz.json?$limit=1000&$offset=0&$where=trip_start_timestamp>'2025-01-01T00:00:00'
No more data to fetch.
Storage directory 'C:/Users/mike/Data/Public/Data' is ready.
Existing data loaded from 'C:/Users/mike/Data/Public/Data\Chi_Taxi_Trips.json'.
Data written to 'C:/Users/mike/Data/Public/Data\Chi_Taxi_Trips.json'.
Data fetching and storing process completed.
