# <font color='008fd0'>Overview</font>

This notebook describes the processing steps for the hospital dataset.

## <font color='00b269'>Workflow</font>
- **SimpleTransform** - keep and rename only useful columns
- **Pandas2Spark** - transform df to spark
- **AppendFilter** - concatenate all dfs
- **Count** - count the patients per date and city
- **JoinMeteo** - join with meteo data
- **Save both to Parquet files and a Delta table**

In [1]:
import pandas as pd
import warnings
from tqdm import tqdm
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from pyspark.sql.functions import col, year

StatementMeta(, 53c2cf9a-cf09-4620-9c88-e7ba88f01c51, 3, Finished, Available)

In [2]:
# Base OneLake folder from which the yearly hospital workbooks (<year>.xlsx) are read.
folder = "abfss://Medio@onelake.dfs.fabric.microsoft.com/Medio.Lakehouse/Files/Hospital"

StatementMeta(, 53c2cf9a-cf09-4620-9c88-e7ba88f01c51, 4, Finished, Available)

In [3]:
# Load the 'diagnostic' sheet of every yearly workbook (2007-2017 inclusive).
# The loop variable is deliberately not called `year`: that name is imported
# from pyspark.sql.functions and would be silently overwritten at module level.
dfs = [
    pd.read_excel(f"{folder}/{data_year}.xlsx", sheet_name='diagnostic')
    for data_year in range(2007, 2018)
]

StatementMeta(, 53c2cf9a-cf09-4620-9c88-e7ba88f01c51, 5, Finished, Available)

In [6]:
# Source (Romanian) column headers -> column names used by the processing steps.
rename_map = dict([
    ('localitate de domiciliu', 'City'),
    ('data internarii', 'Date'),
])

# Upper-case city spellings -> the spelling they are replaced with.
city_map = dict([
    ("TIMISOARA", "Timisoara"),
    ("IASI", "Iasi"),
    ("CLUJ-NAPOCA", "Cluj Napoca"),
    ("CONSTANTA", "Constanta"),
])

StatementMeta(, 53c2cf9a-cf09-4620-9c88-e7ba88f01c51, 8, Finished, Available)

In [7]:
# Preview the first rows of the first loaded workbook to check its columns.
dfs[0].head()

StatementMeta(, 53c2cf9a-cf09-4620-9c88-e7ba88f01c51, 9, Finished, Available)

Unnamed: 0,localitate de domiciliu,data internarii,Cod diagnostic principal cod1,Denumire diagnostic principal cod1,Cod diagnostic principal cod2 -cauze externe,Denumire diagnostic principal cod2,Sex,< 1 an,1 – 5 ani,6– 10 ani,...,26 – 30 ani,31– 35 ani,36 – 40 ani,41– 45 ani,46 – 50 ani,51– 55 ani,56 – 60 ani,61 – 65 ani,66 – 70 ani,>70 ani
0,Bucuresti,2002-05-20,F019,"DEMENTA VASCULARA, FARA PRECIZARE",,,F,0,0,0,...,0,0,0,0,0,0,0,0,0,1
1,Bucuresti,2002-07-18,F20.0,Schizofrenia paranoida,,,F,0,0,0,...,0,0,0,1,0,0,0,0,0,0
2,Bucuresti,2005-10-25,F205,SCHIZOFRENIA REZIDUALA,,,M,0,1,0,...,0,0,0,0,0,0,0,0,0,0
3,Bucuresti,2005-12-02,F200,SCHIZOFRENIA PARANOIDA,,,F,0,0,0,...,0,0,1,0,0,1,0,0,0,0
4,Bucuresti,2005-12-02,F323,EPISOD DEPRESIV SEVER CU SIMPTOME PSIHOTICE,,,F,0,0,0,...,0,0,0,0,0,0,0,1,0,0


