In [33]:
import pandas as pd
import os
from typing import List, Optional, Dict
import gc

In [22]:
# !pip3 install pandas
# !pip3 install pyarrow




### Data Fetch

In [34]:
def fetch_nsduh_data(year: int) -> Optional[pd.DataFrame]:
    """
    Fetches NSDUH data for a specified year from a remote source.

    Args:
        year (int): The year for which to fetch data.

    Returns:
        Optional[pd.DataFrame]: A pandas DataFrame with the data, or None if fetching failed.
    """
    url_placeholder = "https://www.datafiles.samhsa.gov/sites/default/files/field-uploads-protected/studies/NSDUH-{year}/NSDUH-{year}-datasets/NSDUH-{year}-DS0001/NSDUH-{year}-DS0001-bundles-with-study-info/NSDUH-{year}-DS0001-bndl-data-tsv.zip"
    
    try:
        url = url_placeholder.format(year=year)
        df = pd.read_csv(url, compression='zip', sep='\t', low_memory=False)
        return df
    except Exception as e:
        print(f"An error occurred for year {year}: {e}")
        return None

In [35]:
def check_parquet_exists(years: List[int], output_dir: str) -> Dict[int, bool]:
    """
    Checks if Parquet files for the specified years already exist in the output directory.

    Args:
        years (List[int]): A list of years to check.
        output_dir (str): The directory where Parquet files are saved.

    Returns:
        Dict[int, bool]: A dictionary with years as keys and boolean values indicating 
                         whether the Parquet file for that year exists.
    """
    existence_check = {}
    
    for year in years:
        year_path = os.path.join(output_dir, f'year={year}')
        existence_check[year] = os.path.exists(year_path)
    
    return existence_check

In [36]:
def write_parquet(df: pd.DataFrame, year: int, output_dir: str, overwrite: bool = False) -> None:
    """
    Writes a DataFrame to Parquet format, partitioned by year.

    Args:
        df (pd.DataFrame): The DataFrame to write.
        year (int): The year of the data.
        output_dir (str): The directory where Parquet files will be saved.
        overwrite (bool): If True, overwrite existing files. If False, skip existing files.
    """
    try:
        year_dir = os.path.join(output_dir, f'year={year}')
        if overwrite and os.path.exists(year_dir):
            shutil.rmtree(year_dir)  # Remove existing directory to start fresh

        df['year'] = year  # Add the year column for partitioning
        # Write data to Parquet format with partitioning
        df.to_parquet(output_dir, partition_cols=['year'], index=False)
        
        print(f"Data for year {year} successfully saved to Parquet format in {output_dir}")
    except Exception as e:
        print(f"Error saving data to Parquet for year {year}: {e}")

In [37]:
def data_fetch(years_to_fetch: List[int], output_dir: str, overwrite: bool = False) -> None:
    """
    Fetches NSDUH data for specified years and saves it to Parquet format, one year at a time.

    Args:
        years_to_fetch (List[int]): A list of years for which to fetch data.
        output_dir (str): The directory where Parquet files will be saved.
        overwrite (bool): If True, overwrite existing files. If False, skip existing files.
    """
    try:
        for year in years_to_fetch:
            if not overwrite and os.path.exists(os.path.join(output_dir, f'year={year}')):
                print(f"Data for year {year} already exists. Skipping.")
                continue

            df = fetch_nsduh_data(year)
            if df is not None:
                print(f"Successfully fetched data for year: {year}")
                write_parquet(df, year, output_dir, overwrite)
                del df  # Remove the DataFrame from memory
                gc.collect()  # Force garbage collection

        print("All requested years processed.")
    except Exception as e:
        print(f"An unexpected error occurred in the data_fetch function: {e}")

In [38]:
def read_parquet(input_dir: str, years: Optional[List[int]] = None) -> Dict[int, pd.DataFrame]:
    """
    Reads Parquet files for specified years from the input directory.

    Args:
        input_dir (str): The directory where Parquet files are stored.
        years (Optional[List[int]]): A list of years to read. If None, read all available years.

    Returns:
        Dict[int, pd.DataFrame]: A dictionary with years as keys and pandas DataFrames as values.
    """
    data_frames = {}
    available_years = [int(d.split('=')[1]) for d in os.listdir(input_dir) if d.startswith('year=')]
    years_to_read = years if years is not None else available_years

    for year in years_to_read:
        year_path = os.path.join(input_dir, f'year={year}')
        if os.path.exists(year_path):
            df = pd.read_parquet(year_path)
            data_frames[year] = df
        else:
            print(f"Warning: No data found for year {year}")

    return data_frames

In [39]:
if __name__ == "__main__":
    # Example usage
    years = [2015, 2016, 2017, 2018, 2019] 
    output_directory = "../data/DS/NSDUH"
    
    # Fetch and save data
    data_fetch(years, output_directory, overwrite=False)

    # Read saved data (if needed)
    # Note: This part is optional and can be removed if you don't need to read the data immediately after saving
    for year in years:
        df = read_parquet(output_directory, [year])
        if year in df:
            print(f"Data for year {year}:")
            print(df[year].head())
        del df
        gc.collect()

