In [None]:
# default_exp database

In [None]:
#hide
# Put these at the top of every notebook to get automatic reloading
# (inline plotting is currently left disabled below).
%reload_ext autoreload
# Mode 2 of the autoreload extension: reload modules before running code,
# so edits to imported modules are picked up without restarting the kernel.
%autoreload 2
#%matplotlib inline
from IPython.core.interactiveshell import InteractiveShell
# Display the value of every top-level expression of a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"

# Database for tweet storage

> Use PostgreSQL to store tweets

In [None]:
#export
from os import getenv, environ
from dotenv import load_dotenv
import psycopg2
import logging
import pandas as pd

In [None]:
#export
# Package-wide logger and the line format shared by its handlers
logger = logging.getLogger("tweet-archiveur")
logFormatter = logging.Formatter("%(asctime)s -  %(name)-12s %(levelname)-8s %(message)s")
# Optional file logging, kept disabled:
# logger.setLevel(logging.DEBUG)
# fh = logging.FileHandler("tweet-archiveur.log")
# fh.setLevel(logging.DEBUG)
# fh.setFormatter(logFormatter)
# logger.addHandler(fh)
if not logger.handlers:
    # Attach the console handler only once, even if this module is loaded again
    consoleHandler = logging.StreamHandler()
    consoleHandler.setFormatter(logFormatter)
    logger.addHandler(consoleHandler)
logger.info('Loading database module...')

In [None]:
#hide
# The .env file is only read when running the notebook; under Docker the
# environment is populated at runtime.
from pathlib import Path
env_path = Path('..', '.env')
if not env_path.is_file():
    logger.error(f"No {env_path} found !")
else:
    load_dotenv(dotenv_path=env_path)

# Force these variables when running outside Docker
environ.update({
    "DATABASE_PORT": '5479',
    "DATABASE_HOST": 'localhost',
    "DATABASE_USER": 'tweet_archiveur-user',
})


True

In [None]:
#export
# Environment variables describing the database connection.
# The order matters: database_config() returns the values in this order.
ENV = [
    "DATABASE_USER", "DATABASE_PASS",
    "DATABASE_HOST", "DATABASE_PORT",
    "DATABASE_NAME", "DATABASE_URL",
]


def database_config():
    """Return the value of every variable listed in ``ENV`` as a tuple.

    Values come from the process environment; a variable that is not set
    yields ``None`` at its position.
    """
    return tuple(map(getenv, ENV))


def database_url() -> "str | None":
    """Build the PostgreSQL connection URL from the environment.

    ``DATABASE_URL`` wins when it is set; otherwise the URL is assembled from
    the user / password / host / port / database-name variables returned by
    ``database_config()``.

    Returns:
        The connection URL, or ``None`` (after logging an error) when neither
        a user nor a URL is configured.
    """
    # Local import: nothing else in the module needs it.
    from urllib.parse import quote

    user, pswd, host, port, name, url = database_config()
    # Never log secrets: the password is masked, and so is the URL since it may embed one.
    logger.debug(f"DEBUG : connect(user={user}, password=XXXX, host={host}, port={port}, database={name}, url={'XXXX' if url else url})")
    # Treat empty strings like missing values.
    if not user and not url:
        logger.error("Empty .env : no user or URL !")
        return None
    if url:
        return url
    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # cannot corrupt the URL structure.
    credentials = quote(user, safe="")
    if pswd:
        # Without a password the ":password" part is omitted rather than
        # rendered as the literal text "None".
        credentials += ":" + quote(pswd, safe="")
    return f"postgresql://{credentials}@{host}:{port}/{name}"

def db_connect():
    """Open and return a new psycopg2 connection to the URL given by ``database_url()``."""
    dsn = database_url()
    connection = psycopg2.connect(dsn)
    return connection
    

In [None]:
#export

def exec_query(conn, sql, params=None):
    """Execute one SQL statement and commit it.

    Args:
        conn: open DB-API connection (psycopg2 in this module).
        sql: statement to run.
        params: optional query parameters, forwarded to ``cursor.execute`` so
            values can be bound by the driver instead of being formatted into
            ``sql``. When omitted, the statement is executed as-is.

    Raises:
        Whatever the driver raises. Before the exception propagates, the
        transaction is rolled back so the connection stays usable.
    """
    cur = conn.cursor()
    try:
        if params is None:
            cur.execute(sql)
        else:
            cur.execute(sql, params)
        conn.commit()
    except Exception:
        # Leave the connection out of the aborted-transaction state
        conn.rollback()
        raise
    finally:
        # Release the cursor on success and on failure alike
        cur.close()

