In [186]:
import numpy as np
import pandas as pd
import glob
import shutil
import os
import sqlite3
from copy import deepcopy
from datetime import datetime,timedelta,date

In [187]:
# import sys
# sys.path.append(...)

In [188]:
def get_mem_usage(df):
    """Print the deep memory usage of ``df``, in units of 1024**2 bytes, as ``<value>Mb``."""
    # deep=True makes pandas measure the actual contents of object columns.
    total_bytes = df.memory_usage(deep=True).sum()
    in_megabytes = total_bytes / 1024 ** 2
    print(f"{in_megabytes:3.2f}Mb")

In [189]:
def get_general_info(df):
    """
    Print a quick textual overview of a DataFrame.

    The report contains, in order: the shape, the column dtypes, the
    percentage of missing values per column, the deep memory usage and the
    absolute count of missing values per column.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to describe. It is not modified.

    Returns
    -------
    None
        Everything is written to stdout.
    """
    # Best-effort lookup of the global variable name bound to this exact
    # object. When the frame is not a module-level global (local variable,
    # temporary expression, function imported from a module) there is no
    # match, so fall back to a placeholder instead of raising IndexError.
    name = next(
        (key for key, value in list(globals().items()) if value is df),
        "unnamed",
    )
    separator = "======================================="
    missing_counts = df.isnull().sum()

    print(f"Dataframe << {name}>>has {df.shape[0]} rows, {df.shape[1]} columns")
    print(separator)
    print("Column Types:\n")
    print(df.dtypes)
    print(separator)
    print("Missing values per column: ")
    percent_missing = missing_counts * 100 / len(df)
    missing_value_df = pd.DataFrame({'column_name': df.columns, 'percent_missing': percent_missing})
    missing_value_df['percent_missing'] = ["{:.2f}%".format(x) for x in missing_value_df['percent_missing']]
    print(missing_value_df)
    print(separator)
    print(f"Memory Use: {df.memory_usage(deep=True).sum()/1024 **2:3.2f}Mb")
    print(separator)
    print("Missing Values in columns: ")
    print(missing_counts)

    

In [190]:
def change_col_format(df,target_type):
    """
    Cast every column of ``df`` to ``target_type``, in place.

    The columns are reassigned on the frame that was passed in, and that same
    frame object is returned for convenience.
    """
    for column_label in df.columns:
        converted = df[column_label].astype(target_type)
        df[column_label] = converted
    return df

In [191]:
def get_optimize_df(df):
    """
    Return a copy of ``df`` with low-cardinality columns stored as ``category``.

    A column is converted when its number of distinct values is less than
    half the number of rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to optimise. It is not modified.

    Returns
    -------
    pandas.DataFrame
        The converted frame. ``DataFrame.astype`` returns a new object, so
        its result must be returned; discarding it leaves the data unchanged.
        An empty frame is returned as is.
    """
    row_count = len(df)
    if row_count == 0:
        # No rows: the cardinality ratio is undefined (division by zero).
        return df
    low_cardinality = {
        col: 'category'
        for col in df.columns
        if df[col].nunique() / row_count < 0.5
    }
    return df.astype(low_cardinality)

In [192]:
def save_as_pickle(df,name,path=None):
    """
    Pickle ``df`` to ``<name>.pkl``.

    Parameters
    ----------
    df : pandas.DataFrame
        Object to save; it must provide ``to_pickle``.
    name : str
        File name without the ``.pkl`` extension.
    path : str, optional
        Target directory. When omitted the file is written to the current
        working directory.

    Returns
    -------
    None
        Success or failure is reported on stdout. Ordinary errors are caught
        and reported; KeyboardInterrupt and SystemExit are allowed to
        propagate.
    """
    try:
        file_name = f"{name}.pkl"
        if path is None:
            df.to_pickle(file_name)
            print(f"Dataframe saved as pickle in => {os.getcwd()}")
        else:
            # Writing to an explicit path does not need a change of directory.
            target = os.path.join(path, file_name)
            df.to_pickle(target)
            print(f"Dataframe saved as pickle in => {target}")
    except Exception:
        print("Save failed. Make sure it's a dataframe or the path is correct")
            