Successfully fetched data for year: 2015
Data for year 2015 successfully saved to Parquet format in ../data/DS/NSDUH
Successfully fetched data for year: 2016
Data for year 2016 successfully saved to Parquet format in ../data/DS/NSDUH
Successfully fetched data for year: 2017
Data for year 2017 successfully saved to Parquet format in ../data/DS/NSDUH
Successfully fetched data for year: 2018
Data for year 2018 successfully saved to Parquet format in ../data/DS/NSDUH
Successfully fetched data for year: 2019
Data for year 2019 successfully saved to Parquet format in ../data/DS/NSDUH
All requested years processed.
Data for year 2015:
   QUESTID2    FILEDATE  CIGEVER  CIGOFRSM  CIGWILYR  CIGTRY  CIGYFU  CIGMFU  \
0  25095143  02/15/2018        1        99        99      16    2014       1   
1  13005143  02/15/2018        1        99        99      15    9999      99   
2  67415143  02/15/2018        2        99        99     991    9991      91   
3  70925143  02/15/2018        2         3  

### EDA

In [3]:
df.head()

Unnamed: 0,QUESTID2,FILEDATE,CIGEVER,CIGOFRSM,CIGWILYR,CIGTRY,CIGYFU,CIGMFU,CIGREC,CIG30USE,...,POVERTY3,TOOLONG,TROUBUND,PDEN10,COUTYP4,MAIIN102,AIIND102,ANALWT_C,VESTR,VEREP
0,25095143,02/15/2018,1,99,99,16,2014,1,2,93,...,1.0,2,2,3,3,2,2,1088.413235,40028,1
1,13005143,02/15/2018,1,99,99,15,9999,99,3,93,...,2.0,1,2,2,2,2,2,4423.360328,40025,1
2,67415143,02/15/2018,2,99,99,991,9991,91,91,91,...,1.0,2,2,2,3,2,2,328.111801,40004,2
3,70925143,02/15/2018,2,3,4,991,9991,91,91,91,...,3.0,2,2,2,2,2,2,235.29045,40027,1
4,75235143,02/15/2018,1,99,99,17,9999,99,1,22,...,1.0,2,2,3,3,2,2,2280.878615,40011,1


In [4]:
df.shape

(57146, 2679)

In [6]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 57146 entries, 0 to 57145
Columns: 2679 entries, QUESTID2 to VEREP
dtypes: float64(408), int64(2269), object(2)
memory usage: 1.1+ GB


In [5]:
df.describe()

Unnamed: 0,QUESTID2,CIGEVER,CIGOFRSM,CIGWILYR,CIGTRY,CIGYFU,CIGMFU,CIGREC,CIG30USE,CG30EST,...,POVERTY3,TOOLONG,TROUBUND,PDEN10,COUTYP4,MAIIN102,AIIND102,ANALWT_C,VESTR,VEREP
count,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,...,56729.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0,57146.0
mean,54476130.0,1.494103,79.628478,79.6454,503.961117,9803.818395,92.901708,46.321457,77.743464,93.085973,...,2.361755,2.210023,2.225545,1.64449,1.735082,1.978756,1.978494,4684.395919,40025.530868,1.494943
std,25531000.0,0.49997,38.316055,38.293875,487.618751,1219.932775,13.635147,44.18397,28.702399,4.420861,...,0.80218,5.20789,5.20575,0.618402,0.760086,0.144197,0.145066,5720.231967,14.631455,0.499979
min,10001920.0,1.0,1.0,1.0,1.0,2013.0,1.0,1.0,1.0,1.0,...,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.621832,40001.0,1.0
25%,32392890.0,1.0,99.0,99.0,16.0,9991.0,91.0,3.0,91.0,91.0,...,2.0,2.0,2.0,1.0,1.0,2.0,2.0,1246.005954,40013.0,1.0
50%,54182250.0,1.0,99.0,99.0,985.0,9991.0,91.0,4.0,91.0,93.0,...,3.0,2.0,2.0,2.0,2.0,2.0,2.0,2722.979853,40025.0,1.0
75%,76184340.0,2.0,99.0,99.0,991.0,9999.0,99.0,91.0,93.0,93.0,...,3.0,2.0,2.0,2.0,2.0,2.0,2.0,5764.130739,40039.0,2.0
max,99999560.0,2.0,99.0,99.0,997.0,9999.0,99.0,91.0,98.0,99.0,...,3.0,98.0,98.0,3.0,3.0,2.0,2.0,109100.62303,40050.0,2.0


In [7]:
df.head(100).to_csv(r"../data/sample_data.csv",index=False)

EDA done by Apurva Umredkar (50592382) using a new dataset

In [None]:
import pandas as pd
