This code can be used to download and perform some basic analysis on the SRN documents available from their API. 

First and foremost, we want to look at the issues we encounter while downloading, at both the document level and the company level. This includes:

- summary statistics of download 
- classify filetype of files downloaded
- issues while downloading (specific error codes are printed)
- problems specific to filetype
- distribution of years covered by downloads 
- max, min, median and average pages in .pdf files

Make sure to set "fpath" to your desired local directory. By default, this points to the repository's "data" folder.

Python Version used: 3.10.12

In [None]:
# set up environment 
 
import requests
import pandas as pd
import os
import numpy as np
import filetype
from datetime import datetime
from PyPDF2 import PdfReader

# Base URL of the SRN API. Endpoint names are appended directly to this
# string, so the trailing slash has to stay.
srn_api_url = "https://api.sustainabilityreportingnavigator.com/api/"

# Local download directory; each document is stored in it under its id.
fpath = "./" # data/ your download directory

# write function: 

Helper functions that fetch the company and document metadata from the API (the files themselves are downloaded further below).

In [None]:
# function to write from api

def get_srn_companies(timeout=60):
    """
    Returns a list of companies that are included in SRN Document Database.

    Args:
        timeout (int, optional): Seconds to wait for the API before
            requests gives up with a timeout exception. Without a
            timeout a stalled connection would block forever.
            Defaults to 60 seconds.

    Returns:
        [list{dict}]: A list containing company level metadata

    Raises:
        requests.exceptions.HTTPError: If the API answers with a 4xx or
            5xx status code.
        requests.exceptions.Timeout: If the API does not answer in time.
    """
    response = requests.get(
        srn_api_url + "companies",
        allow_redirects=True,
        timeout=timeout,
    )
    # Fail loudly instead of parsing the body of an error response as data.
    response.raise_for_status()
    return response.json()

def get_srn_documents(timeout=60):
    """
    Returns a list of documents that are included in SRN Document Database.

    Args:
        timeout (int, optional): Seconds to wait for the API before
            requests gives up with a timeout exception. Without a
            timeout a stalled connection would block forever.
            Defaults to 60 seconds.

    Returns:
        [list{dict}]: A list containing document level metadata

    Raises:
        requests.exceptions.HTTPError: If the API answers with a 4xx or
            5xx status code.
        requests.exceptions.Timeout: If the API does not answer in time.
    """
    response = requests.get(
        srn_api_url + "documents",
        allow_redirects=True,
        timeout=timeout,
    )
    # Fail loudly instead of parsing the body of an error response as data.
    response.raise_for_status()
    return response.json()

In [None]:
# Join every document row with the metadata of the company it belongs to.
# Both frames carry an "id" column, so the merged frame holds them with
# pandas' default _x / _y suffixes.
companies = pd.DataFrame(get_srn_companies())
documents = pd.DataFrame(get_srn_documents())
df = pd.merge(
    documents,
    companies,
    left_on='company_id',
    right_on='id',
)
# Optional narrowing of the merged frame, currently disabled:
#df = df[['country', 'type', 'year', 'company_id']]
#df['type'] = df['type'].str.strip()

# download function:

- download all documents from API (filename = id)
- print error IDs of failed downloads
- only downloads files that are not already in directory

In [None]:
# function to download all documents from api