In [193]:
def read_pickle_as_df(path=None):
    """
    Load every ``*.pkl`` file of a directory into a dictionary.

    Parameters
    ----------
    path : str, optional
        Directory to scan. Defaults to the current working directory.

    Returns
    -------
    dict
        Maps each file name without its ``.pkl`` extension to the object
        returned by ``pandas.read_pickle``. Only the final extension is
        stripped, so ``a.v1.pkl`` and ``a.v2.pkl`` get distinct keys.

    Raises
    ------
    FileNotFoundError
        If the directory does not exist.

    Notes
    -----
    Unpickling can execute arbitrary code; only use this on trusted files.
    """
    target_dir = os.getcwd() if path is None else path
    if not os.path.isdir(target_dir):
        raise FileNotFoundError(f"No such directory: {target_dir!r}")

    # Build absolute patterns instead of changing the working directory, so a
    # failing read cannot leave the process in the wrong directory.
    pattern = os.path.join(glob.escape(target_dir), "*.pkl")

    result = {}
    for pickle_path in sorted(glob.glob(pattern)):
        name = os.path.splitext(os.path.basename(pickle_path))[0]
        result[name] = pd.read_pickle(pickle_path)
    return result
    

In [194]:
def get_n_days_ago(n=0,time_format="%d-%m-%Y"):
    """Return the local date/time ``n`` days before now, formatted with ``time_format``."""
    offset = timedelta(days=n)
    shifted = datetime.now() - offset
    return datetime.strftime(shifted, time_format)

In [195]:
def get_files(extension):
    """List the files in the current working directory that match ``*.<extension>``."""
    pattern = "*.{}".format(extension)
    matches = glob.glob(pattern)
    return matches

In [196]:
def create_clean_dir(name):
    """
    Create an empty directory ``name`` and make it the working directory.

    An existing directory of that name is deleted with all its contents
    first. The new working directory is printed at the end.
    """
    already_there = os.path.isdir(name)
    if already_there:
        # Wipe the previous contents so the directory starts out empty.
        shutil.rmtree(name)
    os.makedirs(name)
    os.chdir(name)
    print(f"Current working dir => :{os.getcwd()}")
        

In [197]:
def read_large_csv(name,chunkSize=1000000,encoding='utf-8'):
    """
    Read a CSV file in pieces of ``chunkSize`` rows and return one DataFrame.

    The file is opened as a pandas iterator, chunks are pulled until the
    reader signals exhaustion, and the pieces are concatenated with a fresh
    0..n-1 index.
    """
    csv_reader = pd.read_csv(name, encoding=encoding, iterator=True)
    pieces = []
    while True:
        try:
            pieces.append(csv_reader.get_chunk(chunkSize))
        except StopIteration:
            # The reader raises StopIteration once no rows are left.
            break
    return pd.concat(pieces, ignore_index=True)

In [206]:
# get_files("xlsx")

In [207]:
# df = pd.read_excel("test.xlsx")

In [208]:
# get_general_info(df)

In [107]:
import pandas as pd
import numpy as np
import sqlite3


