In [3]:
APP_TOKEN= "ulPqb5RthrFBxMJ6hKWEpDtvx"
DATASET_IDENTIFIER = "erm2-nwe9"
TOTAL_LIMIT = 5000  # Total records to fetch
BATCH_SIZE = 1000   # Number of records per request

In [4]:
import pandas as pd
import requests
from sodapy import Socrata
import time
from datetime import datetime, timedelta

def fetch_nyc_311_data(
    start_date=None, 
    end_date=None, 
    limit=50000, 
    offset=0, 
    where_clause=None,
    app_token=None
):
    """
    Fetch NYC 311 Service Request data from Socrata API with pagination and date filtering.
    
    Parameters:
    ----------
    start_date : str, optional
        Start date in format 'YYYY-MM-DD'
    end_date : str, optional
        End date in format 'YYYY-MM-DD'
    limit : int, optional
        Number of records to return per request (max 50000)
    offset : int, optional
        Number of records to skip
    where_clause : str, optional
        Additional WHERE clause for SoQL query
    app_token : str, optional
        Socrata App Token for higher rate limits
        
    Returns:
    -------
    pandas.DataFrame
        DataFrame containing the 311 service requests
    """
    
    # Initialize Socrata client
    client = Socrata(
        "data.cityofnewyork.us",
        app_token=app_token
    )
    
    # Build the SoQL query
    query = []
    
    if start_date and end_date:
        query.append(f"created_date between '{start_date}T00:00:00' and '{end_date}T23:59:59'")
    elif start_date:
        query.append(f"created_date >= '{start_date}T00:00:00'")
    elif end_date:
        query.append(f"created_date <= '{end_date}T23:59:59'")
    
    if where_clause:
        query.append(f"({where_clause})")
    
    # Combine all query parts with AND
    where_query = " AND ".join(query) if query else None
    
    # Fetch data with pagination
    results = client.get(
        "erm2-nwe9", 
        limit=limit,
        offset=offset,
        where=where_query
    )
    
    # Convert to DataFrame
    df = pd.DataFrame.from_records(results)
    
    # Close client connection
    client.close()
    
    return df

def download_data_in_chunks(
    start_year=2010,
    end_year=2025,
    chunk_size=3, # months per chunk
    records_per_request=50000,
    output_format="csv",
    app_token=None
):
    """
    Download NYC 311 data in chunks by date ranges.
    
    Parameters:
    ----------
    start_year : int
        Starting year for data download
    end_year : int
        Ending year for data download
    chunk_size : int
        Number of months per chunk
    records_per_request : int
        Number of records per API request (max 50000)
    output_format : str
        Output file format ('csv' or 'parquet')
    app_token : str, optional
        Socrata App Token for higher rate limits
    """
    
    # Create date ranges in chunks of specified months
    current_date = datetime(start_year, 1, 1)
    end_date = datetime(end_year, 12, 31)
    
    chunk_number = 1
    
    while current_date <= end_date:
        # Calculate end of current chunk
        chunk_end = current_date + timedelta(days=30*chunk_size)
        if chunk_end > end_date:
            chunk_end = end_date
            
        # Format dates for query
        start_str = current_date.strftime("%Y-%m-%d")
        end_str = chunk_end.strftime("%Y-%m-%d")
        
        print(f"Downloading Chunk {chunk_number}: {start_str} to {end_str}")
        
        # Initialize variables for pagination
        offset = 0
        has_more_data = True
        chunk_df_list = []
        
        # Download this chunk with pagination
        while has_more_data:
            try:
                print(f"  Fetching records {offset} to {offset + records_per_request}")
                chunk_data = fetch_nyc_311_data(
                    start_date=start_str,
                    end_date=end_str,
                    limit=records_per_request,
                    offset=offset,
                    app_token=app_token
                )
                
                # If we got fewer records than the limit, we've reached the end
                if len(chunk_data) < records_per_request:
                    has_more_data = False
                
                # Append data to list
                if not chunk_data.empty:
                    chunk_df_list.append(chunk_data)
                    
                # Update offset for next page
                offset += records_per_request
                
                # Sleep to avoid hitting rate limits
                time.sleep(1)
                
            except Exception as e:
                print(f"Error downloading data: {e}")
                # Add exponential backoff retry logic if needed
                time.sleep(5)
        
        # Combine all pages for this chunk
        if chunk_df_list:
            combined_df = pd.concat(chunk_df_list, ignore_index=True)
            
            # Save to file
            filename = f"nyc_311_data_{start_str}_to_{end_str}"
            if output_format.lower() == "csv":
                combined_df.to_csv(f"{filename}.csv", index=False)
            elif output_format.lower() == "parquet":
                combined_df.to_parquet(f"{filename}.parquet", index=False)
            
            print(f"  Saved {len(combined_df)} records to {filename}.{output_format}")
        else:
            print(f"  No data found for date range {start_str} to {end_str}")
        
        # Move to next chunk
        current_date = chunk_end + timedelta(days=1)
        chunk_number += 1