def download_document(id, fpath, timeout=60):
    """
    Retrieves a single document from the SRN Document Database and
    stores it at the provided file path.

    Nothing is downloaded when a file already exists at fpath. Failures
    are reported on stdout instead of being raised, so a loop over many
    documents simply continues with the next one.

    Args:
        id (str): The SRN document id.
        fpath (str): A string containing the file path where you want to
            store the file.
        timeout (int, optional): Sometimes, a download API call might
            block because of a dying connection or because the data
            is not available. If the timeout is reached, the request
            raises an exception, which is reported here so that the
            caller can continue with the next file.
            Defaults to 60 seconds.
    """
    if os.path.exists(fpath):
        print(f"Document {id} already exists in the directory")
        return

    # Write to a temporary sibling first: an interrupted write must not leave
    # a truncated file at fpath, because an existing file is never re-fetched.
    tmp_path = f"{fpath}.part"
    try:
        response = requests.get(
            srn_api_url + f"documents/{id}/download",
            timeout=timeout
        )
        # Without this check the body of a 4xx/5xx answer would be saved
        # and later be counted as a successfully downloaded document.
        response.raise_for_status()
        with open(tmp_path, 'wb') as f:
            f.write(response.content)
        os.replace(tmp_path, fpath)
        print(f"Downloaded document {id}")
    except requests.exceptions.Timeout:
        print(f"Download timed out for document {id}")
    except requests.exceptions.HTTPError as e:
        print(f"Download failed for document {id}: HTTP {e.response.status_code}")
    except Exception as e:
        # Deliberate best-effort boundary: report and move on.
        print(f"Error occurred while downloading document {id}: {str(e)}")
    finally:
        # Remove a leftover partial file so it is not mistaken for a document.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)




if __name__ == "__main__":
    # The target directory has to exist before files can be written into it.
    os.makedirs(fpath, exist_ok=True)

    documents = get_srn_documents()
    for doc in documents:
        # Files are stored under their document id, without any suffix.
        filepath = os.path.join(fpath, doc['id'])
        # download_document reports success, skip or failure itself, so no
        # additional "Downloaded ..." message is printed here (it used to be
        # printed even when the download had failed).
        download_document(doc['id'], filepath)
    print("done!")

# classify function (filetype): 

classifies the filetype of downloaded documents 

for option to add suffix to each local file name, see "add correct suffix to file" below

In [None]:
# filetype classifier function - see below to add correct suffix to the local files

# Directory whose files get classified; derived from the download directory fpath.
directory = str(fpath)

def classify_file(file_path):
    """
    Guess the file type of a local file.

    Args:
        file_path (str): Path of the file to inspect.

    Returns:
        str: 'NA' when nothing exists at file_path, 'html' when the
            filetype package does not recognise the content, otherwise
            the extension reported by filetype.guess.
    """
    if os.path.exists(file_path):
        guessed = filetype.guess(file_path)
        # Unrecognised content is labelled "html"
        return guessed.extension if guessed is not None else 'html'
    return 'NA'

def main():
    """
    Classify every regular file in `directory`.

    Returns:
        dict: Maps each file name to the type reported by classify_file.
            Previously the result of every classification was discarded.
    """
    classified = {}
    for file_name in os.listdir(directory):
        file_path = os.path.join(directory, file_name)
        # Sub-directories are not documents, so they are not classified.
        if not os.path.isfile(file_path):
            continue
        classified[file_name] = classify_file(file_path)
    return classified

if __name__ == '__main__':
    main()

# create "main_df" 

prints number of documents that failed to download

adds success and filetype columns (success=1 means that the document with this id was downloaded successfully)

to do: add industry sector column

In [None]:
# create main_df for consecutive analysis

doc_api = pd.DataFrame(get_srn_documents()) # document information from the API
doc_local = pd.DataFrame({'id': os.listdir(fpath)}) # one row per entry of the download directory
# doc_local['id'] = doc_local['id'].str.replace(r'\.pdf$|\.html$', '', regex=True) # remove suffix to be able to compare

main_df = pd.merge(doc_api, doc_local, on='id', how='left') # merge the two dfs based on the id column
main_df['success'] = main_df['id'].isin(doc_local['id']).astype(int) # 1 = a local file with this id exists
# Classify from fpath, i.e. the same directory that was listed above, so that
# "success" and "filetype" always describe the same set of files.
main_df['filetype'] = main_df['id'].apply(lambda x: classify_file(os.path.join(fpath, x)))
# main_df['filetype'] = main_df['href'].apply(lambda x: get_filetype(x)) # add "filetype" column, to determine pdf or not
print(len(main_df[main_df['success'] == 0]), "out of", len(doc_api), "documents unable to download") # number of docs that werent able to be downloaded

