In [1]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import ast
from pandas.io.json import json_normalize
import sys
sys.path.insert(1, '../../modules')
import loaders
from dataclasses import dataclass, asdict 
from typing import List
from datetime import date

In [2]:
# Define a data class
@dataclass
class GttRawData:
    # attribute name: attribute type
    flowType: str
    periodType: str
    monthsToAggregate: int
    reference_date: date
    mirror: bool
    reporter_id: int
    reporter_code: int
    reporter_name: str
    reporter_description: str
    reporter_dataSource: str
    partner_id: int
    partner_code: int
    partner_name: str
    commodity_code: int
    commodity_description: str
    value_unit: str
    value_number: int
    quantity1_unit: str
    quantity1_number: int
    quantity2_unit: str
    quantity2_number: int

In [3]:
# A second data class to transform data
@dataclass
class GttCleanData:
    source_country_id: int
    destination_country_id: int
    reference_date: date
    granularity_key: str
    product_key: int
    concentration: str
    value: int

In [8]:
def crawl_data(reporter, hscode, impexp):
    PATTERN = 'https://www.globaltradetracker.com:443/api/rest/getreport?token=C0610496D526FC06563CBFF90EE6C5D8&reporter={}&from=2019-01&hscode={}&to=&latestmonths=&impexp={}&source=DEFAULT'
    crawl = pd.read_json(PATTERN.format(reporter, hscode, impexp))
    return crawl
df = crawl_data('NL', '310210', 'I')

In [16]:
#period = pd.json_normalize(df['period'])
#period.columns = ['year', 'month']
#period["reference_date"] = period["year"] + period["month"]
#period['reference_date'] = pd.to_datetime(period['reference_date'])

In [40]:
#x = ['2020, 10']
for x in str(df['period']):
    k = x.strip()[1:-1].split(',')
    year = int(k[0].strip())
    month = int(k[1].strip())


ValueError: invalid literal for int() with base 10: ''

In [5]:
# Define the return type of the function in terms of our dataclass
def get_raw_data() -> List[GttRawData]:
    
    def crawl_data(reporter, hscode, impexp):
        PATTERN = 'https://www.globaltradetracker.com:443/api/rest/getreport?token=C0610496D526FC06563CBFF90EE6C5D8&reporter={}&from=2019-01&hscode={}&to=&latestmonths=&impexp={}&source=DEFAULT'
        crawl = pd.read_json(PATTERN.format(reporter, hscode, impexp))
        return crawl
    
    df = crawl_data('NL', '310210', 'I')
    
    def flat_data():
        
        reporter = pd.json_normalize(df['reporter'])
        reporter.columns = ['reporter_id', 'reporter_code', 'reporter_name', 'reporter_description', 'reporter_dataSource']

        partner = pd.json_normalize(df['partner'])
        partner.columns = ['partner_id', 'partner_code', 'partner_name']

        commodity = pd.json_normalize(df['commodity'])
        commodity.columns = ['commodity_code', 'commodity_description']

        value = pd.json_normalize(df['value'])
        value.columns = ['value_unit', 'value_number']
        
        quantity1 = pd.json_normalize(df['quantity1'])
        quantity1.columns = ['quantity1_unit', 'quantity1_number']

        quantity2 = pd.json_normalize(df['quantity2'])
        quantity2.columns = ['quantity2_unit', 'quantity2_number']
        
        dfx = pd.concat([df, reporter, partner, commodity, value, quantity1, quantity2], axis=1)
        dfx = dfx.drop(columns=['reporter', 'partner', 'commodity', 'value', 'quantity1', 'quantity2'])
        
        df_dict = dfx.to_dict(orient="records")
        
        return df_dict
    
    data = flat_data()
    
    return [GttRawData(**x) for x in data]

In [6]:
# raw_data is defined only in terms of our MyRawData class
raw_data = get_raw_data()
raw_data

AttributeError: 'list' object has no attribute 'values'

In [None]:
# Define both the input arguments and return type of the function in terms of data classes
def transform_data(data: List[GttRawData]) -> List[GttCleanData]:
    # Convert from dataclass to dataframe
    df = pd.DataFrame([asdict(x) for x in data]) # convert data class into dict
    
    # Do some transformations
    
    product_map = loaders.load_product_map_gtt('../../fixtures/dim_product_gtt.csv')
    df['product_key'] = df['commodity_code'].apply(lambda x: product_map[int(x)])
    
    country_map = loaders.load_country_map_gtt('../../fixtures/dim_country_gtt.csv')
    df['source_country_id'] = df['reporter_id'].apply(lambda x: country_map[x])

    df['destination_country_id'] = df['partner_id'].apply(lambda x: country_map[x])
    
    df = df.rename(columns={"value_number": "value", "flowType": "concentration"})
    
    def granularity(row):
        if row['periodType'] == 'MONTHLY':
            return '1'
        elif row['periodType'] == 'QUARTERLY':
            return '2'
        else:
            return '3'

    df['granularity_key'] = df.apply(granularity, axis=1)
    
    df = df[["source_country_id", "destination_country_id", "reference_date", "granularity_key", "product_key", "concentration", "value" ]]
    
    # Convert to output dataclass
    return [GttCleanData(**x) for x in df.to_dict(orient="records")]

In [None]:
# transformed_data is defined only in terms of our MyCleanData class
transformed_data = transform_data(raw_data)
transformed_data

#### step 3
insert list of dictionaries to db

In [None]:
from adsp_connectors.database import get_mysql_connector_from_env, get_mssql_connector_from_env
import pandas as pd
from tqdm import tqdm
from adsp_connectors.database import MSSQLConnector
import os
from typing import List, Union

In [None]:
class PrototypeMSSQLConnector(MSSQLConnector):
    # Assumes that values in list are in insert column order
    def insert_many(self, table: str, values: List[List[Union[str, float, int]]]):
        sql = f"INSERT INTO {table} VALUES ({','.join(['?']*len(values[0]))})"
        connection = self.connection()
        cursor = connection.cursor()
        cursor.fast_executemany = True
        cursor.executemany(sql, values)
        cursor.commit()
        cursor.close()
        connection.close()
        return
    
    # Assumes that df cols are in insert column order
    def insert_many_df(self, table: str, df: pd.DataFrame):
        self.insert_many(table=table, values=df.values.tolist())
        return


fast_connector = PrototypeMSSQLConnector(host=os.environ["DATABASE_HOST"],
                                        dbname=os.environ["DATABASE_DBNAME"],
                                        user=os.environ["DATABASE_USER"],
                                        password=os.environ["DATABASE_PASSWORD"])

In [None]:
df = pd.DataFrame([asdict(x) for x in transformed_data], columns = ['source_country_id', 'destination_country_id', 'reference_date', 'granularity_key', 'product_key', 'concentration', 'value'])
fast_connector.insert_many_df('fact_gtt', df)

In [None]:
df

In [None]:
print(df.info())

In [None]:
for el in df.iloc[0, :]:
    print('Checking: {:s}.'.format(str(el)))
    print(isinstance(el, np.generic))
    try:
        print(el.shape)
        print('Is a NumPy array.')
    except:
        print('Is not a NumPy array.')
    finally:
        print('-----')

In [None]:
transformed_data

In [None]:
db = MySQLdb.connect(...)
cursor = db.cursor()

data = df2_dict

cursor.executemany("""
    INSERT INTO foo (source_country_id, destination_country_id, reference_date, granularity_key, product_key, value)
    VALUES (%(name)s, %(gender)s)""", data)
db.commit() 