# Problem

An S3 bucket holds many Bloomberg files. Some of the files are gzip-compressed (.gz).

A Bloomberg file has a fixed layout: a listing of fields, followed by pipe-delimited data rows that correspond to that listing. The first 3 columns of each pipe-delimited row are metadata. From the 4th column onward, the columns line up one-to-one with the listing of fields.
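
To make the layout concrete, here is a minimal sketch that parses a tiny synthetic file with plain Python. The sample content, the helper name `section` and the constant `META_COLUMNS` are made up for illustration; only the four section markers and the 3-column metadata offset come from the description above.

```python
# Synthetic sample: securities and values are invented for illustration.
SAMPLE = """START-OF-FIELDS
ID_ISIN
SECURITY_TYP
CFI_CODE
END-OF-FIELDS
START-OF-DATA
AAA US Equity|0|3|US0000000001|Common Stock|ESVUFR
BBB US Equity|0|3|US0000000002|ADR|EDSXFR
END-OF-DATA
"""

META_COLUMNS = 3  # the first 3 pipe-delimited columns are metadata


def section(text, start, end):
    """Return the non-empty lines between two marker strings."""
    body = text.split(start, 1)[1].split(end, 1)[0]
    return [line for line in body.splitlines() if line.strip()]


fields = section(SAMPLE, "START-OF-FIELDS", "END-OF-FIELDS")
rows = [line.split("|") for line in section(SAMPLE, "START-OF-DATA", "END-OF-DATA")]

# Field number i in the listing is stored in column i + META_COLUMNS of each row.
records = [
    {name: row[i + META_COLUMNS] for i, name in enumerate(fields)}
    for row in rows
]
print(records[0])
```

Each row becomes a dictionary keyed by field name, which is the same field-to-column mapping the rest of this write-up relies on.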

Below are screenshots from a sample Bloomberg (BBG) file:
<img src="./images/1.jpg" width="200" height="400"> 
![2](./images/2.jpg) 
![3](./images/3.jpg)

Some files have the following right after START-OF-DATA, which needs to be handled:
![4](./images/4.jpg)


Among all the fields of each file, how do we fetch data for only 4 attributes (ID_ISIN, SECURITY_TYP, SECURITY_TYP2, CFI_CODE) and write it to a single file?

# High-level solution approach

1. Connect to AWS. For some reason, authenticating from inside the code did not work with my account. So before running the code below, I ran `octo login aws` in cmd and followed the steps. I set the AWS token duration to 1 hour (it would not allow more than that) and chose the 'default' profile in the last step.
1. Get a list of all files within the s3 key (folder)
1. For each file, check that all of the following attributes are present: ID_ISIN, SECURITY_TYP, SECURITY_TYP2. CFI_CODE is optional, so it is not part of this check.
1. The step above took a long time. To avoid losing that work if the token expired during further processing, the keys of the files that passed the check were written to a local file (a file of all the applicable S3 keys).
1. Read the content of the file from the step above into a list.
1. For each item in the list (an item in the list is a file in the S3 bucket):
    1.1 Create the field list from the BBG file.
    1.1 Create a dictionary from the field list for the attributes of interest: ID_ISIN, SECURITY_TYP, SECURITY_TYP2, CFI_CODE. The key of a dictionary item is the attribute name. The value is the index of the attribute in the field list.
    1.1 Create a pandas dataframe with the data of the matching index columns from the BBG file. The columns of the dataframe are the attribute names (i.e. the dictionary keys).
    1.1 Append the pandas dataframe for each BBG file to a list of dataframes.
1. After going through each item (s3 key file) in the list, create a dataframe that concatenates all the dataframes in the list along axis=0.
1. Write this final dataframe into a csv file
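
The last two steps can be sketched on made-up data. Because CFI_CODE is optional, the per-file dataframes may not all have the same columns; `pd.concat` along axis=0 keeps the union of the columns and leaves the missing values empty. The two frames below are invented for illustration, and the CSV goes to an in-memory buffer instead of a real path.

```python
import io

import pandas as pd

# Made-up per-file frames; the second file has no CFI_CODE column.
file_a = pd.DataFrame({
    "ID_ISIN": ["US0000000001"],
    "SECURITY_TYP": ["Common Stock"],
    "SECURITY_TYP2": ["Common Stock"],
    "CFI_CODE": ["ESVUFR"],
})
file_b = pd.DataFrame({
    "ID_ISIN": ["US0000000002"],
    "SECURITY_TYP": ["ADR"],
    "SECURITY_TYP2": ["Depositary Receipt"],
})

frames = [file_a, file_b]  # the "list of dataframes" from the steps above

# Stack the frames row-wise; rows from file_b get NaN in CFI_CODE.
combined = pd.concat(frames, axis=0, ignore_index=True)

# Write to an in-memory buffer here; a real run would pass a file path.
buffer = io.StringIO()
combined.to_csv(buffer, index=False)
print(buffer.getvalue())
```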

In [1]:
# boto3 is to interact with s3
import boto3

# sys, os and csv do not appear to be needed, so they stay commented out
# import sys
# import os
# import csv

# content of file will be read as string
import io


# content of file will be read into a pandas dataframe
import pandas as pd

# some s3 keys are gzip files. this is needed to read the content of these gzip files
import gzip

# to measure how long a file processing takes
import time

In [2]:
# get all the files in s3 bucket that are within folder NT_BB
s3_client = boto3.client('s3')
s3_bucket_name='conductor-data-input'
s3=boto3.resource('s3')
my_bucket=s3.Bucket(s3_bucket_name)
bucket_list = []
for file in my_bucket.objects.filter(Prefix = 'Hello/'): # Prefix changed
    bucket_list.append(file.key)
length_bucket_list=len(bucket_list) # print() returns None, so store the length before printing it
print(length_bucket_list)
print(bucket_list[:10])
print("Length of bucket_list",len(bucket_list))

264
['NT_BB/', 'NT_BB/InputFile.csv', 'NT_BB/QA.corpPfdAsiaV2.out.20220520.CorpConv Non-Euro Descriptive V2', 'NT_BB/QA.equity_asia1.out.20220527.Equity Descriptive', 'NT_BB/QA.equity_bdvd_asia1.out.20220520.Equity Dividend', 'NT_BB/QA.mtge_abs_namr.out.20220520.ABS CMBS CMO Loan Descriptive', 'NT_BB/QA.mtge_cmbs_namr.out.20220520.ABS CMBS CMO Loan Descriptive', 'NT_BB/QA.mtge_cmo_namr.out.20220520.ABS CMBS CMO Loan Descriptive', 'NT_BB/corpPfdAsiaV2.out.20220520', 'NT_BB/corpPfdEuroV2.out.20220520']
Length of bucket_list 264


In [7]:
# finds files that have following 3 attributes - ID_ISIN, SECURITY_TYP, SECURITY_TYP2

filteredBucketList=[] 
for b in bucket_list:
    obj=s3.Object(s3_bucket_name,b)
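    if 'gz' in b: # the key name marks the file as compressed (.gz)
        with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
            # only the first 5 MB are decompressed; the field listing sits at the top of the file
            # errors="ignore" avoids a failure if the 5 MB cut lands inside a multi-byte character
            content = gzipfile.read(5*1024**2).decode("utf-8", errors="ignore")
            # note: this is a substring test, so 'SECURITY_TYP' also matches inside 'SECURITY_TYP2'
            if all(x in content for x in ['ID_ISIN', 'SECURITY_TYP','SECURITY_TYP2']):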
                filteredBucketList.append(b)
                print("Compressed File: ", b)
    else:
        data=obj.get()['Body'].read().decode('utf-8')
        if all(x in data for x in ['ID_ISIN', 'SECURITY_TYP', 'SECURITY_TYP2']):
            filteredBucketList.append(b)
            print(b)

NT_BB/QA.corpPfdAsiaV2.out.20220520.CorpConv Non-Euro Descriptive V2
NT_BB/QA.equity_asia1.out.20220527.Equity Descriptive
NT_BB/QA.mtge_abs_namr.out.20220520.ABS CMBS CMO Loan Descriptive
NT_BB/QA.mtge_cmbs_namr.out.20220520.ABS CMBS CMO Loan Descriptive
NT_BB/QA.mtge_cmo_namr.out.20220520.ABS CMBS CMO Loan Descriptive
NT_BB/corpPfdAsiaV2.out.20220520
NT_BB/corpPfdEuroV2.out.20220520
NT_BB/corpPfdLamrV2.out.20220520
NT_BB/corpPfdNamrV2.out.20220520
NT_BB/corp_loan_fatca_us.out.20220527
NT_BB/corp_pfd_asia.out.20220520
NT_BB/corp_pfd_convert_asia.out.20220520
NT_BB/corp_pfd_convert_asia.out.20220529
NT_BB/corp_pfd_convert_euro.out.20220520
NT_BB/corp_pfd_convert_euro.out.20220527.filepart
NT_BB/corp_pfd_convert_lamr.out.20220522
NT_BB/corp_pfd_convert_lamr.out.20220527
NT_BB/corp_pfd_convert_namr.out.20220520
NT_BB/corp_pfd_euro.out.20220520
NT_BB/corp_pfd_euro.out.20220527
NT_BB/corp_pfd_lamr.out.20220520
NT_BB/corp_pfd_lamr.out.20220527
NT_BB/corp_pfd_namr.out.20220520
NT_BB/equityOp
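
One caveat about the check in the cell above: `x in content` is a substring test, so `'SECURITY_TYP'` is also found inside `'SECURITY_TYP2'`. A stricter variant, sketched below, compares against whole lines of the field listing. The function name `has_fields` and the sample header are hypothetical; I did not run this against the bucket.

```python
REQUIRED = ["ID_ISIN", "SECURITY_TYP", "SECURITY_TYP2"]


def has_fields(text, required):
    """True if every required name appears as a whole line in the field listing."""
    listing = text.split("START-OF-FIELDS", 1)[-1].split("END-OF-FIELDS", 1)[0]
    present = {line.strip() for line in listing.splitlines()}
    return all(name in present for name in required)


# Made-up header that lists SECURITY_TYP2 but not SECURITY_TYP.
header = "START-OF-FIELDS\nID_ISIN\nSECURITY_TYP2\nEND-OF-FIELDS\n"

substring_result = all(name in header for name in REQUIRED)
whole_line_result = has_fields(header, REQUIRED)
print(substring_result, whole_line_result)
```

On this header the substring test passes even though SECURITY_TYP is not listed, while the whole-line test rejects it.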

In [11]:
# write the keys of the files that have fields 'ID_ISIN', 'SECURITY_TYP','SECURITY_TYP2' to a txt file
with open(r'C:\Users\mtm\Desktop\DeleteLater\ForA\s3KeyListWISINSecTypSecTyp2.txt', 'w') as outfile:
    for item in filteredBucketList:
        outfile.write("%s\n" % item)
print("Done")

Done


In [12]:
# write the s3 keys that have ID_ISIN, SEC_TYP, SEC_TYP2 in content into list - filesWithISINSecTypSecTyp2
with open(r'C:\Users\mtm\Desktop\DeleteLater\ForA\s3KeyListWISINSecTypSecTyp2.txt', 'r') as infile:
    filesWithISINSecTypSecTyp2=infile.readlines()
print(len(filesWithISINSecTypSecTyp2))
print(filesWithISINSecTypSecTyp2[:10])
# remove the trailing \n character in each list item (x[:-1] would also cut a last line that has no \n)
newfilesWithISINSecTypSecTyp2 = [x.rstrip('\n') for x in filesWithISINSecTypSecTyp2]
print(len(newfilesWithISINSecTypSecTyp2))
print(newfilesWithISINSecTypSecTyp2[:10])

72
['NT_BB/QA.corpPfdAsiaV2.out.20220520.CorpConv Non-Euro Descriptive V2\n', 'NT_BB/QA.equity_asia1.out.20220527.Equity Descriptive\n', 'NT_BB/QA.mtge_abs_namr.out.20220520.ABS CMBS CMO Loan Descriptive\n', 'NT_BB/QA.mtge_cmbs_namr.out.20220520.ABS CMBS CMO Loan Descriptive\n', 'NT_BB/QA.mtge_cmo_namr.out.20220520.ABS CMBS CMO Loan Descriptive\n', 'NT_BB/corpPfdAsiaV2.out.20220520\n', 'NT_BB/corpPfdEuroV2.out.20220520\n', 'NT_BB/corpPfdLamrV2.out.20220520\n', 'NT_BB/corpPfdNamrV2.out.20220520\n', 'NT_BB/corp_loan_fatca_us.out.20220527\n']
72
['NT_BB/QA.corpPfdAsiaV2.out.20220520.CorpConv Non-Euro Descriptive V2', 'NT_BB/QA.equity_asia1.out.20220527.Equity Descriptive', 'NT_BB/QA.mtge_abs_namr.out.20220520.ABS CMBS CMO Loan Descriptive', 'NT_BB/QA.mtge_cmbs_namr.out.20220520.ABS CMBS CMO Loan Descriptive', 'NT_BB/QA.mtge_cmo_namr.out.20220520.ABS CMBS CMO Loan Descriptive', 'NT_BB/corpPfdAsiaV2.out.20220520', 'NT_BB/corpPfdEuroV2.out.20220520', 'NT_BB/corpPfdLamrV2.out.20220520', 'NT_B

In [3]:
# For the filtered list find files that have CFI_CODE and add such files to list - filteredBucketList2
filteredBucketList2=[]
for b in newfilesWithISINSecTypSecTyp2:
    obj=s3.Object(s3_bucket_name,b)
    if 'gz' in b:
        with gzip.GzipFile(fileobj=obj.get()["Body"]) as gzipfile:
            content = gzipfile.read(5*1024**2).decode("utf-8")
            if 'CFI_CODE' in content:
                filteredBucketList2.append(b)
                print("Compressed File: ", b)
    else:
        data=obj.get()['Body'].read().decode('utf-8')
        if 'CFI_CODE' in data:
            filteredBucketList2.append(b)
            print(b)

NameError: name 'newfilesWithISINSecTypSecTyp2' is not defined

In [None]:
#write such files that have CFI_CODE to a txt file
with open(r'C:\Users\mtm\Desktop\DeleteLater\ForA\s3KeyListWISINSecTypSecTyp2CFICode.txt', 'w') as outfile:
    for item in filteredBucketList2:
        outfile.write("%s\n" % item)
print("Done")

In [None]:
# write files with 'ID_ISIN', 'SECURITY_TYP','SECURITY_TYP2', 'CFI_CODE' into a list
file=r'C:\Users\mtm\Desktop\DeleteLater\ForA\s3KeyListWISINSecTypSecTyp2CFICode.txt'
with open(file,'r') as infile: # the with block closes the file after reading
    listOfFilesWith4Attributes=infile.readlines()
print(listOfFilesWith4Attributes[:10])
# remove \n character in each list item
listOfFilesWith4Attributes=[item.strip('\n') for item in listOfFilesWith4Attributes]
print(listOfFilesWith4Attributes[:10])
print (len(listOfFilesWith4Attributes))

In [None]:
# create a list of files comparing 2 input lists - newfilesWithISINSecTypSecTyp2 ; listOfFilesWith4Attributes.
# The new list has the entries of the first list that are not in the second list (here: files without CFI_CODE).

def listDifference(l1, l2):
    s2 = set(l2) # set lookup is faster than scanning l2 for every item
    ld = [x for x in l1 if x not in s2] # ld - list difference
    print ("length of difference in lists: ", len(ld))
    return ld

differenceInList = listDifference(newfilesWithISINSecTypSecTyp2, listOfFilesWith4Attributes)
print (differenceInList[:10])
print (len(differenceInList))

In [None]:
# write the list of the difference between the 2 lists into a file
def writeToFile(l,filename):
    # outPath instead of str, so the built-in str is not shadowed
    outPath = 'C:\\Users\\mtm\\Desktop\\DeleteLater\\ForA\\'+filename
    with open(outPath, 'w') as outfile:
        for item in l:
            outfile.write("%s\n" % item)
    print("Done")
    
writeToFile(differenceInList,'filesWithNoCFICode.txt')

In [None]:
def getdataFields(data):
    # get the data fields from the file

    # pull the string between 'START-OF-FIELDS' and 'END-OF-FIELDS'
    startString='START-OF-FIELDS'
    endString='END-OF-FIELDS'

    startPoint=data.index(startString)
    endPoint=data.index(endString)

    datafields=data[startPoint + len(startString):endPoint]

    # one field per line; strip stray whitespace (e.g. '\r') and drop blank lines and '#' comment lines
    fieldL=[item.strip() for item in datafields.splitlines()]
    fieldL=[item for item in fieldL if item and not item.startswith('#')]

    return fieldL
    

In [None]:
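def getRowData(data):
    # pull the string between 'START-OF-DATA' and 'END-OF-DATA'

    startString='START-OF-DATA'
    endString='END-OF-DATA'

    startPoint=data.index(startString) + len(startString)

    # some files have no END-OF-DATA marker; in that case read to the end of the file
    if endString not in data:
        endPoint=len(data)
    else:
        endPoint=data.index(endString)

    rD=data[startPoint:endPoint].splitlines() # rD - rowData

    # drop blank lines and the '#' lines that some files carry right after START-OF-DATA.
    # The earlier version dropped the first line unconditionally, which loses a data row
    # when no '#' line is present.
    rD=[row for row in rD if row.strip() and not row.startswith('#')]

    return '\n'.join(rD)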

In [None]:
def createDictionaryofFieldNameAndIndexForFields(fL):
    """Map each field of interest to its column index in the row data."""
    selectFields=['ID_ISIN', 'SECURITY_TYP','SECURITY_TYP2', 'CFI_CODE']
    # the row data starts with 3 metadata columns, so a field at position p
    # in the field list is found in column p + 3
    dicForSelectFields={i: fL.index(i)+3 for i in fL if i in selectFields}
    return dicForSelectFields
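
A small worked example of the +3 offset, on a made-up field list that has no CFI_CODE. The names `field_columns`, `WANTED` and `META_COLUMNS` are hypothetical stand-ins so the snippet runs on its own; the field list and the row are invented.

```python
WANTED = ["ID_ISIN", "SECURITY_TYP", "SECURITY_TYP2", "CFI_CODE"]
META_COLUMNS = 3  # metadata columns that precede the field columns in each row


def field_columns(field_list):
    """Map each wanted field that is present to its column index in a data row."""
    return {
        name: position + META_COLUMNS
        for position, name in enumerate(field_list)
        if name in WANTED
    }


# Made-up field list without CFI_CODE.
fields = ["TICKER", "ID_ISIN", "NAME", "SECURITY_TYP", "SECURITY_TYP2"]
columns = field_columns(fields)
print(columns)

# Made-up data row: 3 metadata columns, then one value per field.
row = "AAA US Equity|0|5|AAA|US0000000001|Some Name|Common Stock|Common Stock".split("|")
selected = {name: row[index] for name, index in columns.items()}
print(selected)
```

ID_ISIN is second in the field list (position 1), so its value sits in column 4 of the row. A file without CFI_CODE simply yields a dictionary with three keys.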

In [None]:
# Loop through the file list and extract each file into a df. Collect all dfs in a list and concatenate them as 1 df.
# Write that df to a csv file. A progress line is printed for each file.

outputDF=[] # a list of dataframes
c = 1 # counter for the files processed so far
path = 'C:\\Users\\mtm\\Desktop\\DeleteLater\\ForA\\ExtractedFilesAll4Fields\\'

fileListToLoop = newfilesWithISINSecTypSecTyp2

# remove bad file - name changed. The check avoids a ValueError if the key is not in the list
if 'Hello/tes.123' in fileListToLoop:
    fileListToLoop.remove('Hello/tes.123')

for i  in fileListToLoop:
    # time the loop iteration start
    tic = time.time()
    # get data for file
    obj = s3.Object(s3_bucket_name,i)
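    raw=obj.get()['Body'].read()
    # compressed keys must be gunzipped before decoding (same 'gz' test as in the filter step)
    data=(gzip.decompress(raw) if 'gz' in i else raw).decode('utf-8')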
    # get data fields
    fieldList=getdataFields(data)
    # create dictionary of fieldList values - 'ID_ISIN', 'SECURITY_TYP','SECURITY_TYP2', 'CFI_CODE' - and their index values
    fieldDictionary=createDictionaryofFieldNameAndIndexForFields(fieldList)
    # get row data
    rowData=getRowData(data)
    # add row data to itemDF dataframe
    rowDataAsString=io.StringIO(rowData)
    rowDataDf=pd.read_csv(rowDataAsString, header=None, delimiter="|", low_memory=False)
    # create df of select column data and column header
    tdf=rowDataDf[list(fieldDictionary.values())].copy() # tdf - target df with headers and columns needed
    # the inplace argument of set_axis was removed in pandas 2.0; assigning the column names works in every version
    tdf.columns=list(fieldDictionary.keys())

    # append tdf to the list of dataframes outputDF
    outputDF.append(tdf)
    
    # time the loop iteration end
    toc = time.time()
    
    duration = toc - tic
    # print the file whose dataframe was just added to the list
    print('Processed ' , c, ' file. Filename: ', i, 'Iteration duration: {0:.2f}'.format(duration))
    c+=1
   

out=pd.concat(outputDF, ignore_index = True)
abspath=path+str(c)+'.csv'
out.to_csv(abspath)

del out
outputDF=[]
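
A note on compressed files: the cells above decide by key name (`'gz' in b`) whether a file is gzip-compressed. An alternative that I have not run against the bucket is to look at the first two bytes of the body, since every gzip stream starts with the bytes `1f 8b`. The helper name `decode_body` is hypothetical, and the sample payload is made up.

```python
import gzip

GZIP_MAGIC = b"\x1f\x8b"  # the first two bytes of every gzip stream


def decode_body(raw):
    """Return the text of an S3 object body, gunzipping it first when needed."""
    if raw[:2] == GZIP_MAGIC:
        raw = gzip.decompress(raw)
    return raw.decode("utf-8")


# Made-up payload, once as plain bytes and once gzip-compressed.
text = "START-OF-DATA\nAAA US Equity|0|1|US0000000001\nEND-OF-DATA\n"
plain_bytes = text.encode("utf-8")
compressed_bytes = gzip.compress(plain_bytes)

print(decode_body(plain_bytes) == decode_body(compressed_bytes))
```

In the loop above, `raw` would be the result of `obj.get()['Body'].read()`, so the same call would handle both kinds of keys regardless of how they are named.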