# export main_df as .csv in working directory and add date to filename
rpath = "output/"
os.makedirs(rpath, exist_ok=True) # to_csv does not create missing directories
current_date = datetime.now().strftime("%Y-%m-%d")
output_filename = f"main_df_{current_date}.csv"
output_filepath = os.path.join(rpath, output_filename)
main_df.to_csv(output_filepath, index=False)

# create "filetype_downloads": 

relative frequencies of document filetypes in local directory

note: we can not make any statement about the filetype of unavailable files, hence all unsuccessful downloads report filetype NA

In [None]:
# test filetype

# Relative frequency of each detected file type, separately for failed
# (success == 0) and successful (success == 1) downloads.
filetype_downloads = main_df.groupby('success')['filetype'].value_counts(normalize=True)


print(filetype_downloads)

# export as .csv in working directory and add date to filename
rpath = "output/"
os.makedirs(rpath, exist_ok=True) # to_csv does not create missing directories
current_date = datetime.now().strftime("%Y-%m-%d")
output_filename = f"filetype_downloads_{current_date}.csv"
output_filepath = os.path.join(rpath, output_filename)
filetype_downloads.to_csv(output_filepath, index=True)


# create "failed_downloads": 

table displays only unsuccessful downloads for further analysis 

info: most missing files derive from a handful of companies

to do: 

-add a column for the corresponding download error messages

-test for "sector" column

In [None]:
# create table: failed downloads

# Keep only the rows where 'success' equals 0, sort them by the "name" column
# and renumber afterwards so that the index follows the sorted order.
failed_downloads = (
    main_df[main_df['success'] == 0]
    .sort_values(by='name')
    .reset_index(drop=True)
)

# Number of failed downloads and number of distinct companies they belong to
total_length = len(failed_downloads)
unique_company_ids = failed_downloads['company_id'].nunique()
# Average number of failed downloads per affected company; guarded because
# nothing may have failed, in which case there is no company to divide by.
average_length = total_length / unique_company_ids if unique_company_ids else 0.0

print("Unsuccessful downloads per company in this df:", (average_length))
print("unsuccessful downloads derive from", unique_company_ids, "companies")

# export as .csv in working directory and add date to filename
rpath = "output/"
os.makedirs(rpath, exist_ok=True) # to_csv does not create missing directories
current_date = datetime.now().strftime("%Y-%m-%d")
output_filename = f"failed_downloads_{current_date}.csv"
output_filepath = os.path.join(rpath, output_filename)
failed_downloads.to_csv(output_filepath, index=False)

# create "years_downloads": 

range and frequency of years covered by local documents

to do: add column with frequency of years in API

In [None]:
# test year

# Only documents that were actually downloaded are counted
success_df = main_df[main_df['success'] == 1]

# Number of downloaded documents per year, ordered by year
year_counts = success_df['year'].value_counts().sort_index()
total_count = year_counts.sum()

# Result table: one row per year with absolute and relative frequency
years_downloads = pd.DataFrame({'Year': year_counts.index, 'Frequency': year_counts.values})
years_downloads['Relative Frequency'] = (years_downloads['Frequency'] / total_count).round(2)

print(years_downloads)

# export as .csv in working directory and add date to filename
rpath = "output/"
os.makedirs(rpath, exist_ok=True) # to_csv does not create missing directories
current_date = datetime.now().strftime("%Y-%m-%d")
output_filename = f"years_downloads_{current_date}.csv"
output_filepath = os.path.join(rpath, output_filename)
years_downloads.to_csv(output_filepath, index=False)

# total size of download 

In [None]:
# total size of download