# Example with additional filters
# Only download noise complaints for a specific date range
"""
noise_complaints = fetch_nyc_311_data(
    start_date="2023-01-01",
    end_date="2023-12-31",
    where_clause="complaint_type like '%Noise%'",
    app_token=APP_TOKEN
)
noise_complaints.to_csv("nyc_noise_complaints_2023.csv", index=False)
"""

'\nnoise_complaints = fetch_nyc_311_data(\n    start_date="2023-01-01",\n    end_date="2023-12-31",\n    where_clause="complaint_type like \'%Noise%\'",\n    app_token=APP_TOKEN\n)\nnoise_complaints.to_csv("nyc_noise_complaints_2023.csv", index=False)\n'

In [None]:
# Download data in 3-month chunks from 2010 to 2025
download_data_in_chunks(
    start_year=2024,
    end_year=2025,
    chunk_size=3,
    records_per_request=150000,
    output_format="csv",
    app_token=APP_TOKEN
)


In [None]:
import os
import glob
import pandas as pd

def combine_csv_files(input_directory, pattern="nyc_311_data_*.csv", output_file="nyc_311_data_combined.csv"):
    """
    Combines CSV files matching the given pattern in the specified directory
    into one CSV file.
    
    Parameters:
    -----------
    input_directory : str
        The directory where the CSV files are located.
    pattern : str, optional
        The glob pattern to match CSV files. Default is "nyc_311_data_*.csv".
    output_file : str, optional
        The name (and path) of the output combined CSV file.
    """
    # Create full file pattern path
    file_pattern = os.path.join(input_directory, pattern)
    # List all CSV files matching the pattern
    csv_files = glob.glob(file_pattern)
    
    if not csv_files:
        print("No CSV files found matching the pattern:", file_pattern)
        return

    print(f"Found {len(csv_files)} files. Combining them...")
    
    # List to hold individual DataFrames
    data_frames = []
    for file in csv_files:
        print(f"Reading: {file}")
        try:
            df = pd.read_csv(file)
            data_frames.append(df)
        except Exception as e:
            print(f"Error reading {file}: {e}")

    if data_frames:
        # Concatenate all DataFrames into one
        combined_df = pd.concat(data_frames, ignore_index=True)
        # Save the combined DataFrame to CSV
        combined_df.to_csv(output_file, index=False)
        print(f"Combined CSV saved to: {output_file}")
    else:
        print("No data frames were created from the CSV files.")



In [None]:
# Set the directory where your CSV files are stored (e.g., current folder)
input_dir = "."  # Change this if your files are in a different directory
output_csv = "nyc_311_data_combined.csv"
combine_csv_files(input_dir, output_file=output_csv)


In [None]:
import os
import glob
import polars as pl