# https://stackoverflow.com/questions/1874113/checking-if-a-postgresql-table-exists-under-python-and-probably-psycopg2
def is_table_exist(con, table_str):
    """Tell whether table ``table_str`` exists in the ``public`` schema.

    Args:
        con: open psycopg2 connection.
        table_str: table name, bound as a query parameter (never concatenated
            into the SQL text).

    Returns:
        ``True`` when ``information_schema.tables`` lists the table, ``False``
        otherwise. A ``psycopg2.Error`` is logged and also reported as ``False``.
    """
    # SELECT EXISTS always yields exactly one boolean row, so a missing table
    # gives (False,) instead of an empty result set.
    query = (
        "SELECT EXISTS ("
        "SELECT 1 FROM information_schema.tables "
        "WHERE table_schema = 'public' AND table_name = %s"
        ");"
    )
    try:
        cur = con.cursor()
        try:
            cur.execute(query, (table_str,))
            row = cur.fetchone()
        finally:
            cur.close()
    except psycopg2.Error as e:
        logger.error(e)
        return False
    # Defensive: also cope with a driver returning no row at all
    return bool(row and row[0])
    
def create_tables_if_not_exist(conn, force = False):
    """Create the ``twitter_users`` and ``tweets`` tables when they are missing.

    Args:
        conn: open database connection, handed to ``exec_query`` and
            ``is_table_exist``.
        force: when true, drop both tables first so they are recreated empty.

    Each newly created table is given to the role named by the
    ``DATABASE_USER`` environment variable. When that variable is not set the
    ownership change is skipped, instead of issuing ``OWNER to "None"``.
    """
    owner = getenv('DATABASE_USER')
    if force:
        logger.info("Cleaning database")
        # Drop table if exist
        exec_query(conn, 'DROP TABLE IF EXISTS public.twitter_users;')
        exec_query(conn, 'DROP TABLE IF EXISTS public.tweets;')

    # Table name -> DDL, in creation order
    tables = {
        'twitter_users': '''
        CREATE TABLE public.twitter_users
        (
            twitter_id bigint NOT NULL,
            name character varying(50) NOT NULL,
            twitter_followers integer,
            twitter_tweets integer,
            PRIMARY KEY (twitter_id)
        );
        ''',
        'tweets': '''
        CREATE TABLE public.tweets
        (
            tweet_id bigint NOT NULL,
            twitter_id bigint NOT NULL,
            datetime_utc timestamp without time zone,
            datetime_local timestamp with time zone,
            retweet integer,
            favorite integer,
            text character varying(500),
            PRIMARY KEY (tweet_id)
        );''',
    }

    for table, ddl in tables.items():
        if is_table_exist(conn, table):
            continue
        exec_query(conn, ddl)
        if owner:
            # Double any embedded quote so the role name stays one quoted identifier
            quoted_owner = owner.replace('"', '""')
            exec_query(conn, f'ALTER TABLE public.{table} OWNER to "{quoted_owner}";')
        else:
            logger.warning(f"DATABASE_USER is not set : owner of table {table} left unchanged")

In [None]:
#export

# Bulk INSERT of values in a table
def insert_pandas(conn, table, df, fields):
    """
    Bulk upsert of DataFrame rows into a table.

    Uses cursor.mogrify() to render every row, then a single cursor.execute()
    running ``INSERT ... ON CONFLICT (<primary key>) DO UPDATE``.
    Thanks to https://naysan.ca/2020/05/09/pandas-to-postgresql-using-psycopg2-bulk-insert-performance-benchmark/

    Args:
        conn: open psycopg2 connection.
        table: target table name.
        df: source DataFrame; it must contain every key of ``fields``.
        fields: mapping ``DataFrame column -> database column``. The first
            entry is taken as the primary key used for conflict detection.

    Returns:
        ``None`` on success (including when ``df`` has no row), ``1`` when
        executing the query failed; the transaction is then rolled back.
    """
    # Keep only the mapped DataFrame columns, in mapping order
    df = df[list(fields.keys())]
    logger.debug(f"Bulk insert of {len(df)} lines of {len(df.columns)} columns.")
    if len(df) == 0:
        # "INSERT ... VALUES" followed by no row is invalid SQL: nothing to do
        return None
    # Create a list of tuples from the dataframe values
    tuples = [tuple(x) for x in df.to_numpy()]
    db_columns = list(fields.values())
    # Comma-separated database columns
    cols = ','.join(db_columns)
    # The primary key is assumed to be the first mapped column
    primary_key, other_fields = db_columns[0], db_columns[1:]

    cursor = conn.cursor()
    try:
        # One "(%s,%s,...)" group per row, rendered by the driver
        param_type = "(" + ",".join(['%s'] * len(df.columns)) + ")"
        values = [cursor.mogrify(param_type, tup).decode('utf8') for tup in tuples]
        query = "INSERT INTO %s(%s) VALUES " % (table, cols) + ",".join(values)
        # Update the other columns when the line already exists
        query += f' ON CONFLICT ({primary_key}) DO '
        if other_fields:
            # "col = EXCLUDED.col" pairs are valid for any number of columns
            assignments = [f'{col} = EXCLUDED.{col}' for col in other_fields]
            query += 'UPDATE SET ' + ", ".join(assignments) + ';'
        else:
            # Only the key is mapped: there is nothing to update
            query += 'NOTHING;'
        try:
            # NOTE(review): the values are already rendered into `query`, yet
            # `tuples` is still passed as in the original call. Callers in this
            # notebook pre-double '%' through filter_str, so the call is kept
            # unchanged for compatibility -- confirm before simplifying it.
            cursor.execute(query, tuples)
            conn.commit()
        except Exception as error:
            logger.error("Error: %s" % error)
            conn.rollback()
            return 1
    finally:
        cursor.close()
    return None

