In [3]:
import sqlite3
import pandas as pd

In [4]:
# read csv table
inpath = '../../../pappa/data/gender_classification/user_age_gender.csv'
df = pd.read_csv(inpath, sep=';', index_col=0)
# count number of rows and of unique user_ids
print('Number of rows:', df.shape[0])
print('Number of unique user_ids:', len(df.user_id.unique()))
# retrieve duplicate user_ids
duplicate_user_ids = df[df.duplicated(subset=['user_id'], keep=False)].user_id.unique()
print('Number of duplicate user_ids:', len(duplicate_user_ids))
duplicate_user_ids
# retrieve empty user_ids
empty_user_ids = df[df.user_id.isna()].index
print('Number of empty user_ids:', len(empty_user_ids))

Number of rows: 15773
Number of unique user_ids: 15764
Number of duplicate user_ids: 2
Number of empty user_ids: 2


In [134]:
def retrieve_users_tweets(
        cursor:sqlite3.Cursor,
        table:str,
        user_id:list,
        max_tweets:int=100,
        tweets_features:list=['tweet_id', 'created_at', 'text', 'retweet_text',],
        ):
    
    query = f"""
        SELECT *
        FROM {table}
        WHERE user_id == {user_id}
        LIMIT {max_tweets};
        """
    
    # Execute the query and fetch the results
    cursor.execute(query)
    tweets = cursor.fetchall()

    return tweets

# Example usage
database_path = '../mydata/database/myMENTALISM.db'
table_name = 'sample_tweets'
user_ids_table = '../../../pappa/data/user_classification/user_age_gender_location.pkl'
user_ids_to_retrieve = pd.read_pickle(user_ids_table).user_id[:100].astype(int).tolist()
#user_ids_to_retrieve = ['842452578738806784', '69150122']
max_tweets_to_retrieve = 100000
tweets_features_to_retrieve = ['tweet_id', 'created_at', 'text', 'retweet_text']

# Connect to the database
conn = sqlite3.connect(database_path)
cursor = conn.cursor()

result = {}
# loop through batches of user_ids
for u in user_ids_to_retrieve:
    # retrieve tweets
    tweets = retrieve_users_tweets(cursor, table_name, u, max_tweets_to_retrieve, tweets_features_to_retrieve)
    # save the tweets in a dictionary
    result[u] = tweets

# Close the database connection
conn.close()

In [135]:
# count all tweets retrieved
total_tweets = 0
for k in result.keys():
    total_tweets += len(result[k])
print('Total tweets retrieved:', total_tweets)

Total tweets retrieved: 888


The above solution is too slow, let's try to optimize it...

In [48]:
from tqdm import tqdm
import pandas as pd
import sqlite3

TABLE_COLUMN_NAMES = [
    "tweet_id",
    "user_id",
    "created_at",
    "text",
    "retweet_text",
]

# Example usage
user_ids_table = '../data/user_classification/user_age_gender_location.pkl'
db_file = '../../mentalism/sentemb/mydata/database/myMENTALISM.db'
table_name = 'sample_tweets'
user_ids_to_retrieve = pd.read_pickle(user_ids_table).user_id.astype(int).tolist()
#user_ids_to_retrieve = [int('842452578738806784'), int('69150122')]
tweets_features_to_retrieve = ['tweet_id', 'created_at', 'text', 'retweet_text']
remove_columns = None
chunk_size = 1000000
max_tweets_per_user = 3
max_users_per_chunk = 5000
n_files=0


# Create a database connection
conn = sqlite3.connect(db_file)
cursor = conn.cursor()

# Get the total number of rows
cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
total_rows = cursor.fetchone()[0]

# Initialize an empty DataFrame
result_df = pd.DataFrame()

# Initialize a tqdm progress bar
progress_bar = tqdm(total=total_rows, unit="row", desc="Processing")

column_names = TABLE_COLUMN_NAMES

# Split user_ids_to_retrieve in chunks of max_user_ids_per_file
user_ids_to_retrieve_chunks = [
    user_ids_to_retrieve[i:i + max_users_per_chunk] for i in range(0, len(user_ids_to_retrieve), max_users_per_chunk)
    ]

