# REDCap Data Synthesiser

This program retrieves a data dictionary from a REDCap project, or from a supplied Data Dictionary CSV file, and builds a data synthesis model, which is then applied for `num_records` records, to generate realistic looking, completely synthetic data, and optionally output the records as JSON, export as CSV, or, if the project is not in production, import into the project via the API.

Execute the below cell to generate the functions.

In [None]:
import faker
import random
import json
import requests
import csv

# Initialize Faker
fake = faker.Faker()

# retrieve metadata
def retrieve_metadata(api_url, token):
    data = {
        'token': token,
        'content': 'metadata',
        'format': 'json',
        'returnFormat': 'json'
    }
    r = requests.post(api_url,data=data)
    if r.status_code == 200:
        # Parse the JSON response
        json_data = r.json()

        # Define a list to store parsed data
        parsed_data = []

        # Iterate over each field in the JSON data
        for field in json_data:
            parsed_field = {
                'field_name': field['field_name'],
                'field_label': field['field_label'],
                'field_type': field['field_type'],
                'select_choices_or_calculations': field['select_choices_or_calculations'],
                'text_validation_type_or_show_slider_number': field['text_validation_type_or_show_slider_number'],
                'text_validation_min': field['text_validation_min'],
                'text_validation_max': field['text_validation_max']
            }
            # Append the parsed field to the list
            parsed_data.append(parsed_field)

        # return the parsed data
        #print(parsed_data)  # For troubleshooting
        return parsed_data
      
    else:
        raise ValueError('Error: HTTP Status ' + str(r.status_code))


def preprocess_data_dictionary(data_dictionary):
    # Preprocess the data dictionary and create a mapping of field names to functions
    field_mapping = {}
    for field in data_dictionary:
        field_name = field['field_name']
        field_type = field['field_type']
        # Add logic to map field names to functions based on field type, validation type, etc.
        if field_type == 'text':
            field_mapping[field_name] = generate_text_field_value
        elif field_type in ['yesno', 'truefalse']:
            field_mapping[field_name] = generate_boolean_field_value
        elif field_type in ['radio', 'dropdown']:
            field_mapping[field_name] = generate_radio_dropdown_field_value
        elif field_type == 'checkbox':
            field_mapping[field_name] = generate_checkbox_field_value    
        elif field_type == 'slider':
            field_mapping[field_name] = generate_slider_field_value
        elif field_type == 'notes':
            field_mapping[field_name] = generate_notes_field_value
        # Add more field type mappings as needed
    return field_mapping

def generate_text_field_value(field):
    # Generate synthetic data for text fields
    field_name = field['field_name']
    field_label = field['field_label'].lower()
    validation_type = field['text_validation_type_or_show_slider_number']
    min_val = field['text_validation_min']
    max_val = field['text_validation_max']
    
    # date validated fields
    if validation_type in ['date_dmy','date_mdy','date_ymd','datetime_dmy','datetime_mdy','datetime_seconds_dmy','datetime_seconds_mdy','datetime_seconds_ymd','datetime_ymd']:
        date_format_mapping = {
            'date_dmy': '%Y-%m-%d',
            'date_mdy': '%Y-%m-%d',
            'date_ymd': '%Y-%m-%d',
            'datetime_dmy': '%Y-%m-%d %H:%M',
            'datetime_mdy': '%Y-%m-%d %H:%M',
            'datetime_seconds_dmy': '%Y-%m-%d %H:%M:%S',
            'datetime_seconds_mdy': '%Y-%m-%d %H:%M:%S',
            'datetime_seconds_ymd': '%Y-%m-%d %H:%M:%S',
            'datetime_ymd': '%Y-%m-%d %H:%M',
        }
        strftime_param = date_format_mapping[validation_type]
        if (field_label == 'dob') or ('birth' in field_label) or (field_name == 'dob') or ('birth' in field_name):
            return fake.date_of_birth().strftime(strftime_param)
        else:
            return fake.date_time_this_century().strftime(strftime_param)
        
    # integer and number
    elif validation_type in ['integer', 'number']:
        field_label = field['field_label'].lower()
        # Handle integer and number types
        if 'height' in field_label:
            # Generate height around 175 with standard deviation 6.5
            return round(random.normalvariate(175, 6.5), 2)
        elif 'weight' in field_label:
            # Generate weight around 63 with standard deviation 10
            return round(random.normalvariate(63, 10), 2)
        else:
            return fake.random_int(min=int(min_val), max=int(max_val))
    
    # Email
    elif validation_type == 'email':
        # Handle email type
        return f"{fake.first_name()}.{fake.last_name()}@{email_domain}"
    
    # Everything else. Could anticipate more types below. Could even guess based on field label and name, and apply the closest faker library method.
    else:
        # Handle other text types
        if any(word in field_label for word in ['first name', 'fname', 'personal name', 'given name', 'christian name']):
            return fake.first_name()
        elif any(word in field_label for word in ['last name', 'family name', 'lname', 'surname']):
            return fake.last_name()
        else:
            return fake.word()
        
