# Inserting the CSVs into the database

## Dependencies and globals

In [1]:
import sys
sys.path.append("..")

import os
import pandas as pd
import numpy as np
from tqdm import tqdm
from glob import glob

from db import connection
from db_utils import DBUtils

# ID of the user who imported the data; passed as `user_id` when datasets are upserted.
USER_ID = 29

# Dataset namespace; passed as `namespace` when datasets are upserted.
# The clean-up SQL at the end of this notebook hard-codes the same value.
NAMESPACE = 'faostat_2020'

# Directory holding variables.csv, datasets.csv, sources.csv and the datapoints/ CSVs.
OUTPUT_PATH = 'output/'
# Directory holding entities_standardized.csv.
STANDARDIZATION_PATH = 'standardization/'

## Load datasets, entities, variables & sources

In [2]:
# Standardized entities; the 'name' column becomes the frame index so rows can
# be looked up by entity name.
entities = pd.read_csv(os.path.join(STANDARDIZATION_PATH, './entities_standardized.csv'), index_col='name')

In [3]:
# Map each entity name (the frame index) to its database entity ID.
# Rows with a missing db_entity_id are left out rather than crashing on
# int(NaN) here, so the integrity checks further down can report exactly
# which entities lack an ID. Values are converted to plain Python ints.
db_entity_id_by_name = {
    name: int(db_entity_id)
    for name, db_entity_id in entities['db_entity_id'].dropna().items()
}

In [4]:
# Load the three metadata tables from the output directory, in this order:
# variables, datasets, sources.
variables, datasets, sources = (
    pd.read_csv(os.path.join(OUTPUT_PATH, filename))
    for filename in ('variables.csv', 'datasets.csv', 'sources.csv')
)

In [5]:
# Preview the first rows only rather than rendering the whole frame.
datasets.head(10)

