# Опыты по сохранению и восстановлению данных из датафреймов

Экспорт импорт из и в базы данных и CSV файлы

In [7]:
# packages installed in local directories
import os
import sys
google_drive_path = '/drive/My Drive/Colab Notebooks'
packages_dir = os.getcwd()+'/packages'
sys.path.append(packages_dir)
# !pip install psycopg2-binary --target=$packages_dir

In [1]:
import os
import psycopg2
import sqlite3
import psycopg2.extras  
import pandas as pd
import time
# import gzip
import zlib
import sysв
RGDSN = os.getenv('RGDSN')

class ATimer:
    """A utility class to mesure time between sequential calls."""
    def __init__(self):
        self.t = time.time()
    def show(self, message ='', end='\n'):
        """Prints a string appended with time lapsed from the last call"""
        t1 = time.time()
        print(f'{message} in {t1 - self.t:.3f} sec.', end=end)
        self.t = t1


def dataframe_from_sql(sql):
    t= ATimer()
    con = psycopg2.connect(RGDSN)
    t.show('Connected', end=' ')
    df = pd.read_sql_query(sql, con)
    t.show('Executed', end=' ')
    con.close()
    return df

def compress(s: str)-> bytes:
    "compresses string s into byte array"
    return zlib.compress(s.encode('utf-8'))

def decompress(c: bytearray) -> str:
    return zlib.decompress(c).decode('utf-8')

def show_size(df):
    "Prints size of df  in Mb"
    size = sys.getsizeof(df)/1024/1024
    # size = df.memory_usage(index=True, deep=True).sum()/1024/1024
    print(f'The size is {size :.2f} Mb')
    
def sql_queries(sql:str, limit=3, max_offset=10, start_offset=0 ):
    "Generates sql queries adding LIMIT and OFFSET"
    for i in range(start_offset,max_offset,limit):
        yield f'{sql} LIMIT {limit} OFFSET {i}'
        
def accumulate(df):
    accumulate.sum = pd.concat([accumulate.sum, df], ignore_index=True, copy=True)
accumulate.sum = pd.DataFrame()

def execute_sql_queries( sql:str, func, limit=3, max_offset=10, start_offset=0 ):
    """Выполняет серию SQL запросов. Над результатом каждого выполняет функцию func."""
    t = ATimer()
    for sql in sql_queries(sql, limit=limit, max_offset=max_offset, start_offset=start_offset):
        df = dataframe_from_sql(sql)
        if callable(func):
            func(df)
        t.show(f'{len(accumulate.sum)} records added')
        show_size(accumulate.sum)

# https://stackoverflow.com/questions/39100971/how-do-i-release-memory-used-by-a-pandas-dataframe
import psutil
def usage():
    p = psutil.Process(os.getpid())
    return p.memory_info().rss / float(2 ** 20)


ModuleNotFoundError: No module named 'sysв'

In [None]:
# accumulate.sum = pd.DataFrame()

# tt = ATimer()

# execute_sql_queries(
#     'SELECT obj_id, lemmatized_text FROM articles ORDER BY obj_id', 
#     accumulate, 
#     limit=10000, 
#     max_offset=1210000, 
#     start_offset=0)    

# print('------------------------------')
# tt.show('finished')
# show_size(accumulate.sum)
# # print(f'{sys.getsizeof(accumulate.sum)/1024/1024:.2f} Mb ')
# display(accumulate.sum)

In [None]:
# !pip install fastparquet

In [None]:
# !pip install snappy

## Freeing memory

In [None]:
import gc
gc.collect()

## Saving data
Saving to csv takes 5 min (ssd 1min 26s, 1min 24s), to csv.zip 12 min. To SQLite3 107sec (ssd 1min 1s).

In [None]:
%%time
# accumulate.sum.to_parquet('df.parquet.gzip', compression='gzip')  
# accumulate.sum.to_parquet('df.parquet')
df.to_csv('/Volumes/ssd/dumps/articles_i3.csv')
# accumulate.sum.to_pickle('accumulate.sum.pkl')
# accumulate.sum.to_csv('accumulate.sum.csv.zip')

# con = sqlite3.connect('/Volumes/ssd/dumps/df1.db')
# df.to_sql('art',con)
# con.close()

Reading data
==========

**5067.48 Mb. 1202159 records.**

type  | time
----  |----
csv   | 1 min (ssd 52s, 48s)
csv.gz| 1 min 30s (ssd 1 min 11s)
**sqlite** pd.read_sql_table('art','sqlite:///df.db')| 3 min
**sqlite3** pd.read_sql_query('select * from art',con) | 1 min 52s, 2min 10s (ssd 3min 11s, 3min 9s)
**postgresql** pd.read_sql_query('SELECT obj_id, lemmatized_text FROM articles',con)| 5 min 30s.  CPU times: user 26.4 s, sys: 52.7 s, total: 1min 19s


In [2]:
%%time

# CSV
df = pd.read_csv('/Volumes/ssd/dumps/articles_i.csv')

# SQLite
# con = sqlite3.connect('./df.db')
# con = sqlite3.connect('/Volumes/ssd/dumps/df.db')
# df = pd.read_sql_query('select * from art',con)

# Postresql
# con = psycopg2.connect(os.getenv('RGDSN'))
# df = pd.read_sql_query('SELECT obj_id, lemmatized_text FROM articles',con)

# con.close()


CPU times: user 42.8 s, sys: 6.07 s, total: 48.9 s
Wall time: 48.9 s


In [16]:
%%time
v = ''
c = 0

def incc(row):
    global c
    global v
#     v = row['lemmatized_text']
    c+=1
    
# for index, row in df.iterrows():
# #     v = row['lemmatized_text']
#     incc()

df.apply(lambda row: incc(row), axis=1)

print(f'num processed rows= {c}')

num processed rows= 1202159
CPU times: user 3.77 s, sys: 49.5 ms, total: 3.82 s
Wall time: 3.82 s


In [107]:
con = psycopg2.connect(os.getenv('RGDSN'))
df = pd.read_sql_query("select count(1) from articles", con )
con.close()
display(df)

Unnamed: 0,count
0,1202159


In [268]:
t = ATimer()
# df['len']=df.lemmatized_text.str.len()
df['z']=df['lemmatized_text'].apply(compress)
# df['lenz']=df['z'].str.len()
t.show('zipped')



zipped in 1.484 sec.


In [274]:
try: 
    df.drop(columns=['lemmatized_text'], inplace=True)
except: 
    pass
df.memory_usage(deep=True).sum()/1024/1024

12.423781394958496

In [None]:
t.show('begin')
df['restored'] = df.z.apply(decompress)
t.show('end')
df

http://jonathansoma.com/lede/foundations/classes/pandas%20columns%20and%20functions/apply-a-function-to-every-row-in-a-pandas-dataframe/