def generate_notes_field_value(field):
    # Generate synthetic data for notes fields
    return fake.text()

def generate_boolean_field_value(field):
    # Generate synthetic data for boolean fields (yes/no or true/false)
    return random.choice([0, 1])

def generate_radio_dropdown_field_value(field):
    # Generate synthetic data for radio and dropdown fields
    choices_string = field['select_choices_or_calculations']
    choices = [choice.split(',')[0].strip() for choice in choices_string.split('|')]
    return random.choice(choices)

def generate_slider_field_value(field):
    # Generate synthetic data for slider fields
    min_val = field['text_validation_min']
    max_val = field['text_validation_max']
    # If min and max are unset, 0 and 100 are assumed.
    min_val = 0 if min_val == '' else min_val
    max_val = 100 if max_val == '' else max_val
    return fake.random_int(min=int(min_val), max=int(max_val))

def generate_checkbox_field_value(field):
    # Generate synthetic data for checkbox fields. More complicated due to the data structure.
    choices_string = field['select_choices_or_calculations']
    choices = [choice.split(', ') for choice in choices_string.split(' | ')]
    checkbox_values = {}
    # Randomly select which options are checked
    for code, _ in choices:
        if random.choice([0, 1]) == 1:
            checkbox_values[code] = "1"  # Store checked options without field name prefix (will be added later, in flatten_nested_dicts())
        else:
            checkbox_values[code] = "0"  # Need to store 0 for the CSV export to work since all keys must exist across all generated records.
    return checkbox_values

def generate_synthetic_data(data_dictionary, num_records, start_num):
    field_mapping = preprocess_data_dictionary(data_dictionary)
    record_id_field = data_dictionary[0]['field_name']
    records = []
    handled_fields = []  # list to store handled field names
    skipped_field_types = ['descriptive','sql','calc', 'file']  # List of field types to skip
    for idx in range(num_records):
        synthetic_record = {}
        synthetic_record[record_id_field] = idx + start_num
        for field_info in data_dictionary:
            field_name = field_info['field_name']
            field_type = field_info['field_type']
            if field_name == record_id_field:
                # don't generate the Record ID field with the regular method
                continue
            if field_type in skipped_field_types:
                # Skip fields with types in skipped_field_types
                continue 
            try:
                # Associate a data synthesis method based on the field metadata
                generate_function = field_mapping[field_name]
                # Generate it
                synthetic_value = generate_function(field_info)
                # Store it in a dict
                synthetic_record[field_name] = synthetic_value
                # Add handled field to a list for reporting later
                handled_fields.append(field_name) 
            except KeyError:
                continue
                # Anything that cannot be handled
                print(f"Warning: Field '{field_name}' (type '{field_type}') cannot be handled. Skipping.")
        # Append synthesised record to a list
        records.append(synthetic_record)
    
    # Generate report for handled fields
    report = "Data Synthesis Report:\n"
    for field_info in data_dictionary:
        field_name = field_info['field_name']
        if field_name == record_id_field:
            report += f"- {field_name}: Record ID field\n"
        elif field_name in handled_fields:
            report += f"- {field_name}: Synthesised as field type {field_info['field_type']}\n"
        elif field_info['field_type'] in skipped_field_types:
            report += f"- {field_name}: Field type {field_info['field_type']} skipped\n"
        else:
            report += f"- {field_name}: Field type {field_info['field_type']} unknown\n"

    # Return records and print report    
    print(report)
    return records  

# Function to flatten nested dictionaries
def flatten_nested_dicts(data_list):
    flattened_data_list = []
    for data in data_list:
        flattened_data = {}
        for key, value in data.items():
            if isinstance(value, dict):
                for nested_key, nested_value in value.items():
                    flattened_key = f"{key}___{nested_key}"
                    flattened_data[flattened_key] = nested_value
            else:
                flattened_data[key] = value
        flattened_data_list.append(flattened_data)
    return flattened_data_list

def upload_record_to_redcap(api_url, token, record_data, forceAutoNumber):
    payload = {
        'token': token,
        'content': 'record',
        'format': 'json',
        'type': 'flat',
        'overwriteBehavior': 'normal',
        'forceAutoNumber': 'true',
        'dateFormat': 'YMD',
        'data': record_data
    }
    if not forceAutoNumber:
        payload['forceAutoNumber'] = 'false'
    r = requests.post(api_url, data=payload)
    if r.status_code == 200:
        # Parse the JSON response
        json_data = r.json()
        return json_data
    else:
        raise ValueError('Error: HTTP Status ' + str(r.status_code))

def export_records_to_csv(json_records, filename):
    # Load JSON data
    records = json.loads(json_records)
    
    # Check if records is empty
    if not records:
        print("No records to export.")
        return
    
    # Extract field names from the first record
    fieldnames = records[0].keys()
    #print(fieldnames)

    # Write records to CSV file
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
        writer.writeheader()  # Write header row with field names
        writer.writerows(records)
    return len(records)
        