class DatabaseSqlite3:

    """
    Convenience wrapper around a single sqlite3 connection.

    Query results are returned as pandas DataFrames or plain lists. Most
    failures are reported with ``print`` instead of being raised.

    NOTE(security): table names, ``conditions`` and ``update_values`` are
    interpolated into the SQL text verbatim. Only pass trusted values.

    Attributes
    ----------
    db_name : str
        Name/path given to ``sqlite3.connect``.
    status : bool
        True while the connection is open.
    connection : sqlite3.Connection or None
        The open connection; None when connecting failed.
    """

    def __init__(self,db_name):
        """Open a connection to ``db_name`` and print the outcome."""
        self.db_name = db_name
        self.status = False
        # Defined up front so the attribute exists even when connect() fails.
        self.connection = None
        try:
            self.connection = sqlite3.connect(self.db_name)
            print(f"Connected to << {self.db_name}>>")
            self.status = True
        except Exception as error:
            print("Error while trying connect",error)

    def close_connection(self):
        """Close the connection, or report that it is already closed."""
        if self.status:
            self.connection.close()
            # Record the new state so a second call takes the other branch.
            self.status = False
            print(f"Connection for << {self.db_name} >> is closed")
        else:
            print(f"Connection for << {self.db_name} >> is already closed")

    def read_database_version(self):
        """Print the SQLite library version reported by ``sqlite_version()``."""
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute("select sqlite_version();")
                # fetchone() yields a one-element row; unwrap it for display.
                db_version = cursor.fetchone()[0]
            finally:
                cursor.close()
            print(f"<< {self.db_name} >> 's version is {db_version}")
        except Exception as error:
            print("Error while getting data",error)

    def get_table_names(self):
        """
        Return all table names of the database.

        Returns
        -------
        pandas.DataFrame
            One column, ``'Table Name'``. The frame is empty when the query
            fails (the error is printed).
        """
        # Defaults keep the return statement valid when the query fails.
        records = []
        cols = ['name']
        try:
            cursor = self.connection.cursor()
            try:
                query = cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                records = cursor.fetchall()
                cols = [column[0] for column in query.description]
            finally:
                cursor.close()
        except sqlite3.Error as error:
            print("Failed to read data from sqlite table",error)
        results = pd.DataFrame.from_records(data=records,columns=cols).rename(columns={'name':'Table Name'})
        return results

    def read_table_with_df(self,table_name,conditions=None,limit=None):
        """
        Read a table into a DataFrame.

        Parameters
        ----------
        table_name : str
            Table to select from.
        conditions : str, optional
            Raw SQL placed after ``WHERE``.
        limit : int, optional
            Maximum number of rows to return.

        Returns
        -------
        pandas.DataFrame

        Raises
        ------
        Exception
            Whatever ``pandas.read_sql`` raises (e.g. for a missing table);
            a short message is printed before the error is re-raised.
        """
        sqlite_query = f"SELECT * from {table_name}"
        if conditions:
            sqlite_query += f" WHERE {conditions}"
        if limit is not None:
            # SQL requires the LIMIT clause to come after the WHERE clause.
            sqlite_query += f" LIMIT {limit}"

        try:
            return pd.read_sql(sqlite_query,self.connection)
        except Exception as error:
            print("Failed to retrive data from sqlite table",error)
            raise

    def get_column_names_from_table(self,table_name):

        """Return the list of column names of ``table_name`` (empty on error)."""
        columns_names = list()
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute('PRAGMA table_info('+table_name+');')
                # Field 1 of each table_info row is the column name.
                columns_names = [row[1] for row in cursor.fetchall()]
            finally:
                cursor.close()
        except sqlite3.Error as error:
            print("Failed to get data",error)

        return columns_names

    def replace_table_with_df(self,table_name,df,replace=False):
        """
        Write a DataFrame into a table, creating the table if needed.

        replace=False: append the rows of ``df`` to the table
        replace=True: replace the whole table with ``df``
        """
        try:
            if table_name in list(self.get_table_names()['Table Name']):
                print(f"Found table <<{table_name}>> in Database <<{self.db_name}>>")
            else:
                print(f"Attention , creating new table <<{table_name}>> in Database <<{self.db_name}>> ")

            if replace:
                df.to_sql(name=table_name,con=self.connection,if_exists="replace", index=False)
            else:
                df.to_sql(name=table_name,con=self.connection,if_exists="append", index=False)
                print("Sql insert process finished.")

        except sqlite3.Error as error:
            print("Failed to update",error)
            print("If it's a creation, be careful with columns format and value types")

    def __getitem__(self,table_name):
        """``db[table_name]`` reads the whole table; raises KeyError on failure."""
        try:
            return self.read_table_with_df(table_name)
        except Exception as error:
            raise KeyError(f"{table_name} not found in database.") from error

    def update_table(self,table_name,update_values,conditions):
        """
        Run ``UPDATE <table_name> SET ... WHERE ...`` and commit.

        update_values: string, or list of ``column=value`` SQL fragments
                       (list items are joined with ", ")
        conditions: string, or list of SQL expressions
                    (list items are joined with " AND ")
        """

        updated = update_values
        cond = conditions

        if isinstance(updated,list):
            updated = ", ".join(update_values)
        if isinstance(conditions,list):
            cond = " AND ".join(conditions)

        sqlite_query = f"UPDATE {table_name} SET {updated} WHERE {cond};"
        print(sqlite_query)
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(sqlite_query)
                self.connection.commit()
            finally:
                cursor.close()
            print(f"Update table << {table_name} >> success.")
        except sqlite3.Error as error:
            print(f"Failed to update table {table_name}",error)

    def delete_table(self,table_name):
        """Drop ``table_name`` from the database and commit."""
        try:
            cursor = self.connection.cursor()
            try:
                cursor.execute(f"DROP TABLE {table_name};")
                self.connection.commit()
            finally:
                cursor.close()
            print(f"Drop table << {table_name} >> success.")

        except sqlite3.Error as error:
            print(f"Failed to delete table <<{table_name}>>",error)

    def back_up_to(self,dest):
        """
        Copy the database into directory ``dest``.

        The backup file is named ``Backup<dd-mm-YYYY><db file name>``.

        Raises
        ------
        FileNotFoundError
            If ``dest`` is not an existing directory.
        """
        if not os.path.isdir(dest):
            raise FileNotFoundError(f"No such directory: {dest!r}")
        # Use only the file name so a db_name containing directories still
        # produces a valid backup file name inside dest.
        new_name = "Backup"+datetime.now().strftime("%d-%m-%Y")+os.path.basename(self.db_name)
        # Build the full target path rather than changing the working directory.
        bck = sqlite3.connect(os.path.join(dest,new_name))
        try:
            self.connection.backup(bck)
        finally:
            bck.close()
        print("Back Up finished.")
            

