In [1]:
# for measure execution time
from datetime import datetime

In [2]:
import redis
import json

In [34]:
# redis connection configuration
r = redis.Redis(host='192.168..', port=11, db=0, password='***')

In [4]:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine

In [5]:
# connect to Postgres
db_uri = 'postgresql://user:pwd@192.168..:port/db_name'
engine = create_engine(db_uri)

In [6]:
# create multiple processes
import multiprocessing
# for asynchronously executing functions using threads or processes
import concurrent.futures

In [10]:
#
sql = "SELECT * FROM backgammon.players"
df = pd.read_sql(sql, engine)
df.head()

Unnamed: 0,player_id,name,score,current_level,won_prizes,register_date
0,770197,9f845503e952d2b74c86b2360991987e,28,3,"[9, 7, 21]",2022-02-12
1,148984,ec12ea1947b97f036749534c0fc915c2,10,3,"[20, 20, 11]",2022-02-22
2,398681,ec4f04d6eb5b5fb647bc7adab53214f2,19,2,"[1, 20, 16]",2022-02-10
3,379561,f977e8039c8302251493de647714c395,61,9,"[21, 6, 14]",2022-09-04
4,267762,8399a6eebdae43c516a0a3356d583fa2,48,10,"[6, 20, 19]",2022-09-09


In [11]:
df.columns

Index(['player_id', 'name', 'score', 'current_level', 'won_prizes',
       'register_date'],
      dtype='object')

In [12]:
selected_columns = ['name', 'score', 'current_level', 'won_prizes', 'register_date']

In [None]:
# Method 1 (simple loop)

In [None]:
exec_time = np.array([])

In [56]:



start_time = datetime.now()


chunk = df.sample(1000)

for index, row in chunk.iterrows():

    # generating key and value of redis hash type
    r_key = 'bg_pl:' + str(row[0])
    r_value_json = f'{{"username": "{row[1]}", "score": "{row[2]}", "current_level": "{row[3]}",\
                        "won_prizes": "{row[4]}", "register_date": "{row[5]}"}}'
    r_value_fields = json.loads(r_value_json)

    res1 = r.hset(
    r_key,
    mapping=r_value_fields,
    )
    


end_time = datetime.now()



execution_time = (end_time - start_time).total_seconds()
formatted_time = "{:.2f}".format(execution_time)


exec_time = np.append(exec_time, formatted_time)


In [57]:
exec_time

array(['0.91', '9.40', '83.08'], dtype='<U32')

In [None]:
# Method 2 (pipeline)

In [123]:
exec_time = np.array([])

In [124]:
pipe = r.pipeline()

In [131]:



start_time = datetime.now()


chunk = df.sample(10000)

for index, row in chunk.iterrows():

    # generating key and value of redis hash type
    r_key = 'bg_pl:' + str(row[0])
    r_value_fields = {
        "username": str(row[1]),
        "score": str(row[2]),
        "current_level": str(row[3]),
        "won_prizes": str(row[4]),
        "register_date": str(row[5])
    }

    
    for field, value in r_value_fields.items():
        pipe.hset(r_key, field, value)


results = pipe.execute()
pipe.reset()    


end_time = datetime.now()



execution_time = (end_time - start_time).total_seconds()
formatted_time = "{:.2f}".format(execution_time)


exec_time = np.append(exec_time, formatted_time)


In [132]:
exec_time

array(['0.16', '1.76', '12.91', '109.40'], dtype='<U32')

In [None]:
# Method 3 - Multi Processing

In [37]:
exec_time = np.array([])

In [38]:
multiprocessing.cpu_count()

8

In [28]:
# Function to process a single row
def process_row(row):
    # Generating key and value for Redis hash
    r_key = 'bg_pl:' + str(row[0])
    r_value_json = f'{{"username": "{row[1]}", "score": "{row[2]}", "current_level": "{row[3]}", "won_prizes": "{row[4]}", "register_date": "{row[5]}"}}'
    r_value_fields = json.loads(r_value_json)
    
    # Set the values in Redis
    r.hset(r_key, mapping=r_value_fields)


In [29]:

start_time = datetime.now()

chunk = df.sample(10)
max_threads = 4  # Adjust as needed

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:

    future_to_row = {executor.submit(process_row, row): row for _, row in chunk.iterrows()}

    # Wait for the tasks to complete
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        try:
            future.result()
        except Exception as e:
            print(f"An error occurred for row {row}: {e}")

            

end_time = datetime.now()



execution_time = (end_time - start_time).total_seconds()
formatted_time = "{:.2f}".format(execution_time)


exec_time = np.append(exec_time, formatted_time)

In [47]:
exec_time

array(['0.22', '1.73', '18.01', '200.80'], dtype='<U32')

In [None]:
# Method 4 - Multi Processing with Pipeline

In [39]:
exec_time = np.array([])

In [40]:
def process_row(row):
    try:
        
        # Generating key and value for Redis hash
        r_key = 'bg_pl:' + str(row[0])
        r_value_fields = {
            "username": str(row[1]),
            "score": str(row[2]),
            "current_level": str(row[3]),
            "won_prizes": json.dumps(row[4]),  # Serialize won_prizes as JSON
            "register_date": str(row[5])
        }

        pipe = r.pipeline()

        for field, value in r_value_fields.items():
            pipe.hset(r_key, field, value)

        results = pipe.execute()
    except Exception as e:
        print(f"An error occurred: {e}")

In [47]:



start_time = datetime.now()




chunk = df.sample(10000)
max_threads = 4  # Adjust as needed

# Create a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    future_to_row = {executor.submit(process_row, row): row for _, row in chunk.iterrows()}

    # Wait for the tasks to complete
    for future in concurrent.futures.as_completed(future_to_row):
        row = future_to_row[future]
        try:
            future.result()
        except Exception as e:
            print(f"An error occurred for row {row}: {e}")


            

end_time = datetime.now()



execution_time = (end_time - start_time).total_seconds()
formatted_time = "{:.2f}".format(execution_time)


exec_time = np.append(exec_time, formatted_time)

pipe.reset()

In [49]:
exec_time

array(['0.25', '1.78', '18.31', '185.25'], dtype='<U32')