def combine_csv_files_polars(
    input_directory, 
    pattern="nyc_311_data_*.csv", 
    output_file="nyc_311_data_combined.csv"
):
    """
    Combines CSV files matching the given pattern in the specified directory into one CSV file using Polars.
    It treats "NA", "Unkno", and "Unko" as null values (useful for columns like 'incident_zip') and increases 
    the schema inference length to reduce type conflicts.
    
    Parameters:
    -----------
    input_directory : str
        The directory where the CSV files are located.
    pattern : str, optional
        The glob pattern to match CSV files. Default is "nyc_311_data_*.csv".
    output_file : str, optional
        The name (and path) of the output combined CSV file.
    """
    
    file_pattern = os.path.join(input_directory, pattern)
    csv_files = glob.glob(file_pattern)
    
    # Exclude the combined output file if it exists in the same folder.
    csv_files = [f for f in csv_files if os.path.basename(f) != os.path.basename(output_file)]
    
    if not csv_files:
        print("No CSV files found matching the pattern:", file_pattern)
        return

    print(f"Found {len(csv_files)} files. Combining them...")

    # List to hold individual Polars DataFrames
    data_frames = []
    for file in csv_files:
        print(f"Reading: {file}")
        try:
            # Specify null_values, increase infer_schema_length, and ignore errors
            df = pl.read_csv(
                file, 
                null_values=["NA", "Unkno", "Unko"],
                infer_schema_length=10000,
                ignore_errors=True
            )
            data_frames.append(df)
        except Exception as e:
            print(f"Error reading {file}: {e}")

    if data_frames:
        # Concatenate all DataFrames into one
        combined_df = pl.concat(data_frames)
        # Save the combined DataFrame to CSV
        combined_df.write_csv(output_file)
        print(f"Combined CSV saved to: {output_file}")
    else:
        print("No data frames were created from the CSV files.")


In [None]:

# Set the directory where your CSV files are stored (e.g., current folder)
input_dir = "."  # Change this if your files are in a different directory
output_csv = "nyc_311_data_combined2.csv"
combine_csv_files_polars(input_dir, output_file=output_csv)


In [None]:
import pandas as pd
import requests
from sodapy import Socrata
import time
from datetime import datetime, timedelta

def fetch_nyc_311_data(
    start_date=None, 
    end_date=None, 
    limit=50000, 
    offset=0, 
    where_clause=None,
    app_token=None
):
    """
    Fetch NYC 311 Service Request data from Socrata API with pagination and date filtering.
    
    Parameters:
    ----------
    start_date : str, optional
        Start date in format 'YYYY-MM-DD'
    end_date : str, optional
        End date in format 'YYYY-MM-DD'
    limit : int, optional
        Number of records to return per request (up to 50,000)
    offset : int, optional
        Number of records to skip (for pagination)
    where_clause : str, optional
        Additional WHERE clause for the SoQL query.
    app_token : str, optional
        Socrata App Token for higher rate limits.
        
    Returns:
    -------
    pandas.DataFrame
        DataFrame containing the 311 service requests.
    """
    
    # Initialize Socrata client
    client = Socrata("data.cityofnewyork.us", app_token=app_token)
    
    # Build the SoQL query parts for the WHERE clause
    query_parts = []
    if start_date and end_date:
        query_parts.append(f"created_date between '{start_date}T00:00:00' and '{end_date}T23:59:59'")
    elif start_date:
        query_parts.append(f"created_date >= '{start_date}T00:00:00'")
    elif end_date:
        query_parts.append(f"created_date <= '{end_date}T23:59:59'")
    
    if where_clause:
        query_parts.append(f"({where_clause})")
    
    # Combine query parts with AND if any exist
    where_query = " AND ".join(query_parts) if query_parts else None
    
    # Add an order parameter to ensure consistent ordering for pagination.
    order_by = "created_date"
    
    # Fetch data with pagination using $limit, $offset, and $order
    results = client.get(
        "erm2-nwe9", 
        limit=limit,
        offset=offset,
        where=where_query,
        order=order_by
    )
    
    # Convert the results to a DataFrame
    df = pd.DataFrame.from_records(results)
    
    # Close the client connection
    client.close()
    
    return df