def get_directory_size(fpath):
    """
    Sum the sizes of the regular files located directly in a directory.

    Sub-directories are not descended into and do not count.

    Args:
        fpath (str): Directory to measure.

    Returns:
        int: Combined size of the files in bytes.
    """
    entries = (os.path.join(fpath, name) for name in os.listdir(fpath))
    return sum(os.path.getsize(entry) for entry in entries if os.path.isfile(entry))

# Measure the download directory and derive the larger decimal units
# (each step divides by 1000).
directory_path = fpath
size_in_bytes = get_directory_size(directory_path)
size_in_kb = size_in_bytes / 1000
size_in_mb = size_in_kb / 1000
size_in_gb = size_in_mb / 1000

size_report = [
    ("Bytes", size_in_bytes),
    ("KB", size_in_kb),
    ("MB", size_in_mb),
    ("GB", size_in_gb),
]

print("Directory Size:")
for unit_label, amount in size_report:
    print(f"{unit_label}: {amount}")

# add correct suffix to local files

basically the classify function from above 

might run into issues with descriptive analysis when running this first, before creating statistics tables

to do: adjust code so this doesn't happen

In [None]:
# classify filetype of downloads and add according suffix

# NOTE(review): machine-specific absolute path. The trailing "#fpath" suggests
# the download directory is meant here - confirm and adjust before running,
# because every entry of this directory gets renamed by main() below.
# This assignment also rebinds the module-level name `directory`.
directory = "/Volumes/Data SSD/2024_Bachelor Arbeit Data/raw/17042024_srn_docs/pdf + Suffix" #fpath

def classify_file(file_path):
    """
    Guess the file type of a local file.

    This definition replaces the classifier defined earlier in the file, so
    it keeps the same contract, including the 'NA' answer for paths that do
    not exist. Code that classifies ids without a local file keeps working
    after this cell has been run.

    Args:
        file_path (str): Path of the file to inspect.

    Returns:
        str: 'NA' when nothing exists at file_path, 'html' when the
            filetype package does not recognise the content, otherwise
            the extension reported by filetype.guess.
    """
    if not os.path.exists(file_path):
        return 'NA'
    kind = filetype.guess(file_path)
    if kind is None:
        return 'html'
    return kind.extension

def main():
    """
    Append the detected file type as a suffix to every file in `directory`.

    Sub-directories are left alone, and a file whose name already ends with
    its detected suffix is skipped, so running this more than once does not
    produce names such as "<id>.pdf.pdf".
    """
    for file_name in os.listdir(directory):
        file_path = os.path.join(directory, file_name)
        if not os.path.isfile(file_path):
            continue
        file_extension = classify_file(file_path)
        if file_name.endswith(f".{file_extension}"):
            # Already carries the correct suffix (e.g. from an earlier run)
            continue
        new_file_name = f"{file_name}.{file_extension}"
        new_file_path = os.path.join(directory, new_file_name)
        os.rename(file_path, new_file_path)
        print(f"Renamed {file_name} to {new_file_name}")

if __name__ == '__main__':
    main()

## Avg, Median, Min and Max Statistics for downloaded PDF Docs


In [None]:
from PyPDF2 import PdfReader

# Candidate paths: every entry of the download directory, whatever its type
pdf_files = []
for entry in os.listdir(fpath):
    pdf_files.append(os.path.join(fpath, entry))

# Page count of each entry that could be read as a PDF
num_pages_list = []

for file in pdf_files:
    try:
        with open(file, 'rb') as f:
            page_total = len(PdfReader(f).pages)
    except Exception as e:
        # Entries that cannot be read as a PDF are reported and left out
        print(f"Error reading file '{file}': {e}")
    else:
        num_pages_list.append(page_total)

# Page counts as a pandas Series, used for the summary figures
pdf_stats = pd.Series(num_pages_list)

# Each figure is computed right before it is printed
page_summary = (
    ("Average number of pages:", pdf_stats.mean),
    ("Median number of pages:", pdf_stats.median),
    ("Minimum number of pages:", pdf_stats.min),
    ("Maximum number of pages:", pdf_stats.max),
)
for label, statistic in page_summary:
    print(label, statistic())