def convert_dd_csv_to_json(csv_filename):
    json_data = []
    
    with open(csv_filename, 'r') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            field = {
                'field_name': row['Variable / Field Name'],
                'field_label': row['Field Label'],
                'field_type': row['Field Type'],
                'select_choices_or_calculations': row['Choices, Calculations, OR Slider Labels'],
                'text_validation_type_or_show_slider_number': row['Text Validation Type OR Show Slider Number'],
                'text_validation_min': row['Text Validation Min'],
                'text_validation_max': row['Text Validation Max']
            }
            json_data.append(field)
    return json_data

# Main function
def synthesise(api_url, token, dd_method='api', dd_filename=None, num_records=10, output='print', csv_filename=None, dry_run=False, forceAutoNumber=True, start_num=1):

    # A bunch of defensive checks
    assert dd_method in ['api', 'file'], "Error: dd_method must be 'api' or 'file'."
    assert output in ['print', 'csv', 'api'], "Error: output must be 'print', 'csv' or 'api'."
    assert num_records > 0, "Error: num_records must be greater than zero."
    assert type(forceAutoNumber) == bool, "Error: forceAutoNumber must be of type boolean."
    assert start_num > 0, "Error: start_num must be greater than zero."
    
    # Collect data dictionary
    if dd_method == 'api':
        data_dictionary = retrieve_metadata(api_url, token)
    elif dd_method == 'file':
        data_dictionary = convert_dd_csv_to_json(dd_filename)
        print(data_dictionary)

    synthetic_data = generate_synthetic_data(data_dictionary, num_records, start_num)
    flattened_data = flatten_nested_dicts(synthetic_data)
    json_data = json.dumps(flattened_data)
    if dry_run:
            print("Dry run. No data generated.")
    elif output == 'api':
        project_info = test_connection(api_url, token, silent=True)
        if project_info['in_production'] == 1:
            print("This project is in production. Cannot import synthetic data.")
            return
        try:
            response = upload_record_to_redcap(api_url, token, json_data, forceAutoNumber=forceAutoNumber)
            print(f"Success! Created {response['count']} records and imported to REDCap.")
        except ValueError as e:
            print(e)
    elif output == 'csv':
        csv_count = export_records_to_csv(json_data,csv_filename)
        print(f"Success! Created {csv_count} records and saved as {csv_filename}.")
    elif output == 'print':
        print(f"Success! Created {len(flattened_data)} records.")
        print(json.dumps(flattened_data, indent=4))

def test_connection(api_url, token, silent=False):
    data = {
        'token': token,
        'content': 'project',
        'format': 'json',
        'returnFormat': 'json'
    }
    r = requests.post(api_url,data=data)
    if str(r.status_code)[0] == '2':
        if not silent:
            print("Connection successful!")
            print("Project details:")
            print(r.json())
    else:
        print('Could not connect.')
        print('HTTP Status: ' + str(r.status_code))
    if silent:
        return r.json()
    
# Todo: 
# Longitudinal
# Repeating
# Selective fields/forms/events
# fuzzily determine fake method to use for text fields based on label/name (like ssn)
# Allow for custom map overrides in an easy way
# Allow for custom regex patterns, might be complex!
# Parse normal variation modelling from data dictionary or even download and aggregate real data for generating test data with.
# Separate tokens for retrieve DD, build model, import data.
# Handle API call errors better

## Define variables

Complete the below variables and run the cell to commit.

In [None]:
api_url = 'https://redcap.example.com/api/'  # Enter your API URL
token = 'TOKEN'                              # Enter your API token
dd_method='api'                              # Method to retrieve data dictionary. 'api' (Default): retrieve data dictionary via api. 'file': Supply a data dictionary as csv using the <dd_filename> variable.
dd_filename=None                             # Filename (and path) to data dictionary CSV (Default: None)
num_records=1                                # Number of records to generate (Default: 10)
output='print'                               # 'print' (Defult): Print record data as output. 'csv': Save record data as CSV (with filename supplied as <csv_filename>), 'api': Import to REDCap via API.
csv_filename=None                            # Filename (and path) for output CSV (if <output> = 'csv')
dry_run=False                                # If True, do not export records; just build model and output report. (Default: False)
email_domain = "example.com"                 # Email domain used to generate emails (Default: 'example.com')
forceAutoNumber=True                         # If True, all records will be renamed (new records created). If False, record ID field's value will be used to select record to update. (Default: True)
start_num=1                                  # First number for record ID field value (Default: 1)

## Test connection

If successful, will print project information.

In [None]:
test_connection(api_url=api_url, token=token)

## Synthesise Data

With the above configuration saved, execute the `synthesise()` function. With the defaults, the generated records will not be imported, but printed as output in this notebook. If you are happy with the results, set `output` to either `csv` or `api`

In [None]:
synthesise(api_url, token)