def download_data_in_chunks(
    start_year=2010,
    end_year=2025,
    chunk_size=3, # months per chunk
    records_per_request=50000,
    output_format="csv",
    app_token=None
):
    """
    Download NYC 311 data in chunks (by date ranges) using pagination.
    
    Parameters:
    ----------
    start_year : int
        Starting year for data download.
    end_year : int
        Ending year for data download.
    chunk_size : int
        Number of months per chunk.
    records_per_request : int
        Number of records per API request (max 50000).
    output_format : str
        Output file format ('csv' or 'parquet').
    app_token : str, optional
        Socrata App Token for higher rate limits.
    """
    
    # Create date ranges in chunks of approximately chunk_size months.
    current_date = datetime(start_year, 1, 1)
    end_date = datetime(end_year, 12, 31)
    
    chunk_number = 1
    
    while current_date <= end_date:
        # Calculate approximate end of current chunk
        chunk_end = current_date + timedelta(days=30 * chunk_size)
        if chunk_end > end_date:
            chunk_end = end_date
            
        # Format dates for the query
        start_str = current_date.strftime("%Y-%m-%d")
        end_str = chunk_end.strftime("%Y-%m-%d")
        
        print(f"Downloading Chunk {chunk_number}: {start_str} to {end_str}")
        
        # Initialize variables for pagination within the current chunk
        offset = 0
        has_more_data = True
        chunk_df_list = []
        
        # Download data for this chunk with pagination
        while has_more_data:
            try:
                print(f"  Fetching records {offset} to {offset + records_per_request - 1}")
                chunk_data = fetch_nyc_311_data(
                    start_date=start_str,
                    end_date=end_str,
                    limit=records_per_request,
                    offset=offset,
                    app_token=app_token
                )
                
                # If we received fewer records than requested, assume end of data for this chunk
                if len(chunk_data) < records_per_request:
                    has_more_data = False
                
                if not chunk_data.empty:
                    chunk_df_list.append(chunk_data)
                
                offset += records_per_request
                
                # Sleep briefly to avoid hitting rate limits
                time.sleep(1)
                
            except Exception as e:
                print(f"Error downloading data: {e}")
                time.sleep(5)
        
        # Combine all paginated pages for this chunk
        if chunk_df_list:
            combined_df = pd.concat(chunk_df_list, ignore_index=True)
            
            # Save to file in the chosen format
            filename = f"nyc_311_data_{start_str}_to_{end_str}"
            if output_format.lower() == "csv":
                combined_df.to_csv(f"{filename}.csv", index=False)
            elif output_format.lower() == "parquet":
                combined_df.to_parquet(f"{filename}.parquet", index=False)
            
            print(f"  Saved {len(combined_df)} records to {filename}.{output_format}")
        else:
            print(f"  No data found for date range {start_str} to {end_str}")
        
        # Move to the next chunk
        current_date = chunk_end + timedelta(days=1)
        chunk_number += 1


In [None]:
download_data_in_chunks(
    start_year=2024,
    end_year=2025,
    chunk_size=3,
    records_per_request=50000,
    output_format="csv",
    app_token=APP_TOKEN
)

In [1]:
import pandas as pd
import requests
from sodapy import Socrata
import time
from datetime import datetime, timedelta
import concurrent.futures

# New parameter: increased timeout (in seconds) and max retries
TIMEOUT_SECONDS = 120
MAX_RETRIES = 3

def fetch_nyc_311_data(
    start_date=None, 
    end_date=None, 
    limit=50000, 
    offset=0, 
    where_clause=None,
    app_token=None
):
    """
    Fetch NYC 311 Service Request data from Socrata API with pagination and date filtering.
    """
    client = Socrata("data.cityofnewyork.us", app_token=app_token, timeout=TIMEOUT_SECONDS)
    
    query_parts = []
    if start_date and end_date:
        query_parts.append(f"created_date between '{start_date}T00:00:00' and '{end_date}T23:59:59'")
    elif start_date:
        query_parts.append(f"created_date >= '{start_date}T00:00:00'")
    elif end_date:
        query_parts.append(f"created_date <= '{end_date}T23:59:59'")
    if where_clause:
        query_parts.append(f"({where_clause})")
    where_query = " AND ".join(query_parts) if query_parts else None
    
    order_by = "created_date"
    
    results = client.get(
        "erm2-nwe9", 
        limit=limit,
        offset=offset,
        where=where_query,
        order=order_by
    )
    client.close()
    
    return pd.DataFrame.from_records(results)

