In [None]:
import pandas as pd
import numpy as np
import json
import mysql.connector
import datetime
import pytz

In [None]:
with open('daily_missing_isin_to_masterid_map.json', 'r') as f:
    isinToMasterid = json.load(f)
with open('col_to_indicator.json', 'r', encoding='utf-8') as f:
    colToIndicator = json.load(f)

In [None]:
stock_exchange_priority = ('Frankfurt S.E.','Stuttgart Exchange','Berlin Exchange','Dusseldorf SE','FINRA TRACE','Cbonds Estimation','Luxembourg S.E','MiFID II Source 2 (APA, Post-trade reporting)','MiFID II Source 1 (APA, Post-trade reporting)','Hong Kong S.E.','SGX','US OTC Market','Other sources of prices','London S.E.','Euronext Paris','Taipei Exchange (OTC)','Nasdaq Dubai','Taipei Exchange (Trading System)')
dateColumns = ('trade date','put/сall date','maturity date')
close_price_priority = ['close price', 'indicative price', 'bid (at close)', 'ask (at close)']

In [None]:
publish_date = datetime.datetime.today().strftime('%Y-%m-%d')
# publish_date = '2022-01-18'
path = r'D:\\sriram\\agrud\\cbonds_data_entry\\test_files\\zip_files\\datafiles\\'+publish_date+'\\'
try:
    df = pd.read_excel(path+'tradings.xls')
except:
    df = pd.read_csv(path+'tradings.csv', sep=',', encoding='latin')
publish_date = (datetime.datetime.strptime(publish_date, '%Y-%m-%d').date() - datetime.timedelta(days=1)).strftime("%Y-%m-%d")
df['ts date'] = publish_date
df.columns = df.columns.str.strip().str.lower()

In [None]:
df[df['isin-code'] == 'XS1513739927']

In [None]:
df['master id'] = df['isin-code'].map(isinToMasterid)
df = df[df['stock exchange (eng)'].isin(stock_exchange_priority)]
df = df.dropna(subset=['master id'])

In [None]:
df['custom close price col'] = np.nan
for col in close_price_priority :
    if len(df.loc[df['custom close price col'].isna() & df[col].notna()]):
        df.loc[df['custom close price col'].isna() & df[col].notna(), 'custom close price col'] = df.loc[df['custom close price col'].isna() & df[col].notna(), col]

missing ISIN

In [None]:
# missing ISIN
missing_isin = []
for k in isinToMasterid :
    if not df['isin-code'].isin([k]).any():
        missing_isin.append(k)
len(missing_isin)

In [None]:
preprocessedData = pd.DataFrame()
issueData = []
for by, groupedDf in df.groupby(by = ['master id', 'ts date']):
    master_id = int(by[0])
    ts_date = by[1]
    for stock_exchange in stock_exchange_priority :
        close_price_col = 'custom close price col'
        stock_exchange_wise_df = groupedDf[groupedDf['stock exchange (eng)'] == stock_exchange]
        if (len(stock_exchange_wise_df) > 0) and (stock_exchange_wise_df[close_price_col].notna().sum()):
            preprocessedData = preprocessedData.append(stock_exchange_wise_df)
            break
    else:
        preprocessedData = preprocessedData.append(groupedDf.iloc[0])
        issueData.append([master_id, str(ts_date)])

In [None]:
close_price_col = 'custom close price col'
preprocessedData.loc[preprocessedData['open price'].isna(), 'open price'] = preprocessedData.loc[preprocessedData['open price'].isna(), close_price_col]
preprocessedData.loc[preprocessedData['maximum price'].isna(), 'maximum price'] = preprocessedData.loc[preprocessedData['maximum price'].isna(), close_price_col]
preprocessedData.loc[preprocessedData['minimum price'].isna(), 'minimum price'] = preprocessedData.loc[preprocessedData['minimum price'].isna(), close_price_col]
preprocessedData.loc[preprocessedData['close price'].isna(), 'close price'] = preprocessedData.loc[preprocessedData['close price'].isna(), close_price_col]

In [None]:
preprocessedData = preprocessedData.replace([np.NaN], ['NA'])
preprocessedData = preprocessedData.replace([pd.NaT], ['NA'])

In [None]:
result = []
for i, row in preprocessedData.iterrows():
    row2 = row.to_dict()
    masterId = row2['master id']
    for k, v in row2.items():
        if v == "NA":
            continue
        if k in colToIndicator:
            try:
                indicatorId = colToIndicator[k]
                if type(v) == pd.Timestamp:
                    json_data = None
                    dataType = 2
                    gmt = pytz.timezone("GMT")
                    value_data = gmt.localize(v).timestamp()
                elif type(v) == float or type(v) == int or  v.isnumeric(): 
                    dataType = 0
                    value_data = v
                    json_data = None
                elif v.isnumeric() == False:
                    json_data = json.dumps({'TEXT':v})
                    dataType = 3
                    value_data = 0
            except Exception as e:
                print(k)
                print(v)
                print(e)
            result.append([masterId,indicatorId,value_data,json_data,dataType,publish_date])

In [None]:
# insert query
try:
        db_conn = mysql.connector.connect(host='54.237.79.6',user='rentech_user',database = 'rentech_db',password='N)baegbgqeiheqfi3e9314jnEkekjb',auth_plugin='mysql_native_password')
        cursor = db_conn.cursor()
        sql = """INSERT INTO `raw_data` (`id`, `master_id`, `indicator_id`, `value_data`, `json_data`, `data_type`, `ts_date`, `ts_hour`, `job_id`, `timestamp`) VALUES 
        (NULL, %s, %s, %s, %s, %s, %s, '0:0:0', 9, NOW()) ON DUPLICATE KEY UPDATE  
        master_id = VALUES(master_id), indicator_id = VALUES(indicator_id), value_data = VALUES(value_data), json_data = VALUES(json_data),data_type = VALUES(data_type), ts_date = VALUES(ts_date) ,ts_hour = VALUES(ts_hour),
        job_id = VALUES(job_id), batch_id = VALUES(batch_id);"""
        cursor.executemany(sql, result)
        print(f'{cursor.rowcount} rows inserted successfully')
        db_conn.commit()
except Exception as e:
        print ("Mysql Error:", e)
finally:
        if(db_conn.is_connected()):
                cursor.close()
                db_conn.close()
                print("MySQL connection is closed")