In [108]:
# Small sample frame (name, age, country) used by the DatabaseSqlite3 demo cells below.
students = pd.DataFrame({'name':['a','b','c','d'],'age':[20,34,5,80],'country':['china','us','france','italy']})
students

Unnamed: 0,name,age,country
0,a,20,china
1,b,34,us
2,c,5,france
3,d,80,italy


In [109]:
# Connect to the 'test_db' SQLite database; the constructor prints the connection status.
db = DatabaseSqlite3('test_db')

Connected to << test_db>>


In [110]:
# replace=True writes the frame with if_exists="replace", overwriting any existing 'students' table.
db.replace_table_with_df('students',students,replace=True)

Found table <<students>> in Database <<test_db>>


In [111]:
# List the tables of the database as a one-column DataFrame ('Table Name').
db.get_table_names()

Unnamed: 0,Table Name
0,students


In [112]:
# Read the whole 'students' table back as a DataFrame.
db.read_table_with_df('students')

Unnamed: 0,name,age,country
0,a,20,china
1,b,34,us
2,c,5,france
3,d,80,italy


In [113]:
# Indexing goes through __getitem__, which calls read_table_with_df(table_name).
db['students']

Unnamed: 0,name,age,country
0,a,20,china
1,b,34,us
2,c,5,france
3,d,80,italy


In [114]:
# `conditions` is raw SQL that is placed after WHERE; here it filters on a text column.
db.read_table_with_df('students',conditions="name='a'")

Unnamed: 0,name,age,country
0,a,20,china


In [115]:
# Same call with a numeric comparison: only rows with age above 20.
db.read_table_with_df('students',conditions="age>20")

Unnamed: 0,name,age,country
0,b,34,us
1,d,80,italy


In [116]:
# List arguments: SET items are joined with ", " and conditions with " AND ".
# NOTE(review): the values are double-quoted; standard SQL string literals use
# single quotes — confirm this is not relying on SQLite's lenient quoting.
db.update_table('students',['country="UK"'],conditions=['name="c"'])

UPDATE students SET country="UK" WHERE name="c";
Update table << students >> success.


In [117]:
# Check the update: the row named 'c' should now have country 'UK'.
db.read_table_with_df('students')

Unnamed: 0,name,age,country
0,a,20,china
1,b,34,us
2,c,5,UK
3,d,80,italy


In [118]:
# update_values and conditions may also be plain strings instead of lists.
db.update_table('students','country="japan"',conditions='age=5')

UPDATE students SET country="japan" WHERE age=5;
Update table << students >> success.


In [119]:
# Check the second update: the row with age 5 should now have country 'japan'.
db.read_table_with_df('students')

Unnamed: 0,name,age,country
0,a,20,china
1,b,34,us
2,c,5,japan
3,d,80,italy


In [121]:
# Close the connection now that the demo is finished.
db.close_connection()

Connection for << test_db >> is closed
