# Dependencies and Initializations

In [2]:
! pip install -r requirements.txt

Collecting findspark (from -r requirements.txt (line 7))
  Using cached findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Collecting numpy (from -r requirements.txt (line 15))
  Using cached numpy-2.0.0-cp312-cp312-win_amd64.whl.metadata (60 kB)
Collecting pandas (from -r requirements.txt (line 17))
  Using cached pandas-2.2.2-cp312-cp312-win_amd64.whl.metadata (19 kB)
Collecting py4j (from -r requirements.txt (line 23))
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Collecting pyspark (from -r requirements.txt (line 25))
  Using cached pyspark-3.5.1-py2.py3-none-any.whl
Collecting pytz (from -r requirements.txt (line 27))
  Using cached pytz-2024.1-py2.py3-none-any.whl.metadata (22 kB)
Collecting setuptools (from -r requirements.txt (line 30))
  Using cached setuptools-70.3.0-py3-none-any.whl.metadata (5.8 kB)
Collecting tabulate (from -r requirements.txt (line 33))
  Using cached tabulate-0.9.0-py3-none-any.whl.metadata (34 kB)
Collecting tzdata (from -r 

In [3]:
import pandas as pd
import sqlite3 as sql
import os
import hashlib
import setuptools  # for distutils
import traceback

In [4]:
from typing import Dict, Tuple, List, Callable, Any, Optional

In [5]:
import findspark

findspark.init()
findspark.find()

'C:\\Users\\Loune\\spark-3.5.1-bin-hadoop3'

In [6]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, md5, monotonically_increasing_id
from pyspark.sql.types import StringType

# Data Loading

In [7]:
WORKING_DIRECTORY = "data/"
OUTPUTS_DIRECTORY = "outputs/"

In [10]:
def read_files(directory: str) -> Dict[str, pd.DataFrame]:
    """
    Reads all CSV files in the specified directory and stores their contents in a dictionary.

    Parameters:
    directory (str): The path to the directory containing CSV files.

    Returns:
    Dict[str, pd.DataFrame]: A dictionary where keys are CSV file names and values are their contents as DataFrames.
    """

    csv_files = [f for f in os.listdir(directory) if f.endswith(".csv")]
    csv_files_content = {
        file_name: pd.read_csv(os.path.join(directory, file_name))
        for file_name in csv_files
    }

    return csv_files_content


data = read_files(WORKING_DIRECTORY)

# Data Exploration

In [12]:
metadata = {
    "data_name": [],
    "num_variables": [],
    "num_categorical_variables": [],
    "num_rows": [],
    "num_missing_values": [],
}

for file_name, df in data.items():
    metadata["data_name"].append(file_name)
    metadata["num_variables"].append(df.shape[1])
    metadata["num_categorical_variables"].append(
        df.select_dtypes(include=["object", "category"]).shape[1]
    )
    metadata["num_rows"].append(df.shape[0])
    metadata["num_missing_values"].append(df.isnull().sum().sum())

metadata_df = pd.DataFrame(metadata)
metadata_df

Unnamed: 0,data_name,num_variables,num_categorical_variables,num_rows,num_missing_values
0,er_prs_f.csv,8,3,20,21
1,ir_act_v.csv,2,1,2,0
2,ir_ben_r.csv,6,1,20,0
3,ir_spe_v.csv,2,1,2,0
4,t_mcoaae.csv,2,1,2,0


In [None]:
for name, df in data.items():
    print(f"File name: {name}")
    print(df.head(3).to_markdown(index=False))
    print("*****\n")

### Helper functions

In [14]:
# We define a hash function that will be used to hash the data
def md5_function(input_str: str) -> int:
    return int(hashlib.md5(input_str.encode()).hexdigest(), 16)

In [15]:
def md5_for_database(input_str: str) -> int:
    return int(hashlib.md5(input_str.encode()).hexdigest(), 16) % (
        10**15
    )  # to fit in db

# Table creation

## Person Table

### Person Table Construction

<table>
  <tr>
    <th>Column name in <br> OMOP Table (Person)</th>
    <th>Type</th>
    <th>Corresponding column <br> in SNDS </th>
    <th>Comments or transformations</th>
    <th>Source</th>
  </tr>
  <tr>
    <td>person_id</td>
    <td>integer</td>
    <td>num_enq (IR_BEN_R)</td>
    <td>Turn into integer <br> using hash function.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#person">Ref 1</a></td>
  </tr>
  <tr>
    <td>gender_concept_id</td>
    <td>integer</td>
    <td>ben_sex_cod (IR_BEN_R)</td>
    <td>1 <- 8507 & 2 <- 8532</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#person">Ref 1</a> & <a href="https://athena.ohdsi.org/search-terms/terms?domain=Gender&standardConcept=Standard&page=1&pageSize=15&query=">Ref é</a> </td>
  </tr>
  <tr>
    <td>year_of_birth</td>
    <td>integer</td>
    <td>ben_nai_ann (IR_BEN_R)</td>
    <td>No transfo needed <br> If no year_of_birth, drop the row.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#person">Ref 1</a></td>
  </tr>
  <tr>
    <td>month_of_birth</td>
    <td>integer</td>
    <td>ben_nai_moi (IR_BEN_R)</td>
    <td>No transfo needed</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#person">Ref 1</a></td>
  </tr>
  <tr>
    <td>person_source_value</td>
    <td>string</td>
    <td>num_enq (IR_BEN_R)</td>
    <td>No transfo needed</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#person">Ref 1</a></td>
  </tr>
  <tr>
    <td>gender_source_value</td>
    <td>string</td>
    <td>ben_sex_cod (IR_BEN_R)</td>
    <td>Turn into athena <br> string code for gender. <br> 1 <- 'M', 2 <- 'F'</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#person">Ref 1</a> & <a href="https://athena.ohdsi.org/search-terms/terms?domain=Gender&standardConcept=Standard&page=1&pageSize=15&query=">Ref 2</a></td>
  </tr>
  <tr>
    <td>location_id</td>
    <td>integer</td>
    <td>ben_res_dpt & ben_res_reg <br> (IR_BEN_R)</td>
    <td>Compute concat(ben_res_dpt, <br> ben_res_reg) </td>
    <td>Unique key given to a unique Location. <a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#location">Ref 3"</a></td>
  </tr>
</table>

0. All the info we need in order to construct the new table already are in IR_BEN_R, at first sight we don't need to perfom any table join.
1. We start by defining the functions used to apply the transformations on the data.
2. Then we create each transformation function using the transformation rules defined in the table above.
3. Finally, we call the function that applies the transformations on the data to obtain the new transformed dataframe.

In [16]:
def safe_apply(func: Callable[..., Any], args: List[Any]) -> Any:
    """Helper function to apply a function with arguments and catch exceptions.

    Parameters
    - func (Callable[..., Any]): A function to apply.
    - args (List[Any]): A list of arguments to pass to the function.

    Returns
    - Any: The result of applying the function with the provided arguments, or None if an exception occurred.
    """
    try:
        return func(*args)
    except Exception as e:
        print(f"Error processing with function {func.__name__}: {e}")
        raise e

In [17]:
def apply_transformations(
    transformations: Dict[str, Tuple[Callable[..., Any], List[Any]]]
) -> Optional[pd.DataFrame]:
    """
    Generates a new DataFrame based on transformations.

    Parameters:
    - transformations (Dict[str, Tuple[Callable[..., Any], List[Any]]]): A dictionary where keys are column names of the new DataFrame.
      The values are tuples containing a function and a list of arguments. Each column is generated by executing the function with its arguments.

    Returns:
    - Optional[pd.DataFrame]: A new DataFrame generated based on the provided transformations, or None if transformations didn't give any result.
    """
    new_data = {}

    for col_name, (func, args) in transformations.items():
        try:
            new_data[col_name] = safe_apply(func, args)
        except Exception as e:
            raise e

    if new_data:
        return pd.DataFrame(new_data)

In [18]:
# We create the transformation functions that we'll use to apply the rules

# Some columns are generated without applying any transformation.
identity: Callable[[pd.Series], pd.Series] = lambda x: x

# Hashed value
hashed_value: Callable[[pd.Series], pd.Series] = lambda x: x.apply(md5_function)


# Column "gender_concept_id" is generated by assigning new values for each gender
def apply_mapping_rules(
    original_col: pd.Series, mapping_rules: Dict[Any, Any]
) -> pd.Series:
    return original_col.apply(
        lambda x: mapping_rules[x] if x in mapping_rules else None
    )


# Column location_id is created by concatenating integer columns.
def concatenate_codes(codes1: pd.Series, codes2: pd.Series) -> pd.Series:
    """
    Concatenate two series containing int codes and return the result as a new series,
    with each element formed by concatenating corresponding elements from the input series.

    Args:
    codes1 (pd.Series): First series of codes.
    codes2 (pd.Series): Second series of codes.

    Returns:
    pd.Series: The concatenated result as a series.

    Raises:
    ValueError: If the input series do not have the same length.
    """
    if len(codes1) != len(codes2):
        raise ValueError("Input series must have the same length")

    concatenated = codes1.astype(str) + codes2.astype(str)

    return concatenated.astype(int)

In [19]:
# We define the transformation rules to create and the data

TABLE_NAME = "ir_ben_r.csv"

transformations = {
    "person_id": (hashed_value, [data[TABLE_NAME]["NUM_ENQ"]]),
    "gender_concept_id": (
        apply_mapping_rules,
        [data[TABLE_NAME]["ben_sex_cod"], {1: 8507, 2: 8532}],
    ),
    "year_of_birth": (identity, [data[TABLE_NAME]["ben_nai_ann"]]),
    "month_of_birth": (identity, [data[TABLE_NAME]["ben_nai_moi"]]),
    "person_source_value": (identity, [data[TABLE_NAME]["NUM_ENQ"]]),
    "location_id": (
        concatenate_codes,
        [data[TABLE_NAME]["ben_res_dpt"], data[TABLE_NAME]["ben_res_reg"]],
    ),
    "gender_source_value": (
        apply_mapping_rules,
        [data[TABLE_NAME]["ben_sex_cod"], {1: "M", 2: "F"}],
    ),
}

In [20]:
# We apply all the rules to obtain the new Person Table
person_table = apply_transformations(transformations)

In [None]:
person_table

In [22]:
person_table.to_csv(os.path.join(OUTPUTS_DIRECTORY, "person_table.csv"), index=False)

## Care Site Table

### Care Site Table Construction

<table>
  <tr>
    <th>Column name in <br> OMOP Table (Care Site)</th>
    <th>Type</th>
    <th>Corresponding column <br> & table in SNDS </th>
    <th>Comments or transformations</th>
    <th>Source</th>
  </tr>
  <tr>
    <td>care_site_id</td>
    <td>integer</td>
    <td>eta_num & soc_rai <br> (t_mcoaae) </td>
    <td>We derive it from (str(eta_num), soc_rai) <br> by applying a hash function.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#care_site">Ref 4</a></td>
  </tr>
  <tr>
    <td>care_site_name</td>
    <td>string</td>
    <td>soc_rai (t_mcoaae)</td>
    <td>No transformation needed.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#care_site">Ref 4</a></td>
  </tr>
  <tr>
    <td>location_id</td>
    <td>integer</td>
    <td>location_id (LOCATION) <br> eta_num (t_mcoaae)</td>
    <td>No transformation needed.</td>
    <td>eta_num is the finess number, it is unique and it holds <br> a geographical meaning, we can use it as location_id</td>
  </tr>
  <tr>
    <td>care_site_source_value</td>
    <td>string</td>
    <td>soc_rai (t_mcoaae)</td>
    <td>In this case, we use care_site_name <br> because we don't have other info.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#care_site">Ref 4</a></td>
  </tr>
</table>

In [24]:
drop_query = """DROP TABLE IF EXISTS care_site"""

create_query = """
CREATE TABLE care_site (
    care_site_id INTEGER PRIMARY KEY,
    care_site_name TEXT,
    location_id INTEGER,
    care_site_source_value TEXT
)
"""
def populate_table_query(table_name: str) -> str:
    """Returns the query to populate the care_site table.

    Parameters:
    - table_name (str): The name of the table containing the data.

    Returns:
    - str: The query to populate the care_site table.
    """

    return f"""INSERT INTO care_site (care_site_id, care_site_name, location_id, care_site_source_value)
    SELECT 
        md5(cast({table_name}.eta_num as text) || {table_name}.soc_rai), 
        {table_name}.soc_rai AS care_site_name,
        {table_name}.eta_num AS location_id, 
        {table_name}.soc_rai AS care_site_source_value
    FROM {table_name}
    """

In [25]:
conn = sql.connect("outputs/db.sql")
try:
    cursor = conn.cursor()

    # we create a hash function
    conn.create_function("md5", 1, md5_for_database)

    # we drop the table care_site if it exists and create it
    cursor.execute(drop_query)
    cursor.execute(create_query)

    # We write the tables finess and t_mcoaae into the database to use them
    tab_name = "t_mcoaae"
    data["t_mcoaae.csv"].to_sql(tab_name, conn, index=False, if_exists="replace")

    # We populate the care_site table
    cursor.execute(populate_table_query(tab_name))
    conn.commit()
except Exception as e:
    print(f"Error: {str(e)}")
    traceback.print_exc()
    conn.rollback()  # Rollback changes if an error occurs

finally:
    conn.close()

In [26]:
care_site = None
conn = sql.connect(f"{OUTPUTS_DIRECTORY}db.sql")

try:
    care_site = pd.read_sql_query("SELECT * FROM care_site", conn)
except Exception as e:
    print(f"Error: {str(e)}")
finally:
    conn.close()

In [None]:
care_site

## Provider Table

### Provider Table Construction

<table>
  <tr>
    <th>Column name in <br> OMOP Table (Provider)</th>
    <th>Type</th>
    <th>Corresponding column <br> & table in SNDS </th>
    <th>Comments or transformations</th>
    <th>Source</th>
  </tr>
  <tr>
    <td>provider_id</td>
    <td>integer</td>
    <td>None</td>
    <td>autogenerated number.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#provider">Ref 5</a></td>
  </tr>
  <tr>
    <td>specialty_source_value</td>
    <td>string</td>
    <td>Label <br> (ir_act_v.csv & ir_spe_v.csv)</td>
    <td>No transformation.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#provider">Ref 5</a></td>
  </tr>
  <tr>
    <td>specialty_concept_id</td>
    <td>integer</td>
    <td>pfs_spe_cod (ir_spe_v) & <br> pfs_act_nat (ir_act_v)</td>
    <td>map each speciality code <br> with the accepted concepts in OMOP.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#provider">Ref 5</a> & <a href="https://athena.ohdsi.org/search-terms/terms?domain=Provider&standardConcept=Standard&page=1&pageSize=15&query=">Ref 6</a></td>
  </tr>
  <tr>
    <td>provider_source_value</td>
    <td>string</td>
    <td>pfs_spe_cod (ir_spe_v) & <br> pfs_act_nat (ir_act_v)</td>
    <td>No transformation needed, except str() casting.</td>
    <td><a href="https://ohdsi.github.io/CommonDataModel/cdm53.html#provider">Ref 5</a></td>
  </tr>
</table>

In [28]:
# Configure Spark
configuration = (
    SparkConf().setAppName("Health Data Interoperability").setMaster("local[2]")
)

sc = SparkContext.getOrCreate(conf=configuration)


spark = SparkSession.builder.config(conf=configuration).getOrCreate()

In [33]:
# From Pandas DataFrame to Spark DataFrame
#act = spark.createDataFrame(data["ir_act_v.csv"])
#spe = spark.createDataFrame(data["ir_spe_v.csv"])

# Or Read from files:
act = spark.read.csv(f"{WORKING_DIRECTORY}/ir_act_v.csv", header=True, sep=",")
spe = spark.read.csv(f"{WORKING_DIRECTORY}/ir_spe_v.csv", header=True, sep=",")

In [34]:
# Kinésithérapeute - Pharmacien - Médecin généraliste - Radiologue
speciality_concept_mapping = {50: 38003810, 26: 38004089, 1: 38004446, 6: 45756825}

In [35]:
# We start by joigning the two tables
combined_df = act.select(col("pfs_act_nat").alias("code"), col("label")).union(
    spe.select(col("pfs_spe_cod"), col("label"))
)

In [36]:
combined_df.show()

+----+-------------------+
|code|              label|
+----+-------------------+
|  26|   Kinésithérapeute|
|  50|         Pharmacien|
|   1|Médecin généraliste|
|   6|         Radiologue|
+----+-------------------+



In [37]:
# Then, we apply the necessary transformations
new_df = (
    combined_df.withColumn("provider_id", monotonically_increasing_id() + 1)
    .withColumn("speciality_source_value", col("label"))
    .withColumn("specialty_concept_id", col("code").cast("int"))
    .replace(speciality_concept_mapping, subset=["specialty_concept_id"])
    .withColumn("provider_source_value", col("code").cast(StringType()))
    .select(
        "provider_id",
        "speciality_source_value",
        "specialty_concept_id",
        "provider_source_value",
    )
)

In [38]:
new_df.show()

+-----------+-----------------------+--------------------+---------------------+
|provider_id|speciality_source_value|specialty_concept_id|provider_source_value|
+-----------+-----------------------+--------------------+---------------------+
|          1|       Kinésithérapeute|            38004089|                   26|
|          2|             Pharmacien|            38003810|                   50|
| 8589934593|    Médecin généraliste|            38004446|                    1|
| 8589934594|             Radiologue|            45756825|                    6|
+-----------+-----------------------+--------------------+---------------------+



In [39]:
new_df.write.csv(f"{OUTPUTS_DIRECTORY}provider.csv", header=True, mode="overwrite")

# OR: write with pandas if errors with hadoop path
#pandas_df = new_df.toPandas()
#pandas_df.to_csv(f"{OUTPUTS_DIRECTORY}provider.csv", index=False)

In [40]:
spark.stop()