# Loop through user_ids_to_retrieve in chunks
for user_ids_to_retrieve in user_ids_to_retrieve_chunks:
    # Loop through the data in chunks
    for offset in range(0, total_rows, chunk_size):
        # Query the database for a chunk of rows
        cursor.execute(f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {offset}")
        rows = cursor.fetchall()

        # Create a DataFrame from the fetched rows
        chunk_df = pd.DataFrame(rows, columns=column_names)
        
        # Remove the unwanted columns
        if remove_columns is not None:
            chunk_df = chunk_df.drop(columns=remove_columns)

        # Append to the result DataFrame
        chunk_df = chunk_df[chunk_df['user_id'].isin(user_ids_to_retrieve)]
        result_df = pd.concat([result_df, chunk_df], ignore_index=True)

        # Sort the result DataFrame by user_id and created_at
        result_df = result_df.sort_values(by=['user_id','created_at'], ascending=False)
        # Keep only the first max_tweets_per_user tweets for each user
        result_df = result_df.groupby('user_id').head(max_tweets_per_user)

        # Update the progress bar
        progress_bar.update(len(rows))

    # Save results to pickle file
    print(f'Saving {len(result_df)} tweets to user_tweets_chunk{n_files}.pkl')
    result_df.to_pickle(f'../data/user_tweets_chunk{n_files}.pkl')
    # Reset the result DataFrame
    result_df = pd.DataFrame()

# Close the tqdm progress bar
progress_bar.close()

# Close the database connection
conn.close()

Processing:  54%|█████▍    | 2500000/4637193 [00:29<00:25, 85010.93row/s] 


Saving 177 tweets to user_tweets_chunk0.pkl




Saving 177 tweets to user_tweets_chunk0.pkl




Saving 202 tweets to user_tweets_chunk0.pkl




Saving 100 tweets to user_tweets_chunk0.pkl




KeyboardInterrupt: 



In [43]:
user_ids_to_retrieve = pd.read_pickle(user_ids_table).user_id.astype(int).tolist()
len(user_ids_to_retrieve)

20198

In [27]:
from tqdm import tqdm
import pandas as pd
import sqlite3
import time

TABLE_COLUMN_NAMES = [
    "tweet_id",
    "user_id",
    "created_at",
    "text",
    "retweet_text",
]

# Example usage
user_ids_table = '../data/user_classification/user_age_gender_location.pkl'
db_file = '../../mentalism/sentemb/mydata/database/myMENTALISM.db'
table_name = 'sample_tweets'
user_ids_to_retrieve = pd.read_pickle(user_ids_table).user_id.astype(int).tolist()
#user_ids_to_retrieve = [int('842452578738806784'), int('69150122')]
max_tweets_to_retrieve = 100
chunk_size = 1000
tweets_features_to_retrieve = ['tweet_id', 'created_at', 'text', 'retweet_text']
remove_columns = None
max_tweets_per_user = 1000
n_files=0

# Create a database connection
conn = sqlite3.connect(db_file)
cursor = conn.cursor()

# Initialize an empty DataFrame
result_df = pd.DataFrame()

column_names = TABLE_COLUMN_NAMES

# Query the database for a chunk of rows
before = time.time()
cursor.execute(f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {0}")
rows = cursor.fetchall()
after = time.time()
print(f"Time to fetch {chunk_size} rows: {after-before} seconds")

# Create a DataFrame from the fetched rows
chunk_df = pd.DataFrame(rows, columns=column_names)

# Append to the result DataFrame
# chunk_df['user_id'] = chunk_df['user_id'].astype(str)
before = time.time()
result_df = pd.concat([result_df, chunk_df[chunk_df['user_id'].isin(user_ids_to_retrieve)]], ignore_index=True)
after = time.time()
print(f"Time to concat {chunk_size} rows: {after-before} seconds")
    
# Remove the unwanted columns
if remove_columns is not None:
    chunk_df = chunk_df.drop(columns=remove_columns)

# Close the database connection
conn.close()

Time to fetch 1000 rows: 0.0034177303314208984 seconds
Time to concat 1000 rows: 0.0020749568939208984 seconds


In [25]:
len(chunk_df)

100000

In [39]:
chunk_df.groupby('user_id').head(2)

Unnamed: 0,tweet_id,user_id,created_at,text,retweet_text
0,842462325839941633,842452578738806784,2017-03-16 19:47:25+00:00,Alla Andrea Pirillo https://t.co/QOwa2Kj6fw,
1,364708769491464192,69150122,2013-08-06 11:25:21+00:00,"#Metro, cerca di fermare banda",
2,340817663829872642,69150122,2013-06-01 13:10:37+00:00,Servizio Pubblico http://t.co/K5sbkmfGGA via @...,
4,1595686854401073153,434347177,2022-11-24 07:52:47+00:00,RT @fanpage: Vende cornetti dalla finestra di ...,Vende cornetti dalla finestra di casa. Ilaria ...
5,1594019201781334016,434347177,2022-11-19 17:26:07+00:00,RT @ValaAfshar: This is how classically traine...,This is how classically trained musicians beau...


In [152]:
227486 / 4637193 * 600000000

29434099.464913364

In [156]:
600000000 / 4637193 / 10 * 82

1060.9866787946933

In [157]:
227486 * 10

2274860

In [51]:
import pandas as pd

# import all files called None_{N}.pkl  
df = pd.concat([pd.read_pickle(f'../None_{i}.pkl') for i in range(1)], ignore_index=True)

In [54]:
def get_user_ids(db_file, table_name):
    # Create a database connection
    conn = sqlite3.connect(db_file)
    cursor = conn.cursor()

    try:
        # Query unique user IDs from the specified table
        cursor.execute(f"SELECT DISTINCT user_id FROM {table_name}")
        user_ids = [row[0] for row in cursor.fetchall()]
        return user_ids

    except sqlite3.Error as e:
        print(f"Error reading data from the database: {e}")

    finally:
        # Close the database connection
        conn.close()

db_file = '../../mentalism/sentemb/mydata/database/myMENTALISM.db'
table_name = 'sample_tweets'
user_ids = get_all_user_ids(db_file, table_name)


In [57]:
user_ids[:5]

[842452578738806784, 69150122, 434347177, 2673613019, 2988247127]