In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

 

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 5GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import pandas as pd
import numpy as np
import gc
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
import lightgbm as lgb
import dill
import time
import json
import multiprocessing as mp
import threading
from queue import Queue

In [None]:
#From https://www.kaggle.com/rohanrao/ashrae-half-and-half

from pandas.api.types import is_datetime64_any_dtype as is_datetime
from pandas.api.types import is_categorical_dtype

def reduce_mem_usage(df, use_float16=False,verbose=True):
    """
    Iterate through all the columns of a dataframe and modify the data type to reduce memory usage.        
    """
    
    start_mem = df.memory_usage().sum() / 1024**2
    if verbose :print("Memory usage of dataframe is {:.2f} MB".format(start_mem))
    
    for col in df.columns:
        if is_datetime(df[col]) or is_categorical_dtype(df[col]):
            continue
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == "int":
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if use_float16 and c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype("category")

    end_mem = df.memory_usage().sum() / 1024**2
    if verbose:print("Memory usage after optimization is: {:.2f} MB".format(end_mem))
    if verbose:print("Decreased by {:.1f}%".format(100 * (start_mem - end_mem) / start_mem))
    
    return df

# SQLAlchemy :
SQLAlchemy is a popular SQL toolkit and Object Relational Mapper. It gives full power and flexibility of SQL. By default, SQLAlchemy is installed in the Kaggle environment, so, no need to install anything

### Recall: 
metadata contains the latest state of each user and will be used to produce the final submission. I opted for SQL because the handling of Datafrme is very slow.
We extracted the metadata from this notebook: https://www.kaggle.com/tchaye59/riiid-preprocess-and-balance-the-dataset
and use it in this notebook: https://www.kaggle.com/tchaye59/riiid-work-with-the-full-state-using-sqlalchemy

In [None]:
%%time
metadata = dill.load(open('/kaggle/input/riiid-preprocess-and-balance-the-dataset/metadata.dill','rb'))

The exportation from Dataframe to SQLite files doesn't support multi-index,so it is important to reset the index

In [None]:
for key in list(metadata.keys()):
    if key in metadata[key].columns:
        metadata[key].set_index(key,inplace=True)
    print(key,metadata[key].shape)
    metadata[key].reset_index(inplace=True)
    metadata[key] = reduce_mem_usage(metadata[key])

Remove rows with all zeros

In [None]:
s = 0
for key in metadata.keys():
    metadata[key].set_index(list(key) if type(key) != str else key,inplace=True)
    d = metadata[key]
    metadata[key] = d[~(d == 0).all(axis=1)]
    s += metadata[key].shape[0]
    metadata[key].reset_index(inplace=True)
s

In [None]:
from sqlalchemy import create_engine

In [None]:
# Merge multiple keys into a single key
def build_index(x):
    return '_'.join(map(lambda x:str(int(x)),x)) if type(x) not in [int,float] else str(int(x))

# Export each key in metadata to an SQLite file

In [None]:
%%time
n = 2000000 # Chunk size
metadata_info = {} # columns info

for i,key in enumerate(list(metadata.keys())):
    df = metadata[key]
    # make sure the key is a tuple
    if type(key) == str:
        key = key,
        
    # Connection to the DB file
    name = f"db.{'_'.join(key)}.sqlite"
    engine = create_engine(f'sqlite:///{name}', echo=False)
    sqlite_connection = engine.connect()   
    
    # Export chunk by chunk
    for i in range(0,df.shape[0],n):
        tmp = df.iloc[i:min(i+n,df.shape[0])]
        print(f'{key} | {i}/{df.shape[0]} | Build Index')
        # multi-index is not supported so we will merge the keys columns
        tmp.index = tmp[list(key)].apply(build_index,axis=1).values
        tmp.drop(columns=[*key,], inplace=True)   
        if key not in metadata_info : metadata_info[key] = [i,list(tmp.columns)]
        print(f'{key} | {i}/{df.shape[0]} | Update DB')
        tmp.to_sql('_'.join(key), sqlite_connection, if_exists='append')
    sqlite_connection.close()

In [None]:
dill.dump(metadata_info,open('metadata_info.dill','wb'))