def fetch_pages_concurrently(start_str, end_str, records_per_request, app_token, max_workers=5):
    """
    Fetch pages concurrently within a date chunk with retries.
    """
    chunk_data_list = []
    offset = 0
    while True:
        # Prepare offsets for the next batch of concurrent requests
        offsets = [offset + i * records_per_request for i in range(max_workers)]
        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_offset = {}
            for off in offsets:
                # Wrap the fetch call in a retry loop
                future = executor.submit(fetch_with_retries, start_str, end_str, records_per_request, off, app_token)
                future_to_offset[future] = off

            for future in concurrent.futures.as_completed(future_to_offset):
                off = future_to_offset[future]
                try:
                    data = future.result()
                    results.append((off, data))
                except Exception as e:
                    print(f"Error at offset {off}: {e}")
                    results.append((off, pd.DataFrame()))
        
        # Sort the results by offset
        results.sort(key=lambda x: x[0])
        stop = False
        for off, data in results:
            if not data.empty:
                chunk_data_list.append(data)
            if len(data) < records_per_request:
                stop = True
        offset += max_workers * records_per_request
        if stop:
            break
        time.sleep(0.5)
    return chunk_data_list

def fetch_with_retries(start_str, end_str, records_per_request, offset, app_token):
    """
    Attempts to fetch a page with retries if timeouts occur.
    """
    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f"  Fetching records {offset} to {offset + records_per_request - 1} (attempt {attempt})")
            df = fetch_nyc_311_data(
                start_date=start_str,
                end_date=end_str,
                limit=records_per_request,
                offset=offset,
                app_token=app_token
            )
            return df
        except Exception as e:
            print(f"  Error at offset {offset} on attempt {attempt}: {e}")
            if attempt < MAX_RETRIES:
                time.sleep(5 * attempt)  # Exponential backoff
            else:
                raise e

def download_data_in_chunks(
    start_year=2010,
    end_year=2025,
    chunk_size=3,  # months per chunk
    records_per_request=50000,
    output_format="csv",
    app_token=None,
    max_workers=5
):
    """
    Download NYC 311 data in chunks using concurrent pagination.
    """
    current_date = datetime(start_year, 1, 1)
    end_date_dt = datetime(end_year, 12, 31)
    chunk_number = 1
    
    while current_date <= end_date_dt:
        chunk_end = current_date + timedelta(days=30 * chunk_size)
        if chunk_end > end_date_dt:
            chunk_end = end_date_dt
        
        start_str = current_date.strftime("%Y-%m-%d")
        end_str = chunk_end.strftime("%Y-%m-%d")
        print(f"Downloading Chunk {chunk_number}: {start_str} to {end_str}")
        
        try:
            chunk_df_list = fetch_pages_concurrently(start_str, end_str, records_per_request, app_token, max_workers)
        except Exception as e:
            print(f"Error fetching pages concurrently: {e}")
            chunk_df_list = []
        
        if chunk_df_list:
            combined_df = pd.concat(chunk_df_list, ignore_index=True)
            filename = f"nyc_311_data_{start_str}_to_{end_str}"
            if output_format.lower() == "csv":
                combined_df.to_csv(f"{filename}.csv", index=False)
            elif output_format.lower() == "parquet":
                combined_df.to_parquet(f"{filename}.parquet", index=False)
            print(f"  Saved {len(combined_df)} records to {filename}.{output_format}")
        else:
            print(f"  No data found for date range {start_str} to {end_str}")
        
        current_date = chunk_end + timedelta(days=1)
        chunk_number += 1
        time.sleep(1)


In [2]:
download_data_in_chunks(
        start_year=2024,
        end_year=2025,
        chunk_size=3,
        records_per_request=50000,
        output_format="csv",
        app_token=APP_TOKEN,
        max_workers=5
    )


NameError: name 'APP_TOKEN' is not defined