In [47]:
class SimpleTransform(TransformerMixin):
    """Rename, select and date-filter the columns of each pandas DataFrame.

    Parameters
    ----------
    rename_map : dict
        Mapping from source column names to new names. After renaming, the
        frame must contain the 'City' and 'Date' columns used by ``transform``.
    columns : list of str
        Columns kept after renaming; must include 'Date'.
    city_map : dict, optional
        Replacement map applied to the 'City' values. When omitted, the
        module-level ``city_map`` is looked up at transform time.
    start_year, end_year : int, optional
        Inclusive range of years to keep (defaults: 2007 and 2017).
    """

    def __init__(self, rename_map, columns, city_map=None,
                 start_year=2007, end_year=2017):
        self.columns = columns
        self.rename_map = rename_map
        self.city_map = city_map
        self.start_year = start_year
        self.end_year = end_year

    def fit(self, X, y=None):
        """Nothing to learn; present so Pipeline.fit / fit_transform work."""
        return self

    def transform(self, origin):
        """Return a list with one cleaned DataFrame per frame in ``origin``.

        The input frames are not modified. In the result, 'Date' holds
        ``datetime.date`` objects and only rows whose year lies within
        ``[start_year, end_year]`` are kept.
        """
        # Fall back to the module-level map when none was passed explicitly.
        city_replacements = self.city_map if self.city_map is not None else city_map
        Xs = []
        for df in origin:
            # rename() returns a new frame, so the assignment below does not
            # touch the caller's DataFrame.
            X = df.rename(columns=self.rename_map)
            X['City'] = X['City'].replace(city_replacements)
            # Parse the dates BEFORE filtering so the `.dt` accessor also
            # works when the column was not read as datetime.
            X = X[self.columns].assign(Date=lambda frame: pd.to_datetime(frame['Date']))
            in_range = X['Date'].dt.year.between(self.start_year, self.end_year)
            # assign() builds a new frame instead of writing into a filtered
            # slice, which avoids pandas' SettingWithCopyWarning.
            X = X[in_range].assign(Date=lambda frame: frame['Date'].dt.date)
            Xs.append(X)
        return Xs

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 49, Finished, Available)

In [48]:
class Pandas2Spark(TransformerMixin):
    """Convert each DataFrame of a list into a Spark DataFrame.

    Uses the global ``spark`` object, which is not defined in this notebook
    (presumably the session injected by the notebook runtime - confirm).
    """

    def fit(self, X, y=None):
        """Nothing to learn; present so Pipeline.fit / fit_transform work."""
        return self

    def transform(self, Xs):
        """Return a new list with ``spark.createDataFrame`` applied to each item.

        The input list is left unmodified, so the caller keeps its original
        frames.
        """
        return [spark.createDataFrame(X) for X in Xs]


StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 50, Finished, Available)

In [49]:
class AppendFilter(TransformerMixin):
    """Combine a list of DataFrames into one by chaining ``union`` calls."""

    def fit(self, X, y=None):
        """Nothing to learn; present so Pipeline.fit / fit_transform work."""
        return self

    def transform(self, Xs):
        """Union all frames of ``Xs`` in order and return the result.

        Spark's ``union`` matches columns by position, not by name, so every
        frame is expected to share the same column order. An empty ``Xs``
        raises ``IndexError``.
        """
        dataset = Xs[0]
        for X in Xs[1:]:
            dataset = dataset.union(X)
        return dataset

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 51, Finished, Available)

In [56]:
class Count(TransformerMixin):
    """Count the rows of every ('Date', 'City') group."""

    def fit(self, X, y=None):
        """Nothing to learn; present so Pipeline.fit / fit_transform work."""
        return self

    def transform(self, X):
        """Group ``X`` by 'Date' and 'City' and return the group sizes.

        The 'count' column of the grouped result is renamed to 'Patients'.
        """
        return (
            X.groupby(['Date', 'City'])
            .count()
            .withColumnRenamed('count', 'Patients')
        )

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 58, Finished, Available)

In [57]:
class JoinMeteo(BaseEstimator, TransformerMixin):
    """Join the input with the ``Meteo`` table on 'Date' and 'City'.

    Uses the global ``spark`` object, which is not defined in this notebook
    (presumably the session injected by the notebook runtime - confirm).
    """

    def fit(self, X, y=None):
        """Nothing to learn; present so Pipeline.fit / fit_transform work."""
        return self

    def transform(self, X):
        """Return ``X`` inner-joined with every column of the Meteo table.

        Because the join is an inner join, rows of ``X`` without a matching
        ('Date', 'City') pair in Meteo are dropped.
        """
        meteo = spark.sql("SELECT * FROM Meteo")
        return X.join(meteo, ['Date', 'City'], how="inner")

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 59, Finished, Available)

In [58]:
# Chain the processing steps: clean each yearly frame, convert to Spark,
# stack the frames, count per date and city, then join the meteo data.
pipeline = Pipeline(steps=[
    ('simple', SimpleTransform(rename_map, ['City', 'Date'])),
    ('2Spark', Pandas2Spark()),
    ('append', AppendFilter()),
    ('count', Count()),
    ('meteo', JoinMeteo())
])
# Silence warnings only while the pipeline runs. The context manager restores
# the previous warning filters on exit, even if a step raises, instead of
# overwriting the global filter configuration.
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")
    dataset = pipeline.transform(dfs)

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 60, Finished, Available)

In [61]:
# Write the dataset as Parquet to Files/Hospital/DataSet, overwriting any existing output.
dataset.write.mode("overwrite").parquet("Files/Hospital/DataSet")

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 63, Finished, Available)

In [62]:
# Write the same dataset in Delta format to Tables/Hospital, overwriting any existing output.
dataset.write.mode("overwrite").format("delta").save("Tables/Hospital")

StatementMeta(, be2cd896-d3cb-4b35-a846-032a6a611aea, 64, Finished, Available)