# Upload the data

In [1]:
import pandas as pd

# Set max columns to None
pd.set_option('display.max_columns', None)

In [11]:
tables = {
    "BenefitsCostSharing": {
        # 2016 removed two fields: 'IsSubjToDedTier2', 'IsSubjToDedTier1
        "2016": "raw/2016/Benefits_Cost_Sharing_PUF_2015-12-08.csv",
        "2015": "raw/2015/Benefits_Cost_Sharing_PUF.csv",
        "2014": "raw/2014/Benefits_Cost_Sharing_PUF.csv"
    },
    "BusinessRules": {
        # 2016 renamed DentalOnly => DentalOnlyPlan
        "2016": "raw/2016/Business_Rules_PUF_2015-12-08.csv",
        "2015": "raw/2015/Business_Rules_PUF_Reformat.csv",
        "2014": "raw/2014/Business_Rules_PUF.csv"
    },
    "Crosswalk2015": {
        "data": "raw/2015/Plan_Crosswalk_PUF_2014-12-22.csv"
    },
    "Crosswalk2016": {
        "data": "raw/2016/Plan_ID_Crosswalk_PUF_2015-12-07.CSV"
    },
    "Network": {
        # 2016 renamed DentalOnly => DentalOnlyPlan
        "2016": "raw/2016/Network_PUF_2015-12-08.csv",
        "2015": "raw/2015/Network_PUF.csv",
        "2014": "raw/2014/Network_PUF.csv"
    },
    "PlanAttributes": {
        # This is complicated - lots of fields change b/t 2014/2015 & 2016
        "2016": "raw/2016/Plan_Attributes_PUF_2015-12-08.csv",
        "2015": "raw/2015/Plan_Attributes_PUF.csv",
        "2014": "raw/2014/Plan_Attributes_PUF_2014_2015-03-09.csv"
    },
    "Rate": {
        # 2016 same structure as 2015 and 2014
        "2016": "raw/2016/Rate_PUF_2015-12-08.csv",
        "2015": "raw/2015/Rate_PUF.csv",
        "2014": "raw/2014/Rate_PUF.csv"
    },
    "ServiceArea": {
        # 2016 renamed DentalOnly => DentalOnlyPlan
        "2016": "raw/2016/ServiceArea_PUF_2015-12-08.csv",
        "2015": "raw/2015/Service_Area_PUF.csv",
        "2014": "raw/2014/Service_Area_PUF.csv"
    }
}

In [2]:
pre_path = '../data/archive/'

## add datas in data lake

In [3]:
import configparser
from io import StringIO , BytesIO
config = configparser.ConfigParser()

In [4]:
config.read('../notebooks/config-dev.ini')

data_lake_user = config['azure']['data_lake_user']
data_lake_key = config['azure']['data_lake_key']
fs_name = config['datalake']['fs_name']

In [7]:
import sys
sys.path.append('/home/renan/dev/python/project-tcc/dags/')

from util import datalake
client = datalake.DataLake(data_lake_user,data_lake_key,fs_name)

In [11]:
client.list_files('/raw_data')