In [None]:
#export
import string
# Printable ASCII characters without '%'. Only referenced by the disabled
# character-filtering variant noted inside filter_str.
printable = set(string.printable)
printable.remove('%')
# https://www.programiz.com/python-programming/methods/built-in/filter"
def filter_str(s):
    """Double every percent sign of ``s`` (``%`` becomes ``%%``).

    Non-string values (for instance ``None`` or the ``NaN`` pandas uses for a
    missing cell) are returned unchanged, so the function can be applied to a
    whole column without failing on empty cells.
    """
    if not isinstance(s, str):
        return s
    # Disabled variant, which dropped non-printable characters instead:
    # return "".join(filter(lambda x: x in printable, s))
    return s.replace('%', '%%')

In [None]:
#hide
conn = db_connect()
try:
    create_tables_if_not_exist(conn, force=False)
finally:
    # Release the connection even when table creation fails
    conn.close()

In [None]:
#hide
conn = db_connect()
try:
    # Load users
    df = pd.read_csv('https://github.com/regardscitoyens/twitter-parlementaires/raw/master/data/deputes.csv')#.head(3)
    #df
    fields = { # pandas : database
        'twitter_id' : 'twitter_id',
        'nom' : 'name',
        'twitter_followers' : 'twitter_followers',
        'twitter_tweets' : 'twitter_tweets'
    }
    insert_pandas(conn, 'twitter_users', df, fields)

    # Load Tweets
    df = pd.read_csv('tweets-sample.csv')#.head(2)
    # Double the '%' characters of the text before the bulk insert
    df['text_new'] = df.text.apply(filter_str)
    fields = { # pandas : database
        'tweet_id' : 'tweet_id',
        'user_id' : 'twitter_id',
        'datetime_utc' : 'datetime_utc',
        'datetime_local' : 'datetime_local',
        'retweet' : 'retweet',
        'favorite' : 'favorite',
        'text_new' : 'text',
    }
    insert_pandas(conn, 'tweets', df, fields)
finally:
    # Release the connection even when a download or an insert fails
    conn.close()

In [None]:
#hide
#######################################################
# DEBUG CODE BELOW

In [None]:
#hide
# Show the DataFrame columns selected by the current `fields` mapping
col = "'" + "', '".join(fields.keys()) + "'"
print(col)
# Select the mapped columns directly rather than eval()-ing generated source
df_selected = df[list(fields.keys())]
tuples = [tuple(x) for x in df_selected.to_numpy()]
tuples[0]

'tweet_id', 'user_id', 'datetime_utc', 'datetime_local', 'retweet', 'favorite', 'text_new'


(1372621356223885322,
 76584619,
 '2021-03-18 18:50:08',
 '2021-03-18 19:50:08',
 16,
 64,
 "Quand la vaccination est à l'arrêt, le confinement est en marche. Terrible constat d'échec que d'être contraint de confiner une nouvelle fois. \nIl est temps de sortir de cette gestion de crise sanitaire à la fois anxiogène, bavarde et contradictoire. #Castex19h  #confinement3")

In [None]:
# hide
# Scratch rebuild of the upsert clause, to inspect what the current `fields` mapping yields
columns = list(fields.values())
primary_key = columns[0]
other_fields = columns[1:]
excluded = [f'EXCLUDED.{name}' for name in other_fields]
# Expected shape: (col2, col3, col4) = (EXCLUDED.col2, EXCLUDED.col3, EXCLUDED.col4);
query = "".join([
    "test ",
    f' ON CONFLICT ({primary_key}) DO UPDATE SET ',
    "(" + ", ".join(other_fields) + ")",
    ' = (' + ", ".join(excluded) + ");",
])
query

'test  ON CONFLICT (tweet_id) DO UPDATE SET (twitter_id, datetime_utc, datetime_local, retweet, favorite, text) = (EXCLUDED.twitter_id, EXCLUDED.datetime_utc, EXCLUDED.datetime_local, EXCLUDED.retweet, EXCLUDED.favorite, EXCLUDED.text);'

In [None]:
#hide
# Inspect the database-side column names of the current `fields` mapping
fields.values()

dict_values(['tweet_id', 'twitter_id', 'datetime_utc', 'datetime_local', 'retweet', 'favorite', 'text'])

In [None]:
#hide
# Every database column except the first one, which is used as the primary key
list(fields.values())[1:]

['twitter_id', 'datetime_utc', 'datetime_local', 'retweet', 'favorite', 'text']