## Add preprocessed corpus to main_df

In [None]:
# (see file preprocess_text.py)

# directory with preprocessed text files
ppath = "/Volumes/Data SSD/2024_Bachelor Arbeit Data/processed/17042022_srn_docs_preprocessed"

# Read every .txt file once and key its text by document id. Collecting the
# texts first replaces one full-column scan of main_df per file with a single
# lookup pass at the end.
corpus_by_id = {}
for file_name in os.listdir(ppath):
    if file_name.endswith(".txt"):
        file_path = os.path.join(ppath, file_name)
        file_id = file_name.split(".")[0]  # Extract the id from the file name

        # NOTE(review): the platform default encoding is used here - confirm it
        # matches the encoding the preprocessing step wrote the files with.
        with open(file_path, "r") as file:
            corpus_by_id[file_id] = file.read()

# Fill the 'corpus_preprocessed' column; ids without a text file get NaN. The
# column is created even when no .txt file was found, so later steps that read
# it do not fail on a missing column.
main_df['corpus_preprocessed'] = main_df['id'].map(corpus_by_id)


# Drop rows with NaN values in the 'corpus' column
# df.dropna(subset=['corpus_preprocessed'], inplace=True)

main_df

## Language Detection of Files
not fully integrated yet
need to: adjust the location or df of files that should be analysed 

In [None]:
from langdetect import detect

# Define a function to apply language detection
def detect_language(text):
    """
    Detect the language of a text with langdetect.

    Args:
        text: The text to analyse. Missing values (None / NaN) are accepted.

    Returns:
        str or float: The language code reported by langdetect.detect, or
            NaN when the text is missing, shorter than three characters,
            or langdetect cannot find any usable features in it.
    """
    # A missing corpus must not be turned into the string "nan" / "None" and
    # then be "detected" as some language.
    if text is None or (pd.api.types.is_scalar(text) and pd.isna(text)):
        return np.nan

    text = str(text)
    if len(text) < 3:  # Adjust the minimum length threshold as needed
        return np.nan  # Return NaN if the text is too short

    # Imported here because only this function needs the exception type.
    from langdetect.lang_detect_exception import LangDetectException

    try:
        return detect(text)
    except LangDetectException:
        # Raised for text without detectable features (e.g. digits only);
        # one such document should not abort the whole column.
        return np.nan

# Run the detector on every preprocessed corpus; the result becomes the
# new 'language' column of main_df
main_df['language'] = main_df['corpus_preprocessed'].map(detect_language)

# Show the detected language of each row
print(main_df['language'])

# Count how often each language was detected and show the counts
language_summary = main_df['language'].value_counts()
print(language_summary)

## Export main_df

In [None]:
# export main_df as .csv in working directory and add date to filename
rpath = "output/"
os.makedirs(rpath, exist_ok=True) # neither to_csv nor to_feather creates missing directories
# Computed once so that both exports carry the same date in their name
current_date = datetime.now().strftime("%Y-%m-%d")
output_filename = f"main_df_{current_date}.csv"
output_filepath = os.path.join(rpath, output_filename)
main_df.to_csv(output_filepath, index=False)

main_df

# export main_df as .feather in working directory and add date to filename
output_filename = f"main_df_{current_date}.feather"
output_filepath = os.path.join(rpath, output_filename)
main_df.to_feather(output_filepath)

# error messages

this is currently not yet working

to do: add xls write function while downloading to later analyse these errors in a table

In [None]:
# analyse error messages while downloading

# Placeholder: has to be replaced by the path of the Excel file that holds
# the logged download errors before this cell can run.
epath = "your/directory"
# NOTE: this rebinds the module-level name `df`, which earlier held the merged
# document/company table.
df = pd.read_excel(epath)

print(df)