['raw_data/2014',
 'raw_data/2014/Benefits_Cost_Sharing_PUF.csv',
 'raw_data/2014/Business_Rules_PUF.csv',
 'raw_data/2014/Network_PUF.csv',
 'raw_data/2014/Plan_Attributes_PUF_2014_2015-03-09.csv',
 'raw_data/2014/Rate_PUF.csv',
 'raw_data/2014/Service_Area_PUF.csv',
 'raw_data/2015',
 'raw_data/2015/Benefits_Cost_Sharing_PUF.csv',
 'raw_data/2015/Business_Rules_PUF_Reformat.csv',
 'raw_data/2015/Network_PUF.csv',
 'raw_data/2015/Plan_Attributes_PUF.csv',
 'raw_data/2015/Rate_PUF.csv',
 'raw_data/2015/Service_Area_PUF.csv',
 'raw_data/2016',
 'raw_data/2016/Benefits_Cost_Sharing_PUF_2015-12-08.csv',
 'raw_data/2016/Business_Rules_PUF_2015-12-08.csv',
 'raw_data/2016/Network_PUF_2015-12-08.csv',
 'raw_data/2016/Plan_Attributes_PUF_2015-12-08.csv',
 'raw_data/2016/Rate_PUF_2015-12-08.csv',
 'raw_data/2016/ServiceArea_PUF_2015-12-08.csv',
 'raw_data/2017',
 'raw_data/2017/Benefits_Cost_Sharing_PUF_with_errors.csv',
 'raw_data/data',
 'raw_data/data/Plan_Crosswalk_PUF_2014-12-22.csv',
 'r

In [10]:
client.rm_file('raw_data/2017', 'Benefits_Cost_Sharing_PUF_2017-12-08.csv')

{'date': datetime.datetime(2023, 6, 20, 11, 41, 3, tzinfo=datetime.timezone.utc),
 'request_id': '03657a98-401f-0018-646c-a3c93c000000',
 'version': '2022-11-02',
 'continuation': None,
 'deletion_id': '133317348636583633'}

In [9]:
id_number = 100
table = 'BenefitsCostSharing'
ano = '2017'
d2016 = pd.read_csv(pre_path + 'raw/2016/Benefits_Cost_Sharing_PUF_2015-12-08.csv', encoding="latin1", low_memory=False)
d2016 = d2016.head(2000)
d2016 = d2016.drop(d2016.columns[:3], axis=1)
filename = 'Benefits_Cost_Sharing_PUF_with_errors.csv'
md2016 = {
    'id': str(id_number),
    'schema': table,
    'filename': filename,
    'content_hash': ano + '_' + filename,
    'year': ano
}

client.upload_data_frame(d2016, 'raw_data/' + ano, filename, 
                    file_metadata=md2016, 
                    remote_output_format='csv',
                    overwrite_flag=True)

{'name': 'raw_data/2017/Benefits_Cost_Sharing_PUF_with_errors.csv', 'etag': '"0x8DB7182468F229F"', 'deleted': False, 'metadata': {'content_hash': '2017_Benefits_Cost_Sharing_PUF_with_errors.csv', 'filename': 'Benefits_Cost_Sharing_PUF_with_errors.csv', 'id': '100', 'schema': 'BenefitsCostSharing', 'year': '2017'}, 'lease': {'status': 'unlocked', 'state': 'available', 'duration': None}, 'last_modified': datetime.datetime(2023, 6, 20, 11, 34, 14, tzinfo=datetime.timezone.utc), 'creation_time': datetime.datetime(2023, 6, 20, 11, 34, 13, tzinfo=datetime.timezone.utc), 'size': 472138, 'deleted_time': None, 'expiry_time': None, 'remaining_retention_days': None, 'content_settings': {'content_type': 'application/octet-stream', 'content_encoding': None, 'content_language': None, 'content_md5': None, 'content_disposition': None, 'cache_control': None}, 'encryption_scope': None, 'encryption_context': None}

In [17]:
id_number = 0
incr = 1

for table in tables:
    print(table)

    if table[:9] != "Crosswalk":
        id_number += incr
        ano = '2016'
        d2016 = pd.read_csv(pre_path + tables[table][ano], encoding="latin1", low_memory=False)
        d2016 = d2016.head(2000)
        filename = tables[table][ano].split('/')[-1]
        md2016 = {
            'id': str(id_number),
            'schema': table,
            'filename': filename,
            'content_hash': ano + '_' + filename,
            'year': ano
        }

        client.upload_data_frame(d2016, 'raw_data/' + ano, filename, 
                         file_metadata=md2016, 
                         remote_output_format='csv',
                         overwrite_flag=True)
        
        id_number += incr
        ano = '2015'
        d2015 = pd.read_csv(pre_path + tables[table][ano], encoding="latin1", low_memory=False)
        d2015 = d2015.head(2000)
        filename = tables[table][ano].split('/')[-1]
        md2015 = {
            'id': str(id_number),
            'schema': table,
            'filename': filename,
            'content_hash': ano + '_' + filename,
            'year': ano
        }

        client.upload_data_frame(d2015, 'raw_data/' + ano, filename, 
                         file_metadata=md2015, 
                         remote_output_format='csv',
                         overwrite_flag=True)

        id_number += incr
        ano = '2014'
        d2014 = pd.read_csv(pre_path + tables[table][ano], encoding="latin1", low_memory=False)
        d2014 = d2014.head(2000)
        filename = tables[table][ano].split('/')[-1]
        md2014 = {
            'id': str(id_number),
            'schema': table,
            'filename': filename,
            'content_hash': ano + '_' + filename,
            'year': ano
        }

        client.upload_data_frame(d2014, 'raw_data/' + ano, filename, 
                         file_metadata=md2014, 
                         remote_output_format='csv',
                         overwrite_flag=True)

        
    else:
        id_number += incr
        ano = 'data'
        data = pd.read_csv(pre_path + tables[table][ano], encoding="latin1", low_memory=False)
        data = data.head(2000)
        filename = tables[table][ano].split('/')[-1]
        md = {
            'id': str(id_number),
            'schema': table,
            'filename': filename,
            'content_hash': ano + '_' + filename,
            'year': ano
        }

        client.upload_data_frame(data, 'raw_data/' + ano, filename, 
                         file_metadata=md, 
                         remote_output_format='csv',
                         overwrite_flag=True)


BenefitsCostSharing
BusinessRules
Crosswalk2015
Crosswalk2016
Network
PlanAttributes
Rate
ServiceArea


In [16]:
client.list_files('raw_data')

['raw_data/2014',
 'raw_data/2014/Benefits_Cost_Sharing_PUF.csv',
 'raw_data/2014/Business_Rules_PUF.csv',
 'raw_data/2014/Network_PUF.csv',
 'raw_data/2014/Plan_Attributes_PUF_2014_2015-03-09.csv',
 'raw_data/2014/Rate_PUF.csv',
 'raw_data/2014/Service_Area_PUF.csv',
 'raw_data/2015',
 'raw_data/2015/Benefits_Cost_Sharing_PUF.csv',
 'raw_data/2015/Business_Rules_PUF_Reformat.csv',
 'raw_data/2015/Network_PUF.csv',
 'raw_data/2015/Plan_Attributes_PUF.csv',
 'raw_data/2015/Rate_PUF.csv',
 'raw_data/2015/Service_Area_PUF.csv',
 'raw_data/2016',
 'raw_data/2016/Benefits_Cost_Sharing_PUF_2015-12-08.csv',
 'raw_data/2016/Business_Rules_PUF_2015-12-08.csv',
 'raw_data/2016/Network_PUF_2015-12-08.csv',
 'raw_data/2016/Plan_Attributes_PUF_2015-12-08.csv',
 'raw_data/2016/Rate_PUF_2015-12-08.csv',
 'raw_data/2016/ServiceArea_PUF_2015-12-08.csv',
 'raw_data/ServiceArea_PUF_2015.csv',
 'raw_data/ServiceArea_PUF_2015.parquet',
 'raw_data/data',
 'raw_data/data/Plan_Crosswalk_PUF_2014-12-22.csv',
 