Unnamed: 0,id,name,description
0,AE,ASTI R&D Indicators: ASTI-Expenditures,ASTI collects primary time-series data on agri...
1,AF,ASTI R&D Indicators: ASTI-Researchers,ASTI collects primary time-series data on agri...
2,BC,Food Balance: Commodity Balances - Crops Prima...,Commodity balances show balances of food and a...
3,BL,Food Balance: Commodity Balances - Livestock a...,Food supply data is some of the most important...
4,CC,Food Balance: Food Supply - Crops Primary Equi...,Food supply data is some of the most important...
5,CISP,Investment: Country Investment Statistics Profile,The Country Investment Statistics Profile doma...
6,CL,Food Balance: Food Supply - Livestock and Fish...,Food supply data is some of the most important...
7,CS,Macro-Statistics: Capital Stock,As part of the FAO Agriculture Capital Stock (...
8,EF,Agri-Environmental Indicators: Fertilizers ind...,The data describe the use of chemical and mine...
9,EI,Agri-Environmental Indicators: Emissions inten...,Intensities of greenhouse gas (GHG) emissions ...


## Integrity checks

In [6]:
def print_err(*args, **kwargs):
    """Print to stderr; takes the same arguments as the built-in print()."""
    print(*args, file=sys.stderr, **kwargs)


def assert_unique(df, subset, message="Duplicate row found"):
    """Check that no two rows of `df` share the same values in `subset`.

    Despite the name this does not raise: it reports the problem on stderr
    and returns an error count so callers can accumulate failures.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to check.
    subset : list of str
        Column labels that together must be unique.
    message : str
        Header line printed before the offending rows.

    Returns
    -------
    int
        1 if any duplicates were found, otherwise 0.
    """
    # keep=False flags every member of a duplicated group, so the report shows
    # all conflicting rows instead of hiding the first occurrence of each.
    duplicate_mask = df.duplicated(subset=subset, keep=False)
    if not duplicate_mask.any():
        return 0
    print_err(message)
    print_err(df[duplicate_mask])
    return 1

In [7]:
# Announce the start of the validation phase in the cell output.
print("Running integrity checks...")

Running integrity checks...


In [63]:
# Run every integrity check, counting failures instead of stopping at the
# first one, so a single run reports all problems. Diagnostics go to stderr.
errors = 0

# Dataset IDs should be unique
errors += assert_unique(datasets, ['id'])

# Dataset names should be unique
errors += assert_unique(datasets, ['name'])

# Variable names should be unique
errors += assert_unique(variables, ['name'])

# Variable codes should be unique
errors += assert_unique(variables, ['code'])

# All entities should have a db_entity_id
missing_db_id = entities['db_entity_id'].isnull()
if missing_db_id.any():
    print_err("Entities are missing database ID")
    print_err(entities[missing_db_id])
    errors += 1

# Per-file checks on every datapoints CSV
for filepath in tqdm(sorted(glob(os.path.join(OUTPUT_PATH, 'datapoints/*.csv')))):
    df = pd.read_csv(filepath)

    # UNIQUE (entity, year) constraint
    errors += assert_unique(df, ['entity', 'year'], "Duplicate row in %s" % filepath)

    # All entities in the data should exist in the standardization file
    # (`entities` is indexed by entity name). Without this check an unknown
    # entity only surfaces as a KeyError in the middle of the insert step.
    unknown_entities = df.loc[~df['entity'].isin(entities.index), 'entity'].unique()
    if len(unknown_entities) > 0:
        print_err("%s contains entities missing from the standardization file" % filepath)
        print_err(unknown_entities)
        errors += 1

    # No empty values
    empty_values = df['value'].isnull()
    if empty_values.any():
        print_err("%s contains empty values in 'value' column" % filepath)
        errors += 1

    # No non-numeric values. The same mask drives both the check and the rows
    # shown; empty values are excluded because they are reported just above.
    non_numeric = pd.to_numeric(df['value'], errors='coerce').isnull() & ~empty_values
    if non_numeric.any():
        print_err("Non-numeric values in %s" % filepath)
        print_err(df[non_numeric])
        errors += 1

if errors != 0:
    print_err("\nIntegrity checks failed. There were %s errors.\n" % str(errors))
    sys.exit(1)
else:
    print("\nIntegrity checks passed.\n")


  0%|          | 0/14048 [00:00<?, ?it/s][A
  0%|          | 12/14048 [00:00<02:08, 109.30it/s][A
  0%|          | 22/14048 [00:00<02:12, 105.91it/s][A
  0%|          | 31/14048 [00:00<02:21, 98.74it/s] [A
  0%|          | 40/14048 [00:00<02:29, 93.58it/s][A
  0%|          | 48/14048 [00:00<02:42, 86.30it/s][A
  0%|          | 58/14048 [00:00<02:38, 88.48it/s][A
  0%|          | 67/14048 [00:00<02:42, 85.95it/s][A
  1%|          | 77/14048 [00:00<02:37, 88.97it/s][A
  1%|          | 89/14048 [00:00<02:27, 94.60it/s][A
  1%|          | 99/14048 [00:01<02:41, 86.37it/s][A
  1%|          | 108/14048 [00:01<02:41, 86.58it/s][A
  1%|          | 117/14048 [00:01<02:39, 87.53it/s][A
  1%|          | 129/14048 [00:01<02:26, 94.89it/s][A
  1%|          | 141/14048 [00:01<02:18, 100.30it/s][A
  1%|          | 152/14048 [00:01<02:18, 100.69it/s][A
  1%|          | 163/14048 [00:01<02:23, 96.82it/s] [A
  1%|          | 173/14048 [00:01<02:23, 96.74it/s][A
  1%|▏         | 183/14

KeyboardInterrupt: 

## Insert database rows

In [1]:
# For each dataset: upsert the dataset, its source, each of its variables, and
# each variable's datapoints (read from datapoints/<variable id>.csv).
with connection as c:
    db = DBUtils(c)

    for _, dataset in tqdm(datasets.iterrows(), total=len(datasets)):

        # Insert the dataset
        print("Inserting dataset: %s" % dataset['name'])
        db_dataset_id = db.upsert_dataset(
            name=dataset['name'],
            description=dataset['description'],
            namespace=NAMESPACE,
            user_id=USER_ID)

        # Insert the source: the first row of sources.csv listed for this
        # dataset. Fail with a readable message if there is none, instead of
        # an opaque IndexError from .iloc[0].
        dataset_sources = sources[sources['dataset_id'] == dataset['id']]
        if dataset_sources.empty:
            raise ValueError("No source found for dataset %s" % dataset['id'])
        source = dataset_sources.iloc[0]
        print("Inserting source: %s" % source['name'])
        db_source_id = db.upsert_source(
            name=source['name'],
            description=source['description'],
            dataset_id=db_dataset_id)

        # Insert variables associated with this dataset
        dataset_variables = variables[variables['dataset_id'] == dataset['id']]
        for _, variable in dataset_variables.iterrows():
            # insert row in variables table
            print("Inserting variable: %s" % variable['name'])
            db_variable_id = db.upsert_variable(
                name=variable['name'],
                code=variable['code'],
                unit=variable['unit'],
                description=variable['description'],
                short_unit=None,
                source_id=db_source_id,
                dataset_id=db_dataset_id)

            # read datapoints
            data_values = pd.read_csv(
                os.path.join(OUTPUT_PATH, 'datapoints', '%s.csv' % variable['id']))

            # Build (value, year, entityId, variableId) tuples of plain Python
            # types. Zipping the columns avoids creating a Series per row as
            # iterrows() does; an unknown entity name still raises KeyError.
            values = [
                (float(value), int(year), db_entity_id_by_name[entity], db_variable_id)
                for value, year, entity in zip(
                    data_values['value'], data_values['year'], data_values['entity'])
            ]

            # We have a dummy ON DUPLICATE handler that updates the year which is essentially
            # a no update operation. We do this only to avoid a duplicate key error. It occurs
            # when FAO uses an Item Group and Item with the same name. For example, 'Eggs' is
            # both an Item Group and a standalone Item in:
            # Commodity Balances - Livestock and Fish Primary Equivalent

            # This is not ideal because we could be masking other duplication issues; we should
            # ideally have the differentiation between groups and items in the database, but this
            # requires effort and time, both of which are currently in short supply.
            print("Inserting values...")
            db.upsert_many("""
                INSERT INTO 
                    data_values (value, year, entityId, variableId)
                VALUES 
                    (%s, %s, %s, %s)
                ON DUPLICATE KEY UPDATE
                    year = VALUES(year)
            """, values)

            print("Inserted %d values for variable" % len(values))

print("All done. Phew!")

NameError: name 'connection' is not defined

## SQL to delete all data in the `faostat_2020` namespace

```sql
-- Removes every row belonging to the 'faostat_2020' namespace.
-- The statements run from data_values up to datasets: each earlier statement
-- finds its rows by joining through the tables that later statements delete,
-- so running them in a different order would leave rows behind.

-- 1. Data values whose variable -> source -> dataset chain is in the namespace.
DELETE data_values
FROM   data_values
       INNER JOIN variables
               ON variables.id = data_values.variableid
       INNER JOIN sources
               ON sources.id = variables.sourceid
       INNER JOIN datasets
               ON datasets.id = sources.datasetid
WHERE  datasets.namespace = 'faostat_2020';

-- 2. Variables whose source -> dataset chain is in the namespace.
DELETE variables
FROM   variables
       INNER JOIN sources
               ON sources.id = variables.sourceid
       INNER JOIN datasets
               ON datasets.id = sources.datasetid
WHERE  datasets.namespace = 'faostat_2020';

-- 3. Sources attached to datasets in the namespace.
DELETE sources
FROM   sources
       INNER JOIN datasets
               ON datasets.id = sources.datasetid
WHERE  datasets.namespace = 'faostat_2020';

-- 4. Finally, the datasets themselves.
DELETE FROM datasets
WHERE  namespace = 'faostat_2020';
```